From 7f323754a4d281e89be77e17c395b3967125bee2 Mon Sep 17 00:00:00 2001
From: Egon Elbre
Date: Wed, 20 May 2020 16:10:25 +0300
Subject: [PATCH] metainfo/piecedeletion: use NodeURL-s
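
Switch the piecedeletion Handler, Combiner, Dialer and Request to take a
storj.NodeURL instead of a *pb.Node, since only the node ID and address are
needed for dialing. As an illustration (this mirrors the metainfo endpoint
change in the diff below), a deletion request is now built as:

    requests = append(requests, piecedeletion.Request{
        Node: storj.NodeURL{
            ID:      node.Id,
            Address: node.Address.Address,
        },
        Pieces: nodesPieces[node.Id],
    })

Request.IsValid no longer needs a nil check, and tests can use
planet.StorageNodes[i].NodeURL() directly.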

Change-Id: I247dbfe03e7864e940e4cd1d0f343f38e84099e0
---
 satellite/metainfo/metainfo.go                |  5 ++-
 satellite/metainfo/piecedeletion/combiner.go  | 11 +++--
 .../piecedeletion/combiner_bench_test.go      |  9 ++--
 .../metainfo/piecedeletion/combiner_test.go   |  9 ++--
 satellite/metainfo/piecedeletion/dialer.go    | 26 +++++-------
 .../metainfo/piecedeletion/dialer_test.go     | 18 +-------
 satellite/metainfo/piecedeletion/handler.go   |  4 +-
 .../metainfo/piecedeletion/handler_test.go    |  6 +--
 satellite/metainfo/piecedeletion/service.go   |  7 +---
 .../metainfo/piecedeletion/service_test.go    | 42 ++++---------------
 10 files changed, 46 insertions(+), 91 deletions(-)

diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go
index 8673d5693..6d388411f 100644
--- a/satellite/metainfo/metainfo.go
+++ b/satellite/metainfo/metainfo.go
@@ -2353,7 +2353,10 @@ func (endpoint *Endpoint) DeleteObjectPieces(
     var requests []piecedeletion.Request
     for _, node := range nodes {
         requests = append(requests, piecedeletion.Request{
-            Node:   node,
+            Node: storj.NodeURL{
+                ID:      node.Id,
+                Address: node.Address.Address,
+            },
             Pieces: nodesPieces[node.Id],
         })
     }
 }
diff --git a/satellite/metainfo/piecedeletion/combiner.go b/satellite/metainfo/piecedeletion/combiner.go
index f947c1b00..e090f51a1 100644
--- a/satellite/metainfo/piecedeletion/combiner.go
+++ b/satellite/metainfo/piecedeletion/combiner.go
@@ -7,7 +7,6 @@ import (
     "context"
     "sync"

-    "storj.io/common/pb"
     "storj.io/common/storj"
     "storj.io/common/sync2"
 )
@@ -15,7 +14,7 @@ import (
 // Handler handles piece deletion requests from a queue.
 type Handler interface {
     // Handle should call queue.PopAll until finished.
-    Handle(ctx context.Context, node *pb.Node, queue Queue)
+    Handle(ctx context.Context, node storj.NodeURL, queue Queue)
 }

 // NewQueue is a constructor func for queues.
@@ -73,7 +72,7 @@ type Combiner struct {
 // worker handles a batch of jobs.
 type worker struct {
     waitFor chan struct{}
-    node    *pb.Node
+    node    storj.NodeURL
     jobs    Queue
     done    chan struct{}
 }
@@ -97,11 +96,11 @@ func (combiner *Combiner) Close() {
 }

 // Enqueue adds a deletion job to the queue.
-func (combiner *Combiner) Enqueue(node *pb.Node, job Job) {
+func (combiner *Combiner) Enqueue(node storj.NodeURL, job Job) {
     combiner.mu.Lock()
     defer combiner.mu.Unlock()

-    last := combiner.workerByID[node.Id]
+    last := combiner.workerByID[node.ID]

     // Check whether we can use the last worker.
     if last != nil && last.jobs.TryPush(job) {
@@ -118,7 +117,7 @@ func (combiner *Combiner) Enqueue(node *pb.Node, job Job) {
     if last != nil {
         next.waitFor = last.done
     }
-    combiner.workerByID[node.Id] = next
+    combiner.workerByID[node.ID] = next
     if !next.jobs.TryPush(job) {
         // This should never happen.
         job.Resolve.Failure()
diff --git a/satellite/metainfo/piecedeletion/combiner_bench_test.go b/satellite/metainfo/piecedeletion/combiner_bench_test.go
index 2f6ca2e67..bc2eb68db 100644
--- a/satellite/metainfo/piecedeletion/combiner_bench_test.go
+++ b/satellite/metainfo/piecedeletion/combiner_bench_test.go
@@ -10,7 +10,6 @@ import (
     "testing"
     "time"

-    "storj.io/common/pb"
     "storj.io/common/storj"
     "storj.io/common/sync2"
     "storj.io/common/testcontext"
@@ -24,7 +23,7 @@ type SleepyHandler struct {
     TotalHandled int64
 }

-func (handler *SleepyHandler) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
+func (handler *SleepyHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
     if !sync2.Sleep(ctx, handler.Min) {
         return
     }
@@ -63,10 +62,10 @@ func BenchmarkCombiner(b *testing.B) {
     activeLimits := []int{8, 32, 64, -1}
     queueSizes := []int{1, 8, 64, 128, -1}

-    nodes := []*pb.Node{}
+    nodes := []storj.NodeURL{}
     for i := 0; i < nodeCount; i++ {
-        nodes = append(nodes, &pb.Node{
-            Id: testrand.NodeID(),
+        nodes = append(nodes, storj.NodeURL{
+            ID: testrand.NodeID(),
         })
     }
 }
diff --git a/satellite/metainfo/piecedeletion/combiner_test.go b/satellite/metainfo/piecedeletion/combiner_test.go
index 4140e108f..90ede0c8a 100644
--- a/satellite/metainfo/piecedeletion/combiner_test.go
+++ b/satellite/metainfo/piecedeletion/combiner_test.go
@@ -11,7 +11,6 @@ import (

     "github.com/stretchr/testify/require"

-    "storj.io/common/pb"
     "storj.io/common/storj"
     "storj.io/common/sync2"
     "storj.io/common/testcontext"
@@ -23,7 +22,7 @@ type CountHandler struct {
     Count int64
 }

-func (handler *CountHandler) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
+func (handler *CountHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
     for {
         list, ok := queue.PopAll()
         if !ok {
@@ -48,10 +47,10 @@ func TestCombiner(t *testing.T) {
         queueSize = 5
     )

-    nodes := []*pb.Node{}
+    nodes := []storj.NodeURL{}
     for i := 0; i < nodeCount; i++ {
-        nodes = append(nodes, &pb.Node{
-            Id: testrand.NodeID(),
+        nodes = append(nodes, storj.NodeURL{
+            ID: testrand.NodeID(),
         })
     }
 }
diff --git a/satellite/metainfo/piecedeletion/dialer.go b/satellite/metainfo/piecedeletion/dialer.go
index bd6910999..9fe551f10 100644
--- a/satellite/metainfo/piecedeletion/dialer.go
+++ b/satellite/metainfo/piecedeletion/dialer.go
@@ -44,7 +44,7 @@ func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold
 }

 // Handle tries to send the deletion requests to the specified node.
-func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
+func (dialer *Dialer) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
     defer FailPending(queue)

     if dialer.recentlyFailed(ctx, node) {
@@ -53,13 +53,13 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {

     client, conn, err := dialPieceStore(ctx, dialer.dialer, node)
     if err != nil {
-        dialer.log.Debug("failed to dial", zap.Stringer("id", node.Id), zap.Error(err))
+        dialer.log.Debug("failed to dial", zap.Stringer("id", node.ID), zap.Error(err))
         dialer.markFailed(ctx, node)
         return
     }
     defer func() {
         if err := conn.Close(); err != nil {
-            dialer.log.Debug("closing connection failed", zap.Stringer("id", node.Id), zap.Error(err))
+            dialer.log.Debug("closing connection failed", zap.Stringer("id", node.ID), zap.Error(err))
         }
     }()

@@ -92,7 +92,7 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {
         }

         if err != nil {
-            dialer.log.Debug("deletion request failed", zap.Stringer("id", node.Id), zap.Error(err))
+            dialer.log.Debug("deletion request failed", zap.Stringer("id", node.ID), zap.Error(err))
             // don't try to send to this storage node a bit, when the deletion times out
             if errs2.IsCanceled(err) {
                 dialer.markFailed(ctx, node)
@@ -114,22 +114,22 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) {

 // markFailed marks node as something failed recently, so we shouldn't try again,
 // for some time.
-func (dialer *Dialer) markFailed(ctx context.Context, node *pb.Node) {
+func (dialer *Dialer) markFailed(ctx context.Context, node storj.NodeURL) {
     dialer.mu.Lock()
     defer dialer.mu.Unlock()

     now := time.Now()

-    lastFailed, ok := dialer.dialFailed[node.Id]
+    lastFailed, ok := dialer.dialFailed[node.ID]
     if !ok || lastFailed.Before(now) {
-        dialer.dialFailed[node.Id] = now
+        dialer.dialFailed[node.ID] = now
     }
 }

 // recentlyFailed checks whether a request to node recently failed.
-func (dialer *Dialer) recentlyFailed(ctx context.Context, node *pb.Node) bool {
+func (dialer *Dialer) recentlyFailed(ctx context.Context, node storj.NodeURL) bool {
     dialer.mu.RLock()
-    lastFailed, ok := dialer.dialFailed[node.Id]
+    lastFailed, ok := dialer.dialFailed[node.ID]
     dialer.mu.RUnlock()

     // when we recently failed to dial, then fail immediately
@@ -149,14 +149,10 @@ func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises [

     return pieces, promises, nil
 }

-func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target *pb.Node) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
-    conn, err := dialer.DialNodeURL(ctx, storj.NodeURL{
-        ID:      target.Id,
-        Address: target.Address.Address,
-    })
+func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target storj.NodeURL) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
+    conn, err := dialer.DialNodeURL(ctx, target)
     if err != nil {
         return nil, nil, err
     }
-
     return pb.NewDRPCPiecestoreClient(conn), conn, nil
 }
diff --git a/satellite/metainfo/piecedeletion/dialer_test.go b/satellite/metainfo/piecedeletion/dialer_test.go
index 0c5d75c99..328f0b237 100644
--- a/satellite/metainfo/piecedeletion/dialer_test.go
+++ b/satellite/metainfo/piecedeletion/dialer_test.go
@@ -11,7 +11,6 @@ import (
     "github.com/stretchr/testify/require"
     "go.uber.org/zap/zaptest"

-    "storj.io/common/pb"
     "storj.io/common/storj"
     "storj.io/common/testcontext"
     "storj.io/common/testrand"
@@ -36,16 +35,9 @@ func TestDialer(t *testing.T) {
     dialer := piecedeletion.NewDialer(log, planet.Satellites[0].Dialer, 5*time.Second, 5*time.Second, 100)
     require.NotNil(t, dialer)

-    storageNode := &pb.Node{
-        Id: planet.StorageNodes[0].ID(),
-        Address: &pb.NodeAddress{
-            Transport: pb.NodeTransport_TCP_TLS_GRPC,
-            Address:   planet.StorageNodes[0].Addr(),
-        },
-    }
+    storageNode := planet.StorageNodes[0].NodeURL()

     promise, jobs := makeJobsQueue(t, 2)
-
     dialer.Handle(ctx, storageNode, jobs)

     require.Equal(t, int64(2), promise.SuccessCount)
@@ -70,13 +62,7 @@ func TestDialer_DialTimeout(t *testing.T) {

     require.NoError(t, planet.StopPeer(planet.StorageNodes[0]))

-    storageNode := &pb.Node{
-        Id: planet.StorageNodes[0].ID(),
-        Address: &pb.NodeAddress{
-            Transport: pb.NodeTransport_TCP_TLS_GRPC,
-            Address:   planet.StorageNodes[0].Addr(),
-        },
-    }
+    storageNode := planet.StorageNodes[0].NodeURL()

     {
         promise, jobs := makeJobsQueue(t, 1)
diff --git a/satellite/metainfo/piecedeletion/handler.go b/satellite/metainfo/piecedeletion/handler.go
index 3b1fb7af3..dabe7edf3 100644
--- a/satellite/metainfo/piecedeletion/handler.go
+++ b/satellite/metainfo/piecedeletion/handler.go
@@ -8,7 +8,7 @@ import (

     "golang.org/x/sync/semaphore"

-    "storj.io/common/pb"
+    "storj.io/common/storj"
 )

 // LimitedHandler wraps handler with a concurrency limit.
@@ -26,7 +26,7 @@ func NewLimitedHandler(handler Handler, limit int) *LimitedHandler {
 }

 // Handle handles the job queue.
-func (handler *LimitedHandler) Handle(ctx context.Context, node *pb.Node, queue Queue) {
+func (handler *LimitedHandler) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
     if err := handler.active.Acquire(ctx, 1); err != nil {
         return
     }
diff --git a/satellite/metainfo/piecedeletion/handler_test.go b/satellite/metainfo/piecedeletion/handler_test.go
index acc6823ff..8cc765b1d 100644
--- a/satellite/metainfo/piecedeletion/handler_test.go
+++ b/satellite/metainfo/piecedeletion/handler_test.go
@@ -9,7 +9,7 @@ import (
     "testing"
     "time"

-    "storj.io/common/pb"
+    "storj.io/common/storj"
     "storj.io/common/sync2"
     "storj.io/common/testcontext"
     "storj.io/storj/satellite/metainfo/piecedeletion"
@@ -24,7 +24,7 @@ func (*HandleLimitVerifier) NewQueue() piecedeletion.Queue {
     panic("should not be called")
 }

-func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
+func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
     current := atomic.AddInt64(&verifier.Active, 1)
     if current > verifier.ExpectedLimit {
         panic("over limit")
@@ -46,7 +46,7 @@ func TestLimitedHandler(t *testing.T) {

     for i := 0; i < 800; i++ {
         ctx.Go(func() error {
-            limited.Handle(ctx, nil, nil)
+            limited.Handle(ctx, storj.NodeURL{}, nil)
             return nil
         })
     }
diff --git a/satellite/metainfo/piecedeletion/service.go b/satellite/metainfo/piecedeletion/service.go
index 1bd6f60b2..1d2cf8a9d 100644
--- a/satellite/metainfo/piecedeletion/service.go
+++ b/satellite/metainfo/piecedeletion/service.go
@@ -10,7 +10,6 @@ import (
     "github.com/zeebo/errs"
     "go.uber.org/zap"

-    "storj.io/common/pb"
     "storj.io/common/rpc"
     "storj.io/common/storj"
     "storj.io/common/sync2"
@@ -162,15 +161,13 @@ func (service *Service) Delete(ctx context.Context, requests []Request, successT

 // Request defines a deletion requests for a node.
 type Request struct {
-    Node   *pb.Node
+    Node   storj.NodeURL
     Pieces []storj.PieceID
 }

 // IsValid returns whether the request is valid.
 func (req *Request) IsValid() bool {
-    return req.Node != nil &&
-        !req.Node.Id.IsZero() &&
-        len(req.Pieces) > 0
+    return !req.Node.ID.IsZero() && len(req.Pieces) > 0
 }

 func requestsPieceCount(requests []Request) int {
diff --git a/satellite/metainfo/piecedeletion/service_test.go b/satellite/metainfo/piecedeletion/service_test.go
index 2c29e74e4..3e7d3882d 100644
--- a/satellite/metainfo/piecedeletion/service_test.go
+++ b/satellite/metainfo/piecedeletion/service_test.go
@@ -115,14 +115,8 @@ func TestService_DeletePieces_AllNodesUp(t *testing.T) {
         require.NoError(t, err)
         totalUsedSpace += piecesTotal

-        // Get pb node and all the pieces of the storage node
-        dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-        require.NoError(t, err)
-
-        nodePieces := piecedeletion.Request{
-            Node: &dossier.Node,
-        }
-
+        // Get all the pieces of the storage node
+        nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
         err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
             func(store pieces.StoredPieceAccess) error {
                 nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
@@ -184,15 +178,9 @@ func TestService_DeletePieces_SomeNodesDown(t *testing.T) {

     var requests []piecedeletion.Request
     for i, sn := range planet.StorageNodes {
-        // Get pb node and all the pieces of the storage node
-        dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-        require.NoError(t, err)
-
-        nodePieces := piecedeletion.Request{
-            Node: &dossier.Node,
-        }
-
-        err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
+        // Get all the pieces of the storage node
+        nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
+        err := sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
             func(store pieces.StoredPieceAccess) error {
                 nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
                 return nil
@@ -260,14 +248,8 @@ func TestService_DeletePieces_AllNodesDown(t *testing.T) {
         require.NoError(t, err)
         expectedTotalUsedSpace += piecesTotal

-        // Get pb node and all the pieces of the storage node
-        dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-        require.NoError(t, err)
-
-        nodePieces := piecedeletion.Request{
-            Node: &dossier.Node,
-        }
-
+        // Get all the pieces of the storage node
+        nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
         err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
             func(store pieces.StoredPieceAccess) error {
                 nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
@@ -354,14 +336,8 @@ func TestService_DeletePieces_Timeout(t *testing.T) {
         require.NoError(t, err)
         expectedTotalUsedSpace += piecesTotal

-        // Get pb node and all the pieces of the storage node
-        dossier, err := satelliteSys.Overlay.Service.Get(ctx, sn.ID())
-        require.NoError(t, err)
-
-        nodePieces := piecedeletion.Request{
-            Node: &dossier.Node,
-        }
-
+        // Get all the pieces of the storage node
+        nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
         err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
             func(store pieces.StoredPieceAccess) error {
                 nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())