5c2131ed0d
We have a bug in our behavior while doing API pods deployment. At this time its possible to have pods with multiple versions flag set true only partially for some of pods. Because of that it's possible to start new object without removing existing/older version on BeginObject (new behavior) and also don't remove that existing/older object on CommitObject. That can cause to have two committed objects with different versions and that's a state we want to avoid. To fix it we are removing multiple versions flag from CommitObject to always try delete existing objects. This way even if we don't remove existing object on BeginObject it will be always removed while committing. Fixes https://github.com/storj/storj/issues/5373 Change-Id: Idc334bf5cc785d2f559af96e92c3de6d82ca58ba
736 lines
22 KiB
Go
736 lines
22 KiB
Go
// Copyright (C) 2020 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package metabase
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"time"
|
|
|
|
pgxerrcode "github.com/jackc/pgerrcode"
|
|
"github.com/zeebo/errs"
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/memory"
|
|
"storj.io/common/storj"
|
|
"storj.io/private/dbutil/pgutil/pgerrcode"
|
|
"storj.io/private/dbutil/txutil"
|
|
"storj.io/private/tagsql"
|
|
)
|
|
|
|
// we need to disable PlainSize validation for old uplinks.
|
|
const validatePlainSize = false
|
|
|
|
const defaultZombieDeletionPeriod = 24 * time.Hour
|
|
|
|
var (
|
|
// ErrInvalidRequest is used to indicate invalid requests.
|
|
ErrInvalidRequest = errs.Class("metabase: invalid request")
|
|
// ErrConflict is used to indicate conflict with the request.
|
|
ErrConflict = errs.Class("metabase: conflict")
|
|
)
|
|
|
|
// BeginObjectNextVersion contains arguments necessary for starting an object upload.
|
|
type BeginObjectNextVersion struct {
|
|
ObjectStream
|
|
|
|
ExpiresAt *time.Time
|
|
ZombieDeletionDeadline *time.Time
|
|
|
|
EncryptedMetadata []byte // optional
|
|
EncryptedMetadataNonce []byte // optional
|
|
EncryptedMetadataEncryptedKey []byte // optional
|
|
|
|
Encryption storj.EncryptionParameters
|
|
}
|
|
|
|
// Verify verifies get object request fields.
|
|
func (opts *BeginObjectNextVersion) Verify() error {
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if opts.Version != NextVersion {
|
|
return ErrInvalidRequest.New("Version should be metabase.NextVersion")
|
|
}
|
|
|
|
if opts.EncryptedMetadata == nil && (opts.EncryptedMetadataNonce != nil || opts.EncryptedMetadataEncryptedKey != nil) {
|
|
return ErrInvalidRequest.New("EncryptedMetadataNonce and EncryptedMetadataEncryptedKey must be not set if EncryptedMetadata is not set")
|
|
} else if opts.EncryptedMetadata != nil && (opts.EncryptedMetadataNonce == nil || opts.EncryptedMetadataEncryptedKey == nil) {
|
|
return ErrInvalidRequest.New("EncryptedMetadataNonce and EncryptedMetadataEncryptedKey must be set if EncryptedMetadata is set")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// BeginObjectNextVersion adds a pending object to the database, with automatically assigned version.
|
|
func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion) (object Object, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.Verify(); err != nil {
|
|
return Object{}, err
|
|
}
|
|
|
|
if opts.ZombieDeletionDeadline == nil {
|
|
deadline := time.Now().Add(defaultZombieDeletionPeriod)
|
|
opts.ZombieDeletionDeadline = &deadline
|
|
}
|
|
|
|
object = Object{
|
|
ObjectStream: ObjectStream{
|
|
ProjectID: opts.ProjectID,
|
|
BucketName: opts.BucketName,
|
|
ObjectKey: opts.ObjectKey,
|
|
StreamID: opts.StreamID,
|
|
},
|
|
ExpiresAt: opts.ExpiresAt,
|
|
Encryption: opts.Encryption,
|
|
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
|
|
}
|
|
|
|
if err := db.db.QueryRowContext(ctx, `
|
|
INSERT INTO objects (
|
|
project_id, bucket_name, object_key, version, stream_id,
|
|
expires_at, encryption,
|
|
zombie_deletion_deadline,
|
|
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
|
|
) VALUES (
|
|
$1, $2, $3,
|
|
coalesce((
|
|
SELECT version + 1
|
|
FROM objects
|
|
WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3
|
|
ORDER BY version DESC
|
|
LIMIT 1
|
|
), 1),
|
|
$4, $5, $6,
|
|
$7,
|
|
$8, $9, $10)
|
|
RETURNING status, version, created_at
|
|
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
|
|
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
|
|
opts.ZombieDeletionDeadline,
|
|
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
|
|
).Scan(&object.Status, &object.Version, &object.CreatedAt); err != nil {
|
|
return Object{}, Error.New("unable to insert object: %w", err)
|
|
}
|
|
|
|
mon.Meter("object_begin").Mark(1)
|
|
|
|
return object, nil
|
|
}
|
|
|
|
// BeginObjectExactVersion contains arguments necessary for starting an object upload.
|
|
type BeginObjectExactVersion struct {
|
|
ObjectStream
|
|
|
|
ExpiresAt *time.Time
|
|
ZombieDeletionDeadline *time.Time
|
|
|
|
EncryptedMetadata []byte // optional
|
|
EncryptedMetadataNonce []byte // optional
|
|
EncryptedMetadataEncryptedKey []byte // optional
|
|
|
|
Encryption storj.EncryptionParameters
|
|
}
|
|
|
|
// Verify verifies get object reqest fields.
|
|
func (opts *BeginObjectExactVersion) Verify() error {
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if opts.Version == NextVersion {
|
|
return ErrInvalidRequest.New("Version should not be metabase.NextVersion")
|
|
}
|
|
|
|
if opts.EncryptedMetadata == nil && (opts.EncryptedMetadataNonce != nil || opts.EncryptedMetadataEncryptedKey != nil) {
|
|
return ErrInvalidRequest.New("EncryptedMetadataNonce and EncryptedMetadataEncryptedKey must be not set if EncryptedMetadata is not set")
|
|
} else if opts.EncryptedMetadata != nil && (opts.EncryptedMetadataNonce == nil || opts.EncryptedMetadataEncryptedKey == nil) {
|
|
return ErrInvalidRequest.New("EncryptedMetadataNonce and EncryptedMetadataEncryptedKey must be set if EncryptedMetadata is set")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// BeginObjectExactVersion adds a pending object to the database, with specific version.
|
|
func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion) (committed Object, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.Verify(); err != nil {
|
|
return Object{}, err
|
|
}
|
|
|
|
if opts.ZombieDeletionDeadline == nil {
|
|
deadline := time.Now().Add(defaultZombieDeletionPeriod)
|
|
opts.ZombieDeletionDeadline = &deadline
|
|
}
|
|
|
|
object := Object{
|
|
ObjectStream: ObjectStream{
|
|
ProjectID: opts.ProjectID,
|
|
BucketName: opts.BucketName,
|
|
ObjectKey: opts.ObjectKey,
|
|
Version: opts.Version,
|
|
StreamID: opts.StreamID,
|
|
},
|
|
ExpiresAt: opts.ExpiresAt,
|
|
Encryption: opts.Encryption,
|
|
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
|
|
}
|
|
|
|
err = db.db.QueryRowContext(ctx, `
|
|
INSERT INTO objects (
|
|
project_id, bucket_name, object_key, version, stream_id,
|
|
expires_at, encryption,
|
|
zombie_deletion_deadline,
|
|
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5,
|
|
$6, $7,
|
|
$8,
|
|
$9, $10, $11
|
|
)
|
|
RETURNING status, created_at
|
|
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
|
|
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
|
|
opts.ZombieDeletionDeadline,
|
|
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
|
|
).Scan(
|
|
&object.Status, &object.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
|
|
return Object{}, Error.Wrap(ErrObjectAlreadyExists.New(""))
|
|
}
|
|
return Object{}, Error.New("unable to insert object: %w", err)
|
|
}
|
|
|
|
mon.Meter("object_begin").Mark(1)
|
|
|
|
return object, nil
|
|
}
|
|
|
|
// BeginSegment contains options to verify, whether a new segment upload can be started.
|
|
type BeginSegment struct {
|
|
ObjectStream
|
|
|
|
Position SegmentPosition
|
|
|
|
// TODO: unused field, can remove
|
|
RootPieceID storj.PieceID
|
|
|
|
Pieces Pieces
|
|
}
|
|
|
|
// BeginSegment verifies, whether a new segment upload can be started.
|
|
func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := opts.Pieces.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if opts.RootPieceID.IsZero() {
|
|
return ErrInvalidRequest.New("RootPieceID missing")
|
|
}
|
|
|
|
// NOTE: this isn't strictly necessary, since we can also fail this in CommitSegment.
|
|
// however, we should prevent creating segements for non-partial objects.
|
|
|
|
// Verify that object exists and is partial.
|
|
var value int
|
|
err = db.db.QueryRowContext(ctx, `
|
|
SELECT 1
|
|
FROM objects WHERE
|
|
project_id = $1 AND
|
|
bucket_name = $2 AND
|
|
object_key = $3 AND
|
|
version = $4 AND
|
|
stream_id = $5 AND
|
|
status = `+pendingStatus,
|
|
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&value)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return ErrPendingObjectMissing.New("")
|
|
}
|
|
return Error.New("unable to query object status: %w", err)
|
|
}
|
|
|
|
mon.Meter("segment_begin").Mark(1)
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitSegment contains all necessary information about the segment.
|
|
type CommitSegment struct {
|
|
ObjectStream
|
|
|
|
Position SegmentPosition
|
|
RootPieceID storj.PieceID
|
|
|
|
ExpiresAt *time.Time
|
|
|
|
EncryptedKeyNonce []byte
|
|
EncryptedKey []byte
|
|
|
|
PlainOffset int64 // offset in the original data stream
|
|
PlainSize int32 // size before encryption
|
|
EncryptedSize int32 // segment size after encryption
|
|
|
|
EncryptedETag []byte
|
|
|
|
Redundancy storj.RedundancyScheme
|
|
|
|
Pieces Pieces
|
|
|
|
Placement storj.PlacementConstraint
|
|
}
|
|
|
|
// CommitSegment commits segment to the database.
|
|
func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := opts.Pieces.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
switch {
|
|
case opts.RootPieceID.IsZero():
|
|
return ErrInvalidRequest.New("RootPieceID missing")
|
|
case len(opts.EncryptedKey) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKey missing")
|
|
case len(opts.EncryptedKeyNonce) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKeyNonce missing")
|
|
case opts.EncryptedSize <= 0:
|
|
return ErrInvalidRequest.New("EncryptedSize negative or zero")
|
|
case opts.PlainSize <= 0 && validatePlainSize:
|
|
return ErrInvalidRequest.New("PlainSize negative or zero")
|
|
case opts.PlainOffset < 0:
|
|
return ErrInvalidRequest.New("PlainOffset negative")
|
|
case opts.Redundancy.IsZero():
|
|
return ErrInvalidRequest.New("Redundancy zero")
|
|
}
|
|
|
|
if len(opts.Pieces) < int(opts.Redundancy.OptimalShares) {
|
|
return ErrInvalidRequest.New("number of pieces is less than redundancy optimal shares value")
|
|
}
|
|
|
|
aliasPieces, err := db.aliasCache.EnsurePiecesToAliases(ctx, opts.Pieces)
|
|
if err != nil {
|
|
return Error.New("unable to convert pieces to aliases: %w", err)
|
|
}
|
|
|
|
// Verify that object exists and is partial.
|
|
_, err = db.db.ExecContext(ctx, `
|
|
INSERT INTO segments (
|
|
stream_id, position, expires_at,
|
|
root_piece_id, encrypted_key_nonce, encrypted_key,
|
|
encrypted_size, plain_offset, plain_size, encrypted_etag,
|
|
redundancy,
|
|
remote_alias_pieces,
|
|
placement
|
|
) VALUES (
|
|
(SELECT stream_id
|
|
FROM objects WHERE
|
|
project_id = $12 AND
|
|
bucket_name = $13 AND
|
|
object_key = $14 AND
|
|
version = $15 AND
|
|
stream_id = $16 AND
|
|
status = `+pendingStatus+
|
|
` ), $1, $2,
|
|
$3, $4, $5,
|
|
$6, $7, $8, $9,
|
|
$10,
|
|
$11,
|
|
$17
|
|
)
|
|
ON CONFLICT(stream_id, position)
|
|
DO UPDATE SET
|
|
expires_at = $2,
|
|
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
|
|
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
|
|
redundancy = $10,
|
|
remote_alias_pieces = $11,
|
|
placement = $17
|
|
`, opts.Position, opts.ExpiresAt,
|
|
opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey,
|
|
opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
|
|
redundancyScheme{&opts.Redundancy},
|
|
aliasPieces,
|
|
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
|
|
opts.Placement,
|
|
)
|
|
if err != nil {
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
|
|
return ErrPendingObjectMissing.New("")
|
|
}
|
|
return Error.New("unable to insert segment: %w", err)
|
|
}
|
|
|
|
mon.Meter("segment_commit").Mark(1)
|
|
mon.IntVal("segment_commit_encrypted_size").Observe(int64(opts.EncryptedSize))
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitInlineSegment contains all necessary information about the segment.
|
|
type CommitInlineSegment struct {
|
|
ObjectStream
|
|
|
|
Position SegmentPosition
|
|
|
|
ExpiresAt *time.Time
|
|
|
|
EncryptedKeyNonce []byte
|
|
EncryptedKey []byte
|
|
|
|
PlainOffset int64 // offset in the original data stream
|
|
PlainSize int32 // size before encryption
|
|
EncryptedETag []byte
|
|
|
|
InlineData []byte
|
|
}
|
|
|
|
// CommitInlineSegment commits inline segment to the database.
|
|
func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO: do we have a lower limit for inline data?
|
|
// TODO should we move check for max inline segment from metainfo here
|
|
|
|
switch {
|
|
case len(opts.EncryptedKey) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKey missing")
|
|
case len(opts.EncryptedKeyNonce) == 0:
|
|
return ErrInvalidRequest.New("EncryptedKeyNonce missing")
|
|
case opts.PlainSize <= 0 && validatePlainSize:
|
|
return ErrInvalidRequest.New("PlainSize negative or zero")
|
|
case opts.PlainOffset < 0:
|
|
return ErrInvalidRequest.New("PlainOffset negative")
|
|
}
|
|
|
|
// Verify that object exists and is partial.
|
|
_, err = db.db.ExecContext(ctx, `
|
|
INSERT INTO segments (
|
|
stream_id, position, expires_at,
|
|
root_piece_id, encrypted_key_nonce, encrypted_key,
|
|
encrypted_size, plain_offset, plain_size, encrypted_etag,
|
|
inline_data
|
|
) VALUES (
|
|
(SELECT stream_id
|
|
FROM objects WHERE
|
|
project_id = $11 AND
|
|
bucket_name = $12 AND
|
|
object_key = $13 AND
|
|
version = $14 AND
|
|
stream_id = $15 AND
|
|
status = `+pendingStatus+
|
|
` ), $1, $2,
|
|
$3, $4, $5,
|
|
$6, $7, $8, $9,
|
|
$10
|
|
)
|
|
ON CONFLICT(stream_id, position)
|
|
DO UPDATE SET
|
|
expires_at = $2,
|
|
root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5,
|
|
encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9,
|
|
inline_data = $10
|
|
`, opts.Position, opts.ExpiresAt,
|
|
storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey,
|
|
len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag,
|
|
opts.InlineData,
|
|
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
|
|
)
|
|
if err != nil {
|
|
if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
|
|
return ErrPendingObjectMissing.New("")
|
|
}
|
|
return Error.New("unable to insert segment: %w", err)
|
|
}
|
|
|
|
mon.Meter("segment_commit").Mark(1)
|
|
mon.IntVal("segment_commit_encrypted_size").Observe(int64(len(opts.InlineData)))
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitObject contains arguments necessary for committing an object.
|
|
type CommitObject struct {
|
|
ObjectStream
|
|
|
|
Encryption storj.EncryptionParameters
|
|
|
|
// this flag controls if we want to set metadata fields with CommitObject
|
|
// it's possible to set metadata with BeginObject request so we need to
|
|
// be explicit if we would like to set it with CommitObject which will
|
|
// override any existing metadata.
|
|
OverrideEncryptedMetadata bool
|
|
EncryptedMetadata []byte // optional
|
|
EncryptedMetadataNonce []byte // optional
|
|
EncryptedMetadataEncryptedKey []byte // optional
|
|
|
|
DisallowDelete bool
|
|
// OnDelete will be triggered when/if existing object will be overwritten on commit.
|
|
// Wil be only executed after succesfull commit + delete DB operation.
|
|
// Error on this function won't revert back committed object.
|
|
OnDelete func(segments []DeletedSegmentInfo)
|
|
}
|
|
|
|
// Verify verifies reqest fields.
|
|
func (c *CommitObject) Verify() error {
|
|
if err := c.ObjectStream.Verify(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if c.Encryption.CipherSuite != storj.EncUnspecified && c.Encryption.BlockSize <= 0 {
|
|
return ErrInvalidRequest.New("Encryption.BlockSize is negative or zero")
|
|
}
|
|
|
|
if c.OverrideEncryptedMetadata {
|
|
if c.EncryptedMetadata == nil && (c.EncryptedMetadataNonce != nil || c.EncryptedMetadataEncryptedKey != nil) {
|
|
return ErrInvalidRequest.New("EncryptedMetadataNonce and EncryptedMetadataEncryptedKey must be not set if EncryptedMetadata is not set")
|
|
} else if c.EncryptedMetadata != nil && (c.EncryptedMetadataNonce == nil || c.EncryptedMetadataEncryptedKey == nil) {
|
|
return ErrInvalidRequest.New("EncryptedMetadataNonce and EncryptedMetadataEncryptedKey must be set if EncryptedMetadata is set")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CommitObject adds a pending object to the database. If another committed object is under target location
|
|
// it will be deleted.
|
|
func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
if err := opts.Verify(); err != nil {
|
|
return Object{}, err
|
|
}
|
|
|
|
deletedSegments := []DeletedSegmentInfo{}
|
|
|
|
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
|
segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
|
|
if err != nil {
|
|
return Error.New("failed to fetch segments: %w", err)
|
|
}
|
|
|
|
if err = db.validateParts(segments); err != nil {
|
|
return err
|
|
}
|
|
|
|
finalSegments := convertToFinalSegments(segments)
|
|
err = updateSegmentOffsets(ctx, tx, opts.StreamID, finalSegments)
|
|
if err != nil {
|
|
return Error.New("failed to update segments: %w", err)
|
|
}
|
|
|
|
// TODO: would we even need this when we make main index plain_offset?
|
|
fixedSegmentSize := int32(0)
|
|
if len(finalSegments) > 0 {
|
|
fixedSegmentSize = finalSegments[0].PlainSize
|
|
for i, seg := range finalSegments {
|
|
if seg.Position.Part != 0 || seg.Position.Index != uint32(i) {
|
|
fixedSegmentSize = -1
|
|
break
|
|
}
|
|
if i < len(finalSegments)-1 && seg.PlainSize != fixedSegmentSize {
|
|
fixedSegmentSize = -1
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
var totalPlainSize, totalEncryptedSize int64
|
|
for _, seg := range finalSegments {
|
|
totalPlainSize += int64(seg.PlainSize)
|
|
totalEncryptedSize += int64(seg.EncryptedSize)
|
|
}
|
|
|
|
args := []interface{}{
|
|
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
|
|
len(segments),
|
|
totalPlainSize,
|
|
totalEncryptedSize,
|
|
fixedSegmentSize,
|
|
encryptionParameters{&opts.Encryption},
|
|
}
|
|
|
|
metadataColumns := ""
|
|
if opts.OverrideEncryptedMetadata {
|
|
args = append(args,
|
|
opts.EncryptedMetadataNonce,
|
|
opts.EncryptedMetadata,
|
|
opts.EncryptedMetadataEncryptedKey,
|
|
)
|
|
metadataColumns = `,
|
|
encrypted_metadata_nonce = $11,
|
|
encrypted_metadata = $12,
|
|
encrypted_metadata_encrypted_key = $13
|
|
`
|
|
}
|
|
|
|
versionsToDelete := []Version{}
|
|
if err := withRows(tx.QueryContext(ctx, `
|
|
SELECT version
|
|
FROM objects
|
|
WHERE
|
|
project_id = $1 AND
|
|
bucket_name = $2 AND
|
|
object_key = $3 AND
|
|
status = `+committedStatus,
|
|
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey))(func(rows tagsql.Rows) error {
|
|
for rows.Next() {
|
|
var version Version
|
|
if err := rows.Scan(&version); err != nil {
|
|
return Error.New("failed to scan previous object: %w", err)
|
|
}
|
|
|
|
versionsToDelete = append(versionsToDelete, version)
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return Error.New("failed to find previous objects: %w", err)
|
|
}
|
|
|
|
if len(versionsToDelete) > 1 {
|
|
db.log.Warn("object with multiple committed versions were found!",
|
|
zap.Stringer("Project ID", opts.ProjectID), zap.String("Bucket Name", opts.BucketName),
|
|
zap.ByteString("Object Key", []byte(opts.ObjectKey)), zap.Int("deleted", len(versionsToDelete)))
|
|
|
|
mon.Meter("multiple_committed_versions").Mark(1)
|
|
}
|
|
|
|
if len(versionsToDelete) != 0 && opts.DisallowDelete {
|
|
return ErrPermissionDenied.New("no permissions to delete existing object")
|
|
}
|
|
|
|
err = tx.QueryRowContext(ctx, `
|
|
UPDATE objects SET
|
|
status =`+committedStatus+`,
|
|
segment_count = $6,
|
|
|
|
total_plain_size = $7,
|
|
total_encrypted_size = $8,
|
|
fixed_segment_size = $9,
|
|
zombie_deletion_deadline = NULL,
|
|
|
|
-- TODO should we allow to override existing encryption parameters or return error if don't match with opts?
|
|
encryption = CASE
|
|
WHEN objects.encryption = 0 AND $10 <> 0 THEN $10
|
|
WHEN objects.encryption = 0 AND $10 = 0 THEN NULL
|
|
ELSE objects.encryption
|
|
END
|
|
`+metadataColumns+`
|
|
WHERE
|
|
project_id = $1 AND
|
|
bucket_name = $2 AND
|
|
object_key = $3 AND
|
|
version = $4 AND
|
|
stream_id = $5 AND
|
|
status = `+pendingStatus+`
|
|
RETURNING
|
|
created_at, expires_at,
|
|
encrypted_metadata, encrypted_metadata_encrypted_key, encrypted_metadata_nonce,
|
|
encryption
|
|
`, args...).Scan(
|
|
&object.CreatedAt, &object.ExpiresAt,
|
|
&object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey, &object.EncryptedMetadataNonce,
|
|
encryptionParameters{&object.Encryption},
|
|
)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return storj.ErrObjectNotFound.Wrap(Error.New("object with specified version and pending status is missing"))
|
|
} else if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation {
|
|
// TODO maybe we should check message if 'encryption' label is there
|
|
return ErrInvalidRequest.New("Encryption is missing")
|
|
}
|
|
return Error.New("failed to update object: %w", err)
|
|
}
|
|
|
|
for _, version := range versionsToDelete {
|
|
deleteResult, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{
|
|
ObjectLocation: ObjectLocation{
|
|
ProjectID: opts.ProjectID,
|
|
BucketName: opts.BucketName,
|
|
ObjectKey: opts.ObjectKey,
|
|
},
|
|
Version: version,
|
|
}, tx)
|
|
if err != nil {
|
|
return Error.New("failed to delete existing object: %w", err)
|
|
}
|
|
|
|
deletedSegments = append(deletedSegments, deleteResult.Segments...)
|
|
}
|
|
|
|
object.StreamID = opts.StreamID
|
|
object.ProjectID = opts.ProjectID
|
|
object.BucketName = opts.BucketName
|
|
object.ObjectKey = opts.ObjectKey
|
|
object.Version = opts.Version
|
|
object.Status = Committed
|
|
object.SegmentCount = int32(len(segments))
|
|
object.TotalPlainSize = totalPlainSize
|
|
object.TotalEncryptedSize = totalEncryptedSize
|
|
object.FixedSegmentSize = fixedSegmentSize
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return Object{}, err
|
|
}
|
|
|
|
// we can execute this only when whole transaction is committed without any error
|
|
if len(deletedSegments) > 0 && opts.OnDelete != nil {
|
|
opts.OnDelete(deletedSegments)
|
|
}
|
|
|
|
mon.Meter("object_commit").Mark(1)
|
|
mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
|
|
mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)
|
|
|
|
return object, nil
|
|
}
|
|
|
|
func (db *DB) validateParts(segments []segmentInfoForCommit) error {
|
|
partSize := make(map[uint32]memory.Size)
|
|
|
|
var lastPart uint32
|
|
for _, segment := range segments {
|
|
partSize[segment.Position.Part] += memory.Size(segment.PlainSize)
|
|
if lastPart < segment.Position.Part {
|
|
lastPart = segment.Position.Part
|
|
}
|
|
}
|
|
|
|
if len(partSize) > db.config.MaxNumberOfParts {
|
|
return Error.New("exceeded maximum number of parts: %d", db.config.MaxNumberOfParts)
|
|
}
|
|
|
|
for part, size := range partSize {
|
|
// Last part has no minimum size.
|
|
if part == lastPart {
|
|
continue
|
|
}
|
|
|
|
if size < db.config.MinPartSize {
|
|
return Error.New("size of part number %d is below minimum threshold, got: %s, min: %s", part, size, db.config.MinPartSize)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|