From bd5213f68b3e332e6b880c4759c8160598a357d3 Mon Sep 17 00:00:00 2001
From: Yingrong Zhao
Date: Wed, 5 Aug 2020 21:23:45 -0400
Subject: [PATCH] satellite/metainfo: implement batch delete for DeleteBucket

This PR changes DeleteBucket so that it deletes all objects within a
bucket when `DeleteAll` is set in `BucketDeleteRequest`.

It also changes the `DeleteBucket` API to treat `ErrBucketNotFound` as a
successful delete operation instead of returning an error back to the
client.

Change-Id: I3a22c16224c7894f2d0c2a40ba1ae8717fa1005f
---
 go.mod                                       |   2 +-
 go.sum                                       |   4 +-
 satellite/metainfo/endpoint_test.go          |  59 +++++++-
 satellite/metainfo/metainfo.go               | 137 +++++++++++++++----
 satellite/metainfo/objectdeletion/service.go |   1 -
 5 files changed, 172 insertions(+), 31 deletions(-)

diff --git a/go.mod b/go.mod
index 3fc462d6f..d63f4ce1c 100644
--- a/go.mod
+++ b/go.mod
@@ -42,7 +42,7 @@ require (
 	golang.org/x/sys v0.0.0-20200808120158-1030fc2bf1d9
 	golang.org/x/time v0.0.0-20191024005414-555d28b269f0
 	golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32 // indirect
-	storj.io/common v0.0.0-20200811165556-40ea3df42d8e
+	storj.io/common v0.0.0-20200818131620-f9cddf66b4be
 	storj.io/drpc v0.0.14
 	storj.io/monkit-jaeger v0.0.0-20200518165323-80778fc3f91b
 	storj.io/private v0.0.0-20200729145012-46794d335b51
diff --git a/go.sum b/go.sum
index ea2110e50..2b64605b2 100644
--- a/go.sum
+++ b/go.sum
@@ -726,8 +726,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
 storj.io/common v0.0.0-20200424175742-65ac59022f4f/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
 storj.io/common v0.0.0-20200729140050-4c1ddac6fa63 h1:BkRvlginTJGi0yAkpN+4ZKm2YpG63bDSDFLQtXYxxdg=
 storj.io/common v0.0.0-20200729140050-4c1ddac6fa63/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
-storj.io/common v0.0.0-20200811165556-40ea3df42d8e h1:AbcFef6fqg+E3h052sncpkApPbEubF/MK1hSgulxuTU=
-storj.io/common v0.0.0-20200811165556-40ea3df42d8e/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
+storj.io/common v0.0.0-20200818131620-f9cddf66b4be h1:kyX4v2M3ZNjlj0cFGON9as91Qm08Jg3XtVz7MQwjMy8=
+storj.io/common v0.0.0-20200818131620-f9cddf66b4be/go.mod h1:ILr54ISCqCQ6MmIwT7eaR/fEGrBfgfxiPt8nmpWqnUM=
 storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
 storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
 storj.io/drpc v0.0.14 h1:GCBdymTt1BRw4oHmmUZZlxYXLVRxxYj6x3Ivide2J+I=
diff --git a/satellite/metainfo/endpoint_test.go b/satellite/metainfo/endpoint_test.go
index 65bbb77f7..75089ca39 100644
--- a/satellite/metainfo/endpoint_test.go
+++ b/satellite/metainfo/endpoint_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"storj.io/common/memory"
+	"storj.io/common/pb"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"
@@ -74,9 +75,7 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 	}
 
 	projectID, encryptedPath := getProjectIDAndEncPathFirstObject(ctx, t, satelliteSys)
-	_, err = satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
-		ctx, projectID, []byte(bucketName), encryptedPath,
-	)
+	_, err = satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(ctx, projectID, []byte(bucketName), encryptedPath)
 	require.NoError(t, err)
 
 	planet.WaitForStorageNodeDeleters(ctx)
@@ -466,6 +465,60 @@ func TestEndpoint_DeleteObjectPieces_ObjectWithoutLastSegment(t *testing.T) {
 	})
 }
 
+func TestDeleteBucket(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				testplanet.ReconfigureRS(2, 2, 4, 4),
+				testplanet.MaxSegmentSize(13*memory.KiB),
+			),
+		},
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
+		satelliteSys := planet.Satellites[0]
+		uplnk := planet.Uplinks[0]
+
+		expectedBucketName := "remote-segments-bucket"
+
+		err := uplnk.Upload(ctx, planet.Satellites[0], expectedBucketName, "single-segment-object", testrand.Bytes(10*memory.KiB))
+		require.NoError(t, err)
+		err = uplnk.Upload(ctx, planet.Satellites[0], expectedBucketName, "multi-segment-object", testrand.Bytes(50*memory.KiB))
+		require.NoError(t, err)
+		err = uplnk.Upload(ctx, planet.Satellites[0], expectedBucketName, "remote-segment-inline-object", testrand.Bytes(33*memory.KiB))
+		require.NoError(t, err)
+
+		listResp, err := satelliteSys.API.Metainfo.Endpoint2.ListObjects(ctx, &pb.ObjectListRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			Bucket: []byte(expectedBucketName),
+		})
+		require.NoError(t, err)
+		require.Len(t, listResp.GetItems(), 3)
+
+		delResp, err := satelliteSys.API.Metainfo.Endpoint2.DeleteBucket(ctx, &pb.BucketDeleteRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			Name:      []byte(expectedBucketName),
+			DeleteAll: true,
+		})
+		require.NoError(t, err)
+		require.Equal(t, int64(3), delResp.DeletedObjectsCount)
+
+		// confirm the bucket is deleted
+		buckets, err := satelliteSys.Metainfo.Endpoint2.ListBuckets(ctx, &pb.BucketListRequest{
+			Header: &pb.RequestHeader{
+				ApiKey: apiKey.SerializeRaw(),
+			},
+			Direction: int32(storj.Forward),
+		})
+		require.NoError(t, err)
+		require.Len(t, buckets.GetItems(), 0)
+	})
+}
+
 func getProjectIDAndEncPathFirstObject(
 	ctx context.Context, t *testing.T, satellite *testplanet.Satellite,
 ) (projectID uuid.UUID, encryptedPath []byte) {
diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go
index de6a0c475..056a1518b 100644
--- a/satellite/metainfo/metainfo.go
+++ b/satellite/metainfo/metainfo.go
@@ -42,6 +42,7 @@ import (
 const (
 	satIDExpiration = 48 * time.Hour
 	lastSegment     = -1
+	firstSegment    = 0
 	listLimit       = 1000
 
 	deleteObjectPiecesSuccessThreshold = 0.75
@@ -400,9 +401,12 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete
 	})
 	canList := err == nil
 
-	var bucket storj.Bucket
+	var (
+		bucket     storj.Bucket
+		convBucket *pb.Bucket
+	)
 	if canRead || canList {
-		// Info about deleted bucket is returned only if either Read, or List permission is granted
+		// Info about deleted bucket is returned only if either Read, or List permission is granted.
 		bucket, err = endpoint.metainfo.GetBucket(ctx, req.Name, keyInfo.ProjectID)
 		if err != nil {
 			if storj.ErrBucketNotFound.Has(err) {
@@ -410,28 +414,105 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete
 			}
 			return nil, err
 		}
+
+		convBucket, err = convertBucketToProto(bucket, endpoint.redundancyScheme())
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	err = endpoint.metainfo.DeleteBucket(ctx, req.Name, keyInfo.ProjectID)
 	if err != nil {
 		if !canRead && !canList {
-			// No error info is returned if neither Read, nor List permission is granted
+			// No error info is returned if neither Read, nor List permission is granted.
 			return &pb.BucketDeleteResponse{}, nil
 		}
+
 		if ErrBucketNotEmpty.Has(err) {
-			return nil, rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
-		} else if storj.ErrBucketNotFound.Has(err) {
-			return nil, rpcstatus.Error(rpcstatus.NotFound, err.Error())
+			// List permission is required to delete all objects in a bucket.
+			if !req.GetDeleteAll() || !canList {
+				return nil, rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
+			}
+
+			_, deletedObjCount, err := endpoint.deleteBucketNotEmpty(ctx, keyInfo.ProjectID, req.Name)
+			if err != nil {
+				return nil, err
+			}
+
+			return &pb.BucketDeleteResponse{Bucket: convBucket, DeletedObjectsCount: int64(deletedObjCount)}, nil
+		}
+		if storj.ErrBucketNotFound.Has(err) {
+			return &pb.BucketDeleteResponse{Bucket: convBucket}, nil
 		}
 		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}
 
-	convBucket, err := convertBucketToProto(bucket, endpoint.redundancyScheme())
+	return &pb.BucketDeleteResponse{Bucket: convBucket}, nil
+}
+
+// deleteBucketNotEmpty deletes all objects that are complete or have a first segment.
+// On success, it returns only the number of complete objects that have been deleted,
+// since from the user's perspective, objects without a last segment are invisible.
+func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uuid.UUID, bucketName []byte) ([]byte, int, error) {
+	// Delete all objects that have a last segment.
+	deletedCount, err := endpoint.deleteByPrefix(ctx, projectID, bucketName, lastSegment)
 	if err != nil {
-		return nil, err
+		return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+	// Delete all zombie objects that have a first segment.
+	_, err = endpoint.deleteByPrefix(ctx, projectID, bucketName, firstSegment)
+	if err != nil {
+		return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}
 
-	return &pb.BucketDeleteResponse{Bucket: convBucket}, nil
+	err = endpoint.metainfo.DeleteBucket(ctx, bucketName, projectID)
+	if err != nil {
+		if ErrBucketNotEmpty.Has(err) {
+			return nil, 0, rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
+		}
+		if storj.ErrBucketNotFound.Has(err) {
+			return bucketName, 0, nil
+		}
+		return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
+	}
+
+	return bucketName, deletedCount, nil
+}
+
+// deleteByPrefix deletes all objects matching a prefix.
+func (endpoint *Endpoint) deleteByPrefix(ctx context.Context, projectID uuid.UUID, bucketName []byte, segmentIdx int64) (deletedCount int, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	prefix, err := CreatePath(ctx, projectID, segmentIdx, bucketName, []byte{})
+	if err != nil {
+		return deletedCount, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
+	}
+
+	for {
+		segments, more, err := endpoint.metainfo.List(ctx, prefix, "", true, 0, meta.None)
+		if err != nil {
+			return deletedCount, err
+		}
+
+		deleteReqs := make([]*objectdeletion.ObjectIdentifier, len(segments))
+		for i, segment := range segments {
+			deleteReqs[i] = &objectdeletion.ObjectIdentifier{
+				ProjectID:     projectID,
+				Bucket:        bucketName,
+				EncryptedPath: []byte(segment.Path),
+			}
+		}
+		rep, err := endpoint.deleteObjectsPieces(ctx, deleteReqs...)
+		if err != nil {
+			return deletedCount, err
+		}
+
+		deletedCount += len(rep.Deleted)
+
+		if !more {
+			break
+		}
+	}
+	return deletedCount, nil
 }
 
 // ListBuckets returns buckets in a project where the bucket name matches the request cursor.
@@ -1885,15 +1966,13 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 ) (report objectdeletion.Report, err error) {
 	defer mon.Task()(&ctx, projectID.String(), bucket, encryptedPath)(&err)
 
-	// We should ignore client cancelling and always try to delete segments.
-	ctx = context2.WithoutCancellation(ctx)
-
 	req := &objectdeletion.ObjectIdentifier{
 		ProjectID:     projectID,
 		Bucket:        bucket,
 		EncryptedPath: encryptedPath,
 	}
-	results, err := endpoint.deleteObjects.Delete(ctx, req)
+
+	report, err = endpoint.deleteObjectsPieces(ctx, req)
 	if err != nil {
 		endpoint.log.Error("failed to delete pointers",
 			zap.Stringer("project_id", projectID),
@@ -1901,9 +1980,26 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 			zap.Binary("encrypted_path", encryptedPath),
 			zap.Error(err),
 		)
-		return report, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		// Only return an error if we failed to delete the pointers. If we failed
+		// to delete pieces, let the garbage collector take care of it.
+		if objectdeletion.Error.Has(err) {
+			return report, rpcstatus.Error(rpcstatus.Internal, err.Error())
+		}
 	}
 
+	return report, nil
+}
+
+func (endpoint *Endpoint) deleteObjectsPieces(ctx context.Context, reqs ...*objectdeletion.ObjectIdentifier) (report objectdeletion.Report, err error) {
+	// We should ignore client cancelling and always try to delete segments.
+	ctx = context2.WithoutCancellation(ctx)
+
+	results, err := endpoint.deleteObjects.Delete(ctx, reqs...)
+	if err != nil {
+		return report, err
+	}
+
+	var requests []piecedeletion.Request
 	for _, r := range results {
 		pointers := r.DeletedPointers()
 		report.Deleted = append(report.Deleted, r.Deleted...)
@@ -1915,7 +2011,6 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 			continue
 		}
 
-		var requests []piecedeletion.Request
 		for node, pieces := range nodesPieces {
 			requests = append(requests, piecedeletion.Request{
 				Node: storj.NodeURL{
@@ -1924,19 +2019,13 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 				Pieces: pieces,
 			})
 		}
+	}
 
-		if err := endpoint.deletePieces.Delete(ctx, requests, deleteObjectPiecesSuccessThreshold); err != nil {
-			endpoint.log.Error("failed to delete pieces",
-				zap.Stringer("project_id", projectID),
-				zap.ByteString("bucket_name", bucket),
-				zap.Binary("encrypted_path", encryptedPath),
-				zap.Error(err),
-			)
-		}
+	if err := endpoint.deletePieces.Delete(ctx, requests, deleteObjectPiecesSuccessThreshold); err != nil {
+		endpoint.log.Error("failed to delete pieces", zap.Error(err))
 	}
 
 	return report, nil
-
 }
 
 func (endpoint *Endpoint) redundancyScheme() *pb.RedundancyScheme {
diff --git a/satellite/metainfo/objectdeletion/service.go b/satellite/metainfo/objectdeletion/service.go
index 022d7c772..0391326c4 100644
--- a/satellite/metainfo/objectdeletion/service.go
+++ b/satellite/metainfo/objectdeletion/service.go
@@ -115,7 +115,6 @@ func (service *Service) Delete(ctx context.Context, requests ...*ObjectIdentifie
 	}
 
 	return reports, nil
-
 }
 
 // DeletePointers returns a list of pointers and their paths that are deleted.
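
For reference, below is a minimal sketch of how a caller might exercise the new `DeleteAll` flag. The request and response fields (`DeleteAll` on `BucketDeleteRequest`, `DeletedObjectsCount` on `BucketDeleteResponse`) are the ones this patch relies on; the `bucketDeleter` interface and `forceDeleteBucket` helper are hypothetical names standing in for whichever connected metainfo client the caller already holds, not part of this change.

package example

import (
	"context"
	"fmt"

	"storj.io/common/pb"
)

// bucketDeleter is a hypothetical stand-in for a connected metainfo client;
// it only needs the DeleteBucket RPC extended by this patch.
type bucketDeleter interface {
	DeleteBucket(ctx context.Context, req *pb.BucketDeleteRequest) (*pb.BucketDeleteResponse, error)
}

// forceDeleteBucket deletes a bucket together with all of its objects by
// setting DeleteAll, mirroring the behaviour exercised in TestDeleteBucket.
func forceDeleteBucket(ctx context.Context, client bucketDeleter, apiKey []byte, bucket string) error {
	resp, err := client.DeleteBucket(ctx, &pb.BucketDeleteRequest{
		Header:    &pb.RequestHeader{ApiKey: apiKey},
		Name:      []byte(bucket),
		DeleteAll: true,
	})
	if err != nil {
		return err
	}
	// DeletedObjectsCount reports how many complete objects were removed;
	// zombie segments cleaned up along the way are not counted.
	fmt.Printf("deleted bucket %q and %d objects\n", bucket, resp.DeletedObjectsCount)
	return nil
}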