diff --git a/satellite/metabase/copy_object.go b/satellite/metabase/copy_object.go index 338b0c1a8..a2d83dfd0 100644 --- a/satellite/metabase/copy_object.go +++ b/satellite/metabase/copy_object.go @@ -162,76 +162,79 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje } originalObject := Object{} + copyObject := Object{} + var copyMetadata []byte var ancestorStreamIDBytes []byte - err = db.db.QueryRowContext(ctx, ` - SELECT - objects.stream_id, - expires_at, - segment_count, - encrypted_metadata, - total_plain_size, total_encrypted_size, fixed_segment_size, - encryption, - segment_copies.ancestor_stream_id - FROM objects - LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id - WHERE - project_id = $1 AND - bucket_name = $2 AND - object_key = $3 AND - version = $4 AND - status = `+committedStatus, - opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version). - Scan( - &originalObject.StreamID, - &originalObject.ExpiresAt, - &originalObject.SegmentCount, - &originalObject.EncryptedMetadata, - &originalObject.TotalPlainSize, &originalObject.TotalEncryptedSize, &originalObject.FixedSegmentSize, - encryptionParameters{&originalObject.Encryption}, - &ancestorStreamIDBytes, - ) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return Object{}, storj.ErrObjectNotFound.Wrap(Error.Wrap(err)) + err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { + err = tx.QueryRowContext(ctx, ` + SELECT + objects.stream_id, + expires_at, + segment_count, + encrypted_metadata, + total_plain_size, total_encrypted_size, fixed_segment_size, + encryption, + segment_copies.ancestor_stream_id + FROM objects + LEFT JOIN segment_copies ON objects.stream_id = segment_copies.stream_id + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + status = `+committedStatus, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version). + Scan( + &originalObject.StreamID, + &originalObject.ExpiresAt, + &originalObject.SegmentCount, + &originalObject.EncryptedMetadata, + &originalObject.TotalPlainSize, &originalObject.TotalEncryptedSize, &originalObject.FixedSegmentSize, + encryptionParameters{&originalObject.Encryption}, + &ancestorStreamIDBytes, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return storj.ErrObjectNotFound.Wrap(Error.Wrap(err)) + } + return Error.New("unable to query object status: %w", err) } - return Object{}, Error.New("unable to query object status: %w", err) - } - originalObject.BucketName = opts.BucketName - originalObject.ProjectID = opts.ProjectID - originalObject.Version = opts.Version - originalObject.Status = Committed + originalObject.BucketName = opts.BucketName + originalObject.ProjectID = opts.ProjectID + originalObject.Version = opts.Version + originalObject.Status = Committed - if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) { - return Object{}, ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys)) - } + if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) { + return ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys)) + } - var newSegments struct { - Positions []int64 - EncryptedKeys [][]byte - EncryptedKeyNonces [][]byte - } + var newSegments struct { + Positions []int64 + EncryptedKeys [][]byte + EncryptedKeyNonces [][]byte + } - for _, u := range opts.NewSegmentKeys { - newSegments.EncryptedKeys = append(newSegments.EncryptedKeys, u.EncryptedKey) - newSegments.EncryptedKeyNonces = append(newSegments.EncryptedKeyNonces, u.EncryptedKeyNonce) - newSegments.Positions = append(newSegments.Positions, int64(u.Position.Encode())) - } + for _, u := range opts.NewSegmentKeys { + newSegments.EncryptedKeys = append(newSegments.EncryptedKeys, u.EncryptedKey) + newSegments.EncryptedKeyNonces = append(newSegments.EncryptedKeyNonces, u.EncryptedKeyNonce) + newSegments.Positions = append(newSegments.Positions, int64(u.Position.Encode())) + } - positions := make([]int64, originalObject.SegmentCount) + positions := make([]int64, originalObject.SegmentCount) - rootPieceIDs := make([][]byte, originalObject.SegmentCount) + rootPieceIDs := make([][]byte, originalObject.SegmentCount) - expiresAts := make([]*time.Time, originalObject.SegmentCount) - encryptedSizes := make([]int32, originalObject.SegmentCount) - plainSizes := make([]int32, originalObject.SegmentCount) - plainOffsets := make([]int64, originalObject.SegmentCount) - inlineDatas := make([][]byte, originalObject.SegmentCount) + expiresAts := make([]*time.Time, originalObject.SegmentCount) + encryptedSizes := make([]int32, originalObject.SegmentCount) + plainSizes := make([]int32, originalObject.SegmentCount) + plainOffsets := make([]int64, originalObject.SegmentCount) + inlineDatas := make([][]byte, originalObject.SegmentCount) - redundancySchemes := make([]int64, originalObject.SegmentCount) - // TODO: there are probably columns that we can skip - // maybe it's possible to have the select and the insert in one query - err = withRows(db.db.QueryContext(ctx, ` + redundancySchemes := make([]int64, originalObject.SegmentCount) + // TODO: there are probably columns that we can skip + // maybe it's possible to have the select and the insert in one query + err = withRows(tx.QueryContext(ctx, ` SELECT position, expires_at, @@ -244,48 +247,48 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje ORDER BY position ASC LIMIT $2 `, originalObject.StreamID, originalObject.SegmentCount))(func(rows tagsql.Rows) error { - index := 0 - for rows.Next() { - err = rows.Scan( - &positions[index], - &expiresAts[index], - &rootPieceIDs[index], - &encryptedSizes[index], &plainOffsets[index], &plainSizes[index], - &redundancySchemes[index], - &inlineDatas[index], - ) - if err != nil { + index := 0 + for rows.Next() { + err = rows.Scan( + &positions[index], + &expiresAts[index], + &rootPieceIDs[index], + &encryptedSizes[index], &plainOffsets[index], &plainSizes[index], + &redundancySchemes[index], + &inlineDatas[index], + ) + if err != nil { + return err + } + index++ + } + + if err = rows.Err(); err != nil { return err } - index++ + return nil + }) + if err != nil { + return Error.New("unable to copy object: %w", err) } - if err = rows.Err(); err != nil { - return err + onlyInlineSegments := true + for index := range positions { + if newSegments.Positions[index] != positions[index] { + return Error.New("missing new segment keys for segment %d", positions[index]) + } + if onlyInlineSegments && (encryptedSizes[index] > 0) && len(inlineDatas[index]) == 0 { + onlyInlineSegments = false + } } - return nil - }) - if err != nil { - return Object{}, Error.New("unable to copy object: %w", err) - } - onlyInlineSegments := true - for index := range positions { - if newSegments.Positions[index] != positions[index] { - return Object{}, Error.New("missing new segment keys for segment %d", positions[index]) + copyMetadata = originalObject.EncryptedMetadata + if opts.OverrideMetadata { + copyMetadata = opts.NewEncryptedMetadata } - if onlyInlineSegments && (encryptedSizes[index] > 0) && len(inlineDatas[index]) == 0 { - onlyInlineSegments = false - } - } - copyMetadata := originalObject.EncryptedMetadata - if opts.OverrideMetadata { - copyMetadata = opts.NewEncryptedMetadata - } + copyObject = originalObject - copyObject := originalObject - err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { // TODO we need to handle metadata correctly (copy from original object or replace) row := tx.QueryRowContext(ctx, ` WITH existing_object AS (