ae345320fe
It turns out that some DB methods are not passing context to to lower levels even if have context as a parameter. Change-Id: I82f4fc1a47c362687a91364d85e4468057e53134
555 lines
16 KiB
Go
555 lines
16 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package metabase
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"time"
|
|
|
|
pgxerrcode "github.com/jackc/pgerrcode"
|
|
"github.com/zeebo/errs"
|
|
|
|
"storj.io/common/storj"
|
|
"storj.io/private/dbutil/pgutil/pgerrcode"
|
|
"storj.io/private/dbutil/txutil"
|
|
"storj.io/private/tagsql"
|
|
)
|
|
|
|
// we need to disable PlainSize validation for old uplinks.
|
|
const validatePlainSize = false
|
|
|
|
const defaultZombieDeletionPeriod = 24 * time.Hour
|
|
|
|
var (
|
|
// ErrInvalidRequest is used to indicate invalid requests.
|
|
ErrInvalidRequest = errs.Class("metabase: invalid request")
|
|
// ErrConflict is used to indicate conflict with the request.
|
|
ErrConflict = errs.Class("metabase: conflict")
|
|
)
|
|
|
|
// BeginObjectNextVersion contains arguments necessary for starting an object upload.
|
|
type BeginObjectNextVersion struct {
|
|
ObjectStream
|
|
|
|
ExpiresAt *time.Time
|
|
ZombieDeletionDeadline *time.Time
|
|
|
|
Encryption storj.EncryptionParameters
|
|
}
|
|
|
|
// BeginObjectNextVersion adds a pending object to the database, with automatically assigned version.
|
|
func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion) (committed Version, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
if opts.Version != NextVersion {
|
|
return -1, ErrInvalidRequest.New("Version should be metabase.NextVersion")
|
|
}
|
|
|
|
if opts.ZombieDeletionDeadline == nil {
|
|
deadline := time.Now().Add(defaultZombieDeletionPeriod)
|
|
opts.ZombieDeletionDeadline = &deadline
|
|
}
|
|
|
|
row := db.db.QueryRowContext(ctx, `
|
|
INSERT INTO objects (
|
|
project_id, bucket_name, object_key, version, stream_id,
|
|
expires_at, encryption,
|
|
zombie_deletion_deadline
|
|
) VALUES (
|
|
$1, $2, $3,
|
|
coalesce((
|
|
SELECT version + 1
|
|
FROM objects
|
|
WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3
|
|
ORDER BY version DESC
|
|
LIMIT 1
|
|
), 1),
|
|
$4, $5, $6,
|
|
$7)
|
|
RETURNING version
|
|
`, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.StreamID,
|
|
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
|
|
opts.ZombieDeletionDeadline)
|
|
|
|
var v int64
|
|
if err := row.Scan(&v); err != nil {
|
|
return -1, Error.New("unable to insert object: %w", err)
|
|
}
|
|
|
|
mon.Meter("object_begin").Mark(1)
|
|
|
|
return Version(v), nil
|
|
}
|
|
|
|
// BeginObjectExactVersion contains arguments necessary for starting an object upload.
|
|
type BeginObjectExactVersion struct {
|
|
ObjectStream
|
|
|
|
ExpiresAt *time.Time
|
|
ZombieDeletionDeadline *time.Time
|
|
|
|
Encryption storj.EncryptionParameters
|
|
}
|
|
|
|
// BeginObjectExactVersion adds a pending object to the database, with specific version.
|
|
func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion) (committed Object, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return Object{}, err
|
|
}
|
|
|
|
if opts.Version == NextVersion {
|
|
return Object{}, ErrInvalidRequest.New("Version should not be metabase.NextVersion")
|
|
}
|
|
|
|
if opts.ZombieDeletionDeadline == nil {
|
|
deadline := time.Now().Add(defaultZombieDeletionPeriod)
|
|
opts.ZombieDeletionDeadline = &deadline
|
|
}
|
|
|
|
object := Object{
|
|
ObjectStream: ObjectStream{
|
|
ProjectID: opts.ProjectID,
|
|
BucketName: opts.BucketName,
|
|
ObjectKey: opts.ObjectKey,
|
|
Version: opts.Version,
|
|
StreamID: opts.StreamID,
|
|
},
|
|
ExpiresAt: opts.ExpiresAt,
|
|
Encryption: opts.Encryption,
|
|
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
|
|
}
|
|
|
|
err = db.db.QueryRowContext(ctx, `
|
|
INSERT INTO objects (
|
|
project_id, bucket_name, object_key, version, stream_id,
|
|
expires_at, encryption,
|
|
zombie_deletion_deadline
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5,
|
|
$6, $7,
|
|
$8
|
|
)
|
|
RETURNING status, created_at
|
|
`, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
|
|
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
|
|
opts.ZombieDeletionDeadline).
|
|
Scan(
|
|
&object.Status, &object.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
|
|
return Object{}, ErrConflict.New("object already exists")
|
|
}
|
|
return Object{}, Error.New("unable to insert object: %w", err)
|
|
}
|
|
|
|
mon.Meter("object_begin").Mark(1)
|
|
|
|
return object, nil
|
|
}
|
|
|
|
// BeginSegment contains options to verify, whether a new segment upload can be started.
|
|
type BeginSegment struct {
|
|
ObjectStream
|
|
|
|
Position SegmentPosition
|
|
RootPieceID storj.PieceID
|
|
Pieces Pieces
|
|
}
|
|
|
|
// BeginSegment verifies, whether a new segment upload can be started.
|
|
func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := opts.Pieces.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if opts.RootPieceID.IsZero() {
|
|
return ErrInvalidRequest.New("RootPieceID missing")
|
|
}
|
|
|
|
// NOTE: this isn't strictly necessary, since we can also fail this in CommitSegment.
|
|
// however, we should prevent creating segements for non-partial objects.
|
|
|
|
// NOTE: these queries could be combined into one.
|
|
|
|
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
|
// Verify that object exists and is partial.
|
|
var value int
|
|
err = tx.QueryRowContext(ctx, `
|
|
SELECT 1
|
|
FROM objects WHERE
|
|
project_id = $1 AND
|
|
bucket_name = $2 AND
|
|
object_key = $3 AND
|
|
version = $4 AND
|
|
stream_id = $5 AND
|
|
status = `+pendingStatus,
|
|
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID).Scan(&value)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return Error.New("pending object missing")
|
|
}
|
|
return Error.New("unable to query object status: %w", err)
|
|
}
|
|
|
|
// Verify that the segment does not exist.
|
|
err = tx.QueryRowContext(ctx, `
|
|
SELECT 1
|
|
FROM segments WHERE
|
|
stream_id = $1 AND
|
|
position = $2
|
|
`, opts.StreamID, opts.Position).Scan(&value)
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return Error.New("unable to query segments: %w", err)
|
|
}
|
|
err = nil //nolint: wastedassign, ineffassign // ignore any other err result (explicitly)
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mon.Meter("segment_begin").Mark(1)
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitSegment contains all necessary information about the segment.
|
|
type CommitSegment struct {
|
|
ObjectStream
|
|
|
|
Position SegmentPosition
|
|
RootPieceID storj.PieceID
|
|
|
|
ExpiresAt *time.Time
|
|
|
|
EncryptedKeyNonce []byte
|
|
EncryptedKey []byte
|
|
|
|
PlainOffset int64 // offset in the original data stream
|
|
PlainSize int32 // size before encryption
|
|
EncryptedSize int32 // segment size after encryption
|
|
|
|
EncryptedETag []byte
|
|
|
|
Redundancy storj.RedundancyScheme
|
|
|
|
Pieces Pieces
|
|
}
|
|
|
|
// CommitSegment commits segment to the database.
|
|
func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := opts.Pieces.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
switch {
|
|
case opts.RootPieceID.IsZero():
|
|
return ErrInvalidRequest.New("RootPieceID missing")
|
|
case len(opts.EncryptedKey) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKey missing")
|
|
case len(opts.EncryptedKeyNonce) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKeyNonce missing")
|
|
case opts.EncryptedSize <= 0:
|
|
return ErrInvalidRequest.New("EncryptedSize negative or zero")
|
|
case opts.PlainSize <= 0 && validatePlainSize:
|
|
return ErrInvalidRequest.New("PlainSize negative or zero")
|
|
case opts.PlainOffset < 0:
|
|
return ErrInvalidRequest.New("PlainOffset negative")
|
|
case opts.Redundancy.IsZero():
|
|
return ErrInvalidRequest.New("Redundancy zero")
|
|
}
|
|
|
|
if len(opts.Pieces) < int(opts.Redundancy.OptimalShares) {
|
|
return ErrInvalidRequest.New("number of pieces is less than redundancy optimal shares value")
|
|
}
|
|
|
|
aliasPieces, err := db.aliasCache.ConvertPiecesToAliases(ctx, opts.Pieces)
|
|
if err != nil {
|
|
return Error.New("unable to convert pieces to aliases: %w", err)
|
|
}
|
|
|
|
// Verify that object exists and is partial.
|
|
_, err = db.db.ExecContext(ctx, `
|
|
INSERT INTO segments (
|
|
stream_id, position, expires_at,
|
|
root_piece_id, encrypted_key_nonce, encrypted_key,
|
|
encrypted_size, plain_offset, plain_size, encrypted_etag,
|
|
redundancy,
|
|
remote_alias_pieces
|
|
) VALUES (
|
|
(SELECT stream_id
|
|
FROM objects WHERE
|
|
project_id = $12 AND
|
|
bucket_name = $13 AND
|
|
object_key = $14 AND
|
|
version = $15 AND
|
|
stream_id = $16 AND
|
|
status = `+pendingStatus+
|
|
` ), $1, $2,
|
|
$3, $4, $5,
|
|
$6, $7, $8, $9,
|
|
$10,
|
|
$11
|
|
)`, opts.Position, opts.ExpiresAt,
|
|
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
|
|
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
|
|
redundancyScheme{&opts.Redundancy},
|
|
aliasPieces,
|
|
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
|
|
)
|
|
if err != nil {
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
|
|
return Error.New("pending object missing")
|
|
}
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
|
|
return ErrConflict.New("segment already exists")
|
|
}
|
|
return Error.New("unable to insert segment: %w", err)
|
|
}
|
|
|
|
mon.Meter("segment_commit").Mark(1)
|
|
mon.IntVal("segment_commit_encrypted_size").Observe(int64(opts.EncryptedSize))
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitInlineSegment contains all necessary information about the segment.
|
|
type CommitInlineSegment struct {
|
|
ObjectStream
|
|
|
|
Position SegmentPosition
|
|
|
|
ExpiresAt *time.Time
|
|
|
|
EncryptedKeyNonce []byte
|
|
EncryptedKey []byte
|
|
|
|
PlainOffset int64 // offset in the original data stream
|
|
PlainSize int32 // size before encryption
|
|
EncryptedETag []byte
|
|
|
|
InlineData []byte
|
|
}
|
|
|
|
// CommitInlineSegment commits inline segment to the database.
|
|
func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO: do we have a lower limit for inline data?
|
|
// TODO should we move check for max inline segment from metainfo here
|
|
|
|
switch {
|
|
case len(opts.EncryptedKey) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKey missing")
|
|
case len(opts.EncryptedKeyNonce) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKeyNonce missing")
|
|
case opts.PlainSize <= 0 && validatePlainSize:
|
|
return ErrInvalidRequest.New("PlainSize negative or zero")
|
|
case opts.PlainOffset < 0:
|
|
return ErrInvalidRequest.New("PlainOffset negative")
|
|
}
|
|
|
|
// Verify that object exists and is partial.
|
|
_, err = db.db.ExecContext(ctx, `
|
|
INSERT INTO segments (
|
|
stream_id, position, expires_at,
|
|
root_piece_id, encrypted_key_nonce, encrypted_key,
|
|
encrypted_size, plain_offset, plain_size, encrypted_etag,
|
|
inline_data
|
|
) VALUES (
|
|
(SELECT stream_id
|
|
FROM objects WHERE
|
|
project_id = $11 AND
|
|
bucket_name = $12 AND
|
|
object_key = $13 AND
|
|
version = $14 AND
|
|
stream_id = $15 AND
|
|
status = `+pendingStatus+
|
|
` ), $1, $2,
|
|
$3, $4, $5,
|
|
$6, $7, $8, $9,
|
|
$10
|
|
)`, opts.Position, opts.ExpiresAt,
|
|
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
|
|
len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
|
|
opts.InlineData,
|
|
opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
|
|
)
|
|
if err != nil {
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
|
|
return Error.New("pending object missing")
|
|
}
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
|
|
return ErrConflict.New("segment already exists")
|
|
}
|
|
return Error.New("unable to insert segment: %w", err)
|
|
}
|
|
|
|
mon.Meter("segment_commit").Mark(1)
|
|
mon.IntVal("segment_commit_encrypted_size").Observe(int64(len(opts.InlineData)))
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitObject contains arguments necessary for committing an object.
|
|
type CommitObject struct {
|
|
ObjectStream
|
|
|
|
Encryption storj.EncryptionParameters
|
|
|
|
EncryptedMetadata []byte
|
|
EncryptedMetadataNonce []byte
|
|
EncryptedMetadataEncryptedKey []byte
|
|
}
|
|
|
|
// CommitObject adds a pending object to the database.
|
|
func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return Object{}, err
|
|
}
|
|
|
|
if opts.Encryption.CipherSuite != storj.EncUnspecified && opts.Encryption.BlockSize <= 0 {
|
|
return Object{}, ErrInvalidRequest.New("Encryption.BlockSize is negative or zero")
|
|
}
|
|
|
|
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
|
segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
|
|
if err != nil {
|
|
return Error.New("failed to fetch segments: %w", err)
|
|
}
|
|
|
|
finalSegments := convertToFinalSegments(segments)
|
|
err = updateSegmentOffsets(ctx, tx, opts.StreamID, finalSegments)
|
|
if err != nil {
|
|
return Error.New("failed to update segments: %w", err)
|
|
}
|
|
|
|
// TODO: would we even need this when we make main index plain_offset?
|
|
fixedSegmentSize := int32(0)
|
|
if len(finalSegments) > 0 {
|
|
fixedSegmentSize = finalSegments[0].PlainSize
|
|
for i, seg := range finalSegments {
|
|
if seg.Position.Part != 0 || seg.Position.Index != uint32(i) {
|
|
fixedSegmentSize = -1
|
|
break
|
|
}
|
|
if i < len(finalSegments)-1 && seg.PlainSize != fixedSegmentSize {
|
|
fixedSegmentSize = -1
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
var totalPlainSize, totalEncryptedSize int64
|
|
for _, seg := range finalSegments {
|
|
totalPlainSize += int64(seg.PlainSize)
|
|
totalEncryptedSize += int64(seg.EncryptedSize)
|
|
}
|
|
|
|
err = tx.QueryRowContext(ctx, `
|
|
UPDATE objects SET
|
|
status =`+committedStatus+`,
|
|
segment_count = $6,
|
|
|
|
encrypted_metadata_nonce = $7,
|
|
encrypted_metadata = $8,
|
|
encrypted_metadata_encrypted_key = $9,
|
|
|
|
total_plain_size = $10,
|
|
total_encrypted_size = $11,
|
|
fixed_segment_size = $12,
|
|
zombie_deletion_deadline = NULL,
|
|
|
|
-- TODO should we allow to override existing encryption parameters or return error if don't match with opts?
|
|
encryption = CASE
|
|
WHEN objects.encryption = 0 AND $13 <> 0 THEN $13
|
|
WHEN objects.encryption = 0 AND $13 = 0 THEN NULL
|
|
ELSE objects.encryption
|
|
END
|
|
WHERE
|
|
project_id = $1 AND
|
|
bucket_name = $2 AND
|
|
object_key = $3 AND
|
|
version = $4 AND
|
|
stream_id = $5 AND
|
|
status = `+pendingStatus+`
|
|
RETURNING
|
|
created_at, expires_at,
|
|
encryption;
|
|
`, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey), opts.Version, opts.StreamID,
|
|
len(segments),
|
|
opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey,
|
|
totalPlainSize,
|
|
totalEncryptedSize,
|
|
fixedSegmentSize,
|
|
encryptionParameters{&opts.Encryption},
|
|
).
|
|
Scan(
|
|
&object.CreatedAt, &object.ExpiresAt,
|
|
encryptionParameters{&object.Encryption},
|
|
)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return storj.ErrObjectNotFound.Wrap(Error.New("object with specified version and pending status is missing"))
|
|
} else if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
|
|
// TODO maybe we should check message if 'encryption' label is there
|
|
return ErrInvalidRequest.New("Encryption is missing")
|
|
}
|
|
return Error.New("failed to update object: %w", err)
|
|
}
|
|
|
|
object.StreamID = opts.StreamID
|
|
object.ProjectID = opts.ProjectID
|
|
object.BucketName = opts.BucketName
|
|
object.ObjectKey = opts.ObjectKey
|
|
object.Version = opts.Version
|
|
object.Status = Committed
|
|
object.SegmentCount = int32(len(segments))
|
|
object.EncryptedMetadataNonce = opts.EncryptedMetadataNonce
|
|
object.EncryptedMetadata = opts.EncryptedMetadata
|
|
object.EncryptedMetadataEncryptedKey = opts.EncryptedMetadataEncryptedKey
|
|
object.TotalPlainSize = totalPlainSize
|
|
object.TotalEncryptedSize = totalEncryptedSize
|
|
object.FixedSegmentSize = fixedSegmentSize
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return Object{}, err
|
|
}
|
|
|
|
mon.Meter("object_commit").Mark(1)
|
|
mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
|
|
mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)
|
|
|
|
return object, nil
|
|
}
|