satellite/metabase: server-side copy copies metadata

..instead of using segment_copies and ancestor_stream_id, etc.

This bypasses reference counting entirely, depending on our
mark+sweep+bloomfilter garbage collection strategy to get rid of pieces
once they are no longer part of a segment. This is only safe to do after
we have stopped passing delete requests on to storage nodes.

Refs: https://github.com/storj/storj/issues/5889
Change-Id: I37bdcffaa752f84fd85045235d6875b3526b5ecc
This commit is contained in:
parent ddf1f1c340
commit a4d68b9b7e
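To make the change concrete before the diff: a minimal SQL sketch of the two copy strategies. This is a simplified illustration with a trimmed column list; the actual statements in this diff use UNNEST over prepared arrays and carry more columns.

-- Legacy server-side copy: insert one lightweight reference row. The
-- copy's segments share the ancestor's pieces, so deletes have to
-- consult segment_copies and reference-count the pieces.
INSERT INTO segment_copies (stream_id, ancestor_stream_id)
VALUES ($1, $2);

-- Duplicate-metadata copy (this commit): clone the segment rows outright,
-- remote_alias_pieces included. No back-reference is kept; pieces that no
-- longer belong to any segment are reclaimed later by the mark+sweep
-- bloom filter GC running against the storage nodes.
INSERT INTO segments (stream_id, position, root_piece_id, remote_alias_pieces, placement, inline_data)
SELECT $1, position, root_piece_id, remote_alias_pieces, placement, inline_data
FROM segments
WHERE stream_id = $2;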
@@ -22,6 +22,7 @@ import (
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/gc/bloomfilter"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/rangedloop"
@@ -299,6 +300,157 @@ func TestGarbageCollectionWithCopies(t *testing.T) {
	})
}

// TestGarbageCollectionWithCopiesWithDuplicateMetadata checks that
// server-side copy elements do not affect GC and that nothing unexpected
// is deleted from storage nodes.
func TestGarbageCollectionWithCopiesWithDuplicateMetadata(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				testplanet.ReconfigureRS(2, 3, 4, 4),
				func(log *zap.Logger, index int, config *satellite.Config) {
					config.Metainfo.ServerSideCopyDuplicateMetadata = true
				},
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]

		access := planet.Uplinks[0].Access[planet.Satellites[0].NodeURL().ID]
		accessString, err := access.Serialize()
		require.NoError(t, err)

		gcsender := planet.Satellites[0].GarbageCollection.Sender
		gcsender.Config.AccessGrant = accessString

		// configure filter uploader
		config := planet.Satellites[0].Config.GarbageCollectionBF
		config.AccessGrant = accessString

		project, err := planet.Uplinks[0].OpenProject(ctx, satellite)
		require.NoError(t, err)
		defer ctx.Check(project.Close)

		allSpaceUsedForPieces := func() (all int64) {
			for _, node := range planet.StorageNodes {
				_, piecesContent, _, err := node.Storage2.Store.SpaceUsedTotalAndBySatellite(ctx)
				require.NoError(t, err)
				all += piecesContent
			}
			return all
		}

		expectedRemoteData := testrand.Bytes(8 * memory.KiB)
		expectedInlineData := testrand.Bytes(1 * memory.KiB)

		encryptedSize, err := encryption.CalcEncryptedSize(int64(len(expectedRemoteData)), storj.EncryptionParameters{
			CipherSuite: storj.EncAESGCM,
			BlockSize:   29 * 256 * memory.B.Int32(), // hardcoded value from uplink
		})
		require.NoError(t, err)

		redundancyStrategy, err := planet.Satellites[0].Config.Metainfo.RS.RedundancyStrategy()
		require.NoError(t, err)

		pieceSize := eestream.CalcPieceSize(encryptedSize, redundancyStrategy.ErasureScheme)
		singleRemoteUsed := pieceSize * int64(len(planet.StorageNodes))
		totalUsedByNodes := 2 * singleRemoteUsed // two remote objects

		require.NoError(t, planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "remote", expectedRemoteData))
		require.NoError(t, planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "inline", expectedInlineData))
		require.NoError(t, planet.Uplinks[0].Upload(ctx, satellite, "testbucket", "remote-no-copy", expectedRemoteData))

		_, err = project.CopyObject(ctx, "testbucket", "remote", "testbucket", "remote-copy", nil)
		require.NoError(t, err)
		_, err = project.CopyObject(ctx, "testbucket", "inline", "testbucket", "inline-copy", nil)
		require.NoError(t, err)

		require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))

		afterTotalUsedByNodes := allSpaceUsedForPieces()
		require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes)

		// Wait for bloom filter observer to finish
		rangedloopConfig := planet.Satellites[0].Config.RangedLoop

		observer := bloomfilter.NewObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB)
		segments := rangedloop.NewMetabaseRangeSplitter(planet.Satellites[0].Metabase.DB, rangedloopConfig.AsOfSystemInterval, rangedloopConfig.BatchSize)
		rangedLoop := rangedloop.NewService(zap.NewNop(), planet.Satellites[0].Config.RangedLoop, segments,
			[]rangedloop.Observer{observer})

		_, err = rangedLoop.RunOnce(ctx)
		require.NoError(t, err)

		// send to storagenode
		err = gcsender.RunOnce(ctx)
		require.NoError(t, err)

		for _, node := range planet.StorageNodes {
			node.Storage2.RetainService.TestWaitUntilEmpty()
		}

		// we should see all space used by all objects
		afterTotalUsedByNodes = allSpaceUsedForPieces()
		require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes)

		for _, toDelete := range []string{
			// delete ancestors, no change in used space
			"remote",
			"inline",
			// delete object without copy, used space should be decreased
			"remote-no-copy",
		} {
			_, err = project.DeleteObject(ctx, "testbucket", toDelete)
			require.NoError(t, err)
		}

		planet.WaitForStorageNodeDeleters(ctx)

		// run GC
		_, err = rangedLoop.RunOnce(ctx)
		require.NoError(t, err)

		// send to storagenode
		err = gcsender.RunOnce(ctx)
		require.NoError(t, err)

		for _, node := range planet.StorageNodes {
			node.Storage2.RetainService.TestWaitUntilEmpty()
		}

		// verify that we deleted only pieces for "remote-no-copy" object
		afterTotalUsedByNodes = allSpaceUsedForPieces()
		require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes)

		// delete rest of objects to verify that everything will be removed also from SNs
		for _, toDelete := range []string{
			"remote-copy",
			"inline-copy",
		} {
			_, err = project.DeleteObject(ctx, "testbucket", toDelete)
			require.NoError(t, err)
		}

		planet.WaitForStorageNodeDeleters(ctx)

		// run GC
		_, err = rangedLoop.RunOnce(ctx)
		require.NoError(t, err)

		// send to storagenode
		err = gcsender.RunOnce(ctx)
		require.NoError(t, err)

		for _, node := range planet.StorageNodes {
			node.Storage2.RetainService.TestWaitUntilEmpty()
		}

		// verify that nothing more was deleted from storage nodes after GC
		afterTotalUsedByNodes = allSpaceUsedForPieces()
		require.EqualValues(t, totalUsedByNodes, afterTotalUsedByNodes)
	})
}

func getSegment(ctx *testcontext.Context, t *testing.T, satellite *testplanet.Satellite, upl *testplanet.Uplink, bucket, path string) (_ metabase.ObjectLocation, _ metabase.Segment) {
	access := upl.Access[satellite.ID()]
@@ -52,6 +52,10 @@ type FinishCopyObject struct {

	NewSegmentKeys []EncryptedKeyAndNonce

	// If set, copy the object by duplicating the metadata and
	// remote_alias_pieces list, rather than using segment_copies.
	DuplicateMetadata bool

	// VerifyLimits holds a callback by which the caller can interrupt the copy
	// if it turns out completing the copy would exceed a limit.
	// It will be called only once.
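As a usage illustration only: a hypothetical caller-side wrapper (not part of this diff) showing where the new flag plugs in. The opts value is assumed to already carry the usual copy parameters (stream IDs, new segment keys, and so on) shown elsewhere in this change.

// copyWithDuplicateMetadata is a hypothetical helper, not from this diff.
func copyWithDuplicateMetadata(ctx context.Context, db *DB, opts FinishCopyObject) (Object, error) {
	// Clone segment metadata and the remote_alias_pieces list outright
	// instead of inserting a segment_copies reference row.
	opts.DuplicateMetadata = true
	return db.FinishCopyObject(ctx, opts)
}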
@@ -147,47 +151,96 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
	plainSizes := make([]int32, sourceObject.SegmentCount)
	plainOffsets := make([]int64, sourceObject.SegmentCount)
	inlineDatas := make([][]byte, sourceObject.SegmentCount)
	placementConstraints := make([]storj.PlacementConstraint, sourceObject.SegmentCount)
	remoteAliasPiecesLists := make([][]byte, sourceObject.SegmentCount)

	redundancySchemes := make([]int64, sourceObject.SegmentCount)
	err = withRows(db.db.QueryContext(ctx, `
		SELECT
			position,
			expires_at,
			root_piece_id,
			encrypted_size, plain_offset, plain_size,
			redundancy,
			inline_data
		FROM segments
		WHERE stream_id = $1
		ORDER BY position ASC
		LIMIT $2
	`, sourceObject.StreamID, sourceObject.SegmentCount))(func(rows tagsql.Rows) error {
		index := 0
		for rows.Next() {
			err := rows.Scan(
				&positions[index],
				&expiresAts[index],
				&rootPieceIDs[index],
				&encryptedSizes[index], &plainOffsets[index], &plainSizes[index],
				&redundancySchemes[index],
				&inlineDatas[index],
			)
			if err != nil {
				return err
			}
			index++
		}

		if err := rows.Err(); err != nil {
			return err
		}

		if index != int(sourceObject.SegmentCount) {
			return Error.New("could not load all of the segment information")
		}

		return nil
	})
	if opts.DuplicateMetadata {
		err = withRows(db.db.QueryContext(ctx, `
			SELECT
				position,
				expires_at,
				root_piece_id,
				encrypted_size, plain_offset, plain_size,
				redundancy,
				remote_alias_pieces,
				placement,
				inline_data
			FROM segments
			WHERE stream_id = $1
			ORDER BY position ASC
			LIMIT $2
		`, sourceObject.StreamID, sourceObject.SegmentCount))(func(rows tagsql.Rows) error {
			index := 0
			for rows.Next() {
				err := rows.Scan(
					&positions[index],
					&expiresAts[index],
					&rootPieceIDs[index],
					&encryptedSizes[index], &plainOffsets[index], &plainSizes[index],
					&redundancySchemes[index],
					&remoteAliasPiecesLists[index],
					&placementConstraints[index],
					&inlineDatas[index],
				)
				if err != nil {
					return err
				}
				index++
			}

			if err := rows.Err(); err != nil {
				return err
			}

			if index != int(sourceObject.SegmentCount) {
				return Error.New("could not load all of the segment information")
			}

			return nil
		})
	} else {
		err = withRows(db.db.QueryContext(ctx, `
			SELECT
				position,
				expires_at,
				root_piece_id,
				encrypted_size, plain_offset, plain_size,
				redundancy,
				inline_data
			FROM segments
			WHERE stream_id = $1
			ORDER BY position ASC
			LIMIT $2
		`, sourceObject.StreamID, sourceObject.SegmentCount))(func(rows tagsql.Rows) error {
			index := 0
			for rows.Next() {
				err := rows.Scan(
					&positions[index],
					&expiresAts[index],
					&rootPieceIDs[index],
					&encryptedSizes[index], &plainOffsets[index], &plainSizes[index],
					&redundancySchemes[index],
					&inlineDatas[index],
				)
				if err != nil {
					return err
				}
				index++
			}

			if err := rows.Err(); err != nil {
				return err
			}

			if index != int(sourceObject.SegmentCount) {
				return Error.New("could not load all of the segment information")
			}

			return nil
		})
	}
	if err != nil {
		return Error.New("unable to copy object: %w", err)
	}
@@ -275,6 +328,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
				root_piece_id,
				redundancy,
				encrypted_size, plain_offset, plain_size,
				remote_alias_pieces, placement,
				inline_data
			) SELECT
				$1, UNNEST($2::INT8[]), UNNEST($3::timestamptz[]),
@@ -282,12 +336,14 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
				UNNEST($6::BYTEA[]),
				UNNEST($7::INT8[]),
				UNNEST($8::INT4[]), UNNEST($9::INT8[]), UNNEST($10::INT4[]),
				UNNEST($11::BYTEA[])
				UNNEST($11::BYTEA[]), UNNEST($12::INT2[]),
				UNNEST($13::BYTEA[])
			`, opts.NewStreamID, pgutil.Int8Array(newSegments.Positions), pgutil.NullTimestampTZArray(expiresAts),
			pgutil.ByteaArray(newSegments.EncryptedKeyNonces), pgutil.ByteaArray(newSegments.EncryptedKeys),
			pgutil.ByteaArray(rootPieceIDs),
			pgutil.Int8Array(redundancySchemes),
			pgutil.Int4Array(encryptedSizes), pgutil.Int8Array(plainOffsets), pgutil.Int4Array(plainSizes),
			pgutil.ByteaArray(remoteAliasPiecesLists), pgutil.PlacementConstraintArray(placementConstraints),
			pgutil.ByteaArray(inlineDatas),
		)
		if err != nil {
@@ -298,15 +354,17 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
			return nil
		}

		_, err = tx.ExecContext(ctx, `
			INSERT INTO segment_copies (
				stream_id, ancestor_stream_id
			) VALUES (
				$1, $2
			)
		`, opts.NewStreamID, ancestorStreamID)
		if err != nil {
			return Error.New("unable to copy object: %w", err)
		}
		if !opts.DuplicateMetadata {
			_, err = tx.ExecContext(ctx, `
				INSERT INTO segment_copies (
					stream_id, ancestor_stream_id
				) VALUES (
					$1, $2
				)
			`, opts.NewStreamID, ancestorStreamID)
			if err != nil {
				return Error.New("unable to copy object: %w", err)
			}
		}

		return nil
(File diff suppressed because it is too large.)
@@ -321,7 +321,7 @@ func TestDeleteBucketWithCopies(t *testing.T) {
			metabasetest.CreateObjectCopy{
				OriginalObject:   originalObj,
				CopyObjectStream: &copyObjectStream,
			}.Run(ctx, t, db)
			}.Run(ctx, t, db, false)

			_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
				Bucket: metabase.BucketLocation{
@@ -362,7 +362,7 @@ func TestDeleteBucketWithCopies(t *testing.T) {
			copyObj, _, copySegments := metabasetest.CreateObjectCopy{
				OriginalObject:   originalObj,
				CopyObjectStream: &copyObjectStream,
			}.Run(ctx, t, db)
			}.Run(ctx, t, db, false)

			_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
				Bucket: metabase.BucketLocation{
@@ -420,12 +420,78 @@ func TestDeleteBucketWithCopies(t *testing.T) {
			metabasetest.CreateObjectCopy{
				OriginalObject:   originalObj1,
				CopyObjectStream: &copyObjectStream1,
			}.Run(ctx, t, db)
			}.Run(ctx, t, db, false)

			copyObj2, _, copySegments2 := metabasetest.CreateObjectCopy{
				OriginalObject:   originalObj2,
				CopyObjectStream: &copyObjectStream2,
			}.Run(ctx, t, db)
			}.Run(ctx, t, db, false)

			// done preparing, delete bucket 1
			_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
				Bucket: metabase.BucketLocation{
					ProjectID:  projectID,
					BucketName: "bucket2",
				},
				BatchSize: 2,
			})
			require.NoError(t, err)

			// Prepare for check.
			// obj1 is the same as before, copyObj2 should now be the original
			for i := range copySegments2 {
				copySegments2[i].Pieces = originalSegments2[i].Pieces
			}

			metabasetest.Verify{
				Objects: []metabase.RawObject{
					metabase.RawObject(originalObj1),
					metabase.RawObject(copyObj2),
				},
				Segments: append(copySegments2, metabasetest.SegmentsToRaw(originalSegments1)...),
			}.Check(ctx, t, db)
		})

		t.Run("delete bucket which has one ancestor and one copy with duplicate metadata", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
			originalObjStream1 := metabasetest.RandObjectStream()
			originalObjStream1.BucketName = "bucket1"

			projectID := originalObjStream1.ProjectID

			originalObjStream2 := metabasetest.RandObjectStream()
			originalObjStream2.ProjectID = projectID
			originalObjStream2.BucketName = "bucket2"

			originalObj1, originalSegments1 := metabasetest.CreateTestObject{
				CommitObject: &metabase.CommitObject{
					ObjectStream: originalObjStream1,
				},
			}.Run(ctx, t, db, originalObjStream1, byte(numberOfSegments))

			originalObj2, originalSegments2 := metabasetest.CreateTestObject{
				CommitObject: &metabase.CommitObject{
					ObjectStream: originalObjStream2,
				},
			}.Run(ctx, t, db, originalObjStream2, byte(numberOfSegments))

			copyObjectStream1 := metabasetest.RandObjectStream()
			copyObjectStream1.ProjectID = projectID
			copyObjectStream1.BucketName = "bucket2" // copy from bucket 1 to bucket 2

			copyObjectStream2 := metabasetest.RandObjectStream()
			copyObjectStream2.ProjectID = projectID
			copyObjectStream2.BucketName = "bucket1" // copy from bucket 2 to bucket 1

			metabasetest.CreateObjectCopy{
				OriginalObject:   originalObj1,
				CopyObjectStream: &copyObjectStream1,
			}.Run(ctx, t, db, true)

			copyObj2, _, copySegments2 := metabasetest.CreateObjectCopy{
				OriginalObject:   originalObj2,
				CopyObjectStream: &copyObjectStream2,
			}.Run(ctx, t, db, true)

			// done preparing, delete bucket 1
			_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
@@ -987,7 +987,7 @@ func TestDeleteCopy(t *testing.T) {

				copyObj, _, copySegments := metabasetest.CreateObjectCopy{
					OriginalObject: originalObj,
				}.Run(ctx, t, db)
				}.Run(ctx, t, db, false)

				var copies []metabase.RawCopy
				if numberOfSegments > 0 {
@@ -1042,10 +1042,10 @@ func TestDeleteCopy(t *testing.T) {

				copyObject1, _, _ := metabasetest.CreateObjectCopy{
					OriginalObject: originalObj,
				}.Run(ctx, t, db)
				}.Run(ctx, t, db, false)
				copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
					OriginalObject: originalObj,
				}.Run(ctx, t, db)
				}.Run(ctx, t, db, false)

				metabasetest.DeleteObjectExactVersion{
					Opts: metabase.DeleteObjectExactVersion{
@@ -1092,7 +1092,7 @@ func TestDeleteCopy(t *testing.T) {

				copyObject, _, copySegments := metabasetest.CreateObjectCopy{
					OriginalObject: originalObj,
				}.Run(ctx, t, db)
				}.Run(ctx, t, db, false)

				metabasetest.DeleteObjectExactVersion{
					Opts: metabase.DeleteObjectExactVersion{
@@ -1134,10 +1134,10 @@ func TestDeleteCopy(t *testing.T) {

				copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{
					OriginalObject: originalObj,
				}.Run(ctx, t, db)
				}.Run(ctx, t, db, false)
				copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
					OriginalObject: originalObj,
				}.Run(ctx, t, db)
				}.Run(ctx, t, db, false)

				_, err := db.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
					Version: originalObj.Version,
@@ -1206,6 +1206,201 @@ func TestDeleteCopy(t *testing.T) {
	})
}

func TestDeleteCopyWithDuplicateMetadata(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		for _, numberOfSegments := range []int{0, 1, 3} {
			t.Run(fmt.Sprintf("%d segments", numberOfSegments), func(t *testing.T) {
				t.Run("delete copy", func(t *testing.T) {
					defer metabasetest.DeleteAll{}.Check(ctx, t, db)
					originalObjStream := metabasetest.RandObjectStream()

					originalObj, originalSegments := metabasetest.CreateTestObject{
						CommitObject: &metabase.CommitObject{
							ObjectStream:                  originalObjStream,
							EncryptedMetadata:             testrand.Bytes(64),
							EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
							EncryptedMetadataEncryptedKey: testrand.Bytes(265),
						},
					}.Run(ctx, t, db, originalObjStream, byte(numberOfSegments))

					copyObj, _, copySegments := metabasetest.CreateObjectCopy{
						OriginalObject: originalObj,
					}.Run(ctx, t, db, true)

					// check that copy went OK
					metabasetest.Verify{
						Objects: []metabase.RawObject{
							metabase.RawObject(originalObj),
							metabase.RawObject(copyObj),
						},
						Segments: append(metabasetest.SegmentsToRaw(originalSegments), copySegments...),
					}.Check(ctx, t, db)

					metabasetest.DeleteObjectExactVersion{
						Opts: metabase.DeleteObjectExactVersion{
							ObjectLocation: copyObj.Location(),
							Version:        copyObj.Version,
						},
						Result: metabase.DeleteObjectResult{
							Objects:  []metabase.Object{copyObj},
							Segments: rawSegmentsToDeletedSegmentInfo(copySegments),
						},
					}.Check(ctx, t, db)

					// Verify that we are back at the original single object
					metabasetest.Verify{
						Objects: []metabase.RawObject{
							metabase.RawObject(originalObj),
						},
						Segments: metabasetest.SegmentsToRaw(originalSegments),
					}.Check(ctx, t, db)
				})

				t.Run("delete one of two copies", func(t *testing.T) {
					defer metabasetest.DeleteAll{}.Check(ctx, t, db)
					originalObjectStream := metabasetest.RandObjectStream()

					originalObj, originalSegments := metabasetest.CreateTestObject{
						CommitObject: &metabase.CommitObject{
							ObjectStream:                  originalObjectStream,
							EncryptedMetadata:             testrand.Bytes(64),
							EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
							EncryptedMetadataEncryptedKey: testrand.Bytes(265),
						},
					}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))

					copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{
						OriginalObject: originalObj,
					}.Run(ctx, t, db, true)
					copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
						OriginalObject: originalObj,
					}.Run(ctx, t, db, true)

					metabasetest.DeleteObjectExactVersion{
						Opts: metabase.DeleteObjectExactVersion{
							ObjectLocation: copyObject1.Location(),
							Version:        copyObject1.Version,
						},
						Result: metabase.DeleteObjectResult{
							Objects:  []metabase.Object{copyObject1},
							Segments: rawSegmentsToDeletedSegmentInfo(copySegments1),
						},
					}.Check(ctx, t, db)

					// Verify that only one of the copies is deleted
					metabasetest.Verify{
						Objects: []metabase.RawObject{
							metabase.RawObject(originalObj),
							metabase.RawObject(copyObject2),
						},
						Segments: append(metabasetest.SegmentsToRaw(originalSegments), copySegments2...),
					}.Check(ctx, t, db)
				})

				t.Run("delete original", func(t *testing.T) {
					defer metabasetest.DeleteAll{}.Check(ctx, t, db)
					originalObjectStream := metabasetest.RandObjectStream()

					originalObj, originalSegments := metabasetest.CreateTestObject{
						CommitObject: &metabase.CommitObject{
							ObjectStream:                  originalObjectStream,
							EncryptedMetadata:             testrand.Bytes(64),
							EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
							EncryptedMetadataEncryptedKey: testrand.Bytes(265),
						},
					}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))

					copyObject, _, copySegments := metabasetest.CreateObjectCopy{
						OriginalObject: originalObj,
					}.Run(ctx, t, db, true)

					metabasetest.DeleteObjectExactVersion{
						Opts: metabase.DeleteObjectExactVersion{
							ObjectLocation: originalObj.Location(),
							Version:        originalObj.Version,
						},
						Result: metabase.DeleteObjectResult{
							Objects:  []metabase.Object{originalObj},
							Segments: rawSegmentsToDeletedSegmentInfo(copySegments),
						},
					}.Check(ctx, t, db)

					for i := range copySegments {
						copySegments[i].Pieces = originalSegments[i].Pieces
					}

					// verify that the copy is left
					metabasetest.Verify{
						Objects: []metabase.RawObject{
							metabase.RawObject(copyObject),
						},
						Segments: copySegments,
					}.Check(ctx, t, db)
				})

				t.Run("delete original and leave two copies", func(t *testing.T) {
					defer metabasetest.DeleteAll{}.Check(ctx, t, db)
					originalObjectStream := metabasetest.RandObjectStream()

					originalObj, originalSegments := metabasetest.CreateTestObject{
						CommitObject: &metabase.CommitObject{
							ObjectStream:                  originalObjectStream,
							EncryptedMetadata:             testrand.Bytes(64),
							EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
							EncryptedMetadataEncryptedKey: testrand.Bytes(265),
						},
					}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))

					copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{
						OriginalObject: originalObj,
					}.Run(ctx, t, db, true)
					copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
						OriginalObject: originalObj,
					}.Run(ctx, t, db, true)

					_, err := db.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
						Version:        originalObj.Version,
						ObjectLocation: originalObj.Location(),
					})
					require.NoError(t, err)

					var expectedAncestorStreamID uuid.UUID

					if numberOfSegments > 0 {
						segments, err := db.TestingAllSegments(ctx)
						require.NoError(t, err)
						require.NotEmpty(t, segments)

						if segments[0].StreamID == copyObject1.StreamID {
							expectedAncestorStreamID = copyObject1.StreamID
						} else {
							expectedAncestorStreamID = copyObject2.StreamID
						}
					}

					// set pieces in expected ancestor for verification
					for _, segments := range [][]metabase.RawSegment{copySegments1, copySegments2} {
						for i := range segments {
							if segments[i].StreamID == expectedAncestorStreamID {
								segments[i].Pieces = originalSegments[i].Pieces
							}
						}
					}

					// verify that two functioning copies are left and the original object is gone
					metabasetest.Verify{
						Objects: []metabase.RawObject{
							metabase.RawObject(copyObject1),
							metabase.RawObject(copyObject2),
						},
						Segments: append(copySegments1, copySegments2...),
					}.Check(ctx, t, db)
				})
			})
		}
	})
}

func TestDeleteObjectLastCommitted(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		obj := metabasetest.RandObjectStream()
@@ -1402,3 +1597,12 @@ func TestDeleteObjectLastCommitted(t *testing.T) {
		})
	})
}

func rawSegmentsToDeletedSegmentInfo(segments []metabase.RawSegment) []metabase.DeletedSegmentInfo {
	result := make([]metabase.DeletedSegmentInfo, len(segments))
	for i := range segments {
		result[i].RootPieceID = segments[i].RootPieceID
		result[i].Pieces = segments[i].Pieces
	}
	return result
}
@@ -345,7 +345,53 @@ func TestGetObjectLastCommitted(t *testing.T) {
			copiedObj, _, _ := metabasetest.CreateObjectCopy{
				OriginalObject:   originalObject,
				CopyObjectStream: &copyObjStream,
			}.Run(ctx, t, db)
			}.Run(ctx, t, db, false)

			metabasetest.DeleteObjectExactVersion{
				Opts: metabase.DeleteObjectExactVersion{
					Version:        1,
					ObjectLocation: obj.Location(),
				},
				Result: metabase.DeleteObjectResult{
					Objects: []metabase.Object{originalObject},
				},
			}.Check(ctx, t, db)

			metabasetest.GetObjectLastCommitted{
				Opts: metabase.GetObjectLastCommitted{
					ObjectLocation: copiedObj.Location(),
				},
				Result: copiedObj,
			}.Check(ctx, t, db)

			metabasetest.Verify{Objects: []metabase.RawObject{
				{
					ObjectStream: metabase.ObjectStream{
						ProjectID:  copiedObj.ProjectID,
						BucketName: copiedObj.BucketName,
						ObjectKey:  copiedObj.ObjectKey,
						Version:    copiedObj.Version,
						StreamID:   copiedObj.StreamID,
					},
					CreatedAt:                     now,
					Status:                        metabase.Committed,
					Encryption:                    metabasetest.DefaultEncryption,
					EncryptedMetadata:             copiedObj.EncryptedMetadata,
					EncryptedMetadataNonce:        copiedObj.EncryptedMetadataNonce,
					EncryptedMetadataEncryptedKey: copiedObj.EncryptedMetadataEncryptedKey,
				},
			}}.Check(ctx, t, db)
		})

		t.Run("Get latest copied object version with duplicate metadata", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
			copyObjStream := metabasetest.RandObjectStream()
			originalObject := metabasetest.CreateObject(ctx, t, db, obj, 0)

			copiedObj, _, _ := metabasetest.CreateObjectCopy{
				OriginalObject:   originalObject,
				CopyObjectStream: &copyObjStream,
			}.Run(ctx, t, db, true)

			metabasetest.DeleteObjectExactVersion{
				Opts: metabase.DeleteObjectExactVersion{
@@ -1114,7 +1160,7 @@ func TestGetLatestObjectLastSegment(t *testing.T) {

			copyObj, _, newSegments := metabasetest.CreateObjectCopy{
				OriginalObject: originalObj,
			}.Run(ctx, t, db)
			}.Run(ctx, t, db, false)

			metabasetest.GetLatestObjectLastSegment{
				Opts: metabase.GetLatestObjectLastSegment{
@@ -1150,6 +1196,54 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
			}.Check(ctx, t, db)
		})

		t.Run("Get segment copy with duplicate metadata", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			objStream := metabasetest.RandObjectStream()

			originalObj, originalSegments := metabasetest.CreateTestObject{
				CommitObject: &metabase.CommitObject{
					ObjectStream:                  objStream,
					EncryptedMetadata:             testrand.Bytes(64),
					EncryptedMetadataNonce:        testrand.Nonce().Bytes(),
					EncryptedMetadataEncryptedKey: testrand.Bytes(265),
				},
			}.Run(ctx, t, db, objStream, 1)

			copyObj, _, newSegments := metabasetest.CreateObjectCopy{
				OriginalObject: originalObj,
			}.Run(ctx, t, db, true)

			metabasetest.GetLatestObjectLastSegment{
				Opts: metabase.GetLatestObjectLastSegment{
					ObjectLocation: originalObj.Location(),
				},
				Result: originalSegments[0],
			}.Check(ctx, t, db)

			copySegmentGet := originalSegments[0]
			copySegmentGet.StreamID = copyObj.StreamID
			copySegmentGet.EncryptedETag = nil
			copySegmentGet.InlineData = []byte{}
			copySegmentGet.EncryptedKey = newSegments[0].EncryptedKey
			copySegmentGet.EncryptedKeyNonce = newSegments[0].EncryptedKeyNonce

			metabasetest.GetLatestObjectLastSegment{
				Opts: metabase.GetLatestObjectLastSegment{
					ObjectLocation: copyObj.Location(),
				},
				Result: copySegmentGet,
			}.Check(ctx, t, db)

			metabasetest.Verify{
				Objects: []metabase.RawObject{
					metabase.RawObject(originalObj),
					metabase.RawObject(copyObj),
				},
				Segments: append(metabasetest.SegmentsToRaw(originalSegments), newSegments...),
			}.Check(ctx, t, db)
		})

		t.Run("Get empty inline segment copy", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
@@ -282,7 +282,51 @@ func TestListSegments(t *testing.T) {
				_, _, copySegments := metabasetest.CreateObjectCopy{
					OriginalObject:   originalObject,
					CopyObjectStream: &copyStream,
				}.Run(ctx, t, db)
				}.Run(ctx, t, db, false)

				expectedSegments := []metabase.Segment{}
				for _, segment := range copySegments {
					expectedSegments = append(expectedSegments, metabase.Segment(segment))
				}

				metabasetest.ListSegments{
					Opts: metabase.ListSegments{
						StreamID: copyStream.StreamID,
					},
					Result: metabase.ListSegmentsResult{
						Segments: expectedSegments,
					},
				}.Check(ctx, t, db)

				if numberOfSegments > 0 {
					expectedSegments[0].Pieces = originalSegments[0].Pieces
				}

				metabasetest.ListSegments{
					Opts: metabase.ListSegments{
						StreamID:                copyStream.StreamID,
						UpdateFirstWithAncestor: true,
					},
					Result: metabase.ListSegmentsResult{
						Segments: expectedSegments,
					},
				}.Check(ctx, t, db)
			}
		})

		t.Run("segments from copy with duplicate metadata", func(t *testing.T) {
			defer metabasetest.DeleteAll{}.Check(ctx, t, db)

			for _, numberOfSegments := range []byte{0, 1, 2, 10} {
				originalObjectStream := metabasetest.RandObjectStream()
				originalObject, originalSegments := metabasetest.CreateTestObject{}.
					Run(ctx, t, db, originalObjectStream, numberOfSegments)

				copyStream := metabasetest.RandObjectStream()
				_, _, copySegments := metabasetest.CreateObjectCopy{
					OriginalObject:   originalObject,
					CopyObjectStream: &copyStream,
				}.Run(ctx, t, db, true)

				expectedSegments := []metabase.Segment{}
				for _, segment := range copySegments {
@@ -321,7 +321,10 @@ type CreateObjectCopy struct {
}

// Run creates the copy.
func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metabase.DB) (copyObj metabase.Object, expectedOriginalSegments []metabase.RawSegment, expectedCopySegments []metabase.RawSegment) {
//
// The duplicateMetadata argument is a hack and it would be great to get rid of it once
// duplicateMetadata is no longer an option.
func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metabase.DB, duplicateMetadata bool) (copyObj metabase.Object, expectedOriginalSegments []metabase.RawSegment, expectedCopySegments []metabase.RawSegment) {
	var copyStream metabase.ObjectStream
	if cc.CopyObjectStream != nil {
		copyStream = *cc.CopyObjectStream
@@ -360,6 +363,11 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
		} else {
			expectedCopySegments[i].InlineData = []byte{}
		}

		if duplicateMetadata {
			expectedCopySegments[i].Pieces = make(metabase.Pieces, len(expectedOriginalSegments[i].Pieces))
			copy(expectedCopySegments[i].Pieces, expectedOriginalSegments[i].Pieces)
		}
	}

	opts := cc.FinishObject
@@ -374,6 +382,7 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
			NewEncryptedMetadataKey: testrand.Bytes(32),
		}
	}
	opts.DuplicateMetadata = duplicateMetadata

	copyObj, err := db.FinishCopyObject(ctx, *opts)
	require.NoError(t, err)
@@ -193,7 +193,7 @@ func TestGetStreamPieceCountByNodeID(t *testing.T) {
		_, _, _ = metabasetest.CreateObjectCopy{
			OriginalObject:   originalObj,
			CopyObjectStream: &copyStream,
		}.Run(ctx, t, db)
		}.Run(ctx, t, db, false)

		metabasetest.GetStreamPieceCountByNodeID{
			Opts: metabase.GetStreamPieceCountByNodeID{
@@ -141,9 +141,12 @@ type Config struct {
	RateLimiter   RateLimiterConfig   `help:"rate limiter configuration"`
	UploadLimiter UploadLimiterConfig `help:"object upload limiter configuration"`
	ProjectLimits ProjectLimitConfig  `help:"project limit configuration"`

	// TODO remove this flag when the server-side copy implementation is finished
	ServerSideCopy bool `help:"enable code for server-side copy, deprecated. please leave this to true." default:"true"`
	ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`
	ServerSideCopy                  bool `help:"enable code for server-side copy, deprecated. please leave this to true." default:"true"`
	ServerSideCopyDisabled          bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`
	ServerSideCopyDuplicateMetadata bool `help:"perform server-side copy by duplicating metadata, instead of using segment_copies" default:"false"`

	// TODO remove when benchmarking is done and a decision is made.
	TestListingQuery bool `default:"false" help:"test the new query for non-recursive listing"`
}
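A hedged configuration example: switching the new behavior on in a satellite config file. The key name matches the yaml.lock excerpt at the end of this diff; everything else about the deployment is assumed.

# opt in to duplicate-metadata server-side copy (default: false)
metainfo.server-side-copy-duplicate-metadata: true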
@@ -1995,6 +1995,7 @@ func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFi
		NewEncryptedMetadata:         req.NewEncryptedMetadata,
		NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce,
		NewEncryptedMetadataKey:      req.NewEncryptedMetadataKey,
		DuplicateMetadata:            endpoint.config.ServerSideCopyDuplicateMetadata,
		VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
			return endpoint.addStorageUsageUpToLimit(ctx, keyInfo.ProjectID, encryptedObjectSize, nSegments)
		},
scripts/testdata/satellite-config.yaml.lock (vendored)
@@ -673,6 +673,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy
# metainfo.server-side-copy-disabled: false

# perform server-side copy by duplicating metadata, instead of using segment_copies
# metainfo.server-side-copy-duplicate-metadata: false

# test the new query for non-recursive listing
# metainfo.test-listing-query: false