satellite/metainfo/objectdeletion: Object deletion implementation

Improve our delete logic to require fewer database requests.
This PR creates a new objectdeletion package

Change-Id: I0500173bb9b8c771accb350f076329ede6dbb42c
This commit is contained in:
Rafael Gomes 2020-06-29 17:31:23 -03:00 committed by Yingrong Zhao
parent 85aff88e7f
commit 375d76638d
8 changed files with 1180 additions and 1 deletions

View File

@ -0,0 +1,86 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package objectdeletion
import (
"errors"
"strconv"
"strings"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/common/uuid"
)
// ObjectIdentifier contains information about an object
// that are needed for delete operation.
type ObjectIdentifier struct {
	ProjectID     uuid.UUID // ID of the project owning the object
	Bucket        []byte    // bucket name the object lives in
	EncryptedPath []byte    // encrypted path of the object within the bucket
}
// SegmentPath returns a raw path for a specific segment index.
func (id *ObjectIdentifier) SegmentPath(segmentIndex int64) ([]byte, error) {
	var segment string
	switch {
	case segmentIndex < lastSegmentIndex:
		return nil, errors.New("invalid segment index")
	case segmentIndex == lastSegmentIndex:
		// the last segment is stored under the special "l" element
		segment = "l"
	default:
		segment = "s" + strconv.FormatInt(segmentIndex, 10)
	}
	rawPath := storj.JoinPaths(
		id.ProjectID.String(),
		segment,
		string(id.Bucket),
		string(id.EncryptedPath),
	)
	return []byte(rawPath), nil
}
// ParseSegmentPath parses a raw path and returns an
// object identifier from that path along with the path's segment index.
// example: <project-id>/01/<bucket-name>/<encrypted-path>
func ParseSegmentPath(rawPath []byte) (ObjectIdentifier, int64, error) {
	elements := storj.SplitPath(string(rawPath))
	if len(elements) < 4 {
		return ObjectIdentifier{}, -1, errs.New("invalid path %q", string(rawPath))
	}

	projectID, err := uuid.FromString(elements[0])
	if err != nil {
		return ObjectIdentifier{}, -1, errs.Wrap(err)
	}

	var segmentIndex int64
	if elements[1] == "l" {
		segmentIndex = lastSegmentIndex
	} else {
		// segment elements look like "s<index>"; guard the prefix before
		// slicing so a malformed (e.g. empty) element returns an error
		// instead of panicking with an out-of-range index
		if !strings.HasPrefix(elements[1], "s") {
			return ObjectIdentifier{}, -1, errs.New("invalid segment %q in path %q", elements[1], string(rawPath))
		}
		segmentIndex, err = strconv.ParseInt(elements[1][1:], 10, 64) // remove the string `s` prefix from the segment index
		if err != nil {
			return ObjectIdentifier{}, -1, errs.Wrap(err)
		}
	}
	return ObjectIdentifier{
		ProjectID:     projectID,
		Bucket:        []byte(elements[2]),
		EncryptedPath: []byte(storj.JoinPaths(elements[3:]...)),
	}, segmentIndex, nil
}
// Key returns a string concatenated by all object identifier fields plus 0.
// It's a unique string used to identify an object.
// It's not a valid key for retrieving pointers from metainfo database.
func (id *ObjectIdentifier) Key() string {
	builder := strings.Builder{}
	// we don't need the return values here:
	// Write will always return the length of the argument and nil error,
	// and WriteByte always returns nil error.
	_, _ = builder.Write(id.ProjectID[:])
	// separate the variable-length fields with a zero byte (as the doc
	// comment promises); without it, bucket "ab" + path "c" would collide
	// with bucket "a" + path "bc"
	_ = builder.WriteByte(0)
	_, _ = builder.Write(id.Bucket)
	_ = builder.WriteByte(0)
	_, _ = builder.Write(id.EncryptedPath)
	return builder.String()
}

View File

@ -0,0 +1,54 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package objectdeletion
import (
"context"
"go.uber.org/zap"
)
// Report represents the deletion status report.
type Report struct {
	Deleted []*ObjectIdentifier // objects whose segments were confirmed deleted
	Failed  []*ObjectIdentifier // requested objects not found among the deleted paths
}
// HasFailures returns whether a delete operation has failures.
func (r Report) HasFailures() bool {
	return len(r.Failed) > 0
}
// GenerateReport returns the result of a delete, success, or failure.
// A request counts as deleted when at least one of its segment paths
// appears in deletedPaths; all remaining requests are reported as failed.
func GenerateReport(ctx context.Context, log *zap.Logger, requests []*ObjectIdentifier, deletedPaths [][]byte) Report {
	defer mon.Task()(&ctx)(nil)

	report := Report{}

	// build a set of objects that had at least one segment deleted
	deletedObjects := make(map[string]*ObjectIdentifier)
	for _, path := range deletedPaths {
		if path == nil {
			continue
		}
		id, _, err := ParseSegmentPath(path)
		if err != nil {
			// include the parse error so the failure is diagnosable
			log.Debug("failed to parse deleted segment path for report",
				zap.String("Raw Segment Path", string(path)),
				zap.Error(err),
			)
			continue
		}
		if _, ok := deletedObjects[id.Key()]; !ok {
			deletedObjects[id.Key()] = &id
		}
	}

	// populate report with failed and deleted objects
	for _, req := range requests {
		if _, ok := deletedObjects[req.Key()]; !ok {
			report.Failed = append(report.Failed, req)
		} else {
			report.Deleted = append(report.Deleted, req)
		}
	}
	return report
}

