diff --git a/satellite/orders/endpoint.go b/satellite/orders/endpoint.go index 7ecc78d9c..ae63313c2 100644 --- a/satellite/orders/endpoint.go +++ b/satellite/orders/endpoint.go @@ -38,8 +38,8 @@ type DB interface { // UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket UpdateBucketBandwidthInline(ctx context.Context, bucketID []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error - // UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage node - UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error + // UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage nodes + UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNodes []storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error // UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error diff --git a/satellite/orders/service.go b/satellite/orders/service.go index e3780f229..97ae6c418 100644 --- a/satellite/orders/service.go +++ b/satellite/orders/service.go @@ -70,28 +70,41 @@ func (service *Service) updateBandwidth(ctx context.Context, bucketID []byte, ad if len(addressedOrderLimits) == 0 { return nil } - var bucketAllocation int64 + var action pb.PieceAction - nodesAllocation := make(map[storj.NodeID]int64) + + var bucketAllocation int64 + var nodesAllocation int64 + nodes := make([]storj.NodeID, 0, len(addressedOrderLimits)) + for _, addressedOrderLimit := range addressedOrderLimits { if addressedOrderLimit != nil { orderLimit := addressedOrderLimit.Limit - bucketAllocation += orderLimit.Limit - nodesAllocation[orderLimit.StorageNodeId] = orderLimit.Limit + + if nodesAllocation == 0 { + nodesAllocation = orderLimit.Limit + } else if nodesAllocation != orderLimit.Limit { + return Error.New("inconsistent allocations had %d got %d", nodesAllocation, orderLimit.Limit) + } + + nodes = append(nodes, orderLimit.StorageNodeId) action = orderLimit.Action + + bucketAllocation += orderLimit.Limit } } + now := time.Now().UTC() intervalStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) if err := service.orders.UpdateBucketBandwidthAllocation(ctx, bucketID, action, bucketAllocation, intervalStart); err != nil { - return err + return Error.Wrap(err) } - for nodeID, allocation := range nodesAllocation { - if err := service.orders.UpdateStoragenodeBandwidthAllocation(ctx, nodeID, action, allocation, intervalStart); err != nil { - return err - } + + if err := service.orders.UpdateStoragenodeBandwidthAllocation(ctx, nodes, action, nodesAllocation, intervalStart); err != nil { + return Error.Wrap(err) } + return nil } diff --git a/satellite/satellitedb/locked.go b/satellite/satellitedb/locked.go index 88ac30320..0ba8ac773 100644 --- a/satellite/satellitedb/locked.go +++ b/satellite/satellitedb/locked.go @@ -733,11 +733,11 @@ func (m *lockedOrders) UpdateBucketBandwidthSettle(ctx context.Context, bucketID return m.db.UpdateBucketBandwidthSettle(ctx, bucketID, action, amount, intervalStart) } -// UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage node -func (m *lockedOrders) UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error { +// UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage nodes +func (m *lockedOrders) UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNodes []storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error { m.Lock() defer m.Unlock() - return m.db.UpdateStoragenodeBandwidthAllocation(ctx, storageNode, action, amount, intervalStart) + return m.db.UpdateStoragenodeBandwidthAllocation(ctx, storageNodes, action, amount, intervalStart) } // UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node diff --git a/satellite/satellitedb/orders.go b/satellite/satellitedb/orders.go index 6168692fb..2e3539a1b 100644 --- a/satellite/satellitedb/orders.go +++ b/satellite/satellitedb/orders.go @@ -7,8 +7,12 @@ import ( "bytes" "context" "database/sql" + "sort" "time" + "github.com/lib/pq" + sqlite3 "github.com/mattn/go-sqlite3" + "storj.io/storj/internal/dbutil/pgutil" "storj.io/storj/internal/dbutil/sqliteutil" "storj.io/storj/pkg/pb" @@ -125,20 +129,44 @@ func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, bucketID [] } // UpdateStoragenodeBandwidthAllocation updates 'allocated' bandwidth for given storage node -func (db *ordersDB) UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) { +func (db *ordersDB) UpdateStoragenodeBandwidthAllocation(ctx context.Context, storageNodes []storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) { defer mon.Task()(&ctx)(&err) - statement := db.db.Rebind( - `INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, allocated, settled) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(storagenode_id, interval_start, action) - DO UPDATE SET allocated = storagenode_bandwidth_rollups.allocated + ?`, - ) - _, err = db.db.ExecContext(ctx, statement, - storageNode.Bytes(), intervalStart, defaultIntervalSeconds, action, uint64(amount), 0, uint64(amount), - ) - if err != nil { - return err + + switch t := db.db.Driver().(type) { + case *sqlite3.SQLiteDriver: + statement := db.db.Rebind( + `INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, allocated, settled) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(storagenode_id, interval_start, action) + DO UPDATE SET allocated = storagenode_bandwidth_rollups.allocated + excluded.allocated`, + ) + for _, storageNode := range storageNodes { + _, err = db.db.ExecContext(ctx, statement, + storageNode.Bytes(), intervalStart, defaultIntervalSeconds, action, uint64(amount), 0, + ) + if err != nil { + return Error.Wrap(err) + } + } + + case *pq.Driver: + // sort nodes to avoid update deadlock + sort.Sort(storj.NodeIDList(storageNodes)) + + _, err := db.db.ExecContext(ctx, ` + INSERT INTO storagenode_bandwidth_rollups + (storagenode_id, interval_start, interval_seconds, action, allocated, settled) + SELECT unnest($1::bytea[]), $2, $3, $4, $5, $6 + ON CONFLICT(storagenode_id, interval_start, action) + DO UPDATE SET allocated = storagenode_bandwidth_rollups.allocated + excluded.allocated + `, postgresNodeIDList(storageNodes), intervalStart, defaultIntervalSeconds, action, uint64(amount), 0) + if err != nil { + return Error.Wrap(err) + } + default: + return Error.New("Unsupported database %t", t) } + return nil }