From 1defd4dbfee3a80df6d63a308814bc6154cb8557 Mon Sep 17 00:00:00 2001 From: Isaac Hess Date: Mon, 28 Oct 2019 13:12:49 -0600 Subject: [PATCH] storagenode/piecestore: Respect config.MaxConcurrentRequests for drpc (#3402) --- storagenode/piecestore/endpoint.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index edaffd0fa..f6309b4d9 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -57,7 +57,7 @@ type OldConfig struct { // Config defines parameters for piecestore endpoint. type Config struct { ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"` - MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." default:"6"` + MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"` OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"` CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"` @@ -75,8 +75,9 @@ type pingStatsSource interface { // // architecture: Endpoint type Endpoint struct { - log *zap.Logger - config Config + log *zap.Logger + config Config + grpcReqLimit int signer signing.Signer trust *trust.Pool @@ -104,9 +105,17 @@ func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{ // NewEndpoint creates a new piecestore endpoint. func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) { + // If config.MaxConcurrentRequests is set we want to repsect it for grpc. + // However, if it is 0 (unlimited) we force a limit. + grpcReqLimit := config.MaxConcurrentRequests + if grpcReqLimit <= 0 { + grpcReqLimit = 7 + } + return &Endpoint{ - log: log, - config: config, + log: log, + config: config, + grpcReqLimit: grpcReqLimit, signer: signer, trust: trust, @@ -159,12 +168,12 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ // Upload handles uploading a piece on piece store. func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) { - return endpoint.doUpload(stream, true) + return endpoint.doUpload(stream, endpoint.grpcReqLimit) } // Upload handles uploading a piece on piece store. func (endpoint *drpcEndpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) { - return endpoint.doUpload(stream, false) + return endpoint.doUpload(stream, endpoint.config.MaxConcurrentRequests) } // uploadStream is the minimum interface required to perform settlements. @@ -175,7 +184,7 @@ type uploadStream interface { } // doUpload handles uploading a piece on piece store. -func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err error) { +func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err error) { ctx := stream.Context() defer monLiveRequests(&ctx)(&err) defer mon.Task()(&ctx)(&err) @@ -185,7 +194,7 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err endpoint.pingStats.WasPinged(time.Now()) - if limitRequests && int(liveRequests) > endpoint.config.MaxConcurrentRequests { + if requestLimit > 0 && int(liveRequests) > requestLimit { endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests)) return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded") }