diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index b6c53ed66..bcd88223a 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -485,6 +485,10 @@ type CommitObject struct { EncryptedMetadataEncryptedKey []byte // optional DisallowDelete bool + // OnDelete will be triggered when/if an existing object is overwritten on commit. + // Will only be executed after a successful commit + delete DB operation. + // An error from this function won't revert the committed object. + OnDelete func(segments []DeletedSegmentInfo) } // Verify verifies reqest fields. @@ -507,7 +511,8 @@ func (c *CommitObject) Verify() error { return nil } -// CommitObject adds a pending object to the database. +// CommitObject adds a pending object to the database. If another committed object is under the target location +// it will be deleted. func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) { defer mon.Task()(&ctx)(&err) @@ -515,6 +520,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } + deletedSegments := []DeletedSegmentInfo{} + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID) if err != nil { @@ -657,8 +664,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec } for _, version := range versionsToDelete { - // TODO delete pieces from stoage nodes - _, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{ + deleteResult, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{ ObjectLocation: ObjectLocation{ ProjectID: opts.ProjectID, BucketName: opts.BucketName, @@ -669,6 +675,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec if err != nil { return Error.New("failed to delete existing object: %w", err) } + + deletedSegments = append(deletedSegments, 
deleteResult.Segments...) } object.StreamID = opts.StreamID @@ -687,6 +695,11 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } + // we can execute this only when whole transaction is committed without any error + if len(deletedSegments) > 0 && opts.OnDelete != nil { + opts.OnDelete(deletedSegments) + } + mon.Meter("object_commit").Mark(1) mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount)) mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize) diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index c3ce52d6f..1df195261 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "storj.io/common/memory" "storj.io/common/storj" "storj.io/common/testcontext" @@ -2935,6 +2937,55 @@ func TestCommitObject(t *testing.T) { }) } +func TestCommitObject_MultipleVersions(t *testing.T) { + metabasetest.RunWithConfig(t, metabase.Config{ + ApplicationName: "satellite-test", + MinPartSize: 5 * memory.MiB, + MaxNumberOfParts: 1000, + MultipleVersions: true, + }, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + t.Run("OnDelete", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + // check deleted segments + obj := metabasetest.RandObjectStream() + + _, expectedSegments := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 3) + + expectedDeletedSegments := []metabase.DeletedSegmentInfo{} + for _, segment := range expectedSegments { + expectedDeletedSegments = append(expectedDeletedSegments, metabase.DeletedSegmentInfo{ + RootPieceID: segment.RootPieceID, + Pieces: segment.Pieces, + }) + } + + obj.Version++ + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: obj.Version, + }.Check(ctx, t, 
db) + + deletedSegments := []metabase.DeletedSegmentInfo{} + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + OnDelete: func(segments []metabase.DeletedSegmentInfo) { + deletedSegments = append(deletedSegments, segments...) + }, + }, + }.Check(ctx, t, db) + + require.Equal(t, expectedDeletedSegments, deletedSegments) + }) + }) +} + func TestCommitObjectWithIncorrectPartSize(t *testing.T) { metabasetest.RunWithConfig(t, metabase.Config{ ApplicationName: "satellite-test", diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index d197ffd79..cd20bfdc1 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -270,6 +270,9 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit Encryption: encryption, DisallowDelete: !allowDelete, + OnDelete: func(segments []metabase.DeletedSegmentInfo) { + endpoint.deleteSegmentPieces(ctx, segments) + }, } // uplink can send empty metadata with not empty key/nonce // we need to fix it on uplink side but that part will be diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 63c12958e..20af17581 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -35,6 +35,7 @@ import ( "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metainfo" + "storj.io/storj/storage" "storj.io/uplink" "storj.io/uplink/private/metaclient" "storj.io/uplink/private/object" @@ -1952,6 +1953,8 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) { Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.MultipleVersions = true + + testplanet.ReconfigureRS(2, 3, 4, 4)(log, index, config) }, }, }, func(t *testing.T, ctx *testcontext.Context, planet 
*testplanet.Planet) { @@ -1959,112 +1962,176 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) { require.NoError(t, err) defer ctx.Check(project.Close) - err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB)) - require.NoError(t, err) - - // override object to have it with version 2 - expectedData := testrand.Bytes(11 * memory.KiB) - err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData) - require.NoError(t, err) - - objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx) - require.NoError(t, err) - require.Len(t, objects, 1) - require.EqualValues(t, 2, objects[0].Version) - - // add some pending uploads, each will have version higher then 2 - uploadIDs := []string{} - for i := 0; i < 10; i++ { - info, err := project.BeginUpload(ctx, "multipleversions", "object", nil) - require.NoError(t, err) - uploadIDs = append(uploadIDs, info.UploadID) + deleteBucket := func(bucketName string) func() error { + return func() error { + _, err := project.DeleteBucketWithObjects(ctx, bucketName) + return err + } } - checkDownload := func(objectKey string, expectedData []byte) { - data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey) - require.NoError(t, err) - require.Equal(t, expectedData, data) - } + t.Run("multiple versions", func(t *testing.T) { + defer ctx.Check(deleteBucket("multipleversions")) - checkDownload("object", expectedData) - - err = project.MoveObject(ctx, "multipleversions", "object", "multipleversions", "object_moved", nil) - require.NoError(t, err) - - checkDownload("object_moved", expectedData) - - err = project.MoveObject(ctx, "multipleversions", "object_moved", "multipleversions", "object", nil) - require.NoError(t, err) - - checkDownload("object", expectedData) - - iterator := project.ListObjects(ctx, "multipleversions", nil) - require.True(t, iterator.Next()) - require.Equal(t, 
"object", iterator.Item().Key) - require.NoError(t, iterator.Err()) - - { // server side copy - _, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", "object_copy", nil) + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB)) require.NoError(t, err) - checkDownload("object_copy", expectedData) + // override object to have it with version 2 + expectedData := testrand.Bytes(11 * memory.KiB) + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData) + require.NoError(t, err) + + objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx) + require.NoError(t, err) + require.Len(t, objects, 1) + require.EqualValues(t, 2, objects[0].Version) + + // add some pending uploads, each will have version higher then 2 + uploadIDs := []string{} + for i := 0; i < 10; i++ { + info, err := project.BeginUpload(ctx, "multipleversions", "object", nil) + require.NoError(t, err) + uploadIDs = append(uploadIDs, info.UploadID) + } + + checkDownload := func(objectKey string, expectedData []byte) { + data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey) + require.NoError(t, err) + require.Equal(t, expectedData, data) + } + + checkDownload("object", expectedData) + + err = project.MoveObject(ctx, "multipleversions", "object", "multipleversions", "object_moved", nil) + require.NoError(t, err) + + checkDownload("object_moved", expectedData) + + err = project.MoveObject(ctx, "multipleversions", "object_moved", "multipleversions", "object", nil) + require.NoError(t, err) + + checkDownload("object", expectedData) + + iterator := project.ListObjects(ctx, "multipleversions", nil) + require.True(t, iterator.Next()) + require.Equal(t, "object", iterator.Item().Key) + require.NoError(t, iterator.Err()) + + { // server side copy + _, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", 
"object_copy", nil) + require.NoError(t, err) + + checkDownload("object_copy", expectedData) + + _, err = project.DeleteObject(ctx, "multipleversions", "object") + require.NoError(t, err) + + _, err = project.CopyObject(ctx, "multipleversions", "object_copy", "multipleversions", "object", nil) + require.NoError(t, err) + + checkDownload("object", expectedData) + + _, err = project.DeleteObject(ctx, "multipleversions", "object_copy") + require.NoError(t, err) + + checkDownload("object", expectedData) + } + + err = project.AbortUpload(ctx, "multipleversions", "object", uploadIDs[0]) + require.NoError(t, err) + checkDownload("object", expectedData) + + expectedData = testrand.Bytes(12 * memory.KiB) + upload, err := project.UploadPart(ctx, "multipleversions", "object", uploadIDs[1], 1) + require.NoError(t, err) + _, err = upload.Write(expectedData) + require.NoError(t, err) + require.NoError(t, upload.Commit()) + _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[1], nil) + require.NoError(t, err) + + checkDownload("object", expectedData) _, err = project.DeleteObject(ctx, "multipleversions", "object") require.NoError(t, err) - _, err = project.CopyObject(ctx, "multipleversions", "object_copy", "multipleversions", "object", nil) + iterator = project.ListObjects(ctx, "multipleversions", nil) + require.False(t, iterator.Next()) + require.NoError(t, iterator.Err()) + + // use next available pending upload + upload, err = project.UploadPart(ctx, "multipleversions", "object", uploadIDs[2], 1) + require.NoError(t, err) + _, err = upload.Write(expectedData) + require.NoError(t, err) + require.NoError(t, upload.Commit()) + _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[2], nil) require.NoError(t, err) checkDownload("object", expectedData) - _, err = project.DeleteObject(ctx, "multipleversions", "object_copy") + uploads := project.ListUploads(ctx, "multipleversions", nil) + count := 0 + for uploads.Next() { + require.Equal(t, 
"object", uploads.Item().Key) + count++ + } + // we started with 10 pending object and during test we abort/commit 3 objects + pendingUploadsLeft := 7 + require.Equal(t, pendingUploadsLeft, count) + }) + + t.Run("override object", func(t *testing.T) { + defer ctx.Check(deleteBucket("bucket")) + + bucketName := "bucket" + objectName := "file1" + + err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB)) require.NoError(t, err) - checkDownload("object", expectedData) - } + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) - err = project.AbortUpload(ctx, "multipleversions", "object", uploadIDs[0]) - require.NoError(t, err) - checkDownload("object", expectedData) + pieceIDs := map[storj.NodeID]storj.PieceID{} + for _, piece := range segments[0].Pieces { + pieceIDs[piece.StorageNode] = segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number)) + } - expectedData = testrand.Bytes(12 * memory.KiB) - upload, err := project.UploadPart(ctx, "multipleversions", "object", uploadIDs[1], 1) - require.NoError(t, err) - _, err = upload.Write(expectedData) - require.NoError(t, err) - require.NoError(t, upload.Commit()) - _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[1], nil) - require.NoError(t, err) + for _, node := range planet.StorageNodes { + pieceID, ok := pieceIDs[node.ID()] + require.True(t, ok) + piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{ + Namespace: planet.Satellites[0].ID().Bytes(), + Key: pieceID.Bytes(), + }) + require.NoError(t, err) + require.NotNil(t, piece) + } - checkDownload("object", expectedData) + expectedData := testrand.Bytes(5 * memory.KiB) + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData) + require.NoError(t, err) - _, err = project.DeleteObject(ctx, "multipleversions", "object") - require.NoError(t, err) + 
planet.WaitForStorageNodeDeleters(ctx) - iterator = project.ListObjects(ctx, "multipleversions", nil) - require.False(t, iterator.Next()) - require.NoError(t, iterator.Err()) + // verify that old object pieces are not stored on storage nodes anymore + for _, node := range planet.StorageNodes { + pieceID, ok := pieceIDs[node.ID()] + require.True(t, ok) - // use next available pending upload - upload, err = project.UploadPart(ctx, "multipleversions", "object", uploadIDs[2], 1) - require.NoError(t, err) - _, err = upload.Write(expectedData) - require.NoError(t, err) - require.NoError(t, upload.Commit()) - _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[2], nil) - require.NoError(t, err) + piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{ + Namespace: planet.Satellites[0].ID().Bytes(), + Key: pieceID.Bytes(), + }) + require.Error(t, err) + require.Nil(t, piece) + } - checkDownload("object", expectedData) - - uploads := project.ListUploads(ctx, "multipleversions", nil) - count := 0 - for uploads.Next() { - require.Equal(t, "object", uploads.Item().Key) - count++ - } - // we started with 10 pending object and during test we abort/commit 3 objects - pendingUploadsLeft := 7 - require.Equal(t, pendingUploadsLeft, count) + data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName) + require.NoError(t, err) + require.Equal(t, expectedData, data) + }) }) }