View File

@ -0,0 +1,75 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package objectdeletion_test
import (
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/objectdeletion"
)
// TestReport verifies that GenerateReport flags a request as failed
// exactly when none of its segment paths appear in the deleted paths.
func TestReport(t *testing.T) {
	logger := zaptest.NewLogger(t)

	var testCases = []struct {
		description     string
		numRequests     int
		numDeletedPaths int
		expectedFailure bool
	}{
		{"has-failure", 2, 1, true},
		{"all-deleted", 2, 2, false},
	}

	for _, tt := range testCases {
		tt := tt // capture range variable for the parallel-safe closure
		t.Run(tt.description, func(t *testing.T) {
			ctx := testcontext.New(t)
			defer ctx.Cleanup()

			// create numRequests objects, of which only numDeletedPaths
			// have a corresponding deleted segment path
			requests := createRequests(tt.numRequests)
			deletedSegmentPaths, err := createDeletedSegmentPaths(requests, tt.numDeletedPaths)
			require.NoError(t, err)

			report := objectdeletion.GenerateReport(ctx, logger, requests, deletedSegmentPaths)
			require.Equal(t, tt.expectedFailure, report.HasFailures())
		})
	}
}
// createDeletedSegmentPaths builds one randomly-indexed segment path for each
// of the first numDeleted requests.
func createDeletedSegmentPaths(requests []*objectdeletion.ObjectIdentifier, numDeleted int) ([][]byte, error) {
	if numDeleted > len(requests) {
		return nil, errs.New("invalid argument")
	}
	paths := make([][]byte, numDeleted)
	for i := range paths {
		segmentPath, err := requests[i].SegmentPath(int64(testrand.Intn(10)))
		if err != nil {
			return nil, err
		}
		paths[i] = segmentPath
	}
	return paths, nil
}
// createRequests returns numRequests object identifiers with random project
// IDs and distinct encrypted paths.
func createRequests(numRequests int) []*objectdeletion.ObjectIdentifier {
	requests := make([]*objectdeletion.ObjectIdentifier, numRequests)
	for i := range requests {
		requests[i] = &objectdeletion.ObjectIdentifier{
			ProjectID:     testrand.UUID(),
			Bucket:        []byte("test"),
			EncryptedPath: []byte(strconv.Itoa(i) + "test"),
		}
	}
	return requests
}

View File

@ -0,0 +1,349 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package objectdeletion
import (
"context"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/context2"
"storj.io/common/pb"
"storj.io/common/storj"
)
const (
	// lastSegmentIndex is the special index of an object's last segment ("l" path element).
	lastSegmentIndex = -1
	// firstSegmentIndex is the index of an object's first segment ("s0" path element).
	firstSegmentIndex = 0
)

