satellite/metainfo: drop piecedeletion package

We are not using this package anymore.

Change-Id: If32315d43d73c8deb096e93cb43c03881bd9aad1
This commit is contained in:
Michal Niewrzal 2023-06-14 11:02:51 +02:00 committed by Storj Robot
parent 6c08d5024e
commit 2e0b687581
16 changed files with 13 additions and 1775 deletions

View File

@ -79,8 +79,6 @@ storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter
storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter
storj.io/storj/satellite/metabase/rangedloop."rangedloop_error" Event
storj.io/storj/satellite/metainfo."metainfo_rate_limit_exceeded" Event
storj.io/storj/satellite/metainfo/piecedeletion."delete_batch_size" IntVal
storj.io/storj/satellite/metainfo/piecedeletion."deletion_pieces_unhandled_count" IntVal
storj.io/storj/satellite/metrics."total_inline_bytes" IntVal
storj.io/storj/satellite/metrics."total_inline_segments" IntVal
storj.io/storj/satellite/metrics."total_remote_bytes" IntVal

View File

@ -41,7 +41,6 @@ import (
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/nodestats"
"storj.io/storj/satellite/oidc"
"storj.io/storj/satellite/orders"
@ -101,7 +100,6 @@ type API struct {
Metainfo struct {
Metabase *metabase.DB
PieceDeletion *piecedeletion.Service
Endpoint *metainfo.Endpoint
}
@ -425,22 +423,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup metainfo
peer.Metainfo.Metabase = metabaseDB
peer.Metainfo.PieceDeletion, err = piecedeletion.NewService(
peer.Log.Named("metainfo:piecedeletion"),
peer.Dialer,
// TODO use cache designed for deletion
peer.Overlay.Service.DownloadSelectionCache,
config.Metainfo.PieceDeletion,
)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "metainfo:piecedeletion",
Run: peer.Metainfo.PieceDeletion.Run,
Close: peer.Metainfo.PieceDeletion.Close,
})
peer.Metainfo.Endpoint, err = metainfo.NewEndpoint(
peer.Log.Named("metainfo:endpoint"),
peer.Buckets.Service,

View File

@ -13,7 +13,6 @@ import (
"storj.io/common/memory"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/uplink/private/eestream"
)
@ -142,7 +141,6 @@ type Config struct {
RateLimiter RateLimiterConfig `help:"rate limiter configuration"`
UploadLimiter UploadLimiterConfig `help:"object upload limiter configuration"`
ProjectLimits ProjectLimitConfig `help:"project limit configuration"`
PieceDeletion piecedeletion.Config `help:"piece deletion configuration"`
// TODO remove this flag when server-side copy implementation will be finished
ServerSideCopy bool `help:"enable code for server-side copy, deprecated. please leave this to true." default:"true"`
ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"`

View File

@ -1,171 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"sync"
"storj.io/common/storj"
"storj.io/common/sync2"
)
// Handler handles piece deletion requests from a queue.
type Handler interface {
// Handle should call queue.PopAll until finished.
Handle(ctx context.Context, node storj.NodeURL, queue Queue)
}
// NewQueue is a constructor func for queues.
type NewQueue func() Queue
// Queue is a queue for jobs.
type Queue interface {
// TryPush tries to push a new job to the queue.
TryPush(job Job) bool
// PopAll fetches all jobs in the queue.
//
// When there are no more jobs, the queue must stop accepting new jobs.
PopAll() ([]Job, bool)
// PopAllWithoutClose fetches all jobs in the queue,
// but without closing the queue for new requests.
PopAllWithoutClose() []Job
}
// Job is a single of deletion.
type Job struct {
// Pieces are the pieces id-s that need to be deleted.
Pieces []storj.PieceID
// Resolve is for notifying the job issuer about the outcome.
Resolve Promise
}
// Promise is for signaling to the deletion requests about the result.
type Promise interface {
// Success is called when the job has been successfully handled.
Success()
// Failure is called when the job didn't complete successfully.
Failure()
}
// Combiner combines multiple concurrent deletion requests into batches.
type Combiner struct {
// ctx context to pass down to the handler.
ctx context.Context
cancel context.CancelFunc
// handler defines what to do with the jobs.
handler Handler
// newQueue creates a new queue.
newQueue NewQueue
// workers contains all worker goroutines.
workers sync2.WorkGroup
// mu protects workerByID
mu sync.Mutex
workerByID map[storj.NodeID]*worker
}
// worker handles a batch of jobs.
type worker struct {
waitFor chan struct{}
node storj.NodeURL
jobs Queue
done chan struct{}
}
// NewCombiner creates a new combiner.
func NewCombiner(parent context.Context, handler Handler, newQueue NewQueue) *Combiner {
ctx, cancel := context.WithCancel(parent)
return &Combiner{
ctx: ctx,
cancel: cancel,
handler: handler,
newQueue: newQueue,
workerByID: map[storj.NodeID]*worker{},
}
}
// Close shuts down all workers.
func (combiner *Combiner) Close() {
combiner.cancel()
combiner.workers.Close()
}
// Enqueue adds a deletion job to the queue.
func (combiner *Combiner) Enqueue(node storj.NodeURL, job Job) {
combiner.mu.Lock()
defer combiner.mu.Unlock()
last := combiner.workerByID[node.ID]
// Check whether we can use the last worker.
if last != nil && last.jobs.TryPush(job) {
// We've successfully added a job to an existing worker.
return
}
// Create a new worker when one doesn't exist or the last one was full.
next := &worker{
node: node,
jobs: combiner.newQueue(),
done: make(chan struct{}),
}
if last != nil {
next.waitFor = last.done
}
combiner.workerByID[node.ID] = next
if !next.jobs.TryPush(job) {
// This should never happen.
job.Resolve.Failure()
}
// Start the worker.
next.start(combiner)
}
// schedule starts the worker.
func (worker *worker) start(combiner *Combiner) {
// Try to add to worker pool, this may fail when we are shutting things down.
workerStarted := combiner.workers.Go(func() {
defer mon.TaskNamed("worker_start")(nil)(nil)
defer close(worker.done)
// Ensure we fail any jobs that the handler didn't handle.
defer FailPending(worker.jobs)
if worker.waitFor != nil {
// Wait for previous worker to finish work to ensure fairness between nodes.
select {
case <-worker.waitFor:
case <-combiner.ctx.Done():
return
}
}
// Handle the job queue.
combiner.handler.Handle(combiner.ctx, worker.node, worker.jobs)
})
// If we failed to start a worker, then mark all the jobs as failures.
if !workerStarted {
FailPending(worker.jobs)
}
}
// FailPending fails all the jobs in the queue.
func FailPending(jobs Queue) {
for {
list, ok := jobs.PopAll()
if !ok {
return
}
for _, job := range list {
job.Resolve.Failure()
}
}
}

View File

@ -1,136 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
type SleepyHandler struct {
Min, Max time.Duration
TotalHandled int64
}
func (handler *SleepyHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
if !sync2.Sleep(ctx, handler.Min) {
return
}
for {
list, ok := queue.PopAll()
if !ok {
return
}
for _, job := range list {
atomic.AddInt64(&handler.TotalHandled, int64(len(job.Pieces)))
job.Resolve.Success()
}
span := int(handler.Max - handler.Min)
wait := testrand.Intn(span)
if !sync2.Sleep(ctx, handler.Min+time.Duration(wait)) {
return
}
}
}
func BenchmarkCombiner(b *testing.B) {
var (
// currently nodes are picked uniformly, however total piece distribution is not
// hence we use a lower number to simulate frequent nodes
nodeCount = 500
requestCount = 100
// assume each request has ~2 segments
callsPerRequest = 160
// we cannot use realistic values here due to sleep granularity
minWait = 1 * time.Millisecond
maxWait = 20 * time.Millisecond
// add few variations to test
activeLimits = []int{8, 32, 64, -1}
queueSizes = []int{1, 8, 64, 128, -1}
)
if testing.Short() {
// use values to make tests run faster
nodeCount = 5
requestCount = 5
callsPerRequest = 5
activeLimits = []int{8, 64, -1}
queueSizes = []int{8, 128, -1}
}
nodes := []storj.NodeURL{}
for i := 0; i < nodeCount; i++ {
nodes = append(nodes, storj.NodeURL{
ID: testrand.NodeID(),
})
}
for _, activeLimit := range activeLimits {
for _, queueSize := range queueSizes {
activeLimit, queueSize := activeLimit, queueSize
name := fmt.Sprintf("active=%d,queue=%d", activeLimit, queueSize)
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
func() {
sleeper := &SleepyHandler{Min: minWait, Max: maxWait}
limited := piecedeletion.NewLimitedHandler(sleeper, activeLimit)
ctx := testcontext.New(b)
defer ctx.Cleanup()
newQueue := func() piecedeletion.Queue {
return piecedeletion.NewLimitedJobs(queueSize)
}
var combiner *piecedeletion.Combiner
if activeLimit > 0 {
combiner = piecedeletion.NewCombiner(ctx, limited, newQueue)
} else {
combiner = piecedeletion.NewCombiner(ctx, sleeper, newQueue)
}
for request := 0; request < requestCount; request++ {
ctx.Go(func() error {
done, err := sync2.NewSuccessThreshold(callsPerRequest, 0.999999)
if err != nil {
return err
}
for call := 0; call < callsPerRequest; call++ {
i := testrand.Intn(nodeCount)
combiner.Enqueue(nodes[i], piecedeletion.Job{
Pieces: []storj.PieceID{testrand.PieceID()},
Resolve: done,
})
}
done.Wait(ctx)
return nil
})
}
ctx.Wait()
combiner.Close()
totalRequests := int64(callsPerRequest * requestCount)
if sleeper.TotalHandled != totalRequests {
b.Fatalf("handled only %d expected %d", sleeper.TotalHandled, totalRequests)
}
}()
}
})
}
}
}

