diff --git a/modules/packages/content_store.go b/modules/packages/content_store.go index be416ac26..13763db98 100644 --- a/modules/packages/content_store.go +++ b/modules/packages/content_store.go @@ -32,6 +32,13 @@ func (s *ContentStore) Get(key BlobHash256Key) (storage.Object, error) { return s.store.Open(KeyToRelativePath(key)) } +// FIXME: Workaround to be removed in v1.20 +// https://github.com/go-gitea/gitea/issues/19586 +func (s *ContentStore) Has(key BlobHash256Key) error { + _, err := s.store.Stat(KeyToRelativePath(key)) + return err +} + // Save stores a package blob func (s *ContentStore) Save(key BlobHash256Key, r io.Reader, size int64) error { _, err := s.store.Save(KeyToRelativePath(key), r, size) diff --git a/routers/api/packages/container/blob.go b/routers/api/packages/container/blob.go index df6b7aed9..06b450de0 100644 --- a/routers/api/packages/container/blob.go +++ b/routers/api/packages/container/blob.go @@ -7,8 +7,11 @@ package container import ( "context" "encoding/hex" + "errors" "fmt" + "os" "strings" + "sync" "code.gitea.io/gitea/models/db" packages_model "code.gitea.io/gitea/models/packages" @@ -16,9 +19,12 @@ import ( "code.gitea.io/gitea/modules/log" packages_module "code.gitea.io/gitea/modules/packages" container_module "code.gitea.io/gitea/modules/packages/container" + "code.gitea.io/gitea/modules/util" packages_service "code.gitea.io/gitea/services/packages" ) +var uploadVersionMutex sync.Mutex + // saveAsPackageBlob creates a package blob from an upload // The uploaded blob gets stored in a special upload version to link them to the package/image func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_service.PackageInfo) (*packages_model.PackageBlob, error) { @@ -28,6 +34,11 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic contentStore := packages_module.NewContentStore() + var uploadVersion *packages_model.PackageVersion + + // FIXME: Replace usage of mutex with database transaction + // https://github.com/go-gitea/gitea/pull/21862 + uploadVersionMutex.Lock() err := db.WithTx(db.DefaultContext, func(ctx context.Context) error { created := true p := &packages_model.Package{ @@ -68,11 +79,30 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic } } + uploadVersion = pv + + return nil + }) + uploadVersionMutex.Unlock() + if err != nil { + return nil, err + } + + err = db.WithTx(db.DefaultContext, func(ctx context.Context) error { pb, exists, err = packages_model.GetOrInsertBlob(ctx, pb) if err != nil { log.Error("Error inserting package blob: %v", err) return err } + // FIXME: Workaround to be removed in v1.20 + // https://github.com/go-gitea/gitea/issues/19586 + if exists { + err = contentStore.Has(packages_module.BlobHash256Key(pb.HashSHA256)) + if err != nil && (errors.Is(err, util.ErrNotExist) || errors.Is(err, os.ErrNotExist)) { + log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256) + exists = false + } + } if !exists { if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), hsr, hsr.Size()); err != nil { log.Error("Error saving package blob in content store: %v", err) @@ -83,7 +113,7 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic filename := strings.ToLower(fmt.Sprintf("sha256_%s", pb.HashSHA256)) pf := &packages_model.PackageFile{ - VersionID: pv.ID, + VersionID: uploadVersion.ID, BlobID: pb.ID, Name: filename, LowerName: filename, diff --git a/routers/api/packages/container/container.go b/routers/api/packages/container/container.go index 5bc64e1b2..7af06a917 100644 --- a/routers/api/packages/container/container.go +++ b/routers/api/packages/container/container.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "net/url" + "os" "regexp" "strconv" "strings" @@ -24,6 +25,7 @@ import ( container_module "code.gitea.io/gitea/modules/packages/container" "code.gitea.io/gitea/modules/packages/container/oci" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/routers/api/packages/helper" packages_service "code.gitea.io/gitea/services/packages" container_service "code.gitea.io/gitea/services/packages/container" @@ -193,7 +195,7 @@ func InitiateUploadBlob(ctx *context.Context) { mount := ctx.FormTrim("mount") from := ctx.FormTrim("from") if mount != "" { - blob, _ := container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{ + blob, _ := workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{ Image: from, Digest: mount, }) @@ -406,7 +408,7 @@ func getBlobFromContext(ctx *context.Context) (*packages_model.PackageFileDescri return nil, container_model.ErrContainerBlobNotExist } - return container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{ + return workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{ OwnerID: ctx.Package.Owner.ID, Image: ctx.Params("image"), Digest: digest, @@ -548,7 +550,7 @@ func getManifestFromContext(ctx *context.Context) (*packages_model.PackageFileDe return nil, container_model.ErrContainerBlobNotExist } - return container_model.GetContainerBlob(ctx, opts) + return workaroundGetContainerBlob(ctx, opts) } // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#checking-if-content-exists-in-the-registry @@ -688,3 +690,23 @@ func GetTagList(ctx *context.Context) { Tags: tags, }) } + +// FIXME: Workaround to be removed in v1.20 +// https://github.com/go-gitea/gitea/issues/19586 +func workaroundGetContainerBlob(ctx *context.Context, opts *container_model.BlobSearchOptions) (*packages_model.PackageFileDescriptor, error) { + blob, err := container_model.GetContainerBlob(ctx, opts) + if err != nil { + return nil, err + } + + err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(blob.Blob.HashSHA256)) + if err != nil { + if errors.Is(err, util.ErrNotExist) || errors.Is(err, os.ErrNotExist) { + log.Debug("Package registry inconsistent: blob %s does not exist on file system", blob.Blob.HashSHA256) + return nil, container_model.ErrContainerBlobNotExist + } + return nil, err + } + + return blob, nil +} diff --git a/routers/api/packages/container/manifest.go b/routers/api/packages/container/manifest.go index a48b1de3b..64c958c2d 100644 --- a/routers/api/packages/container/manifest.go +++ b/routers/api/packages/container/manifest.go @@ -6,8 +6,10 @@ package container import ( "context" + "errors" "fmt" "io" + "os" "strings" "code.gitea.io/gitea/models/db" @@ -19,6 +21,7 @@ import ( packages_module "code.gitea.io/gitea/modules/packages" container_module "code.gitea.io/gitea/modules/packages/container" "code.gitea.io/gitea/modules/packages/container/oci" + "code.gitea.io/gitea/modules/util" packages_service "code.gitea.io/gitea/services/packages" ) @@ -403,6 +406,15 @@ func createManifestBlob(ctx context.Context, mci *manifestCreationInfo, pv *pack log.Error("Error inserting package blob: %v", err) return nil, false, "", err } + // FIXME: Workaround to be removed in v1.20 + // https://github.com/go-gitea/gitea/issues/19586 + if exists { + err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(pb.HashSHA256)) + if err != nil && (errors.Is(err, util.ErrNotExist) || errors.Is(err, os.ErrNotExist)) { + log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256) + exists = false + } + } if !exists { contentStore := packages_module.NewContentStore() if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), buf, buf.Size()); err != nil { diff --git a/tests/integration/api_packages_container_test.go b/tests/integration/api_packages_container_test.go index ba76ee4ba..60cbecd06 100644 --- a/tests/integration/api_packages_container_test.go +++ b/tests/integration/api_packages_container_test.go @@ -6,10 +6,12 @@ package integration import ( "bytes" + "crypto/sha256" "encoding/base64" "fmt" "net/http" "strings" + "sync" "testing" "code.gitea.io/gitea/models/db" @@ -594,6 +596,32 @@ func TestPackageContainer(t *testing.T) { }) } + // https://github.com/go-gitea/gitea/issues/19586 + t.Run("ParallelUpload", func(t *testing.T) { + defer tests.PrintCurrentTest(t)() + + url := fmt.Sprintf("%sv2/%s/parallel", setting.AppURL, user.Name) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + + content := []byte{byte(i)} + digest := fmt.Sprintf("sha256:%x", sha256.Sum256(content)) + + go func() { + defer wg.Done() + + req := NewRequestWithBody(t, "POST", fmt.Sprintf("%s/blobs/uploads?digest=%s", url, digest), bytes.NewReader(content)) + addTokenAuthHeader(req, userToken) + resp := MakeRequest(t, req, http.StatusCreated) + + assert.Equal(t, digest, resp.Header().Get("Docker-Content-Digest")) + }() + } + wg.Wait() + }) + t.Run("OwnerNameChange", func(t *testing.T) { defer tests.PrintCurrentTest(t)()