var (
	// mon collects monitoring metrics for this package.
	mon = monkit.Package()
	// Error is a general object deletion error.
	Error = errs.Class("object deletion")
)
// Config defines configuration options for Service.
type Config struct {
	MaxObjectsPerRequest     int `help:"maximum number of requests per batch" default:"100"`
	ZombieSegmentsPerRequest int `help:"number of segments per request when looking for zombie segments" default:"3"`
}
// Verify verifies configuration sanity.
func (config *Config) Verify() errs.Group {
	var group errs.Group
	if limit := config.MaxObjectsPerRequest; limit <= 0 {
		group.Add(Error.New("max requests per batch %d must be greater than 0", limit))
	}
	if perRequest := config.ZombieSegmentsPerRequest; perRequest <= 0 {
		group.Add(Error.New("zombie segments per request %d must be greater than 0", perRequest))
	}
	return group
}
// PointerDB stores pointers.
type PointerDB interface {
	// GetItems returns one pointer per path, in the same order as paths;
	// entries for missing paths are nil rather than errors.
	GetItems(ctx context.Context, paths [][]byte) ([]*pb.Pointer, error)
	// UnsynchronizedGetDel deletes the given paths and returns the actually
	// deleted paths with their pointers; the returned order may differ from
	// the input order.
	UnsynchronizedGetDel(ctx context.Context, paths [][]byte) (deletedPaths [][]byte, _ []*pb.Pointer, _ error)
}
// Service implements the object deletion service
//
// architecture: Service
type Service struct {
	log      *zap.Logger
	config   Config
	pointers PointerDB // database access for segment pointers
}
// NewService returns new instance of Service.
// It returns an error when the configuration fails verification.
func NewService(log *zap.Logger, pointerDB PointerDB, config Config) (*Service, error) {
	// name the group "errlist" so it doesn't shadow the errs package
	if errlist := config.Verify(); len(errlist) > 0 {
		return nil, errlist.Err()
	}
	return &Service{
		log:      log,
		config:   config,
		pointers: pointerDB,
	}, nil
}
// DeletePointers returns a list of pointers and their paths that are deleted.
// If an object is not found, we will consider it as a successful delete.
//
// It works in three phases: classify each object by fetching its first and
// last segment pointers, collect every remaining segment path (including
// probing for zombie segments), then delete all collected paths in one call.
func (service *Service) DeletePointers(ctx context.Context, requests []*ObjectIdentifier) (_ []*pb.Pointer, _ [][]byte, err error) {
	defer mon.Task()(&ctx, len(requests))(&err)

	// We should ignore client cancelling and always try to delete segments.
	ctx = context2.WithoutCancellation(ctx)

	// get first and last segment to determine the object state
	lastAndFirstSegmentsPath := [][]byte{}
	for _, req := range requests {
		lastSegmentPath, err := req.SegmentPath(lastSegmentIndex)
		if err != nil {
			return nil, nil, Error.Wrap(err)
		}
		firstSegmentPath, err := req.SegmentPath(firstSegmentIndex)
		if err != nil {
			return nil, nil, Error.Wrap(err)
		}
		lastAndFirstSegmentsPath = append(lastAndFirstSegmentsPath,
			lastSegmentPath,
			firstSegmentPath,
		)
	}

	// Get pointers from the database
	pointers, err := service.pointers.GetItems(ctx, lastAndFirstSegmentsPath)
	if err != nil {
		return nil, nil, Error.Wrap(err)
	}

	// classify every requested object by which of its boundary segments exist
	states, err := CreateObjectStates(ctx, requests, pointers, lastAndFirstSegmentsPath)
	if err != nil {
		return nil, nil, Error.Wrap(err)
	}

	pathsToDel, err := service.generateSegmentPathsForCompleteObjects(ctx, states)
	if err != nil {
		return nil, nil, Error.Wrap(err)
	}

	zombiePaths, err := service.collectSegmentPathsForZombieObjects(ctx, states)
	if err != nil {
		return nil, nil, Error.Wrap(err)
	}
	pathsToDel = append(pathsToDel, zombiePaths...)

	// Delete pointers and fetch the piece ids.
	//
	// The deletion may fail in the database for an arbitrary reason.
	// In that case we return an error and the pointers are left intact.
	//
	// The deletion may succeed in the database, but the connection may drop
	// while the database is sending a response -- in that case we won't send
	// the piecedeletion requests and let garbage collection clean up those
	// pieces.
	deletedPaths, pointers, err := service.pointers.UnsynchronizedGetDel(ctx, pathsToDel)
	if err != nil {
		return nil, nil, Error.Wrap(err)
	}

	// Update state map with other segments.
	for i, pointer := range pointers {
		id, segmentIdx, err := ParseSegmentPath(deletedPaths[i])
		if err != nil {
			return pointers, deletedPaths, Error.Wrap(err)
		}
		state, ok := states[id.Key()]
		if !ok {
			return pointers, deletedPaths, Error.New("object not found")
		}
		// first and last segments are already tracked on the state
		if segmentIdx == lastSegmentIndex || segmentIdx == firstSegmentIndex {
			continue
		}
		state.OtherSegments = append(state.OtherSegments, pointer)
	}

	// if object is missing, we can consider it as a successful delete
	deletedPaths = append(deletedPaths, service.extractSegmentPathsForMissingObjects(ctx, states)...)
	return pointers, deletedPaths, nil
}
// GroupPiecesByNodeID returns a map that contains pieces with node id as the key.
func GroupPiecesByNodeID(pointers []*pb.Pointer) map[storj.NodeID][]storj.PieceID {
	// build piece deletion requests
	piecesToDelete := map[storj.NodeID][]storj.PieceID{}
	for _, pointer := range pointers {
		// skip missing entries and inline segments: neither carries remote pieces
		if pointer == nil || pointer.Type != pb.Pointer_REMOTE {
			continue
		}
		remote := pointer.GetRemote()
		rootPieceID := remote.RootPieceId
		for _, piece := range remote.GetRemotePieces() {
			derivedID := rootPieceID.Derive(piece.NodeId, piece.PieceNum)
			piecesToDelete[piece.NodeId] = append(piecesToDelete[piece.NodeId], derivedID)
		}
	}
	return piecesToDelete
}
// generateSegmentPathsForCompleteObjects collects segment paths for objects that have their last segment found in pointerDB.
// Missing objects contribute nothing; active-or-zombie objects are handled by
// collectSegmentPathsForZombieObjects instead.
func (service *Service) generateSegmentPathsForCompleteObjects(ctx context.Context, states map[string]*ObjectState) (_ [][]byte, err error) {
	defer mon.Task()(&ctx)(&err)

	segmentPaths := [][]byte{}
	for _, state := range states {
		switch state.Status() {
		case ObjectMissing:
			// nothing to do here
		case ObjectMultiSegment:
			// just delete the starting things, we already have the necessary information
			lastSegmentPath, err := state.SegmentPath(lastSegmentIndex)
			if err != nil {
				return nil, Error.Wrap(err)
			}
			firstSegmentPath, err := state.SegmentPath(firstSegmentIndex)
			if err != nil {
				return nil, Error.Wrap(err)
			}

			segmentPaths = append(segmentPaths, lastSegmentPath)
			segmentPaths = append(segmentPaths, firstSegmentPath)

			// the last segment's metadata records how many segments the object has
			largestSegmentIdx, err := numberOfSegmentsFromPointer(state.LastSegment)
			if err != nil {
				return nil, Error.Wrap(err)
			}

			// gather all segment paths that're not first or last segments
			for index := largestSegmentIdx - 1; index > firstSegmentIndex; index-- {
				path, err := state.SegmentPath(index)
				if err != nil {
					return nil, Error.Wrap(err)
				}
				segmentPaths = append(segmentPaths, path)
			}
		case ObjectSingleSegment:
			// just add to segment path, we already have the necessary information
			lastSegmentPath, err := state.SegmentPath(lastSegmentIndex)
			if err != nil {
				return nil, Error.Wrap(err)
			}
			segmentPaths = append(segmentPaths, lastSegmentPath)
		case ObjectActiveOrZombie:
			// we will handle it in a separate method
		}
	}

	return segmentPaths, nil
}
// collectSegmentPathsForZombieObjects collects segment paths for objects that have no last segment found in pointerDB.
// It probes segment indexes in batches of config.ZombieSegmentsPerRequest per
// object until the highest existing segment for each object is discovered.
func (service *Service) collectSegmentPathsForZombieObjects(ctx context.Context, states map[string]*ObjectState) (_ [][]byte, err error) {
	defer mon.Task()(&ctx)(&err)

	zombies := map[string]ObjectIdentifier{}
	largestLoaded := map[string]int64{}

	segmentsToDel := [][]byte{}

	for _, state := range states {
		if state.Status() == ObjectActiveOrZombie {
			firstSegmentPath, err := state.SegmentPath(firstSegmentIndex)
			if err != nil {
				return nil, Error.Wrap(err)
			}
			segmentsToDel = append(segmentsToDel, firstSegmentPath)

			zombies[state.Key()] = state.ObjectIdentifier
			largestLoaded[state.Key()] = int64(firstSegmentIndex)
		}
	}

	largestKnownSegment := int64(firstSegmentIndex)
	for len(zombies) > 0 {
		// Don't make requests for segments where we found the final segment.
		for key, largest := range largestLoaded {
			if largest != largestKnownSegment {
				delete(largestLoaded, key)
				delete(zombies, key)
			}
		}
		// found all segments
		if len(zombies) == 0 {
			break
		}

		// Request the next batch of segments.
		startFrom := largestKnownSegment + 1
		largestKnownSegment += int64(service.config.ZombieSegmentsPerRequest)

		var zombieSegmentPaths [][]byte
		for _, id := range zombies {
			for i := startFrom; i <= largestKnownSegment; i++ {
				path, err := id.SegmentPath(i)
				if err != nil {
					return nil, Error.Wrap(err)
				}
				zombieSegmentPaths = append(zombieSegmentPaths, path)
			}
		}

		// We are relying on the database to return the pointers in the same
		// order as the paths we requested.
		pointers, err := service.pointers.GetItems(ctx, zombieSegmentPaths)
		if err != nil {
			// use the package error class for consistency with every other
			// wrap in this service (was errs.Wrap)
			return nil, Error.Wrap(err)
		}
		for i, p := range zombieSegmentPaths {
			if pointers[i] == nil {
				continue
			}
			id, segmentIdx, err := ParseSegmentPath(p)
			if err != nil {
				return nil, Error.Wrap(err)
			}

			segmentsToDel = append(segmentsToDel, p)
			largestLoaded[id.Key()] = max(largestLoaded[id.Key()], segmentIdx)
		}
	}

	return segmentsToDel, nil
}
// extractSegmentPathsForMissingObjects returns the last-segment path of every
// object with no segments in pointerDB, so callers can report those objects
// as successfully deleted.
func (service *Service) extractSegmentPathsForMissingObjects(ctx context.Context, states map[string]*ObjectState) [][]byte {
	paths := make([][]byte, 0, len(states))
	for _, state := range states {
		if state.Status() == ObjectMissing {
			lastSegmentPath, err := state.ObjectIdentifier.SegmentPath(lastSegmentIndex)
			if err != nil {
				// it shouldn't happen: lastSegmentIndex is always a valid index
				service.log.Debug("failed to get segment path for missing object",
					zap.Stringer("Project ID", state.ObjectIdentifier.ProjectID),
					zap.String("Bucket", string(state.ObjectIdentifier.Bucket)),
					zap.String("Encrypted Path", string(state.ObjectIdentifier.EncryptedPath)),
					zap.Error(err), // previously the error itself was dropped from the log
				)
				continue
			}
			paths = append(paths, lastSegmentPath)
		}
	}

	return paths
}
// max returns the larger of the two int64 values.
func max(a, b int64) int64 {
	if b > a {
		return b
	}
	return a
}
// numberOfSegmentsFromPointer decodes the stream metadata stored on a last
// segment pointer and returns the object's segment count.
func numberOfSegmentsFromPointer(pointer *pb.Pointer) (int64, error) {
	var meta pb.StreamMeta
	if err := pb.Unmarshal(pointer.Metadata, &meta); err != nil {
		return 0, err
	}
	return meta.NumberOfSegments, nil
}

