satellite: Create method for deleting pieces of SNs

Create a method for deleting pieces of the storage nodes and add a test
for it.

Change-Id: I5fffc8c43d900d317961098b1d135ce3223b73ea
This commit is contained in:
Ivan Fraixedes 2019-12-11 18:44:13 +01:00
parent 5ee1a00857
commit 366f4b9493
No known key found for this signature in database
GPG Key ID: 042B474597F96DB7
3 changed files with 420 additions and 2 deletions

View File

@ -306,6 +306,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, pointerDB metai
peer.DB.Attribution(), peer.DB.Attribution(),
peer.Marketing.PartnersService, peer.Marketing.PartnersService,
peer.DB.PeerIdentities(), peer.DB.PeerIdentities(),
peer.Dialer,
peer.DB.Console().APIKeys(), peer.DB.Console().APIKeys(),
peer.Accounting.ProjectUsage, peer.Accounting.ProjectUsage,
config.Metainfo.RS, config.Metainfo.RS,

View File

@ -0,0 +1,269 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo_test
import (
"context"
"testing"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/stretchr/testify/require"
"storj.io/storj/pkg/storj"
"storj.io/storj/private/memory"
"storj.io/storj/private/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/private/testrand"
"storj.io/storj/storage"
"storj.io/storj/uplink"
)
func TestEndpoint_DeleteObjectPieces(t *testing.T) {
t.Run("all nodes up", func(t *testing.T) {
t.Parallel()
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
var testCases = []struct {
caseDescription string
objData []byte
}{
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
const (
bucketName = "a-bucket"
objectName = "object-filename"
)
// Use RSConfig for ensuring that we don't have long-tail cancellations and the
// upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, uplink.Config{
Client: uplink.ClientConfig{
SegmentSize: 10 * memory.KiB,
},
RS: uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
},
},
bucketName, objectName, tc.objData,
)
require.NoError(t, err)
projectID, encryptedPath := getProjectIDAndEncPathFirstObject(ctx, t, satelliteSys)
err = satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
ctx, *projectID, []byte(bucketName), encryptedPath,
)
require.NoError(t, err)
// Check that storage nodes don't hold any data after the satellite
// delete the pieces
var totalUsedSpace int64
for _, sn := range planet.StorageNodes {
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
require.Zero(t, totalUsedSpace, "totalUsedSpace")
})
}
})
t.Run("some nodes down", func(t *testing.T) {
t.Skip("TODO: v3-3364")
t.Parallel()
var testCases = []struct {
caseDescription string
objData []byte
}{
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
const (
bucketName = "a-bucket"
objectName = "object-filename"
)
// Use RSConfig for ensuring that we don't have long-tail cancellations and the
// upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, uplink.Config{
Client: uplink.ClientConfig{
SegmentSize: 10 * memory.KiB,
},
RS: uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
},
}, bucketName, objectName, tc.objData)
require.NoError(t, err)
// Shutdown the first 2 storage nodes before we delete the pieces
require.NoError(t, planet.StopPeer(planet.StorageNodes[0]))
require.NoError(t, planet.StopPeer(planet.StorageNodes[1]))
projectID, encryptedPath := getProjectIDAndEncPathFirstObject(ctx, t, satelliteSys)
err = satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
ctx, *projectID, []byte(bucketName), encryptedPath,
)
require.NoError(t, err)
// Check that storage nodes that were offline when deleting the pieces
// they are still holding data
var totalUsedSpace int64
for i := 0; i < 2; i++ {
usedSpace, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
require.NotZero(t, totalUsedSpace, "totalUsedSpace offline nodes")
// Check that storage nodes which are online when deleting pieces don't
// hold any piece
totalUsedSpace = 0
for i := 2; i < len(planet.StorageNodes); i++ {
usedSpace, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
require.Zero(t, totalUsedSpace, "totalUsedSpace online nodes")
})
}
})
t.Run("all nodes down", func(t *testing.T) {
t.Skip("TODO: v3-3364")
t.Parallel()
var testCases = []struct {
caseDescription string
objData []byte
}{
{caseDescription: "one remote segment", objData: testrand.Bytes(10 * memory.KiB)},
{caseDescription: "one inline segment", objData: testrand.Bytes(3 * memory.KiB)},
{caseDescription: "several segments (all remote)", objData: testrand.Bytes(50 * memory.KiB)},
{caseDescription: "several segments (remote + inline)", objData: testrand.Bytes(33 * memory.KiB)},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.caseDescription, func(t *testing.T) {
const (
bucketName = "a-bucket"
objectName = "object-filename"
)
ctx := testcontext.New(t)
defer ctx.Cleanup()
planet, err := testplanet.New(t, 1, 4, 1)
require.NoError(t, err)
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
var (
uplnk = planet.Uplinks[0]
satelliteSys = planet.Satellites[0]
)
// Use RSConfig for ensuring that we don't have long-tail cancellations and the
// upload doesn't leave garbage in the SNs
err = uplnk.UploadWithClientConfig(ctx, satelliteSys, uplink.Config{
Client: uplink.ClientConfig{
SegmentSize: 10 * memory.KiB,
},
RS: uplink.RSConfig{
MinThreshold: 2,
RepairThreshold: 2,
SuccessThreshold: 4,
MaxThreshold: 4,
},
}, bucketName, objectName, tc.objData)
require.NoError(t, err)
// Shutdown all the storage nodes before we delete the pieces
for _, sn := range planet.StorageNodes {
require.NoError(t, planet.StopPeer(sn))
}
projectID, encryptedPath := getProjectIDAndEncPathFirstObject(ctx, t, satelliteSys)
err = satelliteSys.Metainfo.Endpoint2.DeleteObjectPieces(
ctx, *projectID, []byte(bucketName), encryptedPath,
)
require.NoError(t, err)
// Check that storage nodes that were offline when deleting the pieces
// they are still holding data
var totalUsedSpace int64
for _, sn := range planet.StorageNodes {
usedSpace, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += usedSpace
}
require.NotZero(t, totalUsedSpace, "totalUsedSpace")
})
}
})
}
func getProjectIDAndEncPathFirstObject(
ctx context.Context, t *testing.T, satellite *testplanet.SatelliteSystem,
) (projectID *uuid.UUID, encryptedPath []byte) {
t.Helper()
keys, err := satellite.Metainfo.Database.List(ctx, storage.Key{}, 1)
require.NoError(t, err)
keyParts := storj.SplitPath(keys[0].String())
require.Len(t, keyParts, 4)
projectID, err = uuid.Parse(keyParts[0])
require.NoError(t, err)
encryptedPath = []byte(keyParts[3])
return projectID, encryptedPath
}

