satellite/metabase: copy inside transaction

Read the source object and write the destination object in the same
transaction, so that a concurrent delete of the source cannot corrupt
the copied object.
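
In outline, the change applies the usual read-modify-write-in-one-transaction
pattern. Below is a minimal sketch of that pattern using only database/sql;
the objects table, payload column, and copyRow helper are hypothetical
illustrations, not the metabase schema or API:

	package main

	import (
		"context"
		"database/sql"
		"errors"
		"fmt"
	)

	// copyRow reads the source row and writes the copy inside one
	// transaction. On a serializable database (such as CockroachDB,
	// which the satellite supports) the read and the write then act
	// atomically: a concurrent DELETE of the source commits either
	// entirely before the copy (the read fails with sql.ErrNoRows) or
	// entirely after it (the copy is already complete).
	func copyRow(ctx context.Context, db *sql.DB, srcID, dstID string) error {
		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback() // no-op once Commit has succeeded

		var payload []byte
		err = tx.QueryRowContext(ctx,
			`SELECT payload FROM objects WHERE id = $1`, srcID).Scan(&payload)
		if errors.Is(err, sql.ErrNoRows) {
			return fmt.Errorf("source object not found: %w", err)
		} else if err != nil {
			return err
		}

		_, err = tx.ExecContext(ctx,
			`INSERT INTO objects (id, payload) VALUES ($1, $2)`, dstID, payload)
		if err != nil {
			return err
		}
		return tx.Commit()
	}

In the actual change, the same begin/commit/rollback handling is delegated to
the txutil.WithTx helper visible in the diff below.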

This is probably the root cause of the metainfo loop halting from
2022-06-21 onwards, where two objects lost their root_piece_id during
copying.

Part of https://github.com/storj/storj/issues/4930

Change-Id: I9c45d56a7bfb48ecd5f4906ee1cca42922901e90
Erik van Velzen 2022-06-24 01:15:42 +02:00
parent e3ac0ae2c2
commit 77fea6137f


@@ -162,76 +162,79 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 	}
 
 	originalObject := Object{}
+	copyObject := Object{}
+	var copyMetadata []byte
 	var ancestorStreamIDBytes []byte
-	err = db.db.QueryRowContext(ctx, `
-		SELECT
-			objects.stream_id,
-			expires_at,
-			segment_count,
-			encrypted_metadata,
-			total_plain_size, total_encrypted_size, fixed_segment_size,
-			encryption,
-			segment_copies.ancestor_stream_id
-		FROM objects
-		LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id
-		WHERE
-			project_id = $1 AND
-			bucket_name = $2 AND
-			object_key = $3 AND
-			version = $4 AND
-			status = `+committedStatus,
-		opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
-		Scan(
-			&originalObject.StreamID,
-			&originalObject.ExpiresAt,
-			&originalObject.SegmentCount,
-			&originalObject.EncryptedMetadata,
-			&originalObject.TotalPlainSize, &originalObject.TotalEncryptedSize, &originalObject.FixedSegmentSize,
-			encryptionParameters{&originalObject.Encryption},
-			&ancestorStreamIDBytes,
-		)
-	if err != nil {
-		if errors.Is(err, sql.ErrNoRows) {
-			return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
-		}
-		return Object{}, Error.New("unable to query object status: %w", err)
-	}
+	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
+		err = tx.QueryRowContext(ctx, `
+			SELECT
+				objects.stream_id,
+				expires_at,
+				segment_count,
+				encrypted_metadata,
+				total_plain_size, total_encrypted_size, fixed_segment_size,
+				encryption,
+				segment_copies.ancestor_stream_id
+			FROM objects
+			LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id
+			WHERE
+				project_id = $1 AND
+				bucket_name = $2 AND
+				object_key = $3 AND
+				version = $4 AND
+				status = `+committedStatus,
+			opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version).
+			Scan(
+				&originalObject.StreamID,
+				&originalObject.ExpiresAt,
+				&originalObject.SegmentCount,
+				&originalObject.EncryptedMetadata,
+				&originalObject.TotalPlainSize, &originalObject.TotalEncryptedSize, &originalObject.FixedSegmentSize,
+				encryptionParameters{&originalObject.Encryption},
+				&ancestorStreamIDBytes,
+			)
+		if err != nil {
+			if errors.Is(err, sql.ErrNoRows) {
+				return storj.ErrObjectNotFound.Wrap(Error.Wrap(err))
+			}
+			return Error.New("unable to query object status: %w", err)
+		}
 
-	originalObject.BucketName = opts.BucketName
-	originalObject.ProjectID = opts.ProjectID
-	originalObject.Version = opts.Version
-	originalObject.Status = Committed
+		originalObject.BucketName = opts.BucketName
+		originalObject.ProjectID = opts.ProjectID
+		originalObject.Version = opts.Version
+		originalObject.Status = Committed
 
-	if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) {
-		return Object{}, ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys))
-	}
+		if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) {
+			return ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys))
+		}
 
-	var newSegments struct {
-		Positions          []int64
-		EncryptedKeys      [][]byte
-		EncryptedKeyNonces [][]byte
-	}
+		var newSegments struct {
+			Positions          []int64
+			EncryptedKeys      [][]byte
+			EncryptedKeyNonces [][]byte
+		}
 
-	for _, u := range opts.NewSegmentKeys {
-		newSegments.EncryptedKeys = append(newSegments.EncryptedKeys, u.EncryptedKey)
-		newSegments.EncryptedKeyNonces = append(newSegments.EncryptedKeyNonces, u.EncryptedKeyNonce)
-		newSegments.Positions = append(newSegments.Positions, int64(u.Position.Encode()))
-	}
+		for _, u := range opts.NewSegmentKeys {
+			newSegments.EncryptedKeys = append(newSegments.EncryptedKeys, u.EncryptedKey)
+			newSegments.EncryptedKeyNonces = append(newSegments.EncryptedKeyNonces, u.EncryptedKeyNonce)
+			newSegments.Positions = append(newSegments.Positions, int64(u.Position.Encode()))
+		}
 
-	positions := make([]int64, originalObject.SegmentCount)
+		positions := make([]int64, originalObject.SegmentCount)
 
-	rootPieceIDs := make([][]byte, originalObject.SegmentCount)
+		rootPieceIDs := make([][]byte, originalObject.SegmentCount)
 
-	expiresAts := make([]*time.Time, originalObject.SegmentCount)
-	encryptedSizes := make([]int32, originalObject.SegmentCount)
-	plainSizes := make([]int32, originalObject.SegmentCount)
-	plainOffsets := make([]int64, originalObject.SegmentCount)
-	inlineDatas := make([][]byte, originalObject.SegmentCount)
+		expiresAts := make([]*time.Time, originalObject.SegmentCount)
+		encryptedSizes := make([]int32, originalObject.SegmentCount)
+		plainSizes := make([]int32, originalObject.SegmentCount)
+		plainOffsets := make([]int64, originalObject.SegmentCount)
+		inlineDatas := make([][]byte, originalObject.SegmentCount)
 
-	redundancySchemes := make([]int64, originalObject.SegmentCount)
-	// TODO: there are probably columns that we can skip
-	// maybe it's possible to have the select and the insert in one query
-	err = withRows(db.db.QueryContext(ctx, `
+		redundancySchemes := make([]int64, originalObject.SegmentCount)
+		// TODO: there are probably columns that we can skip
+		// maybe it's possible to have the select and the insert in one query
+		err = withRows(tx.QueryContext(ctx, `
 		SELECT
 			position,
 			expires_at,
@@ -244,48 +247,48 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
 		ORDER BY position ASC
 		LIMIT $2
 	`, originalObject.StreamID, originalObject.SegmentCount))(func(rows tagsql.Rows) error {
-		index := 0
-		for rows.Next() {
-			err = rows.Scan(
-				&positions[index],
-				&expiresAts[index],
-				&rootPieceIDs[index],
-				&encryptedSizes[index], &plainOffsets[index], &plainSizes[index],
-				&redundancySchemes[index],
-				&inlineDatas[index],
-			)
-			if err != nil {
-				return err
-			}
-			index++
-		}
-		if err = rows.Err(); err != nil {
-			return err
-		}
-		return nil
-	})
-	if err != nil {
-		return Object{}, Error.New("unable to copy object: %w", err)
-	}
+			index := 0
+			for rows.Next() {
+				err = rows.Scan(
+					&positions[index],
+					&expiresAts[index],
+					&rootPieceIDs[index],
+					&encryptedSizes[index], &plainOffsets[index], &plainSizes[index],
+					&redundancySchemes[index],
+					&inlineDatas[index],
+				)
+				if err != nil {
+					return err
+				}
+				index++
+			}
+			if err = rows.Err(); err != nil {
+				return err
+			}
+			return nil
+		})
+		if err != nil {
+			return Error.New("unable to copy object: %w", err)
+		}
 
-	onlyInlineSegments := true
-	for index := range positions {
-		if newSegments.Positions[index] != positions[index] {
-			return Object{}, Error.New("missing new segment keys for segment %d", positions[index])
-		}
-		if onlyInlineSegments && (encryptedSizes[index] > 0) && len(inlineDatas[index]) == 0 {
-			onlyInlineSegments = false
-		}
-	}
+		onlyInlineSegments := true
+		for index := range positions {
+			if newSegments.Positions[index] != positions[index] {
+				return Error.New("missing new segment keys for segment %d", positions[index])
+			}
+			if onlyInlineSegments && (encryptedSizes[index] > 0) && len(inlineDatas[index]) == 0 {
+				onlyInlineSegments = false
+			}
+		}
 
-	copyMetadata := originalObject.EncryptedMetadata
-	if opts.OverrideMetadata {
-		copyMetadata = opts.NewEncryptedMetadata
-	}
+		copyMetadata = originalObject.EncryptedMetadata
+		if opts.OverrideMetadata {
+			copyMetadata = opts.NewEncryptedMetadata
+		}
 
-	copyObject := originalObject
+		copyObject = originalObject
 
-	err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
 		// TODO we need to handle metadata correctly (copy from original object or replace)
 		row := tx.QueryRowContext(ctx, `
 			WITH existing_object AS (