View File

@ -0,0 +1,371 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package objectdeletion_test
import (
"context"
"math/rand"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metainfo/objectdeletion"
)
// TestService_Delete_SingleObject verifies DeletePointers for one object
// across all segment layouts, plus the case of an object missing from
// pointerDB (which must still be reported as a successful delete).
func TestService_Delete_SingleObject(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	// mock the object that we want to delete
	item := &objectdeletion.ObjectIdentifier{
		ProjectID:     testrand.UUID(),
		Bucket:        []byte("bucketname"),
		EncryptedPath: []byte("encrypted"),
	}

	// an object that is never stored in the mock pointer database
	objectNotFound := &objectdeletion.ObjectIdentifier{
		ProjectID:     testrand.UUID(),
		Bucket:        []byte("object-not-found"),
		EncryptedPath: []byte("object-missing"),
	}

	config := objectdeletion.Config{
		MaxObjectsPerRequest:     100,
		ZombieSegmentsPerRequest: 3,
	}

	var testCases = []struct {
		segmentType             string
		isValidObject           bool
		largestSegmentIdx       int
		numPiecesPerSegment     int32
		expectedPointersDeleted int
		expectedPathDeleted     int
		expectedPiecesToDelete  int32
	}{
		{"single-segment", true, 0, 3, 1, 1, 3},
		{"multi-segment", true, 5, 2, 6, 6, 12},
		{"inline-segment", true, 0, 0, 1, 1, 0},
		{"mixed-segment", true, 5, 3, 6, 6, 15},
		{"zombie-segment", true, 5, 2, 5, 5, 10},
		{"single-segment", false, 0, 3, 0, 1, 0},
	}

	for _, tt := range testCases {
		tt := tt // quiet linting
		t.Run(tt.segmentType, func(t *testing.T) {
			pointerDBMock, err := newPointerDB([]*objectdeletion.ObjectIdentifier{item}, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, false)
			require.NoError(t, err)

			service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
			require.NoError(t, err)

			pointers, deletedPaths, err := service.DeletePointers(ctx, []*objectdeletion.ObjectIdentifier{item})
			if !tt.isValidObject {
				// re-run the delete with the object that doesn't exist in the DB
				pointers, deletedPaths, err = service.DeletePointers(ctx, []*objectdeletion.ObjectIdentifier{objectNotFound})
			}
			require.NoError(t, err)
			require.Len(t, pointers, tt.expectedPointersDeleted)
			require.Len(t, deletedPaths, tt.expectedPathDeleted)

			// sum pieces across all nodes to compare against the expected total
			piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
			totalPiecesToDelete := 0
			for _, pieces := range piecesToDeleteByNodes {
				totalPiecesToDelete += len(pieces)
			}
			require.Equal(t, tt.expectedPiecesToDelete, int32(totalPiecesToDelete))
		})
	}
}
// TestService_Delete_SingleObject_Failure verifies that when the pointer
// database errors, DeletePointers returns the error and reports nothing
// as deleted.
func TestService_Delete_SingleObject_Failure(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	// mock the object that we want to delete
	item := &objectdeletion.ObjectIdentifier{
		ProjectID:     testrand.UUID(),
		Bucket:        []byte("bucketname"),
		EncryptedPath: []byte("encrypted"),
	}

	config := objectdeletion.Config{
		MaxObjectsPerRequest:     100,
		ZombieSegmentsPerRequest: 3,
	}

	var testCases = []struct {
		segmentType            string
		largestSegmentIdx      int
		numPiecesPerSegment    int32
		expectedPiecesToDelete int32
	}{
		{"single-segment", 0, 1, 0},
		{"mixed-segment", 5, 3, 0},
		{"zombie-segment", 5, 2, 0},
	}

	for _, tt := range testCases {
		tt := tt // quiet linting
		t.Run(tt.segmentType, func(t *testing.T) {
			reqs := []*objectdeletion.ObjectIdentifier{item}
			// hasError=true makes the mock's GetItems fail
			pointerDBMock, err := newPointerDB(reqs, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, true)
			require.NoError(t, err)

			service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
			require.NoError(t, err)

			pointers, deletedPaths, err := service.DeletePointers(ctx, reqs)
			require.Error(t, err)
			require.Len(t, pointers, 0)
			require.Len(t, deletedPaths, 0)

			// no pieces should be scheduled for deletion on failure
			piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
			totalPiecesToDelete := 0
			for _, pieces := range piecesToDeleteByNodes {
				totalPiecesToDelete += len(pieces)
			}
			require.Equal(t, tt.expectedPiecesToDelete, int32(totalPiecesToDelete))
		})
	}
}
// TestService_Delete_MultipleObject verifies DeletePointers when deleting
// ten objects at once across the different segment layouts.
func TestService_Delete_MultipleObject(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	items := make([]*objectdeletion.ObjectIdentifier, 0, 100)
	for i := 0; i < 10; i++ {
		item := &objectdeletion.ObjectIdentifier{
			ProjectID:     testrand.UUID(),
			Bucket:        []byte("bucketname"),
			EncryptedPath: []byte("encrypted" + strconv.Itoa(i)),
		}
		items = append(items, item)
	}

	config := objectdeletion.Config{
		MaxObjectsPerRequest:     100,
		ZombieSegmentsPerRequest: 3,
	}

	var testCases = []struct {
		segmentType             string
		largestSegmentIdx       int
		numPiecesPerSegment     int32
		expectedPointersDeleted int
		expectedPiecesToDelete  int32
	}{
		{"single-segment", 0, 3, 10, 30},
		{"multi-segment", 5, 2, 60, 120},
		{"inline-segment", 0, 0, 10, 0},
		{"mixed-segment", 5, 3, 60, 177},
		{"zombie-segment", 5, 2, 50, 100},
	}

	for _, tt := range testCases {
		tt := tt // quiet linting
		t.Run(tt.segmentType, func(t *testing.T) {
			pointerDBMock, err := newPointerDB(items, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, false)
			require.NoError(t, err)

			service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
			require.NoError(t, err)

			pointers, deletedPaths, err := service.DeletePointers(ctx, items)
			require.NoError(t, err)
			require.Len(t, pointers, tt.expectedPointersDeleted)
			require.Len(t, deletedPaths, tt.expectedPointersDeleted)

			// sum pieces across all nodes to compare against the expected total
			piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
			totalPiecesToDelete := 0
			for _, pieces := range piecesToDeleteByNodes {
				totalPiecesToDelete += len(pieces)
			}
			require.Equal(t, tt.expectedPiecesToDelete, int32(totalPiecesToDelete))
		})
	}
}
const (
	// lastSegmentIdx mirrors the production last-segment index ("l" path element).
	lastSegmentIdx = -1
	// firstSegmentIdx mirrors the production first-segment index ("s0" path element).
	firstSegmentIdx = 0
)