View File

@ -1,93 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
type CountHandler struct {
Count int64
}
func (handler *CountHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
for {
list, ok := queue.PopAll()
if !ok {
return
}
for _, job := range list {
atomic.AddInt64(&handler.Count, int64(len(job.Pieces)))
job.Resolve.Success()
}
}
}
func TestCombiner(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const (
activeLimit = 8
nodeCount = 70
requestCount = 100
parallelCount = 10
queueSize = 5
)
nodes := []storj.NodeURL{}
for i := 0; i < nodeCount; i++ {
nodes = append(nodes, storj.NodeURL{
ID: testrand.NodeID(),
})
}
counter := &CountHandler{}
limited := piecedeletion.NewLimitedHandler(counter, activeLimit)
newQueue := func() piecedeletion.Queue {
return piecedeletion.NewLimitedJobs(queueSize)
}
combiner := piecedeletion.NewCombiner(ctx, limited, newQueue)
var wg sync.WaitGroup
for i := 0; i < parallelCount; i++ {
wg.Add(1)
ctx.Go(func() error {
defer wg.Done()
pending, err := sync2.NewSuccessThreshold(requestCount, 0.999999)
if err != nil {
return err
}
for k := 0; k < requestCount; k++ {
node := nodes[testrand.Intn(len(nodes))]
combiner.Enqueue(node, piecedeletion.Job{
Pieces: []storj.PieceID{testrand.PieceID()},
Resolve: pending,
})
}
pending.Wait(ctx)
return nil
})
}
wg.Wait()
combiner.Close()
require.Equal(t, int(counter.Count), int(requestCount*parallelCount))
}