View File

@ -8,6 +8,7 @@ import (
"crypto/sha256" "crypto/sha256"
"errors" "errors"
"fmt" "fmt"
"math"
"strconv" "strconv"
"time" "time"
@ -20,10 +21,12 @@ import (
"storj.io/storj/pkg/identity" "storj.io/storj/pkg/identity"
"storj.io/storj/pkg/macaroon" "storj.io/storj/pkg/macaroon"
"storj.io/storj/pkg/pb" "storj.io/storj/pkg/pb"
"storj.io/storj/pkg/rpc"
"storj.io/storj/pkg/rpc/rpcstatus" "storj.io/storj/pkg/rpc/rpcstatus"
"storj.io/storj/pkg/signing" "storj.io/storj/pkg/signing"
"storj.io/storj/pkg/storj" "storj.io/storj/pkg/storj"
"storj.io/storj/private/dbutil" "storj.io/storj/private/dbutil"
"storj.io/storj/private/sync2"
"storj.io/storj/satellite/accounting" "storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/attribution" "storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/console" "storj.io/storj/satellite/console"
@ -31,6 +34,7 @@ import (
"storj.io/storj/satellite/overlay" "storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/rewards" "storj.io/storj/satellite/rewards"
"storj.io/storj/uplink/eestream" "storj.io/storj/uplink/eestream"
"storj.io/storj/uplink/piecestore"
"storj.io/storj/uplink/storage/meta" "storj.io/storj/uplink/storage/meta"
) )
@ -39,6 +43,9 @@ const (
satIDExpiration = 24 * time.Hour satIDExpiration = 24 * time.Hour
lastSegment = -1 lastSegment = -1
listLimit = 1000 listLimit = 1000
// TODO: orange/v3-3184 no idea what value should be set here. In the future
// we may want to make this value configurable.
deleteObjectPiecesConcurrencyLimit = 10
) )
var ( var (
@ -75,6 +82,7 @@ type Endpoint struct {
partners *rewards.PartnersService partners *rewards.PartnersService
peerIdentities overlay.PeerIdentities peerIdentities overlay.PeerIdentities
projectUsage *accounting.Service projectUsage *accounting.Service
dialer rpc.Dialer
apiKeys APIKeys apiKeys APIKeys
createRequests *createRequests createRequests *createRequests
requiredRSConfig RSConfig requiredRSConfig RSConfig
@ -82,10 +90,10 @@ type Endpoint struct {
maxCommitInterval time.Duration maxCommitInterval time.Duration
} }
// NewEndpoint creates new metainfo endpoint instance // NewEndpoint creates new metainfo endpoint instance.
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Service, func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Service,
attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities, attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
apiKeys APIKeys, projectUsage *accounting.Service, rsConfig RSConfig, satellite signing.Signer, maxCommitInterval time.Duration) *Endpoint { dialer rpc.Dialer, apiKeys APIKeys, projectUsage *accounting.Service, rsConfig RSConfig, satellite signing.Signer, maxCommitInterval time.Duration) *Endpoint {
// TODO do something with too many params // TODO do something with too many params
return &Endpoint{ return &Endpoint{
log: log, log: log,
@ -95,6 +103,7 @@ func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cac
attributions: attributions, attributions: attributions,
partners: partners, partners: partners,
peerIdentities: peerIdentities, peerIdentities: peerIdentities,
dialer: dialer,
apiKeys: apiKeys, apiKeys: apiKeys,
projectUsage: projectUsage, projectUsage: projectUsage,
createRequests: newCreateRequests(), createRequests: newCreateRequests(),
@ -2114,6 +2123,28 @@ func (endpoint *Endpoint) getPointer(
return pointer, path, nil return pointer, path, nil
} }
// getObjectNumberOfSegments returns the number of segments of the indicated
// object by projectID, bucket and encryptedPath.
//
// It returns 0 if the number is unknown.
func (endpoint *Endpoint) getObjectNumberOfSegments(ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte) (_ int64, err error) {
defer mon.Task()(&ctx, projectID.String(), bucket, encryptedPath)(&err)
pointer, _, err := endpoint.getPointer(ctx, projectID, lastSegment, bucket, encryptedPath)
if err != nil {
return 0, err
}
meta := &pb.StreamMeta{}
err = proto.Unmarshal(pointer.Metadata, meta)
if err != nil {
endpoint.log.Error("error unmarshaling pointer metadata", zap.Error(err))
return 0, rpcstatus.Error(rpcstatus.Internal, "unable to unmarshal metadata")
}
return meta.NumberOfSegments, nil
}
// sortLimits sorts order limits and fill missing ones with nil values // sortLimits sorts order limits and fill missing ones with nil values
func sortLimits(limits []*pb.AddressedOrderLimit, pointer *pb.Pointer) []*pb.AddressedOrderLimit { func sortLimits(limits []*pb.AddressedOrderLimit, pointer *pb.Pointer) []*pb.AddressedOrderLimit {
sorted := make([]*pb.AddressedOrderLimit, pointer.GetRemote().GetRedundancy().GetTotal()) sorted := make([]*pb.AddressedOrderLimit, pointer.GetRemote().GetRedundancy().GetTotal())
@ -2216,3 +2247,120 @@ func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID s
return satSegmentID, nil return satSegmentID, nil
} }
// DeleteObjectPieces deletes all the pieces of the storage nodes that belongs
// to the specified object.
//
// NOTE: this method is exported for being able to individually test it without
// having import cycles.
func (endpoint *Endpoint) DeleteObjectPieces(
ctx context.Context, projectID uuid.UUID, bucket, encryptedPath []byte,
) (err error) {
defer mon.Task()(&ctx, projectID.String(), bucket, encryptedPath)(&err)
numOfSegments, err := endpoint.getObjectNumberOfSegments(ctx, projectID, bucket, encryptedPath)
if err != nil {
return err
}
knownNumOfSegments := false
if numOfSegments == 0 {
numOfSegments = math.MaxInt64
} else {
knownNumOfSegments = true
}
// TODO: orange/v3-3184 initialize this map to an approximated number of nodes
// if it's possible. Also figure out how much memory is required for an object
// of a big size like 10GiB.
nodesPieces := make(map[storj.NodeID][]storj.PieceID)
for i := int64(lastSegment); i < (numOfSegments - 1); i++ {
pointer, _, err := endpoint.getPointer(ctx, projectID, i, bucket, encryptedPath)
if err != nil {
if rpcstatus.Code(err) != rpcstatus.NotFound {
return err
}
if !knownNumOfSegments {
// Because we don't know the number of segments, we assume that if the
// pointer isn't found then we reached in the previous iteration the
// segment before the last one.
break
}
segment := "l"
if i != lastSegment {
segment = "s" + strconv.FormatInt(i, 10)
}
endpoint.log.Warn(
"expected pointer not found, it may have been deleted concurrently",
zap.String("pointer_path",
fmt.Sprintf("%s/%s/%s/%q", projectID, segment, bucket, encryptedPath),
),
)
continue
}
if pointer.Type != pb.Pointer_REMOTE {
continue
}
rootPieceID := pointer.GetRemote().RootPieceId
for _, piece := range pointer.GetRemote().GetRemotePieces() {
pieceID := rootPieceID.Derive(piece.NodeId, piece.PieceNum)
pieces, ok := nodesPieces[piece.NodeId]
if !ok {
nodesPieces[piece.NodeId] = []storj.PieceID{pieceID}
continue
}
nodesPieces[piece.NodeId] = append(pieces, pieceID)
}
}
limiter := sync2.NewLimiter(deleteObjectPiecesConcurrencyLimit)
for nodeID, nodePieces := range nodesPieces {
nodeID := nodeID
nodePieces := nodePieces
dossier, err := endpoint.overlay.Get(ctx, nodeID)
if err != nil {
endpoint.log.Warn("unable to get node dossier",
zap.Stringer("node_id", nodeID), zap.Error(err),
)
// Pieces will be collected by garbage collector
continue
}
limiter.Go(ctx, func() {
client, err := piecestore.Dial(
ctx, endpoint.dialer, &dossier.Node, endpoint.log, piecestore.Config{},
)
if err != nil {
endpoint.log.Warn("unable to dial storage node",
zap.Stringer("node_id", nodeID),
zap.Stringer("node_info", &dossier.Node),
zap.Error(err),
)
// Pieces will be collected by garbage collector
return
}
for _, pieceID := range nodePieces {
err := client.DeletePiece(ctx, pieceID)
if err != nil {
// piece will be collected by garbage collector
endpoint.log.Warn("unable to delete piece of a storage node",
zap.Stringer("node_id", nodeID),
zap.Stringer("piece_id", pieceID),
zap.Error(err),
)
}
}
})
}
limiter.Wait()
return nil
}