satellite/metabase: flatten copy references when copying
If B is a copy of A, and C is a copy of B, then in the segment_copies table, it should appear that C is a copy of A. Fixes https://github.com/storj/storj/issues/4538 Change-Id: I7e6b03f7584597cf616cd1e0cd0156386771d207
This commit is contained in:
parent
d253b4c033
commit
f6b4d522be
@ -10,7 +10,6 @@ import (
|
||||
"time"
|
||||
|
||||
pgxerrcode "github.com/jackc/pgerrcode"
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
@ -168,13 +167,46 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
return Object{}, err
|
||||
}
|
||||
|
||||
originalObject, err := db.GetObjectExactVersion(ctx, GetObjectExactVersion{
|
||||
opts.Version,
|
||||
opts.Location(),
|
||||
})
|
||||
originalObject := Object{}
|
||||
|
||||
var ancestorStreamIDBytes []byte
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
objects.stream_id,
|
||||
expires_at,
|
||||
segment_count,
|
||||
encrypted_metadata,
|
||||
total_plain_size, total_encrypted_size, fixed_segment_size,
|
||||
encryption,
|
||||
segment_copies.ancestor_stream_id
|
||||
FROM objects
|
||||
LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version = $4 AND
|
||||
status = `+committedStatus,
|
||||
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
|
||||
Scan(
|
||||
&originalObject.StreamID,
|
||||
&originalObject.ExpiresAt,
|
||||
&originalObject.SegmentCount,
|
||||
&originalObject.EncryptedMetadata,
|
||||
&originalObject.TotalPlainSize, &originalObject.TotalEncryptedSize, &originalObject.FixedSegmentSize,
|
||||
encryptionParameters{&originalObject.Encryption},
|
||||
&ancestorStreamIDBytes,
|
||||
)
|
||||
if err != nil {
|
||||
return Object{}, errs.Wrap(err)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
|
||||
}
|
||||
return Object{}, Error.New("unable to query object status: %w", err)
|
||||
}
|
||||
originalObject.BucketName = opts.BucketName
|
||||
originalObject.ProjectID = opts.ProjectID
|
||||
originalObject.Version = opts.Version
|
||||
originalObject.Status = Committed
|
||||
|
||||
if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) {
|
||||
return Object{}, ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys))
|
||||
@ -312,20 +344,30 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
|
||||
return Error.New("unable to copy segments: %w", err)
|
||||
}
|
||||
|
||||
var ancestorStreamID uuid.UUID
|
||||
if len(ancestorStreamIDBytes) != 0 {
|
||||
ancestorStreamID, err = uuid.FromBytes(ancestorStreamIDBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
ancestorStreamID = originalObject.StreamID
|
||||
}
|
||||
// TODO : we need flatten references
|
||||
_, err = db.db.ExecContext(ctx, `
|
||||
INSERT INTO segment_copies (
|
||||
stream_id, ancestor_stream_id
|
||||
) VALUES (
|
||||
$1, $2
|
||||
)
|
||||
`, opts.NewStreamID, originalObject.StreamID)
|
||||
INSERT INTO segment_copies (
|
||||
stream_id, ancestor_stream_id
|
||||
) VALUES (
|
||||
$1, $2
|
||||
)
|
||||
`, opts.NewStreamID, ancestorStreamID)
|
||||
if err != nil {
|
||||
return Error.New("unable to copy object: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return Object{}, err
|
||||
}
|
||||
|
@ -277,10 +277,10 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
NewBucket: newBucketName,
|
||||
ObjectStream: obj,
|
||||
NewEncryptedObjectKey: metabasetest.RandObjectKey(),
|
||||
NewStreamID: newStreamID,
|
||||
|
||||
OverrideMetadata: true,
|
||||
NewEncryptedMetadata: testrand.BytesInt(256),
|
||||
NewStreamID: newStreamID,
|
||||
},
|
||||
ErrClass: &metabase.ErrInvalidRequest,
|
||||
ErrText: "EncryptedMetadataNonce and EncryptedMetadataEncryptedKey must be set if EncryptedMetadata is set",
|
||||
@ -466,8 +466,9 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
numberOfSegments := 10
|
||||
copyStream := metabasetest.RandObjectStream()
|
||||
|
||||
originalObj, originalSegments := metabasetest.CreateTestObject{
|
||||
originalObj, _ := metabasetest.CreateTestObject{
|
||||
CommitObject: &metabase.CommitObject{
|
||||
ObjectStream: obj,
|
||||
EncryptedMetadata: testrand.Bytes(64),
|
||||
@ -476,21 +477,51 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
},
|
||||
}.Run(ctx, t, db, obj, byte(numberOfSegments))
|
||||
|
||||
copyObj, newSegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
copyObj, expectedOriginalSegments, expectedCopySegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
CopyObjectStream: ©Stream,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
var expectedRawSegments []metabase.RawSegment
|
||||
expectedRawSegments = append(expectedRawSegments, expectedOriginalSegments...)
|
||||
expectedRawSegments = append(expectedRawSegments, expectedCopySegments...)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(originalObj),
|
||||
metabase.RawObject(copyObj),
|
||||
},
|
||||
Segments: append(metabasetest.SegmentsToRaw(originalSegments), newSegments...),
|
||||
Segments: expectedRawSegments,
|
||||
Copies: []metabase.RawCopy{{
|
||||
StreamID: copyObj.StreamID,
|
||||
AncestorStreamID: originalObj.StreamID,
|
||||
}},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// TODO find better names
|
||||
copyOfCopyStream := metabasetest.RandObjectStream()
|
||||
copyOfCopyObj, _, expectedCopyOfCopySegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: copyObj,
|
||||
CopyObjectStream: ©OfCopyStream,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
expectedRawSegments = append(expectedRawSegments, expectedCopyOfCopySegments...)
|
||||
|
||||
metabasetest.Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
metabase.RawObject(originalObj),
|
||||
metabase.RawObject(copyObj),
|
||||
metabase.RawObject(copyOfCopyObj),
|
||||
},
|
||||
Segments: expectedRawSegments,
|
||||
Copies: []metabase.RawCopy{{
|
||||
StreamID: copyStream.StreamID,
|
||||
AncestorStreamID: originalObj.StreamID,
|
||||
}, {
|
||||
StreamID: copyOfCopyObj.StreamID,
|
||||
AncestorStreamID: originalObj.StreamID,
|
||||
}},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("finish copy object with new metadata", func(t *testing.T) {
|
||||
@ -519,7 +550,7 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
// do a copy without OverrideMetadata field set to true,
|
||||
// metadata shouldn't be updated even if NewEncryptedMetadata
|
||||
// field is set
|
||||
copyObjNoOverride, _ := metabasetest.CreateObjectCopy{
|
||||
copyObjNoOverride, _, _ := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
CopyObjectStream: ©StreamNoOverride,
|
||||
FinishObject: &metabase.FinishCopyObject{
|
||||
@ -543,7 +574,7 @@ func TestFinishCopyObject(t *testing.T) {
|
||||
|
||||
// do a copy WITH OverrideMetadata field set to true,
|
||||
// metadata should be updated to NewEncryptedMetadata
|
||||
copyObj, _ := metabasetest.CreateObjectCopy{
|
||||
copyObj, _, _ := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
CopyObjectStream: ©Stream,
|
||||
FinishObject: &metabase.FinishCopyObject{
|
||||
|
@ -963,7 +963,7 @@ func TestDeleteCopy(t *testing.T) {
|
||||
},
|
||||
}.Run(ctx, t, db, originalObjStream, byte(numberOfSegments))
|
||||
|
||||
copyObj, copySegments := metabasetest.CreateObjectCopy{
|
||||
copyObj, _, copySegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
@ -1011,10 +1011,10 @@ func TestDeleteCopy(t *testing.T) {
|
||||
},
|
||||
}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))
|
||||
|
||||
copyObject1, _ := metabasetest.CreateObjectCopy{
|
||||
copyObject1, _, _ := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
copyObject2, copySegments2 := metabasetest.CreateObjectCopy{
|
||||
copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
@ -1054,7 +1054,7 @@ func TestDeleteCopy(t *testing.T) {
|
||||
},
|
||||
}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))
|
||||
|
||||
copyObject, copySegments := metabasetest.CreateObjectCopy{
|
||||
copyObject, _, copySegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
@ -1087,10 +1087,10 @@ func TestDeleteCopy(t *testing.T) {
|
||||
},
|
||||
}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))
|
||||
|
||||
copyObject1, copySegments1 := metabasetest.CreateObjectCopy{
|
||||
copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
copyObject2, copySegments2 := metabasetest.CreateObjectCopy{
|
||||
copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
|
@ -900,7 +900,7 @@ func TestGetLatestObjectLastSegment(t *testing.T) {
|
||||
},
|
||||
}.Run(ctx, t, db, objStream, 1)
|
||||
|
||||
copyObj, newSegments := metabasetest.CreateObjectCopy{
|
||||
copyObj, _, newSegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObj,
|
||||
}.Run(ctx, t, db)
|
||||
|
||||
|
@ -279,7 +279,7 @@ func TestListSegments(t *testing.T) {
|
||||
Run(ctx, t, db, originalObjectStream, numberOfSegments)
|
||||
|
||||
copyStream := metabasetest.RandObjectStream()
|
||||
_, copySegments := metabasetest.CreateObjectCopy{
|
||||
_, _, copySegments := metabasetest.CreateObjectCopy{
|
||||
OriginalObject: originalObject,
|
||||
CopyObjectStream: ©Stream,
|
||||
}.Run(ctx, t, db)
|
||||
|
@ -310,7 +310,7 @@ type CreateObjectCopy struct {
|
||||
}
|
||||
|
||||
// Run creates the copy.
|
||||
func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metabase.DB) (metabase.Object, []metabase.RawSegment) {
|
||||
func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metabase.DB) (copyObj metabase.Object, expectedOriginalSegments []metabase.RawSegment, expectedCopySegments []metabase.RawSegment) {
|
||||
|
||||
var copyStream metabase.ObjectStream
|
||||
if cc.CopyObjectStream != nil {
|
||||
@ -320,7 +320,8 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
|
||||
}
|
||||
|
||||
newEncryptedKeysNonces := make([]metabase.EncryptedKeyAndNonce, cc.OriginalObject.SegmentCount)
|
||||
newSegments := make([]metabase.RawSegment, cc.OriginalObject.SegmentCount)
|
||||
expectedOriginalSegments = make([]metabase.RawSegment, cc.OriginalObject.SegmentCount)
|
||||
expectedCopySegments = make([]metabase.RawSegment, cc.OriginalObject.SegmentCount)
|
||||
expectedEncryptedSize := 1060
|
||||
|
||||
for i := 0; i < int(cc.OriginalObject.SegmentCount); i++ {
|
||||
@ -330,36 +331,29 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
|
||||
EncryptedKey: testrand.Bytes(32),
|
||||
}
|
||||
|
||||
var originalSegment metabase.RawSegment
|
||||
if len(cc.OriginalSegments) == 0 {
|
||||
originalSegment = DefaultRawSegment(cc.OriginalObject.ObjectStream, metabase.SegmentPosition{Index: uint32(i)})
|
||||
// TODO: place this calculation in metabasetest.
|
||||
originalSegment.PlainOffset = int64(i) * int64(originalSegment.PlainSize)
|
||||
// TODO: we should use the same value for encrypted size in both test methods.
|
||||
originalSegment.EncryptedSize = int32(expectedEncryptedSize)
|
||||
expectedOriginalSegments[i] = DefaultRawSegment(cc.OriginalObject.ObjectStream, metabase.SegmentPosition{Index: uint32(i)})
|
||||
|
||||
// TODO: place this calculation in metabasetest.
|
||||
expectedOriginalSegments[i].PlainOffset = int64(int32(i) * expectedOriginalSegments[i].PlainSize)
|
||||
// TODO: we should use the same value for encrypted size in both test methods.
|
||||
expectedOriginalSegments[i].EncryptedSize = int32(expectedEncryptedSize)
|
||||
|
||||
expectedCopySegments[i] = metabase.RawSegment{}
|
||||
expectedCopySegments[i].StreamID = copyStream.StreamID
|
||||
expectedCopySegments[i].EncryptedKeyNonce = newEncryptedKeysNonces[i].EncryptedKeyNonce
|
||||
expectedCopySegments[i].EncryptedKey = newEncryptedKeysNonces[i].EncryptedKey
|
||||
expectedCopySegments[i].EncryptedSize = expectedOriginalSegments[i].EncryptedSize
|
||||
expectedCopySegments[i].Position = expectedOriginalSegments[i].Position
|
||||
expectedCopySegments[i].RootPieceID = expectedOriginalSegments[i].RootPieceID
|
||||
expectedCopySegments[i].Redundancy = expectedOriginalSegments[i].Redundancy
|
||||
expectedCopySegments[i].PlainSize = expectedOriginalSegments[i].PlainSize
|
||||
expectedCopySegments[i].PlainOffset = expectedOriginalSegments[i].PlainOffset
|
||||
expectedCopySegments[i].CreatedAt = time.Now().UTC()
|
||||
if len(expectedOriginalSegments[i].InlineData) > 0 {
|
||||
expectedCopySegments[i].InlineData = expectedOriginalSegments[i].InlineData
|
||||
} else {
|
||||
originalSegment = metabase.RawSegment(cc.OriginalSegments[i])
|
||||
expectedCopySegments[i].InlineData = []byte{}
|
||||
}
|
||||
|
||||
newSegmentInlineData := originalSegment.InlineData
|
||||
if newSegmentInlineData == nil {
|
||||
newSegmentInlineData = []uint8{}
|
||||
}
|
||||
newSegment := metabase.RawSegment{
|
||||
StreamID: copyStream.StreamID,
|
||||
EncryptedKeyNonce: newEncryptedKeysNonces[i].EncryptedKeyNonce,
|
||||
EncryptedKey: newEncryptedKeysNonces[i].EncryptedKey,
|
||||
EncryptedSize: originalSegment.EncryptedSize,
|
||||
Position: originalSegment.Position,
|
||||
RootPieceID: originalSegment.RootPieceID,
|
||||
Redundancy: originalSegment.Redundancy,
|
||||
PlainSize: originalSegment.PlainSize,
|
||||
PlainOffset: originalSegment.PlainOffset,
|
||||
InlineData: newSegmentInlineData,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
newSegments[i] = newSegment
|
||||
}
|
||||
|
||||
opts := cc.FinishObject
|
||||
@ -378,7 +372,7 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
|
||||
copyObj, err := db.FinishCopyObject(ctx, *opts)
|
||||
require.NoError(t, err)
|
||||
|
||||
return copyObj, newSegments
|
||||
return copyObj, expectedOriginalSegments, expectedCopySegments
|
||||
}
|
||||
|
||||
// SegmentsToRaw converts a slice of Segment to a slice of RawSegment.
|
||||
|
Loading…
Reference in New Issue
Block a user