View File

@ -1,178 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"errors"
"net"
"strconv"
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
)
// Dialer implements dialing piecestores and sending delete requests with batching and redial threshold.
type Dialer struct {
log *zap.Logger
dialer rpc.Dialer
requestTimeout time.Duration
failThreshold time.Duration
piecesPerRequest int
mu sync.RWMutex
dialFailed map[storj.NodeID]time.Time
}
// NewDialer returns a new Dialer.
func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold time.Duration, piecesPerRequest int) *Dialer {
return &Dialer{
log: log,
dialer: dialer,
requestTimeout: requestTimeout,
failThreshold: failThreshold,
piecesPerRequest: piecesPerRequest,
dialFailed: map[storj.NodeID]time.Time{},
}
}
// Handle tries to send the deletion requests to the specified node.
func (dialer *Dialer) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
defer mon.Task()(&ctx, node.ID.String())(nil)
defer FailPending(queue)
if dialer.recentlyFailed(ctx, node) {
return
}
client, conn, err := dialPieceStore(ctx, dialer.dialer, node)
if err != nil {
dialer.log.Debug("failed to dial", zap.Stringer("id", node.ID), zap.Error(err))
dialer.markFailed(ctx, node)
return
}
defer func() {
if err := conn.Close(); err != nil {
dialer.log.Debug("closing connection failed", zap.Stringer("id", node.ID), zap.Error(err))
}
}()
for {
if err := ctx.Err(); err != nil {
return
}
jobs, ok := queue.PopAll()
// Add metrics on to the span
s := monkit.SpanFromCtx(ctx)
s.Annotate("delete jobs size", strconv.Itoa(len(jobs)))
if !ok {
return
}
for len(jobs) > 0 {
batch, promises, rest := batchJobs(jobs, dialer.piecesPerRequest)
// Aggregation metrics
mon.IntVal("delete_batch_size").Observe(int64(len(batch))) //mon:locked
// Tracing metrics
s.Annotate("delete_batch_size", strconv.Itoa(len(batch)))
jobs = rest
requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout)
resp, err := client.DeletePieces(requestCtx, &pb.DeletePiecesRequest{
PieceIds: batch,
})
cancel()
for _, promise := range promises {
if err != nil {
promise.Failure()
} else {
promise.Success()
}
}
if err != nil {
dialer.log.Debug("deletion request failed", zap.Stringer("id", node.ID), zap.Error(err))
// don't try to send to this storage node a bit, when the deletion times out
if errs2.IsCanceled(err) || errors.Is(err, context.DeadlineExceeded) {
dialer.markFailed(ctx, node)
}
var opErr *net.OpError
if errors.As(err, &opErr) {
dialer.markFailed(ctx, node)
}
break
} else {
mon.IntVal("deletion_pieces_unhandled_count").Observe(resp.UnhandledCount) //mon:locked
}
jobs = append(jobs, queue.PopAllWithoutClose()...)
}
// if we failed early, remaining jobs should be marked as failures
for _, job := range jobs {
job.Resolve.Failure()
}
}
}
// markFailed marks node as something failed recently, so we shouldn't try again,
// for some time.
func (dialer *Dialer) markFailed(ctx context.Context, node storj.NodeURL) {
dialer.mu.Lock()
defer dialer.mu.Unlock()
now := time.Now()
lastFailed, ok := dialer.dialFailed[node.ID]
if !ok || lastFailed.Before(now) {
dialer.dialFailed[node.ID] = now
}
}
// recentlyFailed checks whether a request to node recently failed.
func (dialer *Dialer) recentlyFailed(ctx context.Context, node storj.NodeURL) bool {
dialer.mu.RLock()
lastFailed, ok := dialer.dialFailed[node.ID]
dialer.mu.RUnlock()
// when we recently failed to dial, then fail immediately
return ok && time.Since(lastFailed) < dialer.failThreshold
}
func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises []Promise, rest []Job) {
for i, job := range jobs {
if len(pieces) >= maxBatchSize {
return pieces, promises, jobs[i:]
}
pieces = append(pieces, job.Pieces...)
promises = append(promises, job.Resolve)
}
return pieces, promises, nil
}
func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target storj.NodeURL) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
conn, err := dialer.DialNodeURL(ctx, target)
if err != nil {
return nil, nil, err
}
return pb.NewDRPCPiecestoreClient(conn), conn, nil
}

View File

