diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 596206a6b..edaffd0fa 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -89,10 +89,11 @@ type Endpoint struct { usage bandwidth.DB usedSerials UsedSerials - // liveGRPCRequests is a limit to the number of allowed concurrent GRPC - // requests. Concurrent DRPC requests count toward the limit, but DRPC - // requests are accepted regardless of the current level. - liveGRPCRequests int32 + // liveRequests tracks the total number of incoming rpc requests. For gRPC + // requests only, this number is compared to config.MaxConcurrentRequests + // and limits the number of gRPC requests. dRPC requests are tracked but + // not limited. + liveRequests int32 } // drpcEndpoint wraps streaming methods so that they can be used with drpc @@ -118,7 +119,7 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni usage: usage, usedSerials: usedSerials, - liveGRPCRequests: 0, + liveRequests: 0, }, nil } @@ -129,8 +130,8 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) - atomic.AddInt32(&endpoint.liveGRPCRequests, 1) - defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) + atomic.AddInt32(&endpoint.liveRequests, 1) + defer atomic.AddInt32(&endpoint.liveRequests, -1) endpoint.pingStats.WasPinged(time.Now()) @@ -179,13 +180,13 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) - liveGRPCRequests := atomic.AddInt32(&endpoint.liveGRPCRequests, 1) - defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) + liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1) + defer atomic.AddInt32(&endpoint.liveRequests, -1) endpoint.pingStats.WasPinged(time.Now()) - if limitRequests && int(liveGRPCRequests) > endpoint.config.MaxConcurrentRequests { - endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveGRPCRequests)) + if limitRequests && int(liveRequests) > endpoint.config.MaxConcurrentRequests { + endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests)) return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded") } @@ -399,8 +400,8 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) { defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) - atomic.AddInt32(&endpoint.liveGRPCRequests, 1) - defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) + atomic.AddInt32(&endpoint.liveRequests, 1) + defer atomic.AddInt32(&endpoint.liveRequests, -1) startTime := time.Now().UTC() @@ -688,7 +689,7 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques // TestLiveRequestCount returns the current number of live requests. func (endpoint *Endpoint) TestLiveRequestCount() int32 { - return atomic.LoadInt32(&endpoint.liveGRPCRequests) + return atomic.LoadInt32(&endpoint.liveRequests) } // min finds the min of two values