storj/satellite/metainfo/piecedeletion/handler_test.go
Egon Elbre 3d6518081a satellite/metainfo/piecedeletion: add Combiner
To handle concurrent deletion requests we need to combine them into a
single request.

To implement this we introduce a few concurrency ideas:

* Combiner, which takes a node id and a Job and handles combining
  multiple requests into a single batch.

* Job, which represents deleting multiple piece ids, with a
  notification mechanism to the caller.

* Queue, which provides communication from Combiner to Handler.
  It can limit the number of requests per work queue.

* Handler, which takes an active Queue and processes it until it has
  consumed all the jobs.
  It can provide limits to handling concurrency.

Change-Id: I3299325534abad4bae66969ffa16c6ed95d5574f
2020-03-16 17:13:26 +00:00

54 lines
1.1 KiB
Go

// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"sync/atomic"
"testing"
"time"
"storj.io/common/pb"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
// HandleLimitVerifier implements piecedeletion.Handler for testing that
// NewLimitedHandler never allows more than ExpectedLimit concurrent
// Handle calls.
type HandleLimitVerifier struct {
	// Active is the current number of in-flight Handle calls.
	// It is mutated only via sync/atomic and is placed first so it is
	// 64-bit aligned on 32-bit platforms.
	Active int64
	// ExpectedLimit is the maximum number of concurrent Handle calls
	// allowed; exceeding it causes Handle to panic.
	ExpectedLimit int64
}
// NewQueue implements piecedeletion.Handler. This test only exercises
// concurrency limiting through Handle, so queue creation must never
// be reached; any call is a bug in the test setup.
func (*HandleLimitVerifier) NewQueue() piecedeletion.Queue {
	panic("should not be called")
}
// Handle implements piecedeletion.Handler. It atomically tracks the
// number of concurrent invocations in verifier.Active and panics if the
// count ever exceeds verifier.ExpectedLimit, proving the limiter works.
func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node *pb.Node, queue piecedeletion.Queue) {
	active := atomic.AddInt64(&verifier.Active, 1)
	if active > verifier.ExpectedLimit {
		panic("over limit")
	}
	// Occupy the slot briefly so that overlapping calls would be
	// observable, then release it. (Equivalent to the original's
	// LIFO-ordered defers: sleep first, decrement second.)
	sync2.Sleep(ctx, time.Millisecond)
	atomic.AddInt64(&verifier.Active, -1)
}
// TestLimitedHandler verifies that NewLimitedHandler caps the number of
// concurrent Handle invocations at the configured limit, even when far
// more callers arrive at once.
func TestLimitedHandler(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	const limit = 8

	// Active starts at its zero value; the verifier panics if the
	// limiter ever lets more than limit calls run concurrently.
	verifier := &HandleLimitVerifier{ExpectedLimit: limit}
	limited := piecedeletion.NewLimitedHandler(verifier, limit)

	// Launch many more calls than the limit allows; ctx.Cleanup waits
	// for all of them before the test finishes.
	for i := 0; i < 800; i++ {
		ctx.Go(func() error {
			limited.Handle(ctx, nil, nil)
			return nil
		})
	}
}