satellite/orders: do a better job limiting concurrent requests
Doing it at the ProcessOrders level was insufficient: the endpoints make multiple database calls. It was a misguided attempt to only have one spot enter the semaphore. By putting it in the endpoint we can not only be sure that the concurrency is correctly limited but it can be configurable easily. Change-Id: I937149dd077adf9eb87fce52a1a17dc0afe96f64
This commit is contained in:
parent
cf1748158a
commit
0f0faf0a9f
@ -333,6 +333,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
peer.DB.NodeAPIVersion(),
|
||||
config.Orders.SettlementBatchSize,
|
||||
config.Orders.WindowEndpointRolloutPhase,
|
||||
config.Orders.OrdersSemaphoreSize,
|
||||
)
|
||||
var err error
|
||||
peer.Orders.Service, err = orders.NewService(
|
||||
|
@ -205,10 +205,19 @@ type Endpoint struct {
|
||||
nodeAPIVersionDB nodeapiversion.DB
|
||||
settlementBatchSize int
|
||||
windowEndpointRolloutPhase WindowEndpointRolloutPhase
|
||||
ordersSemaphore chan struct{}
|
||||
}
|
||||
|
||||
// NewEndpoint new orders receiving endpoint.
|
||||
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase) *Endpoint {
|
||||
//
|
||||
// ordersSemaphoreSize controls the number of concurrent clients allowed to submit orders at once.
|
||||
// A value of zero means unlimited.
|
||||
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPIVersionDB nodeapiversion.DB, settlementBatchSize int, windowEndpointRolloutPhase WindowEndpointRolloutPhase, ordersSemaphoreSize int) *Endpoint {
|
||||
var ordersSemaphore chan struct{}
|
||||
if ordersSemaphoreSize > 0 {
|
||||
ordersSemaphore = make(chan struct{}, ordersSemaphoreSize)
|
||||
}
|
||||
|
||||
return &Endpoint{
|
||||
log: log,
|
||||
satelliteSignee: satelliteSignee,
|
||||
@ -216,6 +225,7 @@ func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, nodeAPI
|
||||
nodeAPIVersionDB: nodeAPIVersionDB,
|
||||
settlementBatchSize: settlementBatchSize,
|
||||
windowEndpointRolloutPhase: windowEndpointRolloutPhase,
|
||||
ordersSemaphore: ordersSemaphore,
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,11 +247,31 @@ func monitoredSettlementStreamSend(ctx context.Context, stream pb.DRPCOrders_Set
|
||||
return stream.Send(resp)
|
||||
}
|
||||
|
||||
// enterOrdersSemaphore acquires a slot with the ordersSemaphore if one exists and returns
|
||||
// a function to exit it. If the context expires, it returns an error.
|
||||
func (endpoint *Endpoint) enterOrdersSemaphore(ctx context.Context) (func(), error) {
|
||||
if endpoint.ordersSemaphore == nil {
|
||||
return func() {}, nil
|
||||
}
|
||||
select {
|
||||
case endpoint.ordersSemaphore <- struct{}{}:
|
||||
return func() { <-endpoint.ordersSemaphore }, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Settlement receives orders and handles them in batches.
|
||||
func (endpoint *Endpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) {
|
||||
ctx := stream.Context()
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
exit, err := endpoint.enterOrdersSemaphore(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
defer exit()
|
||||
|
||||
switch endpoint.windowEndpointRolloutPhase {
|
||||
case WindowEndpointRolloutPhase1:
|
||||
case WindowEndpointRolloutPhase2, WindowEndpointRolloutPhase3:
|
||||
@ -412,6 +442,12 @@ func (endpoint *Endpoint) SettlementWithWindowMigration(stream pb.DRPCOrders_Set
|
||||
ctx := stream.Context()
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
exit, err := endpoint.enterOrdersSemaphore(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer exit()
|
||||
|
||||
peer, err := identity.PeerIdentityFromContext(ctx)
|
||||
if err != nil {
|
||||
endpoint.log.Debug("err peer identity from context", zap.Error(err))
|
||||
|
@ -36,6 +36,7 @@ type Config struct {
|
||||
ReportedRollupsReadBatchSize int `help:"how many records to read in a single transaction when calculating billable bandwidth" default:"1000"`
|
||||
NodeStatusLogging bool `hidden:"true" help:"deprecated, log the offline/disqualification status of nodes" default:"false"`
|
||||
WindowEndpointRolloutPhase WindowEndpointRolloutPhase `help:"rollout phase for the windowed endpoint" default:"phase1"`
|
||||
OrdersSemaphoreSize int `help:"how many concurrent orders to process at once. zero is unlimited" default:"2"`
|
||||
}
|
||||
|
||||
// BucketsDB returns information about buckets.
|
||||
|
@ -241,8 +241,6 @@ func (db *ordersDB) UnuseSerialNumber(ctx context.Context, serialNumber storj.Se
|
||||
return err
|
||||
}
|
||||
|
||||
var processSem = make(chan struct{}, 2)
|
||||
|
||||
// ProcessOrders take a list of order requests and inserts them into the pending serials queue.
|
||||
//
|
||||
// ProcessOrders requires that all orders come from the same storage node.
|
||||
@ -253,14 +251,6 @@ func (db *ordersDB) ProcessOrders(ctx context.Context, requests []*orders.Proces
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// bound the number of orders we issue at once to avoid herds using all the database connections
|
||||
select {
|
||||
case processSem <- struct{}{}:
|
||||
defer func() { <-processSem }()
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// check that all requests are from the same storage node
|
||||
storageNodeID := requests[0].OrderLimit.StorageNodeId
|
||||
for _, req := range requests[1:] {
|
||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -463,6 +463,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# include encrypted metadata in the order limit
|
||||
# orders.include-encrypted-metadata: false
|
||||
|
||||
# how many concurrent orders to process at once. zero is unlimited
|
||||
# orders.orders-semaphore-size: 2
|
||||
|
||||
# how many records to read in a single transaction when calculating billable bandwidth
|
||||
# orders.reported-rollups-read-batch-size: 1000
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user