satellite/metainfo: Make BeginDeleteObject delete pieces

To improve deletion performance, we are shifting the responsibility for
deleting the pieces of an object from the Uplink to the Satellite.

BeginDeleteObject was the first call of the flow; it returned the stream
ID, which the Uplink then used to retrieve the list of segments and to
get addressed order limits for deleting the pieces of each segment from
the storage nodes.
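
As an illustration, the old Uplink-driven flow looked roughly like the
sketch below. It is based on the test code removed later in this commit;
it assumes an already-dialed metainfoClient and known bucketName/path
values, and it omits error handling.

    // Old flow: one round trip per call, plus piece deletions performed by the Uplink.
    streamID, err := metainfoClient.BeginDeleteObject(ctx, metainfo.BeginDeleteObjectParams{
        Bucket:        []byte(bucketName),
        EncryptedPath: []byte(path),
    })
    items, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
        StreamID: streamID,
    })
    for _, item := range items {
        // The addressed order limits authorize the Uplink to delete the pieces
        // of this segment directly from the storage nodes.
        segmentID, limits, _, err := metainfoClient.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
            StreamID: streamID,
            Position: storj.SegmentPosition{Index: item.Position.Index},
        })
        _ = limits // used to contact the storage nodes and delete the pieces
        err = metainfoClient.FinishDeleteSegment(ctx, metainfo.FinishDeleteSegmentParams{
            SegmentID: segmentID,
        })
    }
    err = metainfoClient.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{
        StreamID: streamID,
    })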

Now the Satellite deletes the pieces of all the object's segments from
the storage nodes, so the extra network round trips between the Uplink
and the Satellite are no longer needed: the Satellite can delete all of
them while serving the initial BeginDeleteObject request.
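
From the client's perspective the whole deletion therefore collapses to
the two calls below (a sketch under the same assumptions as above); the
Satellite deletes the pointers and pieces of every segment while serving
BeginDeleteObject.

    // New flow: the Satellite removes the pointers and the pieces itself.
    streamID, err := metainfoClient.BeginDeleteObject(ctx, metainfo.BeginDeleteObjectParams{
        Bucket:        []byte(bucketName),
        EncryptedPath: []byte(path),
    })
    err = metainfoClient.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{
        StreamID: streamID,
    })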

satellite/metainfo.ListSegments has been changed to return 0 items if
the pointer of the last segment of an object is not found. This
preserves backward compatibility with Uplinks that won't be updated to
the latest release and still rely on listing the segments after calling
BeginDeleteObject in order to retrieve the addressed order limits and
contact the storage nodes to delete the pieces.
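
For such an old Uplink the behavior is now the following (sketch): after
BeginDeleteObject the pointers are already gone, so ListSegments returns
an empty list instead of a NotFound error, and the per-segment deletion
loop simply has nothing to do.

    // Old Uplink running against the updated Satellite.
    segments, more, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
        StreamID: streamID,
    })
    // segments is empty and more is false, so no BeginDeleteSegment /
    // FinishDeleteSegment calls are issued.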

Change-Id: I5f99ecf27d62d65b0a062936b9b17581ef692af0
Ivan Fraixedes 2019-12-16 20:03:20 +01:00 committed by Ivan Fraixedes
parent 71ea2b0dc0
commit c3b58f1656
5 changed files with 84 additions and 133 deletions

@@ -291,10 +291,13 @@ func TestDeleteWithOfflineStoragenode(t *testing.T) {
 	}

 	err = planet.Uplinks[0].Delete(ctx, planet.Satellites[0], "test-bucket", "test-file")
+	require.NoError(t, err)
+	_, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "test-bucket", "test-file")
 	require.Error(t, err)
+	require.True(t, storj.ErrObjectNotFound.Has(err))

 	key := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
 	metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], key)
 	require.NoError(t, err)
 	defer ctx.Check(metainfoClient.Close)

@@ -92,7 +92,6 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 	})

 	t.Run("some nodes down", func(t *testing.T) {
-		t.Skip("TODO: v3-3364")
 		t.Parallel()

 		var testCases = []struct {
@@ -100,7 +99,6 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 			objData         []byte
 		}{
 			{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
-			{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
 			{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
 			{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
 		}
@@ -177,7 +175,6 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 	})

 	t.Run("all nodes down", func(t *testing.T) {
-		t.Skip("TODO: v3-3364")
 		t.Parallel()

 		var testCases = []struct {
@@ -185,7 +182,6 @@ func TestEndpoint_DeleteObjectPieces(t *testing.T) {
 			objData         []byte
 		}{
 			{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
-			{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
 			{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
 			{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
 		}

@@ -1408,12 +1408,11 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
 		return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
 	}

-	_, _, err = endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, satStreamID.Bucket, satStreamID.EncryptedPath)
+	err = endpoint.DeleteObjectPieces(ctx, keyInfo.ProjectID, satStreamID.Bucket, satStreamID.EncryptedPath)
 	if err != nil {
 		return nil, err
 	}

-	endpoint.log.Info("Delete Object", zap.Stringer("Project ID", keyInfo.ProjectID))
 	return &pb.ObjectBeginDeleteResponse{
 		StreamId: streamID,
 	}, nil
@@ -1855,6 +1854,9 @@ func (endpoint *Endpoint) ListSegments(ctx context.Context, req *pb.SegmentListR
 	pointer, _, err := endpoint.getPointer(ctx, keyInfo.ProjectID, lastSegment, streamID.Bucket, streamID.EncryptedPath)
 	if err != nil {
+		if rpcstatus.Code(err) == rpcstatus.NotFound {
+			return &pb.SegmentListResponse{}, nil
+		}
 		return nil, err
 	}
@@ -2283,29 +2285,58 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 		nodeIDs    storj.NodeIDList
 	)
-	for i := int64(lastSegment); i < (numOfSegments - 1); i++ {
-		pointer, _, err := endpoint.getPointer(ctx, projectID, i, bucket, encryptedPath)
+	for segmentIdx := int64(lastSegment); segmentIdx < (numOfSegments - 1); segmentIdx++ {
+		pointer, err := endpoint.deletePointer(ctx, projectID, segmentIdx, bucket, encryptedPath)
 		if err != nil {
-			if rpcstatus.Code(err) != rpcstatus.NotFound {
-				return err
-			}
-
-			if !knownNumOfSegments {
-				// Because we don't know the number of segments, we assume that if the
-				// pointer isn't found then we reached in the previous iteration the
-				// segment before the last one.
-				break
-			}
-
-			segment := "l"
-			if i != lastSegment {
-				segment = "s" + strconv.FormatInt(i, 10)
-			}
-
-			endpoint.log.Warn(
-				"expected pointer not found, it may have been deleted concurrently",
-				zap.String("pointer_path",
-					fmt.Sprintf("%s/%s/%s/%q", projectID, segment, bucket, encryptedPath),
-				),
-			)
+			// Only return the error for aborting the operation if it happens on the
+			// first iteration
+			if segmentIdx == int64(lastSegment) {
+				if storj.ErrObjectNotFound.Has(err) {
+					return rpcstatus.Error(rpcstatus.NotFound, "object doesn't exist")
+				}
+
+				endpoint.log.Error("unexpected error while deleting object pieces",
+					zap.Stringer("project_id", projectID),
+					zap.ByteString("bucket_name", bucket),
+					zap.Binary("encrypted_path", encryptedPath),
+					zap.Error(err),
+				)
+				return rpcstatus.Error(rpcstatus.Internal, err.Error())
+			}
+
+			if storj.ErrObjectNotFound.Has(err) {
+				if !knownNumOfSegments {
+					// Because we don't know the number of segments, we assume that if the
+					// pointer isn't found then we reached in the previous iteration the
+					// segment before the last one.
+					break
+				}
+
+				segment := "s" + strconv.FormatInt(segmentIdx, 10)
+				endpoint.log.Warn(
+					"unexpected not found error while deleting a pointer, it may have been deleted concurrently",
+					zap.String("pointer_path",
+						fmt.Sprintf("%s/%s/%s/%q", projectID, segment, bucket, encryptedPath),
+					),
+					zap.String("segment", segment),
+				)
+			} else {
+				segment := "s" + strconv.FormatInt(segmentIdx, 10)
+				endpoint.log.Warn(
+					"unexpected error while deleting a pointer",
+					zap.String("pointer_path",
+						fmt.Sprintf("%s/%s/%s/%q", projectID, segment, bucket, encryptedPath),
+					),
+					zap.String("segment", segment),
+					zap.Error(err),
+				)
+			}
+
+			// We continue with the next segment for not deleting the pieces of this
+			// pointer and avoiding that some storage nodes fail audits due to a
+			// missing piece.
+			// If it was not found them we assume that the pieces were deleted by
+			// another request running concurrently.
 			continue
 		}
@@ -2376,3 +2407,25 @@ func (endpoint *Endpoint) DeleteObjectPieces(
 	limiter.Wait()
 	return nil
 }
+
+// deletePointer deletes a pointer returning the deleted pointer.
+//
+// If the pointer isn't found when getting or deleting it, it returns
+// storj.ErrObjectNotFound error.
+func (endpoint *Endpoint) deletePointer(
+	ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, encryptedPath []byte,
+) (_ *pb.Pointer, err error) {
+	defer mon.Task()(&ctx, projectID, segmentIndex, bucket, encryptedPath)(&err)
+
+	pointer, path, err := endpoint.getPointer(ctx, projectID, segmentIndex, bucket, encryptedPath)
+	if err != nil {
+		return nil, err
+	}
+
+	err = endpoint.metainfo.UnsynchronizedDelete(ctx, path)
+	if err != nil {
+		return nil, err
+	}
+
+	return pointer, nil
+}

@@ -1365,32 +1365,12 @@ func TestInlineSegment(t *testing.T) {
 			EncryptedPath: params.EncryptedPath,
 		})
 		require.NoError(t, err)

-		items, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
+		segments, more, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
 			StreamID: streamID,
 		})
 		require.NoError(t, err)
+		require.Empty(t, segments)
+		require.False(t, more)

-		for _, item := range items {
-			segmentID, limits, _, err := metainfoClient.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
-				StreamID: streamID,
-				Position: storj.SegmentPosition{
-					Index: item.Position.Index,
-				},
-			})
-			require.NoError(t, err)
-			require.Nil(t, limits)
-
-			err = metainfoClient.FinishDeleteSegment(ctx, metainfo.FinishDeleteSegmentParams{
-				SegmentID: segmentID,
-			})
-			require.NoError(t, err)
-		}
-
-		_, _, err = metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
-			StreamID: streamID,
-		})
-		require.Error(t, err)
-		require.True(t, storj.ErrObjectNotFound.Has(err))

 		err = metainfoClient.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{
 			StreamID: streamID,
@@ -1460,26 +1440,12 @@ func TestRemoteSegment(t *testing.T) {
 		})
 		require.NoError(t, err)

-		segments, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
+		segments, more, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
 			StreamID: streamID,
 		})
 		require.NoError(t, err)
+		require.Empty(t, segments)
+		require.False(t, more)

-		for _, segment := range segments {
-			segmentID, limits, _, err := metainfoClient.BeginDeleteSegment(ctx, metainfo.BeginDeleteSegmentParams{
-				StreamID: streamID,
-				Position: storj.SegmentPosition{
-					Index: segment.Position.Index,
-				},
-			})
-			require.NoError(t, err)
-			require.NotEmpty(t, limits)
-
-			err = metainfoClient.FinishDeleteSegment(ctx, metainfo.FinishDeleteSegmentParams{
-				SegmentID: segmentID,
-			})
-			require.NoError(t, err)
-		}

 		err = metainfoClient.FinishDeleteObject(ctx, metainfo.FinishDeleteObjectParams{
 			StreamID: streamID,

@@ -17,7 +17,6 @@ import (
 	"storj.io/common/memory"
 	"storj.io/common/storj"
 	"storj.io/common/testcontext"
-	"storj.io/common/testrand"
 	"storj.io/storj/private/testplanet"
 	"storj.io/storj/satellite/console"
 	"storj.io/storj/uplink/ecclient"
@@ -32,72 +31,6 @@ const (
 	TestEncKey = "test-encryption-key"
 )

-// TestStreamsInterruptedDelete tests a special case where the delete command is
-// interrupted before all segments are deleted. On subsequent calls to
-// streamStore.Delete we want to ensure it completes the delete without error,
-// even though some segments have already been deleted.
-func TestStreamsInterruptedDelete(t *testing.T) {
-	testplanet.Run(t, testplanet.Config{
-		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
-	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
-		metainfoClient, segmentStore, streamStore := storeTestSetup(t, ctx, planet, memory.KiB.Int64())
-		defer ctx.Check(metainfoClient.Close)
-
-		bucketName := "bucket-name"
-		err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName)
-		require.NoError(t, err)
-
-		content := testrand.Bytes(2 * memory.KiB)
-		path := "mypath"
-		fullPath := storj.JoinPaths(bucketName, "mypath")
-
-		_, err = streamStore.Put(ctx, fullPath, storj.EncNull, bytes.NewReader(content), nil, time.Time{})
-		require.NoError(t, err)
-
-		// Ensure the item shows when we list
-		listItems, _, err := streamStore.List(ctx, bucketName, "", storj.EncNull, true, 10, meta.None)
-		require.NoError(t, err)
-		require.True(t, len(listItems) == 1)
-
-		streamID, err := metainfoClient.BeginDeleteObject(ctx, metainfo.BeginDeleteObjectParams{
-			Bucket:        []byte(bucketName),
-			EncryptedPath: []byte(path),
-		})
-		require.NoError(t, err)
-
-		segmentItems, _, err := metainfoClient.ListSegments(ctx, metainfo.ListSegmentsParams{
-			StreamID: streamID,
-			CursorPosition: storj.SegmentPosition{
-				Index: 0,
-			},
-		})
-		require.NoError(t, err)
-
-		// We need at least 2 items to do this test
-		require.True(t, len(segmentItems) > 1)
-
-		// Delete just the first item
-		require.NoError(t, segmentStore.Delete(ctx, streamID, segmentItems[0].Position.Index))
-
-		// It should *still* show when we list, as we've only deleted one
-		// segment
-		listItems, _, err = streamStore.List(ctx, bucketName, "", storj.EncNull, true, 10, meta.None)
-		require.NoError(t, err)
-		require.True(t, len(listItems) == 1)
-
-		// Now call the streamStore delete method. This should delete all
-		// remaining segments and the file pointer itself without failing
-		// because of the missing first segment.
-		_ = streamStore.Delete(ctx, fullPath, storj.EncNull)
-
-		// Now it should have 0 list items
-		listItems, _, err = streamStore.List(ctx, bucketName, "", storj.EncNull, true, 10, meta.None)
-		require.NoError(t, err)
-		require.True(t, len(listItems) == 0)
-	})
-}
-
 func TestStreamStoreList(t *testing.T) {
 	runTest(t, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, streamStore streams.Store) {
 		expiration := time.Now().Add(10 * 24 * time.Hour)