@ -1,107 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
type CountedPromise struct {
SuccessCount int64
FailureCount int64
}
func (p *CountedPromise) Success() { atomic.AddInt64(&p.SuccessCount, 1) }
func (p *CountedPromise) Failure() { atomic.AddInt64(&p.FailureCount, 1) }
func TestDialer(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
log := zaptest.NewLogger(t)
dialer := piecedeletion.NewDialer(log, planet.Satellites[0].Dialer, 5*time.Second, 5*time.Second, 100)
require.NotNil(t, dialer)
storageNode := planet.StorageNodes[0].NodeURL()
promise, jobs := makeJobsQueue(t, 2)
dialer.Handle(ctx, storageNode, jobs)
require.Equal(t, int64(2), promise.SuccessCount)
require.Equal(t, int64(0), promise.FailureCount)
})
}
func TestDialer_DialTimeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
log := zaptest.NewLogger(t)
const dialTimeout = 5 * time.Second
rpcdial := planet.Satellites[0].Dialer
rpcdial.DialTimeout = dialTimeout
dialer := piecedeletion.NewDialer(log, rpcdial, 5*time.Second, 1*time.Minute, 100)
require.NotNil(t, dialer)
require.NoError(t, planet.StopPeer(planet.StorageNodes[0]))
storageNode := planet.StorageNodes[0].NodeURL()
{
promise, jobs := makeJobsQueue(t, 1)
// we should fail to dial in the time allocated
start := time.Now()
dialer.Handle(ctx, storageNode, jobs)
failingToDial := time.Since(start)
require.Less(t, failingToDial.Seconds(), (2 * dialTimeout).Seconds())
require.Equal(t, int64(0), promise.SuccessCount)
require.Equal(t, int64(1), promise.FailureCount)
}
{
promise, jobs := makeJobsQueue(t, 1)
// we should immediately return when we try to redial within 1 minute
start := time.Now()
dialer.Handle(ctx, storageNode, jobs)
failingToRedial := time.Since(start)
require.Less(t, failingToRedial.Seconds(), time.Second.Seconds())
require.Equal(t, int64(0), promise.SuccessCount)
require.Equal(t, int64(1), promise.FailureCount)
}
})
}
// we can use a random piece id, since deletion requests for already deleted pieces is expected.
func makeJobsQueue(t *testing.T, n int) (*CountedPromise, piecedeletion.Queue) {
promise := &CountedPromise{}
jobs := piecedeletion.NewLimitedJobs(-1)
for i := 0; i < n; i++ {
require.True(t, jobs.TryPush(piecedeletion.Job{
Pieces: []storj.PieceID{testrand.PieceID(), testrand.PieceID()},
Resolve: promise,
}))
}
return promise, jobs
}

View File

@ -1,15 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
// Package piecedeletion implements service for deleting pieces that combines concurrent requests.
package piecedeletion
import (
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
)
var mon = monkit.Package()
// Error is the default error class for piece deletion.
var Error = errs.Class("piece deletion")

View File

@ -1,36 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"golang.org/x/sync/semaphore"
"storj.io/common/storj"
)
// LimitedHandler wraps handler with a concurrency limit.
type LimitedHandler struct {
active *semaphore.Weighted
Handler
}
// NewLimitedHandler wraps handler with a concurrency limit.
func NewLimitedHandler(handler Handler, limit int) *LimitedHandler {
return &LimitedHandler{
active: semaphore.NewWeighted(int64(limit)),
Handler: handler,
}
}
// Handle handles the job queue.
func (handler *LimitedHandler) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
if err := handler.active.Acquire(ctx, 1); err != nil {
return
}
defer handler.active.Release(1)
handler.Handler.Handle(ctx, node, queue)
}

View File

@ -1,53 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"sync/atomic"
"testing"
"time"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/common/testcontext"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
type HandleLimitVerifier struct {
Active int64
ExpectedLimit int64
}
func (*HandleLimitVerifier) NewQueue() piecedeletion.Queue {
panic("should not be called")
}
func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) {
current := atomic.AddInt64(&verifier.Active, 1)
if current > verifier.ExpectedLimit {
panic("over limit")
}
defer atomic.AddInt64(&verifier.Active, -1)
defer sync2.Sleep(ctx, time.Millisecond)
}
func TestLimitedHandler(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
verifier := &HandleLimitVerifier{
Active: 0,
ExpectedLimit: 8,
}
limited := piecedeletion.NewLimitedHandler(verifier, int(verifier.ExpectedLimit))
for i := 0; i < 800; i++ {
ctx.Go(func() error {
limited.Handle(ctx, storj.NodeURL{}, nil)
return nil
})
}
}

View File

