storagenode/piecestore: Respect config.MaxConcurrentRequests for drpc (#3402)

This commit is contained in:
Isaac Hess 2019-10-28 13:12:49 -06:00 committed by GitHub
parent 7c2daa4dd9
commit 1defd4dbfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -57,7 +57,7 @@ type OldConfig struct {
// Config defines parameters for piecestore endpoint.
type Config struct {
ExpirationGracePeriod time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected." default:"6"`
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"`
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
@ -75,8 +75,9 @@ type pingStatsSource interface {
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
config Config
log *zap.Logger
config Config
grpcReqLimit int
signer signing.Signer
trust *trust.Pool
@ -104,9 +105,17 @@ func (endpoint *Endpoint) DRPC() pb.DRPCPiecestoreServer { return &drpcEndpoint{
// NewEndpoint creates a new piecestore endpoint.
func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
// If config.MaxConcurrentRequests is set we want to repsect it for grpc.
// However, if it is 0 (unlimited) we force a limit.
grpcReqLimit := config.MaxConcurrentRequests
if grpcReqLimit <= 0 {
grpcReqLimit = 7
}
return &Endpoint{
log: log,
config: config,
log: log,
config: config,
grpcReqLimit: grpcReqLimit,
signer: signer,
trust: trust,
@ -159,12 +168,12 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
// Upload handles uploading a piece on piece store.
func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error) {
return endpoint.doUpload(stream, true)
return endpoint.doUpload(stream, endpoint.grpcReqLimit)
}
// Upload handles uploading a piece on piece store.
func (endpoint *drpcEndpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error) {
return endpoint.doUpload(stream, false)
return endpoint.doUpload(stream, endpoint.config.MaxConcurrentRequests)
}
// uploadStream is the minimum interface required to perform settlements.
@ -175,7 +184,7 @@ type uploadStream interface {
}
// doUpload handles uploading a piece on piece store.
func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err error) {
func (endpoint *Endpoint) doUpload(stream uploadStream, requestLimit int) (err error) {
ctx := stream.Context()
defer monLiveRequests(&ctx)(&err)
defer mon.Task()(&ctx)(&err)
@ -185,7 +194,7 @@ func (endpoint *Endpoint) doUpload(stream uploadStream, limitRequests bool) (err
endpoint.pingStats.WasPinged(time.Now())
if limitRequests && int(liveRequests) > endpoint.config.MaxConcurrentRequests {
if requestLimit > 0 && int(liveRequests) > requestLimit {
endpoint.log.Error("upload rejected, too many requests", zap.Int32("live requests", liveRequests))
return rpcstatus.Error(rpcstatus.Unavailable, "storage node overloaded")
}