storagenode/piecestore: Only limit grpc requests (#3342)
This commit is contained in:
parent
655e2b3422
commit
14c7648530
@ -89,7 +89,10 @@ type Endpoint struct {
|
||||
usage bandwidth.DB
|
||||
usedSerials UsedSerials
|
||||
|
||||
liveRequests int32
|
||||
// liveGRPCRequests is a limit to the number of allowed concurrent GRPC
|
||||
// requests. Concurrent DRPC requests count toward the limit, but DRPC
|
||||
// requests are accepted regardless of the current level.
|
||||
liveGRPCRequests int32
|
||||
}
|
||||
|
||||
// drpcEndpoint wraps streaming methods so that they can be used with drpc
|
||||
@ -115,7 +118,7 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
|
||||
usage: usage,
|
||||
usedSerials: usedSerials,
|
||||
|
||||
liveRequests: 0,
|
||||
liveGRPCRequests: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -126,8 +129,8 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
|
||||
defer monLiveRequests(&ctx)(&err)
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
atomic.AddInt32(&endpoint.liveRequests, 1)
|
||||
defer atomic.AddInt32(&endpoint.liveRequests, -1)
|
||||
atomic.AddInt32(&endpoint.liveGRPCRequests, 1)
|
||||
defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1)
|
||||
|
||||
endpoint.pingStats.WasPinged(time.Now())
|
||||
|
||||
@ -155,12 +158,12 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
|
||||
|
||||
// Upload handles uploading a piece on piece store.
|
||||
func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) {
|
||||
return endpoint.doUpload(stream)
|
||||
return endpoint.doUpload(stream, true)
|
||||
}
|
||||
|
||||
// Upload handles uploading a piece on piece store.
|
||||
func (endpoint *drpcEndpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
|
||||
return endpoint.doUpload(stream)
|
||||
return endpoint.doUpload(stream, false)
|
||||
}
|
||||
|
||||
// uploadStream is the minimum interface required to perform settlements.
|
||||
@ -171,18 +174,18 @@ type uploadStream interface {
|
||||
}
|
||||
|
||||
// doUpload handles uploading a piece on piece store.
|
||||
func (endpoint *Endpoint) doUpload(stream uploadStream) (err error) {
|
||||
func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err error) {
|
||||
ctx := stream.Context()
|
||||
defer monLiveRequests(&ctx)(&err)
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
liveRequests := atomic.AddInt32(&endpoint.liveRequests, 1)
|
||||
defer atomic.AddInt32(&endpoint.liveRequests, -1)
|
||||
liveGRPCRequests := atomic.AddInt32(&endpoint.liveGRPCRequests, 1)
|
||||
defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1)
|
||||
|
||||
endpoint.pingStats.WasPinged(time.Now())
|
||||
|
||||
if int(liveRequests) > endpoint.config.MaxConcurrentRequests {
|
||||
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
|
||||
if limitRequests && int(liveGRPCRequests) > endpoint.config.MaxConcurrentRequests {
|
||||
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveGRPCRequests))
|
||||
return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded")
|
||||
}
|
||||
|
||||
@ -396,8 +399,8 @@ func (endpoint *Endpoint) doDownload(stream downloadStream) (err error) {
|
||||
defer monLiveRequests(&ctx)(&err)
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
atomic.AddInt32(&endpoint.liveRequests, 1)
|
||||
defer atomic.AddInt32(&endpoint.liveRequests, -1)
|
||||
atomic.AddInt32(&endpoint.liveGRPCRequests, 1)
|
||||
defer atomic.AddInt32(&endpoint.liveGRPCRequests, -1)
|
||||
|
||||
startTime := time.Now().UTC()
|
||||
|
||||
@ -685,7 +688,7 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
|
||||
|
||||
// TestLiveRequestCount returns the current number of live requests.
|
||||
func (endpoint *Endpoint) TestLiveRequestCount() int32 {
|
||||
return atomic.LoadInt32(&endpoint.liveRequests)
|
||||
return atomic.LoadInt32(&endpoint.liveGRPCRequests)
|
||||
}
|
||||
|
||||
// min finds the min of two values
|
||||
|
Loading…
Reference in New Issue
Block a user