satellite/metainfo/metabase: add commit with specifying a list of segments
Change-Id: Ibb9999545691d150f36e20ce70ac00a3802ad7b2
This commit is contained in:
parent
f077564bb7
commit
365410d10b
@ -394,14 +394,8 @@ type CommitObject struct {
|
||||
EncryptedMetadata []byte
|
||||
EncryptedMetadataNonce []byte
|
||||
EncryptedMetadataEncryptedKey []byte
|
||||
|
||||
// TODO: proof
|
||||
Proofs []SegmentProof
|
||||
}
|
||||
|
||||
// SegmentProof ensures that segments cannot be tampered with.
|
||||
type SegmentProof struct{}
|
||||
|
||||
// CommitObject adds a pending object to the database.
|
||||
func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -410,17 +404,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
||||
return Object{}, err
|
||||
}
|
||||
|
||||
// TODO: deduplicate basic checks.
|
||||
switch {
|
||||
case len(opts.Proofs) > 0:
|
||||
return db.commitObjectWithProofs(ctx, opts)
|
||||
default:
|
||||
return db.commitObjectWithoutProofs(ctx, opts)
|
||||
}
|
||||
}
|
||||
|
||||
func (db *DB) commitObjectWithoutProofs(ctx context.Context, opts CommitObject) (object Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
type Segment struct {
|
||||
Position SegmentPosition
|
||||
@ -550,11 +533,6 @@ func (db *DB) commitObjectWithoutProofs(ctx context.Context, opts CommitObject)
|
||||
return object, nil
|
||||
}
|
||||
|
||||
func (db *DB) commitObjectWithProofs(ctx context.Context, opts CommitObject) (object Object, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return Object{}, Error.New("unimplemented")
|
||||
}
|
||||
|
||||
// UpdateObjectMetadata contains arguments necessary for updating an object metadata.
|
||||
type UpdateObjectMetadata struct {
|
||||
ObjectStream
|
||||
|
348
satellite/metainfo/metabase/commit_object.go
Normal file
348
satellite/metainfo/metabase/commit_object.go
Normal file
@ -0,0 +1,348 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package metabase
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/private/dbutil/pgutil"
|
||||
"storj.io/storj/private/dbutil/txutil"
|
||||
"storj.io/storj/private/tagsql"
|
||||
)
|
||||
|
||||
// CommitObjectWithSegments contains arguments necessary for committing an object.
|
||||
type CommitObjectWithSegments struct {
|
||||
ObjectStream
|
||||
|
||||
EncryptedMetadata []byte
|
||||
EncryptedMetadataNonce []byte
|
||||
EncryptedMetadataEncryptedKey []byte
|
||||
|
||||
// TODO: this probably should use segment ranges rather than individual items
|
||||
Segments []SegmentPosition
|
||||
}
|
||||
|
||||
// CommitObjectWithSegments commits pending object to the database.
|
||||
func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWithSegments) (object Object, deletedSegments []DeletedSegmentInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if err := opts.ObjectStream.Verify(); err != nil {
|
||||
return Object{}, nil, err
|
||||
}
|
||||
if err := verifySegmentOrder(opts.Segments); err != nil {
|
||||
return Object{}, nil, err
|
||||
}
|
||||
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
|
||||
// TODO: should we prevent this from executing when the object has been committed
|
||||
// currently this requires quite a lot of database communication, so invalid handling can be expensive.
|
||||
|
||||
segmentsInDatabase, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
finalSegments, segmentsToDelete, err := determineCommitActions(opts.Segments, segmentsInDatabase)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = updateSegmentOffsets(ctx, tx, opts.StreamID, finalSegments)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deletedSegments, err = deleteSegmentsNotInCommit(ctx, tx, opts.StreamID, segmentsToDelete)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: would we even need this when we make main index plain_offset?
|
||||
fixedSegmentSize := int32(0)
|
||||
if len(finalSegments) > 0 {
|
||||
fixedSegmentSize = finalSegments[0].EncryptedSize
|
||||
for _, seg := range finalSegments[:len(finalSegments)-1] {
|
||||
if seg.EncryptedSize != fixedSegmentSize {
|
||||
fixedSegmentSize = -1
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var totalPlainSize, totalEncryptedSize int64
|
||||
for _, seg := range finalSegments {
|
||||
totalPlainSize += int64(seg.PlainSize)
|
||||
totalEncryptedSize += int64(seg.EncryptedSize)
|
||||
}
|
||||
|
||||
err = tx.QueryRow(ctx, `
|
||||
UPDATE objects SET
|
||||
status =`+committedStatus+`,
|
||||
segment_count = $6,
|
||||
|
||||
encrypted_metadata_nonce = $7,
|
||||
encrypted_metadata = $8,
|
||||
encrypted_metadata_encrypted_key = $9,
|
||||
|
||||
total_plain_size = $10,
|
||||
total_encrypted_size = $11,
|
||||
fixed_segment_size = $12,
|
||||
zombie_deletion_deadline = NULL
|
||||
WHERE
|
||||
project_id = $1 AND
|
||||
bucket_name = $2 AND
|
||||
object_key = $3 AND
|
||||
version = $4 AND
|
||||
stream_id = $5 AND
|
||||
status = `+pendingStatus+`
|
||||
RETURNING
|
||||
created_at, expires_at,
|
||||
encryption;
|
||||
`, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID,
|
||||
len(finalSegments),
|
||||
opts.EncryptedMetadataNonce, opts.EncryptedMetadata, opts.EncryptedMetadataEncryptedKey,
|
||||
totalPlainSize,
|
||||
totalEncryptedSize,
|
||||
fixedSegmentSize,
|
||||
).
|
||||
Scan(
|
||||
&object.CreatedAt, &object.ExpiresAt,
|
||||
encryptionParameters{&object.Encryption},
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return storj.ErrObjectNotFound.Wrap(Error.New("object with specified version and pending status is missing"))
|
||||
}
|
||||
return Error.New("failed to update object: %w", err)
|
||||
}
|
||||
|
||||
object.StreamID = opts.StreamID
|
||||
object.ProjectID = opts.ProjectID
|
||||
object.BucketName = opts.BucketName
|
||||
object.ObjectKey = opts.ObjectKey
|
||||
object.Version = opts.Version
|
||||
object.Status = Committed
|
||||
object.SegmentCount = int32(len(finalSegments))
|
||||
object.EncryptedMetadataNonce = opts.EncryptedMetadataNonce
|
||||
object.EncryptedMetadata = opts.EncryptedMetadata
|
||||
object.EncryptedMetadataEncryptedKey = opts.EncryptedMetadataEncryptedKey
|
||||
object.TotalPlainSize = totalPlainSize
|
||||
object.TotalEncryptedSize = totalEncryptedSize
|
||||
object.FixedSegmentSize = fixedSegmentSize
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return Object{}, nil, err
|
||||
}
|
||||
return object, deletedSegments, nil
|
||||
}
|
||||
|
||||
func verifySegmentOrder(positions []SegmentPosition) error {
|
||||
if len(positions) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
last := positions[0]
|
||||
for _, next := range positions[1:] {
|
||||
if !last.Less(next) {
|
||||
return Error.New("segments not in ascending order, got %v before %v", last, next)
|
||||
}
|
||||
last = next
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// segmentInfoForCommit is database state prior to deleting objects.
|
||||
type segmentInfoForCommit struct {
|
||||
Position SegmentPosition
|
||||
EncryptedSize int32
|
||||
PlainOffset int64
|
||||
PlainSize int32
|
||||
}
|
||||
|
||||
// fetchSegmentsForCommit loads information necessary for validating segment existence and offsets.
|
||||
func fetchSegmentsForCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID) (segments []segmentInfoForCommit, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = withRows(tx.Query(ctx, `
|
||||
SELECT position, encrypted_size, plain_offset, plain_size
|
||||
FROM segments
|
||||
WHERE stream_id = $1
|
||||
ORDER BY position
|
||||
`, streamID))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var segment segmentInfoForCommit
|
||||
err := rows.Scan(&segment.Position, &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize)
|
||||
if err != nil {
|
||||
return Error.New("failed to scan segments: %w", err)
|
||||
}
|
||||
segments = append(segments, segment)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.New("failed to fetch segments: %w", err)
|
||||
}
|
||||
return segments, nil
|
||||
}
|
||||
|
||||
type segmentToCommit struct {
|
||||
Position SegmentPosition
|
||||
OldPlainOffset int64
|
||||
PlainSize int32
|
||||
EncryptedSize int32
|
||||
}
|
||||
|
||||
// determineCommitActions detects how should the database be updated and which segments should be deleted.
|
||||
func determineCommitActions(segments []SegmentPosition, segmentsInDatabase []segmentInfoForCommit) (commit []segmentToCommit, toDelete []SegmentPosition, err error) {
|
||||
var invalidSegments errs.Group
|
||||
|
||||
commit = make([]segmentToCommit, 0, len(segments))
|
||||
diffSegmentsWithDatabase(segments, segmentsInDatabase, func(a *SegmentPosition, b *segmentInfoForCommit) {
|
||||
// If we do not have an appropriate segment in the database it means
|
||||
// either the segment was deleted before commit finished or the
|
||||
// segment was not uploaded. Either way we need to fail the commit.
|
||||
if b == nil {
|
||||
invalidSegments.Add(fmt.Errorf("%v: segment not committed", *a))
|
||||
return
|
||||
}
|
||||
|
||||
// If we do not commit a segment that's in a database we should delete them.
|
||||
// This could happen when the user tries to upload a segment,
|
||||
// fails, reuploads and then during commit decides to not commit into the object.
|
||||
if a == nil {
|
||||
toDelete = append(toDelete, b.Position)
|
||||
return
|
||||
}
|
||||
|
||||
commit = append(commit, segmentToCommit{
|
||||
Position: *a,
|
||||
OldPlainOffset: b.PlainOffset,
|
||||
PlainSize: b.PlainSize,
|
||||
EncryptedSize: b.EncryptedSize,
|
||||
})
|
||||
})
|
||||
|
||||
if err := invalidSegments.Err(); err != nil {
|
||||
return nil, nil, Error.New("segments and database does not match: %v", err)
|
||||
}
|
||||
return commit, toDelete, nil
|
||||
}
|
||||
|
||||
// updateSegmentOffsets updates segment offsets that didn't match the database state.
|
||||
func updateSegmentOffsets(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, updates []segmentToCommit) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if len(updates) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// We may be able to skip this, if the database state have been already submitted
|
||||
// and the plain offsets haven't changed.
|
||||
|
||||
// Update plain offsets of the segments.
|
||||
var update struct {
|
||||
Positions []int64
|
||||
PlainOffsets []int64
|
||||
}
|
||||
expectedOffset := int64(0)
|
||||
for _, u := range updates {
|
||||
if u.OldPlainOffset != expectedOffset {
|
||||
update.Positions = append(update.Positions, int64(u.Position.Encode()))
|
||||
update.PlainOffsets = append(update.PlainOffsets, expectedOffset)
|
||||
}
|
||||
expectedOffset += int64(u.PlainSize)
|
||||
}
|
||||
|
||||
if len(update.Positions) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
updateResult, err := tx.Exec(ctx, `
|
||||
UPDATE segments
|
||||
SET plain_offset = P.plain_offset
|
||||
FROM (SELECT unnest($2::INT8[]), unnest($3::INT8[])) as P(position, plain_offset)
|
||||
WHERE segments.stream_id = $1 AND segments.position = P.position
|
||||
`, streamID, pgutil.Int8Array(update.Positions), pgutil.Int8Array(update.PlainOffsets))
|
||||
if err != nil {
|
||||
return Error.New("unable to update segments offsets: %w", err)
|
||||
}
|
||||
affected, err := updateResult.RowsAffected()
|
||||
if err != nil {
|
||||
return Error.New("unable to get number of affected segments: %w", err)
|
||||
}
|
||||
if affected != int64(len(update.Positions)) {
|
||||
return Error.New("not all segments were updated, expected %d got %d", len(update.Positions), affected)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteSegmentsNotInCommit deletes the listed segments inside the tx.
|
||||
func deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, segments []SegmentPosition) (deletedSegments []DeletedSegmentInfo, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
if len(segments) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
positions := []int64{}
|
||||
for _, p := range segments {
|
||||
positions = append(positions, int64(p.Encode()))
|
||||
}
|
||||
|
||||
// This potentially could be done together with the previous database call.
|
||||
err = withRows(tx.Query(ctx, `
|
||||
DELETE FROM segments
|
||||
WHERE stream_id = $1 AND position = ANY($2)
|
||||
RETURNING root_piece_id, remote_pieces
|
||||
`, streamID, pgutil.Int8Array(positions)))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
var deleted DeletedSegmentInfo
|
||||
err := rows.Scan(&deleted.RootPieceID, &deleted.Pieces)
|
||||
if err != nil {
|
||||
return Error.New("failed to scan segments: %w", err)
|
||||
}
|
||||
// we don't need to report info about inline segments
|
||||
if deleted.RootPieceID.IsZero() {
|
||||
continue
|
||||
}
|
||||
deletedSegments = append(deletedSegments, deleted)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, Error.New("unable to delete segments: %w", err)
|
||||
}
|
||||
|
||||
return deletedSegments, nil
|
||||
}
|
||||
|
||||
// diffSegmentsWithDatabase matches up segment positions with their database information.
|
||||
func diffSegmentsWithDatabase(as []SegmentPosition, bs []segmentInfoForCommit, cb func(a *SegmentPosition, b *segmentInfoForCommit)) {
|
||||
for len(as) > 0 && len(bs) > 0 {
|
||||
if as[0] == bs[0].Position {
|
||||
cb(&as[0], &bs[0])
|
||||
as, bs = as[1:], bs[1:]
|
||||
} else if as[0].Less(bs[0].Position) {
|
||||
cb(&as[0], nil)
|
||||
as = as[1:]
|
||||
} else {
|
||||
cb(nil, &bs[0])
|
||||
bs = bs[1:]
|
||||
}
|
||||
}
|
||||
for i := range as {
|
||||
cb(&as[i], nil)
|
||||
}
|
||||
for i := range bs {
|
||||
cb(nil, &bs[i])
|
||||
}
|
||||
}
|
568
satellite/metainfo/metabase/commit_object_test.go
Normal file
568
satellite/metainfo/metabase/commit_object_test.go
Normal file
@ -0,0 +1,568 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package metabase_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
)
|
||||
|
||||
func TestCommitObjectWithSegments(t *testing.T) {
|
||||
All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
|
||||
obj := randObjectStream()
|
||||
|
||||
for _, test := range invalidObjectStreams(obj) {
|
||||
test := test
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: test.ObjectStream,
|
||||
},
|
||||
ErrClass: test.ErrClass,
|
||||
ErrText: test.ErrText,
|
||||
}.Check(ctx, t, db)
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("invalid order", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
pos00 := metabase.SegmentPosition{Part: 0, Index: 0}
|
||||
pos01 := metabase.SegmentPosition{Part: 0, Index: 1}
|
||||
pos10 := metabase.SegmentPosition{Part: 1, Index: 0}
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
Segments: []metabase.SegmentPosition{
|
||||
pos01,
|
||||
pos00,
|
||||
},
|
||||
},
|
||||
ErrClass: &metabase.Error,
|
||||
ErrText: "segments not in ascending order, got {0 1} before {0 0}",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
Segments: []metabase.SegmentPosition{
|
||||
pos10,
|
||||
pos00,
|
||||
},
|
||||
},
|
||||
ErrClass: &metabase.Error,
|
||||
ErrText: "segments not in ascending order, got {1 0} before {0 0}",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
Segments: []metabase.SegmentPosition{
|
||||
pos00,
|
||||
pos00,
|
||||
},
|
||||
},
|
||||
ErrClass: &metabase.Error,
|
||||
ErrText: "segments not in ascending order, got {0 0} before {0 0}",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("version without pending", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: object with specified version and pending status is missing", // TODO: this error message could be better
|
||||
}.Check(ctx, t, db)
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("version", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
Version: 5,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
encryptedMetadata := testrand.Bytes(1024)
|
||||
encryptedMetadataNonce := testrand.Nonce()
|
||||
encryptedMetadataKey := testrand.Bytes(265)
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
// disallow for double commit
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
},
|
||||
ErrClass: &storj.ErrObjectNotFound,
|
||||
ErrText: "metabase: object with specified version and pending status is missing", // TODO: this error message could be better
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: metabase.ObjectStream{
|
||||
ProjectID: obj.ProjectID,
|
||||
BucketName: obj.BucketName,
|
||||
ObjectKey: obj.ObjectKey,
|
||||
Version: 5,
|
||||
StreamID: obj.StreamID,
|
||||
},
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
EncryptedMetadataNonce: encryptedMetadataNonce[:],
|
||||
EncryptedMetadata: encryptedMetadata,
|
||||
EncryptedMetadataEncryptedKey: encryptedMetadataKey,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("segments missing in database", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
pos00 := metabase.SegmentPosition{Part: 0, Index: 0}
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
Segments: []metabase.SegmentPosition{
|
||||
pos00,
|
||||
},
|
||||
},
|
||||
ErrClass: &metabase.Error,
|
||||
ErrText: "segments and database does not match: {0 0}: segment not committed",
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Pending,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("delete segments that are not in proofs", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
pos00 := metabase.SegmentPosition{Part: 0, Index: 0}
|
||||
rootPieceID00 := testrand.PieceID()
|
||||
pieces00 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey00 := testrand.Bytes(32)
|
||||
encryptedKeyNonce00 := testrand.Bytes(32)
|
||||
|
||||
pos01 := metabase.SegmentPosition{Part: 0, Index: 1}
|
||||
rootPieceID01 := testrand.PieceID()
|
||||
pieces01 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey01 := testrand.Bytes(32)
|
||||
encryptedKeyNonce01 := testrand.Bytes(32)
|
||||
|
||||
CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
|
||||
Position: pos00,
|
||||
RootPieceID: rootPieceID00,
|
||||
Pieces: pieces00,
|
||||
|
||||
EncryptedKey: encryptedKey00,
|
||||
EncryptedKeyNonce: encryptedKeyNonce00,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
Redundancy: defaultTestRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
|
||||
Position: pos01,
|
||||
RootPieceID: rootPieceID01,
|
||||
Pieces: pieces01,
|
||||
|
||||
EncryptedKey: encryptedKey01,
|
||||
EncryptedKeyNonce: encryptedKeyNonce01,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
Redundancy: defaultTestRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
Segments: []metabase.SegmentPosition{
|
||||
pos01,
|
||||
},
|
||||
},
|
||||
Deleted: []metabase.DeletedSegmentInfo{
|
||||
{RootPieceID: rootPieceID00, Pieces: pieces00},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
SegmentCount: 1,
|
||||
|
||||
TotalPlainSize: 512,
|
||||
TotalEncryptedSize: 1024,
|
||||
FixedSegmentSize: 1024,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
},
|
||||
Segments: []metabase.RawSegment{
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: pos01,
|
||||
|
||||
RootPieceID: rootPieceID01,
|
||||
EncryptedKey: encryptedKey01,
|
||||
EncryptedKeyNonce: encryptedKeyNonce01,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainOffset: 0,
|
||||
PlainSize: 512,
|
||||
|
||||
Redundancy: defaultTestRedundancy,
|
||||
|
||||
Pieces: pieces01,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("delete inline segments that are not in proofs", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
pos00 := metabase.SegmentPosition{Part: 0, Index: 0}
|
||||
data00 := testrand.Bytes(32)
|
||||
encryptedKey00 := testrand.Bytes(32)
|
||||
encryptedKeyNonce00 := testrand.Bytes(32)
|
||||
|
||||
pos01 := metabase.SegmentPosition{Part: 0, Index: 1}
|
||||
data01 := testrand.Bytes(1024)
|
||||
encryptedKey01 := testrand.Bytes(32)
|
||||
encryptedKeyNonce01 := testrand.Bytes(32)
|
||||
|
||||
CommitInlineSegment{
|
||||
Opts: metabase.CommitInlineSegment{
|
||||
ObjectStream: obj,
|
||||
Position: pos00,
|
||||
|
||||
EncryptedKey: encryptedKey00,
|
||||
EncryptedKeyNonce: encryptedKeyNonce00,
|
||||
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
|
||||
InlineData: data00,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitInlineSegment{
|
||||
Opts: metabase.CommitInlineSegment{
|
||||
ObjectStream: obj,
|
||||
Position: pos01,
|
||||
|
||||
EncryptedKey: encryptedKey01,
|
||||
EncryptedKeyNonce: encryptedKeyNonce01,
|
||||
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
|
||||
InlineData: data01,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
Segments: []metabase.SegmentPosition{
|
||||
pos01,
|
||||
},
|
||||
},
|
||||
Deleted: nil,
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
SegmentCount: 1,
|
||||
|
||||
TotalPlainSize: 512,
|
||||
TotalEncryptedSize: 1024,
|
||||
FixedSegmentSize: 1024,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
},
|
||||
Segments: []metabase.RawSegment{
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: pos01,
|
||||
|
||||
EncryptedKey: encryptedKey01,
|
||||
EncryptedKeyNonce: encryptedKeyNonce01,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainOffset: 0,
|
||||
PlainSize: 512,
|
||||
|
||||
InlineData: data01,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("updated plain offset", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
pos00 := metabase.SegmentPosition{Part: 0, Index: 0}
|
||||
rootPieceID00 := testrand.PieceID()
|
||||
pieces00 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey00 := testrand.Bytes(32)
|
||||
encryptedKeyNonce00 := testrand.Bytes(32)
|
||||
|
||||
pos01 := metabase.SegmentPosition{Part: 0, Index: 1}
|
||||
rootPieceID01 := testrand.PieceID()
|
||||
pieces01 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}}
|
||||
encryptedKey01 := testrand.Bytes(32)
|
||||
encryptedKeyNonce01 := testrand.Bytes(32)
|
||||
|
||||
CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
|
||||
Position: pos00,
|
||||
RootPieceID: rootPieceID00,
|
||||
Pieces: pieces00,
|
||||
|
||||
EncryptedKey: encryptedKey00,
|
||||
EncryptedKeyNonce: encryptedKeyNonce00,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
Redundancy: defaultTestRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitSegment{
|
||||
Opts: metabase.CommitSegment{
|
||||
ObjectStream: obj,
|
||||
|
||||
Position: pos01,
|
||||
RootPieceID: rootPieceID01,
|
||||
Pieces: pieces01,
|
||||
|
||||
EncryptedKey: encryptedKey01,
|
||||
EncryptedKeyNonce: encryptedKeyNonce01,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainSize: 512,
|
||||
PlainOffset: 0,
|
||||
Redundancy: defaultTestRedundancy,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
Segments: []metabase.SegmentPosition{
|
||||
pos00,
|
||||
pos01,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
SegmentCount: 2,
|
||||
|
||||
TotalPlainSize: 1024,
|
||||
TotalEncryptedSize: 2048,
|
||||
FixedSegmentSize: 1024,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
},
|
||||
Segments: []metabase.RawSegment{
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: pos00,
|
||||
|
||||
RootPieceID: rootPieceID00,
|
||||
EncryptedKey: encryptedKey00,
|
||||
EncryptedKeyNonce: encryptedKeyNonce00,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainOffset: 0,
|
||||
PlainSize: 512,
|
||||
|
||||
Redundancy: defaultTestRedundancy,
|
||||
|
||||
Pieces: pieces00,
|
||||
},
|
||||
{
|
||||
StreamID: obj.StreamID,
|
||||
Position: pos01,
|
||||
|
||||
RootPieceID: rootPieceID01,
|
||||
EncryptedKey: encryptedKey01,
|
||||
EncryptedKeyNonce: encryptedKeyNonce01,
|
||||
|
||||
EncryptedSize: 1024,
|
||||
PlainOffset: 512,
|
||||
PlainSize: 512,
|
||||
|
||||
Redundancy: defaultTestRedundancy,
|
||||
|
||||
Pieces: pieces01,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("no segments", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
BeginObjectExactVersion{
|
||||
Opts: metabase.BeginObjectExactVersion{
|
||||
ObjectStream: obj,
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
Version: 1,
|
||||
}.Check(ctx, t, db)
|
||||
now := time.Now()
|
||||
|
||||
CommitObjectWithSegments{
|
||||
Opts: metabase.CommitObjectWithSegments{
|
||||
ObjectStream: obj,
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
|
||||
Verify{
|
||||
Objects: []metabase.RawObject{
|
||||
{
|
||||
ObjectStream: obj,
|
||||
CreatedAt: now,
|
||||
Status: metabase.Committed,
|
||||
|
||||
Encryption: defaultTestEncryption,
|
||||
},
|
||||
},
|
||||
}.Check(ctx, t, db)
|
||||
})
|
||||
})
|
||||
}
|
@ -1428,7 +1428,7 @@ func TestCommitObject(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("no proofs with version without pending", func(t *testing.T) {
|
||||
t.Run("version without pending", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
CommitObject{
|
||||
@ -1447,7 +1447,7 @@ func TestCommitObject(t *testing.T) {
|
||||
Verify{}.Check(ctx, t, db)
|
||||
})
|
||||
|
||||
t.Run("no proofs with version", func(t *testing.T) {
|
||||
t.Run("version", func(t *testing.T) {
|
||||
defer DeleteAll{}.Check(ctx, t, db)
|
||||
|
||||
BeginObjectExactVersion{
|
||||
|
@ -260,6 +260,9 @@ func SegmentPositionFromEncoded(v uint64) SegmentPosition {
|
||||
// Encode encodes a segment position into an uint64, that can be stored in a database.
|
||||
func (pos SegmentPosition) Encode() uint64 { return uint64(pos.Part)<<32 | uint64(pos.Index) }
|
||||
|
||||
// Less returns whether pos should before b.
|
||||
func (pos SegmentPosition) Less(b SegmentPosition) bool { return pos.Encode() < b.Encode() }
|
||||
|
||||
// Version is used to uniquely identify objects with the same key.
|
||||
type Version int64
|
||||
|
||||
|
@ -62,6 +62,23 @@ func (step CommitObject) Check(ctx *testcontext.Context, t *testing.T, db *metab
|
||||
return object
|
||||
}
|
||||
|
||||
type CommitObjectWithSegments struct {
|
||||
Opts metabase.CommitObjectWithSegments
|
||||
Deleted []metabase.DeletedSegmentInfo
|
||||
ErrClass *errs.Class
|
||||
ErrText string
|
||||
}
|
||||
|
||||
func (step CommitObjectWithSegments) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) metabase.Object {
|
||||
object, deleted, err := db.CommitObjectWithSegments(ctx, step.Opts)
|
||||
checkError(t, err, step.ErrClass, step.ErrText)
|
||||
if err == nil {
|
||||
require.Equal(t, step.Opts.ObjectStream, object.ObjectStream)
|
||||
}
|
||||
require.Equal(t, step.Deleted, deleted)
|
||||
return object
|
||||
}
|
||||
|
||||
type BeginSegment struct {
|
||||
Opts metabase.BeginSegment
|
||||
ErrClass *errs.Class
|
||||
|
Loading…
Reference in New Issue
Block a user