metainfo/piecedeletion: use NodeURL-s

Change-Id: I247dbfe03e7864e940e4cd1d0f343f38e84099e0
Egon Elbre 2020-05-20 16:10:25 +03:00
parent 03e5f922c3
commit 7f323754a4
10 changed files with 46 additions and 91 deletions

View File

@@ -2353,7 +2353,10 @@ func (endpoint *Endpoint) DeleteObjectPieces(
    var requests []piecedeletion.Request
    for _, node := range nodes {
        requests = append(requests, piecedeletion.Request{
-           Node: node,
+           Node: storj.NodeURL{
+               ID: node.Id,
+               Address: node.Address.Address,
+           },
Pieces: nodesPieces[node.Id],
})
}
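Note: the inline conversion above can be read as a small helper that copies the overlay node's ID and dial address into the storj.NodeURL value piecedeletion now expects. The sketch below is illustrative only; the toNodeURL helper is not part of this commit.

package example

import (
    "storj.io/common/pb"
    "storj.io/common/storj"
)

// toNodeURL performs the same conversion that DeleteObjectPieces now does
// inline: it carries over the node ID and the node's dial address.
// This helper is hypothetical and not part of the commit.
func toNodeURL(node *pb.Node) storj.NodeURL {
    nodeURL := storj.NodeURL{ID: node.Id}
    if node.Address != nil {
        nodeURL.Address = node.Address.Address
    }
    return nodeURL
}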

View File

@@ -7,7 +7,6 @@ import (
    "context"
    "sync"
-   "storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
)
@@ -15,7 +14,7 @@ import (
// Handler handles piece deletion requests from a queue.
type Handler interface {
    // Handle should call queue.PopAll until finished.
-   Handle(ctx context.Context, node *pb.Node, queue Queue)
+   Handle(ctx context.Context, node storj.NodeURL, queue Queue)
}
// NewQueue is a constructor func for queues.
@@ -73,7 +72,7 @@ type Combiner struct {
// worker handles a batch of jobs.
type worker struct {
    waitFor chan struct{}
-   node *pb.Node
+   node storj.NodeURL
jobs Queue
done chan struct{}
}
@@ -97,11 +96,11 @@ func (combiner *Combiner) Close() {
}
// Enqueue adds a deletion job to the queue.
-func (combiner *Combiner) Enqueue(node *pb.Node, job Job) {
+func (combiner *Combiner) Enqueue(node storj.NodeURL, job Job) {
    combiner.mu.Lock()
    defer combiner.mu.Unlock()
-   last := combiner.workerByID[node.Id]
+   last := combiner.workerByID[node.ID]
// Check whether we can use the last worker.
if last != nil && last.jobs.TryPush(job) {
@@ -118,7 +117,7 @@ func (combiner *Combiner) Enqueue(node *pb.Node, job Job) {
    if last != nil {
        next.waitFor = last.done
    }
-   combiner.workerByID[node.Id] = next
+   combiner.workerByID[node.ID] = next
if !next.jobs.TryPush(job) {
// This should never happen.
job.Resolve.Failure()
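Note: under the new signature, a minimal Handle implementation follows the documented contract of calling queue.PopAll until the queue is drained, resolving each job as it goes. The sketch below is illustrative, modeled on the CountHandler used in the tests of this commit; the Success call on the promise is assumed from the Resolve usage visible in this diff.

package example

import (
    "context"

    "storj.io/common/storj"
    "storj.io/storj/satellite/metainfo/piecedeletion"
)

// countingHandler is an illustrative Handler: it drains the queue for one
// node and records how many jobs were handled per node ID.
type countingHandler struct {
    handled map[storj.NodeID]int
}

// Handle keeps popping batches until PopAll reports there is nothing left.
func (h *countingHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
    for {
        jobs, ok := queue.PopAll()
        if !ok {
            return
        }
        for _, job := range jobs {
            h.handled[node.ID]++
            // Resolve the job so callers waiting on the promise are not left pending.
            job.Resolve.Success()
        }
    }
}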

View File

@@ -10,7 +10,6 @@ import (
    "testing"
    "time"
-   "storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
@@ -24,7 +23,7 @@ type SleepyHandler struct {
    TotalHandled int64
}
-func (handler *SleepyHandler) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
+func (handler *SleepyHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
if !sync2.Sleep(ctx, handler.Min) {
return
}
@@ -63,10 +62,10 @@ func BenchmarkCombiner(b *testing.B) {
    activeLimits := []int{8, 32, 64, -1}
    queueSizes := []int{1, 8, 64, 128, -1}
-   nodes := []*pb.Node{}
+   nodes := []storj.NodeURL{}
    for i := 0; i < nodeCount; i++ {
-       nodes = append(nodes, &pb.Node{
-           Id: testrand.NodeID(),
+       nodes = append(nodes, storj.NodeURL{
+           ID: testrand.NodeID(),
})
}

View File

@@ -11,7 +11,6 @@ import (
    "github.com/stretchr/testify/require"
-   "storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
@@ -23,7 +22,7 @@ type CountHandler struct {
    Count int64
}
-func (handler *CountHandler) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
+func (handler *CountHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
for {
list, ok := queue.PopAll()
if !ok {
@@ -48,10 +47,10 @@ func TestCombiner(t *testing.T) {
        queueSize = 5
    )
-   nodes := []*pb.Node{}
+   nodes := []storj.NodeURL{}
    for i := 0; i < nodeCount; i++ {
-       nodes = append(nodes, &pb.Node{
-           Id: testrand.NodeID(),
+       nodes = append(nodes, storj.NodeURL{
+           ID: testrand.NodeID(),
})
}

View File

@@ -44,7 +44,7 @@ func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold
}
// Handle tries to send the deletion requests to the specified node.
-func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
+func (dialer *Dialer) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
defer FailPending(queue)
if dialer.recentlyFailed(ctx, node) {
@@ -53,13 +53,13 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
    client, conn, err := dialPieceStore(ctx, dialer.dialer, node)
    if err != nil {
-       dialer.log.Debug("failed to dial", zap.Stringer("id", node.Id), zap.Error(err))
+       dialer.log.Debug("failed to dial", zap.Stringer("id", node.ID), zap.Error(err))
        dialer.markFailed(ctx, node)
        return
    }
    defer func() {
        if err := conn.Close(); err != nil {
-           dialer.log.Debug("closing connection failed", zap.Stringer("id", node.Id), zap.Error(err))
+           dialer.log.Debug("closing connection failed", zap.Stringer("id", node.ID), zap.Error(err))
}
}()
@@ -92,7 +92,7 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
    }
    if err != nil {
-       dialer.log.Debug("deletion request failed", zap.Stringer("id", node.Id), zap.Error(err))
+       dialer.log.Debug("deletion request failed", zap.Stringer("id", node.ID), zap.Error(err))
// don't try to send to this storage node a bit, when the deletion times out
if errs2.IsCanceled(err) {
dialer.markFailed(ctx, node)
@@ -114,22 +114,22 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
// markFailed marks node as something failed recently, so we shouldn't try again,
// for some time.
-func (dialer *Dialer) markFailed(ctx context.Context, node *pb.Node) {
+func (dialer *Dialer) markFailed(ctx context.Context, node storj.NodeURL) {
    dialer.mu.Lock()
    defer dialer.mu.Unlock()
    now := time.Now()
-   lastFailed, ok := dialer.dialFailed[node.Id]
+   lastFailed, ok := dialer.dialFailed[node.ID]
    if !ok || lastFailed.Before(now) {
-       dialer.dialFailed[node.Id] = now
+       dialer.dialFailed[node.ID] = now
    }
}
// recentlyFailed checks whether a request to node recently failed.
-func (dialer *Dialer) recentlyFailed(ctx context.Context, node *pb.Node) bool {
+func (dialer *Dialer) recentlyFailed(ctx context.Context, node storj.NodeURL) bool {
    dialer.mu.RLock()
-   lastFailed, ok := dialer.dialFailed[node.Id]
+   lastFailed, ok := dialer.dialFailed[node.ID]
dialer.mu.RUnlock()
// when we recently failed to dial, then fail immediately
@@ -149,14 +149,10 @@ func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises [
    return pieces, promises, nil
}
-func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target *pb.Node) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
-   conn, err := dialer.DialNodeURL(ctx, storj.NodeURL{
-       ID: target.Id,
-       Address: target.Address.Address,
-   })
+func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target storj.NodeURL) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
+   conn, err := dialer.DialNodeURL(ctx, target)
if err != nil {
return nil, nil, err
}
return pb.NewDRPCPiecestoreClient(conn), conn, nil
}
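Note: the markFailed/recentlyFailed pair above boils down to a mutex-guarded map keyed by storj.NodeID. The sketch below illustrates that pattern only; the type and field names are invented and the threshold handling is simplified compared to the real Dialer.

package example

import (
    "sync"
    "time"

    "storj.io/common/storj"
)

// dialBackoff remembers the last dial failure per node so callers can skip
// nodes that failed recently. Names and behavior are simplified for illustration.
type dialBackoff struct {
    mu         sync.RWMutex
    lastFailed map[storj.NodeID]time.Time
    threshold  time.Duration
}

// markFailed records the time of the latest failure for this node.
func (b *dialBackoff) markFailed(node storj.NodeURL) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.lastFailed[node.ID] = time.Now()
}

// recentlyFailed reports whether the node failed within the configured threshold.
func (b *dialBackoff) recentlyFailed(node storj.NodeURL) bool {
    b.mu.RLock()
    last, ok := b.lastFailed[node.ID]
    b.mu.RUnlock()
    return ok && time.Since(last) < b.threshold
}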

View File

@@ -11,7 +11,6 @@ import (
    "github.com/stretchr/testify/require"
    "go.uber.org/zap/zaptest"
-   "storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
@@ -36,16 +35,9 @@ func TestDialer(t *testing.T) {
        dialer := piecedeletion.NewDialer(log, planet.Satellites[0].Dialer, 5*time.Second, 5*time.Second, 100)
        require.NotNil(t, dialer)
-       storageNode := &pb.Node{
-           Id: planet.StorageNodes[0].ID(),
-           Address: &pb.NodeAddress{
-               Transport: pb.NodeTransport_TCP_TLS_GRPC,
-               Address: planet.StorageNodes[0].Addr(),
-           },
-       }
+       storageNode := planet.StorageNodes[0].NodeURL()
promise, jobs := makeJobsQueue(t, 2)
dialer.Handle(ctx, storageNode, jobs)
require.Equal(t, int64(2), promise.SuccessCount)
@@ -70,13 +62,7 @@ func TestDialer_DialTimeout(t *testing.T) {
        require.NoError(t, planet.StopPeer(planet.StorageNodes[0]))
-       storageNode := &pb.Node{
-           Id: planet.StorageNodes[0].ID(),
-           Address: &pb.NodeAddress{
-               Transport: pb.NodeTransport_TCP_TLS_GRPC,
-               Address: planet.StorageNodes[0].Addr(),
-           },
-       }
+       storageNode := planet.StorageNodes[0].NodeURL()
{
promise, jobs := makeJobsQueue(t, 1)

View File

@@ -8,7 +8,7 @@ import (
    "golang.org/x/sync/semaphore"
-   "storj.io/common/pb"
+   "storj.io/common/storj"
)
// LimitedHandler wraps handler with a concurrency limit.
@@ -26,7 +26,7 @@ func NewLimitedHandler(handler Handler, limit int) *LimitedHandler {
}
// Handle handles the job queue.
-func (handler *LimitedHandler) Handle(ctx context.Context, node *pb.Node, queue Queue) {
+func (handler *LimitedHandler) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
if err := handler.active.Acquire(ctx, 1); err != nil {
return
}
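Note: LimitedHandler bounds concurrent Handle calls with a weighted semaphore, acquiring before delegating and releasing afterwards. The sketch below shows the generic wrapping pattern with illustrative names; it is not this package's code.

package example

import (
    "context"

    "golang.org/x/sync/semaphore"
)

// limited wraps a piece of work so that at most `limit` invocations run at once,
// mirroring the LimitedHandler idea. Names here are illustrative.
type limited struct {
    active *semaphore.Weighted
    run    func(ctx context.Context)
}

func newLimited(run func(ctx context.Context), limit int) *limited {
    return &limited{active: semaphore.NewWeighted(int64(limit)), run: run}
}

// Handle acquires a slot before running and releases it afterwards. Acquire
// only returns an error when ctx is canceled, in which case the work is skipped.
func (l *limited) Handle(ctx context.Context) {
    if err := l.active.Acquire(ctx, 1); err != nil {
        return
    }
    defer l.active.Release(1)
    l.run(ctx)
}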

View File

@@ -9,7 +9,7 @@ import (
    "testing"
    "time"
-   "storj.io/common/pb"
+   "storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/storj/satellite/metainfo/piecedeletion"
@@ -24,7 +24,7 @@ func (*HandleLimitVerifier) NewQueue() piecedeletion.Queue {
    panic("should not be called")
}
-func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
+func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
current := atomic.AddInt64(&verifier.Active, 1)
if current > verifier.ExpectedLimit {
panic("over limit")
@@ -46,7 +46,7 @@ func TestLimitedHandler(t *testing.T) {
    for i := 0; i < 800; i++ {
        ctx.Go(func() error {
-           limited.Handle(ctx, nil, nil)
+           limited.Handle(ctx, storj.NodeURL{}, nil)
return nil
})
}

View File

@@ -10,7 +10,6 @@ import (
    "github.com/zeebo/errs"
    "go.uber.org/zap"
-   "storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
@@ -162,15 +161,13 @@ func (service *Service) Delete(ctx context.Context, requests []Request, successT
// Request defines a deletion requests for a node.
type Request struct {
-   Node *pb.Node
+   Node storj.NodeURL
    Pieces []storj.PieceID
}
// IsValid returns whether the request is valid.
func (req *Request) IsValid() bool {
-   return req.Node != nil &&
-       !req.Node.Id.IsZero() &&
-       len(req.Pieces) > 0
+   return !req.Node.ID.IsZero() && len(req.Pieces) > 0
}
func requestsPieceCount(requests []Request) int {
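Note: with the new Request shape, callers fill in a storj.NodeURL value instead of a *pb.Node pointer, and IsValid only checks the node ID and the piece list. The example below is a hedged sketch; the buildRequest helper and the literal address are made up, and testrand.PieceID is assumed to exist alongside the testrand.NodeID used elsewhere in this commit.

package example

import (
    "storj.io/common/storj"
    "storj.io/common/testrand"
    "storj.io/storj/satellite/metainfo/piecedeletion"
)

// buildRequest shows the new Request shape: the node is identified by a
// storj.NodeURL value rather than a *pb.Node pointer.
func buildRequest(node storj.NodeURL, pieces []storj.PieceID) (piecedeletion.Request, bool) {
    req := piecedeletion.Request{
        Node:   node,
        Pieces: pieces,
    }
    return req, req.IsValid()
}

func exampleUsage() {
    // The address literal and the helper name are made up for illustration.
    node := storj.NodeURL{ID: testrand.NodeID(), Address: "127.0.0.1:7777"}
    req, ok := buildRequest(node, []storj.PieceID{testrand.PieceID()})
    _, _ = req, ok
}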

View File

@@ -115,14 +115,8 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
            require.NoError(t, err)
            totalUsedSpace += piecesTotal
-           // Get pb node and all the pieces of the storage node
-           dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-           require.NoError(t, err)
-           nodePieces := piecedeletion.Request{
-               Node: &dossier.Node,
-           }
+           // Get all the pieces of the storage node
+           nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
@@ -184,15 +178,9 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
        var requests []piecedeletion.Request
        for i, sn := range planet.StorageNodes {
-           // Get pb node and all the pieces of the storage node
-           dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-           require.NoError(t, err)
-           nodePieces := piecedeletion.Request{
-               Node: &dossier.Node,
-           }
-           err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
+           // Get all the pieces of the storage node
+           nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
+           err := sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
@@ -260,14 +248,8 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) {
            require.NoError(t, err)
            expectedTotalUsedSpace += piecesTotal
-           // Get pb node and all the pieces of the storage node
-           dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-           require.NoError(t, err)
-           nodePieces := piecedeletion.Request{
-               Node: &dossier.Node,
-           }
+           // Get all the pieces of the storage node
+           nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
@@ -354,14 +336,8 @@ func TestService_DeletePieces_Timeout(t *testing.T) {
            require.NoError(t, err)
            expectedTotalUsedSpace += piecesTotal
-           // Get pb node and all the pieces of the storage node
-           dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-           require.NoError(t, err)
-           nodePieces := piecedeletion.Request{
-               Node: &dossier.Node,
-           }
+           // Get all the pieces of the storage node
+           nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())