satellite/metainfo/piecedeletion: add Combiner

To handle concurrent deletion requests we need to combine them into a
single request.

To implement this we introduce a few concurrency primitives:

* Combiner, which takes a node id and a Job and combines multiple
  requests into a single batch per node.

* Job, which represents the deletion of multiple piece ids, together
  with a mechanism for notifying the caller about the outcome.

* Queue, which provides communication from Combiner to Handler.
  It can limit the number of requests per work queue.

* Handler, which takes an active Queue and processes it until it has
  consumed all the jobs.
  It can limit how many queues are handled concurrently.
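
As a rough sketch of how these fit together, assuming only the API added
in this change (the resolveAll handler, the deletePieces helper, and the
limits 8 and 100 below are illustrative, not part of the change):

package example

import (
    "context"

    "storj.io/common/pb"
    "storj.io/common/storj"
    "storj.io/storj/satellite/metainfo/piecedeletion"
)

// resolveAll is a toy Handler that pops every batch and resolves each job
// as successful; a real handler would send deletion requests to the node.
type resolveAll struct{}

func (resolveAll) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
    for {
        jobs, ok := queue.PopAll()
        if !ok {
            return
        }
        for _, job := range jobs {
            job.Resolve.Success()
        }
    }
}

// deletePieces shows the caller-side wiring: at most 8 queues are handled
// concurrently and each batch holds roughly 100 pieces.
func deletePieces(ctx context.Context, node *pb.Node, promise piecedeletion.Promise, pieces []storj.PieceID) {
    newQueue := func() piecedeletion.Queue { return piecedeletion.NewLimitedJobs(100) }
    handler := piecedeletion.NewLimitedHandler(resolveAll{}, 8)

    combiner := piecedeletion.NewCombiner(ctx, handler, newQueue)
    defer combiner.Close()

    combiner.Enqueue(node, piecedeletion.Job{
        Pieces:  pieces,
        Resolve: promise,
    })
}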

Change-Id: I3299325534abad4bae66969ffa16c6ed95d5574f
Egon Elbre 2020-03-11 20:20:23 +02:00
parent 97df9b5704
commit 3d6518081a
8 changed files with 711 additions and 0 deletions

@@ -0,0 +1,165 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"sync"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
)
// Handler handles piece deletion requests from a queue.
type Handler interface {
// Handle should call queue.PopAll until finished.
Handle(ctx context.Context, node *pb.Node, queue Queue)
}
// NewQueue is a constructor func for queues.
type NewQueue func() Queue
// Queue is a queue for jobs.
type Queue interface {
// TryPush tries to push a new job to the queue.
TryPush(job Job) bool
// PopAll fetches all jobs in the queue.
//
// When there are no more jobs, the queue must stop accepting new jobs.
PopAll() ([]Job, bool)
}
// Job is a single deletion request.
type Job struct {
// Pieces are the piece ids that need to be deleted.
Pieces []storj.PieceID
// Resolve is for notifying the job issuer about the outcome.
Resolve Promise
}
// Promise is for signaling the outcome of a deletion request back to its issuer.
type Promise interface {
// Success is called when the job has been successfully handled.
Success()
// Failure is called when the job didn't complete successfully.
Failure()
}
// Combiner combines multiple concurrent deletion requests into batches.
type Combiner struct {
// ctx context to pass down to the handler.
ctx context.Context
cancel context.CancelFunc
// handler defines what to do with the jobs.
handler Handler
// newQueue creates a new queue.
newQueue NewQueue
// workers contains all worker goroutines.
workers sync2.WorkGroup
// mu protects workerByID
mu sync.Mutex
workerByID map[storj.NodeID]*worker
}
// worker handles a batch of jobs.
type worker struct {
waitFor chan struct{}
node *pb.Node
jobs Queue
done chan struct{}
}
// NewCombiner creates a new combiner.
func NewCombiner(parent context.Context, handler Handler, newQueue NewQueue) *Combiner {
ctx, cancel := context.WithCancel(parent)
return &Combiner{
ctx: ctx,
cancel: cancel,
handler: handler,
newQueue: newQueue,
workerByID: map[storj.NodeID]*worker{},
}
}
// Close shuts down all workers.
func (combiner *Combiner) Close() {
combiner.cancel()
combiner.workers.Close()
}
// Enqueue adds a deletion job to the queue.
func (combiner *Combiner) Enqueue(node *pb.Node, job Job) {
combiner.mu.Lock()
defer combiner.mu.Unlock()
last := combiner.workerByID[node.Id]
// Check whether we can use the last worker.
if last != nil && last.jobs.TryPush(job) {
// We've successfully added a job to an existing worker.
return
}
// Create a new worker when one doesn't exist or the last one was full.
next := &worker{
node: node,
jobs: combiner.newQueue(),
done: make(chan struct{}),
}
if last != nil {
next.waitFor = last.done
}
combiner.workerByID[node.Id] = next
if !next.jobs.TryPush(job) {
// This should never happen.
job.Resolve.Failure()
}
// Start the worker.
next.start(combiner)
}
// start adds the worker to the worker pool and starts handling its job queue.
func (worker *worker) start(combiner *Combiner) {
// Try to add to worker pool, this may fail when we are shutting things down.
workerStarted := combiner.workers.Go(func() {
defer close(worker.done)
// Ensure we fail any jobs that the handler didn't handle.
defer FailPending(worker.jobs)
if worker.waitFor != nil {
// Wait for previous worker to finish work to ensure fairness between nodes.
select {
case <-worker.waitFor:
case <-combiner.ctx.Done():
return
}
}
// Handle the job queue.
combiner.handler.Handle(combiner.ctx, worker.node, worker.jobs)
})
// If we failed to start a worker, then mark all the jobs as failures.
if !workerStarted {
FailPending(worker.jobs)
}
}
// FailPending fails all the jobs in the queue.
func FailPending(jobs Queue) {
for {
list, ok := jobs.PopAll()
if !ok {
return
}
for _, job := range list {
job.Resolve.Failure()
}
}
}
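
For reference, the Promise interface above can be satisfied by a type as
small as the following sketch. This waitPromise type is hypothetical (the
tests below use sync2.SuccessThreshold instead) and assumes each job is
resolved exactly once:

package example

import "context"

// waitPromise is a minimal Promise implementation that lets a single
// caller block until its job has been resolved.
type waitPromise struct {
    success bool
    done    chan struct{}
}

func newWaitPromise() *waitPromise {
    return &waitPromise{done: make(chan struct{})}
}

// Success marks the job as handled successfully.
func (p *waitPromise) Success() { p.success = true; close(p.done) }

// Failure marks the job as failed.
func (p *waitPromise) Failure() { close(p.done) }

// Wait blocks until the job has been resolved or the context is done,
// and reports whether the deletion succeeded.
func (p *waitPromise) Wait(ctx context.Context) bool {
    select {
    case <-p.done:
        return p.success
    case <-ctx.Done():
        return false
    }
}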

@@ -0,0 +1,127 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
type SleepyHandler struct {
Min, Max time.Duration
TotalHandled int64
}
func (handler *SleepyHandler) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
if !sync2.Sleep(ctx, handler.Min) {
return
}
for {
list, ok := queue.PopAll()
if !ok {
return
}
for _, job := range list {
atomic.AddInt64(&handler.TotalHandled, int64(len(job.Pieces)))
job.Resolve.Success()
}
span := int(handler.Max - handler.Min)
wait := testrand.Intn(span)
if !sync2.Sleep(ctx, handler.Min+time.Duration(wait)) {
return
}
}
}
func BenchmarkCombiner(b *testing.B) {
const (
// currently nodes are picked uniformly, however the total piece distribution is not uniform,
// hence we use a lower node count to simulate frequently used nodes
nodeCount = 500
requestCount = 100
// assume each request has ~2 segments
callsPerRequest = 160
// we cannot use realistic values here due to sleep granularity
minWait = 1 * time.Millisecond
maxWait = 20 * time.Millisecond
)
activeLimits := []int{8, 32, 64, -1}
queueSizes := []int{1, 8, 64, 128, -1}
nodes := []*pb.Node{}
for i := 0; i < nodeCount; i++ {
nodes = append(nodes, &pb.Node{
Id: testrand.NodeID(),
})
}
for _, activeLimit := range activeLimits {
for _, queueSize := range queueSizes {
activeLimit, queueSize := activeLimit, queueSize
name := fmt.Sprintf("active=%d,queue=%d", activeLimit, queueSize)
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
func() {
sleeper := &SleepyHandler{Min: minWait, Max: maxWait}
limited := piecedeletion.NewLimitedHandler(sleeper, activeLimit)
ctx := testcontext.New(b)
defer ctx.Cleanup()
newQueue := func() piecedeletion.Queue {
return piecedeletion.NewLimitedJobs(queueSize)
}
var combiner *piecedeletion.Combiner
if activeLimit > 0 {
combiner = piecedeletion.NewCombiner(ctx, limited, newQueue)
} else {
combiner = piecedeletion.NewCombiner(ctx, sleeper, newQueue)
}
for request := 0; request < requestCount; request++ {
ctx.Go(func() error {
done, err := sync2.NewSuccessThreshold(callsPerRequest, 0.999999)
if err != nil {
return err
}
for call := 0; call < callsPerRequest; call++ {
i := testrand.Intn(nodeCount)
combiner.Enqueue(nodes[i], piecedeletion.Job{
Pieces: []storj.PieceID{testrand.PieceID()},
Resolve: done,
})
}
done.Wait(ctx)
return nil
})
}
ctx.Wait()
combiner.Close()
totalRequests := int64(callsPerRequest * requestCount)
if sleeper.TotalHandled != totalRequests {
b.Fatalf("handled only %d expected %d", sleeper.TotalHandled, totalRequests)
}
}()
}
})
}
}
}

@@ -0,0 +1,94 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
type CountHandler struct {
Count int64
}
func (handler *CountHandler) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
for {
list, ok := queue.PopAll()
if !ok {
return
}
for _, job := range list {
atomic.AddInt64(&handler.Count, int64(len(job.Pieces)))
job.Resolve.Success()
}
}
}
func TestCombiner(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const (
activeLimit = 8
nodeCount = 70
requestCount = 100
parallelCount = 10
queueSize = 5
)
nodes := []*pb.Node{}
for i := 0; i < nodeCount; i++ {
nodes = append(nodes, &pb.Node{
Id: testrand.NodeID(),
})
}
counter := &CountHandler{}
limited := piecedeletion.NewLimitedHandler(counter, activeLimit)
newQueue := func() piecedeletion.Queue {
return piecedeletion.NewLimitedJobs(queueSize)
}
combiner := piecedeletion.NewCombiner(ctx, limited, newQueue)
var wg sync.WaitGroup
for i := 0; i < parallelCount; i++ {
wg.Add(1)
ctx.Go(func() error {
defer wg.Done()
pending, err := sync2.NewSuccessThreshold(requestCount, 0.999999)
if err != nil {
return err
}
for k := 0; k < requestCount; k++ {
node := nodes[testrand.Intn(len(nodes))]
combiner.Enqueue(node, piecedeletion.Job{
Pieces: []storj.PieceID{testrand.PieceID()},
Resolve: pending,
})
}
pending.Wait(ctx)
return nil
})
}
wg.Wait()
combiner.Close()
require.Equal(t, int(counter.Count), int(requestCount*parallelCount))
}

@@ -0,0 +1,5 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
// Package piecedeletion implements a service for deleting pieces that combines concurrent requests.
package piecedeletion

@@ -0,0 +1,36 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"golang.org/x/sync/semaphore"
"storj.io/common/pb"
)
// LimitedHandler wraps handler with a concurrency limit.
type LimitedHandler struct {
active *semaphore.Weighted
Handler
}
// NewLimitedHandler wraps handler with a concurrency limit.
func NewLimitedHandler(handler Handler, limit int) *LimitedHandler {
return &LimitedHandler{
active: semaphore.NewWeighted(int64(limit)),
Handler: handler,
}
}
// Handle handles the job queue.
func (handler *LimitedHandler) Handle(ctx context.Context, node *pb.Node, queue Queue) {
if err := handler.active.Acquire(ctx, 1); err != nil {
return
}
defer handler.active.Release(1)
handler.Handler.Handle(ctx, node, queue)
}

@@ -0,0 +1,53 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"sync/atomic"
"testing"
"time"
"storj.io/common/pb"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
type HandleLimitVerifier struct {
Active int64
ExpectedLimit int64
}
func (*HandleLimitVerifier) NewQueue() piecedeletion.Queue {
panic("should not be called")
}
func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
current := atomic.AddInt64(&verifier.Active, 1)
if current > verifier.ExpectedLimit {
panic("over limit")
}
defer atomic.AddInt64(&verifier.Active, -1)
defer sync2.Sleep(ctx, time.Millisecond)
}
func TestLimitedHandler(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
verifier := &HandleLimitVerifier{
Active: 0,
ExpectedLimit: 8,
}
limited := piecedeletion.NewLimitedHandler(verifier, int(verifier.ExpectedLimit))
for i := 0; i < 800; i++ {
ctx.Go(func() error {
limited.Handle(ctx, nil, nil)
return nil
})
}
}

@@ -0,0 +1,75 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import "sync"
// LimitedJobs is a finalizable list of deletion jobs with a limit on how
// many pieces it can hold.
type LimitedJobs struct {
maxPiecesPerBatch int
mu sync.Mutex
// done indicates that no more items will be appended to the queue.
done bool
// count is the number of piece ids queued here.
count int
// list is the list of delete jobs.
list []Job
}
// NewLimitedJobs returns a new limited job queue.
func NewLimitedJobs(maxPiecesPerBatch int) *LimitedJobs {
return &LimitedJobs{
maxPiecesPerBatch: maxPiecesPerBatch,
}
}
// TryPush tries to add a job to the queue.
//
// maxPiecesPerBatch < 0 means no limit.
func (jobs *LimitedJobs) TryPush(job Job) bool {
return jobs.tryPush(job, jobs.maxPiecesPerBatch)
}
// tryPush tries to add a job to the queue.
//
// maxPiecesPerBatch < 0 means no limit.
func (jobs *LimitedJobs) tryPush(job Job, maxPiecesPerBatch int) bool {
jobs.mu.Lock()
defer jobs.mu.Unlock()
// check whether we have finished work with this jobs queue.
if jobs.done {
return false
}
// Add to the queue; this can potentially overflow `maxPiecesPerBatch`,
// however splitting a single request and its promise across multiple batches would be annoying.
jobs.count += len(job.Pieces)
// check whether the queue is at capacity
if maxPiecesPerBatch >= 0 && jobs.count >= maxPiecesPerBatch {
jobs.done = true
}
jobs.list = append(jobs.list, job)
return true
}
// PopAll returns all the jobs in this list.
func (jobs *LimitedJobs) PopAll() (_ []Job, ok bool) {
jobs.mu.Lock()
defer jobs.mu.Unlock()
// when we try to pop and the queue is empty, make the queue final.
if len(jobs.list) == 0 {
jobs.done = true
return nil, false
}
list := jobs.list
jobs.list = nil
return list, true
}

@@ -0,0 +1,156 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"errors"
"sort"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
func TestLimitedJobs(t *testing.T) {
{ // pop on an empty list
q := piecedeletion.NewLimitedJobs(-1)
list, ok := q.PopAll()
require.False(t, ok)
require.Nil(t, list)
}
{ // pop on a non-empty list
q := piecedeletion.NewLimitedJobs(-1)
job1 := randomJob(2)
job2 := randomJob(3)
// first push should always work
require.True(t, q.TryPush(job1))
// try push another, currently we don't have limits
require.True(t, q.TryPush(job2))
list, ok := q.PopAll()
require.True(t, ok)
require.Equal(t, list, []piecedeletion.Job{job1, job2})
// should be empty
list, ok = q.PopAll()
require.False(t, ok)
require.Nil(t, list)
// pushing another should fail
require.False(t, q.TryPush(randomJob(1)))
}
}
func TestLimitedJobs_Limiting(t *testing.T) {
{
q := piecedeletion.NewLimitedJobs(2)
require.True(t, q.TryPush(randomJob(1)))
require.True(t, q.TryPush(randomJob(1)))
require.False(t, q.TryPush(randomJob(1)))
require.False(t, q.TryPush(randomJob(1)))
}
{
q := piecedeletion.NewLimitedJobs(2)
require.True(t, q.TryPush(randomJob(1)))
_, _ = q.PopAll()
require.True(t, q.TryPush(randomJob(1)))
_, _ = q.PopAll()
require.False(t, q.TryPush(randomJob(1)))
_, _ = q.PopAll()
require.False(t, q.TryPush(randomJob(1)))
}
}
func TestLimitedJobs_Concurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const N = 4
q := piecedeletion.NewLimitedJobs(-1)
jobs := []piecedeletion.Job{
randomJob(1),
randomJob(2),
randomJob(3),
randomJob(4),
}
var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
i := i
ctx.Go(func() error {
defer wg.Done()
if !q.TryPush(jobs[i]) {
return errors.New("failed to add job")
}
return nil
})
}
ctx.Go(func() error {
wg.Wait()
list, ok := q.PopAll()
if !ok {
return errors.New("failed to return jobs")
}
sort.Slice(list, func(i, k int) bool {
return len(list[i].Pieces) < len(list[k].Pieces)
})
if !assert.Equal(t, jobs, list) {
return errors.New("not equal")
}
return nil
})
}
func TestLimitedJobs_NoRace(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const N = 4
q := piecedeletion.NewLimitedJobs(-1)
jobs := []piecedeletion.Job{
randomJob(1),
randomJob(2),
randomJob(3),
randomJob(4),
}
for i := 0; i < N; i++ {
i := i
ctx.Go(func() error {
_ = q.TryPush(jobs[i])
return nil
})
}
ctx.Go(func() error {
_, _ = q.PopAll()
return nil
})
}
func randomJob(n int) piecedeletion.Job {
job := piecedeletion.Job{}
for i := 0; i < n; i++ {
job.Pieces = append(job.Pieces, testrand.PieceID())
}
return job
}