@ -1,78 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import "sync"
// LimitedJobs is a finalizable list of deletion jobs with a limit to how many
// jobs it can handle.
type LimitedJobs struct {
maxPiecesPerBatch int
mu sync.Mutex
// done indicates that no more items will be appended to the queue.
done bool
// count is the number of piece ids queued here.
count int
// list is the list of delete jobs.
list []Job
}
// NewLimitedJobs returns a new limited job queue.
func NewLimitedJobs(maxPiecesPerBatch int) *LimitedJobs {
return &LimitedJobs{
maxPiecesPerBatch: maxPiecesPerBatch,
}
}
// TryPush tries to add a job to the queue.
//
// maxPiecesPerBatch < 0, means no limit.
func (jobs *LimitedJobs) TryPush(job Job) bool {
jobs.mu.Lock()
defer jobs.mu.Unlock()
// check whether we have finished work with this jobs queue.
if jobs.done {
return false
}
// add to the queue, this can potentially overflow `maxPiecesPerBatch`,
// however splitting a single request and promise across multiple batches, is annoying.
jobs.count += len(job.Pieces)
// check whether the queue is at capacity
if jobs.maxPiecesPerBatch >= 0 && jobs.count >= jobs.maxPiecesPerBatch {
jobs.done = true
}
jobs.list = append(jobs.list, job)
return true
}
// PopAll returns all the jobs in this list.
func (jobs *LimitedJobs) PopAll() (_ []Job, ok bool) {
jobs.mu.Lock()
defer jobs.mu.Unlock()
// when we try to pop and the queue is empty, make the queue final.
if len(jobs.list) == 0 {
jobs.done = true
return nil, false
}
list := jobs.list
jobs.list = nil
return list, true
}
// PopAllWithoutClose returns all the jobs in this list without closing the queue.
func (jobs *LimitedJobs) PopAllWithoutClose() []Job {
jobs.mu.Lock()
defer jobs.mu.Unlock()
list := jobs.list
jobs.list = nil
return list
}

View File

@ -1,178 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"errors"
"sort"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
func TestLimitedJobs(t *testing.T) {
{ // pop on an empty list
q := piecedeletion.NewLimitedJobs(-1)
list, ok := q.PopAll()
require.False(t, ok)
require.Nil(t, list)
}
{ // pop on a non-empty list
q := piecedeletion.NewLimitedJobs(-1)
job1 := randomJob(2)
job2 := randomJob(3)
// first push should always work
require.True(t, q.TryPush(job1))
// try push another, currently we don't have limits
require.True(t, q.TryPush(job2))
list, ok := q.PopAll()
require.True(t, ok)
require.Equal(t, []piecedeletion.Job{job1, job2}, list)
// should be empty
list, ok = q.PopAll()
require.False(t, ok)
require.Nil(t, list)
// pushing another should fail
require.False(t, q.TryPush(randomJob(1)))
}
}
func TestLimitedJobs_Limiting(t *testing.T) {
{
q := piecedeletion.NewLimitedJobs(2)
require.True(t, q.TryPush(randomJob(1)))
require.True(t, q.TryPush(randomJob(1)))
require.False(t, q.TryPush(randomJob(1)))
require.False(t, q.TryPush(randomJob(1)))
}
{
q := piecedeletion.NewLimitedJobs(2)
require.True(t, q.TryPush(randomJob(1)))
_, _ = q.PopAll()
require.True(t, q.TryPush(randomJob(1)))
_, _ = q.PopAll()
require.False(t, q.TryPush(randomJob(1)))
_, _ = q.PopAll()
require.False(t, q.TryPush(randomJob(1)))
}
}
func TestLimitedJobs_NoClose(t *testing.T) {
{
q := piecedeletion.NewLimitedJobs(2)
job1, job2 := randomJob(1), randomJob(1)
require.True(t, q.TryPush(job1))
list := q.PopAllWithoutClose()
require.Equal(t, []piecedeletion.Job{job1}, list)
list = q.PopAllWithoutClose()
require.Empty(t, list)
require.True(t, q.TryPush(job2))
list = q.PopAllWithoutClose()
require.Equal(t, []piecedeletion.Job{job2}, list)
}
}
func TestLimitedJobs_Concurrent(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const N = 4
q := piecedeletion.NewLimitedJobs(-1)
jobs := []piecedeletion.Job{
randomJob(1),
randomJob(2),
randomJob(3),
randomJob(4),
}
var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
i := i
ctx.Go(func() error {
defer wg.Done()
if !q.TryPush(jobs[i]) {
return errors.New("failed to add job")
}
return nil
})
}
ctx.Go(func() error {
wg.Wait()
list, ok := q.PopAll()
if !ok {
return errors.New("failed to return jobs")
}
sort.Slice(list, func(i, k int) bool {
return len(list[i].Pieces) < len(list[k].Pieces)
})
if !assert.Equal(t, jobs, list) {
return errors.New("not equal")
}
return nil
})
}
func TestLimitedJobs_NoRace(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const N = 4
q := piecedeletion.NewLimitedJobs(-1)
jobs := []piecedeletion.Job{
randomJob(1),
randomJob(2),
randomJob(3),
randomJob(4),
}
for i := 0; i < N; i++ {
i := i
ctx.Go(func() error {
_ = q.TryPush(jobs[i])
return nil
})
}
ctx.Go(func() error {
_, _ = q.PopAll()
return nil
})
ctx.Go(func() error {
_ = q.PopAllWithoutClose()
return nil
})
}
func randomJob(n int) piecedeletion.Job {
job := piecedeletion.Job{}
for i := 0; i < n; i++ {
job.Pieces = append(job.Pieces, testrand.PieceID())
}
return job
}

View File