// pointerDBMock is an in-memory PointerDB implementation used by the tests.
type pointerDBMock struct {
	pointers map[string]*pb.Pointer // raw segment path -> pointer
	hasError bool                   // when true, GetItems fails with an error
}
// newPointerDB creates a mock pointer database pre-populated with pointers
// for the given objects according to the requested segment layout.
func newPointerDB(objects []*objectdeletion.ObjectIdentifier, segmentType string, numSegments int, numPiecesPerSegment int32, hasError bool) (*pointerDBMock, error) {
	// which of the last/first/inline segments exist for each layout
	segmentMap := map[string]struct{ lastSegment, firstSegment, inlineSegment bool }{
		"single-segment": {true, false, false},
		"multi-segment":  {true, true, false},
		"inline-segment": {true, false, true},
		"mixed-segment":  {true, true, true},
		"zombie-segment": {false, true, false},
	}
	option, ok := segmentMap[segmentType]
	if !ok {
		return nil, errs.New("unsupported segment type")
	}

	var paths [][]byte
	for _, obj := range objects {
		paths = append(paths, createPaths(obj, numSegments)...)
	}

	pointers, err := createMockPointers(option.lastSegment, option.firstSegment, option.inlineSegment, paths, numPiecesPerSegment, numSegments)
	if err != nil {
		return nil, err
	}

	db := &pointerDBMock{
		pointers: make(map[string]*pb.Pointer, len(paths)),
		hasError: hasError,
	}
	for i, path := range paths {
		db.pointers[string(path)] = pointers[i]
	}
	return db, nil
}
// GetItems returns the stored pointer for each path, nil for unknown paths.
// When hasError is set, it simulates a database failure instead.
func (db *pointerDBMock) GetItems(ctx context.Context, paths [][]byte) ([]*pb.Pointer, error) {
	if db.hasError {
		return nil, errs.New("pointerDB failure")
	}
	result := make([]*pb.Pointer, 0, len(paths))
	for _, path := range paths {
		result = append(result, db.pointers[string(path)])
	}
	return result, nil
}
// UnsynchronizedGetDel returns the pointers for the given paths in a random
// order, mimicking the real database which gives no ordering guarantee.
func (db *pointerDBMock) UnsynchronizedGetDel(ctx context.Context, paths [][]byte) ([][]byte, []*pb.Pointer, error) {
	pointers := make([]*pb.Pointer, len(paths))
	for i := range paths {
		pointers[i] = db.pointers[string(paths[i])]
	}

	// shuffle both slices in lockstep so paths and pointers stay aligned
	rand.Shuffle(len(pointers), func(i, j int) {
		pointers[i], pointers[j] = pointers[j], pointers[i]
		paths[i], paths[j] = paths[j], paths[i]
	})

	return paths, pointers, nil
}
// newPointer returns a pointer of the given type; a remote pointer is filled
// with numPiecesPerSegment freshly generated remote pieces.
func newPointer(pointerType pb.Pointer_DataType, numPiecesPerSegment int32) *pb.Pointer {
	pointer := &pb.Pointer{Type: pointerType}
	if pointerType != pb.Pointer_REMOTE {
		return pointer
	}

	pieces := make([]*pb.RemotePiece, numPiecesPerSegment)
	for num := range pieces {
		pieces[num] = &pb.RemotePiece{
			PieceNum: int32(num),
			NodeId:   testrand.NodeID(),
		}
	}
	pointer.Remote = &pb.RemoteSegment{
		RootPieceId:  testrand.PieceID(),
		RemotePieces: pieces,
	}
	return pointer
}
// newLastSegmentPointer returns a pointer whose metadata records the object's
// total segment count, as the real last-segment pointer does.
func newLastSegmentPointer(pointerType pb.Pointer_DataType, numSegments int, numPiecesPerSegment int32) (*pb.Pointer, error) {
	metaInBytes, err := pb.Marshal(&pb.StreamMeta{
		NumberOfSegments: int64(numSegments),
	})
	if err != nil {
		return nil, err
	}
	pointer := newPointer(pointerType, numPiecesPerSegment)
	pointer.Metadata = metaInBytes
	return pointer, nil
}
// createMockPointers builds one pointer per path. Depending on the flags it
// leaves the last or first segment missing (nil), and converts at most one
// segment into an inline pointer.
func createMockPointers(hasLastSegment bool, hasFirstSegment bool, hasInlineSegments bool, paths [][]byte, numPiecesPerSegment int32, numSegments int) ([]*pb.Pointer, error) {
	pointers := make([]*pb.Pointer, 0, len(paths))

	isInlineAdded := false
	for _, p := range paths {
		_, segment, err := objectdeletion.ParseSegmentPath(p)
		if err != nil {
			return nil, err
		}

		if segment == lastSegmentIdx {
			if !hasLastSegment {
				// simulate a zombie object: the last segment is missing
				pointers = append(pointers, nil)
			} else {
				lastSegmentPointer, err := newLastSegmentPointer(pb.Pointer_REMOTE, numSegments, numPiecesPerSegment)
				if err != nil {
					return nil, err
				}
				pointers = append(pointers, lastSegmentPointer)
			}
			continue
		}
		if !hasFirstSegment && segment == firstSegmentIdx {
			pointers = append(pointers, nil)
			continue
		}
		if hasInlineSegments && !isInlineAdded {
			// only the first eligible segment becomes inline
			pointers = append(pointers, newPointer(pb.Pointer_INLINE, 0))
			isInlineAdded = true
			continue
		}
		pointers = append(pointers, newPointer(pb.Pointer_REMOTE, numPiecesPerSegment))
	}

	return pointers, nil
}
// createPaths returns segment paths s0..s(n-1) for the object, with the
// largest index replaced by the last-segment ("l") path.
func createPaths(object *objectdeletion.ObjectIdentifier, largestSegmentIdx int) [][]byte {
	paths := make([][]byte, 0, largestSegmentIdx+1)
	for i := 0; i <= largestSegmentIdx; i++ {
		idx := i
		if idx == largestSegmentIdx {
			idx = lastSegmentIdx
		}
		paths = append(paths, createPath(object.ProjectID, object.Bucket, idx, object.EncryptedPath))
	}
	return paths
}
// createPath builds a raw segment path <project>/<segment>/<bucket>/<path>,
// using "l" for the last segment and "s<idx>" otherwise.
func createPath(projectID uuid.UUID, bucket []byte, segmentIdx int, encryptedPath []byte) []byte {
	segment := "l"
	if segmentIdx > lastSegmentIdx { // lastSegmentIndex = -1
		segment = "s" + strconv.Itoa(segmentIdx)
	}
	return []byte(storj.JoinPaths(
		projectID.String(),
		segment,
		string(bucket),
		string(encryptedPath),
	))
}

