From 518946fab9cc7e4495922cf9ca774531d23b718a Mon Sep 17 00:00:00 2001 From: Yingrong Zhao Date: Tue, 28 Apr 2020 09:17:22 -0400 Subject: [PATCH] satellite/metainfo/piecedeletion: add metrics for unhandled pieces Change-Id: I0cd66e09a8de7c7c0a708b2a9fe44ed1739770b0 --- satellite/metainfo/piecedeletion/dialer.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/satellite/metainfo/piecedeletion/dialer.go b/satellite/metainfo/piecedeletion/dialer.go index 4a43e7408..a232ee54c 100644 --- a/satellite/metainfo/piecedeletion/dialer.go +++ b/satellite/metainfo/piecedeletion/dialer.go @@ -14,7 +14,6 @@ import ( "storj.io/common/pb" "storj.io/common/rpc" "storj.io/common/storj" - "storj.io/uplink/private/piecestore" ) // Dialer implements dialing piecestores and sending delete requests with batching and redial threshold. @@ -52,13 +51,12 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) { return } - conn, err := piecestore.Dial(ctx, dialer.dialer, node, dialer.log, piecestore.DefaultConfig) + client, conn, err := dialPieceStore(ctx, dialer.dialer, node) if err != nil { dialer.log.Info("failed to dial", zap.Stringer("id", node.Id), zap.Error(err)) dialer.markFailed(ctx, node) return } - defer func() { if err := conn.Close(); err != nil { dialer.log.Info("closing connection failed", zap.Stringer("id", node.Id), zap.Error(err)) @@ -80,7 +78,9 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) { jobs = rest requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout) - err := conn.DeletePieces(requestCtx, batch...) + resp, err := client.DeletePieces(requestCtx, &pb.DeletePiecesRequest{ + PieceIds: batch, + }) cancel() for _, promise := range promises { @@ -98,6 +98,8 @@ func (dialer *Dialer) Handle(ctx context.Context, node *pb.Node, queue Queue) { dialer.markFailed(ctx, node) } break + } else { + mon.IntVal("deletion pieces unhandled count").Observe(resp.UnhandledCount) } jobs = append(jobs, queue.PopAllWithoutClose()...) @@ -146,3 +148,12 @@ func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises [ return pieces, promises, nil } + +func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target *pb.Node) (pb.DRPCPiecestoreClient, *rpc.Conn, error) { + conn, err := dialer.DialNode(ctx, target) + if err != nil { + return nil, nil, err + } + + return pb.NewDRPCPiecestoreClient(conn), conn, nil +}