satellite/metabase: drop object deletion code for copy with references

With this change we are removing code responsible for deleting objects
and supporting server side copies created with references. In practice
we are restoring delete queries that we had before server side copy
implementation (with small exception, see bellow).

From deletion queries we are also removing parts with segment metadata
as result because we are not longer sending explicit delete requests to
storage nodes.

https://github.com/storj/storj/issues/5891

Change-Id: Iee4e7a9688cff27a60fb95d60dd233d996f41c85
This commit is contained in:
Michal Niewrzal 2023-07-19 11:09:53 +02:00 committed by Michał Niewrzał
parent 03f8ad323d
commit 9550b5f4a5
11 changed files with 142 additions and 979 deletions

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
@ -17,6 +18,7 @@ import (
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/audit"
)
@ -77,10 +79,16 @@ func TestAuditOrderLimit(t *testing.T) {
})
}
// Minimal test to verify that copies aren't audited.
// Minimal test to verify that copies are also audited as we duplicate
// all segment metadata.
func TestAuditSkipsRemoteCopies(t *testing.T) {
testWithRangedLoop(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.ServerSideCopyDuplicateMetadata = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
@ -96,10 +104,6 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
err = uplink.Upload(ctx, satellite, "testbucket", "testobj2", testData)
require.NoError(t, err)
originalSegments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, originalSegments, 2)
project, err := uplink.OpenProject(ctx, satellite)
require.NoError(t, err)
defer ctx.Check(project.Close)
@ -107,13 +111,17 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
_, err = project.CopyObject(ctx, "testbucket", "testobj1", "testbucket", "copy", nil)
require.NoError(t, err)
allSegments, err := satellite.Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, allSegments, 3)
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
queue := audits.VerifyQueue
auditSegments := make([]audit.Segment, 0, 2)
for range originalSegments {
auditSegments := make([]audit.Segment, 0, 3)
for range allSegments {
auditSegment, err := queue.Next(ctx)
require.NoError(t, err)
auditSegments = append(auditSegments, auditSegment)
@ -125,8 +133,8 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
})
// Check that StreamID of copy
for i := range originalSegments {
require.Equal(t, originalSegments[i].StreamID, auditSegments[i].StreamID)
for i := range allSegments {
require.Equal(t, allSegments[i].StreamID, auditSegments[i].StreamID)
}
// delete originals, keep 1 copy
@ -138,14 +146,18 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)
allSegments, err = satellite.Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, allSegments, 1)
queue = audits.VerifyQueue
// verify that the copy is being audited
remainingSegment, err := queue.Next(ctx)
require.NoError(t, err)
for _, originalSegment := range originalSegments {
require.NotEqual(t, originalSegment.StreamID, remainingSegment.StreamID)
for _, segment := range allSegments {
require.Equal(t, segment.StreamID, remainingSegment.StreamID)
}
})
}

View File

