storagenode/piecestore: Rename liveGRPCRequests back to liveRequests (#3354)

This commit is contained in:
Isaac Hess 2019-10-23 13:43:43 -06:00 committed by GitHub
parent e82245e10e
commit 75412e54e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -89,10 +89,11 @@ type Endpoint struct {
usage bandwidth.DB usage bandwidth.DB
usedSerials UsedSerials usedSerials UsedSerials
// liveGRPCRequests is a limit to the number of allowed concurrent GRPC // liveRequests tracks the total number of incoming rpc requests. For gRPC
// requests. Concurrent DRPC requests count toward the limit, but DRPC // requests only, this number is compared to config.MaxConcurrentRequests
// requests are accepted regardless of the current level. // and limits the number of gRPC requests. dRPC requests are tracked but
liveGRPCRequests int32 // not limited.
liveRequests int32
} }
// drpcEndpoint wraps streaming methods so that they can be used with drpc // drpcEndpoint wraps streaming methods so that they can be used with drpc
@ -118,7 +119,7 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
usage: usage, usage: usage,
usedSerials: usedSerials, usedSerials: usedSerials,
liveGRPCRequests: 0, liveRequests: 0,
}, nil }, nil
} }
@ -129,8 +130,8 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
defer monLiveRequests(&ctx)(&err) defer monLiveRequests(&ctx)(&err)
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
atomic.AddInt32(&endpoint.liveGRPCRequests, 1) atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) defer atomic.AddInt32(&endpoint.liveRequests, -1)
endpoint.pingStats.WasPinged(time.Now()) endpoint.pingStats.WasPinged(time.Now())
@ -179,13 +180,13 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err
defer monLiveRequests(&ctx)(&err) defer monLiveRequests(&ctx)(&err)
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
liveGRPCRequests := atomic.AddInt32(&endpoint.liveGRPCRequests, 1) liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) defer atomic.AddInt32(&endpoint.liveRequests, -1)
endpoint.pingStats.WasPinged(time.Now()) endpoint.pingStats.WasPinged(time.Now())
if limitRequests && int(liveGRPCRequests) > endpoint.config.MaxConcurrentRequests { if limitRequests && int(liveRequests) > endpoint.config.MaxConcurrentRequests {
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveGRPCRequests)) endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded") return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded")
} }
@ -399,8 +400,8 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
defer monLiveRequests(&ctx)(&err) defer monLiveRequests(&ctx)(&err)
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
atomic.AddInt32(&endpoint.liveGRPCRequests, 1) atomic.AddInt32(&endpoint.liveRequests, 1)
defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1) defer atomic.AddInt32(&endpoint.liveRequests, -1)
startTime := time.Now().UTC() startTime := time.Now().UTC()
@ -688,7 +689,7 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
// TestLiveRequestCount returns the current number of live requests. // TestLiveRequestCount returns the current number of live requests.
func (endpoint *Endpoint) TestLiveRequestCount() int32 { func (endpoint *Endpoint) TestLiveRequestCount() int32 {
return atomic.LoadInt32(&endpoint.liveGRPCRequests) return atomic.LoadInt32(&endpoint.liveRequests)
} }
// min finds the min of two values // min finds the min of two values