From 14c76485305db0fa81a0944a46cea8cb36c3bafd Mon Sep 17 00:00:00 2001 From: Isaac Hess Date: Wed, 23 Oct 2019 10:14:02 -0600 Subject: [PATCH] storagenode/piecestore: Only limit grpc requests (#3342) --- storagenode/piecestore/endpoint.go | 31 ++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index fd44839d9..596206a6b 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -89,7 +89,10 @@ type Endpoint struct { usage bandwidth.DB usedSerials UsedSerials - liveRequests int32 + // liveGRPCRequests is a limit to the number of allowed concurrent GRPC + // requests. Concurrent DRPC requests count toward the limit, but DRPC + // requests are accepted regardless of the current level. + liveGRPCRequests int32 } // drpcEndpoint wraps streaming methods so that they can be used with drpc @@ -115,7 +118,7 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni usage: usage, usedSerials: usedSerials, - liveRequests: 0, + liveGRPCRequests: 0, }, nil } @@ -126,8 +129,8 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) - atomic.AddInt32(&endpoint.liveRequests, 1) - defer atomic.AddInt32(&endpoint.liveRequests, -1) + atomic.AddInt32(&endpoint.liveGRPCRequests, 1) + defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) endpoint.pingStats.WasPinged(time.Now()) @@ -155,12 +158,12 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ // Upload handles uploading a piece on piece store. func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) { - return endpoint.doUpload(stream) + return endpoint.doUpload(stream, true) } // Upload handles uploading a piece on piece store. func (endpoint *drpcEndpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) { - return endpoint.doUpload(stream) + return endpoint.doUpload(stream, false) } // uploadStream is the minimum interface required to perform settlements. @@ -171,18 +174,18 @@ type uploadStream interface { } // doUpload handles uploading a piece on piece store. -func (endpoint *Endpoint) doUpload(stream uploadStream) (err error) { +func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err error) { ctx := stream.Context() defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) - liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1) - defer atomic.AddInt32(&endpoint.liveRequests, -1) + liveGRPCRequests := atomic.AddInt32(&endpoint.liveGRPCRequests, 1) + defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) endpoint.pingStats.WasPinged(time.Now()) - if int(liveRequests) > endpoint.config.MaxConcurrentRequests { - endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests)) + if limitRequests && int(liveGRPCRequests) > endpoint.config.MaxConcurrentRequests { + endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveGRPCRequests)) return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded") } @@ -396,8 +399,8 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) { defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) - atomic.AddInt32(&endpoint.liveRequests, 1) - defer atomic.AddInt32(&endpoint.liveRequests, -1) + atomic.AddInt32(&endpoint.liveGRPCRequests, 1) + defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) startTime := time.Now().UTC() @@ -685,7 +688,7 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques // TestLiveRequestCount returns the current number of live requests. func (endpoint *Endpoint) TestLiveRequestCount() int32 { - return atomic.LoadInt32(&endpoint.liveRequests) + return atomic.LoadInt32(&endpoint.liveGRPCRequests) } // min finds the min of two values