View File

@ -0,0 +1,88 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package objectdeletion
import (
"context"
"storj.io/common/pb"
)
// ObjectState determines how an object should be handled during
// a delete operation.
// It also stores pointers related to an object.
type ObjectState struct {
	ObjectIdentifier

	LastSegment   *pb.Pointer   // pointer of the last ("l") segment; nil when missing
	ZeroSegment   *pb.Pointer   // pointer of the first ("s0") segment; nil when missing
	OtherSegments []*pb.Pointer // remaining segment pointers gathered during deletion
}
// Status returns the current object status, derived from which of the
// boundary segments (last / zero) were found in the database.
func (state *ObjectState) Status() ObjectStatus {
	hasLast := state.LastSegment != nil
	hasZero := state.ZeroSegment != nil
	switch {
	case hasLast && hasZero:
		return ObjectMultiSegment
	case hasLast:
		return ObjectSingleSegment
	case hasZero:
		return ObjectActiveOrZombie
	default:
		return ObjectMissing
	}
}
// ObjectStatus is used to determine how to retrieve segment information
// from the database.
type ObjectStatus byte

const (
	// ObjectMissing represents when there's no object with that particular name.
	ObjectMissing = ObjectStatus(iota)
	// ObjectMultiSegment represents a multi segment object.
	ObjectMultiSegment
	// ObjectSingleSegment represents a single segment object.
	ObjectSingleSegment
	// ObjectActiveOrZombie represents either an object is being uploaded, deleted or it's a zombie.
	ObjectActiveOrZombie
)
// CreateObjectStates creates the current object states.
// pointers and paths must be parallel slices holding the first and last
// segment lookups for each request; each pointer is recorded on the matching
// object's state.
func CreateObjectStates(ctx context.Context, requests []*ObjectIdentifier, pointers []*pb.Pointer, paths [][]byte) (map[string]*ObjectState, error) {
	// Fetch headers to figure out the status of objects.
	objects := make(map[string]*ObjectState)
	for _, req := range requests {
		objects[req.Key()] = &ObjectState{
			ObjectIdentifier: *req,
		}
	}
	for i, p := range paths {
		// Update our state map.
		id, segment, err := ParseSegmentPath(p)
		if err != nil {
			return nil, Error.Wrap(err)
		}

		state, ok := objects[id.Key()]
		if !ok {
			// previously this wrapped a nil err, silently returning (nil, nil)
			return nil, Error.New("path %q does not match any request", string(p))
		}

		switch segment {
		case lastSegmentIndex:
			state.LastSegment = pointers[i]
		case firstSegmentIndex:
			state.ZeroSegment = pointers[i]
		default:
			return nil, Error.New("pointerDB failure")
		}
	}
	return objects, nil
}