@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"storj.io/common/memory"
"storj.io/common/pb"
@ -233,14 +235,19 @@ func TestGracefulExit_CopiedObjects(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 3, 4, 4),
func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.ServerSideCopyDuplicateMetadata = true
},
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
require.NoError(t, err)
defer ctx.Check(project.Close)
testGETransferQueue := func(node storj.NodeID) {
testGETransferQueue := func(node storj.NodeID, segmentsToTransfer int) {
_, err = planet.Satellites[0].Overlay.DB.UpdateExitStatus(ctx, &overlay.ExitStatusRequest{
NodeID: node,
ExitInitiatedAt: time.Now().UTC(),
@ -251,26 +258,21 @@ func TestGracefulExit_CopiedObjects(t *testing.T) {
_, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
// we should get only one item from GE queue as we have only one remote segments
// we should get two items from GE queue as we have remote segment and its copy
items, err := planet.Satellites[0].DB.GracefulExit().GetIncomplete(ctx, node, 100, 0)
require.NoError(t, err)
require.Len(t, items, 1)
require.Len(t, items, segmentsToTransfer)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
// find non copy remote segment
segment := metabase.Segment{}
for _, s := range segments {
if len(s.Pieces) > 0 {
if !segment.StreamID.IsZero() {
t.Fatal("we should have only one remote non copy segment")
}
segment = s
for _, segment := range segments {
if !segment.Inline() {
require.True(t, slices.ContainsFunc(items, func(item *gracefulexit.TransferQueueItem) bool {
return item.StreamID == segment.StreamID
}))
}
}
require.True(t, !segment.StreamID.IsZero())
require.Equal(t, segment.StreamID, items[0].StreamID)
}
// upload inline and remote and make copies
@ -282,15 +284,15 @@ func TestGracefulExit_CopiedObjects(t *testing.T) {
require.NoError(t, err)
}
testGETransferQueue(planet.StorageNodes[0].ID())
testGETransferQueue(planet.StorageNodes[0].ID(), 2)
// delete original objects to promote copies as new ancestors
// delete original objects
for _, size := range []memory.Size{memory.KiB, 10 * memory.KiB} {
_, err = project.DeleteObject(ctx, "my-bucket", "my-object-"+size.String())
require.NoError(t, err)
}
testGETransferQueue(planet.StorageNodes[1].ID())
testGETransferQueue(planet.StorageNodes[1].ID(), 1)
})
}

View File

@ -613,10 +613,6 @@ type CommitObject struct {
EncryptedMetadataEncryptedKey []byte // optional
DisallowDelete bool
// OnDelete will be triggered when/if existing object will be overwritten on commit.
// Wil be only executed after succesfull commit + delete DB operation.
// Error on this function won't revert back committed object.
OnDelete func(segments []DeletedSegmentInfo)
UsePendingObjectsTable bool
}
@ -650,8 +646,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
return Object{}, err
}
deletedSegments := []DeletedSegmentInfo{}
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
segments, err := fetchSegmentsForCommit(ctx, tx, opts.StreamID)
if err != nil {
@ -877,7 +871,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
}
for _, version := range versionsToDelete {
deleteResult, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{
_, err := db.deleteObjectExactVersion(ctx, DeleteObjectExactVersion{
ObjectLocation: ObjectLocation{
ProjectID: opts.ProjectID,
BucketName: opts.BucketName,
@ -888,8 +882,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
if err != nil {
return Error.New("failed to delete existing object: %w", err)
}
deletedSegments = append(deletedSegments, deleteResult.Segments...)
}
object.StreamID = opts.StreamID
@ -908,11 +900,6 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
return Object{}, err
}
// we can execute this only when whole transaction is committed without any error
if len(deletedSegments) > 0 && opts.OnDelete != nil {
opts.OnDelete(deletedSegments)
}
mon.Meter("object_commit").Mark(1)
mon.IntVal("object_commit_segments").Observe(int64(object.SegmentCount))
mon.IntVal("object_commit_encrypted_size").Observe(object.TotalEncryptedSize)

View File

@ -8,8 +8,6 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
@ -3762,46 +3760,6 @@ func TestCommitObject(t *testing.T) {
}.Check(ctx, t, db)
})
t.Run("OnDelete", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
// check deleted segments
obj := metabasetest.RandObjectStream()
_, expectedSegments := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 3)
expectedDeletedSegments := []metabase.DeletedSegmentInfo{}
for _, segment := range expectedSegments {
expectedDeletedSegments = append(expectedDeletedSegments, metabase.DeletedSegmentInfo{
RootPieceID: segment.RootPieceID,
Pieces: segment.Pieces,
})
}
obj.Version++
metabasetest.BeginObjectExactVersion{
Opts: metabase.BeginObjectExactVersion{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
},
Version: obj.Version,
}.Check(ctx, t, db)
deletedSegments := []metabase.DeletedSegmentInfo{}
metabasetest.CommitObject{
Opts: metabase.CommitObject{
ObjectStream: obj,
Encryption: metabasetest.DefaultEncryption,
OnDelete: func(segments []metabase.DeletedSegmentInfo) {
deletedSegments = append(deletedSegments, segments...)
},
},
}.Check(ctx, t, db)
require.Equal(t, expectedDeletedSegments, deletedSegments)
})
t.Run("use pending objects table", func(t *testing.T) {
obj.Version = metabase.NextVersion
t.Run("version", func(t *testing.T) {

View File

@ -263,7 +263,7 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
if objectAtDestination != nil {
version := objectAtDestination.Version
deletedObjects, err := db.deleteObjectExactVersionServerSideCopy(
deletedObjects, err := db.deleteObjectExactVersion(
ctx, DeleteObjectExactVersion{
Version: version,
ObjectLocation: ObjectLocation{
@ -278,13 +278,10 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
}
// The object at the destination was the ancestor!
// Now that the ancestor of the source object is removed, we need to change the target ancestor.
if ancestorStreamID == objectAtDestination.StreamID {
if len(deletedObjects) == 0 {
if len(deletedObjects.Objects) == 0 {
return Error.New("ancestor is gone, please retry operation")
}
ancestorStreamID = *deletedObjects[0].PromotedAncestor
}
}

View File

@ -86,6 +86,8 @@ func TestBeginCopyObject(t *testing.T) {
}
func TestFinishCopyObject(t *testing.T) {
t.Skip("test will be removed in subsequent change")
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()
newBucketName := "New bucket name"

View File

@ -6,19 +6,21 @@ package metabase
import (
"bytes"
"context"
"fmt"
"sort"
"time"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/txutil"
"storj.io/private/tagsql"
)
// DeletedSegmentInfo info about deleted segment.
type DeletedSegmentInfo struct {
RootPieceID storj.PieceID
Pieces Pieces
}
// DeleteObjectExactVersion contains arguments necessary for deleting an exact version of object.
type DeleteObjectExactVersion struct {
Version Version
@ -38,31 +40,7 @@ func (obj *DeleteObjectExactVersion) Verify() error {
// DeleteObjectResult result of deleting object.
type DeleteObjectResult struct {
Objects []Object
Segments []DeletedSegmentInfo
}
// DeletedSegmentInfo info about deleted segment.
type DeletedSegmentInfo struct {
RootPieceID storj.PieceID
Pieces Pieces
}
type deletedObjectInfo struct {
Object
Segments []deletedRemoteSegmentInfo
// while deletion we are trying to find if deleted object have a copy
// and if we need new ancestor to replace it. If we find a copy that
// can be new ancestor we are keeping its stream id in this field.
PromotedAncestor *uuid.UUID
}
type deletedRemoteSegmentInfo struct {
Position SegmentPosition
RootPieceID storj.PieceID
Pieces Pieces
RepairedAt *time.Time
Objects []Object
}
// DeleteObjectsAllVersions contains arguments necessary for deleting all versions of multiple objects from the same bucket.
@ -101,7 +79,7 @@ func (delete *DeleteObjectsAllVersions) Verify() error {
return nil
}
var deleteObjectExactVersionWithoutCopyFeatureSQL = `
var deleteObjectExactVersion = `
WITH deleted_objects AS (
DELETE FROM objects
WHERE
@ -110,29 +88,21 @@ WITH deleted_objects AS (
object_key = $3 AND
version = $4
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
RETURNING segments.stream_id
)
SELECT
deleted_objects.version, deleted_objects.stream_id,
deleted_objects.created_at, deleted_objects.expires_at,
deleted_objects.status, deleted_objects.segment_count,
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id`
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
FROM deleted_objects`
var deleteObjectLastCommittedWithoutCopyFeatureSQL = `
var deleteObjectLastCommitted = `
WITH deleted_objects AS (
DELETE FROM objects
WHERE
@ -157,338 +127,56 @@ WITH deleted_objects AS (
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id, segments.root_piece_id, segments.remote_alias_pieces
RETURNING segments.stream_id
)
SELECT
deleted_objects.version, deleted_objects.stream_id,
deleted_objects.created_at, deleted_objects.expires_at,
deleted_objects.status, deleted_objects.segment_count,
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id`
// TODO: remove comments with regex.
var deleteBucketObjectsWithCopyFeatureSQL = `
WITH deleted_objects AS (
%s
RETURNING
stream_id
-- extra properties only returned when deleting single object
%s
),
deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING
segments.stream_id,
segments.position,
segments.inline_data,
segments.plain_size,
segments.encrypted_size,
segments.repaired_at,
segments.root_piece_id,
segments.remote_alias_pieces
),
deleted_copies AS (
DELETE FROM segment_copies
WHERE segment_copies.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segment_copies.stream_id
),
-- lowest stream_id becomes new ancestor
promoted_ancestors AS (
-- select only one child to promote per ancestor
SELECT DISTINCT ON (segment_copies.ancestor_stream_id)
segment_copies.stream_id AS new_ancestor_stream_id,
segment_copies.ancestor_stream_id AS deleted_stream_id
FROM segment_copies
-- select children about to lose their ancestor
-- this is not a WHERE clause because that caused a full table scan in CockroachDB
INNER JOIN deleted_objects
ON deleted_objects.stream_id = segment_copies.ancestor_stream_id
-- don't select children which will be removed themselves
WHERE segment_copies.stream_id NOT IN (
SELECT stream_id
FROM deleted_objects
)
)
SELECT
deleted_objects.stream_id,
deleted_segments.position,
deleted_segments.root_piece_id,
-- piece to remove from storagenodes or link to new ancestor
deleted_segments.remote_alias_pieces,
-- if set, caller needs to promote this stream_id to new ancestor or else object contents will be lost
promoted_ancestors.new_ancestor_stream_id
-- extra properties only returned when deleting single object
%s
FROM deleted_objects
LEFT JOIN deleted_segments
ON deleted_objects.stream_id = deleted_segments.stream_id
LEFT JOIN promoted_ancestors
ON deleted_objects.stream_id = promoted_ancestors.deleted_stream_id
ORDER BY stream_id
`
var deleteObjectExactVersionSubSQL = `
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version = $4
`
var deleteObjectLastCommittedSubSQL = `
DELETE FROM objects
WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
version IN (SELECT version FROM objects WHERE
project_id = $1 AND
bucket_name = $2 AND
object_key = $3 AND
status = ` + committedStatus + ` AND
(expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
)
`
var deleteObjectExactVersionWithCopyFeatureSQL = fmt.Sprintf(
deleteBucketObjectsWithCopyFeatureSQL,
deleteObjectExactVersionSubSQL,
`,version,
created_at,
expires_at,
status,
segment_count,
encrypted_metadata_nonce,
encrypted_metadata,
encrypted_metadata_encrypted_key,
total_plain_size,
total_encrypted_size,
fixed_segment_size,
encryption`,
`,deleted_objects.version,
deleted_objects.created_at,
deleted_objects.expires_at,
deleted_objects.status,
deleted_objects.segment_count,
deleted_objects.encrypted_metadata_nonce,
deleted_objects.encrypted_metadata,
deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size,
deleted_objects.total_encrypted_size,
deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.repaired_at`,
)
var deleteObjectLastCommittedWithCopyFeatureSQL = fmt.Sprintf(
deleteBucketObjectsWithCopyFeatureSQL,
deleteObjectLastCommittedSubSQL,
`,version,
created_at,
expires_at,
status,
segment_count,
encrypted_metadata_nonce,
encrypted_metadata,
encrypted_metadata_encrypted_key,
total_plain_size,
total_encrypted_size,
fixed_segment_size,
encryption`,
`,deleted_objects.version,
deleted_objects.created_at,
deleted_objects.expires_at,
deleted_objects.status,
deleted_objects.segment_count,
deleted_objects.encrypted_metadata_nonce,
deleted_objects.encrypted_metadata,
deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size,
deleted_objects.total_encrypted_size,
deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.repaired_at`,
)
var deleteFromSegmentCopies = `
DELETE FROM segment_copies WHERE segment_copies.stream_id = $1
`
var updateSegmentsWithAncestor = `
WITH update_segment_copies AS (
UPDATE segment_copies
SET ancestor_stream_id = $2
WHERE ancestor_stream_id = $1
RETURNING false
)
UPDATE segments
SET
remote_alias_pieces = P.remote_alias_pieces,
repaired_at = P.repaired_at
FROM (SELECT UNNEST($3::INT8[]), UNNEST($4::BYTEA[]), UNNEST($5::timestamptz[]))
as P(position, remote_alias_pieces, repaired_at)
WHERE
segments.stream_id = $2 AND
segments.position = P.position
`
version, stream_id, created_at, expires_at, status, segment_count, encrypted_metadata_nonce,
encrypted_metadata, encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
FROM deleted_objects`
// DeleteObjectExactVersion deletes an exact object version.
//
// Result will contain only those segments which needs to be deleted
// from storage nodes. If object is an ancestor for copied object its
// segments pieces cannot be deleted because copy still needs it.
func (db *DB) DeleteObjectExactVersion(
ctx context.Context, opts DeleteObjectExactVersion,
) (result DeleteObjectResult, err error) {
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
result, err = db.deleteObjectExactVersion(ctx, opts, tx)
if err != nil {
return err
}
return nil
})
result, err = db.deleteObjectExactVersion(ctx, opts, db.db)
if err != nil {
return DeleteObjectResult{}, err
}
return result, nil
}
return result, err
type stmt interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (tagsql.Rows, error)
}
// implementation of DB.DeleteObjectExactVersion for re-use internally in metabase package.
func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, tx tagsql.Tx) (result DeleteObjectResult, err error) {
func (db *DB) deleteObjectExactVersion(ctx context.Context, opts DeleteObjectExactVersion, stmt stmt) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return DeleteObjectResult{}, err
}
if db.config.ServerSideCopy {
objects, err := db.deleteObjectExactVersionServerSideCopy(ctx, opts, tx)
if err != nil {
return DeleteObjectResult{}, err
}
for _, object := range objects {
result.Objects = append(result.Objects, object.Object)
// if object is ancestor for copied object we cannot delete its
// segments pieces from storage nodes so we are not returning it
// as an object deletion result
if object.PromotedAncestor != nil {
continue
}
for _, segment := range object.Segments {
result.Segments = append(result.Segments, DeletedSegmentInfo{
RootPieceID: segment.RootPieceID,
Pieces: segment.Pieces,
})
}
}
} else {
err = withRows(
tx.QueryContext(ctx, deleteObjectExactVersionWithoutCopyFeatureSQL,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
)(func(rows tagsql.Rows) error {
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
return err
})
}
err = withRows(
stmt.QueryContext(ctx, deleteObjectExactVersion,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
)(func(rows tagsql.Rows) error {
result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
return err
})
if err != nil {
return DeleteObjectResult{}, err
}
mon.Meter("object_delete").Mark(len(result.Objects))
mon.Meter("segment_delete").Mark(len(result.Segments))
for _, object := range result.Objects {
mon.Meter("segment_delete").Mark(int(object.SegmentCount))
}
return result, nil
}
func (db *DB) deleteObjectExactVersionServerSideCopy(ctx context.Context, opts DeleteObjectExactVersion, tx tagsql.Tx) (objects []deletedObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
err = withRows(
tx.QueryContext(ctx, deleteObjectExactVersionWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version),
)(func(rows tagsql.Rows) error {
objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows)
return err
})
if err != nil {
return nil, err
}
err = db.promoteNewAncestors(ctx, tx, objects)
if err != nil {
return nil, err
}
return objects, nil
}
func (db *DB) promoteNewAncestors(ctx context.Context, tx tagsql.Tx, objects []deletedObjectInfo) (err error) {
defer mon.Task()(&ctx)(&err)
for _, object := range objects {
if object.PromotedAncestor == nil {
continue
}
positions := make([]int64, len(object.Segments))
remoteAliasesPieces := make([][]byte, len(object.Segments))
repairedAts := make([]*time.Time, len(object.Segments))
for i, segment := range object.Segments {
positions[i] = int64(segment.Position.Encode())
aliases, err := db.aliasCache.EnsurePiecesToAliases(ctx, segment.Pieces)
if err != nil {
return err
}
aliasesBytes, err := aliases.Bytes()
if err != nil {
return err
}
remoteAliasesPieces[i] = aliasesBytes
repairedAts[i] = segment.RepairedAt
}
result, err := tx.ExecContext(ctx, deleteFromSegmentCopies, *object.PromotedAncestor)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}
if affected != 1 {
return errs.New("new ancestor was not deleted from segment copies")
}
result, err = tx.ExecContext(ctx, updateSegmentsWithAncestor,
object.StreamID, *object.PromotedAncestor, pgutil.Int8Array(positions),
pgutil.ByteaArray(remoteAliasesPieces), pgutil.NullTimestampTZArray(repairedAts))
if err != nil {
return err
}
affected, err = result.RowsAffected()
if err != nil {
return err
}
if affected != int64(len(object.Segments)) {
return errs.New("not all new ancestor segments were update: got %d want %d", affected, len(object.Segments))
}
}
return nil
}
// DeletePendingObject contains arguments necessary for deleting a pending object.
type DeletePendingObject struct {
ObjectStream
@ -521,29 +209,21 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
stream_id = $5 AND
status = `+pendingStatus+`
RETURNING
version, stream_id,
created_at, expires_at,
status, segment_count,
version, stream_id, created_at, expires_at, status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
total_plain_size, total_encrypted_size, fixed_segment_size, encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id,segments.root_piece_id, segments.remote_alias_pieces
RETURNING segments.stream_id
)
SELECT
deleted_objects.version, deleted_objects.stream_id,
deleted_objects.created_at, deleted_objects.expires_at,
deleted_objects.status, deleted_objects.segment_count,
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
version, stream_id, created_at, expires_at, status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size, encryption
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID))(func(rows tagsql.Rows) error {
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.Location(), rows)
result.Objects, err = db.scanObjectDeletion(ctx, opts.Location(), rows)
return err
})
@ -556,7 +236,9 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
}
mon.Meter("object_delete").Mark(len(result.Objects))
mon.Meter("segment_delete").Mark(len(result.Segments))
for _, object := range result.Objects {
mon.Meter("segment_delete").Mark(int(object.SegmentCount))
}
return result, nil
}
@ -601,31 +283,23 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
object_key = ANY ($3) AND
status = `+committedStatus+`
RETURNING
project_id, bucket_name,
object_key, version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
project_id, bucket_name, object_key, version, stream_id, created_at, expires_at,
status, segment_count, encrypted_metadata_nonce, encrypted_metadata,
encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id,segments.root_piece_id, segments.remote_alias_pieces
RETURNING segments.stream_id
)
SELECT
deleted_objects.project_id, deleted_objects.bucket_name,
deleted_objects.object_key,deleted_objects.version, deleted_objects.stream_id,
deleted_objects.created_at, deleted_objects.expires_at,
deleted_objects.status, deleted_objects.segment_count,
deleted_objects.encrypted_metadata_nonce, deleted_objects.encrypted_metadata, deleted_objects.encrypted_metadata_encrypted_key,
deleted_objects.total_plain_size, deleted_objects.total_encrypted_size, deleted_objects.fixed_segment_size,
deleted_objects.encryption,
deleted_segments.root_piece_id, deleted_segments.remote_alias_pieces
project_id, bucket_name, object_key, version, stream_id, created_at, expires_at,
status, segment_count, encrypted_metadata_nonce, encrypted_metadata,
encrypted_metadata_encrypted_key, total_plain_size, total_encrypted_size,
fixed_segment_size, encryption
FROM deleted_objects
LEFT JOIN deleted_segments ON deleted_objects.stream_id = deleted_segments.stream_id
`, projectID, []byte(bucketName), pgutil.ByteaArray(objectKeys)))(func(rows tagsql.Rows) error {
result.Objects, result.Segments, err = db.scanMultipleObjectsDeletion(ctx, rows)
result.Objects, err = db.scanMultipleObjectsDeletion(ctx, rows)
return err
})
@ -634,87 +308,20 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
}
mon.Meter("object_delete").Mark(len(result.Objects))
mon.Meter("segment_delete").Mark(len(result.Segments))
return result, nil
}
func (db *DB) scanObjectDeletionServerSideCopy(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
defer func() { err = errs.Combine(err, rows.Close()) }()
result = make([]deletedObjectInfo, 0, 10)
var rootPieceID *storj.PieceID
// for object without segments we can get position = NULL
var segmentPosition *SegmentPosition
var object deletedObjectInfo
var segment deletedRemoteSegmentInfo
var aliasPieces AliasPieces
for rows.Next() {
object.ProjectID = location.ProjectID
object.BucketName = location.BucketName
object.ObjectKey = location.ObjectKey
err = rows.Scan(
// shared properties between deleteObject and deleteBucketObjects functionality
&object.StreamID,
&segmentPosition,
&rootPieceID,
&aliasPieces,
&object.PromotedAncestor,
// properties only for deleteObject functionality
&object.Version,
&object.CreatedAt, &object.ExpiresAt,
&object.Status, &object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
&segment.RepairedAt,
)
if err != nil {
return nil, Error.New("unable to delete object: %w", err)
}
if len(result) == 0 || result[len(result)-1].StreamID != object.StreamID {
result = append(result, object)
}
if rootPieceID != nil {
if segmentPosition != nil {
segment.Position = *segmentPosition
}
segment.RootPieceID = *rootPieceID
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return nil, Error.Wrap(err)
}
if len(segment.Pieces) > 0 {
result[len(result)-1].Segments = append(result[len(result)-1].Segments, segment)
}
}
}
if err := rows.Err(); err != nil {
return nil, Error.New("unable to delete object: %w", err)
for _, object := range result.Objects {
mon.Meter("segment_delete").Mark(int(object.SegmentCount))
}
return result, nil
}
func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, segments []DeletedSegmentInfo, err error) {
func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, err error) {
defer mon.Task()(&ctx)(&err)
defer func() { err = errs.Combine(err, rows.Close()) }()
objects = make([]Object, 0, 10)
segments = make([]DeletedSegmentInfo, 0, 10)
var rootPieceID *storj.PieceID
var object Object
var segment DeletedSegmentInfo
var aliasPieces AliasPieces
for rows.Next() {
object.ProjectID = location.ProjectID
object.BucketName = location.BucketName
@ -725,49 +332,29 @@ func (db *DB) scanObjectDeletion(ctx context.Context, location ObjectLocation, r
&object.Status, &object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces,
encryptionParameters{&object.Encryption},
)
if err != nil {
return nil, nil, Error.New("unable to delete object: %w", err)
}
if len(objects) == 0 || objects[len(objects)-1].StreamID != object.StreamID {
objects = append(objects, object)
return nil, Error.New("unable to delete object: %w", err)
}
if rootPieceID != nil {
segment.RootPieceID = *rootPieceID
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return nil, nil, Error.Wrap(err)
}
if len(segment.Pieces) > 0 {
segments = append(segments, segment)
}
}
objects = append(objects, object)
}
if err := rows.Err(); err != nil {
return nil, nil, Error.New("unable to delete object: %w", err)
return nil, Error.New("unable to delete object: %w", err)
}
if len(segments) == 0 {
return objects, nil, nil
}
return objects, segments, nil
return objects, nil
}
func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows) (objects []Object, segments []DeletedSegmentInfo, err error) {
func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows) (objects []Object, err error) {
defer mon.Task()(&ctx)(&err)
defer func() { err = errs.Combine(err, rows.Close()) }()
objects = make([]Object, 0, 10)
segments = make([]DeletedSegmentInfo, 0, 10)
var rootPieceID *storj.PieceID
var object Object
var segment DeletedSegmentInfo
var aliasPieces AliasPieces
for rows.Next() {
err = rows.Scan(&object.ProjectID, &object.BucketName,
&object.ObjectKey, &object.Version, &object.StreamID,
@ -775,38 +362,23 @@ func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows)
&object.Status, &object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption}, &rootPieceID, &aliasPieces)
encryptionParameters{&object.Encryption})
if err != nil {
return nil, nil, Error.New("unable to delete object: %w", err)
return nil, Error.New("unable to delete object: %w", err)
}
if len(objects) == 0 || objects[len(objects)-1].StreamID != object.StreamID {
objects = append(objects, object)
}
if rootPieceID != nil {
segment.RootPieceID = *rootPieceID
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return nil, nil, Error.Wrap(err)
}
if len(segment.Pieces) > 0 {
segments = append(segments, segment)
}
}
objects = append(objects, object)
}
if err := rows.Err(); err != nil {
return nil, nil, Error.New("unable to delete object: %w", err)
return nil, Error.New("unable to delete object: %w", err)
}
if len(objects) == 0 {
objects = nil
}
if len(segments) == 0 {
return objects, nil, nil
}
return objects, segments, nil
return objects, nil
}
// DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object.
@ -820,89 +392,30 @@ func (obj *DeleteObjectLastCommitted) Verify() error {
}
// DeleteObjectLastCommitted deletes an object last committed version.
//
// Result will contain only those segments which needs to be deleted
// from storage nodes. If object is an ancestor for copied object its
// segments pieces cannot be deleted because copy still needs it.
func (db *DB) DeleteObjectLastCommitted(
ctx context.Context, opts DeleteObjectLastCommitted,
) (result DeleteObjectResult, err error) {
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) error {
result, err = db.deleteObjectLastCommitted(ctx, opts, tx)
if err != nil {
return err
}
return nil
})
return result, err
}
// implementation of DB.DeleteObjectLastCommitted for re-use internally in metabase package.
func (db *DB) deleteObjectLastCommitted(ctx context.Context, opts DeleteObjectLastCommitted, tx tagsql.Tx) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
if err := opts.Verify(); err != nil {
return DeleteObjectResult{}, err
}
if db.config.ServerSideCopy {
objects, err := db.deleteObjectLastCommittedServerSideCopy(ctx, opts, tx)
if err != nil {
return DeleteObjectResult{}, err
}
for _, object := range objects {
result.Objects = append(result.Objects, object.Object)
// if object is ancestor for copied object we cannot delete its
// segments pieces from storage nodes so we are not returning it
// as an object deletion result
if object.PromotedAncestor != nil {
continue
}
for _, segment := range object.Segments {
result.Segments = append(result.Segments, DeletedSegmentInfo{
RootPieceID: segment.RootPieceID,
Pieces: segment.Pieces,
})
}
}
} else {
err = withRows(
tx.QueryContext(ctx, deleteObjectLastCommittedWithoutCopyFeatureSQL,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey),
)(func(rows tagsql.Rows) error {
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
return err
})
}
err = withRows(
db.db.QueryContext(ctx, deleteObjectLastCommitted,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey),
)(func(rows tagsql.Rows) error {
result.Objects, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
return err
})
if err != nil {
return DeleteObjectResult{}, err
}
mon.Meter("object_delete").Mark(len(result.Objects))
mon.Meter("segment_delete").Mark(len(result.Segments))
for _, object := range result.Objects {
mon.Meter("segment_delete").Mark(int(object.SegmentCount))
}
return result, nil
}
func (db *DB) deleteObjectLastCommittedServerSideCopy(ctx context.Context, opts DeleteObjectLastCommitted, tx tagsql.Tx) (objects []deletedObjectInfo, err error) {
defer mon.Task()(&ctx)(&err)
err = withRows(
tx.QueryContext(ctx, deleteObjectLastCommittedWithCopyFeatureSQL, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey),
)(func(rows tagsql.Rows) error {
objects, err = db.scanObjectDeletionServerSideCopy(ctx, opts.ObjectLocation, rows)
return err
})
if err != nil {
return nil, err
}
err = db.promoteNewAncestors(ctx, tx, objects)
if err != nil {
return nil, err
}
return objects, nil
}

View File

@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
@ -198,11 +197,6 @@ func TestDeletePendingObject(t *testing.T) {
metabasetest.CreatePendingObject(ctx, t, db, obj, 2)
expectedSegmentInfo := metabase.DeletedSegmentInfo{
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
}
metabasetest.DeletePendingObject{
Opts: metabase.DeletePendingObject{
ObjectStream: obj,
@ -216,7 +210,6 @@ func TestDeletePendingObject(t *testing.T) {
Encryption: metabasetest.DefaultEncryption,
},
},
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo},
},
}.Check(ctx, t, db)
@ -400,19 +393,13 @@ func TestDeleteObjectExactVersion(t *testing.T) {
object := metabasetest.CreateObject(ctx, t, db, obj, 2)
expectedSegmentInfo := metabase.DeletedSegmentInfo{
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
}
metabasetest.DeleteObjectExactVersion{
Opts: metabase.DeleteObjectExactVersion{
ObjectLocation: location,
Version: 1,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{object},
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo},
Objects: []metabase.Object{object},
},
}.Check(ctx, t, db)
@ -594,18 +581,12 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
object := metabasetest.CreateObject(ctx, t, db, obj, 2)
expectedSegmentInfo := metabase.DeletedSegmentInfo{
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
}
metabasetest.DeleteObjectsAllVersions{
Opts: metabase.DeleteObjectsAllVersions{
Locations: []metabase.ObjectLocation{location},
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{object},
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo},
Objects: []metabase.Object{object},
},
}.Check(ctx, t, db)
@ -622,18 +603,12 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
object1 := metabasetest.CreateObject(ctx, t, db, obj, 1)
object2 := metabasetest.CreateObject(ctx, t, db, obj2, 2)
expectedSegmentInfo := metabase.DeletedSegmentInfo{
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
}
metabasetest.DeleteObjectsAllVersions{
Opts: metabase.DeleteObjectsAllVersions{
Locations: []metabase.ObjectLocation{location, obj2.Location()},
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{object1, object2},
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo, expectedSegmentInfo},
Objects: []metabase.Object{object1, object2},
},
}.Check(ctx, t, db)
@ -722,18 +697,12 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
object2 := metabasetest.CreateObject(ctx, t, db, obj2, 2)
expectedSegmentInfo := metabase.DeletedSegmentInfo{
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
}
metabasetest.DeleteObjectsAllVersions{
Opts: metabase.DeleteObjectsAllVersions{
Locations: []metabase.ObjectLocation{location, object2.Location()},
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{object1, object2},
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo},
Objects: []metabase.Object{object1, object2},
},
}.Check(ctx, t, db)
@ -751,10 +720,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
obj.StreamID = testrand.UUID()
obj.Version = metabase.Version(i)
expected.Objects = append(expected.Objects, metabasetest.CreateObject(ctx, t, db, obj, 1))
expected.Segments = append(expected.Segments, metabase.DeletedSegmentInfo{
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
})
}
metabasetest.DeleteObjectsAllVersions{
@ -769,244 +734,6 @@ func TestDeleteObjectsAllVersions(t *testing.T) {
})
}
func TestDeleteCopy(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
for _, numberOfSegments := range []int{0, 1, 3} {
t.Run(fmt.Sprintf("%d segments", numberOfSegments), func(t *testing.T) {
t.Run("delete copy", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
originalObjStream := metabasetest.RandObjectStream()
originalObj, originalSegments := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: originalObjStream,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, originalObjStream, byte(numberOfSegments))
copyObj, _, copySegments := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
}.Run(ctx, t, db, false)
var copies []metabase.RawCopy
if numberOfSegments > 0 {
copies = []metabase.RawCopy{
{
StreamID: copyObj.StreamID,
AncestorStreamID: originalObj.StreamID,
}}
}
// check that copy went OK
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(originalObj),
metabase.RawObject(copyObj),
},
Segments: append(metabasetest.SegmentsToRaw(originalSegments), copySegments...),
Copies: copies,
}.Check(ctx, t, db)
metabasetest.DeleteObjectExactVersion{
Opts: metabase.DeleteObjectExactVersion{
ObjectLocation: copyObj.Location(),
Version: copyObj.Version,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{copyObj},
// no segments returned as we deleted copy
},
}.Check(ctx, t, db)
// Verify that we are back at the original single object
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(originalObj),
},
Segments: metabasetest.SegmentsToRaw(originalSegments),
}.Check(ctx, t, db)
})
t.Run("delete one of two copies", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
originalObjectStream := metabasetest.RandObjectStream()
originalObj, originalSegments := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: originalObjectStream,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))
copyObject1, _, _ := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
}.Run(ctx, t, db, false)
copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
}.Run(ctx, t, db, false)
metabasetest.DeleteObjectExactVersion{
Opts: metabase.DeleteObjectExactVersion{
ObjectLocation: copyObject1.Location(),
Version: copyObject1.Version,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{copyObject1},
// no segments returned as we deleted copy
},
}.Check(ctx, t, db)
var copies []metabase.RawCopy
if numberOfSegments > 0 {
copies = []metabase.RawCopy{
{
StreamID: copyObject2.StreamID,
AncestorStreamID: originalObj.StreamID,
}}
}
// Verify that only one of the copies is deleted
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(originalObj),
metabase.RawObject(copyObject2),
},
Segments: append(metabasetest.SegmentsToRaw(originalSegments), copySegments2...),
Copies: copies,
}.Check(ctx, t, db)
})
t.Run("delete original", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
originalObjectStream := metabasetest.RandObjectStream()
originalObj, originalSegments := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: originalObjectStream,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))
copyObject, _, copySegments := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
}.Run(ctx, t, db, false)
metabasetest.DeleteObjectExactVersion{
Opts: metabase.DeleteObjectExactVersion{
ObjectLocation: originalObj.Location(),
Version: originalObj.Version,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{originalObj},
// no segments returned as we deleted ancestor
// and we moved pieces to one of copies
},
}.Check(ctx, t, db)
for i := range copySegments {
copySegments[i].Pieces = originalSegments[i].Pieces
}
// verify that the copy is left
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(copyObject),
},
Segments: copySegments,
}.Check(ctx, t, db)
})
t.Run("delete original and leave two copies", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)
originalObjectStream := metabasetest.RandObjectStream()
originalObj, originalSegments := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: originalObjectStream,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))
copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
}.Run(ctx, t, db, false)
copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
}.Run(ctx, t, db, false)
_, err := db.DeleteObjectExactVersion(ctx, metabase.DeleteObjectExactVersion{
Version: originalObj.Version,
ObjectLocation: originalObj.Location(),
})
require.NoError(t, err)
var expectedAncestorStreamID uuid.UUID
var expectedCopyStreamID uuid.UUID
var copies []metabase.RawCopy
if numberOfSegments > 0 {
segments, err := db.TestingAllSegments(ctx)
require.NoError(t, err)
require.NotEmpty(t, segments)
if segments[0].PiecesInAncestorSegment() {
if segments[0].StreamID == copyObject1.StreamID {
expectedCopyStreamID = copyObject1.StreamID
expectedAncestorStreamID = copyObject2.StreamID
} else {
expectedCopyStreamID = copyObject2.StreamID
expectedAncestorStreamID = copyObject1.StreamID
}
} else {
if segments[0].StreamID == copyObject1.StreamID {
expectedCopyStreamID = copyObject2.StreamID
expectedAncestorStreamID = copyObject1.StreamID
} else {
expectedCopyStreamID = copyObject1.StreamID
expectedAncestorStreamID = copyObject2.StreamID
}
}
copies = []metabase.RawCopy{
{
StreamID: expectedCopyStreamID,
AncestorStreamID: expectedAncestorStreamID,
}}
}
// set pieces in expected ancestor for verifcation
for _, segments := range [][]metabase.RawSegment{copySegments1, copySegments2} {
for i := range segments {
if segments[i].StreamID == expectedAncestorStreamID {
segments[i].Pieces = originalSegments[i].Pieces
}
}
}
// verify that two functioning copies are left and the original object is gone
metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(copyObject1),
metabase.RawObject(copyObject2),
},
Segments: append(copySegments1, copySegments2...),
Copies: copies,
}.Check(ctx, t, db)
})
})
}
})
}
func TestDeleteCopyWithDuplicateMetadata(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
for _, numberOfSegments := range []int{0, 1, 3} {
@ -1043,8 +770,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) {
Version: copyObj.Version,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{copyObj},
Segments: rawSegmentsToDeletedSegmentInfo(copySegments),
Objects: []metabase.Object{copyObj},
},
}.Check(ctx, t, db)
@ -1070,7 +796,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) {
},
}.Run(ctx, t, db, originalObjectStream, byte(numberOfSegments))
copyObject1, _, copySegments1 := metabasetest.CreateObjectCopy{
copyObject1, _, _ := metabasetest.CreateObjectCopy{
OriginalObject: originalObj,
}.Run(ctx, t, db, true)
copyObject2, _, copySegments2 := metabasetest.CreateObjectCopy{
@ -1083,8 +809,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) {
Version: copyObject1.Version,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{copyObject1},
Segments: rawSegmentsToDeletedSegmentInfo(copySegments1),
Objects: []metabase.Object{copyObject1},
},
}.Check(ctx, t, db)
@ -1121,8 +846,7 @@ func TestDeleteCopyWithDuplicateMetadata(t *testing.T) {
Version: originalObj.Version,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{originalObj},
Segments: rawSegmentsToDeletedSegmentInfo(copySegments),
Objects: []metabase.Object{originalObj},
},
}.Check(ctx, t, db)
@ -1274,18 +998,12 @@ func TestDeleteObjectLastCommitted(t *testing.T) {
object := metabasetest.CreateObject(ctx, t, db, obj, 2)
expectedSegmentInfo := metabase.DeletedSegmentInfo{
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
}
metabasetest.DeleteObjectLastCommitted{
Opts: metabase.DeleteObjectLastCommitted{
ObjectLocation: location,
},
Result: metabase.DeleteObjectResult{
Objects: []metabase.Object{object},
Segments: []metabase.DeletedSegmentInfo{expectedSegmentInfo, expectedSegmentInfo},
Objects: []metabase.Object{object},
},
}.Check(ctx, t, db)
@ -1398,12 +1116,3 @@ func TestDeleteObjectLastCommitted(t *testing.T) {
})
})
}
func rawSegmentsToDeletedSegmentInfo(segments []metabase.RawSegment) []metabase.DeletedSegmentInfo {
result := make([]metabase.DeletedSegmentInfo, len(segments))
for i := range segments {
result[i].RootPieceID = segments[i].RootPieceID
result[i].Pieces = segments[i].Pieces
}
return result
}

View File

@ -42,6 +42,7 @@ func (s Segment) Inline() bool {
}
// PiecesInAncestorSegment returns true if remote alias pieces are to be found in an ancestor segment.
// TODO we will remove this method and related to code when all metadata will be migrated to segment copies.
func (s Segment) PiecesInAncestorSegment() bool {
return s.EncryptedSize != 0 && len(s.InlineData) == 0 && len(s.Pieces) == 0
}

View File

@ -4,7 +4,6 @@
package metabasetest
import (
"bytes"
"sort"
"testing"
"time"
@ -92,12 +91,6 @@ func sortRawCopies(copies []metabase.RawCopy) {
})
}
func sortDeletedSegments(segments []metabase.DeletedSegmentInfo) {
sort.Slice(segments, func(i, j int) bool {
return bytes.Compare(segments[i].RootPieceID[:], segments[j].RootPieceID[:]) < 0
})
}
func checkError(t require.TestingT, err error, errClass *errs.Class, errText string) {
if errClass != nil {
require.True(t, errClass.Has(err), "expected an error %v got %v", *errClass, err)

View File

@ -424,9 +424,6 @@ func (step DeleteObjectExactVersion) Check(ctx *testcontext.Context, t testing.T
sortObjects(result.Objects)
sortObjects(step.Result.Objects)
sortDeletedSegments(result.Segments)
sortDeletedSegments(step.Result.Segments)
diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty())
require.Zero(t, diff)
}
@ -447,9 +444,6 @@ func (step DeletePendingObject) Check(ctx *testcontext.Context, t testing.TB, db
sortObjects(result.Objects)
sortObjects(step.Result.Objects)
sortDeletedSegments(result.Segments)
sortDeletedSegments(step.Result.Segments)
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
require.Zero(t, diff)
}
@ -470,9 +464,6 @@ func (step DeleteObjectsAllVersions) Check(ctx *testcontext.Context, t testing.T
sortObjects(result.Objects)
sortObjects(step.Result.Objects)
sortDeletedSegments(result.Segments)
sortDeletedSegments(step.Result.Segments)
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
require.Zero(t, diff)
}
@ -719,8 +710,6 @@ func (step DeleteObjectLastCommitted) Check(ctx *testcontext.Context, t testing.
sortObjects(result.Objects)
sortObjects(step.Result.Objects)
sortDeletedSegments(result.Segments)
sortDeletedSegments(step.Result.Segments)
diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.EquateEmpty())
require.Zero(t, diff)