satellite/metainfo/metabase: remove sql transaction while committing segments

Change-Id: Ia2cb1df63e1cbb8b9d36b0161a75b9c9cb3a56f9
This commit is contained in:
Michał Niewrzał 2021-01-21 13:17:51 +01:00 committed by Michal Niewrzal
parent 292caa9043
commit ce675b7707

View File

@ -257,56 +257,45 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error)
// TODO: verify opts.Pieces is compatible with opts.Redundancy // TODO: verify opts.Pieces is compatible with opts.Redundancy
return txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { // Verify that object exists and is partial.
// Verify that object exists and is partial. _, err = db.db.ExecContext(ctx, `
var value int INSERT INTO segments (
err = tx.QueryRowContext(ctx, ` stream_id, position,
SELECT 1 root_piece_id, encrypted_key_nonce, encrypted_key,
FROM objects WHERE encrypted_size, plain_offset, plain_size,
project_id = $1 AND redundancy,
bucket_name = $2 AND remote_pieces
object_key = $3 AND ) VALUES (
version = $4 AND (SELECT stream_id
stream_id = $5 AND FROM objects WHERE
status = `+pendingStatus, project_id = $10 AND
opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID).Scan(&value) bucket_name = $11 AND
if err != nil { object_key = $12 AND
if errors.Is(err, sql.ErrNoRows) { version = $13 AND
return Error.New("pending object missing") stream_id = $14 AND
} status = `+pendingStatus+
return Error.New("unable to query object status: %w", err) ` ), $1,
$2, $3, $4,
$5, $6, $7,
$8,
$9
)`, opts.Position,
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize,
redundancyScheme{&opts.Redundancy},
opts.Pieces,
opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID,
)
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
return Error.New("pending object missing")
} }
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
// Insert into segments. return ErrConflict.New("segment already exists")
_, err = tx.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
redundancy,
remote_pieces
) VALUES (
$1, $2,
$3, $4, $5,
$6, $7, $8,
$9,
$10
)`,
opts.StreamID, opts.Position,
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize,
redundancyScheme{&opts.Redundancy},
opts.Pieces,
)
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return ErrConflict.New("segment already exists")
}
return Error.New("unable to insert segment: %w", err)
} }
return Error.New("unable to insert segment: %w", err)
return nil }
}) return nil
} }
// CommitInlineSegment contains all necessary information about the segment. // CommitInlineSegment contains all necessary information about the segment.
@ -347,53 +336,42 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment)
return ErrInvalidRequest.New("PlainOffset negative") return ErrInvalidRequest.New("PlainOffset negative")
} }
return txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error { // Verify that object exists and is partial.
// Verify that object exists and is partial. _, err = db.db.ExecContext(ctx, `
var value int INSERT INTO segments (
err = tx.QueryRowContext(ctx, ` stream_id, position,
SELECT 1 root_piece_id, encrypted_key_nonce, encrypted_key,
FROM objects WHERE encrypted_size, plain_offset, plain_size,
project_id = $1 AND inline_data
bucket_name = $2 AND ) VALUES (
object_key = $3 AND (SELECT stream_id
version = $4 AND FROM objects WHERE
stream_id = $5 AND project_id = $9 AND
status = `+pendingStatus, bucket_name = $10 AND
opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID).Scan(&value) object_key = $11 AND
if err != nil { version = $12 AND
if errors.Is(err, sql.ErrNoRows) { stream_id = $13 AND
return Error.New("pending object missing") status = `+pendingStatus+
} ` ), $1,
return Error.New("unable to query object status: %w", err) $2, $3, $4,
$5, $6, $7,
$8
)`, opts.Position,
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize,
opts.InlineData,
opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID,
)
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
return Error.New("pending object missing")
} }
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
// Insert into segments. return ErrConflict.New("segment already exists")
_, err = tx.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size, plain_offset, plain_size,
inline_data
) VALUES (
$1, $2,
$3, $4, $5,
$6, $7, $8,
$9
)`,
opts.StreamID, opts.Position,
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
len(opts.InlineData), opts.PlainOffset, opts.PlainSize,
opts.InlineData,
)
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return ErrConflict.New("segment already exists")
}
return Error.New("unable to insert segment: %w", err)
} }
return Error.New("unable to insert segment: %w", err)
return nil }
}) return nil
} }
// CommitObject contains arguments necessary for committing an object. // CommitObject contains arguments necessary for committing an object.