diff --git a/satellite/api.go b/satellite/api.go index 9de2a872f..eb24ce4dc 100644 --- a/satellite/api.go +++ b/satellite/api.go @@ -384,6 +384,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, peer.Metainfo.PieceDeletion, err = piecedeletion.NewService( peer.Log.Named("metainfo:piecedeletion"), peer.Dialer, + peer.Overlay.Service, config.Metainfo.PieceDeletion, ) if err != nil { diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index a6a2a26c3..de6a0c475 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -1915,31 +1915,13 @@ func (endpoint *Endpoint) DeleteObjectPieces( continue } - nodeIDs := []storj.NodeID{} - for nodeID := range nodesPieces { - nodeIDs = append(nodeIDs, nodeID) - } - - nodes, err := endpoint.overlay.KnownReliable(ctx, nodeIDs) - if err != nil { - endpoint.log.Warn("unable to look up nodes from overlay", - zap.String("object_path", - fmt.Sprintf("%s/%s/%q", projectID, bucket, encryptedPath), - ), - zap.Error(err), - ) - // Pieces will be collected by garbage collector - continue - } - var requests []piecedeletion.Request - for _, node := range nodes { + for node, pieces := range nodesPieces { requests = append(requests, piecedeletion.Request{ Node: storj.NodeURL{ - ID: node.Id, - Address: node.Address.Address, + ID: node, }, - Pieces: nodesPieces[node.Id], + Pieces: pieces, }) } diff --git a/satellite/metainfo/piecedeletion/service.go b/satellite/metainfo/piecedeletion/service.go index 1d2cf8a9d..99fcbb372 100644 --- a/satellite/metainfo/piecedeletion/service.go +++ b/satellite/metainfo/piecedeletion/service.go @@ -10,6 +10,7 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" + "storj.io/common/pb" "storj.io/common/rpc" "storj.io/common/storj" "storj.io/common/sync2" @@ -56,6 +57,11 @@ func (config *Config) Verify() errs.Group { return errlist } +// Nodes stores reliable nodes information. +type Nodes interface { + KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error) +} + // Service handles combining piece deletion requests. // // architecture: Service @@ -64,6 +70,7 @@ type Service struct { config Config rpcDialer rpc.Dialer + nodesDB Nodes running sync2.Fence combiner *Combiner @@ -72,7 +79,7 @@ type Service struct { } // NewService creates a new service. -func NewService(log *zap.Logger, dialer rpc.Dialer, config Config) (*Service, error) { +func NewService(log *zap.Logger, dialer rpc.Dialer, nodesDB Nodes, config Config) (*Service, error) { var errlist errs.Group if log == nil { errlist.Add(Error.New("log is nil")) @@ -80,6 +87,9 @@ func NewService(log *zap.Logger, dialer rpc.Dialer, config Config) (*Service, er if dialer == (rpc.Dialer{}) { errlist.Add(Error.New("dialer is zero")) } + if nodesDB == nil { + errlist.Add(Error.New("nodesDB is nil")) + } if errs := config.Verify(); len(errs) > 0 { errlist.Add(errs...) } @@ -96,6 +106,7 @@ func NewService(log *zap.Logger, dialer rpc.Dialer, config Config) (*Service, er log: log, config: config, rpcDialer: dialerClone, + nodesDB: nodesDB, }, nil } @@ -147,7 +158,38 @@ func (service *Service) Delete(ctx context.Context, requests []Request, successT return Error.Wrap(err) } + // Create a map for matching node information with the corresponding + // request. + nodesReqs := make(map[storj.NodeID]Request, len(requests)) + nodeIDs := []storj.NodeID{} for _, req := range requests { + if req.Node.Address == "" { + nodeIDs = append(nodeIDs, req.Node.ID) + } + nodesReqs[req.Node.ID] = req + } + + if len(nodeIDs) > 0 { + nodes, err := service.nodesDB.KnownReliable(ctx, nodeIDs) + if err != nil { + // Pieces will be collected by garbage collector + return Error.Wrap(err) + } + + for _, node := range nodes { + req := nodesReqs[node.Id] + + nodesReqs[node.Id] = Request{ + Node: storj.NodeURL{ + ID: node.Id, + Address: node.Address.Address, + }, + Pieces: req.Pieces, + } + } + } + + for _, req := range nodesReqs { service.combiner.Enqueue(req.Node, Job{ Pieces: req.Pieces, Resolve: threshold, diff --git a/satellite/metainfo/piecedeletion/service_test.go b/satellite/metainfo/piecedeletion/service_test.go index 17235a1d1..ff2209668 100644 --- a/satellite/metainfo/piecedeletion/service_test.go +++ b/satellite/metainfo/piecedeletion/service_test.go @@ -4,6 +4,7 @@ package piecedeletion_test import ( + "context" "testing" "time" @@ -13,6 +14,7 @@ import ( "go.uber.org/zap/zaptest" "storj.io/common/memory" + "storj.io/common/pb" "storj.io/common/rpc" "storj.io/common/storj" "storj.io/common/testcontext" @@ -29,7 +31,7 @@ func TestService_New_Error(t *testing.T) { log := zaptest.NewLogger(t) dialer := rpc.NewDefaultDialer(nil) - _, err := piecedeletion.NewService(nil, dialer, piecedeletion.Config{ + _, err := piecedeletion.NewService(nil, dialer, &nodesDB{}, piecedeletion.Config{ MaxConcurrency: 8, MaxPiecesPerBatch: 0, MaxPiecesPerRequest: 0, @@ -39,35 +41,45 @@ func TestService_New_Error(t *testing.T) { require.True(t, piecedeletion.Error.Has(err), err) require.Contains(t, err.Error(), "log is nil") - _, err = piecedeletion.NewService(log, rpc.Dialer{}, piecedeletion.Config{ + _, err = piecedeletion.NewService(log, rpc.Dialer{}, &nodesDB{}, piecedeletion.Config{ MaxConcurrency: 87, DialTimeout: time.Second, }) - //require.True(t, metainfo.ErrDeletePieces.Has(err), err) + require.True(t, piecedeletion.Error.Has(err), err) require.Contains(t, err.Error(), "dialer is zero") - _, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{ + _, err = piecedeletion.NewService(log, dialer, nil, piecedeletion.Config{ + MaxConcurrency: 8, + MaxPiecesPerBatch: 0, + MaxPiecesPerRequest: 0, + DialTimeout: time.Second, + FailThreshold: 5 * time.Minute, + }) + require.True(t, piecedeletion.Error.Has(err), err) + require.Contains(t, err.Error(), "nodesDB is nil") + + _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ MaxConcurrency: 0, DialTimeout: time.Second, }) require.True(t, piecedeletion.Error.Has(err), err) require.Contains(t, err.Error(), "greater than 0") - _, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{ + _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ MaxConcurrency: -3, DialTimeout: time.Second, }) require.True(t, piecedeletion.Error.Has(err), err) require.Contains(t, err.Error(), "greater than 0") - _, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{ + _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ MaxConcurrency: 3, DialTimeout: time.Nanosecond, }) require.True(t, piecedeletion.Error.Has(err), err) require.Contains(t, err.Error(), "dial timeout 1ns must be between 5ms and 5m0s") - _, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{ + _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ MaxConcurrency: 3, DialTimeout: time.Hour, }) @@ -357,3 +369,9 @@ func TestService_DeletePieces_Timeout(t *testing.T) { require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace") }) } + +type nodesDB struct{} + +func (n *nodesDB) KnownReliable(ctx context.Context, nodesID storj.NodeIDList) ([]*pb.Node, error) { + return nil, nil +}