satellite/metainfo: stop sending delete requests to SN
We decided that we will stop sending explicit delete requests to nodes and we will cleanup deleted with GC instead. https://github.com/storj/storj/issues/5888 Change-Id: I65a308cca6fb17e97e3ba85eb3212584c96a32cd
This commit is contained in:
parent
4a49bc4b65
commit
6c08d5024e
@ -445,7 +445,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
|||||||
peer.Log.Named("metainfo:endpoint"),
|
peer.Log.Named("metainfo:endpoint"),
|
||||||
peer.Buckets.Service,
|
peer.Buckets.Service,
|
||||||
peer.Metainfo.Metabase,
|
peer.Metainfo.Metabase,
|
||||||
peer.Metainfo.PieceDeletion,
|
|
||||||
peer.Orders.Service,
|
peer.Orders.Service,
|
||||||
peer.Overlay.Service,
|
peer.Overlay.Service,
|
||||||
peer.DB.Attribution(),
|
peer.DB.Attribution(),
|
||||||
|
@ -268,7 +268,7 @@ func TestGarbageCollectionWithCopies(t *testing.T) {
|
|||||||
|
|
||||||
// verify that we deleted only pieces for "remote-no-copy" object
|
// verify that we deleted only pieces for "remote-no-copy" object
|
||||||
afterTotalUsedByNodes = allSpaceUsedForPieces()
|
afterTotalUsedByNodes = allSpaceUsedForPieces()
|
||||||
require.Equal(t, singleRemoteUsed, afterTotalUsedByNodes)
|
require.Equal(t, totalUsedByNodes, afterTotalUsedByNodes)
|
||||||
|
|
||||||
// delete rest of objects to verify that everything will be removed also from SNs
|
// delete rest of objects to verify that everything will be removed also from SNs
|
||||||
for _, toDelete := range []string{
|
for _, toDelete := range []string{
|
||||||
@ -295,7 +295,7 @@ func TestGarbageCollectionWithCopies(t *testing.T) {
|
|||||||
|
|
||||||
// verify that nothing more was deleted from storage nodes after GC
|
// verify that nothing more was deleted from storage nodes after GC
|
||||||
afterTotalUsedByNodes = allSpaceUsedForPieces()
|
afterTotalUsedByNodes = allSpaceUsedForPieces()
|
||||||
require.EqualValues(t, 0, afterTotalUsedByNodes)
|
require.EqualValues(t, totalUsedByNodes, afterTotalUsedByNodes)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,10 +26,6 @@ const (
|
|||||||
type DeleteBucketObjects struct {
|
type DeleteBucketObjects struct {
|
||||||
Bucket BucketLocation
|
Bucket BucketLocation
|
||||||
BatchSize int
|
BatchSize int
|
||||||
|
|
||||||
// DeletePieces is called for every batch of objects.
|
|
||||||
// Slice `segments` will be reused between calls.
|
|
||||||
DeletePieces func(ctx context.Context, segments []DeletedSegmentInfo) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var deleteObjectsCockroachSubSQL = `
|
var deleteObjectsCockroachSubSQL = `
|
||||||
@ -134,33 +130,7 @@ func (db *DB) deleteBucketObjectBatchWithCopyFeatureEnabled(ctx context.Context,
|
|||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
deletedObjectCount = int64(len(objects))
|
return int64(len(objects)), err
|
||||||
|
|
||||||
if opts.DeletePieces == nil {
|
|
||||||
// no callback, this should only be in test path
|
|
||||||
return deletedObjectCount, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, object := range objects {
|
|
||||||
if object.PromotedAncestor != nil {
|
|
||||||
// don't remove pieces, they are now linked to the new ancestor
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, segment := range object.Segments {
|
|
||||||
// Is there an advantage to batching this?
|
|
||||||
err := opts.DeletePieces(ctx, []DeletedSegmentInfo{
|
|
||||||
{
|
|
||||||
RootPieceID: segment.RootPieceID,
|
|
||||||
Pieces: segment.Pieces,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return deletedObjectCount, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return deletedObjectCount, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) scanBucketObjectsDeletionServerSideCopy(ctx context.Context, location BucketLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) {
|
func (db *DB) scanBucketObjectsDeletionServerSideCopy(ctx context.Context, location BucketLocation, rows tagsql.Rows) (result []deletedObjectInfo, err error) {
|
||||||
@ -292,12 +262,5 @@ func (db *DB) deleteBucketObjectsWithCopyFeatureDisabled(ctx context.Context, op
|
|||||||
if len(deletedSegments) == 0 {
|
if len(deletedSegments) == 0 {
|
||||||
return deletedObjectCount, nil
|
return deletedObjectCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.DeletePieces != nil {
|
|
||||||
err = opts.DeletePieces(ctx, deletedSegments)
|
|
||||||
if err != nil {
|
|
||||||
return deletedObjectCount, Error.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@ package metabase_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -68,9 +67,6 @@ func TestDeleteBucketObjects(t *testing.T) {
|
|||||||
metabasetest.DeleteBucketObjects{
|
metabasetest.DeleteBucketObjects{
|
||||||
Opts: metabase.DeleteBucketObjects{
|
Opts: metabase.DeleteBucketObjects{
|
||||||
Bucket: obj1.Location().Bucket(),
|
Bucket: obj1.Location().Bucket(),
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
return errors.New("shouldn't be called")
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Deleted: 0,
|
Deleted: 0,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
@ -83,26 +79,13 @@ func TestDeleteBucketObjects(t *testing.T) {
|
|||||||
|
|
||||||
metabasetest.CreateObject(ctx, t, db, obj1, 2)
|
metabasetest.CreateObject(ctx, t, db, obj1, 2)
|
||||||
|
|
||||||
nSegments := 0
|
|
||||||
metabasetest.DeleteBucketObjects{
|
metabasetest.DeleteBucketObjects{
|
||||||
Opts: metabase.DeleteBucketObjects{
|
Opts: metabase.DeleteBucketObjects{
|
||||||
Bucket: obj1.Location().Bucket(),
|
Bucket: obj1.Location().Bucket(),
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
nSegments += len(segments)
|
|
||||||
|
|
||||||
for _, s := range segments {
|
|
||||||
if len(s.Pieces) != 1 {
|
|
||||||
return errors.New("expected 1 piece per segment")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Deleted: 1,
|
Deleted: 1,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
require.Equal(t, 2, nSegments)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -114,9 +97,6 @@ func TestDeleteBucketObjects(t *testing.T) {
|
|||||||
metabasetest.DeleteBucketObjects{
|
metabasetest.DeleteBucketObjects{
|
||||||
Opts: metabase.DeleteBucketObjects{
|
Opts: metabase.DeleteBucketObjects{
|
||||||
Bucket: obj1.Location().Bucket(),
|
Bucket: obj1.Location().Bucket(),
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
return errors.New("expected no segments")
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Deleted: 1,
|
Deleted: 1,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
@ -131,27 +111,14 @@ func TestDeleteBucketObjects(t *testing.T) {
|
|||||||
metabasetest.CreateObject(ctx, t, db, obj2, 2)
|
metabasetest.CreateObject(ctx, t, db, obj2, 2)
|
||||||
metabasetest.CreateObject(ctx, t, db, obj3, 2)
|
metabasetest.CreateObject(ctx, t, db, obj3, 2)
|
||||||
|
|
||||||
nSegments := 0
|
|
||||||
metabasetest.DeleteBucketObjects{
|
metabasetest.DeleteBucketObjects{
|
||||||
Opts: metabase.DeleteBucketObjects{
|
Opts: metabase.DeleteBucketObjects{
|
||||||
Bucket: obj1.Location().Bucket(),
|
Bucket: obj1.Location().Bucket(),
|
||||||
BatchSize: 2,
|
BatchSize: 2,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
nSegments += len(segments)
|
|
||||||
|
|
||||||
for _, s := range segments {
|
|
||||||
if len(s.Pieces) != 1 {
|
|
||||||
return errors.New("expected 1 piece per segment")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Deleted: 3,
|
Deleted: 3,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
require.Equal(t, 6, nSegments)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -239,22 +206,14 @@ func TestDeleteBucketObjects(t *testing.T) {
|
|||||||
|
|
||||||
metabasetest.CreateObject(ctx, t, db, obj1, 37)
|
metabasetest.CreateObject(ctx, t, db, obj1, 37)
|
||||||
|
|
||||||
nSegments := 0
|
|
||||||
metabasetest.DeleteBucketObjects{
|
metabasetest.DeleteBucketObjects{
|
||||||
Opts: metabase.DeleteBucketObjects{
|
Opts: metabase.DeleteBucketObjects{
|
||||||
Bucket: obj1.Location().Bucket(),
|
Bucket: obj1.Location().Bucket(),
|
||||||
BatchSize: 2,
|
BatchSize: 2,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
nSegments += len(segments)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Deleted: 1,
|
Deleted: 1,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
require.Equal(t, 37, nSegments)
|
|
||||||
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -269,20 +228,14 @@ func TestDeleteBucketObjects(t *testing.T) {
|
|||||||
metabasetest.CreateObject(ctx, t, db, obj, 5)
|
metabasetest.CreateObject(ctx, t, db, obj, 5)
|
||||||
}
|
}
|
||||||
|
|
||||||
segmentsDeleted := 0
|
|
||||||
metabasetest.DeleteBucketObjects{
|
metabasetest.DeleteBucketObjects{
|
||||||
Opts: metabase.DeleteBucketObjects{
|
Opts: metabase.DeleteBucketObjects{
|
||||||
Bucket: root.Location().Bucket(),
|
Bucket: root.Location().Bucket(),
|
||||||
BatchSize: 1,
|
BatchSize: 1,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
segmentsDeleted += len(segments)
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Deleted: 5,
|
Deleted: 5,
|
||||||
}.Check(ctx, t, db)
|
}.Check(ctx, t, db)
|
||||||
|
|
||||||
require.Equal(t, 25, segmentsDeleted)
|
|
||||||
metabasetest.Verify{}.Check(ctx, t, db)
|
metabasetest.Verify{}.Check(ctx, t, db)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -310,9 +263,6 @@ func TestDeleteBucketObjectsParallel(t *testing.T) {
|
|||||||
_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
|
_, err := db.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
|
||||||
Bucket: root.Location().Bucket(),
|
Bucket: root.Location().Bucket(),
|
||||||
BatchSize: 2,
|
BatchSize: 2,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -334,9 +284,6 @@ func TestDeleteBucketObjectsCancel(t *testing.T) {
|
|||||||
_, err := db.DeleteBucketObjects(testCtx, metabase.DeleteBucketObjects{
|
_, err := db.DeleteBucketObjects(testCtx, metabase.DeleteBucketObjects{
|
||||||
Bucket: object.Location().Bucket(),
|
Bucket: object.Location().Bucket(),
|
||||||
BatchSize: 2,
|
BatchSize: 2,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
@ -382,9 +329,6 @@ func TestDeleteBucketWithCopies(t *testing.T) {
|
|||||||
BucketName: "copy-bucket",
|
BucketName: "copy-bucket",
|
||||||
},
|
},
|
||||||
BatchSize: 2,
|
BatchSize: 2,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -426,9 +370,6 @@ func TestDeleteBucketWithCopies(t *testing.T) {
|
|||||||
BucketName: "original-bucket",
|
BucketName: "original-bucket",
|
||||||
},
|
},
|
||||||
BatchSize: 2,
|
BatchSize: 2,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -493,9 +434,6 @@ func TestDeleteBucketWithCopies(t *testing.T) {
|
|||||||
BucketName: "bucket2",
|
BucketName: "bucket2",
|
||||||
},
|
},
|
||||||
BatchSize: 2,
|
BatchSize: 2,
|
||||||
DeletePieces: func(ctx context.Context, segments []metabase.DeletedSegmentInfo) error {
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -28,7 +28,6 @@ import (
|
|||||||
"storj.io/storj/satellite/console"
|
"storj.io/storj/satellite/console"
|
||||||
"storj.io/storj/satellite/internalpb"
|
"storj.io/storj/satellite/internalpb"
|
||||||
"storj.io/storj/satellite/metabase"
|
"storj.io/storj/satellite/metabase"
|
||||||
"storj.io/storj/satellite/metainfo/piecedeletion"
|
|
||||||
"storj.io/storj/satellite/metainfo/pointerverification"
|
"storj.io/storj/satellite/metainfo/pointerverification"
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
"storj.io/storj/satellite/overlay"
|
"storj.io/storj/satellite/overlay"
|
||||||
@ -67,7 +66,6 @@ type Endpoint struct {
|
|||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
buckets *buckets.Service
|
buckets *buckets.Service
|
||||||
metabase *metabase.DB
|
metabase *metabase.DB
|
||||||
deletePieces *piecedeletion.Service
|
|
||||||
orders *orders.Service
|
orders *orders.Service
|
||||||
overlay *overlay.Service
|
overlay *overlay.Service
|
||||||
attributions attribution.DB
|
attributions attribution.DB
|
||||||
@ -88,8 +86,7 @@ type Endpoint struct {
|
|||||||
|
|
||||||
// NewEndpoint creates new metainfo endpoint instance.
|
// NewEndpoint creates new metainfo endpoint instance.
|
||||||
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
|
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
|
||||||
deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service,
|
orders *orders.Service, cache *overlay.Service, attributions attribution.DB, peerIdentities overlay.PeerIdentities,
|
||||||
attributions attribution.DB, peerIdentities overlay.PeerIdentities,
|
|
||||||
apiKeys APIKeys, projectUsage *accounting.Service, projectLimits *accounting.ProjectLimitCache, projects console.Projects,
|
apiKeys APIKeys, projectUsage *accounting.Service, projectLimits *accounting.ProjectLimitCache, projects console.Projects,
|
||||||
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
|
satellite signing.Signer, revocations revocation.DB, config Config) (*Endpoint, error) {
|
||||||
// TODO do something with too many params
|
// TODO do something with too many params
|
||||||
@ -115,7 +112,6 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
|
|||||||
log: log,
|
log: log,
|
||||||
buckets: buckets,
|
buckets: buckets,
|
||||||
metabase: metabaseDB,
|
metabase: metabaseDB,
|
||||||
deletePieces: deletePieces,
|
|
||||||
orders: orders,
|
orders: orders,
|
||||||
overlay: cache,
|
overlay: cache,
|
||||||
attributions: attributions,
|
attributions: attributions,
|
||||||
|
@ -289,10 +289,6 @@ func (endpoint *Endpoint) deleteBucketObjects(ctx context.Context, projectID uui
|
|||||||
bucketLocation := metabase.BucketLocation{ProjectID: projectID, BucketName: string(bucketName)}
|
bucketLocation := metabase.BucketLocation{ProjectID: projectID, BucketName: string(bucketName)}
|
||||||
deletedObjects, err := endpoint.metabase.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
|
deletedObjects, err := endpoint.metabase.DeleteBucketObjects(ctx, metabase.DeleteBucketObjects{
|
||||||
Bucket: bucketLocation,
|
Bucket: bucketLocation,
|
||||||
DeletePieces: func(ctx context.Context, deleted []metabase.DeletedSegmentInfo) error {
|
|
||||||
endpoint.deleteSegmentPieces(ctx, deleted)
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return deletedObjects, Error.Wrap(err)
|
return deletedObjects, Error.Wrap(err)
|
||||||
|
@ -14,7 +14,6 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"storj.io/common/context2"
|
|
||||||
"storj.io/common/encryption"
|
"storj.io/common/encryption"
|
||||||
"storj.io/common/errs2"
|
"storj.io/common/errs2"
|
||||||
"storj.io/common/macaroon"
|
"storj.io/common/macaroon"
|
||||||
@ -25,7 +24,6 @@ import (
|
|||||||
"storj.io/storj/satellite/buckets"
|
"storj.io/storj/satellite/buckets"
|
||||||
"storj.io/storj/satellite/internalpb"
|
"storj.io/storj/satellite/internalpb"
|
||||||
"storj.io/storj/satellite/metabase"
|
"storj.io/storj/satellite/metabase"
|
||||||
"storj.io/storj/satellite/metainfo/piecedeletion"
|
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -252,9 +250,6 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit
|
|||||||
Encryption: encryption,
|
Encryption: encryption,
|
||||||
|
|
||||||
DisallowDelete: !allowDelete,
|
DisallowDelete: !allowDelete,
|
||||||
OnDelete: func(segments []metabase.DeletedSegmentInfo) {
|
|
||||||
endpoint.deleteSegmentPieces(ctx, segments)
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
// uplink can send empty metadata with not empty key/nonce
|
// uplink can send empty metadata with not empty key/nonce
|
||||||
// we need to fix it on uplink side but that part will be
|
// we need to fix it on uplink side but that part will be
|
||||||
@ -1490,15 +1485,15 @@ func (endpoint *Endpoint) DeleteCommittedObject(
|
|||||||
return nil, Error.Wrap(err)
|
return nil, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
deletedObjects, err = endpoint.deleteObjectsPieces(ctx, result)
|
deletedObjects, err = endpoint.deleteObjectResultToProto(ctx, result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
endpoint.log.Error("failed to delete pointers",
|
endpoint.log.Error("failed to convert delete object result",
|
||||||
zap.Stringer("project", projectID),
|
zap.Stringer("project", projectID),
|
||||||
zap.String("bucket", bucket),
|
zap.String("bucket", bucket),
|
||||||
zap.Binary("object", []byte(object)),
|
zap.Binary("object", []byte(object)),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
return deletedObjects, Error.Wrap(err)
|
return nil, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return deletedObjects, nil
|
return deletedObjects, nil
|
||||||
@ -1534,15 +1529,15 @@ func (endpoint *Endpoint) DeleteObjectAnyStatus(ctx context.Context, location me
|
|||||||
return nil, Error.Wrap(err)
|
return nil, Error.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
deletedObjects, err = endpoint.deleteObjectsPieces(ctx, result)
|
deletedObjects, err = endpoint.deleteObjectResultToProto(ctx, result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
endpoint.log.Error("failed to delete pointers",
|
endpoint.log.Error("failed to convert delete object result",
|
||||||
zap.Stringer("project", location.ProjectID),
|
zap.Stringer("project", location.ProjectID),
|
||||||
zap.String("bucket", location.BucketName),
|
zap.String("bucket", location.BucketName),
|
||||||
zap.Binary("object", []byte(location.ObjectKey)),
|
zap.Binary("object", []byte(location.ObjectKey)),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
return deletedObjects, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return deletedObjects, nil
|
return deletedObjects, nil
|
||||||
@ -1563,13 +1558,10 @@ func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metaba
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return endpoint.deleteObjectsPieces(ctx, result)
|
return endpoint.deleteObjectResultToProto(ctx, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (endpoint *Endpoint) deleteObjectsPieces(ctx context.Context, result metabase.DeleteObjectResult) (deletedObjects []*pb.Object, err error) {
|
func (endpoint *Endpoint) deleteObjectResultToProto(ctx context.Context, result metabase.DeleteObjectResult) (deletedObjects []*pb.Object, err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
// We should ignore client cancelling and always try to delete segments.
|
|
||||||
ctx = context2.WithoutCancellation(ctx)
|
|
||||||
deletedObjects = make([]*pb.Object, len(result.Objects))
|
deletedObjects = make([]*pb.Object, len(result.Objects))
|
||||||
for i, object := range result.Objects {
|
for i, object := range result.Objects {
|
||||||
deletedObject, err := endpoint.objectToProto(ctx, object, endpoint.defaultRS)
|
deletedObject, err := endpoint.objectToProto(ctx, object, endpoint.defaultRS)
|
||||||
@ -1579,50 +1571,9 @@ func (endpoint *Endpoint) deleteObjectsPieces(ctx context.Context, result metaba
|
|||||||
deletedObjects[i] = deletedObject
|
deletedObjects[i] = deletedObject
|
||||||
}
|
}
|
||||||
|
|
||||||
endpoint.deleteSegmentPieces(ctx, result.Segments)
|
|
||||||
|
|
||||||
return deletedObjects, nil
|
return deletedObjects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (endpoint *Endpoint) deleteSegmentPieces(ctx context.Context, segments []metabase.DeletedSegmentInfo) {
|
|
||||||
var err error
|
|
||||||
defer mon.Task()(&ctx)(&err)
|
|
||||||
|
|
||||||
nodesPieces := groupPiecesByNodeID(segments)
|
|
||||||
|
|
||||||
var requests []piecedeletion.Request
|
|
||||||
for node, pieces := range nodesPieces {
|
|
||||||
requests = append(requests, piecedeletion.Request{
|
|
||||||
Node: storj.NodeURL{
|
|
||||||
ID: node,
|
|
||||||
},
|
|
||||||
Pieces: pieces,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only return an error if we failed to delete the objects. If we failed
|
|
||||||
// to delete pieces, let garbage collector take care of it.
|
|
||||||
err = endpoint.deletePieces.Delete(ctx, requests)
|
|
||||||
if err != nil {
|
|
||||||
endpoint.log.Error("failed to delete pieces", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// groupPiecesByNodeID returns a map that contains pieces with node id as the key.
|
|
||||||
func groupPiecesByNodeID(segments []metabase.DeletedSegmentInfo) map[storj.NodeID][]storj.PieceID {
|
|
||||||
piecesToDelete := map[storj.NodeID][]storj.PieceID{}
|
|
||||||
|
|
||||||
for _, segment := range segments {
|
|
||||||
deriver := segment.RootPieceID.Deriver()
|
|
||||||
for _, piece := range segment.Pieces {
|
|
||||||
pieceID := deriver.Derive(piece.StorageNode, int32(piece.Number))
|
|
||||||
piecesToDelete[piece.StorageNode] = append(piecesToDelete[piece.StorageNode], pieceID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return piecesToDelete
|
|
||||||
}
|
|
||||||
|
|
||||||
// Server side move.
|
// Server side move.
|
||||||
|
|
||||||
// BeginMoveObject begins moving object to different key.
|
// BeginMoveObject begins moving object to different key.
|
||||||
|
@ -37,7 +37,6 @@ import (
|
|||||||
"storj.io/storj/satellite/internalpb"
|
"storj.io/storj/satellite/internalpb"
|
||||||
"storj.io/storj/satellite/metabase"
|
"storj.io/storj/satellite/metabase"
|
||||||
"storj.io/storj/satellite/metainfo"
|
"storj.io/storj/satellite/metainfo"
|
||||||
"storj.io/storj/storagenode/blobstore"
|
|
||||||
"storj.io/uplink"
|
"storj.io/uplink"
|
||||||
"storj.io/uplink/private/metaclient"
|
"storj.io/uplink/private/metaclient"
|
||||||
"storj.io/uplink/private/object"
|
"storj.io/uplink/private/object"
|
||||||
@ -1445,43 +1444,10 @@ func TestEndpoint_Object_With_StorageNodes(t *testing.T) {
|
|||||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB))
|
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, testrand.Bytes(5*memory.KiB))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, segments, 1)
|
|
||||||
require.NotZero(t, len(segments[0].Pieces))
|
|
||||||
|
|
||||||
for _, piece := range segments[0].Pieces {
|
|
||||||
node := planet.FindNode(piece.StorageNode)
|
|
||||||
pieceID := segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
||||||
|
|
||||||
piece, err := node.DB.Pieces().Stat(ctx, blobstore.BlobRef{
|
|
||||||
Namespace: planet.Satellites[0].ID().Bytes(),
|
|
||||||
Key: pieceID.Bytes(),
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NotNil(t, piece)
|
|
||||||
}
|
|
||||||
|
|
||||||
oldPieces := segments[0].Pieces
|
|
||||||
expectedData := testrand.Bytes(5 * memory.KiB)
|
expectedData := testrand.Bytes(5 * memory.KiB)
|
||||||
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData)
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, objectName, expectedData)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
planet.WaitForStorageNodeDeleters(ctx)
|
|
||||||
|
|
||||||
// verify that old object pieces are not stored on storage nodes anymore
|
|
||||||
for _, piece := range oldPieces {
|
|
||||||
node := planet.FindNode(piece.StorageNode)
|
|
||||||
pieceID := segments[0].RootPieceID.Derive(piece.StorageNode, int32(piece.Number))
|
|
||||||
|
|
||||||
piece, err := node.DB.Pieces().Stat(ctx, blobstore.BlobRef{
|
|
||||||
Namespace: planet.Satellites[0].ID().Bytes(),
|
|
||||||
Key: pieceID.Bytes(),
|
|
||||||
})
|
|
||||||
require.Error(t, err)
|
|
||||||
require.Nil(t, piece)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName)
|
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucketName, objectName)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedData, data)
|
require.Equal(t, expectedData, data)
|
||||||
@ -1702,9 +1668,6 @@ func testDeleteObject(t *testing.T,
|
|||||||
),
|
),
|
||||||
},
|
},
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
var (
|
|
||||||
percentExp = 0.75
|
|
||||||
)
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
tc := tc
|
tc := tc
|
||||||
t.Run(tc.caseDescription, func(t *testing.T) {
|
t.Run(tc.caseDescription, func(t *testing.T) {
|
||||||
@ -1735,12 +1698,8 @@ func testDeleteObject(t *testing.T,
|
|||||||
totalUsedSpaceAfterDelete += piecesTotal
|
totalUsedSpaceAfterDelete += piecesTotal
|
||||||
}
|
}
|
||||||
|
|
||||||
// At this point we can only guarantee that the 75% of the SNs pieces
|
// we are not deleting data from SN right away so used space should be the same
|
||||||
// are delete due to the success threshold
|
require.Equal(t, totalUsedSpace, totalUsedSpaceAfterDelete)
|
||||||
deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace)
|
|
||||||
if deletedUsedSpace < percentExp {
|
|
||||||
t.Fatalf("deleted used space is less than %f%%. Got %f", percentExp, deletedUsedSpace)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -1781,12 +1740,14 @@ func testDeleteObject(t *testing.T,
|
|||||||
// Shutdown the first numToShutdown storage nodes before we delete the pieces
|
// Shutdown the first numToShutdown storage nodes before we delete the pieces
|
||||||
// and collect used space values for those nodes
|
// and collect used space values for those nodes
|
||||||
snUsedSpace := make([]int64, len(planet.StorageNodes))
|
snUsedSpace := make([]int64, len(planet.StorageNodes))
|
||||||
for i := 0; i < numToShutdown; i++ {
|
for i, node := range planet.StorageNodes {
|
||||||
var err error
|
var err error
|
||||||
snUsedSpace[i], _, err = planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
|
snUsedSpace[i], _, err = node.Storage2.Store.SpaceUsedForPieces(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, planet.StopPeer(planet.StorageNodes[i]))
|
if i < numToShutdown {
|
||||||
|
require.NoError(t, planet.StopPeer(node))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
|
||||||
@ -1797,12 +1758,8 @@ func testDeleteObject(t *testing.T,
|
|||||||
|
|
||||||
planet.WaitForStorageNodeDeleters(ctx)
|
planet.WaitForStorageNodeDeleters(ctx)
|
||||||
|
|
||||||
// Check that storage nodes that were offline when deleting the pieces
|
// we are not deleting data from SN right away so used space should be the same
|
||||||
// they are still holding data
|
// for online and shutdown/offline node
|
||||||
// Check that storage nodes which are online when deleting pieces don't
|
|
||||||
// hold any piece
|
|
||||||
// We are comparing used space from before deletion for nodes that were
|
|
||||||
// offline, values for available nodes are 0
|
|
||||||
for i, sn := range planet.StorageNodes {
|
for i, sn := range planet.StorageNodes {
|
||||||
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user