satellite/satellitedb: remove ORDER BY when reading from queue
also remove the continuation support from the queue; otherwise we may end up sequentially scanning the entire table just to get a few rows at the end. then, in the core, instead of looping both inside the queue (to build a big enough batch) and outside of it (to ensure we consume the whole queue), just get a single batch at a time. also, make the queue batch size configurable because we'll need to do some tuning in production. Change-Id: If1a997c6012898056ace89366a847c4cb141a025
This commit is contained in:
parent
163c027a6d
commit
44433f38be
@ -27,23 +27,32 @@ var (
|
||||
|
||||
// Config is a configuration struct for the Chore.
|
||||
type Config struct {
|
||||
Interval time.Duration `help:"how often to flush the reported serial rollups to the database" default:"5m"`
|
||||
Interval time.Duration `help:"how often to flush the reported serial rollups to the database" default:"5m"`
|
||||
QueueBatchSize int `help:"default queue batch size" default:"10000"`
|
||||
}
|
||||
|
||||
// Chore for flushing reported serials to the database as rollups.
|
||||
//
|
||||
// architecture: Chore
|
||||
type Chore struct {
|
||||
log *zap.Logger
|
||||
db orders.DB
|
||||
log *zap.Logger
|
||||
db orders.DB
|
||||
config Config
|
||||
|
||||
Loop *sync2.Cycle
|
||||
}
|
||||
|
||||
// NewChore creates new chore for flushing the reported serials to the database as rollups.
|
||||
func NewChore(log *zap.Logger, db orders.DB, config Config) *Chore {
|
||||
if config.QueueBatchSize == 0 {
|
||||
config.QueueBatchSize = 10000
|
||||
}
|
||||
|
||||
return &Chore{
|
||||
log: log,
|
||||
db: db,
|
||||
log: log,
|
||||
db: db,
|
||||
config: config,
|
||||
|
||||
Loop: sync2.NewCycle(config.Interval),
|
||||
}
|
||||
}
|
||||
@ -81,13 +90,6 @@ func (chore *Chore) RunOnce(ctx context.Context, now time.Time) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: jeeze make configurable
|
||||
const (
|
||||
defaultQueueBatchSize = 10000
|
||||
defaultRollupBatchSize = 1000
|
||||
defaultConsumedSerialsBatchSize = 10000
|
||||
)
|
||||
|
||||
func (chore *Chore) readWork(ctx context.Context, now time.Time, queue orders.Queue) (
|
||||
bucketRollups []orders.BucketBandwidthRollup,
|
||||
storagenodeRollups []orders.StoragenodeBandwidthRollup,
|
||||
@ -118,68 +120,61 @@ func (chore *Chore) readWork(ctx context.Context, now time.Time, queue orders.Qu
|
||||
}
|
||||
seenConsumedSerials := make(map[consumedSerialKey]struct{})
|
||||
|
||||
// Loop until our batch is big enough, but not too big in any dimension.
|
||||
for len(byBucket) < defaultRollupBatchSize &&
|
||||
len(byStoragenode) < defaultRollupBatchSize &&
|
||||
len(seenConsumedSerials) < defaultConsumedSerialsBatchSize {
|
||||
// Get a batch of pending serials from the queue.
|
||||
pendingSerials, queueDone, err := queue.GetPendingSerialsBatch(ctx, chore.config.QueueBatchSize)
|
||||
if err != nil {
|
||||
return nil, nil, nil, false, errs.Wrap(err)
|
||||
}
|
||||
|
||||
// Get a batch of pending serials from the queue.
|
||||
pendingSerials, queueDone, err := queue.GetPendingSerialsBatch(ctx, defaultQueueBatchSize)
|
||||
for _, row := range pendingSerials {
|
||||
row := row
|
||||
|
||||
// If we have seen this serial inside of this function already, don't
|
||||
// count it again and record it now.
|
||||
key := consumedSerialKey{
|
||||
nodeID: row.NodeID,
|
||||
serialNumber: row.SerialNumber,
|
||||
}
|
||||
if _, exists := seenConsumedSerials[key]; exists {
|
||||
continue
|
||||
}
|
||||
seenConsumedSerials[key] = struct{}{}
|
||||
|
||||
// Parse the node id, project id, and bucket name from the reported serial.
|
||||
projectID, bucketName, err := orders.SplitBucketID(row.BucketID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, false, errs.Wrap(err)
|
||||
chore.log.Error("bad row inserted into reported serials",
|
||||
zap.Binary("bucket_id", row.BucketID),
|
||||
zap.String("node_id", row.NodeID.String()),
|
||||
zap.String("serial_number", row.SerialNumber.String()))
|
||||
continue
|
||||
}
|
||||
action := pb.PieceAction(row.Action)
|
||||
settled := row.Settled
|
||||
|
||||
for _, row := range pendingSerials {
|
||||
row := row
|
||||
// Update our batch state to include it.
|
||||
byBucket[bucketKey{
|
||||
projectID: projectID,
|
||||
bucketName: string(bucketName),
|
||||
action: action,
|
||||
}] += settled
|
||||
|
||||
// If we have seen this serial inside of this function already, don't
|
||||
// count it again and record it now.
|
||||
key := consumedSerialKey{
|
||||
nodeID: row.NodeID,
|
||||
serialNumber: row.SerialNumber,
|
||||
}
|
||||
if _, exists := seenConsumedSerials[key]; exists {
|
||||
continue
|
||||
}
|
||||
seenConsumedSerials[key] = struct{}{}
|
||||
byStoragenode[storagenodeKey{
|
||||
nodeID: row.NodeID,
|
||||
action: action,
|
||||
}] += settled
|
||||
|
||||
// Parse the node id, project id, and bucket name from the reported serial.
|
||||
projectID, bucketName, err := orders.SplitBucketID(row.BucketID)
|
||||
if err != nil {
|
||||
chore.log.Error("bad row inserted into reported serials",
|
||||
zap.Binary("bucket_id", row.BucketID),
|
||||
zap.String("node_id", row.NodeID.String()),
|
||||
zap.String("serial_number", row.SerialNumber.String()))
|
||||
continue
|
||||
}
|
||||
action := pb.PieceAction(row.Action)
|
||||
settled := row.Settled
|
||||
consumedSerials = append(consumedSerials, orders.ConsumedSerial{
|
||||
NodeID: row.NodeID,
|
||||
SerialNumber: row.SerialNumber,
|
||||
ExpiresAt: row.ExpiresAt,
|
||||
})
|
||||
}
|
||||
|
||||
// Update our batch state to include it.
|
||||
byBucket[bucketKey{
|
||||
projectID: projectID,
|
||||
bucketName: string(bucketName),
|
||||
action: action,
|
||||
}] += settled
|
||||
|
||||
byStoragenode[storagenodeKey{
|
||||
nodeID: row.NodeID,
|
||||
action: action,
|
||||
}] += settled
|
||||
|
||||
consumedSerials = append(consumedSerials, orders.ConsumedSerial{
|
||||
NodeID: row.NodeID,
|
||||
SerialNumber: row.SerialNumber,
|
||||
ExpiresAt: row.ExpiresAt,
|
||||
})
|
||||
}
|
||||
|
||||
// If we didn't get a full batch, the queue must have run out. We should signal
|
||||
// this fact to our caller so that they can stop looping.
|
||||
if queueDone {
|
||||
done = true
|
||||
break
|
||||
}
|
||||
// If we didn't get a full batch, the queue must have run out. We should signal
|
||||
// this fact to our caller so that they can stop looping.
|
||||
if queueDone {
|
||||
done = true
|
||||
}
|
||||
|
||||
// Convert bucket rollups into a slice.
|
||||
|
@ -482,7 +482,7 @@ func (tx *ordersDBTx) UpdateBucketBandwidthBatch(ctx context.Context, intervalSt
|
||||
_, err = tx.tx.Tx.ExecContext(ctx, `
|
||||
INSERT INTO project_bandwidth_rollups(project_id, interval_month, egress_allocated)
|
||||
SELECT unnest($1::bytea[]), $2, unnest($3::bigint[])
|
||||
ON CONFLICT(project_id, interval_month)
|
||||
ON CONFLICT(project_id, interval_month)
|
||||
DO UPDATE SET egress_allocated = project_bandwidth_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint;
|
||||
`,
|
||||
pq.ByteaArray(projectRUIDs), projectInterval, pq.Array(projectRUAllocated))
|
||||
@ -657,11 +657,6 @@ func (db *ordersDB) WithQueue(ctx context.Context, cb func(ctx context.Context,
|
||||
func (queue *ordersDBQueue) GetPendingSerialsBatch(ctx context.Context, size int) (pendingSerials []orders.PendingSerial, done bool, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var cont rawPendingSerial
|
||||
if len(queue.produced) > 0 {
|
||||
cont = queue.produced[len(queue.produced)-1]
|
||||
}
|
||||
|
||||
// TODO: this might end up being WORSE on cockroach because it does a hash-join after a
|
||||
// full scan of the consumed_serials table, but it's massively better on postgres because
|
||||
// it does an indexed anti-join. hopefully we can get rid of the entire serials system
|
||||
@ -677,10 +672,8 @@ func (queue *ordersDBQueue) GetPendingSerialsBatch(ctx context.Context, size int
|
||||
AND consumed_serials.serial_number = pending_serial_queue.serial_number
|
||||
), 0) as consumed
|
||||
FROM pending_serial_queue
|
||||
WHERE (storage_node_id, bucket_id, serial_number) > ($1, $2, $3)
|
||||
ORDER BY storage_node_id, bucket_id, serial_number
|
||||
LIMIT $4
|
||||
`, cont.nodeID, cont.bucketID, cont.serialNumber, size)
|
||||
LIMIT $1
|
||||
`, size)
|
||||
if err != nil {
|
||||
return nil, false, Error.Wrap(err)
|
||||
}
|
||||
|
3
scripts/testdata/satellite-config.yaml.lock
vendored
3
scripts/testdata/satellite-config.yaml.lock
vendored
@ -565,6 +565,9 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
|
||||
# how often to flush the reported serial rollups to the database
|
||||
# reported-rollup.interval: 5m0s
|
||||
|
||||
# default queue batch size
|
||||
# reported-rollup.queue-batch-size: 10000
|
||||
|
||||
# the default bandwidth usage limit
|
||||
# rollup.default-max-bandwidth: 50.0 GB
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user