satellite/metainfo: implement deleteBucketObjects with metabase objects

iterator

This method replaces `deleteByPrefix` as at the moment only function of
this method was to delete objects in a bucket.

Change-Id: I5266103672003fbd64f3847f53760b1ba0016fe2
This commit is contained in:
Michal Niewrzal 2020-11-17 18:53:24 +01:00
parent f08e34f15e
commit d5c0264163

View File

@ -36,7 +36,6 @@ import (
"storj.io/storj/satellite/revocation"
"storj.io/storj/satellite/rewards"
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/storage/meta"
)
const (
@ -409,20 +408,13 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete
return &pb.BucketDeleteResponse{Bucket: convBucket}, nil
}
// deleteBucketNotEmpty deletes all objects that're complete or have first segment.
// On success, it returns only the number of complete objects that has been deleted
// since from the user's perspective, objects without last segment are invisible.
// deleteBucketNotEmpty deletes all objects from bucket and deletes this bucket.
// On success, it returns only the number of deleted objects.
func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uuid.UUID, bucketName []byte) ([]byte, int, error) {
// Delete all objects that has last segment.
deletedCount, err := endpoint.deleteByPrefix(ctx, projectID, bucketName, metabase.LastSegmentIndex)
deletedCount, err := endpoint.deleteBucketObjects(ctx, projectID, bucketName)
if err != nil {
return nil, 0, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
// Delete all zombie objects that have first segment.
_, err = endpoint.deleteByPrefix(ctx, projectID, bucketName, metabase.FirstSegmentIndex)
if err != nil {
return nil, deletedCount, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
err = endpoint.metainfo.DeleteBucket(ctx, bucketName, projectID)
if err != nil {
@ -438,40 +430,36 @@ func (endpoint *Endpoint) deleteBucketNotEmpty(ctx context.Context, projectID uu
return bucketName, deletedCount, nil
}
// deleteByPrefix deletes all objects that matches with a prefix.
func (endpoint *Endpoint) deleteByPrefix(ctx context.Context, projectID uuid.UUID, bucketName []byte, segmentIdx int64) (deletedCount int, err error) {
// deleteBucketObjects deletes all objects in a bucket.
func (endpoint *Endpoint) deleteBucketObjects(ctx context.Context, projectID uuid.UUID, bucketName []byte) (deletedCount int, err error) {
defer mon.Task()(&ctx)(&err)
location, err := CreatePath(ctx, projectID, segmentIdx, bucketName, []byte{})
if err != nil {
return deletedCount, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
prefix := location.Encode()
for {
segments, more, err := endpoint.metainfo.List(ctx, prefix, "", true, 0, meta.None)
if err != nil {
return deletedCount, err
}
deleteReqs := make([]metabase.ObjectLocation, len(segments))
for i, segment := range segments {
deleteReqs[i] = metabase.ObjectLocation{
// TODO we should list pending + committed
err = endpoint.metainfo.metabaseDB.IterateObjectsAllVersions(ctx, metabase.IterateObjects{
ProjectID: projectID,
BucketName: string(bucketName),
Status: metabase.Committed,
Recursive: true,
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
// TODO we should implement custom method in metabase instead of using iterator
entry := metabase.ObjectEntry{}
for it.Next(ctx, &entry) {
deletedObjects, err := endpoint.deleteObjectsPieces(ctx, metabase.ObjectLocation{
ProjectID: projectID,
BucketName: string(bucketName),
ObjectKey: metabase.ObjectKey(segment.Path),
ObjectKey: entry.ObjectKey,
})
if err != nil {
return err
}
}
deletedObjects, err := endpoint.deleteObjectsPieces(ctx, deleteReqs...)
if err != nil {
return deletedCount, err
deletedCount += len(deletedObjects)
}
deletedCount += len(deletedObjects)
if !more {
break
}
return nil
})
if err != nil {
return deletedCount, err
}
return deletedCount, nil
}