satellite/orders: don't cancel flushing bandwidth orders
Earlier we made a change to not cancel flushing orders when flushing was triggered by orders endpoint method but we missed a case where it can be also triggered (and canceled) by metainfo endpoints method. This change moves ignoring context cancellation deeper. Change-Id: Id43176f552efc3167345783f73aab885411ac247
This commit is contained in:
parent
eab83ee7fa
commit
b46c0fb78f
@ -15,7 +15,6 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/context2"
|
||||
"storj.io/common/identity"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/rpc/rpcstatus"
|
||||
@ -387,10 +386,6 @@ func (endpoint *Endpoint) SettlementWithWindowFinal(stream pb.DRPCOrders_Settlem
|
||||
)
|
||||
|
||||
if status == pb.SettlementWithWindowResponse_ACCEPTED && !alreadyProcessed {
|
||||
// we would like to update bandwidth even if context was canceled because
|
||||
// underlying implementation is flushing cache using this context in separate
|
||||
// goroutine so it can be executed after this stream will be closed
|
||||
ctx := context2.WithoutCancellation(ctx)
|
||||
for bucketIDAction, bwAmount := range bucketSettled {
|
||||
err = endpoint.DB.UpdateBucketBandwidthSettle(ctx,
|
||||
bucketIDAction.projectID, []byte(bucketIDAction.bucketname), bucketIDAction.action, bwAmount.Settled, bwAmount.Dead, time.Unix(0, window),
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/common/context2"
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/sync2"
|
||||
"storj.io/common/uuid"
|
||||
@ -144,6 +145,12 @@ func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups Rollup
|
||||
})
|
||||
}
|
||||
|
||||
// we would like to update bandwidth even if context was canceled. flushing
|
||||
// is triggered by endpoint methods (metainfo/orders) but flushing is started
|
||||
// in separate goroutine and because of that endpoint request can be finished
|
||||
// and its context will be canceled before UpdateBandwidthBatch is finished.
|
||||
ctx = context2.WithoutCancellation(ctx)
|
||||
|
||||
err := cache.DB.UpdateBandwidthBatch(ctx, rollups)
|
||||
if err != nil {
|
||||
mon.Event("rollups_write_cache_flush_lost")
|
||||
|
Loading…
Reference in New Issue
Block a user