satellite/metainfo/objectdeletion: replace
objectdeletion.ObjectIdentifier with metabase.ObjectLocation Another change to use metabase.ObjectLocation across satellite codebase to avoid duplication and provide better type safety. Change-Id: I82cb52b94a9107ed3144255a6ef4ad9f3fc1ca63
This commit is contained in:
parent
c4d6f472fc
commit
c753d17e8f
@ -464,12 +464,12 @@ func (endpoint *Endpoint) deleteByPrefix(ctx context.Context, projectID uuid.UUI
|
||||
return deletedCount, err
|
||||
}
|
||||
|
||||
deleteReqs := make([]*objectdeletion.ObjectIdentifier, len(segments))
|
||||
deleteReqs := make([]*metabase.ObjectLocation, len(segments))
|
||||
for i, segment := range segments {
|
||||
deleteReqs[i] = &objectdeletion.ObjectIdentifier{
|
||||
ProjectID: projectID,
|
||||
Bucket: bucketName,
|
||||
EncryptedPath: []byte(segment.Path),
|
||||
deleteReqs[i] = &metabase.ObjectLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: string(bucketName),
|
||||
ObjectKey: metabase.ObjectKey(segment.Path),
|
||||
}
|
||||
}
|
||||
rep, err := endpoint.deleteObjectsPieces(ctx, deleteReqs...)
|
||||
@ -2016,10 +2016,10 @@ func (endpoint *Endpoint) DeleteObjectPieces(
|
||||
) (report objectdeletion.Report, err error) {
|
||||
defer mon.Task()(&ctx, projectID.String(), bucket, encryptedPath)(&err)
|
||||
|
||||
req := &objectdeletion.ObjectIdentifier{
|
||||
ProjectID: projectID,
|
||||
Bucket: bucket,
|
||||
EncryptedPath: encryptedPath,
|
||||
req := &metabase.ObjectLocation{
|
||||
ProjectID: projectID,
|
||||
BucketName: string(bucket),
|
||||
ObjectKey: metabase.ObjectKey(encryptedPath),
|
||||
}
|
||||
|
||||
report, err = endpoint.deleteObjectsPieces(ctx, req)
|
||||
@ -2040,7 +2040,7 @@ func (endpoint *Endpoint) DeleteObjectPieces(
|
||||
return report, nil
|
||||
}
|
||||
|
||||
func (endpoint *Endpoint) deleteObjectsPieces(ctx context.Context, reqs ...*objectdeletion.ObjectIdentifier) (report objectdeletion.Report, err error) {
|
||||
func (endpoint *Endpoint) deleteObjectsPieces(ctx context.Context, reqs ...*metabase.ObjectLocation) (report objectdeletion.Report, err error) {
|
||||
// We should ignore client cancelling and always try to delete segments.
|
||||
ctx = context2.WithoutCancellation(ctx)
|
||||
|
||||
|
@ -1,86 +0,0 @@
|
||||
// Copyright (C) 2020 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package objectdeletion
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
)
|
||||
|
||||
// ObjectIdentifier contains information about an object
|
||||
// that are needed for delete operation.
|
||||
type ObjectIdentifier struct {
|
||||
ProjectID uuid.UUID
|
||||
Bucket []byte
|
||||
EncryptedPath []byte
|
||||
}
|
||||
|
||||
// SegmentPath returns a raw path for a specific segment index.
|
||||
func (id *ObjectIdentifier) SegmentPath(segmentIndex int64) ([]byte, error) {
|
||||
if segmentIndex < lastSegmentIndex {
|
||||
return nil, errors.New("invalid segment index")
|
||||
}
|
||||
segment := "l"
|
||||
if segmentIndex > lastSegmentIndex {
|
||||
segment = "s" + strconv.FormatInt(segmentIndex, 10)
|
||||
}
|
||||
|
||||
return []byte(storj.JoinPaths(
|
||||
id.ProjectID.String(),
|
||||
segment,
|
||||
string(id.Bucket),
|
||||
string(id.EncryptedPath),
|
||||
)), nil
|
||||
}
|
||||
|
||||
// ParseSegmentPath parses a raw path and returns an
|
||||
// object identifier from that path along with the path's segment index.
|
||||
// example: <project-id>/01/<bucket-name>/<encrypted-path>
|
||||
func ParseSegmentPath(rawPath []byte) (ObjectIdentifier, int64, error) {
|
||||
elements := storj.SplitPath(string(rawPath))
|
||||
if len(elements) < 4 {
|
||||
return ObjectIdentifier{}, -1, errs.New("invalid path %q", string(rawPath))
|
||||
}
|
||||
|
||||
projectID, err := uuid.FromString(elements[0])
|
||||
if err != nil {
|
||||
return ObjectIdentifier{}, -1, errs.Wrap(err)
|
||||
}
|
||||
var segmentIndex int64
|
||||
if elements[1] == "l" {
|
||||
segmentIndex = lastSegmentIndex
|
||||
} else {
|
||||
segmentIndex, err = strconv.ParseInt(elements[1][1:], 10, 64) // remove the strng `s` from segment index we got
|
||||
|
||||
if err != nil {
|
||||
return ObjectIdentifier{}, -1, errs.Wrap(err)
|
||||
}
|
||||
}
|
||||
|
||||
return ObjectIdentifier{
|
||||
ProjectID: projectID,
|
||||
Bucket: []byte(elements[2]),
|
||||
EncryptedPath: []byte(storj.JoinPaths(elements[3:]...)),
|
||||
}, segmentIndex, nil
|
||||
}
|
||||
|
||||
// Key returns a string concatenated by all object identifier fields plus 0.
|
||||
// It's a unique string used to identify an object.
|
||||
// It's not a valid key for retrieving pointers from metainfo database.
|
||||
func (id *ObjectIdentifier) Key() string {
|
||||
builder := strings.Builder{}
|
||||
// we don't need the return value here
|
||||
// Write will always return the length of the argument and nil error
|
||||
_, _ = builder.Write(id.ProjectID[:])
|
||||
_, _ = builder.Write(id.Bucket)
|
||||
_, _ = builder.Write(id.EncryptedPath)
|
||||
|
||||
return builder.String()
|
||||
}
|
@ -50,8 +50,8 @@ func (r Report) DeletedObjects() ([]*pb.Object, error) {
|
||||
continue
|
||||
}
|
||||
object := &pb.Object{
|
||||
Bucket: d.Bucket,
|
||||
EncryptedPath: d.EncryptedPath,
|
||||
Bucket: []byte(d.BucketName),
|
||||
EncryptedPath: []byte(d.ObjectKey),
|
||||
Version: -1,
|
||||
ExpiresAt: d.LastSegment.ExpirationDate,
|
||||
CreatedAt: d.LastSegment.CreationDate,
|
||||
@ -90,16 +90,16 @@ func (r Report) DeletedObjects() ([]*pb.Object, error) {
|
||||
}
|
||||
|
||||
// GenerateReport returns the result of a delete, success, or failure.
|
||||
func GenerateReport(ctx context.Context, log *zap.Logger, requests []*ObjectIdentifier, deletedPaths []metabase.SegmentKey, pointers []*pb.Pointer) Report {
|
||||
func GenerateReport(ctx context.Context, log *zap.Logger, requests []*metabase.ObjectLocation, deletedPaths []metabase.SegmentKey, pointers []*pb.Pointer) Report {
|
||||
defer mon.Task()(&ctx)(nil)
|
||||
|
||||
report := Report{}
|
||||
deletedObjects := make(map[string]*ObjectState)
|
||||
deletedObjects := make(map[metabase.ObjectLocation]*ObjectState)
|
||||
for i, path := range deletedPaths {
|
||||
if path == nil {
|
||||
continue
|
||||
}
|
||||
id, segmentIdx, err := ParseSegmentPath(path)
|
||||
segmentLocation, err := metabase.ParseSegmentKey(path)
|
||||
if err != nil {
|
||||
log.Debug("failed to parse deleted segmnt path for report",
|
||||
zap.String("Raw Segment Path", string(path)),
|
||||
@ -107,31 +107,33 @@ func GenerateReport(ctx context.Context, log *zap.Logger, requests []*ObjectIden
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := deletedObjects[id.Key()]; !ok {
|
||||
deletedObjects[id.Key()] = &ObjectState{
|
||||
object := segmentLocation.Object()
|
||||
|
||||
if _, ok := deletedObjects[object]; !ok {
|
||||
deletedObjects[object] = &ObjectState{
|
||||
OtherSegments: []*pb.Pointer{},
|
||||
}
|
||||
}
|
||||
|
||||
switch segmentIdx {
|
||||
case lastSegmentIndex:
|
||||
deletedObjects[id.Key()].LastSegment = pointers[i]
|
||||
switch {
|
||||
case segmentLocation.IsLast():
|
||||
deletedObjects[object].LastSegment = pointers[i]
|
||||
default:
|
||||
deletedObjects[id.Key()].OtherSegments = append(deletedObjects[id.Key()].OtherSegments, pointers[i])
|
||||
deletedObjects[object].OtherSegments = append(deletedObjects[object].OtherSegments, pointers[i])
|
||||
}
|
||||
}
|
||||
|
||||
// populate report with failed and deleted objects
|
||||
for _, req := range requests {
|
||||
state, ok := deletedObjects[req.Key()]
|
||||
state, ok := deletedObjects[*req]
|
||||
if !ok {
|
||||
report.Failed = append(report.Failed, &ObjectState{
|
||||
ObjectIdentifier: *req,
|
||||
ObjectLocation: *req,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
state.ObjectIdentifier = *req
|
||||
state.ObjectLocation = *req
|
||||
report.Deleted = append(report.Deleted, state)
|
||||
}
|
||||
return report
|
||||
|
@ -47,31 +47,31 @@ func TestReport(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func createDeletedItems(requests []*objectdeletion.ObjectIdentifier, numDeleted int) ([]metabase.SegmentKey, []*pb.Pointer, error) {
|
||||
func createDeletedItems(requests []*metabase.ObjectLocation, numDeleted int) ([]metabase.SegmentKey, []*pb.Pointer, error) {
|
||||
if numDeleted > len(requests) {
|
||||
return nil, nil, errs.New("invalid argument")
|
||||
}
|
||||
paths := make([]metabase.SegmentKey, 0, numDeleted)
|
||||
pointers := make([]*pb.Pointer, 0, numDeleted)
|
||||
for i := 0; i < numDeleted; i++ {
|
||||
path, err := requests[i].SegmentPath(int64(testrand.Intn(10)))
|
||||
segmentLocation, err := requests[i].Segment(int64(testrand.Intn(10)))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
paths = append(paths, path)
|
||||
paths = append(paths, segmentLocation.Encode())
|
||||
pointers = append(pointers, &pb.Pointer{})
|
||||
}
|
||||
return paths, pointers, nil
|
||||
}
|
||||
|
||||
func createRequests(numRequests int) []*objectdeletion.ObjectIdentifier {
|
||||
requests := make([]*objectdeletion.ObjectIdentifier, 0, numRequests)
|
||||
func createRequests(numRequests int) []*metabase.ObjectLocation {
|
||||
requests := make([]*metabase.ObjectLocation, 0, numRequests)
|
||||
|
||||
for i := 0; i < numRequests; i++ {
|
||||
obj := objectdeletion.ObjectIdentifier{
|
||||
ProjectID: testrand.UUID(),
|
||||
Bucket: []byte("test"),
|
||||
EncryptedPath: []byte(strconv.Itoa(i) + "test"),
|
||||
obj := metabase.ObjectLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "test",
|
||||
ObjectKey: metabase.ObjectKey(strconv.Itoa(i) + "test"),
|
||||
}
|
||||
requests = append(requests, &obj)
|
||||
}
|
||||
|
@ -16,11 +16,6 @@ import (
|
||||
"storj.io/storj/satellite/metainfo/metabase"
|
||||
)
|
||||
|
||||
const (
|
||||
lastSegmentIndex = -1
|
||||
firstSegmentIndex = 0
|
||||
)
|
||||
|
||||
var (
|
||||
mon = monkit.Package()
|
||||
// Error is a general object deletion error.
|
||||
@ -86,7 +81,7 @@ func NewService(log *zap.Logger, pointerDB PointerDB, config Config) (*Service,
|
||||
}
|
||||
|
||||
// Delete ensures that all pointers belongs to an object no longer exists.
|
||||
func (service *Service) Delete(ctx context.Context, requests ...*ObjectIdentifier) (reports []Report, err error) {
|
||||
func (service *Service) Delete(ctx context.Context, requests ...*metabase.ObjectLocation) (reports []Report, err error) {
|
||||
defer mon.Task()(&ctx, len(requests))(&err)
|
||||
|
||||
// When number of requests are more than the maximum limit, we let it overflow,
|
||||
@ -105,12 +100,12 @@ func (service *Service) Delete(ctx context.Context, requests ...*ObjectIdentifie
|
||||
if batchSize > service.config.MaxObjectsPerRequest {
|
||||
batchSize = service.config.MaxObjectsPerRequest
|
||||
}
|
||||
pointers, paths, err := service.DeletePointers(ctx, requests[:batchSize])
|
||||
pointers, keys, err := service.DeletePointers(ctx, requests[:batchSize])
|
||||
if err != nil {
|
||||
return reports, Error.Wrap(err)
|
||||
}
|
||||
|
||||
report := GenerateReport(ctx, service.log, requests[:batchSize], paths, pointers)
|
||||
report := GenerateReport(ctx, service.log, requests[:batchSize], keys, pointers)
|
||||
reports = append(reports, report)
|
||||
requests = requests[batchSize:]
|
||||
}
|
||||
@ -118,49 +113,41 @@ func (service *Service) Delete(ctx context.Context, requests ...*ObjectIdentifie
|
||||
return reports, nil
|
||||
}
|
||||
|
||||
// DeletePointers returns a list of pointers and their paths that are deleted.
|
||||
// DeletePointers returns a list of pointers and their keys that are deleted.
|
||||
// If a object is not found, we will consider it as a successful delete.
|
||||
func (service *Service) DeletePointers(ctx context.Context, requests []*ObjectIdentifier) (_ []*pb.Pointer, _ []metabase.SegmentKey, err error) {
|
||||
func (service *Service) DeletePointers(ctx context.Context, requests []*metabase.ObjectLocation) (_ []*pb.Pointer, _ []metabase.SegmentKey, err error) {
|
||||
defer mon.Task()(&ctx, len(requests))(&err)
|
||||
|
||||
// get first and last segment to determine the object state
|
||||
lastAndFirstSegmentsPath := []metabase.SegmentKey{}
|
||||
lastAndFirstSegmentsKey := []metabase.SegmentKey{}
|
||||
for _, req := range requests {
|
||||
lastSegmentPath, err := req.SegmentPath(lastSegmentIndex)
|
||||
if err != nil {
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
firstSegmentPath, err := req.SegmentPath(firstSegmentIndex)
|
||||
if err != nil {
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
lastAndFirstSegmentsPath = append(lastAndFirstSegmentsPath,
|
||||
lastSegmentPath,
|
||||
firstSegmentPath,
|
||||
lastAndFirstSegmentsKey = append(lastAndFirstSegmentsKey,
|
||||
req.LastSegment().Encode(),
|
||||
req.FirstSegment().Encode(),
|
||||
)
|
||||
}
|
||||
|
||||
// Get pointers from the database
|
||||
pointers, err := service.pointers.GetItems(ctx, lastAndFirstSegmentsPath)
|
||||
pointers, err := service.pointers.GetItems(ctx, lastAndFirstSegmentsKey)
|
||||
if err != nil {
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
states, err := CreateObjectStates(ctx, requests, pointers, lastAndFirstSegmentsPath)
|
||||
states, err := CreateObjectStates(ctx, requests, pointers, lastAndFirstSegmentsKey)
|
||||
if err != nil {
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
pathsToDel, err := service.generateSegmentPathsForCompleteObjects(ctx, states)
|
||||
keysToDel, err := service.generateSegmentKeysForCompleteObjects(ctx, states)
|
||||
if err != nil {
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
zombiePaths, err := service.collectSegmentPathsForZombieObjects(ctx, states)
|
||||
zombieKeys, err := service.collectSegmentKeysForZombieObjects(ctx, states)
|
||||
if err != nil {
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
pathsToDel = append(pathsToDel, zombiePaths...)
|
||||
keysToDel = append(keysToDel, zombieKeys...)
|
||||
|
||||
// Delete pointers and fetch the piece ids.
|
||||
//
|
||||
@ -171,19 +158,19 @@ func (service *Service) DeletePointers(ctx context.Context, requests []*ObjectId
|
||||
// while the database is sending a response -- in that case we won't send
|
||||
// the piecedeletion requests and and let garbage collection clean up those
|
||||
// pieces.
|
||||
paths, pointers, err := service.pointers.UnsynchronizedGetDel(ctx, pathsToDel)
|
||||
keys, pointers, err := service.pointers.UnsynchronizedGetDel(ctx, keysToDel)
|
||||
if err != nil {
|
||||
return nil, nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// if object is missing, we can consider it as a successful delete
|
||||
objectMissingPaths := service.extractSegmentPathsForMissingObjects(ctx, states)
|
||||
for _, p := range objectMissingPaths {
|
||||
paths = append(paths, p)
|
||||
objectMissingKeys := service.extractSegmentKeysForMissingObjects(ctx, states)
|
||||
for _, p := range objectMissingKeys {
|
||||
keys = append(keys, p)
|
||||
pointers = append(pointers, nil)
|
||||
}
|
||||
|
||||
return pointers, paths, nil
|
||||
return pointers, keys, nil
|
||||
}
|
||||
|
||||
// GroupPiecesByNodeID returns a map that contains pieces with node id as the key.
|
||||
@ -208,90 +195,72 @@ func GroupPiecesByNodeID(pointers []*pb.Pointer) map[storj.NodeID][]storj.PieceI
|
||||
return piecesToDelete
|
||||
}
|
||||
|
||||
// generateSegmentPathsForCompleteObjects collects segment paths for objects that has last segment found in pointerDB.
|
||||
func (service *Service) generateSegmentPathsForCompleteObjects(ctx context.Context, states map[string]*ObjectState) (_ []metabase.SegmentKey, err error) {
|
||||
// generateSegmentKeysForCompleteObjects collects segment keys for objects that has last segment found in pointerDB.
|
||||
func (service *Service) generateSegmentKeysForCompleteObjects(ctx context.Context, states map[metabase.ObjectLocation]*ObjectState) (_ []metabase.SegmentKey, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
segmentPaths := []metabase.SegmentKey{}
|
||||
segmentKeys := []metabase.SegmentKey{}
|
||||
|
||||
for _, state := range states {
|
||||
switch state.Status() {
|
||||
case ObjectMissing:
|
||||
// nothing to do here
|
||||
case ObjectMultiSegment:
|
||||
// just delete the starting things, we already have the necessary information
|
||||
lastSegmentPath, err := state.SegmentPath(lastSegmentIndex)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
firstSegmentPath, err := state.SegmentPath(firstSegmentIndex)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
segmentPaths = append(segmentPaths, lastSegmentPath)
|
||||
segmentPaths = append(segmentPaths, firstSegmentPath)
|
||||
segmentKeys = append(segmentKeys,
|
||||
state.ObjectLocation.LastSegment().Encode(),
|
||||
state.ObjectLocation.FirstSegment().Encode())
|
||||
|
||||
largestSegmentIdx, err := numberOfSegmentsFromPointer(state.LastSegment)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
// gather all segment paths that're not first or last segments
|
||||
for index := largestSegmentIdx - 1; index > firstSegmentIndex; index-- {
|
||||
path, err := state.SegmentPath(index)
|
||||
|
||||
// gather all segment keys that're not first or last segments
|
||||
for index := largestSegmentIdx - 1; index > metabase.FirstSegmentIndex; index-- {
|
||||
location, err := state.Segment(index)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
segmentPaths = append(segmentPaths, path)
|
||||
segmentKeys = append(segmentKeys, location.Encode())
|
||||
}
|
||||
|
||||
case ObjectSingleSegment:
|
||||
// just add to segment path, we already have the necessary information
|
||||
lastSegmentPath, err := state.SegmentPath(lastSegmentIndex)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
segmentPaths = append(segmentPaths, lastSegmentPath)
|
||||
// just add to segment key, we already have the necessary information
|
||||
segmentKeys = append(segmentKeys, state.ObjectLocation.LastSegment().Encode())
|
||||
case ObjectActiveOrZombie:
|
||||
// we will handle it in a separate method
|
||||
}
|
||||
}
|
||||
|
||||
return segmentPaths, nil
|
||||
return segmentKeys, nil
|
||||
}
|
||||
|
||||
// collectSegmentPathsForZombieObjects collects segment paths for objects that has no last segment found in pointerDB.
|
||||
func (service *Service) collectSegmentPathsForZombieObjects(ctx context.Context, states map[string]*ObjectState) (_ []metabase.SegmentKey, err error) {
|
||||
// collectSegmentKeysForZombieObjects collects segment keys for objects that has no last segment found in pointerDB.
|
||||
func (service *Service) collectSegmentKeysForZombieObjects(ctx context.Context, states map[metabase.ObjectLocation]*ObjectState) (_ []metabase.SegmentKey, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
zombies := map[string]ObjectIdentifier{}
|
||||
largestLoaded := map[string]int64{}
|
||||
largestLoaded := map[metabase.ObjectLocation]int64{}
|
||||
|
||||
segmentsToDel := []metabase.SegmentKey{}
|
||||
|
||||
for _, state := range states {
|
||||
if state.Status() == ObjectActiveOrZombie {
|
||||
firstSegmentPath, err := state.SegmentPath(firstSegmentIndex)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
segmentsToDel = append(segmentsToDel, firstSegmentPath)
|
||||
segmentsToDel = append(segmentsToDel, state.ObjectLocation.FirstSegment().Encode())
|
||||
|
||||
zombies[state.Key()] = state.ObjectIdentifier
|
||||
largestLoaded[state.Key()] = int64(firstSegmentIndex)
|
||||
largestLoaded[state.ObjectLocation] = metabase.FirstSegmentIndex
|
||||
}
|
||||
}
|
||||
|
||||
largestKnownSegment := int64(firstSegmentIndex)
|
||||
for len(zombies) > 0 {
|
||||
largestKnownSegment := int64(metabase.FirstSegmentIndex)
|
||||
for len(largestLoaded) > 0 {
|
||||
// Don't make requests for segments where we found the final segment.
|
||||
for key, largest := range largestLoaded {
|
||||
if largest != largestKnownSegment {
|
||||
delete(largestLoaded, key)
|
||||
delete(zombies, key)
|
||||
}
|
||||
}
|
||||
// found all segments
|
||||
if len(zombies) == 0 {
|
||||
if len(largestLoaded) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
@ -299,60 +268,50 @@ func (service *Service) collectSegmentPathsForZombieObjects(ctx context.Context,
|
||||
startFrom := largestKnownSegment + 1
|
||||
largestKnownSegment += int64(service.config.ZombieSegmentsPerRequest)
|
||||
|
||||
var zombieSegmentPaths []metabase.SegmentKey
|
||||
for _, id := range zombies {
|
||||
var zombieSegmentKeys []metabase.SegmentKey
|
||||
for location := range largestLoaded {
|
||||
for i := startFrom; i <= largestKnownSegment; i++ {
|
||||
path, err := id.SegmentPath(i)
|
||||
location, err := location.Segment(i)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
zombieSegmentPaths = append(zombieSegmentPaths, path)
|
||||
zombieSegmentKeys = append(zombieSegmentKeys, location.Encode())
|
||||
}
|
||||
}
|
||||
|
||||
// We are relying on the database to return the pointers in the same
|
||||
// order as the paths we requested.
|
||||
pointers, err := service.pointers.GetItems(ctx, zombieSegmentPaths)
|
||||
// order as the keys we requested.
|
||||
pointers, err := service.pointers.GetItems(ctx, zombieSegmentKeys)
|
||||
if err != nil {
|
||||
return nil, errs.Wrap(err)
|
||||
}
|
||||
|
||||
for i, p := range zombieSegmentPaths {
|
||||
for i, p := range zombieSegmentKeys {
|
||||
if pointers[i] == nil {
|
||||
continue
|
||||
}
|
||||
id, segmentIdx, err := ParseSegmentPath(p)
|
||||
segmentLocation, err := metabase.ParseSegmentKey(p)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
segmentsToDel = append(segmentsToDel, p)
|
||||
largestLoaded[id.Key()] = max(largestLoaded[id.Key()], segmentIdx)
|
||||
objectKey := segmentLocation.Object()
|
||||
largestLoaded[objectKey] = max(largestLoaded[objectKey], segmentLocation.Index)
|
||||
}
|
||||
}
|
||||
|
||||
return segmentsToDel, nil
|
||||
}
|
||||
|
||||
func (service *Service) extractSegmentPathsForMissingObjects(ctx context.Context, states map[string]*ObjectState) [][]byte {
|
||||
paths := make([][]byte, 0, len(states))
|
||||
func (service *Service) extractSegmentKeysForMissingObjects(ctx context.Context, states map[metabase.ObjectLocation]*ObjectState) []metabase.SegmentKey {
|
||||
keys := make([]metabase.SegmentKey, 0, len(states))
|
||||
for _, state := range states {
|
||||
if state.Status() == ObjectMissing {
|
||||
lastSegmentPath, err := state.ObjectIdentifier.SegmentPath(lastSegmentIndex)
|
||||
if err != nil {
|
||||
// it shouldn't happen
|
||||
service.log.Debug("failed to get segment path for missing object",
|
||||
zap.Stringer("Project ID", state.ObjectIdentifier.ProjectID),
|
||||
zap.String("Bucket", string(state.ObjectIdentifier.Bucket)),
|
||||
zap.String("Encrypted Path", string(state.ObjectIdentifier.EncryptedPath)),
|
||||
)
|
||||
continue
|
||||
}
|
||||
paths = append(paths, lastSegmentPath)
|
||||
keys = append(keys, state.ObjectLocation.LastSegment().Encode())
|
||||
}
|
||||
}
|
||||
|
||||
return paths
|
||||
return keys
|
||||
}
|
||||
|
||||
func max(a, b int64) int64 {
|
||||
|
@ -25,16 +25,16 @@ func TestService_Delete_SingleObject(t *testing.T) {
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// mock the object that we want to delete
|
||||
item := &objectdeletion.ObjectIdentifier{
|
||||
ProjectID: testrand.UUID(),
|
||||
Bucket: []byte("bucketname"),
|
||||
EncryptedPath: []byte("encrypted"),
|
||||
item := &metabase.ObjectLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "bucketname",
|
||||
ObjectKey: "encrypted",
|
||||
}
|
||||
|
||||
objectNotFound := &objectdeletion.ObjectIdentifier{
|
||||
ProjectID: testrand.UUID(),
|
||||
Bucket: []byte("object-not-found"),
|
||||
EncryptedPath: []byte("object-missing"),
|
||||
objectNotFound := &metabase.ObjectLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "object-not-found",
|
||||
ObjectKey: "object-missing",
|
||||
}
|
||||
|
||||
config := objectdeletion.Config{
|
||||
@ -46,10 +46,10 @@ func TestService_Delete_SingleObject(t *testing.T) {
|
||||
var testCases = []struct {
|
||||
segmentType string
|
||||
isValidObject bool
|
||||
largestSegmentIdx int
|
||||
largestSegmentIdx int64
|
||||
numPiecesPerSegment int32
|
||||
expectedPointersDeleted int
|
||||
expectedPathDeleted int
|
||||
expectedKeyDeleted int
|
||||
expectedPiecesToDelete int32
|
||||
}{
|
||||
{"single-segment", true, 0, 3, 1, 1, 3},
|
||||
@ -63,19 +63,19 @@ func TestService_Delete_SingleObject(t *testing.T) {
|
||||
for _, tt := range testCases {
|
||||
tt := tt // quiet linting
|
||||
t.Run(tt.segmentType, func(t *testing.T) {
|
||||
pointerDBMock, err := newPointerDB([]*objectdeletion.ObjectIdentifier{item}, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, false)
|
||||
pointerDBMock, err := newPointerDB([]*metabase.ObjectLocation{item}, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
pointers, deletedPaths, err := service.DeletePointers(ctx, []*objectdeletion.ObjectIdentifier{item})
|
||||
pointers, deletedKeys, err := service.DeletePointers(ctx, []*metabase.ObjectLocation{item})
|
||||
if !tt.isValidObject {
|
||||
pointers, deletedPaths, err = service.DeletePointers(ctx, []*objectdeletion.ObjectIdentifier{objectNotFound})
|
||||
pointers, deletedKeys, err = service.DeletePointers(ctx, []*metabase.ObjectLocation{objectNotFound})
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Len(t, pointers, tt.expectedPointersDeleted)
|
||||
require.Len(t, deletedPaths, tt.expectedPathDeleted)
|
||||
require.Len(t, deletedKeys, tt.expectedKeyDeleted)
|
||||
|
||||
piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
|
||||
|
||||
@ -93,10 +93,10 @@ func TestService_Delete_SingleObject_Failure(t *testing.T) {
|
||||
defer ctx.Cleanup()
|
||||
|
||||
// mock the object that we want to delete
|
||||
item := &objectdeletion.ObjectIdentifier{
|
||||
ProjectID: testrand.UUID(),
|
||||
Bucket: []byte("bucketname"),
|
||||
EncryptedPath: []byte("encrypted"),
|
||||
item := &metabase.ObjectLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "bucketname",
|
||||
ObjectKey: "encrypted",
|
||||
}
|
||||
|
||||
config := objectdeletion.Config{
|
||||
@ -107,7 +107,7 @@ func TestService_Delete_SingleObject_Failure(t *testing.T) {
|
||||
|
||||
var testCases = []struct {
|
||||
segmentType string
|
||||
largestSegmentIdx int
|
||||
largestSegmentIdx int64
|
||||
numPiecesPerSegment int32
|
||||
expectedPiecesToDelete int32
|
||||
}{
|
||||
@ -119,17 +119,17 @@ func TestService_Delete_SingleObject_Failure(t *testing.T) {
|
||||
for _, tt := range testCases {
|
||||
tt := tt // quiet linting
|
||||
t.Run(tt.segmentType, func(t *testing.T) {
|
||||
reqs := []*objectdeletion.ObjectIdentifier{item}
|
||||
reqs := []*metabase.ObjectLocation{item}
|
||||
pointerDBMock, err := newPointerDB(reqs, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
pointers, deletedPaths, err := service.DeletePointers(ctx, reqs)
|
||||
pointers, deletedKeys, err := service.DeletePointers(ctx, reqs)
|
||||
require.Error(t, err)
|
||||
require.Len(t, pointers, 0)
|
||||
require.Len(t, deletedPaths, 0)
|
||||
require.Len(t, deletedKeys, 0)
|
||||
|
||||
piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
|
||||
|
||||
@ -146,13 +146,13 @@ func TestService_Delete_MultipleObject(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
items := make([]*objectdeletion.ObjectIdentifier, 0, 100)
|
||||
items := make([]*metabase.ObjectLocation, 0, 100)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
item := &objectdeletion.ObjectIdentifier{
|
||||
ProjectID: testrand.UUID(),
|
||||
Bucket: []byte("bucketname"),
|
||||
EncryptedPath: []byte("encrypted" + strconv.Itoa(i)),
|
||||
item := &metabase.ObjectLocation{
|
||||
ProjectID: testrand.UUID(),
|
||||
BucketName: "bucketname",
|
||||
ObjectKey: metabase.ObjectKey("encrypted" + strconv.Itoa(i)),
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
@ -165,7 +165,7 @@ func TestService_Delete_MultipleObject(t *testing.T) {
|
||||
|
||||
var testCases = []struct {
|
||||
segmentType string
|
||||
largestSegmentIdx int
|
||||
largestSegmentIdx int64
|
||||
numPiecesPerSegment int32
|
||||
expectedPointersDeleted int
|
||||
expectedPiecesToDelete int32
|
||||
@ -186,10 +186,10 @@ func TestService_Delete_MultipleObject(t *testing.T) {
|
||||
service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
|
||||
require.NoError(t, err)
|
||||
|
||||
pointers, deletedPaths, err := service.DeletePointers(ctx, items)
|
||||
pointers, deletedKeys, err := service.DeletePointers(ctx, items)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, pointers, tt.expectedPointersDeleted)
|
||||
require.Len(t, deletedPaths, tt.expectedPointersDeleted)
|
||||
require.Len(t, deletedKeys, tt.expectedPointersDeleted)
|
||||
|
||||
piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
|
||||
totalPiecesToDelete := 0
|
||||
@ -201,8 +201,8 @@ func TestService_Delete_MultipleObject(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func calcExpectedPieces(segmentType string, numRequests int, batchSize int, largestSegmentIdx int, numPiecesPerSegment int) int {
|
||||
numSegments := largestSegmentIdx + 1
|
||||
func calcExpectedPieces(segmentType string, numRequests int, batchSize int, largestSegmentIdx int64, numPiecesPerSegment int) int {
|
||||
numSegments := int(largestSegmentIdx) + 1
|
||||
|
||||
totalPieces := numRequests * numSegments * numPiecesPerSegment
|
||||
|
||||
@ -210,7 +210,7 @@ func calcExpectedPieces(segmentType string, numRequests int, batchSize int, larg
|
||||
case "mixed-segment":
|
||||
return totalPieces - numPiecesPerSegment
|
||||
case "zombie-segment":
|
||||
return numRequests * largestSegmentIdx * numPiecesPerSegment
|
||||
return numRequests * int(largestSegmentIdx) * numPiecesPerSegment
|
||||
default:
|
||||
return totalPieces
|
||||
}
|
||||
@ -226,7 +226,7 @@ func TestService_Delete_Batch(t *testing.T) {
|
||||
segmentType string
|
||||
numRequests int
|
||||
batchSize int
|
||||
largestSegmentIdx int
|
||||
largestSegmentIdx int64
|
||||
numPiecesPerSegment int32
|
||||
}{
|
||||
{"single-request", "single-segment", 1, 1, 0, 3},
|
||||
@ -276,17 +276,12 @@ func TestService_Delete_Batch(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
const (
|
||||
lastSegmentIdx = -1
|
||||
firstSegmentIdx = 0
|
||||
)
|
||||
|
||||
type pointerDBMock struct {
|
||||
pointers map[string]*pb.Pointer
|
||||
hasError bool
|
||||
}
|
||||
|
||||
func newPointerDB(objects []*objectdeletion.ObjectIdentifier, segmentType string, numSegments int, numPiecesPerSegment int32, hasError bool) (*pointerDBMock, error) {
|
||||
func newPointerDB(objects []*metabase.ObjectLocation, segmentType string, numSegments int64, numPiecesPerSegment int32, hasError bool) (*pointerDBMock, error) {
|
||||
var (
|
||||
pointers []*pb.Pointer
|
||||
err error
|
||||
@ -305,50 +300,54 @@ func newPointerDB(objects []*objectdeletion.ObjectIdentifier, segmentType string
|
||||
return nil, errs.New("unsupported segment type")
|
||||
}
|
||||
|
||||
paths := [][]byte{}
|
||||
keys := []metabase.SegmentKey{}
|
||||
for _, obj := range objects {
|
||||
paths = append(paths, createPaths(obj, numSegments)...)
|
||||
newKeys, err := createKeys(obj, numSegments)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys = append(keys, newKeys...)
|
||||
}
|
||||
|
||||
pointers, err = createMockPointers(option.lastSegment, option.firstSegment, option.inlineSegment, paths, numPiecesPerSegment, numSegments)
|
||||
pointers, err = createMockPointers(option.lastSegment, option.firstSegment, option.inlineSegment, keys, numPiecesPerSegment, numSegments)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pointerDB := &pointerDBMock{
|
||||
pointers: make(map[string]*pb.Pointer, len(paths)),
|
||||
pointers: make(map[string]*pb.Pointer, len(keys)),
|
||||
hasError: hasError,
|
||||
}
|
||||
for i, p := range paths {
|
||||
for i, p := range keys {
|
||||
pointerDB.pointers[string(p)] = pointers[i]
|
||||
}
|
||||
|
||||
return pointerDB, nil
|
||||
}
|
||||
|
||||
func (db *pointerDBMock) GetItems(ctx context.Context, paths []metabase.SegmentKey) ([]*pb.Pointer, error) {
|
||||
func (db *pointerDBMock) GetItems(ctx context.Context, keys []metabase.SegmentKey) ([]*pb.Pointer, error) {
|
||||
if db.hasError {
|
||||
return nil, errs.New("pointerDB failure")
|
||||
}
|
||||
pointers := make([]*pb.Pointer, len(paths))
|
||||
for i, p := range paths {
|
||||
pointers := make([]*pb.Pointer, len(keys))
|
||||
for i, p := range keys {
|
||||
pointers[i] = db.pointers[string(p)]
|
||||
}
|
||||
return pointers, nil
|
||||
}
|
||||
|
||||
func (db *pointerDBMock) UnsynchronizedGetDel(ctx context.Context, paths []metabase.SegmentKey) ([]metabase.SegmentKey, []*pb.Pointer, error) {
|
||||
pointers := make([]*pb.Pointer, len(paths))
|
||||
for i, p := range paths {
|
||||
func (db *pointerDBMock) UnsynchronizedGetDel(ctx context.Context, keys []metabase.SegmentKey) ([]metabase.SegmentKey, []*pb.Pointer, error) {
|
||||
pointers := make([]*pb.Pointer, len(keys))
|
||||
for i, p := range keys {
|
||||
pointers[i] = db.pointers[string(p)]
|
||||
}
|
||||
|
||||
rand.Shuffle(len(pointers), func(i, j int) {
|
||||
pointers[i], pointers[j] = pointers[j], pointers[i]
|
||||
paths[i], paths[j] = paths[j], paths[i]
|
||||
keys[i], keys[j] = keys[j], keys[i]
|
||||
})
|
||||
|
||||
return paths, pointers, nil
|
||||
return keys, pointers, nil
|
||||
}
|
||||
|
||||
func newPointer(pointerType pb.Pointer_DataType, numPiecesPerSegment int32) *pb.Pointer {
|
||||
@ -371,10 +370,10 @@ func newPointer(pointerType pb.Pointer_DataType, numPiecesPerSegment int32) *pb.
|
||||
return pointer
|
||||
}
|
||||
|
||||
func newLastSegmentPointer(pointerType pb.Pointer_DataType, numSegments int, numPiecesPerSegment int32) (*pb.Pointer, error) {
|
||||
func newLastSegmentPointer(pointerType pb.Pointer_DataType, numSegments int64, numPiecesPerSegment int32) (*pb.Pointer, error) {
|
||||
pointer := newPointer(pointerType, numPiecesPerSegment)
|
||||
meta := &pb.StreamMeta{
|
||||
NumberOfSegments: int64(numSegments),
|
||||
NumberOfSegments: numSegments,
|
||||
}
|
||||
metaInBytes, err := pb.Marshal(meta)
|
||||
if err != nil {
|
||||
@ -384,17 +383,17 @@ func newLastSegmentPointer(pointerType pb.Pointer_DataType, numSegments int, num
|
||||
return pointer, nil
|
||||
}
|
||||
|
||||
func createMockPointers(hasLastSegment bool, hasFirstSegment bool, hasInlineSegments bool, paths [][]byte, numPiecesPerSegment int32, numSegments int) ([]*pb.Pointer, error) {
|
||||
pointers := make([]*pb.Pointer, 0, len(paths))
|
||||
func createMockPointers(hasLastSegment bool, hasFirstSegment bool, hasInlineSegments bool, keys []metabase.SegmentKey, numPiecesPerSegment int32, numSegments int64) ([]*pb.Pointer, error) {
|
||||
pointers := make([]*pb.Pointer, 0, len(keys))
|
||||
|
||||
isInlineAdded := false
|
||||
for _, p := range paths {
|
||||
_, segment, err := objectdeletion.ParseSegmentPath(p)
|
||||
for _, p := range keys {
|
||||
segmentLocation, err := metabase.ParseSegmentKey(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if segment == lastSegmentIdx {
|
||||
if segmentLocation.IsLast() {
|
||||
if !hasLastSegment {
|
||||
pointers = append(pointers, nil)
|
||||
} else {
|
||||
@ -406,7 +405,7 @@ func createMockPointers(hasLastSegment bool, hasFirstSegment bool, hasInlineSegm
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !hasFirstSegment && segment == firstSegmentIdx {
|
||||
if !hasFirstSegment && segmentLocation.IsFirst() {
|
||||
pointers = append(pointers, nil)
|
||||
continue
|
||||
}
|
||||
@ -421,21 +420,20 @@ func createMockPointers(hasLastSegment bool, hasFirstSegment bool, hasInlineSegm
|
||||
return pointers, nil
|
||||
}
|
||||
|
||||
func createPaths(object *objectdeletion.ObjectIdentifier, largestSegmentIdx int) [][]byte {
|
||||
paths := [][]byte{}
|
||||
for i := 0; i <= largestSegmentIdx; i++ {
|
||||
func createKeys(object *metabase.ObjectLocation, largestSegmentIdx int64) ([]metabase.SegmentKey, error) {
|
||||
keys := []metabase.SegmentKey{}
|
||||
for i := int64(0); i <= largestSegmentIdx; i++ {
|
||||
segmentIdx := i
|
||||
if segmentIdx == largestSegmentIdx {
|
||||
segmentIdx = lastSegmentIdx
|
||||
segmentIdx = metabase.LastSegmentIndex
|
||||
}
|
||||
|
||||
location := metabase.SegmentLocation{
|
||||
ProjectID: object.ProjectID,
|
||||
BucketName: string(object.Bucket),
|
||||
Index: int64(segmentIdx),
|
||||
ObjectKey: metabase.ObjectKey(string(object.EncryptedPath)),
|
||||
segment, err := object.Segment(segmentIdx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
paths = append(paths, location.Encode())
|
||||
|
||||
keys = append(keys, segment.Encode())
|
||||
}
|
||||
return paths
|
||||
return keys, nil
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
// a delete operation.
|
||||
// It also stores pointers related to an object.
|
||||
type ObjectState struct {
|
||||
ObjectIdentifier
|
||||
metabase.ObjectLocation
|
||||
|
||||
LastSegment *pb.Pointer
|
||||
ZeroSegment *pb.Pointer
|
||||
@ -54,32 +54,32 @@ const (
|
||||
)
|
||||
|
||||
// CreateObjectStates creates the current object states.
|
||||
func CreateObjectStates(ctx context.Context, requests []*ObjectIdentifier, pointers []*pb.Pointer, paths []metabase.SegmentKey) (map[string]*ObjectState, error) {
|
||||
func CreateObjectStates(ctx context.Context, requests []*metabase.ObjectLocation, pointers []*pb.Pointer, paths []metabase.SegmentKey) (map[metabase.ObjectLocation]*ObjectState, error) {
|
||||
|
||||
// Fetch headers to figure out the status of objects.
|
||||
objects := make(map[string]*ObjectState)
|
||||
objects := make(map[metabase.ObjectLocation]*ObjectState)
|
||||
for _, req := range requests {
|
||||
objects[req.Key()] = &ObjectState{
|
||||
ObjectIdentifier: *req,
|
||||
objects[*req] = &ObjectState{
|
||||
ObjectLocation: *req,
|
||||
}
|
||||
}
|
||||
|
||||
for i, p := range paths {
|
||||
// Update our state map.
|
||||
id, segment, err := ParseSegmentPath(p)
|
||||
segmentLocation, err := metabase.ParseSegmentKey(p)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
state, ok := objects[id.Key()]
|
||||
state, ok := objects[segmentLocation.Object()]
|
||||
if !ok {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
switch segment {
|
||||
case lastSegmentIndex:
|
||||
switch {
|
||||
case segmentLocation.IsLast():
|
||||
state.LastSegment = pointers[i]
|
||||
case firstSegmentIndex:
|
||||
case segmentLocation.IsFirst():
|
||||
state.ZeroSegment = pointers[i]
|
||||
default:
|
||||
return nil, Error.New("pointerDB failure")
|
||||
|
Loading…
Reference in New Issue
Block a user