satellite/piecedeletion: move node info retrieval into the service

This change will require less work for the user of peiecedeletion
service by moving overlay database call into the package.

Change-Id: I14a150ab71fe885780e7a7a74db006a779507ae5
This commit is contained in:
Yingrong Zhao 2020-08-07 14:13:55 -04:00 committed by Yingrong Zhao
parent c7b86a3481
commit 0518b16370
4 changed files with 72 additions and 29 deletions

View File

@ -384,6 +384,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Metainfo.PieceDeletion, err = piecedeletion.NewService(
peer.Log.Named("metainfo:piecedeletion"),
peer.Dialer,
peer.Overlay.Service,
config.Metainfo.PieceDeletion,
)
if err != nil {

View File

@ -1915,31 +1915,13 @@ func (endpoint *Endpoint) DeleteObjectPieces(
continue
}
nodeIDs := []storj.NodeID{}
for nodeID := range nodesPieces {
nodeIDs = append(nodeIDs, nodeID)
}
nodes, err := endpoint.overlay.KnownReliable(ctx, nodeIDs)
if err != nil {
endpoint.log.Warn("unable to look up nodes from overlay",
zap.String("object_path",
fmt.Sprintf("%s/%s/%q", projectID, bucket, encryptedPath),
),
zap.Error(err),
)
// Pieces will be collected by garbage collector
continue
}
var requests []piecedeletion.Request
for _, node := range nodes {
for node, pieces := range nodesPieces {
requests = append(requests, piecedeletion.Request{
Node: storj.NodeURL{
ID: node.Id,
Address: node.Address.Address,
ID: node,
},
Pieces: nodesPieces[node.Id],
Pieces: pieces,
})
}

View File

@ -10,6 +10,7 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
@ -56,6 +57,11 @@ func (config *Config) Verify() errs.Group {
return errlist
}
// Nodes stores reliable nodes information.
type Nodes interface {
KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
}
// Service handles combining piece deletion requests.
//
// architecture: Service
@ -64,6 +70,7 @@ type Service struct {
config Config
rpcDialer rpc.Dialer
nodesDB Nodes
running sync2.Fence
combiner *Combiner
@ -72,7 +79,7 @@ type Service struct {
}
// NewService creates a new service.
func NewService(log *zap.Logger, dialer rpc.Dialer, config Config) (*Service, error) {
func NewService(log *zap.Logger, dialer rpc.Dialer, nodesDB Nodes, config Config) (*Service, error) {
var errlist errs.Group
if log == nil {
errlist.Add(Error.New("log is nil"))
@ -80,6 +87,9 @@ func NewService(log *zap.Logger, dialer rpc.Dialer, config Config) (*Service, er
if dialer == (rpc.Dialer{}) {
errlist.Add(Error.New("dialer is zero"))
}
if nodesDB == nil {
errlist.Add(Error.New("nodesDB is nil"))
}
if errs := config.Verify(); len(errs) > 0 {
errlist.Add(errs...)
}
@ -96,6 +106,7 @@ func NewService(log *zap.Logger, dialer rpc.Dialer, config Config) (*Service, er
log: log,
config: config,
rpcDialer: dialerClone,
nodesDB: nodesDB,
}, nil
}
@ -147,7 +158,38 @@ func (service *Service) Delete(ctx context.Context, requests []Request, successT
return Error.Wrap(err)
}
// Create a map for matching node information with the corresponding
// request.
nodesReqs := make(map[storj.NodeID]Request, len(requests))
nodeIDs := []storj.NodeID{}
for _, req := range requests {
if req.Node.Address == "" {
nodeIDs = append(nodeIDs, req.Node.ID)
}
nodesReqs[req.Node.ID] = req
}
if len(nodeIDs) > 0 {
nodes, err := service.nodesDB.KnownReliable(ctx, nodeIDs)
if err != nil {
// Pieces will be collected by garbage collector
return Error.Wrap(err)
}
for _, node := range nodes {
req := nodesReqs[node.Id]
nodesReqs[node.Id] = Request{
Node: storj.NodeURL{
ID: node.Id,
Address: node.Address.Address,
},
Pieces: req.Pieces,
}
}
}
for _, req := range nodesReqs {
service.combiner.Enqueue(req.Node, Job{
Pieces: req.Pieces,
Resolve: threshold,

View File

@ -4,6 +4,7 @@
package piecedeletion_test
import (
"context"
"testing"
"time"
@ -13,6 +14,7 @@ import (
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/testcontext"
@ -29,7 +31,7 @@ func TestService_New_Error(t *testing.T) {
log := zaptest.NewLogger(t)
dialer := rpc.NewDefaultDialer(nil)
_, err := piecedeletion.NewService(nil, dialer, piecedeletion.Config{
_, err := piecedeletion.NewService(nil, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 8,
MaxPiecesPerBatch: 0,
MaxPiecesPerRequest: 0,
@ -39,35 +41,45 @@ func TestService_New_Error(t *testing.T) {
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "log is nil")
_, err = piecedeletion.NewService(log, rpc.Dialer{}, piecedeletion.Config{
_, err = piecedeletion.NewService(log, rpc.Dialer{}, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 87,
DialTimeout: time.Second,
})
//require.True(t, metainfo.ErrDeletePieces.Has(err), err)
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "dialer is zero")
_, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{
_, err = piecedeletion.NewService(log, dialer, nil, piecedeletion.Config{
MaxConcurrency: 8,
MaxPiecesPerBatch: 0,
MaxPiecesPerRequest: 0,
DialTimeout: time.Second,
FailThreshold: 5 * time.Minute,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "nodesDB is nil")
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 0,
DialTimeout: time.Second,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "greater than 0")
_, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: -3,
DialTimeout: time.Second,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "greater than 0")
_, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 3,
DialTimeout: time.Nanosecond,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "dial timeout 1ns must be between 5ms and 5m0s")
_, err = piecedeletion.NewService(log, dialer, piecedeletion.Config{
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 3,
DialTimeout: time.Hour,
})
@ -357,3 +369,9 @@ func TestService_DeletePieces_Timeout(t *testing.T) {
require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace")
})
}
type nodesDB struct{}
func (n *nodesDB) KnownReliable(ctx context.Context, nodesID storj.NodeIDList) ([]*pb.Node, error) {
return nil, nil
}