@ -1,254 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/overlay"
)
// Config defines configuration options for Service.
type Config struct {
MaxConcurrency int `help:"maximum number of concurrent requests to storage nodes" default:"100"`
MaxConcurrentPieces int `help:"maximum number of concurrent pieces can be processed" default:"1000000" testDefault:"1000"`
MaxPiecesPerBatch int `help:"maximum number of pieces per batch" default:"5000" testDefault:"4000"`
MaxPiecesPerRequest int `help:"maximum number pieces per single request" default:"1000" testDefault:"2000"`
DialTimeout time.Duration `help:"timeout for dialing nodes (0 means satellite default)" default:"3s" testDefault:"2s"`
FailThreshold time.Duration `help:"threshold for retrying a failed node" releaseDefault:"10m" devDefault:"2s"`
RequestTimeout time.Duration `help:"timeout for a single delete request" releaseDefault:"15s" devDefault:"2s"`
DeleteSuccessThreshold float64 `help:"Which fraction of nodes should be contacted successfully until the delete of a batch of pieces is considered completed" default:".75"`
}
const (
minTimeout = 5 * time.Millisecond
maxTimeout = 5 * time.Minute
)
// Verify verifies configuration sanity.
func (config *Config) Verify() errs.Group {
var errlist errs.Group
if config.MaxConcurrency <= 0 {
errlist.Add(Error.New("concurrency %d must be greater than 0", config.MaxConcurrency))
}
if config.MaxConcurrentPieces <= 0 {
errlist.Add(Error.New("max concurrent pieces %d must be greater than 0", config.MaxConcurrentPieces))
}
if config.MaxPiecesPerBatch < config.MaxPiecesPerRequest {
errlist.Add(Error.New("max pieces per batch %d should be larger than max pieces per request %d", config.MaxPiecesPerBatch, config.MaxPiecesPerRequest))
}
if config.MaxPiecesPerBatch <= 0 {
errlist.Add(Error.New("max pieces per batch %d must be greater than 0", config.MaxPiecesPerBatch))
}
if config.MaxPiecesPerRequest <= 0 {
errlist.Add(Error.New("max pieces per request %d must be greater than 0", config.MaxPiecesPerRequest))
}
if config.DialTimeout != 0 && (config.DialTimeout <= minTimeout || maxTimeout <= config.DialTimeout) {
errlist.Add(Error.New("dial timeout %v must be between %v and %v", config.DialTimeout, minTimeout, maxTimeout))
}
if config.RequestTimeout < minTimeout || maxTimeout < config.RequestTimeout {
errlist.Add(Error.New("request timeout %v should be between %v and %v", config.RequestTimeout, minTimeout, maxTimeout))
}
return errlist
}
// Nodes stores reliable nodes information.
type Nodes interface {
GetNodes(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]*overlay.SelectedNode, err error)
}
// Service handles combining piece deletion requests.
//
// architecture: Service
type Service struct {
log *zap.Logger
config Config
concurrentRequests *semaphore.Weighted
rpcDialer rpc.Dialer
nodesDB Nodes
running sync2.Fence
combiner *Combiner
dialer *Dialer
limited *LimitedHandler
}
// NewService creates a new service.
func NewService(log *zap.Logger, dialer rpc.Dialer, nodesDB Nodes, config Config) (*Service, error) {
var errlist errs.Group
if log == nil {
errlist.Add(Error.New("log is nil"))
}
if dialer == (rpc.Dialer{}) {
errlist.Add(Error.New("dialer is zero"))
}
if nodesDB == nil {
errlist.Add(Error.New("nodesDB is nil"))
}
if errs := config.Verify(); len(errs) > 0 {
errlist.Add(errs...)
}
if err := errlist.Err(); err != nil {
return nil, Error.Wrap(err)
}
dialerClone := dialer
if config.DialTimeout > 0 {
dialerClone.DialTimeout = config.DialTimeout
}
if dialerClone.Pool == nil {
dialerClone.Pool = rpc.NewDefaultConnectionPool()
}
return &Service{
log: log,
config: config,
concurrentRequests: semaphore.NewWeighted(int64(config.MaxConcurrentPieces)),
rpcDialer: dialerClone,
nodesDB: nodesDB,
}, nil
}
// newQueue creates the configured queue.
func (service *Service) newQueue() Queue {
return NewLimitedJobs(service.config.MaxPiecesPerBatch)
}
// Run initializes the service.
func (service *Service) Run(ctx context.Context) error {
defer service.running.Release()
config := service.config
service.dialer = NewDialer(service.log.Named("dialer"), service.rpcDialer, config.RequestTimeout, config.FailThreshold, config.MaxPiecesPerRequest)
service.limited = NewLimitedHandler(service.dialer, config.MaxConcurrency)
service.combiner = NewCombiner(ctx, service.limited, service.newQueue)
return nil
}
// Close shuts down the service.
func (service *Service) Close() error {
if service.combiner != nil {
service.combiner.Close()
}
return nil
}
// DeleteWithCustomThreshold deletes the pieces specified in the requests,
// returning when they have been deleted from the specified fraction of storage nodes.
func (service *Service) DeleteWithCustomThreshold(ctx context.Context, requests []Request, successThreshold float64) (err error) {
defer mon.Task()(&ctx, len(requests), requestsPieceCount(requests), successThreshold)(&err)
if len(requests) == 0 {
return nil
}
// wait for combiner and dialer to set themselves up.
if !service.running.Wait(ctx) {
return Error.Wrap(ctx.Err())
}
for i, req := range requests {
if !req.IsValid() {
return Error.New("request #%d is invalid", i)
}
}
// When number of pieces are more than the maximum limit, we let it overflow,
// so we don't have to split requests in to separate batches.
totalPieceCount := requestsPieceCount(requests)
if totalPieceCount > service.config.MaxConcurrentPieces {
totalPieceCount = service.config.MaxConcurrentPieces
}
if err := service.concurrentRequests.Acquire(ctx, int64(totalPieceCount)); err != nil {
return Error.Wrap(err)
}
defer service.concurrentRequests.Release(int64(totalPieceCount))
// Create a map for matching node information with the corresponding
// request.
nodesReqs := make(map[storj.NodeID]Request, len(requests))
nodeIDs := []storj.NodeID{}
for _, req := range requests {
if req.Node.Address == "" {
nodeIDs = append(nodeIDs, req.Node.ID)
}
nodesReqs[req.Node.ID] = req
}
if len(nodeIDs) > 0 {
nodes, err := service.nodesDB.GetNodes(ctx, nodeIDs)
if err != nil {
// Pieces will be collected by garbage collector
return Error.Wrap(err)
}
for _, node := range nodes {
req := nodesReqs[node.ID]
nodesReqs[node.ID] = Request{
Node: storj.NodeURL{
ID: node.ID,
Address: node.Address.Address,
},
Pieces: req.Pieces,
}
}
}
threshold, err := sync2.NewSuccessThreshold(len(nodesReqs), successThreshold)
if err != nil {
return Error.Wrap(err)
}
for _, req := range nodesReqs {
service.combiner.Enqueue(req.Node, Job{
Pieces: req.Pieces,
Resolve: threshold,
})
}
threshold.Wait(ctx)
return nil
}
// Delete deletes the pieces specified in the requests,
// returning when they have been deleted from the default fraction of storage nodes.
func (service *Service) Delete(ctx context.Context, requests []Request) (err error) {
return service.DeleteWithCustomThreshold(ctx, requests, service.config.DeleteSuccessThreshold)
}
// Request defines a deletion requests for a node.
type Request struct {
Node storj.NodeURL
Pieces []storj.PieceID
}
// IsValid returns whether the request is valid.
func (req *Request) IsValid() bool {
return !req.Node.ID.IsZero() && len(req.Pieces) > 0
}
func requestsPieceCount(requests []Request) int {
total := 0
for _, r := range requests {
total += len(r.Pieces)
}
return total
}

