diff --git a/satellite/metabase/delete_part.go b/satellite/metabase/delete_part.go new file mode 100644 index 000000000..5e0b19a5d --- /dev/null +++ b/satellite/metabase/delete_part.go @@ -0,0 +1,89 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "context" + "math" + + "storj.io/common/storj" + "storj.io/common/uuid" + "storj.io/private/tagsql" +) + +// DeletePart contains arguments necessary for deleting single part. +type DeletePart struct { + StreamID uuid.UUID + PartNumber uint32 + + DeletePieces func(ctx context.Context, segment DeletedSegmentInfo) error +} + +// DeletePart deletes all segments for given part. +func (db *DB) DeletePart(ctx context.Context, opts DeletePart) (err error) { + defer mon.Task()(&ctx)(&err) + + if opts.StreamID.IsZero() { + return ErrInvalidRequest.New("StreamID missing") + } + + if opts.DeletePieces == nil { + return ErrInvalidRequest.New("DeletePieces missing") + } + + minPosition := SegmentPosition{ + Part: opts.PartNumber, + }.Encode() + maxPosition := SegmentPosition{ + Part: opts.PartNumber, + Index: math.MaxUint32, + }.Encode() + + type Deleted struct { + RootPieceID storj.PieceID + AliasPieces AliasPieces + } + deleted := make([]Deleted, 0, 10) + err = withRows(db.db.QueryContext(ctx, ` + DELETE FROM segments WHERE + stream_id = $1 AND position BETWEEN $2 AND $3 + RETURNING + root_piece_id, remote_alias_pieces + `, opts.StreamID, minPosition, maxPosition))(func(rows tagsql.Rows) error { + for rows.Next() { + var rootPieceID storj.PieceID + var aliasPieces AliasPieces + err := rows.Scan(&rootPieceID, &aliasPieces) + if err != nil { + return err + } + // this code assumes that at some point we will limit number of segments per part + deleted = append(deleted, Deleted{ + RootPieceID: rootPieceID, + AliasPieces: aliasPieces, + }) + + } + return nil + }) + if err != nil { + return Error.Wrap(err) + } + + for _, item := range deleted { + deleteInfo := DeletedSegmentInfo{ + RootPieceID: item.RootPieceID, + } + deleteInfo.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, item.AliasPieces) + if err != nil { + return err + } + err = opts.DeletePieces(ctx, deleteInfo) + if err != nil { + return Error.Wrap(err) + } + } + + return nil +} diff --git a/satellite/metabase/delete_part_test.go b/satellite/metabase/delete_part_test.go new file mode 100644 index 000000000..71367b02c --- /dev/null +++ b/satellite/metabase/delete_part_test.go @@ -0,0 +1,215 @@ +// Copyright (C) 2021 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "context" + "math" + "testing" + "time" + + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/storj/satellite/metabase" + "storj.io/storj/satellite/metabase/metabasetest" +) + +func TestDeletePart(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := metabasetest.RandObjectStream() + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + defaultDeletePieces := func(ctx context.Context, segment metabase.DeletedSegmentInfo) error { + return nil + } + + t.Run("StreamID missing", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.DeletePart{ + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "StreamID missing", + }.Check(ctx, t, db) + + }) + + t.Run("DeletePieces missing", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.DeletePart{ + Opts: metabase.DeletePart{ + StreamID: obj.StreamID, + DeletePieces: defaultDeletePieces, + }, + }.Check(ctx, t, db) + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("empty metabase", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.DeletePart{ + Opts: metabase.DeletePart{ + StreamID: obj.StreamID, + DeletePieces: defaultDeletePieces, + }, + }.Check(ctx, t, db) + metabasetest.Verify{}.Check(ctx, t, db) + }) + + t.Run("no segments", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + object := metabasetest.CreateObject(ctx, t, db, obj, 0) + + metabasetest.DeletePart{ + Opts: metabase.DeletePart{ + StreamID: object.StreamID, + DeletePieces: defaultDeletePieces, + }, + }.Check(ctx, t, db) + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(object), + }, + }.Check(ctx, t, db) + }) + + t.Run("no StreamID", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + object := metabasetest.CreateObject(ctx, t, db, obj, 1) + + metabasetest.DeletePart{ + Opts: metabase.DeletePart{ + StreamID: testrand.UUID(), + DeletePieces: defaultDeletePieces, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(object), + }, + Segments: []metabase.RawSegment{ + metabasetest.DefaultRawSegment(object.ObjectStream, metabase.SegmentPosition{ + Part: 0, Index: 0, + }), + }, + }.Check(ctx, t, db) + }) + + t.Run("success", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + segments := []metabase.SegmentPosition{ + {Part: 0, Index: 0}, + {Part: 1, Index: 0}, + {Part: 1, Index: 10}, + {Part: 1, Index: math.MaxUint32}, + {Part: 2, Index: 0}, + {Part: 50, Index: 0}, + } + for i, segmentPosition := range segments { + metabasetest.BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: segmentPosition, + RootPieceID: storj.PieceID{byte(i + 1)}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + }.Check(ctx, t, db) + + commitDefaultSegment(ctx, t, db, obj, segmentPosition) + } + + // delete non-existing part + metabasetest.DeletePart{ + Opts: metabase.DeletePart{ + StreamID: obj.StreamID, + PartNumber: 100, + }, + }.Check(ctx, t, db) + + // delete only single part + metabasetest.DeletePart{ + Opts: metabase.DeletePart{ + StreamID: obj.StreamID, + PartNumber: 1, + }, + Result: []metabase.DeletedSegmentInfo{ + { + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + }, + { + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + }, + { + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + }, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + CreatedAt: now, + Status: metabase.Pending, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + metabasetest.DefaultRawSegment(obj, metabase.SegmentPosition{ + Part: 0, Index: 0, + }), + metabasetest.DefaultRawSegment(obj, metabase.SegmentPosition{ + Part: 2, Index: 0, + }), + metabasetest.DefaultRawSegment(obj, metabase.SegmentPosition{ + Part: 50, Index: 0, + }), + }, + }.Check(ctx, t, db) + }) + }) +} + +func commitDefaultSegment(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, segmentPosition metabase.SegmentPosition) { + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: segmentPosition, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + }, + }.Check(ctx, t, db) +} diff --git a/satellite/metabase/metabasetest/defaults.go b/satellite/metabase/metabasetest/defaults.go index bc0387b88..78fb45938 100644 --- a/satellite/metabase/metabasetest/defaults.go +++ b/satellite/metabase/metabasetest/defaults.go @@ -3,7 +3,12 @@ package metabasetest -import "storj.io/common/storj" +import ( + "time" + + "storj.io/common/storj" + "storj.io/storj/satellite/metabase" +) // DefaultRedundancy contains default redundancy scheme. var DefaultRedundancy = storj.RedundancyScheme{ @@ -20,3 +25,24 @@ var DefaultEncryption = storj.EncryptionParameters{ CipherSuite: storj.EncAESGCM, BlockSize: 29 * 256, } + +// DefaultRawSegment returns default raw segment. +func DefaultRawSegment(obj metabase.ObjectStream, segmentPosition metabase.SegmentPosition) metabase.RawSegment { + now := time.Now() + return metabase.RawSegment{ + StreamID: obj.StreamID, + Position: segmentPosition, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + CreatedAt: &now, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedETag: []byte{5}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: DefaultRedundancy, + } +} diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index 004156d98..b80a572b9 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -660,3 +660,31 @@ func (step ListNodeAliases) Check(ctx *testcontext.Context, t testing.TB, db *me checkError(t, err, step.ErrClass, step.ErrText) return result } + +// DeletePart is for testing metabase.DeletePart. +type DeletePart struct { + Opts metabase.DeletePart + Result []metabase.DeletedSegmentInfo + ErrClass *errs.Class + ErrText string +} + +// Check runs the test. +func (step DeletePart) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { + result := []metabase.DeletedSegmentInfo{} + step.Opts.DeletePieces = func(ctx context.Context, segment metabase.DeletedSegmentInfo) error { + result = append(result, segment) + return nil + } + + err := db.DeletePart(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + if len(result) == 0 { + result = nil + } + sortDeletedSegments(step.Result) + sortDeletedSegments(result) + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +}