storj/satellite/metainfo/piecedeletion/combiner.go
Egon Elbre 676f3e8516 satellite/metainfo/piecedeletion: try to make batches larger
Previously it was possible that PopAll returned 1010 items, the handler
then made one RPC call with 1000 items and a second RPC call with only
10 items, even though 500 new items had been added to the queue in the
meantime.

This change ensures that we pull items from the queue early and
try to make RPC batches as large as possible.

Change-Id: I1a30dde9164c2ff7b90c906a9544593c4f1cf0e9
2020-04-22 18:43:29 +00:00

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package piecedeletion

import (
	"context"
	"sync"

	"storj.io/common/pb"
	"storj.io/common/storj"
	"storj.io/common/sync2"
)

// Handler handles piece deletion requests from a queue.
type Handler interface {
	// Handle should call queue.PopAll until finished.
	Handle(ctx context.Context, node *pb.Node, queue Queue)
}
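
// What follows is a minimal sketch of a Handler, not the satellite's real
// implementation: deletePieces below is a hypothetical stand-in for the
// actual deletion RPC. The sketch keeps calling PopAll so that jobs queued
// while an RPC was in flight are picked up in the next, larger batch.
type exampleHandler struct{}

func (exampleHandler) Handle(ctx context.Context, node *pb.Node, queue Queue) {
	for {
		jobs, ok := queue.PopAll()
		if !ok {
			return
		}
		for _, job := range jobs {
			if err := deletePieces(ctx, node, job.Pieces); err != nil {
				job.Resolve.Failure()
				continue
			}
			job.Resolve.Success()
		}
	}
}

// deletePieces is a hypothetical stub standing in for the real deletion RPC.
func deletePieces(ctx context.Context, node *pb.Node, pieces []storj.PieceID) error {
	return nil
}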

// NewQueue is a constructor func for queues.
type NewQueue func() Queue

// Queue is a queue for jobs.
type Queue interface {
	// TryPush tries to push a new job to the queue.
	TryPush(job Job) bool

	// PopAll fetches all jobs in the queue.
	//
	// When there are no more jobs, the queue must stop accepting new jobs.
	PopAll() ([]Job, bool)

	// PopAllWithoutClose fetches all jobs in the queue,
	// but without closing the queue for new requests.
	PopAllWithoutClose() []Job
}
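
// What follows is a minimal sketch of a Queue with a fixed capacity, for
// illustration only; the queue actually used by the combiner is supplied
// via NewQueue and may differ.
type exampleQueue struct {
	mu     sync.Mutex
	closed bool
	limit  int
	jobs   []Job
}

func (q *exampleQueue) TryPush(job Job) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed || len(q.jobs) >= q.limit {
		return false
	}
	q.jobs = append(q.jobs, job)
	return true
}

func (q *exampleQueue) PopAll() ([]Job, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return nil, false
	}
	if len(q.jobs) == 0 {
		// No more jobs; stop accepting new ones.
		q.closed = true
		return nil, false
	}
	jobs := q.jobs
	q.jobs = nil
	return jobs, true
}

func (q *exampleQueue) PopAllWithoutClose() []Job {
	q.mu.Lock()
	defer q.mu.Unlock()
	jobs := q.jobs
	q.jobs = nil
	return jobs
}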

// Job is a single deletion request.
type Job struct {
	// Pieces are the piece IDs that need to be deleted.
	Pieces []storj.PieceID
	// Resolve is for notifying the job issuer about the outcome.
	Resolve Promise
}

// Promise is for signaling the result to the issuer of the deletion request.
type Promise interface {
	// Success is called when the job has been successfully handled.
	Success()
	// Failure is called when the job didn't complete successfully.
	Failure()
}
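
// What follows is a minimal sketch of a Promise, assuming the issuer waits
// on a buffered channel for the outcome; it is not part of this package's API.
type examplePromise struct {
	outcome chan bool
}

func (p *examplePromise) Success() { p.outcome <- true }
func (p *examplePromise) Failure() { p.outcome <- false }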

// Combiner combines multiple concurrent deletion requests into batches.
type Combiner struct {
	// ctx context to pass down to the handler.
	ctx    context.Context
	cancel context.CancelFunc

	// handler defines what to do with the jobs.
	handler Handler
	// newQueue creates a new queue.
	newQueue NewQueue
	// workers contains all worker goroutines.
	workers sync2.WorkGroup

	// mu protects workerByID.
	mu         sync.Mutex
	workerByID map[storj.NodeID]*worker
}

// worker handles a batch of jobs.
type worker struct {
	waitFor chan struct{} // closed when the previous worker for this node finishes
	node    *pb.Node
	jobs    Queue
	done    chan struct{} // closed when this worker finishes
}

// NewCombiner creates a new combiner.
func NewCombiner(parent context.Context, handler Handler, newQueue NewQueue) *Combiner {
	ctx, cancel := context.WithCancel(parent)
	return &Combiner{
		ctx:        ctx,
		cancel:     cancel,
		handler:    handler,
		newQueue:   newQueue,
		workerByID: map[storj.NodeID]*worker{},
	}
}

// Close shuts down all workers.
func (combiner *Combiner) Close() {
	combiner.cancel()
	combiner.workers.Close()
}

// Enqueue adds a deletion job to the queue.
func (combiner *Combiner) Enqueue(node *pb.Node, job Job) {
	combiner.mu.Lock()
	defer combiner.mu.Unlock()

	last := combiner.workerByID[node.Id]

	// Check whether we can use the last worker.
	if last != nil && last.jobs.TryPush(job) {
		// We've successfully added a job to an existing worker.
		return
	}

	// Create a new worker when one doesn't exist or the last one was full.
	next := &worker{
		node: node,
		jobs: combiner.newQueue(),
		done: make(chan struct{}),
	}
	if last != nil {
		// Chain the new worker after the previous one, so that jobs for
		// the same node are handled in order.
		next.waitFor = last.done
	}
	combiner.workerByID[node.Id] = next
	if !next.jobs.TryPush(job) {
		// This should never happen.
		job.Resolve.Failure()
	}

	// Start the worker.
	next.start(combiner)
}
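
// What follows is a hedged usage sketch tying the examples above together;
// exampleHandler, exampleQueue and examplePromise are illustrations, not
// this package's real collaborators.
func exampleUsage(ctx context.Context, node *pb.Node, pieces []storj.PieceID) bool {
	combiner := NewCombiner(ctx, exampleHandler{}, func() Queue {
		return &exampleQueue{limit: 1000}
	})
	defer combiner.Close()

	promise := &examplePromise{outcome: make(chan bool, 1)}
	combiner.Enqueue(node, Job{Pieces: pieces, Resolve: promise})

	// Wait for the handler to report the outcome.
	return <-promise.outcome
}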

// start adds the worker to the worker pool and starts handling its queue.
func (worker *worker) start(combiner *Combiner) {
	// Try to add to worker pool, this may fail when we are shutting things down.
	workerStarted := combiner.workers.Go(func() {
		defer close(worker.done)
		// Ensure we fail any jobs that the handler didn't handle.
		defer FailPending(worker.jobs)

		if worker.waitFor != nil {
			// Wait for previous worker to finish work to ensure fairness between nodes.
			select {
			case <-worker.waitFor:
			case <-combiner.ctx.Done():
				return
			}
		}

		// Handle the job queue.
		combiner.handler.Handle(combiner.ctx, worker.node, worker.jobs)
	})

	// If we failed to start a worker, then mark all the jobs as failures.
	if !workerStarted {
		FailPending(worker.jobs)
	}
}

// FailPending fails all the jobs in the queue.
func FailPending(jobs Queue) {
	for {
		list, ok := jobs.PopAll()
		if !ok {
			return
		}
		for _, job := range list {
			job.Resolve.Failure()
		}
	}
}