From e5ac8430c386b76b62c9f66c16519dde683d3d56 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Tue, 4 Oct 2022 17:38:20 +0200 Subject: [PATCH] satellite/metainfo: delete pieces from nodes on object commit We have new flow where existing object is deleted not on begin object but on commit object. Deletion on commit object is still missing deletion from storage nodes. This change adds this part to the code. Fixes https://github.com/storj/storj/issues/5222 Change-Id: Ibfd34665b2a055ec6c0d6e260c1a57e8a4c62b0e --- satellite/metabase/commit.go | 19 +- satellite/metabase/commit_test.go | 51 +++++ satellite/metainfo/endpoint_object.go | 3 + satellite/metainfo/endpoint_object_test.go | 235 +++++++++++++-------- 4 files changed, 221 insertions(+), 87 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index b6c53ed66..bcd88223a 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -485,6 +485,10 @@ type CommitObject struct { EncryptedMetadataEncryptedKey []byte // optional DisallowDelete bool + // OnDelete will be triggered when/if existing object will be overwritten on commit. + // Will only be executed after successful commit + delete DB operation. + // Error on this function won't revert back committed object. + OnDelete func(segments []DeletedSegmentInfo) } // Verify verifies reqest fields. @@ -507,7 +511,8 @@ func (c *CommitObject) Verify() error { return nil } -// CommitObject adds a pending object to the database. +// CommitObject adds a pending object to the database. If another committed object is under target location +// it will be deleted. 
func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) { defer mon.Task()(&ctx)(&err) @@ -515,6 +520,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } + deletedSegments := []DeletedSegmentInfo{} + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID) if err != nil { @@ -657,8 +664,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec } for _, version := range versionsToDelete { - // TODO delete pieces from stoage nodes - _, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{ + deleteResult, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{ ObjectLocation: ObjectLocation{ ProjectID: opts.ProjectID, BucketName: opts.BucketName, @@ -669,6 +675,8 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec if err != nil { return Error.New("failed to delete existing object: %w", err) } + + deletedSegments = append(deletedSegments, deleteResult.Segments...) 
} object.StreamID = opts.StreamID @@ -687,6 +695,11 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec return Object{}, err } + // we can execute this only when whole transaction is committed without any error + if len(deletedSegments) > 0 && opts.OnDelete != nil { + opts.OnDelete(deletedSegments) + } + mon.Meter("object_commit").Mark(1) mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount)) mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize) diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index c3ce52d6f..1df195261 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "storj.io/common/memory" "storj.io/common/storj" "storj.io/common/testcontext" @@ -2935,6 +2937,55 @@ func TestCommitObject(t *testing.T) { }) } +func TestCommitObject_MultipleVersions(t *testing.T) { + metabasetest.RunWithConfig(t, metabase.Config{ + ApplicationName: "satellite-test", + MinPartSize: 5 * memory.MiB, + MaxNumberOfParts: 1000, + MultipleVersions: true, + }, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + t.Run("OnDelete", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + // check deleted segments + obj := metabasetest.RandObjectStream() + + _, expectedSegments := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 3) + + expectedDeletedSegments := []metabase.DeletedSegmentInfo{} + for _, segment := range expectedSegments { + expectedDeletedSegments = append(expectedDeletedSegments, metabase.DeletedSegmentInfo{ + RootPieceID: segment.RootPieceID, + Pieces: segment.Pieces, + }) + } + + obj.Version++ + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + deletedSegments 
:= []metabase.DeletedSegmentInfo{} + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + OnDelete: func(segments []metabase.DeletedSegmentInfo) { + deletedSegments = append(deletedSegments, segments...) + }, + }, + }.Check(ctx, t, db) + + require.Equal(t, expectedDeletedSegments, deletedSegments) + }) + }) +} + func TestCommitObjectWithIncorrectPartSize(t *testing.T) { metabasetest.RunWithConfig(t, metabase.Config{ ApplicationName: "satellite-test", diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index d197ffd79..cd20bfdc1 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -270,6 +270,9 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit Encryption: encryption, DisallowDelete: !allowDelete, + OnDelete: func(segments []metabase.DeletedSegmentInfo) { + endpoint.deleteSegmentPieces(ctx, segments) + }, } // uplink can send empty metadata with not empty key/nonce // we need to fix it on uplink side but that part will be diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index 63c12958e..20af17581 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -35,6 +35,7 @@ import ( "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/metainfo" + "storj.io/storj/storage" "storj.io/uplink" "storj.io/uplink/private/metaclient" "storj.io/uplink/private/object" @@ -1952,6 +1953,8 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) { Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Metainfo.MultipleVersions = true + + testplanet.ReconfigureRS(2, 3, 4, 4)(log, index, config) }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { @@ -1959,112 
+1962,176 @@ func TestEndpoint_Object_MultipleVersions(t *testing.T) { require.NoError(t, err) defer ctx.Check(project.Close) - err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB)) - require.NoError(t, err) - - // override object to have it with version 2 - expectedData := testrand.Bytes(11 * memory.KiB) - err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData) - require.NoError(t, err) - - objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx) - require.NoError(t, err) - require.Len(t, objects, 1) - require.EqualValues(t, 2, objects[0].Version) - - // add some pending uploads, each will have version higher then 2 - uploadIDs := []string{} - for i := 0; i < 10; i++ { - info, err := project.BeginUpload(ctx, "multipleversions", "object", nil) - require.NoError(t, err) - uploadIDs = append(uploadIDs, info.UploadID) + deleteBucket := func(bucketName string) func() error { + return func() error { + _, err := project.DeleteBucketWithObjects(ctx, bucketName) + return err + } } - checkDownload := func(objectKey string, expectedData []byte) { - data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey) - require.NoError(t, err) - require.Equal(t, expectedData, data) - } + t.Run("multiple versions", func(t *testing.T) { + defer ctx.Check(deleteBucket("multipleversions")) - checkDownload("object", expectedData) - - err = project.MoveObject(ctx, "multipleversions", "object", "multipleversions", "object_moved", nil) - require.NoError(t, err) - - checkDownload("object_moved", expectedData) - - err = project.MoveObject(ctx, "multipleversions", "object_moved", "multipleversions", "object", nil) - require.NoError(t, err) - - checkDownload("object", expectedData) - - iterator := project.ListObjects(ctx, "multipleversions", nil) - require.True(t, iterator.Next()) - require.Equal(t, "object", iterator.Item().Key) - 
require.NoError(t, iterator.Err()) - - { // server side copy - _, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", "object_copy", nil) + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", testrand.Bytes(10*memory.MiB)) require.NoError(t, err) - checkDownload("object_copy", expectedData) + // override object to have it with version 2 + expectedData := testrand.Bytes(11 * memory.KiB) + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "multipleversions", "object", expectedData) + require.NoError(t, err) + + objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx) + require.NoError(t, err) + require.Len(t, objects, 1) + require.EqualValues(t, 2, objects[0].Version) + + // add some pending uploads, each will have version higher then 2 + uploadIDs := []string{} + for i := 0; i < 10; i++ { + info, err := project.BeginUpload(ctx, "multipleversions", "object", nil) + require.NoError(t, err) + uploadIDs = append(uploadIDs, info.UploadID) + } + + checkDownload := func(objectKey string, expectedData []byte) { + data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "multipleversions", objectKey) + require.NoError(t, err) + require.Equal(t, expectedData, data) + } + + checkDownload("object", expectedData) + + err = project.MoveObject(ctx, "multipleversions", "object", "multipleversions", "object_moved", nil) + require.NoError(t, err) + + checkDownload("object_moved", expectedData) + + err = project.MoveObject(ctx, "multipleversions", "object_moved", "multipleversions", "object", nil) + require.NoError(t, err) + + checkDownload("object", expectedData) + + iterator := project.ListObjects(ctx, "multipleversions", nil) + require.True(t, iterator.Next()) + require.Equal(t, "object", iterator.Item().Key) + require.NoError(t, iterator.Err()) + + { // server side copy + _, err = project.CopyObject(ctx, "multipleversions", "object", "multipleversions", "object_copy", nil) + 
require.NoError(t, err) + + checkDownload("object_copy", expectedData) + + _, err = project.DeleteObject(ctx, "multipleversions", "object") + require.NoError(t, err) + + _, err = project.CopyObject(ctx, "multipleversions", "object_copy", "multipleversions", "object", nil) + require.NoError(t, err) + + checkDownload("object", expectedData) + + _, err = project.DeleteObject(ctx, "multipleversions", "object_copy") + require.NoError(t, err) + + checkDownload("object", expectedData) + } + + err = project.AbortUpload(ctx, "multipleversions", "object", uploadIDs[0]) + require.NoError(t, err) + checkDownload("object", expectedData) + + expectedData = testrand.Bytes(12 * memory.KiB) + upload, err := project.UploadPart(ctx, "multipleversions", "object", uploadIDs[1], 1) + require.NoError(t, err) + _, err = upload.Write(expectedData) + require.NoError(t, err) + require.NoError(t, upload.Commit()) + _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[1], nil) + require.NoError(t, err) + + checkDownload("object", expectedData) _, err = project.DeleteObject(ctx, "multipleversions", "object") require.NoError(t, err) - _, err = project.CopyObject(ctx, "multipleversions", "object_copy", "multipleversions", "object", nil) + iterator = project.ListObjects(ctx, "multipleversions", nil) + require.False(t, iterator.Next()) + require.NoError(t, iterator.Err()) + + // use next available pending upload + upload, err = project.UploadPart(ctx, "multipleversions", "object", uploadIDs[2], 1) + require.NoError(t, err) + _, err = upload.Write(expectedData) + require.NoError(t, err) + require.NoError(t, upload.Commit()) + _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[2], nil) require.NoError(t, err) checkDownload("object", expectedData) - _, err = project.DeleteObject(ctx, "multipleversions", "object_copy") + uploads := project.ListUploads(ctx, "multipleversions", nil) + count := 0 + for uploads.Next() { + require.Equal(t, "object", 
uploads.Item().Key) + count++ + } + // we started with 10 pending object and during test we abort/commit 3 objects + pendingUploadsLeft := 7 + require.Equal(t, pendingUploadsLeft, count) + }) + + t.Run("override object", func(t *testing.T) { + defer ctx.Check(deleteBucket("bucket")) + + bucketName := "bucket" + objectName := "file1" + + err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB)) require.NoError(t, err) - checkDownload("object", expectedData) - } + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) - err = project.AbortUpload(ctx, "multipleversions", "object", uploadIDs[0]) - require.NoError(t, err) - checkDownload("object", expectedData) + pieceIDs := map[storj.NodeID]storj.PieceID{} + for _, piece := range segments[0].Pieces { + pieceIDs[piece.StorageNode] = segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number)) + } - expectedData = testrand.Bytes(12 * memory.KiB) - upload, err := project.UploadPart(ctx, "multipleversions", "object", uploadIDs[1], 1) - require.NoError(t, err) - _, err = upload.Write(expectedData) - require.NoError(t, err) - require.NoError(t, upload.Commit()) - _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[1], nil) - require.NoError(t, err) + for _, node := range planet.StorageNodes { + pieceID, ok := pieceIDs[node.ID()] + require.True(t, ok) + piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{ + Namespace: planet.Satellites[0].ID().Bytes(), + Key: pieceID.Bytes(), + }) + require.NoError(t, err) + require.NotNil(t, piece) + } - checkDownload("object", expectedData) + expectedData := testrand.Bytes(5 * memory.KiB) + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData) + require.NoError(t, err) - _, err = project.DeleteObject(ctx, "multipleversions", "object") - require.NoError(t, err) + 
planet.WaitForStorageNodeDeleters(ctx) - iterator = project.ListObjects(ctx, "multipleversions", nil) - require.False(t, iterator.Next()) - require.NoError(t, iterator.Err()) + // verify that old object pieces are not stored on storage nodes anymore + for _, node := range planet.StorageNodes { + pieceID, ok := pieceIDs[node.ID()] + require.True(t, ok) - // use next available pending upload - upload, err = project.UploadPart(ctx, "multipleversions", "object", uploadIDs[2], 1) - require.NoError(t, err) - _, err = upload.Write(expectedData) - require.NoError(t, err) - require.NoError(t, upload.Commit()) - _, err = project.CommitUpload(ctx, "multipleversions", "object", uploadIDs[2], nil) - require.NoError(t, err) + piece, err := node.DB.Pieces().Stat(ctx, storage.BlobRef{ + Namespace: planet.Satellites[0].ID().Bytes(), + Key: pieceID.Bytes(), + }) + require.Error(t, err) + require.Nil(t, piece) + } - checkDownload("object", expectedData) - - uploads := project.ListUploads(ctx, "multipleversions", nil) - count := 0 - for uploads.Next() { - require.Equal(t, "object", uploads.Item().Key) - count++ - } - // we started with 10 pending object and during test we abort/commit 3 objects - pendingUploadsLeft := 7 - require.Equal(t, pendingUploadsLeft, count) + data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName) + require.NoError(t, err) + require.Equal(t, expectedData, data) + }) }) }