View File

@ -224,6 +224,34 @@ func (s *Service) Get(ctx context.Context, path string) (_ *pb.Pointer, err erro
return pointer, nil
}
// GetItems gets decoded pointers from DB.
// The return value is in the same order as the argument paths.
// Missing items are returned as nil entries rather than errors.
func (s *Service) GetItems(ctx context.Context, paths [][]byte) (_ []*pb.Pointer, err error) {
	defer mon.Task()(&ctx)(&err)

	keys := make(storage.Keys, len(paths))
	for i := range paths {
		keys[i] = paths[i]
	}
	pointerBytes, err := s.db.GetAll(ctx, keys)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	pointers := make([]*pb.Pointer, len(pointerBytes))
	for i, p := range pointerBytes {
		// a nil value means the key was absent; leave the slot nil
		if p == nil {
			continue
		}
		var pointer pb.Pointer
		err = pb.Unmarshal([]byte(p), &pointer)
		if err != nil {
			return nil, Error.Wrap(err)
		}
		pointers[i] = &pointer
	}

	return pointers, nil
}
// GetWithBytes gets the protocol buffers encoded and decoded pointer from the DB.
func (s *Service) GetWithBytes(ctx context.Context, path string) (pointerBytes []byte, pointer *pb.Pointer, err error) {
defer mon.Task()(&ctx)(&err)
@ -337,7 +365,37 @@ func (s *Service) Delete(ctx context.Context, path string, oldPointerBytes []byt
return Error.Wrap(err)
}
// UnsynchronizedDelete deletes from item from db without verifying whether the pointer has changed in the database.
// UnsynchronizedGetDel deletes items from db without verifying whether the pointers have changed in the database,
// and it returns deleted items.
func (s *Service) UnsynchronizedGetDel(ctx context.Context, paths [][]byte) (_ [][]byte, _ []*pb.Pointer, err error) {
	// monitor this method like the sibling GetItems/UnsynchronizedDelete do
	defer mon.Task()(&ctx)(&err)

	keys := make(storage.Keys, len(paths))
	for i := range paths {
		keys[i] = paths[i]
	}

	items, err := s.db.DeleteMultiple(ctx, keys)
	if err != nil {
		return nil, nil, Error.Wrap(err)
	}

	pointerPaths := make([][]byte, 0, len(items))
	pointers := make([]*pb.Pointer, 0, len(items))

	for _, item := range items {
		data := &pb.Pointer{}
		err = pb.Unmarshal(item.Value, data)
		if err != nil {
			return nil, nil, Error.Wrap(err)
		}

		pointerPaths = append(pointerPaths, item.Key)
		pointers = append(pointers, data)
	}

	return pointerPaths, pointers, nil
}
// UnsynchronizedDelete deletes item from db without verifying whether the pointer has changed in the database.
func (s *Service) UnsynchronizedDelete(ctx context.Context, path string) (err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -5,9 +5,12 @@ package metainfo_test
import (
"context"
"fmt"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"storj.io/common/memory"
"storj.io/common/pb"
@ -19,6 +22,8 @@ import (
"storj.io/storj/storage"
)
// lastSegmentIndex identifies an object's last segment ("l" path element).
const lastSegmentIndex = -1
func TestIterate(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
@ -58,6 +63,81 @@ func TestIterate(t *testing.T) {
})
}
// TestGetItems_ReturnValueOrder ensures the return value
// of GetItems will always be the same order as the requested paths.
// The test does following steps:
// - Uploads test data (multi-segment objects)
// - Gather all object paths with an extra invalid path at random position
// - Retrieve pointers using above paths
// - Ensure the nil pointer and last segment paths are in the same order as their
// corresponding paths.
// TestGetItems_ReturnValueOrder ensures the return value
// of GetItems will always be the same order as the requested paths.
// The test does following steps:
// - Uploads test data (multi-segment objects)
// - Gather all object paths with an extra invalid path at random position
// - Retrieve pointers using above paths
// - Ensure the nil pointer and last segment paths are in the same order as their
// corresponding paths.
func TestGetItems_ReturnValueOrder(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.Combine(
				testplanet.ReconfigureRS(2, 2, 4, 4),
				testplanet.MaxSegmentSize(3*memory.KiB),
			),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		satellite := planet.Satellites[0]
		uplinkPeer := planet.Uplinks[0]

		// upload multi-segment objects (15 KiB data / 3 KiB max segment size)
		numItems := 5
		for i := 0; i < numItems; i++ {
			path := fmt.Sprintf("test/path_%d", i)
			err := uplinkPeer.Upload(ctx, satellite, "bucket", path, testrand.Bytes(15*memory.KiB))
			require.NoError(t, err)
		}

		keys, err := satellite.Metainfo.Database.List(ctx, nil, numItems)
		require.NoError(t, err)

		var paths = make([][]byte, 0, numItems+1)
		var lastSegmentPathIndices []int

		// Random nil pointer
		nilPointerIndex := testrand.Intn(numItems + 1)

		for i, key := range keys {
			paths = append(paths, []byte(key.String()))
			segmentIdx, err := parseSegmentPath([]byte(key.String()))
			require.NoError(t, err)

			if segmentIdx == lastSegmentIndex {
				lastSegmentPathIndices = append(lastSegmentPathIndices, i)
			}

			// set a random path to be nil.
			if nilPointerIndex == i {
				paths[nilPointerIndex] = nil
			}
		}

		pointers, err := satellite.Metainfo.Service.GetItems(ctx, paths)
		require.NoError(t, err)

		// verify the returned pointers line up with the requested paths
		for i, p := range pointers {
			if p == nil {
				require.Equal(t, nilPointerIndex, i)
				continue
			}

			meta := pb.StreamMeta{}
			metaInBytes := p.GetMetadata()
			err = pb.Unmarshal(metaInBytes, &meta)
			require.NoError(t, err)

			lastSegmentMeta := meta.GetLastSegmentMeta()
			if lastSegmentMeta != nil {
				require.Equal(t, lastSegmentPathIndices[i], i)
			}
		}
	})
}
func TestUpdatePiecesCheckDuplicates(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
@ -152,3 +232,21 @@ func TestCountBuckets(t *testing.T) {
require.Equal(t, 2, count)
})
}
// parseSegmentPath extracts the segment index from a raw segment path of the
// form <project-id>/<segment>/<bucket>/<encrypted-path>, where <segment> is
// "l" for the last segment or "s<index>" otherwise.
func parseSegmentPath(segmentPath []byte) (segmentIndex int64, err error) {
	elements := storj.SplitPath(string(segmentPath))
	if len(elements) < 4 {
		return -1, errs.New("invalid path %q", string(segmentPath))
	}
	if elements[1] == "l" {
		segmentIndex = lastSegmentIndex
	} else {
		// strip the "s" prefix before parsing the numeric index
		segmentIndex, err = strconv.ParseInt(elements[1][1:], 10, 64)
		if err != nil {
			return lastSegmentIndex, errs.Wrap(err)
		}
	}
	return segmentIndex, nil
}