View File

@ -1,417 +0,0 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/blobstore/testblobs"
"storj.io/storj/storagenode/pieces"
)
func TestService_New_Error(t *testing.T) {
log := zaptest.NewLogger(t)
dialer := rpc.NewDefaultDialer(nil)
_, err := piecedeletion.NewService(nil, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 8,
MaxConcurrentPieces: 10,
MaxPiecesPerBatch: 0,
MaxPiecesPerRequest: 0,
DialTimeout: time.Second,
FailThreshold: 5 * time.Minute,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "log is nil")
_, err = piecedeletion.NewService(log, rpc.Dialer{}, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 87,
MaxConcurrentPieces: 10,
DialTimeout: time.Second,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "dialer is zero")
_, err = piecedeletion.NewService(log, dialer, nil, piecedeletion.Config{
MaxConcurrency: 8,
MaxConcurrentPieces: 10,
MaxPiecesPerBatch: 0,
MaxPiecesPerRequest: 0,
DialTimeout: time.Second,
FailThreshold: 5 * time.Minute,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "nodesDB is nil")
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 0,
MaxConcurrentPieces: 10,
DialTimeout: time.Second,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "greater than 0")
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: -3,
MaxConcurrentPieces: 10,
DialTimeout: time.Second,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "greater than 0")
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 3,
MaxConcurrentPieces: -10,
DialTimeout: time.Second,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "greater than 0")
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 3,
MaxConcurrentPieces: 10,
DialTimeout: time.Nanosecond,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "dial timeout 1ns must be between 5ms and 5m0s")
_, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{
MaxConcurrency: 3,
MaxConcurrentPieces: 10,
DialTimeout: time.Hour,
})
require.True(t, piecedeletion.Error.Has(err), err)
require.Contains(t, err.Error(), "dial timeout 1h0m0s must be between 5ms and 5m0s")
}
func TestService_DeletePieces_AllNodesUp(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 2, 4, 4),
testplanet.MaxSegmentSize(15*memory.KiB),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]
percentExp := 0.75
{
data := testrand.Bytes(10 * memory.KiB)
err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data)
require.NoError(t, err)
}
// ensure that no requests return an error
err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, nil, percentExp)
require.NoError(t, err)
var (
totalUsedSpace int64
requests []piecedeletion.Request
)
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += piecesTotal
// Get all the pieces of the storage node
nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
requests = append(requests, nodePieces)
}
err = satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, percentExp)
require.NoError(t, err)
planet.WaitForStorageNodeDeleters(ctx)
// calculate the SNs used space after delete the pieces
var totalUsedSpaceAfterDelete int64
for _, sn := range planet.StorageNodes {
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpaceAfterDelete += piecesTotal
}
// At this point we can only guarantee that the 75% of the SNs pieces
// are delete due to the success threshold
deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace)
if deletedUsedSpace < percentExp {
t.Fatalf("deleted used space is less than %e%%. Got %f", percentExp, deletedUsedSpace)
}
})
}
func TestService_DeletePieces_SomeNodesDown(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 2, 4, 4),
testplanet.MaxSegmentSize(15*memory.KiB),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]
numToShutdown := 2
{
data := testrand.Bytes(10 * memory.KiB)
err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data)
require.NoError(t, err)
}
var requests []piecedeletion.Request
for i, sn := range planet.StorageNodes {
// Get all the pieces of the storage node
nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
err := sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
requests = append(requests, nodePieces)
// stop the first numToShutdown SNs before deleting pieces
if i < numToShutdown {
require.NoError(t, planet.StopPeer(sn))
}
}
err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, 0.9999)
require.NoError(t, err)
planet.WaitForStorageNodeDeleters(ctx)
// Check that storage nodes which are online when deleting pieces don't
// hold any piece
var totalUsedSpace int64
for i := numToShutdown; i < len(planet.StorageNodes); i++ {
piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += piecesTotal
}
require.Zero(t, totalUsedSpace, "totalUsedSpace online nodes")
})
}
func TestService_DeletePieces_AllNodesDown(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
// Use RSConfig for ensuring that we don't have long-tail cancellations
// and the upload doesn't leave garbage in the SNs
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.Combine(
testplanet.ReconfigureRS(2, 2, 4, 4),
testplanet.MaxSegmentSize(15*memory.KiB),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]
{
data := testrand.Bytes(10 * memory.KiB)
err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data)
require.NoError(t, err)
}
var (
expectedTotalUsedSpace int64
requests []piecedeletion.Request
)
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
expectedTotalUsedSpace += piecesTotal
// Get all the pieces of the storage node
nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
requests = append(requests, nodePieces)
require.NoError(t, planet.StopPeer(sn))
}
err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, 0.9999)
require.NoError(t, err)
planet.WaitForStorageNodeDeleters(ctx)
var totalUsedSpace int64
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += piecesTotal
}
require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace")
})
}
func TestService_DeletePieces_DisproportionateNumberOfRequestsAndNodes(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satelliteSys := planet.Satellites[0]
// make sure that the ratio of number of requests and number of nodes to
// be greater than the success threshold
percentExp := 0.75
numberOfRequests := 20
require.Less(t, float64(len(planet.StorageNodes))/float64(numberOfRequests), percentExp)
// populate requests
requests := make([]piecedeletion.Request, numberOfRequests)
for i := range requests {
requests[i] = piecedeletion.Request{
Node: planet.StorageNodes[i%2].NodeURL(),
Pieces: []storj.PieceID{testrand.PieceID()},
}
}
err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, percentExp)
require.NoError(t, err)
})
}
func TestService_DeletePieces_Invalid(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
service := planet.Satellites[0].API.Metainfo.PieceDeletion
nodesPieces := []piecedeletion.Request{
{Pieces: make([]storj.PieceID, 1)},
{Pieces: make([]storj.PieceID, 1)},
}
err := service.DeleteWithCustomThreshold(ctx, nodesPieces, 1)
require.Error(t, err)
assert.Contains(t, err.Error(), "request #0 is invalid")
})
}
func TestService_DeletePieces_Timeout(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) {
return testblobs.NewSlowDB(log.Named("slowdb"), db), nil
},
Satellite: testplanet.Combine(
func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.PieceDeletion.RequestTimeout = 200 * time.Millisecond
config.Metainfo.MaxSegmentSize = 15 * memory.KiB
},
testplanet.ReconfigureRS(2, 2, 4, 4),
),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
uplnk := planet.Uplinks[0]
satelliteSys := planet.Satellites[0]
{
data := testrand.Bytes(10 * memory.KiB)
err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data)
require.NoError(t, err)
}
var (
expectedTotalUsedSpace int64
requests []piecedeletion.Request
)
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
expectedTotalUsedSpace += piecesTotal
// Get all the pieces of the storage node
nodePieces := piecedeletion.Request{Node: sn.NodeURL()}
err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(),
func(store pieces.StoredPieceAccess) error {
nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID())
return nil
},
)
require.NoError(t, err)
requests = append(requests, nodePieces)
// make delete operation on storage nodes slow
storageNodeDB := sn.DB.(*testblobs.SlowDB)
delay := 500 * time.Millisecond
storageNodeDB.SetLatency(delay)
}
err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, 0.75)
require.NoError(t, err)
// A timeout error won't be propagated up to the service level
// but we'll know that the deletes didn't happen based on usedSpace
// check below.
var totalUsedSpace int64
for _, sn := range planet.StorageNodes {
// calculate the SNs total used space after data upload
piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
require.NoError(t, err)
totalUsedSpace += piecesTotal
}
require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace")
})
}
type nodesDB struct{}
func (n *nodesDB) GetNodes(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]*overlay.SelectedNode, err error) {
return nil, nil
}

View File

@ -652,30 +652,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
# toggle flag if overlay is enabled
# metainfo.overlay: true
# Which fraction of nodes should be contacted successfully until the delete of a batch of pieces is considered completed
# metainfo.piece-deletion.delete-success-threshold: 0.75
# timeout for dialing nodes (0 means satellite default)
# metainfo.piece-deletion.dial-timeout: 3s
# threshold for retrying a failed node
# metainfo.piece-deletion.fail-threshold: 10m0s
# maximum number of concurrent requests to storage nodes
# metainfo.piece-deletion.max-concurrency: 100
# maximum number of concurrent pieces can be processed
# metainfo.piece-deletion.max-concurrent-pieces: 1000000
# maximum number of pieces per batch
# metainfo.piece-deletion.max-pieces-per-batch: 5000
# maximum number pieces per single request
# metainfo.piece-deletion.max-pieces-per-request: 1000
# timeout for a single delete request
# metainfo.piece-deletion.request-timeout: 15s
# max bucket count for a project.
# metainfo.project-limits.max-buckets: 100