satellite/metainfo: Delete segments in reverse order

Change DeleteObjectPieces to delete the segments' pointers of an
object in reverse order.

Last segment: L
N: total number of segments

Deleting in reverse order means: L, then the remaining segment indexes from N-2 down to 0.
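
For illustration only (a minimal runnable sketch; deletionOrder is a hypothetical helper, not part of this change), this is the resulting order over the segment pointer names the satellite uses ("l", "s0", "s1", ...):

package main

import "fmt"

// deletionOrder lists the pointer names in the order DeleteObjectPieces now
// walks them: the last segment first, then s<N-2> down to s0.
func deletionOrder(numOfSegments int64) []string {
	order := []string{"l"}
	for segmentIdx := numOfSegments - 2; segmentIdx >= 0; segmentIdx-- {
		order = append(order, fmt.Sprintf("s%d", segmentIdx))
	}
	return order
}

func main() {
	fmt.Println(deletionOrder(4)) // [l s2 s1 s0]
}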

Deleting in reverse order makes BeginDeleteObject usable to delete
partially uploaded objects that were interrupted (e.g. upload
cancellation): such uploads never commit the last segment pointer, and
DeleteObjectPieces now detects that case and deletes the continuous
segments from the highest index found down to index 0.

With this change, uplink upload cancellation can be changed to use
BeginDeleteObject to clean up already uploaded segments without having
to retrieve orders and dial every single node that stored a piece.
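
As a rough, assumed-numbers sketch (not a measurement; the counts below simply reuse the values from the tests in this change), the per-cancellation work the uplink no longer has to do itself looks like:

package main

import "fmt"

// With the satellite handling deletion, the uplink no longer has to fetch
// orders and contact each storage node holding a piece of every committed segment.
func main() {
	const (
		committedSegments = 6 // segments already uploaded when the cancellation happens (assumed)
		piecesPerSegment  = 4 // RS total used in the tests below: ReconfigureRS(2, 2, 4, 4)
	)
	fmt.Println("piece deletions the uplink no longer issues itself:",
		committedSegments*piecesPerSegment) // 24
}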

Ticket: https://storjlabs.atlassian.net/browse/V3-3525
Change-Id: Ieca6fd3801c4b71671811cb5f08a99d5146928a6
Author: Ivan Fraixedes, 2020-01-17 19:47:37 +01:00 (committed by Kaloyan Raev)
Parent: 416e5053a3
Commit: d5a60aec58
2 changed files with 356 additions and 39 deletions


@@ -12,11 +12,13 @@ import (
 	"github.com/stretchr/testify/require"

 	"storj.io/common/memory"
+	"storj.io/common/rpc/rpcstatus"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
 	"storj.io/common/testrand"

 	"storj.io/storj/cmd/uplink/cmd"
 	"storj.io/storj/private/testplanet"
+	"storj.io/storj/satellite/metainfo"
 	"storj.io/storj/storage"
 )
@@ -27,6 +29,8 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
 		Reconfigure: testplanet.Reconfigure{
+			// Reconfigure RS for ensuring that we don't have long-tail cancellations
+			// and the upload doesn't leave garbage in the SNs
 			Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -54,8 +58,6 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 				objectName = "object-filename" + strconv.Itoa(i)
 			)

-			// Use RSConfig for ensuring that we don't have long-tail cancellations and the
-			// upload doesn't leave garbage in the SNs
 			err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
 				Client: cmd.ClientConfig{
 					SegmentSize: 10 * memory.KiB,
@@ -123,6 +125,8 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
 		Reconfigure: testplanet.Reconfigure{
+			// Reconfigure RS for ensuring that we don't have long-tail cancellations
+			// and the upload doesn't leave garbage in the SNs
 			Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -130,8 +134,6 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 			uplnk        = planet.Uplinks[0]
 			satelliteSys = planet.Satellites[0]
 		)

-		// Use RSConfig for ensuring that we don't have long-tail cancellations and the
-		// upload doesn't leave garbage in the SNs
 		err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
 			Client: cmd.ClientConfig{
 				SegmentSize: 10 * memory.KiB,
@@ -198,6 +200,8 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
 		Reconfigure: testplanet.Reconfigure{
+			// Reconfigure RS for ensuring that we don't have long-tail cancellations
+			// and the upload doesn't leave garbage in the SNs
 			Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
 		},
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
@@ -206,8 +210,6 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 			satelliteSys = planet.Satellites[0]
 		)

-		// Use RSConfig for ensuring that we don't have long-tail cancellations and the
-		// upload doesn't leave garbage in the SNs
 		err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
 			Client: cmd.ClientConfig{
 				SegmentSize: 10 * memory.KiB,
@@ -242,6 +244,210 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 	})
 }

func TestEndpoint_DeleteObjectPieces_ObjectWithoutLastSegment(t *testing.T) {
t.Run("continuous segments", func(t *testing.T) {
t.Parallel()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
// Reconfigure RS for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
const segmentSize = 10 * memory.KiB
var testCases = []struct {
caseDescription string
objData []byte
}{
{
caseDescription: "one segment",
objData: testrand.Bytes(2 * segmentSize),
},
{
caseDescription: "several segments",
objData: testrand.Bytes(4 * segmentSize),
},
{
caseDescription: "several segments last inline",
objData: testrand.Bytes((2 * segmentSize) + (3 * memory.KiB)),
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
const bucketName = "a-bucket"
// Use a different name to avoid collisions without having to run
// testplanet for each test case. We cannot upload to the same path
// because it fails due to the zombie segments left by previous test
// cases
var objectName = tc.caseDescription
projectID, encryptedPath := uploadFirstObjectWithoutLastSegmentPointer(
ctx, t, uplnk, satelliteSys, segmentSize, bucketName, objectName, tc.objData,
)
// calculate the SNs total used space after data upload
var totalUsedSpace int64
for _, sn := range planet.StorageNodes {
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
err := satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
ctx, *projectID, []byte(bucketName), encryptedPath,
)
require.NoError(t, err)
// confirm that the object was deleted
err = satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
ctx, *projectID, []byte(bucketName), encryptedPath,
)
require.Error(t, err)
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)
// calculate the SNs used space after deleting the pieces
var totalUsedSpaceAfterDelete int64
for _, sn := range planet.StorageNodes {
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpaceAfterDelete += usedSpace
}
if totalUsedSpaceAfterDelete >= totalUsedSpace {
t.Fatalf(
"used space after deletion. want before > after, got %d <= %d",
totalUsedSpace, totalUsedSpaceAfterDelete,
)
}
})
}
})
})
t.Run("sparse segments", func(t *testing.T) {
t.Parallel()
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
// Reconfigure RS for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
const segmentSize = 10 * memory.KiB
var testCases = []struct {
caseDescription string
objData []byte
noSegmentsIndexes []int64 // Without the last segment, which is always included
expectedMaxGarbageFactor float64
expectedNotFoundErr bool
}{
{
caseDescription: "some firsts",
objData: testrand.Bytes(10 * segmentSize),
noSegmentsIndexes: []int64{3, 5, 6, 9}, // Object with no pointers: L, 3, 5, 6, 9
expectedNotFoundErr: false,
},
{
caseDescription: "some firsts inline",
objData: testrand.Bytes((9 * segmentSize) + (3 * memory.KiB)),
noSegmentsIndexes: []int64{4, 5, 6}, // Object with no pointers: L, 4, 5, 6
expectedNotFoundErr: false,
},
{
caseDescription: "no first",
objData: testrand.Bytes(10 * segmentSize),
noSegmentsIndexes: []int64{0}, // Object with no pointers: L, 0
expectedNotFoundErr: true,
},
{
caseDescription: "no firsts",
objData: testrand.Bytes(8 * segmentSize),
noSegmentsIndexes: []int64{0, 2, 5}, // Object with no pointers: L, 0, 2, 5
expectedNotFoundErr: true,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
const bucketName = "a-bucket"
// Use a different name to avoid collisions without having to run
// testplanet for each test case. We cannot upload to the same path
// because it fails due to the zombie segments left by previous test
// cases
var objectName = tc.caseDescription
// always add the last segment to the list of segments whose pointers won't be kept
noSegmentsIndexes := []int64{-1}
noSegmentsIndexes = append(noSegmentsIndexes, tc.noSegmentsIndexes...)
projectID, encryptedPath := uploadFirstObjectWithoutSomeSegmentsPointers(
ctx, t, uplnk, satelliteSys, segmentSize, bucketName, objectName, tc.objData, noSegmentsIndexes,
)
// calculate the SNs used space
var totalUsedSpace int64
for _, sn := range planet.StorageNodes {
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
err := satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
ctx, *projectID, []byte(bucketName), encryptedPath,
)
if tc.expectedNotFoundErr {
require.Error(t, err)
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)
return
}
require.NoError(t, err)
// confirm that the object was deleted
err = satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
ctx, *projectID, []byte(bucketName), encryptedPath,
)
require.Error(t, err)
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)
// calculate the SNs used space after deleting the pieces
var totalUsedSpaceAfterDelete int64
for _, sn := range planet.StorageNodes {
usedSpace, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpaceAfterDelete += usedSpace
}
if totalUsedSpaceAfterDelete >= totalUsedSpace {
t.Fatalf(
"used space after deletion. want before > after, got %d <= %d",
totalUsedSpace, totalUsedSpaceAfterDelete,
)
}
})
}
})
})
}
 func getProjectIDAndEncPathFirstObject(
 	ctx context.Context, t *testing.T, satellite *testplanet.SatelliteSystem,
 ) (projectID *uuid.UUID, encryptedPath []byte) {
@@ -257,3 +463,47 @@ func getProjectIDAndEncPathFirstObject(
 	return projectID, encryptedPath
 }
func uploadFirstObjectWithoutLastSegmentPointer(
ctx context.Context, t *testing.T, uplnk *testplanet.Uplink,
satelliteSys *testplanet.SatelliteSystem, segmentSize memory.Size,
bucketName string, objectName string, objectData []byte,
) (projectID *uuid.UUID, encryptedPath []byte) {
t.Helper()
return uploadFirstObjectWithoutSomeSegmentsPointers(
ctx, t, uplnk, satelliteSys, segmentSize, bucketName, objectName, objectData, []int64{-1},
)
}
func uploadFirstObjectWithoutSomeSegmentsPointers(
ctx context.Context, t *testing.T, uplnk *testplanet.Uplink,
satelliteSys *testplanet.SatelliteSystem, segmentSize memory.Size,
bucketName string, objectName string, objectData []byte, noSegmentsIndexes []int64,
) (projectID *uuid.UUID, encryptedPath []byte) {
t.Helper()
if len(noSegmentsIndexes) < 1 {
t.Fatal("noSegments list must have at least one segment")
}
err := uplnk.UploadWithClientConfig(ctx, satelliteSys, cmd.Config{
Client: cmd.ClientConfig{
SegmentSize: segmentSize,
},
},
bucketName, objectName, objectData,
)
require.NoError(t, err)
projectID, encryptedPath = getProjectIDAndEncPathFirstObject(ctx, t, satelliteSys)
for _, segIndx := range noSegmentsIndexes {
path, err := metainfo.CreatePath(ctx, *projectID, segIndx, []byte(bucketName), encryptedPath)
require.NoError(t, err)
err = satelliteSys.Metainfo.Service.UnsynchronizedDelete(ctx, path)
require.NoError(t, err)
}
return projectID, encryptedPath
}
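
A minimal runnable sketch of the naming convention the helpers above rely on (remainingPointers is a hypothetical stand-in, not the satellite API): index -1 denotes the last segment pointer "l", while index k denotes "s<k>".

package main

import "fmt"

// remainingPointers returns the pointer names left in pointerDB after the
// helpers above remove the pointers listed in noSegmentsIndexes.
func remainingPointers(numOfSegments int64, noSegmentsIndexes []int64) []string {
	removed := make(map[int64]bool, len(noSegmentsIndexes))
	for _, idx := range noSegmentsIndexes {
		removed[idx] = true
	}

	var remaining []string
	if !removed[-1] {
		remaining = append(remaining, "l")
	}
	for idx := int64(0); idx < numOfSegments-1; idx++ {
		if !removed[idx] {
			remaining = append(remaining, fmt.Sprintf("s%d", idx))
		}
	}
	return remaining
}

func main() {
	// An object with 4 segments uploaded without its last segment pointer,
	// as uploadFirstObjectWithoutLastSegmentPointer does above.
	fmt.Println(remainingPointers(4, []int64{-1})) // [s0 s1 s2]
}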


@@ -8,7 +8,6 @@ import (
 	"crypto/sha256"
 	"errors"
 	"fmt"
-	"math"
 	"strconv"
 	"time"
@@ -2287,16 +2286,46 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 	// We should ignore client cancelling and always try to delete segments.
 	ctx = context2.WithoutCancellation(ctx)

-	numOfSegments, err := endpoint.getObjectNumberOfSegments(ctx, projectID, bucket, encryptedPath)
-	if err != nil {
-		return err
-	}
-
-	knownNumOfSegments := false
-	if numOfSegments == 0 {
-		numOfSegments = math.MaxInt64
-	} else {
-		knownNumOfSegments = true
-	}
+	var (
+		lastSegmentNotFound  = false
+		prevLastSegmentIndex int64
+	)
+	{
+		numOfSegments, err := endpoint.getObjectNumberOfSegments(ctx, projectID, bucket, encryptedPath)
+		if err != nil {
+			if !errs2.IsRPC(err, rpcstatus.NotFound) {
+				return err
+			}
+
+			// Not found means that the last segment doesn't exist, so we proceed to
+			// delete the continuous segments in reverse order, starting from index 0
+			lastSegmentNotFound = true
+			{
+				var err error
+				prevLastSegmentIndex, err = endpoint.findIndexPreviousLastSegmentWhenNotKnowingNumSegments(
+					ctx, projectID, bucket, encryptedPath,
+				)
+				if err != nil {
+					endpoint.log.Error("unexpected error while finding last segment index previous to the last segment",
+						zap.Stringer("project_id", projectID),
+						zap.ByteString("bucket_name", bucket),
+						zap.Binary("encrypted_path", encryptedPath),
+						zap.Error(err),
+					)
+					return err
+				}
+			}
+
+			// There is neither a last segment nor any continuous segment, so we return
+			// the NotFound error handled in this conditional block
+			if prevLastSegmentIndex == -1 {
+				return err
+			}
+		} else {
+			prevLastSegmentIndex = numOfSegments - 2 // because of the last segment and because it's an index
+		}
+	}

 	var (
@@ -2304,16 +2333,19 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 		nodeIDs     storj.NodeIDList
 	)

-	for segmentIdx := int64(lastSegment); segmentIdx < (numOfSegments - 1); segmentIdx++ {
-		pointer, err := endpoint.deletePointer(ctx, projectID, segmentIdx, bucket, encryptedPath)
+	if !lastSegmentNotFound {
+		// first delete the last segment
+		pointer, err := endpoint.deletePointer(ctx, projectID, lastSegment, bucket, encryptedPath)
 		if err != nil {
-			// Only return the error for aborting the operation if it happens on the
-			// first iteration
-			if segmentIdx == int64(lastSegment) {
-				if storj.ErrObjectNotFound.Has(err) {
-					return rpcstatus.Error(rpcstatus.NotFound, "object doesn't exist")
-				}
-
+			if storj.ErrObjectNotFound.Has(err) {
+				endpoint.log.Warn(
+					"unexpected not found error while deleting a pointer, it may have been deleted concurrently",
+					zap.String("pointer_path",
+						fmt.Sprintf("%s/l/%s/%q", projectID, bucket, encryptedPath),
+					),
+					zap.String("segment", "l"),
+				)
+			} else {
 				endpoint.log.Error("unexpected error while deleting object pieces",
 					zap.Stringer("project_id", projectID),
 					zap.ByteString("bucket_name", bucket),
@@ -2322,16 +2354,29 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 				)
 				return rpcstatus.Error(rpcstatus.Internal, err.Error())
 			}
+		}

-			if storj.ErrObjectNotFound.Has(err) {
-				if !knownNumOfSegments {
-					// Because we don't know the number of segments, we assume that if the
-					// pointer isn't found then we reached in the previous iteration the
-					// segment before the last one.
-					break
-				}
-
-				segment := "s" + strconv.FormatInt(segmentIdx, 10)
+		if err == nil && pointer.Type == pb.Pointer_REMOTE {
+			rootPieceID := pointer.GetRemote().RootPieceId
+			for _, piece := range pointer.GetRemote().GetRemotePieces() {
+				pieceID := rootPieceID.Derive(piece.NodeId, piece.PieceNum)
+				pieces, ok := nodesPieces[piece.NodeId]
+				if !ok {
+					nodesPieces[piece.NodeId] = []storj.PieceID{pieceID}
+					nodeIDs = append(nodeIDs, piece.NodeId)
+					continue
+				}
+
+				nodesPieces[piece.NodeId] = append(pieces, pieceID)
+			}
+		}
+	}
+
+	for segmentIdx := prevLastSegmentIndex; segmentIdx >= 0; segmentIdx-- {
+		pointer, err := endpoint.deletePointer(ctx, projectID, segmentIdx, bucket, encryptedPath)
+		if err != nil {
+			segment := "s" + strconv.FormatInt(segmentIdx, 10)
+			if storj.ErrObjectNotFound.Has(err) {
 				endpoint.log.Warn(
 					"unexpected not found error while deleting a pointer, it may have been deleted concurrently",
 					zap.String("pointer_path",
@@ -2340,7 +2385,6 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 					zap.String("segment", segment),
 				)
 			} else {
-				segment := "s" + strconv.FormatInt(segmentIdx, 10)
 				endpoint.log.Warn(
 					"unexpected error while deleting a pointer",
 					zap.String("pointer_path",
@@ -2351,11 +2395,8 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 				)
 			}

-			// We continue with the next segment for not deleting the pieces of this
-			// pointer and avoiding that some storage nodes fail audits due to a
-			// missing piece.
-			// If it was not found them we assume that the pieces were deleted by
-			// another request running concurrently.
+			// We continue with the next segment and we leave the pieces of this
+			// segment to be deleted by the garbage collector
 			continue
 		}
@@ -2378,7 +2419,6 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 	}

 	if len(nodeIDs) == 0 {
-		// Pieces will be collected by garbage collector
 		return
 	}
@@ -2429,3 +2469,30 @@ func (endpoint *Endpoint) deletePointer(
 	return pointer, nil
 }
// findIndexPreviousLastSegmentWhenNotKnowingNumSegments returns the index of
// the segment previous to the last segment when there is an unknown number of
// segments.
//
// It returns index -1 if none is found, and an error if there is some error getting
// the segments' pointers.
func (endpoint *Endpoint) findIndexPreviousLastSegmentWhenNotKnowingNumSegments(
ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte,
) (index int64, err error) {
defer mon.Task()(&ctx, projectID, bucket, encryptedPath)(&err)
lastIdxFound := int64(-1)
for {
_, _, err := endpoint.getPointer(ctx, projectID, lastIdxFound+1, bucket, encryptedPath)
if err != nil {
if errs2.IsRPC(err, rpcstatus.NotFound) {
break
}
return -1, err
}
lastIdxFound++
}
return lastIdxFound, nil
}
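
To make the behavior of this scan concrete, here is a minimal runnable sketch (a map stands in for the pointer database; findIndexPreviousLastSegment is a hypothetical stand-in, not the endpoint method above):

package main

import "fmt"

// findIndexPreviousLastSegment walks the segment indexes upwards from 0 and
// returns the highest continuous index found, or -1 if s0 is already missing.
func findIndexPreviousLastSegment(existing map[int64]bool) int64 {
	lastIdxFound := int64(-1)
	for existing[lastIdxFound+1] {
		lastIdxFound++
	}
	return lastIdxFound
}

func main() {
	// Interrupted upload that committed s0, s1 and s2 but never the last
	// segment: the scan returns 2, so DeleteObjectPieces deletes s2, s1, s0.
	fmt.Println(findIndexPreviousLastSegment(map[int64]bool{0: true, 1: true, 2: true})) // 2

	// No pointer for s0 (the "no first" test case above): the scan returns -1
	// and DeleteObjectPieces responds with rpcstatus.NotFound.
	fmt.Println(findIndexPreviousLastSegment(map[int64]bool{1: true, 2: true})) // -1
}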