From 809eb14ac293bacad03da9f15134ae4050c1e964 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Wed, 28 Oct 2020 16:28:06 +0100 Subject: [PATCH] satellite/metainfo/metabase: move metainfo PoC into storj repo Change-Id: I39356d8bc7305b4a8ea0c1fb5603010ad72a68b9 --- Jenkinsfile.public | 4 + go.mod | 1 + go.sum | 2 + satellite/metainfo/metabase/commit.go | 519 ++++++++ satellite/metainfo/metabase/commit_test.go | 1312 ++++++++++++++++++++ satellite/metainfo/metabase/common.go | 84 ++ satellite/metainfo/metabase/db.go | 122 ++ satellite/metainfo/metabase/db_test.go | 83 ++ satellite/metainfo/metabase/delete.go | 320 +++++ satellite/metainfo/metabase/delete_test.go | 691 +++++++++++ satellite/metainfo/metabase/encoding.go | 198 +++ satellite/metainfo/metabase/get.go | 197 +++ satellite/metainfo/metabase/get_test.go | 389 ++++++ satellite/metainfo/metabase/raw.go | 200 +++ satellite/metainfo/metabase/test_test.go | 207 +++ 15 files changed, 4329 insertions(+) create mode 100644 satellite/metainfo/metabase/commit.go create mode 100644 satellite/metainfo/metabase/commit_test.go create mode 100644 satellite/metainfo/metabase/db.go create mode 100644 satellite/metainfo/metabase/db_test.go create mode 100644 satellite/metainfo/metabase/delete.go create mode 100644 satellite/metainfo/metabase/delete_test.go create mode 100644 satellite/metainfo/metabase/encoding.go create mode 100644 satellite/metainfo/metabase/get.go create mode 100644 satellite/metainfo/metabase/get_test.go create mode 100644 satellite/metainfo/metabase/raw.go create mode 100644 satellite/metainfo/metabase/test_test.go diff --git a/Jenkinsfile.public b/Jenkinsfile.public index e505293d6..90bb9b4b4 100644 --- a/Jenkinsfile.public +++ b/Jenkinsfile.public @@ -64,6 +64,7 @@ pipeline { 'cockroach://root@localhost:26259/testcockroach?sslmode=disable' STORJ_TEST_COCKROACH_ALT = 'cockroach://root@localhost:26260/testcockroach?sslmode=disable' STORJ_TEST_POSTGRES = 'postgres://postgres@localhost/teststorj?sslmode=disable' + STORJ_TEST_DATABASES = 'crdb|pgx|postgres://root@localhost:26259/testmetabase?sslmode=disable;pg|pgx|postgres://postgres@localhost/testmetabase?sslmode=disable' COVERFLAGS = "${ env.BRANCH_NAME != 'master' ? '' : '-coverprofile=.build/coverprofile -coverpkg=storj.io/storj/private/...,storj.io/storj/pkg/...,storj.io/storj/satellite/...,storj.io/storj/storage/...,storj.io/storj/storagenode/...,storj.io/storj/versioncontrol/...'}" } steps { @@ -73,7 +74,10 @@ pipeline { sh 'cockroach sql --insecure --host=localhost:26259 -e \'create database testcockroach;\'' sh 'cockroach sql --insecure --host=localhost:26260 -e \'create database testcockroach;\'' + sh 'cockroach sql --insecure --host=localhost:26259 -e \'create database testmetabase;\'' + sh 'psql -U postgres -c \'create database teststorj;\'' + sh 'psql -U postgres -c \'create database testmetabase;\'' sh 'use-ports -from 1024 -to 10000 &' sh 'go test -parallel 4 -p 6 -vet=off $COVERFLAGS -timeout 20m -json -race ./... 2>&1 | tee .build/tests.json | xunit -out .build/tests.xml' sh 'check-clean-directory' diff --git a/go.mod b/go.mod index 8679d3d16..f062f4859 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/gorilla/schema v1.2.0 github.com/graphql-go/graphql v0.7.9 github.com/jackc/pgconn v1.7.0 + github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451 github.com/jackc/pgtype v1.5.0 github.com/jackc/pgx/v4 v4.9.0 github.com/jtolds/monkit-hw/v2 v2.0.0-20191108235325-141a0da276b3 diff --git a/go.sum b/go.sum index c8544952c..a9dbd9a57 100644 --- a/go.sum +++ b/go.sum @@ -251,6 +251,8 @@ github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= github.com/jackc/pgconn v1.7.0 h1:pwjzcYyfmz/HQOQlENvG1OcDqauTGaqlVahq934F0/U= github.com/jackc/pgconn v1.7.0/go.mod h1:sF/lPpNEMEOp+IYhyQGdAvrG20gWf6A1tKlr0v7JMeA= +github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451 h1:WAvSpGf7MsFuzAtK4Vk7R4EVe+liW4x83r4oWu0WHKw= +github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA= diff --git a/satellite/metainfo/metabase/commit.go b/satellite/metainfo/metabase/commit.go new file mode 100644 index 000000000..a43fda106 --- /dev/null +++ b/satellite/metainfo/metabase/commit.go @@ -0,0 +1,519 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "context" + "database/sql" + "errors" + "time" + + pgxerrcode "github.com/jackc/pgerrcode" + "github.com/zeebo/errs" + + "storj.io/common/storj" + "storj.io/storj/private/dbutil/pgutil/pgerrcode" +) + +var ( + // ErrInvalidRequest is used to indicate invalid requests. + ErrInvalidRequest = errs.Class("metabase: invalid request") + // ErrConflict is used to indicate conflict with the request. + ErrConflict = errs.Class("metabase: conflict") +) + +// BeginObjectNextVersion contains arguments necessary for starting an object upload. +type BeginObjectNextVersion struct { + ObjectStream + + ExpiresAt *time.Time + ZombieDeletionDeadline *time.Time + + // TODO: should we include encrypted metadata + // TODO: should we include encryption +} + +// BeginObjectNextVersion adds a pending object to the database, with automatically assigned version. +func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion) (committed Version, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.ObjectStream.Verify(); err != nil { + return -1, err + } + + if opts.Version != NextVersion { + return -1, ErrInvalidRequest.New("Version should be metabase.NextVersion") + } + + row := db.db.QueryRow(ctx, ` + INSERT INTO objects ( + project_id, bucket_name, object_key, version, stream_id, + expires_at, zombie_deletion_deadline + ) VALUES ( + $1, $2, $3, + coalesce(( + SELECT version + 1 + FROM objects + WHERE project_id = $1 AND bucket_name = $2 AND object_key = $3 + ORDER BY version DESC + LIMIT 1 + ), 1), + $4, $5, $6) + RETURNING version + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.StreamID, + opts.ExpiresAt, opts.ZombieDeletionDeadline) + + var v int64 + if err := row.Scan(&v); err != nil { + return -1, Error.New("unable to insert object: %w", err) + } + + return Version(v), nil +} + +// BeginObjectExactVersion contains arguments necessary for starting an object upload. +type BeginObjectExactVersion struct { + ObjectStream + + ExpiresAt *time.Time + ZombieDeletionDeadline *time.Time + + // TODO: should we include encrypted metadata + // TODO: should we include encryption +} + +// BeginObjectExactVersion adds a pending object to the database, with specific version. +func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion) (committed Version, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.ObjectStream.Verify(); err != nil { + return -1, err + } + + if opts.Version == NextVersion { + return -1, ErrInvalidRequest.New("Version should not be metabase.NextVersion") + } + + _, err = db.db.ExecContext(ctx, ` + INSERT INTO objects ( + project_id, bucket_name, object_key, version, stream_id, + expires_at, zombie_deletion_deadline + ) values ( + $1, $2, $3, $4, $5, + $6, $7 + ) + `, + opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID, + opts.ExpiresAt, opts.ZombieDeletionDeadline) + if err != nil { + if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation { + return -1, ErrConflict.New("object already exists") + } + return -1, Error.New("unable to insert object: %w", err) + } + + return opts.Version, nil +} + +// BeginSegment contains options to verify, whether a new segment upload can be started. +type BeginSegment struct { + ObjectStream + + Position SegmentPosition + RootPieceID storj.PieceID + Pieces Pieces +} + +// BeginSegment verifies, whether a new segment upload can be started. +func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.ObjectStream.Verify(); err != nil { + return err + } + + switch { + case opts.RootPieceID.IsZero(): + return ErrInvalidRequest.New("RootPieceID missing") + case len(opts.Pieces) == 0: + return ErrInvalidRequest.New("Pieces missing") + } + + // TODO: verify opts.Pieces content. + + // NOTE: this isn't strictly necessary, since we can also fail this in CommitSegment. + // however, we should prevent creating segements for non-partial objects. + + // NOTE: these queries could be combined into one. + + tx, err := db.db.BeginTx(ctx, nil) + if err != nil { + return Error.New("failed BeginTx: %w", err) + } + committed := false + defer func() { + if !committed { + err = errs.Combine(err, Error.Wrap(tx.Rollback())) + } + }() + + // Verify that object exists and is partial. + var value int + err = tx.QueryRow(ctx, ` + SELECT 1 + FROM objects WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + stream_id = $5 AND + status = 0 + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID).Scan(&value) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Error.New("pending object missing") + } + return Error.New("unable to query object status: %w", err) + } + + // Verify that the segment does not exist. + err = tx.QueryRow(ctx, ` + SELECT 1 + FROM segments WHERE + stream_id = $1 AND + position = $2 + `, opts.StreamID, opts.Position).Scan(&value) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return Error.New("unable to query segments: %w", err) + } + err = nil // ignore any other err result (explicitly) + + err, committed = tx.Commit(), true + if err != nil { + return Error.New("unable to commit tx: %w", err) + } + + return nil +} + +// CommitSegment contains all necessary information about the segment. +type CommitSegment struct { + ObjectStream + + Position SegmentPosition + RootPieceID storj.PieceID + + EncryptedKeyNonce []byte + EncryptedKey []byte + + PlainOffset int64 // offset in the original data stream + PlainSize int32 // size before encryption + EncryptedSize int32 // segment size after encryption + + Redundancy storj.RedundancyScheme + + Pieces Pieces +} + +// CommitSegment commits segment to the database. +func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.ObjectStream.Verify(); err != nil { + return err + } + + switch { + case opts.RootPieceID.IsZero(): + return ErrInvalidRequest.New("RootPieceID missing") + case len(opts.Pieces) == 0: + return ErrInvalidRequest.New("Pieces missing") + case len(opts.EncryptedKey) == 0: + return ErrInvalidRequest.New("EncryptedKey missing") + case len(opts.EncryptedKeyNonce) == 0: + return ErrInvalidRequest.New("EncryptedKeyNonce missing") + case opts.EncryptedSize <= 0: + return ErrInvalidRequest.New("EncryptedSize negative or zero") + case opts.PlainSize <= 0: + return ErrInvalidRequest.New("PlainSize negative or zero") + case opts.PlainOffset < 0: + return ErrInvalidRequest.New("PlainOffset negative") + case opts.Redundancy.IsZero(): + return ErrInvalidRequest.New("Redundancy zero") + } + + // TODO: verify opts.Pieces content is non-zero + // TODO: verify opts.Pieces is compatible with opts.Redundancy + + tx, err := db.db.BeginTx(ctx, nil) + if err != nil { + return Error.New("failed BeginTx: %w", err) + } + committed := false + defer func() { + if !committed { + err = errs.Combine(err, Error.Wrap(tx.Rollback())) + } + }() + + // Verify that object exists and is partial. + var value int + err = tx.QueryRowContext(ctx, ` + SELECT 1 + FROM objects WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + stream_id = $5 AND + status = 0 + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID).Scan(&value) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Error.New("pending object missing") + } + return Error.New("unable to query object status: %w", err) + } + + // Insert into segments. + _, err = tx.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, + redundancy, + remote_pieces + ) VALUES ( + $1, $2, + $3, $4, $5, + $6, $7, $8, + $9, + $10 + )`, + opts.StreamID, opts.Position, + opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, + opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, + redundancyScheme{&opts.Redundancy}, + opts.Pieces, + ) + if err != nil { + if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation { + return ErrConflict.New("segment already exists") + } + return Error.New("unable to insert segment: %w", err) + } + + err, committed = tx.Commit(), true + if err != nil { + return Error.New("unable to commit tx: %w", err) + } + + return nil +} + +// CommitInlineSegment contains all necessary information about the segment. +type CommitInlineSegment struct { + ObjectStream + + Position SegmentPosition + RootPieceID storj.PieceID // TODO: do we need this? + + EncryptedKeyNonce []byte + EncryptedKey []byte + + PlainOffset int64 // offset in the original data stream + PlainSize int32 // size before encryption + EncryptedSize int32 // segment size after encryption + + Redundancy storj.RedundancyScheme // TODO: do we need this? + + InlineData []byte +} + +// CommitInlineSegment commits inline segment to the database. +func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment) (err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.ObjectStream.Verify(); err != nil { + return err + } + + // TODO: do we have a lower limit for inline data? + + switch { + case opts.RootPieceID.IsZero(): + return ErrInvalidRequest.New("RootPieceID missing") + case len(opts.InlineData) == 0: + return ErrInvalidRequest.New("InlineData missing") + case len(opts.EncryptedKey) == 0: + return ErrInvalidRequest.New("EncryptedKey missing") + case len(opts.EncryptedKeyNonce) == 0: + return ErrInvalidRequest.New("EncryptedKeyNonce missing") + case opts.EncryptedSize <= 0: + return ErrInvalidRequest.New("EncryptedSize negative or zero") + case opts.PlainSize <= 0: + return ErrInvalidRequest.New("PlainSize negative or zero") + case opts.PlainOffset < 0: + return ErrInvalidRequest.New("PlainOffset negative") + case opts.Redundancy.IsZero(): + return ErrInvalidRequest.New("Redundancy zero") + } + + tx, err := db.db.BeginTx(ctx, nil) + if err != nil { + return Error.New("failed BeginTx: %w", err) + } + committed := false + defer func() { + if !committed { + err = errs.Combine(err, Error.Wrap(tx.Rollback())) + } + }() + + // Verify that object exists and is partial. + var value int + err = tx.QueryRowContext(ctx, ` + SELECT 1 + FROM objects WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + stream_id = $5 AND + status = 0 + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID).Scan(&value) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Error.New("pending object missing") + } + return Error.New("unable to query object status: %w", err) + } + + // Insert into segments. + _, err = tx.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, + redundancy, + inline_data + ) VALUES ( + $1, $2, + $3, $4, $5, + $6, $7, $8, + $9, + $10 + )`, + opts.StreamID, opts.Position, + opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, + opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, + redundancyScheme{&opts.Redundancy}, + opts.InlineData, + ) + if err != nil { + if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation { + return ErrConflict.New("segment already exists") + } + return Error.New("unable to insert segment: %w", err) + } + + err, committed = tx.Commit(), true + if err != nil { + return Error.New("unable to commit tx: %w", err) + } + + return nil +} + +// CommitObject contains arguments necessary for committing an object. +type CommitObject struct { + ObjectStream + + Encryption storj.EncryptionParameters + + // TODO: proof + Proofs []SegmentProof +} + +// SegmentProof ensures that segments cannot be tampered with. +type SegmentProof struct{} + +// CommitObject adds a pending object to the database. +func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.ObjectStream.Verify(); err != nil { + return err + } + + if opts.Encryption.IsZero() { + return ErrInvalidRequest.New("encryption is zero") + } + + // TODO: deduplicate basic checks. + switch { + case len(opts.Proofs) > 0: + return db.commitObjectWithProofs(ctx, opts) + default: + return db.commitObjectWithoutProofs(ctx, opts) + } +} + +func (db *DB) commitObjectWithoutProofs(ctx context.Context, opts CommitObject) (err error) { + defer mon.Task()(&ctx)(&err) + + tx, err := db.db.BeginTx(ctx, nil) + if err != nil { + return Error.New("failed BeginTx: %w", err) + } + committed := false + defer func() { + if !committed { + err = errs.Combine(err, Error.Wrap(tx.Rollback())) + } + }() + + // TODO: fetch info from segments + + result, err := tx.ExecContext(ctx, ` + UPDATE objects SET + status = 1, -- committed + encryption = $6, + segment_count = 0, -- TODO + total_encrypted_size = 0, -- TODO + fixed_segment_size = 0, -- TODO + zombie_deletion_deadline = NULL + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + stream_id = $5 AND + status = 0; + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version, opts.StreamID, + encryptionParameters{&opts.Encryption}, + ) + if err != nil { + return Error.New("failed to update object: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return Error.New("failed to get rows affected: %w", err) + } + if rowsAffected == 0 { + return Error.New("object with specified version and pending status is missing") + } + + // TODO: delete segments + + err = tx.Commit() + committed = true + + return Error.Wrap(err) +} + +func (db *DB) commitObjectWithProofs(ctx context.Context, opts CommitObject) (err error) { + defer mon.Task()(&ctx)(&err) + return Error.New("unimplemented") +} diff --git a/satellite/metainfo/metabase/commit_test.go b/satellite/metainfo/metabase/commit_test.go new file mode 100644 index 000000000..fc8016a8f --- /dev/null +++ b/satellite/metainfo/metabase/commit_test.go @@ -0,0 +1,1312 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "testing" + "time" + + "github.com/zeebo/errs" + + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/common/uuid" + "storj.io/storj/satellite/metainfo/metabase" +) + +var defaultTestRedundancy = storj.RedundancyScheme{ + Algorithm: storj.ReedSolomon, + ShareSize: 2048, + RequiredShares: 4, + RepairShares: 5, + OptimalShares: 6, + TotalShares: 7, +} + +var defaultTestEncryption = storj.EncryptionParameters{ + CipherSuite: storj.EncAESGCM, + BlockSize: 29 * 256, +} + +func randObjectStream() metabase.ObjectStream { + return metabase.ObjectStream{ + ProjectID: testrand.UUID(), + BucketName: testrand.BucketName(), + ObjectKey: metabase.ObjectKey(testrand.Bytes(16)), + Version: 1, + StreamID: testrand.UUID(), + } +} + +type invalidObjectStream struct { + Name string + ObjectStream metabase.ObjectStream + ErrClass *errs.Class + ErrText string +} + +func invalidObjectStreams(base metabase.ObjectStream) []invalidObjectStream { + var tests []invalidObjectStream + { + stream := base + stream.ProjectID = uuid.UUID{} + tests = append(tests, invalidObjectStream{ + Name: "ProjectID missing", + ObjectStream: stream, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "ProjectID missing", + }) + } + { + stream := base + stream.BucketName = "" + tests = append(tests, invalidObjectStream{ + Name: "BucketName missing", + ObjectStream: stream, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "BucketName missing", + }) + } + { + stream := base + stream.ObjectKey = "" + tests = append(tests, invalidObjectStream{ + Name: "ObjectKey missing", + ObjectStream: stream, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "ObjectKey missing", + }) + } + { + stream := base + stream.Version = -1 + tests = append(tests, invalidObjectStream{ + Name: "Version invalid", + ObjectStream: stream, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Version invalid: -1", + }) + } + { + stream := base + stream.StreamID = uuid.UUID{} + tests = append(tests, invalidObjectStream{ + Name: "StreamID missing", + ObjectStream: stream, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "StreamID missing", + }) + } + + return tests +} + +func TestBeginObjectNextVersion(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + for _, test := range invalidObjectStreams(obj) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: test.ObjectStream, + }, + Version: -1, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("disallow exact version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + }, + Version: -1, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Version should be metabase.NextVersion", + }.Check(ctx, t, db) + }) + + t.Run("NextVersion", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: metabase.NextVersion, + StreamID: obj.StreamID, + }, + }, + Version: 1, + }.Check(ctx, t, db) + + now2 := time.Now() + BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: metabase.NextVersion, + StreamID: obj.StreamID, + }, + }, + Version: 2, + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 1, + StreamID: obj.StreamID, + }, + CreatedAt: now1, + Status: metabase.Pending, + }, + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 2, + StreamID: obj.StreamID, + }, + CreatedAt: now2, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + // TODO: expires at date + // TODO: zombie deletion deadline + + // TODO: older committed version exists + // TODO: newer committed version exists, we could reject the request + }) +} + +func TestBeginObjectExactVersion(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + for _, test := range invalidObjectStreams(obj) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: test.ObjectStream, + }, + Version: -1, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("disallow NextVersion", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: metabase.NextVersion, + StreamID: obj.StreamID, + }, + }, + Version: -1, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Version should not be metabase.NextVersion", + }.Check(ctx, t, db) + }) + + t.Run("Specific version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + }, + Version: 5, + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + CreatedAt: now1, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Duplicate pending version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + }, + Version: 5, + }.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + }, + Version: -1, + ErrClass: &metabase.ErrConflict, + ErrText: "object already exists", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + CreatedAt: now1, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Duplicate committed version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + }, + Version: 5, + }.Check(ctx, t, db) + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + }, + Version: -1, + ErrClass: &metabase.ErrConflict, + ErrText: "object already exists", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + CreatedAt: now1, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }, + }.Check(ctx, t, db) + }) + // TODO: expires at date + // TODO: zombie deletion deadline + + // TODO: older committed version exists + // TODO: newer committed version exists, we could reject the request + }) +} + +func TestBeginSegment(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + for _, test := range invalidObjectStreams(obj) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: test.ObjectStream, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("RootPieceID missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "RootPieceID missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Pieces missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + RootPieceID: storj.PieceID{1}, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Pieces missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("pending object missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + RootPieceID: storj.PieceID{1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + ErrClass: &metabase.Error, + ErrText: "pending object missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("pending object missing when object committed", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + now := time.Now() + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + RootPieceID: storj.PieceID{1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + ErrClass: &metabase.Error, + ErrText: "pending object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("begin segment successfully", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + now := time.Now() + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + RootPieceID: storj.PieceID{1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("multiple begin segment successfully", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + now := time.Now() + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + for i := 0; i < 5; i++ { + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + RootPieceID: storj.PieceID{1}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + }.Check(ctx, t, db) + } + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + }) +} + +func TestCommitSegment(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + for _, test := range invalidObjectStreams(obj) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: test.ObjectStream, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("invalid request", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "RootPieceID missing", + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Pieces missing", + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + Pieces: metabase.Pieces{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedKey missing", + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + Pieces: metabase.Pieces{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + + EncryptedKey: testrand.Bytes(32), + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedKeyNonce missing", + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + Pieces: metabase.Pieces{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: -1, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedSize negative or zero", + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + Pieces: metabase.Pieces{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: 1024, + PlainSize: -1, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "PlainSize negative or zero", + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + Pieces: metabase.Pieces{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: -1, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "PlainOffset negative", + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + Pieces: metabase.Pieces{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Redundancy zero", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("duplicate", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + }, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + ErrClass: &metabase.ErrConflict, + ErrText: "segment already exists", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now1, + Status: metabase.Pending, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: defaultTestRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("commit segment of missing object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + ErrClass: &metabase.Error, + ErrText: "pending object missing", + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("commit segment of committed object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + now := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + ErrClass: &metabase.Error, + ErrText: "pending object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("commit segment of pending object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + now := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: defaultTestRedundancy, + + Pieces: pieces, + }, + }}.Check(ctx, t, db) + }) + }) +} + +func TestCommitInlineSegment(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + for _, test := range invalidObjectStreams(obj) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: test.ObjectStream, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("invalid request", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "RootPieceID missing", + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "InlineData missing", + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + InlineData: []byte{1, 2, 3}, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedKey missing", + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + InlineData: []byte{1, 2, 3}, + + EncryptedKey: testrand.Bytes(32), + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedKeyNonce missing", + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + InlineData: []byte{1, 2, 3}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: -1, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "EncryptedSize negative or zero", + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + InlineData: []byte{1, 2, 3}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: 1024, + PlainSize: -1, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "PlainSize negative or zero", + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + InlineData: []byte{1, 2, 3}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: -1, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "PlainOffset negative", + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + RootPieceID: testrand.PieceID(), + + InlineData: []byte{1, 2, 3}, + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Redundancy zero", + }.Check(ctx, t, db) + }) + + t.Run("duplicate", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + ErrClass: &metabase.ErrConflict, + ErrText: "segment already exists", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now1, + Status: metabase.Pending, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: defaultTestRedundancy, + + InlineData: []byte{1, 2, 3}, + }, + }, + }.Check(ctx, t, db) + }) + // TODO: + }) +} + +func TestCommitObject(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + for _, test := range invalidObjectStreams(obj) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: test.ObjectStream, + Encryption: defaultTestEncryption, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("no proofs with version without pending", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + Encryption: defaultTestEncryption, + }, + ErrClass: &metabase.Error, + ErrText: "object with specified version and pending status is missing", // TODO: this error message could be better + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("no proofs with version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + }, + Version: 5, + }.Check(ctx, t, db) + now := time.Now() + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + // disallow for double commit + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + Encryption: defaultTestEncryption, + }, + ErrClass: &metabase.Error, + ErrText: "object with specified version and pending status is missing", // TODO: this error message could be better + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 5, + StreamID: obj.StreamID, + }, + CreatedAt: now, + Status: metabase.Committed, + Encryption: defaultTestEncryption, + }, + }, + }.Check(ctx, t, db) + }) + }) +} diff --git a/satellite/metainfo/metabase/common.go b/satellite/metainfo/metabase/common.go index 6a1fb567b..8ae17f4ab 100644 --- a/satellite/metainfo/metabase/common.go +++ b/satellite/metainfo/metabase/common.go @@ -107,6 +107,19 @@ func (obj ObjectLocation) Segment(index int64) (SegmentLocation, error) { }, nil } +// Verify object location fields. +func (obj ObjectLocation) Verify() error { + switch { + case obj.ProjectID.IsZero(): + return ErrInvalidRequest.New("ProjectID missing") + case obj.BucketName == "": + return ErrInvalidRequest.New("BucketName missing") + case len(obj.ObjectKey) == 0: + return ErrInvalidRequest.New("ObjectKey missing") + } + return nil +} + // SegmentKey is an encoded metainfo key. This is used as the key in pointerdb key-value store. type SegmentKey []byte @@ -187,6 +200,77 @@ func (seg SegmentLocation) Encode() SegmentKey { )) } +// ObjectStream uniquely defines an object and stream. +// +// TODO: figure out whether ther's a better name. +type ObjectStream struct { + ProjectID uuid.UUID + BucketName string + ObjectKey ObjectKey + Version Version + StreamID uuid.UUID +} + +// Verify object stream fields. +func (obj *ObjectStream) Verify() error { + switch { + case obj.ProjectID.IsZero(): + return ErrInvalidRequest.New("ProjectID missing") + case obj.BucketName == "": + return ErrInvalidRequest.New("BucketName missing") + case len(obj.ObjectKey) == 0: + return ErrInvalidRequest.New("ObjectKey missing") + case obj.Version < 0: + return ErrInvalidRequest.New("Version invalid: %v", obj.Version) + case obj.StreamID.IsZero(): + return ErrInvalidRequest.New("StreamID missing") + } + return nil +} + +// Location returns object location. +func (obj *ObjectStream) Location() ObjectLocation { + return ObjectLocation{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + } +} + +// SegmentPosition is segment part and index combined. +type SegmentPosition struct { + Part uint32 + Index uint32 +} + +// SegmentPositionFromEncoded decodes an uint64 into a SegmentPosition. +func SegmentPositionFromEncoded(v uint64) SegmentPosition { + return SegmentPosition{ + Part: uint32(v >> 32), + Index: uint32(v), + } +} + +// Encode encodes a segment position into an uint64, that can be stored in a database. +func (pos SegmentPosition) Encode() uint64 { return uint64(pos.Part)<<32 | uint64(pos.Index) } + +// Version is used to uniquely identify objects with the same key. +type Version int64 + +// NextVersion means that the version should be chosen automatically. +const NextVersion = Version(0) + +// ObjectStatus defines the statuses that the object might be in. +type ObjectStatus byte + +const ( + // Pending means that the object is being uploaded or that the client failed during upload. + // The failed upload may be continued in the future. + Pending = ObjectStatus(0) + // Committed means that the object is finished and should be visible for general listing. + Committed = ObjectStatus(1) +) + // Pieces defines information for pieces. type Pieces []Piece diff --git a/satellite/metainfo/metabase/db.go b/satellite/metainfo/metabase/db.go new file mode 100644 index 000000000..4c5651ef5 --- /dev/null +++ b/satellite/metainfo/metabase/db.go @@ -0,0 +1,122 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +// Package metabase implements storing objects and segements. +package metabase + +import ( + "context" + + _ "github.com/jackc/pgx/v4" // registers pgx as a tagsql driver. + _ "github.com/jackc/pgx/v4/stdlib" // registers pgx as a tagsql driver. + "github.com/spacemonkeygo/monkit/v3" + + "storj.io/storj/private/tagsql" +) + +var ( + mon = monkit.Package() +) + +// DB implements a database for storing objects and segments. +type DB struct { + db tagsql.DB +} + +// Open opens a connection to metabase. +func Open(driverName, connstr string) (*DB, error) { + db, err := tagsql.Open(driverName, connstr) + if err != nil { + return nil, Error.Wrap(err) + } + + return &DB{db: db}, nil +} + +// Ping checks whether connection has been established. +func (db *DB) Ping(ctx context.Context) error { + return Error.Wrap(db.db.PingContext(ctx)) +} + +// Close closes the connection to database. +func (db *DB) Close() error { + return Error.Wrap(db.db.Close()) +} + +// DestroyTables deletes all tables. +// +// TODO: remove this, only for bootstrapping. +func (db *DB) DestroyTables(ctx context.Context) error { + _, err := db.db.ExecContext(ctx, ` + DROP TABLE IF EXISTS objects; + DROP TABLE IF EXISTS segments; + `) + return Error.Wrap(err) +} + +// MigrateToLatest migrates database to the latest version. +// +// TODO: use migrate package. +func (db *DB) MigrateToLatest(ctx context.Context) error { + var err error + + // TODO: verify whether this is all we need. + _, err = db.db.ExecContext(ctx, ` + CREATE TABLE objects ( + project_id BYTEA NOT NULL, + bucket_name BYTEA NOT NULL, -- we're using bucket_name here to avoid a lookup into buckets table + object_key BYTEA NOT NULL, -- using 'object_key' instead of 'key' to avoid reserved word + version INT4 NOT NULL, + stream_id BYTEA NOT NULL, + + created_at TIMESTAMPTZ NOT NULL default now(), + expires_at TIMESTAMPTZ, + + status INT2 NOT NULL default 0, + segment_count INT4 NOT NULL default 0, + + encrypted_metadata_nonce BYTEA default NULL, + encrypted_metadata BYTEA default NULL, + + total_encrypted_size INT4 NOT NULL default 0, + fixed_segment_size INT4 NOT NULL default 0, + + encryption INT8 NOT NULL default 0, + + zombie_deletion_deadline TIMESTAMPTZ default now() + '1 day', -- should this be in a separate table? + + PRIMARY KEY (project_id, bucket_name, object_key, version) + ); + `) + if err != nil { + return Error.New("failed to create objects table: %w", err) + } + + // TODO: verify whether this is all we need. + _, err = db.db.ExecContext(ctx, ` + CREATE TABLE segments ( + stream_id BYTEA NOT NULL, + position INT8 NOT NULL, + + root_piece_id BYTEA NOT NULL, + encrypted_key_nonce BYTEA NOT NULL, + encrypted_key BYTEA NOT NULL, + + encrypted_size INT4 NOT NULL, -- maybe this can be skipped? + plain_offset INT8 NOT NULL, -- this is needed to find segment based on plain byte offset + plain_size INT4 NOT NULL, + + redundancy INT8 NOT NULL default 0, + + inline_data BYTEA DEFAULT NULL, + remote_pieces BYTEA[], + + PRIMARY KEY (stream_id, position) -- TODO: should this use plain_offset for the primary index? + ) + `) + if err != nil { + return Error.New("failed to create segments table: %w", err) + } + + return nil +} diff --git a/satellite/metainfo/metabase/db_test.go b/satellite/metainfo/metabase/db_test.go new file mode 100644 index 000000000..dfefcdc11 --- /dev/null +++ b/satellite/metainfo/metabase/db_test.go @@ -0,0 +1,83 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "flag" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/testcontext" + "storj.io/storj/satellite/metainfo/metabase" +) + +var databases = flag.String("databases", os.Getenv("STORJ_TEST_DATABASES"), "databases to use for testing") + +func All(t *testing.T, fn func(ctx *testcontext.Context, t *testing.T, db *metabase.DB)) { + type dbinfo struct { + name string + driver string + connstr string + } + + infos := []dbinfo{ + {"pg", "pgx", "postgres://storj:storj-pass@localhost/metabase?sslmode=disable"}, + {"crdb", "pgx", "postgres://root@localhost:26257/metabase?sslmode=disable"}, + } + if *databases != "" { + infos = nil + for _, db := range strings.Split(*databases, ";") { + toks := strings.Split(strings.TrimSpace(db), "|") + infos = append(infos, dbinfo{toks[0], toks[1], toks[2]}) + } + } + + for _, info := range infos { + info := info + t.Run(info.name, func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + db, err := metabase.Open(info.driver, info.connstr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := db.Close(); err != nil { + t.Error(err) + } + }() + + // TODO: use schemas instead + if err := db.DestroyTables(ctx); err != nil { + t.Fatal(err) + } + if err := db.MigrateToLatest(ctx); err != nil { + t.Fatal(err) + } + defer func() { + if err := db.DestroyTables(ctx); err != nil { + t.Fatal(err) + } + }() + + fn(ctx, t, db) + }) + } +} + +func TestSetup(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + err := db.Ping(ctx) + require.NoError(t, err) + + _, err = db.TestingGetState(ctx) + require.NoError(t, err) + }) +} diff --git a/satellite/metainfo/metabase/delete.go b/satellite/metainfo/metabase/delete.go new file mode 100644 index 000000000..d72f80643 --- /dev/null +++ b/satellite/metainfo/metabase/delete.go @@ -0,0 +1,320 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "context" + "database/sql" + "errors" + + "github.com/zeebo/errs" + + "storj.io/common/storj" + "storj.io/storj/private/tagsql" +) + +// DeleteObjectExactVersion contains arguments necessary for deleting an exact version of object. +type DeleteObjectExactVersion struct { + Version Version + ObjectLocation +} + +// Verify delete object fields. +func (obj *DeleteObjectExactVersion) Verify() error { + if err := obj.ObjectLocation.Verify(); err != nil { + return err + } + if obj.Version <= 0 { + return ErrInvalidRequest.New("Version invalid: %v", obj.Version) + } + return nil +} + +// DeleteObjectResult result of deleting object. +type DeleteObjectResult struct { + Segments []DeletedSegmentInfo +} + +// DeletedSegmentInfo info about deleted segment. +type DeletedSegmentInfo struct { + // TODO figure out which part of object are needed to delete from SN + RootPieceID storj.PieceID + Pieces Pieces +} + +// DeleteObjectAllVersions contains arguments necessary for deleting all object versions. +type DeleteObjectAllVersions struct { + ObjectLocation +} + +// DeleteObjectLatestVersion contains arguments necessary for deleting latest object version. +type DeleteObjectLatestVersion struct { + ObjectLocation +} + +// DeleteObjectExactVersion deletes an exact object version. +func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion) (result DeleteObjectResult, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return DeleteObjectResult{}, err + } + + tx, err := db.db.BeginTx(ctx, nil) + if err != nil { + return DeleteObjectResult{}, Error.New("failed BeginTx: %w", err) + } + committed := false + defer func() { + if !committed { + err = errs.Combine(err, Error.Wrap(tx.Rollback())) + } + }() + + rows, err := tx.Query(ctx, ` + DELETE FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + status = 1 + RETURNING stream_id; + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return DeleteObjectResult{}, Error.New("object missing") + } + return DeleteObjectResult{}, Error.New("unable to delete object: %w", err) + } + + ids, err := scanObjectDeletion(rows) + if err != nil { + return DeleteObjectResult{}, err + } + + if len(ids) == 0 { + return DeleteObjectResult{}, Error.New("object missing") + } + + segmentInfos, err := deleteSegments(ctx, tx, ids) + if err != nil { + return DeleteObjectResult{}, err + } + + if len(segmentInfos) != 0 { + result.Segments = segmentInfos + } + + err, committed = tx.Commit(), true + if err != nil { + return DeleteObjectResult{}, Error.New("unable to commit tx: %w", err) + } + + return result, nil +} + +// DeleteObjectLatestVersion deletes latest object version. +func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLatestVersion) (result DeleteObjectResult, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return DeleteObjectResult{}, err + } + + tx, err := db.db.BeginTx(ctx, nil) + if err != nil { + return DeleteObjectResult{}, Error.New("failed BeginTx: %w", err) + } + committed := false + defer func() { + if !committed { + err = errs.Combine(err, Error.Wrap(tx.Rollback())) + } + }() + + // TODO different sql for Postgres and CockroachDB + // version ONLY for cockroachdb + // Postgres doesn't support ORDER BY and LIMIT in DELETE + // rows, err = tx.Query(ctx, ` + // DELETE FROM objects + // WHERE + // project_id = $1 AND + // bucket_name = $2 AND + // object_key = $3 AND + // status = 1 + // ORDER BY version DESC + // LIMIT 1 + // RETURNING stream_id; + // `, opts.ProjectID, opts.BucketName, opts.ObjectKey) + + // version for Postgres and Cockroachdb (but slow for Cockroachdb) + rows, err := tx.Query(ctx, ` + DELETE FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = (SELECT version FROM objects WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + status = 1 + ORDER BY version DESC LIMIT 1 + ) AND + status = 1 + RETURNING stream_id; + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey)) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return DeleteObjectResult{}, Error.New("object missing") + } + return DeleteObjectResult{}, Error.New("unable to delete object: %w", err) + } + + ids, err := scanObjectDeletion(rows) + if err != nil { + return DeleteObjectResult{}, err + } + + if len(ids) == 0 { + return DeleteObjectResult{}, Error.New("object missing") + } + + segmentInfos, err := deleteSegments(ctx, tx, ids) + if err != nil { + return DeleteObjectResult{}, err + } + + if len(segmentInfos) != 0 { + result.Segments = segmentInfos + } + + err, committed = tx.Commit(), true + if err != nil { + return DeleteObjectResult{}, Error.New("unable to commit tx: %w", err) + } + + return result, nil +} + +// DeleteObjectAllVersions deletes all object versions. +func (db *DB) DeleteObjectAllVersions(ctx context.Context, opts DeleteObjectAllVersions) (result DeleteObjectResult, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return DeleteObjectResult{}, err + } + + tx, err := db.db.BeginTx(ctx, nil) + if err != nil { + return DeleteObjectResult{}, Error.New("failed BeginTx: %w", err) + } + committed := false + defer func() { + if !committed { + err = errs.Combine(err, Error.Wrap(tx.Rollback())) + } + }() + + rows, err := tx.Query(ctx, ` + DELETE FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + status = 1 + RETURNING stream_id; + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey)) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return DeleteObjectResult{}, Error.New("object missing") + } + return DeleteObjectResult{}, Error.New("unable to delete object: %w", err) + } + + ids, err := scanObjectDeletion(rows) + if err != nil { + return DeleteObjectResult{}, err + } + + if len(ids) == 0 { + return DeleteObjectResult{}, Error.New("object missing") + } + + segmentInfos, err := deleteSegments(ctx, tx, ids) + if err != nil { + return DeleteObjectResult{}, err + } + + if len(segmentInfos) != 0 { + result.Segments = segmentInfos + } + + err, committed = tx.Commit(), true + if err != nil { + return DeleteObjectResult{}, Error.New("unable to commit tx: %w", err) + } + + return result, nil +} + +func scanObjectDeletion(rows tagsql.Rows) (segmentIds []interface{}, err error) { + defer func() { err = errs.Combine(err, rows.Close()) }() + + ids := make([]interface{}, 0, 10) + for rows.Next() { + var streamID []byte + err = rows.Scan(&streamID) + if err != nil { + return []interface{}{}, Error.New("unable to delete object: %w", err) + } + + ids = append(ids, streamID) + } + + if err := rows.Err(); err != nil { + return []interface{}{}, Error.New("unable to delete object: %w", err) + } + return ids, nil +} + +func deleteSegments(ctx context.Context, tx tagsql.Tx, segmentIds []interface{}) (_ []DeletedSegmentInfo, err error) { + defer mon.Task()(&ctx)(&err) + + // TODO we need to figure out how integrate this with piece deletion code + // one issue is that with this approach we need to return all pieces SN ids at once + + infos := make([]DeletedSegmentInfo, 0, len(segmentIds)) + for _, id := range segmentIds { + segmentsRows, err := tx.Query(ctx, ` + DELETE FROM segments + WHERE stream_id = $1 + RETURNING root_piece_id, remote_pieces; + `, id) + if err != nil { + return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err) + } + + for segmentsRows.Next() { + var segmentInfo DeletedSegmentInfo + err = segmentsRows.Scan(&segmentInfo.RootPieceID, &segmentInfo.Pieces) + if err != nil { + return []DeletedSegmentInfo{}, errs.Combine(Error.New("unable to delete object: %w", err), segmentsRows.Close()) + } + + if len(segmentInfo.Pieces) != 0 { + infos = append(infos, segmentInfo) + } + } + if err := segmentsRows.Err(); err != nil { + return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err) + } + + if err := segmentsRows.Close(); err != nil { + return []DeletedSegmentInfo{}, Error.New("unable to delete object: %w", err) + } + } + return infos, nil +} diff --git a/satellite/metainfo/metabase/delete_test.go b/satellite/metainfo/metabase/delete_test.go new file mode 100644 index 000000000..e51510e04 --- /dev/null +++ b/satellite/metainfo/metabase/delete_test.go @@ -0,0 +1,691 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "testing" + "time" + + "github.com/zeebo/errs" + + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/common/testrand" + "storj.io/common/uuid" + "storj.io/storj/satellite/metainfo/metabase" +) + +type invalidObjectLocation struct { + Name string + ObjectLocation metabase.ObjectLocation + ErrClass *errs.Class + ErrText string +} + +func invalidObjectLocations(base metabase.ObjectLocation) []invalidObjectLocation { + var tests []invalidObjectLocation + { + location := base + location.ProjectID = uuid.UUID{} + tests = append(tests, invalidObjectLocation{ + Name: "ProjectID missing", + ObjectLocation: location, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "ProjectID missing", + }) + } + { + location := base + location.BucketName = "" + tests = append(tests, invalidObjectLocation{ + Name: "BucketName missing", + ObjectLocation: location, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "BucketName missing", + }) + } + { + location := base + location.ObjectKey = "" + tests = append(tests, invalidObjectLocation{ + Name: "ObjectKey missing", + ObjectLocation: location, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "ObjectKey missing", + }) + } + + return tests +} + +func TestDeleteObjectExactVersion(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + location := obj.Location() + + now := time.Now() + + for _, test := range invalidObjectLocations(location) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: test.ObjectLocation, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("Version invalid", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: location, + Version: 0, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Version invalid: 0", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Object missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete non existing object version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: location, + Version: 33, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete partial object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Delete object without segments", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 0) + + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete object with segments", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 2) + + expectedSegmentInfo := metabase.DeletedSegmentInfo{ + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + } + + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + Result: metabase.DeleteObjectResult{ + Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, + }, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete object with inline segment", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: testrand.PieceID(), + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + InlineData: testrand.Bytes(1024), + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + DeleteObjectExactVersion{ + Opts: metabase.DeleteObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + }) +} + +func TestDeleteObjectLatestVersion(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + location := obj.Location() + + now := time.Now() + + for _, test := range invalidObjectLocations(location) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ + ObjectLocation: test.ObjectLocation, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("Object missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ObjectLocation: location}, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete non existing object version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ObjectLocation: location}, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete partial object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ObjectLocation: obj.Location()}, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Delete object without segments", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 0) + + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ + ObjectLocation: obj.Location(), + }, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete object with segments", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 2) + + expectedSegmentInfo := metabase.DeletedSegmentInfo{ + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + } + + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ + ObjectLocation: location, + }, + Result: metabase.DeleteObjectResult{ + Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, + }, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete object with inline segment", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: testrand.PieceID(), + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + InlineData: testrand.Bytes(1024), + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ + ObjectLocation: obj.Location(), + }, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete latest from multiple versions", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + obj := randObjectStream() + + // first version + obj.Version = metabase.Version(10) + createObject(ctx, t, db, obj, 1) + + // second version, to delete + secondObject := metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 11, + StreamID: testrand.UUID(), + } + createObject(ctx, t, db, secondObject, 1) + + expectedSegmentInfo := metabase.DeletedSegmentInfo{ + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + } + + DeleteObjectLatestVersion{ + Opts: metabase.DeleteObjectLatestVersion{ + ObjectLocation: obj.Location(), + }, + Result: metabase.DeleteObjectResult{ + Segments: []metabase.DeletedSegmentInfo{ + expectedSegmentInfo, + }, + }, + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 1}, + + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + + Redundancy: defaultTestRedundancy, + + // InlineData []byte + + }, + }, + }.Check(ctx, t, db) + }) + }) +} + +func TestDeleteObjectAllVersions(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + location := obj.Location() + + now := time.Now() + + for _, test := range invalidObjectLocations(location) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ObjectLocation: test.ObjectLocation}, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("Object missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()}, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete non existing object version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()}, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete partial object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()}, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Delete object without segments", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 0) + + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()}, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete object with segments", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 2) + + expectedSegmentInfo := metabase.DeletedSegmentInfo{ + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + } + + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ + ObjectLocation: location, + }, + Result: metabase.DeleteObjectResult{ + Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo}, + }, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete object with inline segment", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: testrand.PieceID(), + + EncryptedKey: testrand.Bytes(32), + EncryptedKeyNonce: testrand.Bytes(32), + + InlineData: testrand.Bytes(1024), + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()}, + Result: metabase.DeleteObjectResult{}, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Delete multiple versions of the same object at once", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + expected := metabase.DeleteObjectResult{} + + obj := randObjectStream() + for i := 1; i <= 10; i++ { + obj.StreamID = testrand.UUID() + obj.Version = metabase.Version(i) + createObject(ctx, t, db, obj, 1) + + expected.Segments = append(expected.Segments, metabase.DeletedSegmentInfo{ + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + }) + } + + DeleteObjectAllVersions{ + Opts: metabase.DeleteObjectAllVersions{ObjectLocation: obj.Location()}, + Result: expected, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + }) +} + +func createObject(ctx *testcontext.Context, t *testing.T, db *metabase.DB, obj metabase.ObjectStream, numberOfSegments byte) { + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: obj.Version, + }.Check(ctx, t, db) + + for i := byte(1); i <= numberOfSegments; i++ { + BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{i}, + Pieces: []metabase.Piece{{ + Number: 1, + StorageNode: testrand.NodeID(), + }}, + }, + }.Check(ctx, t, db) + + CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: uint32(i)}, + RootPieceID: storj.PieceID{1}, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: defaultTestRedundancy, + }, + }.Check(ctx, t, db) + } + + CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: obj, + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) +} diff --git a/satellite/metainfo/metabase/encoding.go b/satellite/metainfo/metabase/encoding.go new file mode 100644 index 000000000..b9aac712c --- /dev/null +++ b/satellite/metainfo/metabase/encoding.go @@ -0,0 +1,198 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "database/sql/driver" + "encoding/binary" + + "github.com/jackc/pgtype" + + "storj.io/common/storj" +) + +// !!!! NB !!!! +// +// Should we use protobuf here? + +type encryptionParameters struct { + *storj.EncryptionParameters +} + +// Check that EncryptionParameters layout doesn't change. +var _ struct { + CipherSuite storj.CipherSuite + BlockSize int32 +} = storj.EncryptionParameters{} + +// TODO: do we want to use this encoding? do we want it to be extensible? + +// Value implements sql/driver.Valuer interface. +func (params encryptionParameters) Value() (driver.Value, error) { + var bytes [8]byte + bytes[0] = byte(params.CipherSuite) + binary.LittleEndian.PutUint32(bytes[1:], uint32(params.BlockSize)) + return int64(binary.LittleEndian.Uint64(bytes[:])), nil +} + +// Scan implements sql.Scanner interface. +func (params encryptionParameters) Scan(value interface{}) error { + switch value := value.(type) { + case int64: + var bytes [8]byte + binary.LittleEndian.PutUint64(bytes[:], uint64(value)) + params.CipherSuite = storj.CipherSuite(bytes[0]) + params.BlockSize = int32(binary.LittleEndian.Uint32(bytes[1:])) + return nil + default: + return Error.New("unable to scan %T into EncryptionParameters", value) + } +} + +// Value implements sql/driver.Valuer interface. +func (params SegmentPosition) Value() (driver.Value, error) { + return int64(params.Encode()), nil +} + +// Scan implements sql.Scanner interface. +func (params *SegmentPosition) Scan(value interface{}) error { + switch value := value.(type) { + case int64: + *params = SegmentPositionFromEncoded(uint64(value)) + return nil + default: + return Error.New("unable to scan %T into EncryptionParameters", value) + } +} + +type redundancyScheme struct { + *storj.RedundancyScheme +} + +// Check that RedundancyScheme layout doesn't change. +var _ struct { + Algorithm storj.RedundancyAlgorithm + ShareSize int32 + RequiredShares int16 + RepairShares int16 + OptimalShares int16 + TotalShares int16 +} = storj.RedundancyScheme{} + +// TODO: maybe should use protobuf here instead? + +func (params redundancyScheme) Value() (driver.Value, error) { + switch { + case params.ShareSize < 0 || params.ShareSize >= 1<<24: + return nil, Error.New("invalid share size %v", params.ShareSize) + case params.RequiredShares < 0 || params.RequiredShares >= 1<<8: + return nil, Error.New("invalid required shares %v", params.RequiredShares) + case params.RepairShares < 0 || params.RepairShares >= 1<<8: + return nil, Error.New("invalid repair shares %v", params.RepairShares) + case params.OptimalShares < 0 || params.OptimalShares >= 1<<8: + return nil, Error.New("invalid optimal shares %v", params.OptimalShares) + case params.TotalShares < 0 || params.TotalShares >= 1<<8: + return nil, Error.New("invalid total shares %v", params.TotalShares) + } + + var bytes [8]byte + bytes[0] = byte(params.Algorithm) + + // little endian uint32 + bytes[1] = byte(params.ShareSize >> 0) + bytes[2] = byte(params.ShareSize >> 8) + bytes[3] = byte(params.ShareSize >> 16) + + bytes[4] = byte(params.RequiredShares) + bytes[5] = byte(params.RepairShares) + bytes[6] = byte(params.OptimalShares) + bytes[7] = byte(params.TotalShares) + + return int64(binary.LittleEndian.Uint64(bytes[:])), nil +} + +func (params redundancyScheme) Scan(value interface{}) error { + switch value := value.(type) { + case int64: + var bytes [8]byte + binary.LittleEndian.PutUint64(bytes[:], uint64(value)) + + params.Algorithm = storj.RedundancyAlgorithm(bytes[0]) + + // little endian uint32 + params.ShareSize = int32(bytes[1]) | int32(bytes[2])<<8 | int32(bytes[3])<<16 + + params.RequiredShares = int16(bytes[4]) + params.RepairShares = int16(bytes[5]) + params.OptimalShares = int16(bytes[6]) + params.TotalShares = int16(bytes[7]) + + return nil + default: + return Error.New("unable to scan %T into RedundancyScheme", value) + } +} + +// TODO: should we use some other encoding? + +// Value implements sql/driver.Valuer interface. +func (pieces Pieces) Value() (driver.Value, error) { + if len(pieces) == 0 { + arr := &pgtype.ByteaArray{Status: pgtype.Null} + return arr.Value() + } + + elems := make([]pgtype.Bytea, len(pieces)) + for i, piece := range pieces { + var buf [2 + len(piece.StorageNode)]byte + binary.BigEndian.PutUint16(buf[0:], piece.Number) + copy(buf[2:], piece.StorageNode[:]) + + elems[i].Bytes = buf[:] + elems[i].Status = pgtype.Present + } + + arr := &pgtype.ByteaArray{ + Elements: elems, + Dimensions: []pgtype.ArrayDimension{{Length: int32(len(pieces)), LowerBound: 1}}, + Status: pgtype.Present, + } + return arr.Value() +} + +type unexpectedDimension struct{} +type invalidElementLength struct{} + +func (unexpectedDimension) Error() string { return "unexpected data dimension" } +func (invalidElementLength) Error() string { return "invalid element length" } + +// Scan implements sql.Scanner interface. +func (pieces *Pieces) Scan(value interface{}) error { + var arr pgtype.ByteaArray + if err := arr.Scan(value); err != nil { + return err + } + + if len(arr.Dimensions) == 0 { + *pieces = nil + return nil + } else if len(arr.Dimensions) != 1 { + return unexpectedDimension{} + } + + scan := make(Pieces, len(arr.Elements)) + for i, elem := range arr.Elements { + piece := Piece{} + if len(elem.Bytes) != 2+len(piece.StorageNode) { + return invalidElementLength{} + } + + piece.Number = binary.BigEndian.Uint16(elem.Bytes[0:]) + copy(piece.StorageNode[:], elem.Bytes[2:]) + scan[i] = piece + } + + *pieces = scan + return nil +} diff --git a/satellite/metainfo/metabase/get.go b/satellite/metainfo/metabase/get.go new file mode 100644 index 000000000..e34052518 --- /dev/null +++ b/satellite/metainfo/metabase/get.go @@ -0,0 +1,197 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "context" + "database/sql" + "errors" + + "storj.io/common/uuid" +) + +// Object object metadata. +// TODO define separated struct. +type Object RawObject + +// Segment segment metadata. +// TODO define separated struct. +type Segment RawSegment + +// GetObjectExactVersion contains arguments necessary for fetching an information +// about exact object version. +type GetObjectExactVersion struct { + Version Version + ObjectLocation +} + +// Verify verifies get object reqest fields. +func (obj *GetObjectExactVersion) Verify() error { + if err := obj.ObjectLocation.Verify(); err != nil { + return err + } + if obj.Version <= 0 { + return ErrInvalidRequest.New("Version invalid: %v", obj.Version) + } + return nil +} + +// GetObjectExactVersion returns object information for exact version. +func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVersion) (_ Object, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return Object{}, err + } + + object := Object{} + // TODO handle encryption column + err = db.db.QueryRow(ctx, ` + SELECT + stream_id, + created_at, expires_at, + segment_count, + encrypted_metadata_nonce, encrypted_metadata, + total_encrypted_size, fixed_segment_size, + encryption + FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + version = $4 AND + status = 1 + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey), opts.Version). + Scan( + &object.StreamID, + &object.CreatedAt, &object.ExpiresAt, + &object.SegmentCount, + &object.EncryptedMetadataNonce, &object.EncryptedMetadata, + &object.TotalEncryptedSize, &object.FixedSegmentSize, + encryptionParameters{&object.Encryption}, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Object{}, Error.New("object missing") + } + return Object{}, Error.New("unable to query object status: %w", err) + } + + object.ProjectID = opts.ProjectID + object.BucketName = opts.BucketName + object.ObjectKey = opts.ObjectKey + object.Version = opts.Version + + object.Status = Committed + + return object, nil +} + +// GetObjectLatestVersion contains arguments necessary for fetching +// an object information for latest version. +type GetObjectLatestVersion struct { + ObjectLocation +} + +// GetObjectLatestVersion returns object information for latest version. +func (db *DB) GetObjectLatestVersion(ctx context.Context, opts GetObjectLatestVersion) (_ Object, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return Object{}, err + } + + object := Object{} + err = db.db.QueryRow(ctx, ` + SELECT + stream_id, version, + created_at, expires_at, + segment_count, + encrypted_metadata_nonce, encrypted_metadata, + total_encrypted_size, fixed_segment_size, + encryption + FROM objects + WHERE + project_id = $1 AND + bucket_name = $2 AND + object_key = $3 AND + status = 1 + ORDER BY version desc + LIMIT 1 + `, opts.ProjectID, opts.BucketName, []byte(opts.ObjectKey)). + Scan( + &object.StreamID, &object.Version, + &object.CreatedAt, &object.ExpiresAt, + &object.SegmentCount, + &object.EncryptedMetadataNonce, &object.EncryptedMetadata, + &object.TotalEncryptedSize, &object.FixedSegmentSize, + encryptionParameters{&object.Encryption}, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Object{}, Error.New("object missing") + } + return Object{}, Error.New("unable to query object status: %w", err) + } + + object.ProjectID = opts.ProjectID + object.BucketName = opts.BucketName + object.ObjectKey = opts.ObjectKey + + object.Status = Committed + + return object, nil +} + +// GetSegmentByPosition contains arguments necessary for fetching a segment on specific position. +type GetSegmentByPosition struct { + StreamID uuid.UUID + Position SegmentPosition +} + +// Verify verifies get segment request fields. +func (seg *GetSegmentByPosition) Verify() error { + if seg.StreamID.IsZero() { + return ErrInvalidRequest.New("StreamID missing") + } + return nil +} + +// GetSegmentByPosition returns a segment information. +func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPosition) (segment Segment, err error) { + defer mon.Task()(&ctx)(&err) + + if err := opts.Verify(); err != nil { + return Segment{}, err + } + + err = db.db.QueryRow(ctx, ` + SELECT + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, + redundancy, + inline_data, remote_pieces + FROM objects, segments + WHERE + segments.stream_id = $1 AND + segments.position = $2 + `, opts.StreamID, opts.Position.Encode()). + Scan( + &segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey, + &segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize, + redundancyScheme{&segment.Redundancy}, + &segment.InlineData, &segment.Pieces, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Segment{}, Error.New("segment missing") + } + return Segment{}, Error.New("unable to query segment: %w", err) + } + + segment.StreamID = opts.StreamID + segment.Position = opts.Position + + return segment, nil +} diff --git a/satellite/metainfo/metabase/get_test.go b/satellite/metainfo/metabase/get_test.go new file mode 100644 index 000000000..0e5bd404b --- /dev/null +++ b/satellite/metainfo/metabase/get_test.go @@ -0,0 +1,389 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "testing" + "time" + + "storj.io/common/storj" + "storj.io/common/testcontext" + "storj.io/storj/satellite/metainfo/metabase" +) + +func TestGetObjectExactVersion(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + location := obj.Location() + + now := time.Now() + + for _, test := range invalidObjectLocations(location) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + GetObjectExactVersion{ + Opts: metabase.GetObjectExactVersion{ + ObjectLocation: test.ObjectLocation, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("Version invalid", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + GetObjectExactVersion{ + Opts: metabase.GetObjectExactVersion{ + ObjectLocation: location, + Version: 0, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "Version invalid: 0", + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Object missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + GetObjectExactVersion{ + Opts: metabase.GetObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Get not existing version", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 0) + + GetObjectExactVersion{ + Opts: metabase.GetObjectExactVersion{ + ObjectLocation: location, + Version: 11, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Get pending object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + GetObjectExactVersion{ + Opts: metabase.GetObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Get object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 0) + + GetObjectExactVersion{ + Opts: metabase.GetObjectExactVersion{ + ObjectLocation: location, + Version: 1, + }, + Result: metabase.Object{ + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + Verify{Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }}.Check(ctx, t, db) + }) + }) +} + +func TestGetObjectLatestVersion(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + location := obj.Location() + + now := time.Now() + + for _, test := range invalidObjectLocations(location) { + test := test + t.Run(test.Name, func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + GetObjectLatestVersion{ + Opts: metabase.GetObjectLatestVersion{ + ObjectLocation: test.ObjectLocation, + }, + ErrClass: test.ErrClass, + ErrText: test.ErrText, + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + } + + t.Run("Object missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + GetObjectLatestVersion{ + Opts: metabase.GetObjectLatestVersion{ + ObjectLocation: location, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Get pending object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + }, + Version: 1, + }.Check(ctx, t, db) + + GetObjectLatestVersion{ + Opts: metabase.GetObjectLatestVersion{ + ObjectLocation: location, + }, + ErrClass: &metabase.Error, + ErrText: "object missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Pending, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("Get object", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 0) + + GetObjectLatestVersion{ + Opts: metabase.GetObjectLatestVersion{ + ObjectLocation: location, + }, + Result: metabase.Object{ + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + Verify{Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }}.Check(ctx, t, db) + }) + + t.Run("Get latest object version from multiple", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + firstVersion := obj + createObject(ctx, t, db, firstVersion, 0) + secondVersion := metabase.ObjectStream{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + Version: 2, + StreamID: obj.StreamID, + } + createObject(ctx, t, db, secondVersion, 0) + + GetObjectLatestVersion{ + Opts: metabase.GetObjectLatestVersion{ + ObjectLocation: location, + }, + Result: metabase.Object{ + ObjectStream: secondVersion, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }.Check(ctx, t, db) + + Verify{Objects: []metabase.RawObject{ + { + ObjectStream: firstVersion, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + { + ObjectStream: secondVersion, + CreatedAt: now, + Status: metabase.Committed, + + Encryption: defaultTestEncryption, + }, + }}.Check(ctx, t, db) + }) + }) +} + +func TestGetSegmentByPosition(t *testing.T) { + All(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + obj := randObjectStream() + + now := time.Now() + + t.Run("StreamID missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + GetSegmentByPosition{ + Opts: metabase.GetSegmentByPosition{}, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "StreamID missing", + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Segment missing", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + GetSegmentByPosition{ + Opts: metabase.GetSegmentByPosition{ + StreamID: obj.StreamID, + }, + ErrClass: &metabase.Error, + ErrText: "segment missing", + }.Check(ctx, t, db) + + Verify{}.Check(ctx, t, db) + }) + + t.Run("Get segment", func(t *testing.T) { + defer DeleteAll{}.Check(ctx, t, db) + + createObject(ctx, t, db, obj, 1) + + expectedSegment := metabase.Segment{ + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{ + Index: 1, + }, + RootPieceID: storj.PieceID{1}, + EncryptedKey: []byte{3}, + EncryptedKeyNonce: []byte{4}, + EncryptedSize: 1024, + PlainSize: 512, + Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}}, + Redundancy: defaultTestRedundancy, + } + + GetSegmentByPosition{ + Opts: metabase.GetSegmentByPosition{ + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{ + Index: 1, + }, + }, + Result: expectedSegment, + }.Check(ctx, t, db) + + // check non existing segment in existing object + GetSegmentByPosition{ + Opts: metabase.GetSegmentByPosition{ + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{ + Index: 2, + }, + }, + ErrClass: &metabase.Error, + ErrText: "segment missing", + }.Check(ctx, t, db) + + Verify{ + Objects: []metabase.RawObject{ + { + ObjectStream: obj, + CreatedAt: now, + Status: metabase.Committed, + Encryption: defaultTestEncryption, + }, + }, + Segments: []metabase.RawSegment{ + metabase.RawSegment(expectedSegment), + }, + }.Check(ctx, t, db) + }) + }) +} diff --git a/satellite/metainfo/metabase/raw.go b/satellite/metainfo/metabase/raw.go new file mode 100644 index 000000000..0fc468831 --- /dev/null +++ b/satellite/metainfo/metabase/raw.go @@ -0,0 +1,200 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase + +import ( + "context" + "time" + + "github.com/zeebo/errs" + + "storj.io/common/storj" + "storj.io/common/uuid" +) + +// RawObject defines the full object that is stored in the database. It should be rarely used directly. +type RawObject struct { + ObjectStream + + CreatedAt time.Time + ExpiresAt *time.Time + + Status ObjectStatus + SegmentCount int32 + + EncryptedMetadataNonce []byte + EncryptedMetadata []byte + + TotalEncryptedSize int64 + FixedSegmentSize int32 + + Encryption storj.EncryptionParameters + + // ZombieDeletionDeadline defines when the pending raw object should be deleted from the database. + // This is as a safeguard against objects that failed to upload and the client has not indicated + // whether they want to continue uploading or delete the already uploaded data. + ZombieDeletionDeadline *time.Time +} + +// RawSegment defines the full segment that is stored in the database. It should be rarely used directly. +type RawSegment struct { + StreamID uuid.UUID + Position SegmentPosition + + RootPieceID storj.PieceID + EncryptedKeyNonce []byte + EncryptedKey []byte + + EncryptedSize int32 // size of the whole segment (not a piece) + PlainSize int32 + PlainOffset int64 + // TODO: add fields for proofs/chains + + Redundancy storj.RedundancyScheme + + InlineData []byte + Pieces Pieces +} + +// RawState contains full state of a table. +type RawState struct { + Objects []RawObject + Segments []RawSegment +} + +// TestingGetState returns the state of the database. +func (db *DB) TestingGetState(ctx context.Context) (_ *RawState, err error) { + state := &RawState{} + + state.Objects, err = db.testingGetAllObjects(ctx) + if err != nil { + return nil, Error.New("GetState: %w", err) + } + + state.Segments, err = db.testingGetAllSegments(ctx) + if err != nil { + return nil, Error.New("GetState: %w", err) + } + + return state, nil +} + +// TestingDeleteAll deletes all objects and segments from the database. +func (db *DB) TestingDeleteAll(ctx context.Context) (err error) { + _, err = db.db.ExecContext(ctx, ` + DELETE FROM objects; + DELETE FROM segments; + `) + return Error.Wrap(err) +} + +// testingGetAllObjects returns the state of the database. +func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) { + objs := []RawObject{} + + rows, err := db.db.Query(ctx, ` + SELECT + project_id, bucket_name, object_key, version, stream_id, + created_at, expires_at, + status, segment_count, + encrypted_metadata_nonce, encrypted_metadata, + total_encrypted_size, fixed_segment_size, + encryption, + zombie_deletion_deadline + FROM objects + `) + if err != nil { + return nil, Error.New("testingGetAllObjects query: %w", err) + } + defer func() { err = errs.Combine(err, rows.Close()) }() + for rows.Next() { + var obj RawObject + err := rows.Scan( + &obj.ProjectID, + &obj.BucketName, + &obj.ObjectKey, + &obj.Version, + &obj.StreamID, + + &obj.CreatedAt, + &obj.ExpiresAt, + + &obj.Status, // TODO: fix encoding + &obj.SegmentCount, + + &obj.EncryptedMetadataNonce, + &obj.EncryptedMetadata, + + &obj.TotalEncryptedSize, + &obj.FixedSegmentSize, + + encryptionParameters{&obj.Encryption}, + &obj.ZombieDeletionDeadline, + ) + if err != nil { + return nil, Error.New("testingGetAllObjects scan failed: %w", err) + } + objs = append(objs, obj) + } + if err := rows.Err(); err != nil { + return nil, Error.New("testingGetAllObjects scan failed: %w", err) + } + + if len(objs) == 0 { + return nil, nil + } + return objs, nil +} + +// testingGetAllSegments returns the state of the database. +func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error) { + segs := []RawSegment{} + + rows, err := db.db.Query(ctx, ` + SELECT + stream_id, position, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, + plain_offset, plain_size, + redundancy, + inline_data, remote_pieces + FROM segments + `) + if err != nil { + return nil, Error.New("testingGetAllSegments query: %w", err) + } + defer func() { err = errs.Combine(err, rows.Close()) }() + for rows.Next() { + var seg RawSegment + err := rows.Scan( + &seg.StreamID, + &seg.Position, + + &seg.RootPieceID, + &seg.EncryptedKeyNonce, + &seg.EncryptedKey, + + &seg.EncryptedSize, + &seg.PlainOffset, + &seg.PlainSize, + + redundancyScheme{&seg.Redundancy}, + + &seg.InlineData, + &seg.Pieces, + ) + if err != nil { + return nil, Error.New("testingGetAllSegments scan failed: %w", err) + } + segs = append(segs, seg) + } + if err := rows.Err(); err != nil { + return nil, Error.New("testingGetAllSegments scan failed: %w", err) + } + + if len(segs) == 0 { + return nil, nil + } + return segs, nil +} diff --git a/satellite/metainfo/metabase/test_test.go b/satellite/metainfo/metabase/test_test.go new file mode 100644 index 000000000..99039d43a --- /dev/null +++ b/satellite/metainfo/metabase/test_test.go @@ -0,0 +1,207 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package metabase_test + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "github.com/zeebo/errs" + + "storj.io/common/testcontext" + "storj.io/storj/satellite/metainfo/metabase" +) + +type BeginObjectNextVersion struct { + Opts metabase.BeginObjectNextVersion + Version metabase.Version + ErrClass *errs.Class + ErrText string +} + +func (step BeginObjectNextVersion) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + got, err := db.BeginObjectNextVersion(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + require.Equal(t, step.Version, got) +} + +type BeginObjectExactVersion struct { + Opts metabase.BeginObjectExactVersion + Version metabase.Version + ErrClass *errs.Class + ErrText string +} + +func (step BeginObjectExactVersion) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + got, err := db.BeginObjectExactVersion(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + require.Equal(t, step.Version, got) +} + +type CommitObject struct { + Opts metabase.CommitObject + ErrClass *errs.Class + ErrText string +} + +func (step CommitObject) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + err := db.CommitObject(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) +} + +type BeginSegment struct { + Opts metabase.BeginSegment + ErrClass *errs.Class + ErrText string +} + +func (step BeginSegment) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + err := db.BeginSegment(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) +} + +type CommitSegment struct { + Opts metabase.CommitSegment + ErrClass *errs.Class + ErrText string +} + +func (step CommitSegment) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + err := db.CommitSegment(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) +} + +type CommitInlineSegment struct { + Opts metabase.CommitInlineSegment + ErrClass *errs.Class + ErrText string +} + +func (step CommitInlineSegment) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + err := db.CommitInlineSegment(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) +} + +type GetObjectExactVersion struct { + Opts metabase.GetObjectExactVersion + Result metabase.Object + ErrClass *errs.Class + ErrText string +} + +func (step GetObjectExactVersion) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + result, err := db.GetObjectExactVersion(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +} + +type GetObjectLatestVersion struct { + Opts metabase.GetObjectLatestVersion + Result metabase.Object + ErrClass *errs.Class + ErrText string +} + +func (step GetObjectLatestVersion) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + result, err := db.GetObjectLatestVersion(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +} + +type GetSegmentByPosition struct { + Opts metabase.GetSegmentByPosition + Result metabase.Segment + ErrClass *errs.Class + ErrText string +} + +func (step GetSegmentByPosition) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + result, err := db.GetSegmentByPosition(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +} + +type DeleteObjectExactVersion struct { + Opts metabase.DeleteObjectExactVersion + Result metabase.DeleteObjectResult + ErrClass *errs.Class + ErrText string +} + +func (step DeleteObjectExactVersion) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + result, err := db.DeleteObjectExactVersion(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +} + +type DeleteObjectLatestVersion struct { + Opts metabase.DeleteObjectLatestVersion + Result metabase.DeleteObjectResult + ErrClass *errs.Class + ErrText string +} + +func (step DeleteObjectLatestVersion) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + result, err := db.DeleteObjectLatestVersion(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +} + +type DeleteObjectAllVersions struct { + Opts metabase.DeleteObjectAllVersions + Result metabase.DeleteObjectResult + ErrClass *errs.Class + ErrText string +} + +func (step DeleteObjectAllVersions) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + result, err := db.DeleteObjectAllVersions(ctx, step.Opts) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +} + +func checkError(t *testing.T, err error, errClass *errs.Class, errText string) { + if errClass != nil { + require.True(t, errClass.Has(err), "expected an error %v got %v", *errClass, err) + } + if errText != "" { + require.EqualError(t, err, errClass.New(errText).Error()) + } + if errClass == nil && errText == "" { + require.NoError(t, err) + } +} + +type DeleteAll struct{} + +func (step DeleteAll) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + err := db.TestingDeleteAll(ctx) + require.NoError(t, err) +} + +type Verify metabase.RawState + +func (step Verify) Check(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + state, err := db.TestingGetState(ctx) + require.NoError(t, err) + + diff := cmp.Diff(metabase.RawState(step), *state, + cmpopts.EquateApproxTime(5*time.Second)) + require.Zero(t, diff) +}