From 2e0b687581a19614dca5e9b0601d7157742191e6 Mon Sep 17 00:00:00 2001
From: Michal Niewrzal
Date: Wed, 14 Jun 2023 11:02:51 +0200
Subject: [PATCH] satellite/metainfo: drop piecedeletion package

We are not using this package anymore.

Change-Id: If32315d43d73c8deb096e93cb43c03881bd9aad1
---
 monkit.lock                                   |   2 -
 satellite/api.go                              |  22 +-
 satellite/metainfo/config.go                  |  24 +-
 satellite/metainfo/piecedeletion/combiner.go  | 171 -------
 .../piecedeletion/combiner_bench_test.go      | 136 ------
 .../metainfo/piecedeletion/combiner_test.go   |  93 ----
 satellite/metainfo/piecedeletion/dialer.go    | 178 --------
 .../metainfo/piecedeletion/dialer_test.go     | 107 -----
 satellite/metainfo/piecedeletion/doc.go       |  15 -
 satellite/metainfo/piecedeletion/handler.go   |  36 --
 .../metainfo/piecedeletion/handler_test.go    |  53 ---
 satellite/metainfo/piecedeletion/queue.go     |  78 ----
 .../metainfo/piecedeletion/queue_test.go      | 178 --------
 satellite/metainfo/piecedeletion/service.go   | 254 -----------
 .../metainfo/piecedeletion/service_test.go    | 417 ------------------
 scripts/testdata/satellite-config.yaml.lock   |  24 -
 16 files changed, 13 insertions(+), 1775 deletions(-)
 delete mode 100644 satellite/metainfo/piecedeletion/combiner.go
 delete mode 100644 satellite/metainfo/piecedeletion/combiner_bench_test.go
 delete mode 100644 satellite/metainfo/piecedeletion/combiner_test.go
 delete mode 100644 satellite/metainfo/piecedeletion/dialer.go
 delete mode 100644 satellite/metainfo/piecedeletion/dialer_test.go
 delete mode 100644 satellite/metainfo/piecedeletion/doc.go
 delete mode 100644 satellite/metainfo/piecedeletion/handler.go
 delete mode 100644 satellite/metainfo/piecedeletion/handler_test.go
 delete mode 100644 satellite/metainfo/piecedeletion/queue.go
 delete mode 100644 satellite/metainfo/piecedeletion/queue_test.go
 delete mode 100644 satellite/metainfo/piecedeletion/service.go
 delete mode 100644 satellite/metainfo/piecedeletion/service_test.go

diff --git a/monkit.lock b/monkit.lock
index a6c039896..8a4cc8b8b 100644
--- a/monkit.lock
+++ b/monkit.lock
@@ -79,8 +79,6 @@ storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_fail" Meter
 storj.io/storj/satellite/gracefulexit."graceful_exit_transfer_piece_success" Meter
 storj.io/storj/satellite/metabase/rangedloop."rangedloop_error" Event
 storj.io/storj/satellite/metainfo."metainfo_rate_limit_exceeded" Event
-storj.io/storj/satellite/metainfo/piecedeletion."delete_batch_size" IntVal
-storj.io/storj/satellite/metainfo/piecedeletion."deletion_pieces_unhandled_count" IntVal
 storj.io/storj/satellite/metrics."total_inline_bytes" IntVal
 storj.io/storj/satellite/metrics."total_inline_segments" IntVal
 storj.io/storj/satellite/metrics."total_remote_bytes" IntVal
diff --git a/satellite/api.go b/satellite/api.go
index d8ab867d2..f84022c5e 100644
--- a/satellite/api.go
+++ b/satellite/api.go
@@ -41,7 +41,6 @@ import (
 	"storj.io/storj/satellite/mailservice"
 	"storj.io/storj/satellite/metabase"
 	"storj.io/storj/satellite/metainfo"
-	"storj.io/storj/satellite/metainfo/piecedeletion"
 	"storj.io/storj/satellite/nodestats"
 	"storj.io/storj/satellite/oidc"
 	"storj.io/storj/satellite/orders"
@@ -100,9 +99,8 @@ type API struct {
 	}
 
 	Metainfo struct {
-		Metabase      *metabase.DB
-		PieceDeletion *piecedeletion.Service
-		Endpoint      *metainfo.Endpoint
+		Metabase *metabase.DB
+		Endpoint *metainfo.Endpoint
 	}
 
 	Userinfo struct {
@@ -425,22 +423,6 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
 
 	{ // setup metainfo
 		peer.Metainfo.Metabase = metabaseDB
-		peer.Metainfo.PieceDeletion, err = piecedeletion.NewService(
-			peer.Log.Named("metainfo:piecedeletion"),
-			peer.Dialer,
-			// TODO use cache designed for deletion
-			peer.Overlay.Service.DownloadSelectionCache,
-			config.Metainfo.PieceDeletion,
-		)
-		if err != nil {
-			return nil, errs.Combine(err, peer.Close())
-		}
-		peer.Services.Add(lifecycle.Item{
-			Name:  "metainfo:piecedeletion",
-			Run:   peer.Metainfo.PieceDeletion.Run,
-			Close: peer.Metainfo.PieceDeletion.Close,
-		})
-
 		peer.Metainfo.Endpoint, err = metainfo.NewEndpoint(
 			peer.Log.Named("metainfo:endpoint"),
 			peer.Buckets.Service,
diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go
index 1f1b3b4ad..a7b782832 100644
--- a/satellite/metainfo/config.go
+++ b/satellite/metainfo/config.go
@@ -13,7 +13,6 @@ import (
 
 	"storj.io/common/memory"
 	"storj.io/storj/satellite/metabase"
-	"storj.io/storj/satellite/metainfo/piecedeletion"
 	"storj.io/uplink/private/eestream"
 )
 
@@ -131,18 +130,17 @@ type Config struct {
 	MaxInlineSegmentSize memory.Size `default:"4KiB" help:"maximum inline segment size"`
 	// we have such default value because max value for ObjectKey is 1024(1 Kib) but EncryptedObjectKey
 	// has encryption overhead 16 bytes. So overall size is 1024 + 16 * 16.
-	MaxEncryptedObjectKeyLength int                  `default:"1750" help:"maximum encrypted object key length"`
-	MaxSegmentSize              memory.Size          `default:"64MiB" help:"maximum segment size"`
-	MaxMetadataSize             memory.Size          `default:"2KiB" help:"maximum segment metadata size"`
-	MaxCommitInterval           time.Duration        `default:"48h" testDefault:"1h" help:"maximum time allowed to pass between creating and committing a segment"`
-	MinPartSize                 memory.Size          `default:"5MiB" testDefault:"0" help:"minimum allowed part size (last part has no minimum size limit)"`
-	MaxNumberOfParts            int                  `default:"10000" help:"maximum number of parts object can contain"`
-	Overlay                     bool                 `default:"true" help:"toggle flag if overlay is enabled"`
-	RS                          RSConfig             `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"`
-	RateLimiter                 RateLimiterConfig    `help:"rate limiter configuration"`
-	UploadLimiter               UploadLimiterConfig  `help:"object upload limiter configuration"`
-	ProjectLimits               ProjectLimitConfig   `help:"project limit configuration"`
-	PieceDeletion               piecedeletion.Config `help:"piece deletion configuration"`
+	MaxEncryptedObjectKeyLength int                 `default:"1750" help:"maximum encrypted object key length"`
+	MaxSegmentSize              memory.Size         `default:"64MiB" help:"maximum segment size"`
+	MaxMetadataSize             memory.Size         `default:"2KiB" help:"maximum segment metadata size"`
+	MaxCommitInterval           time.Duration       `default:"48h" testDefault:"1h" help:"maximum time allowed to pass between creating and committing a segment"`
+	MinPartSize                 memory.Size         `default:"5MiB" testDefault:"0" help:"minimum allowed part size (last part has no minimum size limit)"`
+	MaxNumberOfParts            int                 `default:"10000" help:"maximum number of parts object can contain"`
+	Overlay                     bool                `default:"true" help:"toggle flag if overlay is enabled"`
+	RS                          RSConfig            `releaseDefault:"29/35/80/110-256B" devDefault:"4/6/8/10-256B" help:"redundancy scheme configuration in the format k/m/o/n-sharesize"`
+	RateLimiter                 RateLimiterConfig   `help:"rate limiter configuration"`
+	UploadLimiter               UploadLimiterConfig `help:"object upload limiter configuration"`
+	ProjectLimits               ProjectLimitConfig  `help:"project limit configuration"`
 	// TODO remove this flag when server-side copy implementation will be finished
 	ServerSideCopy bool `help:"enable code for server-side copy,
deprecated. please leave this to true." default:"true"` ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"` diff --git a/satellite/metainfo/piecedeletion/combiner.go b/satellite/metainfo/piecedeletion/combiner.go deleted file mode 100644 index 721d62516..000000000 --- a/satellite/metainfo/piecedeletion/combiner.go +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion - -import ( - "context" - "sync" - - "storj.io/common/storj" - "storj.io/common/sync2" -) - -// Handler handles piece deletion requests from a queue. -type Handler interface { - // Handle should call queue.PopAll until finished. - Handle(ctx context.Context, node storj.NodeURL, queue Queue) -} - -// NewQueue is a constructor func for queues. -type NewQueue func() Queue - -// Queue is a queue for jobs. -type Queue interface { - // TryPush tries to push a new job to the queue. - TryPush(job Job) bool - - // PopAll fetches all jobs in the queue. - // - // When there are no more jobs, the queue must stop accepting new jobs. - PopAll() ([]Job, bool) - - // PopAllWithoutClose fetches all jobs in the queue, - // but without closing the queue for new requests. - PopAllWithoutClose() []Job -} - -// Job is a single of deletion. -type Job struct { - // Pieces are the pieces id-s that need to be deleted. - Pieces []storj.PieceID - // Resolve is for notifying the job issuer about the outcome. - Resolve Promise -} - -// Promise is for signaling to the deletion requests about the result. -type Promise interface { - // Success is called when the job has been successfully handled. - Success() - // Failure is called when the job didn't complete successfully. - Failure() -} - -// Combiner combines multiple concurrent deletion requests into batches. -type Combiner struct { - // ctx context to pass down to the handler. - ctx context.Context - cancel context.CancelFunc - - // handler defines what to do with the jobs. - handler Handler - // newQueue creates a new queue. - newQueue NewQueue - // workers contains all worker goroutines. - workers sync2.WorkGroup - - // mu protects workerByID - mu sync.Mutex - workerByID map[storj.NodeID]*worker -} - -// worker handles a batch of jobs. -type worker struct { - waitFor chan struct{} - node storj.NodeURL - jobs Queue - done chan struct{} -} - -// NewCombiner creates a new combiner. -func NewCombiner(parent context.Context, handler Handler, newQueue NewQueue) *Combiner { - ctx, cancel := context.WithCancel(parent) - return &Combiner{ - ctx: ctx, - cancel: cancel, - handler: handler, - newQueue: newQueue, - workerByID: map[storj.NodeID]*worker{}, - } -} - -// Close shuts down all workers. -func (combiner *Combiner) Close() { - combiner.cancel() - combiner.workers.Close() -} - -// Enqueue adds a deletion job to the queue. -func (combiner *Combiner) Enqueue(node storj.NodeURL, job Job) { - combiner.mu.Lock() - defer combiner.mu.Unlock() - - last := combiner.workerByID[node.ID] - - // Check whether we can use the last worker. - if last != nil && last.jobs.TryPush(job) { - // We've successfully added a job to an existing worker. - return - } - - // Create a new worker when one doesn't exist or the last one was full. 
- next := &worker{ - node: node, - jobs: combiner.newQueue(), - done: make(chan struct{}), - } - if last != nil { - next.waitFor = last.done - } - combiner.workerByID[node.ID] = next - if !next.jobs.TryPush(job) { - // This should never happen. - job.Resolve.Failure() - } - - // Start the worker. - next.start(combiner) -} - -// schedule starts the worker. -func (worker *worker) start(combiner *Combiner) { - // Try to add to worker pool, this may fail when we are shutting things down. - workerStarted := combiner.workers.Go(func() { - - defer mon.TaskNamed("worker_start")(nil)(nil) - defer close(worker.done) - // Ensure we fail any jobs that the handler didn't handle. - defer FailPending(worker.jobs) - - if worker.waitFor != nil { - // Wait for previous worker to finish work to ensure fairness between nodes. - select { - case <-worker.waitFor: - case <-combiner.ctx.Done(): - return - } - } - - // Handle the job queue. - combiner.handler.Handle(combiner.ctx, worker.node, worker.jobs) - }) - - // If we failed to start a worker, then mark all the jobs as failures. - if !workerStarted { - FailPending(worker.jobs) - } -} - -// FailPending fails all the jobs in the queue. -func FailPending(jobs Queue) { - for { - list, ok := jobs.PopAll() - if !ok { - return - } - - for _, job := range list { - job.Resolve.Failure() - } - } -} diff --git a/satellite/metainfo/piecedeletion/combiner_bench_test.go b/satellite/metainfo/piecedeletion/combiner_bench_test.go deleted file mode 100644 index 13efb8740..000000000 --- a/satellite/metainfo/piecedeletion/combiner_bench_test.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion_test - -import ( - "context" - "fmt" - "sync/atomic" - "testing" - "time" - - "storj.io/common/storj" - "storj.io/common/sync2" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/satellite/metainfo/piecedeletion" -) - -type SleepyHandler struct { - Min, Max time.Duration - - TotalHandled int64 -} - -func (handler *SleepyHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) { - if !sync2.Sleep(ctx, handler.Min) { - return - } - - for { - list, ok := queue.PopAll() - if !ok { - return - } - for _, job := range list { - atomic.AddInt64(&handler.TotalHandled, int64(len(job.Pieces))) - job.Resolve.Success() - } - - span := int(handler.Max - handler.Min) - wait := testrand.Intn(span) - - if !sync2.Sleep(ctx, handler.Min+time.Duration(wait)) { - return - } - } -} - -func BenchmarkCombiner(b *testing.B) { - var ( - // currently nodes are picked uniformly, however total piece distribution is not - // hence we use a lower number to simulate frequent nodes - nodeCount = 500 - requestCount = 100 - // assume each request has ~2 segments - callsPerRequest = 160 - // we cannot use realistic values here due to sleep granularity - minWait = 1 * time.Millisecond - maxWait = 20 * time.Millisecond - // add few variations to test - activeLimits = []int{8, 32, 64, -1} - queueSizes = []int{1, 8, 64, 128, -1} - ) - - if testing.Short() { - // use values to make tests run faster - nodeCount = 5 - requestCount = 5 - callsPerRequest = 5 - activeLimits = []int{8, 64, -1} - queueSizes = []int{8, 128, -1} - } - - nodes := []storj.NodeURL{} - for i := 0; i < nodeCount; i++ { - nodes = append(nodes, storj.NodeURL{ - ID: testrand.NodeID(), - }) - } - - for _, activeLimit := range activeLimits { - for _, queueSize := range queueSizes { - activeLimit, queueSize := activeLimit, 
queueSize - name := fmt.Sprintf("active=%d,queue=%d", activeLimit, queueSize) - b.Run(name, func(b *testing.B) { - for i := 0; i < b.N; i++ { - func() { - sleeper := &SleepyHandler{Min: minWait, Max: maxWait} - limited := piecedeletion.NewLimitedHandler(sleeper, activeLimit) - - ctx := testcontext.New(b) - defer ctx.Cleanup() - - newQueue := func() piecedeletion.Queue { - return piecedeletion.NewLimitedJobs(queueSize) - } - - var combiner *piecedeletion.Combiner - if activeLimit > 0 { - combiner = piecedeletion.NewCombiner(ctx, limited, newQueue) - } else { - combiner = piecedeletion.NewCombiner(ctx, sleeper, newQueue) - } - - for request := 0; request < requestCount; request++ { - ctx.Go(func() error { - done, err := sync2.NewSuccessThreshold(callsPerRequest, 0.999999) - if err != nil { - return err - } - for call := 0; call < callsPerRequest; call++ { - i := testrand.Intn(nodeCount) - combiner.Enqueue(nodes[i], piecedeletion.Job{ - Pieces: []storj.PieceID{testrand.PieceID()}, - Resolve: done, - }) - } - done.Wait(ctx) - return nil - }) - } - - ctx.Wait() - combiner.Close() - - totalRequests := int64(callsPerRequest * requestCount) - if sleeper.TotalHandled != totalRequests { - b.Fatalf("handled only %d expected %d", sleeper.TotalHandled, totalRequests) - } - }() - } - }) - } - } -} diff --git a/satellite/metainfo/piecedeletion/combiner_test.go b/satellite/metainfo/piecedeletion/combiner_test.go deleted file mode 100644 index 90ede0c8a..000000000 --- a/satellite/metainfo/piecedeletion/combiner_test.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion_test - -import ( - "context" - "sync" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/require" - - "storj.io/common/storj" - "storj.io/common/sync2" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/satellite/metainfo/piecedeletion" -) - -type CountHandler struct { - Count int64 -} - -func (handler *CountHandler) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) { - for { - list, ok := queue.PopAll() - if !ok { - return - } - for _, job := range list { - atomic.AddInt64(&handler.Count, int64(len(job.Pieces))) - job.Resolve.Success() - } - } -} - -func TestCombiner(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - const ( - activeLimit = 8 - nodeCount = 70 - requestCount = 100 - parallelCount = 10 - queueSize = 5 - ) - - nodes := []storj.NodeURL{} - for i := 0; i < nodeCount; i++ { - nodes = append(nodes, storj.NodeURL{ - ID: testrand.NodeID(), - }) - } - - counter := &CountHandler{} - limited := piecedeletion.NewLimitedHandler(counter, activeLimit) - newQueue := func() piecedeletion.Queue { - return piecedeletion.NewLimitedJobs(queueSize) - } - - combiner := piecedeletion.NewCombiner(ctx, limited, newQueue) - - var wg sync.WaitGroup - for i := 0; i < parallelCount; i++ { - wg.Add(1) - ctx.Go(func() error { - defer wg.Done() - - pending, err := sync2.NewSuccessThreshold(requestCount, 0.999999) - if err != nil { - return err - } - - for k := 0; k < requestCount; k++ { - node := nodes[testrand.Intn(len(nodes))] - - combiner.Enqueue(node, piecedeletion.Job{ - Pieces: []storj.PieceID{testrand.PieceID()}, - Resolve: pending, - }) - } - - pending.Wait(ctx) - return nil - }) - } - wg.Wait() - combiner.Close() - - require.Equal(t, int(counter.Count), int(requestCount*parallelCount)) -} diff --git a/satellite/metainfo/piecedeletion/dialer.go 
b/satellite/metainfo/piecedeletion/dialer.go deleted file mode 100644 index f40601884..000000000 --- a/satellite/metainfo/piecedeletion/dialer.go +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion - -import ( - "context" - "errors" - "net" - "strconv" - "sync" - "time" - - "github.com/spacemonkeygo/monkit/v3" - "go.uber.org/zap" - - "storj.io/common/errs2" - "storj.io/common/pb" - "storj.io/common/rpc" - "storj.io/common/storj" -) - -// Dialer implements dialing piecestores and sending delete requests with batching and redial threshold. -type Dialer struct { - log *zap.Logger - dialer rpc.Dialer - - requestTimeout time.Duration - failThreshold time.Duration - piecesPerRequest int - - mu sync.RWMutex - dialFailed map[storj.NodeID]time.Time -} - -// NewDialer returns a new Dialer. -func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold time.Duration, piecesPerRequest int) *Dialer { - return &Dialer{ - log: log, - dialer: dialer, - - requestTimeout: requestTimeout, - failThreshold: failThreshold, - piecesPerRequest: piecesPerRequest, - - dialFailed: map[storj.NodeID]time.Time{}, - } -} - -// Handle tries to send the deletion requests to the specified node. -func (dialer *Dialer) Handle(ctx context.Context, node storj.NodeURL, queue Queue) { - defer mon.Task()(&ctx, node.ID.String())(nil) - defer FailPending(queue) - - if dialer.recentlyFailed(ctx, node) { - return - } - - client, conn, err := dialPieceStore(ctx, dialer.dialer, node) - if err != nil { - dialer.log.Debug("failed to dial", zap.Stringer("id", node.ID), zap.Error(err)) - dialer.markFailed(ctx, node) - return - } - defer func() { - if err := conn.Close(); err != nil { - dialer.log.Debug("closing connection failed", zap.Stringer("id", node.ID), zap.Error(err)) - } - }() - - for { - if err := ctx.Err(); err != nil { - return - } - - jobs, ok := queue.PopAll() - // Add metrics on to the span - s := monkit.SpanFromCtx(ctx) - s.Annotate("delete jobs size", strconv.Itoa(len(jobs))) - - if !ok { - return - } - - for len(jobs) > 0 { - batch, promises, rest := batchJobs(jobs, dialer.piecesPerRequest) - // Aggregation metrics - mon.IntVal("delete_batch_size").Observe(int64(len(batch))) //mon:locked - // Tracing metrics - s.Annotate("delete_batch_size", strconv.Itoa(len(batch))) - - jobs = rest - - requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout) - resp, err := client.DeletePieces(requestCtx, &pb.DeletePiecesRequest{ - PieceIds: batch, - }) - cancel() - - for _, promise := range promises { - if err != nil { - promise.Failure() - } else { - promise.Success() - } - } - - if err != nil { - dialer.log.Debug("deletion request failed", zap.Stringer("id", node.ID), zap.Error(err)) - // don't try to send to this storage node a bit, when the deletion times out - - if errs2.IsCanceled(err) || errors.Is(err, context.DeadlineExceeded) { - dialer.markFailed(ctx, node) - } - - var opErr *net.OpError - if errors.As(err, &opErr) { - dialer.markFailed(ctx, node) - } - break - } else { - mon.IntVal("deletion_pieces_unhandled_count").Observe(resp.UnhandledCount) //mon:locked - } - - jobs = append(jobs, queue.PopAllWithoutClose()...) - } - - // if we failed early, remaining jobs should be marked as failures - for _, job := range jobs { - job.Resolve.Failure() - } - } -} - -// markFailed marks node as something failed recently, so we shouldn't try again, -// for some time. 
-func (dialer *Dialer) markFailed(ctx context.Context, node storj.NodeURL) { - dialer.mu.Lock() - defer dialer.mu.Unlock() - - now := time.Now() - - lastFailed, ok := dialer.dialFailed[node.ID] - if !ok || lastFailed.Before(now) { - dialer.dialFailed[node.ID] = now - } -} - -// recentlyFailed checks whether a request to node recently failed. -func (dialer *Dialer) recentlyFailed(ctx context.Context, node storj.NodeURL) bool { - dialer.mu.RLock() - lastFailed, ok := dialer.dialFailed[node.ID] - dialer.mu.RUnlock() - - // when we recently failed to dial, then fail immediately - return ok && time.Since(lastFailed) < dialer.failThreshold -} - -func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises []Promise, rest []Job) { - for i, job := range jobs { - if len(pieces) >= maxBatchSize { - return pieces, promises, jobs[i:] - } - - pieces = append(pieces, job.Pieces...) - promises = append(promises, job.Resolve) - } - - return pieces, promises, nil -} - -func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target storj.NodeURL) (pb.DRPCPiecestoreClient, *rpc.Conn, error) { - conn, err := dialer.DialNodeURL(ctx, target) - if err != nil { - return nil, nil, err - } - return pb.NewDRPCPiecestoreClient(conn), conn, nil -} diff --git a/satellite/metainfo/piecedeletion/dialer_test.go b/satellite/metainfo/piecedeletion/dialer_test.go deleted file mode 100644 index 328f0b237..000000000 --- a/satellite/metainfo/piecedeletion/dialer_test.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion_test - -import ( - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" - - "storj.io/common/storj" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/private/testplanet" - "storj.io/storj/satellite/metainfo/piecedeletion" -) - -type CountedPromise struct { - SuccessCount int64 - FailureCount int64 -} - -func (p *CountedPromise) Success() { atomic.AddInt64(&p.SuccessCount, 1) } -func (p *CountedPromise) Failure() { atomic.AddInt64(&p.FailureCount, 1) } - -func TestDialer(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - log := zaptest.NewLogger(t) - - dialer := piecedeletion.NewDialer(log, planet.Satellites[0].Dialer, 5*time.Second, 5*time.Second, 100) - require.NotNil(t, dialer) - - storageNode := planet.StorageNodes[0].NodeURL() - - promise, jobs := makeJobsQueue(t, 2) - dialer.Handle(ctx, storageNode, jobs) - - require.Equal(t, int64(2), promise.SuccessCount) - require.Equal(t, int64(0), promise.FailureCount) - }) -} - -func TestDialer_DialTimeout(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, - Reconfigure: testplanet.Reconfigure{}, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - log := zaptest.NewLogger(t) - - const dialTimeout = 5 * time.Second - - rpcdial := planet.Satellites[0].Dialer - rpcdial.DialTimeout = dialTimeout - - dialer := piecedeletion.NewDialer(log, rpcdial, 5*time.Second, 1*time.Minute, 100) - require.NotNil(t, dialer) - - require.NoError(t, planet.StopPeer(planet.StorageNodes[0])) - - storageNode := planet.StorageNodes[0].NodeURL() - - { - promise, jobs := makeJobsQueue(t, 1) - // we should fail to dial in the time allocated - start := time.Now() - 
dialer.Handle(ctx, storageNode, jobs) - failingToDial := time.Since(start) - - require.Less(t, failingToDial.Seconds(), (2 * dialTimeout).Seconds()) - require.Equal(t, int64(0), promise.SuccessCount) - require.Equal(t, int64(1), promise.FailureCount) - } - - { - promise, jobs := makeJobsQueue(t, 1) - - // we should immediately return when we try to redial within 1 minute - start := time.Now() - dialer.Handle(ctx, storageNode, jobs) - failingToRedial := time.Since(start) - - require.Less(t, failingToRedial.Seconds(), time.Second.Seconds()) - require.Equal(t, int64(0), promise.SuccessCount) - require.Equal(t, int64(1), promise.FailureCount) - } - }) -} - -// we can use a random piece id, since deletion requests for already deleted pieces is expected. -func makeJobsQueue(t *testing.T, n int) (*CountedPromise, piecedeletion.Queue) { - promise := &CountedPromise{} - - jobs := piecedeletion.NewLimitedJobs(-1) - for i := 0; i < n; i++ { - require.True(t, jobs.TryPush(piecedeletion.Job{ - Pieces: []storj.PieceID{testrand.PieceID(), testrand.PieceID()}, - Resolve: promise, - })) - } - - return promise, jobs -} diff --git a/satellite/metainfo/piecedeletion/doc.go b/satellite/metainfo/piecedeletion/doc.go deleted file mode 100644 index eff72106f..000000000 --- a/satellite/metainfo/piecedeletion/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -// Package piecedeletion implements service for deleting pieces that combines concurrent requests. -package piecedeletion - -import ( - "github.com/spacemonkeygo/monkit/v3" - "github.com/zeebo/errs" -) - -var mon = monkit.Package() - -// Error is the default error class for piece deletion. -var Error = errs.Class("piece deletion") diff --git a/satellite/metainfo/piecedeletion/handler.go b/satellite/metainfo/piecedeletion/handler.go deleted file mode 100644 index dabe7edf3..000000000 --- a/satellite/metainfo/piecedeletion/handler.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion - -import ( - "context" - - "golang.org/x/sync/semaphore" - - "storj.io/common/storj" -) - -// LimitedHandler wraps handler with a concurrency limit. -type LimitedHandler struct { - active *semaphore.Weighted - Handler -} - -// NewLimitedHandler wraps handler with a concurrency limit. -func NewLimitedHandler(handler Handler, limit int) *LimitedHandler { - return &LimitedHandler{ - active: semaphore.NewWeighted(int64(limit)), - Handler: handler, - } -} - -// Handle handles the job queue. -func (handler *LimitedHandler) Handle(ctx context.Context, node storj.NodeURL, queue Queue) { - if err := handler.active.Acquire(ctx, 1); err != nil { - return - } - defer handler.active.Release(1) - - handler.Handler.Handle(ctx, node, queue) -} diff --git a/satellite/metainfo/piecedeletion/handler_test.go b/satellite/metainfo/piecedeletion/handler_test.go deleted file mode 100644 index 8cc765b1d..000000000 --- a/satellite/metainfo/piecedeletion/handler_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. 
- -package piecedeletion_test - -import ( - "context" - "sync/atomic" - "testing" - "time" - - "storj.io/common/storj" - "storj.io/common/sync2" - "storj.io/common/testcontext" - "storj.io/storj/satellite/metainfo/piecedeletion" -) - -type HandleLimitVerifier struct { - Active int64 - ExpectedLimit int64 -} - -func (*HandleLimitVerifier) NewQueue() piecedeletion.Queue { - panic("should not be called") -} - -func (verifier *HandleLimitVerifier) Handle(ctx context.Context, node storj.NodeURL, queue piecedeletion.Queue) { - current := atomic.AddInt64(&verifier.Active, 1) - if current > verifier.ExpectedLimit { - panic("over limit") - } - defer atomic.AddInt64(&verifier.Active, -1) - defer sync2.Sleep(ctx, time.Millisecond) -} - -func TestLimitedHandler(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - verifier := &HandleLimitVerifier{ - Active: 0, - ExpectedLimit: 8, - } - - limited := piecedeletion.NewLimitedHandler(verifier, int(verifier.ExpectedLimit)) - - for i := 0; i < 800; i++ { - ctx.Go(func() error { - limited.Handle(ctx, storj.NodeURL{}, nil) - return nil - }) - } -} diff --git a/satellite/metainfo/piecedeletion/queue.go b/satellite/metainfo/piecedeletion/queue.go deleted file mode 100644 index 87d225f27..000000000 --- a/satellite/metainfo/piecedeletion/queue.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion - -import "sync" - -// LimitedJobs is a finalizable list of deletion jobs with a limit to how many -// jobs it can handle. -type LimitedJobs struct { - maxPiecesPerBatch int - - mu sync.Mutex - // done indicates that no more items will be appended to the queue. - done bool - // count is the number of piece ids queued here. - count int - // list is the list of delete jobs. - list []Job -} - -// NewLimitedJobs returns a new limited job queue. -func NewLimitedJobs(maxPiecesPerBatch int) *LimitedJobs { - return &LimitedJobs{ - maxPiecesPerBatch: maxPiecesPerBatch, - } -} - -// TryPush tries to add a job to the queue. -// -// maxPiecesPerBatch < 0, means no limit. -func (jobs *LimitedJobs) TryPush(job Job) bool { - jobs.mu.Lock() - defer jobs.mu.Unlock() - - // check whether we have finished work with this jobs queue. - if jobs.done { - return false - } - - // add to the queue, this can potentially overflow `maxPiecesPerBatch`, - // however splitting a single request and promise across multiple batches, is annoying. - jobs.count += len(job.Pieces) - - // check whether the queue is at capacity - if jobs.maxPiecesPerBatch >= 0 && jobs.count >= jobs.maxPiecesPerBatch { - jobs.done = true - } - - jobs.list = append(jobs.list, job) - return true -} - -// PopAll returns all the jobs in this list. -func (jobs *LimitedJobs) PopAll() (_ []Job, ok bool) { - jobs.mu.Lock() - defer jobs.mu.Unlock() - - // when we try to pop and the queue is empty, make the queue final. - if len(jobs.list) == 0 { - jobs.done = true - return nil, false - } - - list := jobs.list - jobs.list = nil - return list, true -} - -// PopAllWithoutClose returns all the jobs in this list without closing the queue. 
-func (jobs *LimitedJobs) PopAllWithoutClose() []Job { - jobs.mu.Lock() - defer jobs.mu.Unlock() - - list := jobs.list - jobs.list = nil - return list -} diff --git a/satellite/metainfo/piecedeletion/queue_test.go b/satellite/metainfo/piecedeletion/queue_test.go deleted file mode 100644 index 2d526af8f..000000000 --- a/satellite/metainfo/piecedeletion/queue_test.go +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion_test - -import ( - "errors" - "sort" - "sync" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/satellite/metainfo/piecedeletion" -) - -func TestLimitedJobs(t *testing.T) { - { // pop on an empty list - q := piecedeletion.NewLimitedJobs(-1) - list, ok := q.PopAll() - require.False(t, ok) - require.Nil(t, list) - } - - { // pop on a non-empty list - q := piecedeletion.NewLimitedJobs(-1) - - job1 := randomJob(2) - job2 := randomJob(3) - - // first push should always work - require.True(t, q.TryPush(job1)) - // try push another, currently we don't have limits - require.True(t, q.TryPush(job2)) - - list, ok := q.PopAll() - require.True(t, ok) - require.Equal(t, []piecedeletion.Job{job1, job2}, list) - - // should be empty - list, ok = q.PopAll() - require.False(t, ok) - require.Nil(t, list) - - // pushing another should fail - require.False(t, q.TryPush(randomJob(1))) - } -} - -func TestLimitedJobs_Limiting(t *testing.T) { - { - q := piecedeletion.NewLimitedJobs(2) - require.True(t, q.TryPush(randomJob(1))) - require.True(t, q.TryPush(randomJob(1))) - require.False(t, q.TryPush(randomJob(1))) - require.False(t, q.TryPush(randomJob(1))) - } - - { - q := piecedeletion.NewLimitedJobs(2) - require.True(t, q.TryPush(randomJob(1))) - _, _ = q.PopAll() - require.True(t, q.TryPush(randomJob(1))) - _, _ = q.PopAll() - require.False(t, q.TryPush(randomJob(1))) - _, _ = q.PopAll() - require.False(t, q.TryPush(randomJob(1))) - } -} - -func TestLimitedJobs_NoClose(t *testing.T) { - { - q := piecedeletion.NewLimitedJobs(2) - job1, job2 := randomJob(1), randomJob(1) - - require.True(t, q.TryPush(job1)) - list := q.PopAllWithoutClose() - require.Equal(t, []piecedeletion.Job{job1}, list) - - list = q.PopAllWithoutClose() - require.Empty(t, list) - - require.True(t, q.TryPush(job2)) - list = q.PopAllWithoutClose() - require.Equal(t, []piecedeletion.Job{job2}, list) - } -} - -func TestLimitedJobs_Concurrent(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - const N = 4 - - q := piecedeletion.NewLimitedJobs(-1) - - jobs := []piecedeletion.Job{ - randomJob(1), - randomJob(2), - randomJob(3), - randomJob(4), - } - - var wg sync.WaitGroup - wg.Add(N) - for i := 0; i < N; i++ { - i := i - ctx.Go(func() error { - defer wg.Done() - if !q.TryPush(jobs[i]) { - return errors.New("failed to add job") - } - return nil - }) - } - - ctx.Go(func() error { - wg.Wait() - - list, ok := q.PopAll() - if !ok { - return errors.New("failed to return jobs") - } - - sort.Slice(list, func(i, k int) bool { - return len(list[i].Pieces) < len(list[k].Pieces) - }) - - if !assert.Equal(t, jobs, list) { - return errors.New("not equal") - } - - return nil - }) -} - -func TestLimitedJobs_NoRace(t *testing.T) { - ctx := testcontext.New(t) - defer ctx.Cleanup() - - const N = 4 - - q := piecedeletion.NewLimitedJobs(-1) - - jobs := []piecedeletion.Job{ - randomJob(1), - randomJob(2), - randomJob(3), - 
randomJob(4), - } - for i := 0; i < N; i++ { - i := i - ctx.Go(func() error { - _ = q.TryPush(jobs[i]) - return nil - }) - } - - ctx.Go(func() error { - _, _ = q.PopAll() - return nil - }) - ctx.Go(func() error { - _ = q.PopAllWithoutClose() - return nil - }) -} - -func randomJob(n int) piecedeletion.Job { - job := piecedeletion.Job{} - for i := 0; i < n; i++ { - job.Pieces = append(job.Pieces, testrand.PieceID()) - } - return job -} diff --git a/satellite/metainfo/piecedeletion/service.go b/satellite/metainfo/piecedeletion/service.go deleted file mode 100644 index 3cd1b0fc4..000000000 --- a/satellite/metainfo/piecedeletion/service.go +++ /dev/null @@ -1,254 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecedeletion - -import ( - "context" - "time" - - "github.com/zeebo/errs" - "go.uber.org/zap" - "golang.org/x/sync/semaphore" - - "storj.io/common/rpc" - "storj.io/common/storj" - "storj.io/common/sync2" - "storj.io/storj/satellite/overlay" -) - -// Config defines configuration options for Service. -type Config struct { - MaxConcurrency int `help:"maximum number of concurrent requests to storage nodes" default:"100"` - MaxConcurrentPieces int `help:"maximum number of concurrent pieces can be processed" default:"1000000" testDefault:"1000"` - - MaxPiecesPerBatch int `help:"maximum number of pieces per batch" default:"5000" testDefault:"4000"` - MaxPiecesPerRequest int `help:"maximum number pieces per single request" default:"1000" testDefault:"2000"` - - DialTimeout time.Duration `help:"timeout for dialing nodes (0 means satellite default)" default:"3s" testDefault:"2s"` - FailThreshold time.Duration `help:"threshold for retrying a failed node" releaseDefault:"10m" devDefault:"2s"` - RequestTimeout time.Duration `help:"timeout for a single delete request" releaseDefault:"15s" devDefault:"2s"` - DeleteSuccessThreshold float64 `help:"Which fraction of nodes should be contacted successfully until the delete of a batch of pieces is considered completed" default:".75"` -} - -const ( - minTimeout = 5 * time.Millisecond - maxTimeout = 5 * time.Minute -) - -// Verify verifies configuration sanity. -func (config *Config) Verify() errs.Group { - var errlist errs.Group - if config.MaxConcurrency <= 0 { - errlist.Add(Error.New("concurrency %d must be greater than 0", config.MaxConcurrency)) - } - if config.MaxConcurrentPieces <= 0 { - errlist.Add(Error.New("max concurrent pieces %d must be greater than 0", config.MaxConcurrentPieces)) - } - if config.MaxPiecesPerBatch < config.MaxPiecesPerRequest { - errlist.Add(Error.New("max pieces per batch %d should be larger than max pieces per request %d", config.MaxPiecesPerBatch, config.MaxPiecesPerRequest)) - } - if config.MaxPiecesPerBatch <= 0 { - errlist.Add(Error.New("max pieces per batch %d must be greater than 0", config.MaxPiecesPerBatch)) - } - if config.MaxPiecesPerRequest <= 0 { - errlist.Add(Error.New("max pieces per request %d must be greater than 0", config.MaxPiecesPerRequest)) - } - if config.DialTimeout != 0 && (config.DialTimeout <= minTimeout || maxTimeout <= config.DialTimeout) { - errlist.Add(Error.New("dial timeout %v must be between %v and %v", config.DialTimeout, minTimeout, maxTimeout)) - } - if config.RequestTimeout < minTimeout || maxTimeout < config.RequestTimeout { - errlist.Add(Error.New("request timeout %v should be between %v and %v", config.RequestTimeout, minTimeout, maxTimeout)) - } - return errlist -} - -// Nodes stores reliable nodes information. 
-type Nodes interface { - GetNodes(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]*overlay.SelectedNode, err error) -} - -// Service handles combining piece deletion requests. -// -// architecture: Service -type Service struct { - log *zap.Logger - config Config - - concurrentRequests *semaphore.Weighted - - rpcDialer rpc.Dialer - nodesDB Nodes - - running sync2.Fence - combiner *Combiner - dialer *Dialer - limited *LimitedHandler -} - -// NewService creates a new service. -func NewService(log *zap.Logger, dialer rpc.Dialer, nodesDB Nodes, config Config) (*Service, error) { - var errlist errs.Group - if log == nil { - errlist.Add(Error.New("log is nil")) - } - if dialer == (rpc.Dialer{}) { - errlist.Add(Error.New("dialer is zero")) - } - if nodesDB == nil { - errlist.Add(Error.New("nodesDB is nil")) - } - if errs := config.Verify(); len(errs) > 0 { - errlist.Add(errs...) - } - if err := errlist.Err(); err != nil { - return nil, Error.Wrap(err) - } - - dialerClone := dialer - if config.DialTimeout > 0 { - dialerClone.DialTimeout = config.DialTimeout - } - - if dialerClone.Pool == nil { - dialerClone.Pool = rpc.NewDefaultConnectionPool() - } - - return &Service{ - log: log, - config: config, - concurrentRequests: semaphore.NewWeighted(int64(config.MaxConcurrentPieces)), - rpcDialer: dialerClone, - nodesDB: nodesDB, - }, nil -} - -// newQueue creates the configured queue. -func (service *Service) newQueue() Queue { - return NewLimitedJobs(service.config.MaxPiecesPerBatch) -} - -// Run initializes the service. -func (service *Service) Run(ctx context.Context) error { - defer service.running.Release() - - config := service.config - service.dialer = NewDialer(service.log.Named("dialer"), service.rpcDialer, config.RequestTimeout, config.FailThreshold, config.MaxPiecesPerRequest) - service.limited = NewLimitedHandler(service.dialer, config.MaxConcurrency) - service.combiner = NewCombiner(ctx, service.limited, service.newQueue) - - return nil -} - -// Close shuts down the service. -func (service *Service) Close() error { - if service.combiner != nil { - service.combiner.Close() - } - return nil -} - -// DeleteWithCustomThreshold deletes the pieces specified in the requests, -// returning when they have been deleted from the specified fraction of storage nodes. -func (service *Service) DeleteWithCustomThreshold(ctx context.Context, requests []Request, successThreshold float64) (err error) { - defer mon.Task()(&ctx, len(requests), requestsPieceCount(requests), successThreshold)(&err) - - if len(requests) == 0 { - return nil - } - - // wait for combiner and dialer to set themselves up. - if !service.running.Wait(ctx) { - return Error.Wrap(ctx.Err()) - } - - for i, req := range requests { - if !req.IsValid() { - return Error.New("request #%d is invalid", i) - } - } - - // When number of pieces are more than the maximum limit, we let it overflow, - // so we don't have to split requests in to separate batches. - totalPieceCount := requestsPieceCount(requests) - if totalPieceCount > service.config.MaxConcurrentPieces { - totalPieceCount = service.config.MaxConcurrentPieces - } - - if err := service.concurrentRequests.Acquire(ctx, int64(totalPieceCount)); err != nil { - return Error.Wrap(err) - } - defer service.concurrentRequests.Release(int64(totalPieceCount)) - - // Create a map for matching node information with the corresponding - // request. 
- nodesReqs := make(map[storj.NodeID]Request, len(requests)) - nodeIDs := []storj.NodeID{} - for _, req := range requests { - if req.Node.Address == "" { - nodeIDs = append(nodeIDs, req.Node.ID) - } - nodesReqs[req.Node.ID] = req - } - - if len(nodeIDs) > 0 { - nodes, err := service.nodesDB.GetNodes(ctx, nodeIDs) - if err != nil { - // Pieces will be collected by garbage collector - return Error.Wrap(err) - } - - for _, node := range nodes { - req := nodesReqs[node.ID] - - nodesReqs[node.ID] = Request{ - Node: storj.NodeURL{ - ID: node.ID, - Address: node.Address.Address, - }, - Pieces: req.Pieces, - } - } - } - - threshold, err := sync2.NewSuccessThreshold(len(nodesReqs), successThreshold) - if err != nil { - return Error.Wrap(err) - } - - for _, req := range nodesReqs { - service.combiner.Enqueue(req.Node, Job{ - Pieces: req.Pieces, - Resolve: threshold, - }) - } - - threshold.Wait(ctx) - - return nil -} - -// Delete deletes the pieces specified in the requests, -// returning when they have been deleted from the default fraction of storage nodes. -func (service *Service) Delete(ctx context.Context, requests []Request) (err error) { - return service.DeleteWithCustomThreshold(ctx, requests, service.config.DeleteSuccessThreshold) -} - -// Request defines a deletion requests for a node. -type Request struct { - Node storj.NodeURL - Pieces []storj.PieceID -} - -// IsValid returns whether the request is valid. -func (req *Request) IsValid() bool { - return !req.Node.ID.IsZero() && len(req.Pieces) > 0 -} - -func requestsPieceCount(requests []Request) int { - total := 0 - for _, r := range requests { - total += len(r.Pieces) - } - return total -} diff --git a/satellite/metainfo/piecedeletion/service_test.go b/satellite/metainfo/piecedeletion/service_test.go deleted file mode 100644 index e98b9e5f4..000000000 --- a/satellite/metainfo/piecedeletion/service_test.go +++ /dev/null @@ -1,417 +0,0 @@ -// Copyright (C) 2020 Storj Labs, Inc. -// See LICENSE for copying information. 
- -package piecedeletion_test - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" - - "storj.io/common/memory" - "storj.io/common/rpc" - "storj.io/common/storj" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/private/testplanet" - "storj.io/storj/satellite" - "storj.io/storj/satellite/metainfo/piecedeletion" - "storj.io/storj/satellite/overlay" - "storj.io/storj/storagenode" - "storj.io/storj/storagenode/blobstore/testblobs" - "storj.io/storj/storagenode/pieces" -) - -func TestService_New_Error(t *testing.T) { - log := zaptest.NewLogger(t) - dialer := rpc.NewDefaultDialer(nil) - - _, err := piecedeletion.NewService(nil, dialer, &nodesDB{}, piecedeletion.Config{ - MaxConcurrency: 8, - MaxConcurrentPieces: 10, - MaxPiecesPerBatch: 0, - MaxPiecesPerRequest: 0, - DialTimeout: time.Second, - FailThreshold: 5 * time.Minute, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "log is nil") - - _, err = piecedeletion.NewService(log, rpc.Dialer{}, &nodesDB{}, piecedeletion.Config{ - MaxConcurrency: 87, - MaxConcurrentPieces: 10, - DialTimeout: time.Second, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "dialer is zero") - - _, err = piecedeletion.NewService(log, dialer, nil, piecedeletion.Config{ - MaxConcurrency: 8, - MaxConcurrentPieces: 10, - MaxPiecesPerBatch: 0, - MaxPiecesPerRequest: 0, - DialTimeout: time.Second, - FailThreshold: 5 * time.Minute, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "nodesDB is nil") - - _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ - MaxConcurrency: 0, - MaxConcurrentPieces: 10, - DialTimeout: time.Second, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "greater than 0") - - _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ - MaxConcurrency: -3, - MaxConcurrentPieces: 10, - DialTimeout: time.Second, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "greater than 0") - - _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ - MaxConcurrency: 3, - MaxConcurrentPieces: -10, - DialTimeout: time.Second, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "greater than 0") - - _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ - MaxConcurrency: 3, - MaxConcurrentPieces: 10, - DialTimeout: time.Nanosecond, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "dial timeout 1ns must be between 5ms and 5m0s") - - _, err = piecedeletion.NewService(log, dialer, &nodesDB{}, piecedeletion.Config{ - MaxConcurrency: 3, - MaxConcurrentPieces: 10, - DialTimeout: time.Hour, - }) - require.True(t, piecedeletion.Error.Has(err), err) - require.Contains(t, err.Error(), "dial timeout 1h0m0s must be between 5ms and 5m0s") -} - -func TestService_DeletePieces_AllNodesUp(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - // Use RSConfig for ensuring that we don't have long-tail cancellations - // and the upload doesn't leave garbage in the SNs - Reconfigure: testplanet.Reconfigure{ - Satellite: testplanet.Combine( - testplanet.ReconfigureRS(2, 2, 4, 4), - 
testplanet.MaxSegmentSize(15*memory.KiB), - ), - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - uplnk := planet.Uplinks[0] - satelliteSys := planet.Satellites[0] - - percentExp := 0.75 - - { - data := testrand.Bytes(10 * memory.KiB) - err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data) - require.NoError(t, err) - } - - // ensure that no requests return an error - err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, nil, percentExp) - require.NoError(t, err) - - var ( - totalUsedSpace int64 - requests []piecedeletion.Request - ) - for _, sn := range planet.StorageNodes { - // calculate the SNs total used space after data upload - piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) - require.NoError(t, err) - totalUsedSpace += piecesTotal - - // Get all the pieces of the storage node - nodePieces := piecedeletion.Request{Node: sn.NodeURL()} - err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), - func(store pieces.StoredPieceAccess) error { - nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID()) - return nil - }, - ) - require.NoError(t, err) - - requests = append(requests, nodePieces) - } - - err = satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, percentExp) - require.NoError(t, err) - - planet.WaitForStorageNodeDeleters(ctx) - - // calculate the SNs used space after delete the pieces - var totalUsedSpaceAfterDelete int64 - for _, sn := range planet.StorageNodes { - piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) - require.NoError(t, err) - totalUsedSpaceAfterDelete += piecesTotal - } - - // At this point we can only guarantee that the 75% of the SNs pieces - // are delete due to the success threshold - deletedUsedSpace := float64(totalUsedSpace-totalUsedSpaceAfterDelete) / float64(totalUsedSpace) - if deletedUsedSpace < percentExp { - t.Fatalf("deleted used space is less than %e%%. 
Got %f", percentExp, deletedUsedSpace) - } - }) -} - -func TestService_DeletePieces_SomeNodesDown(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - // Use RSConfig for ensuring that we don't have long-tail cancellations - // and the upload doesn't leave garbage in the SNs - Reconfigure: testplanet.Reconfigure{ - Satellite: testplanet.Combine( - testplanet.ReconfigureRS(2, 2, 4, 4), - testplanet.MaxSegmentSize(15*memory.KiB), - ), - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - uplnk := planet.Uplinks[0] - satelliteSys := planet.Satellites[0] - numToShutdown := 2 - - { - data := testrand.Bytes(10 * memory.KiB) - err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data) - require.NoError(t, err) - } - - var requests []piecedeletion.Request - - for i, sn := range planet.StorageNodes { - // Get all the pieces of the storage node - nodePieces := piecedeletion.Request{Node: sn.NodeURL()} - err := sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), - func(store pieces.StoredPieceAccess) error { - nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID()) - return nil - }, - ) - require.NoError(t, err) - - requests = append(requests, nodePieces) - - // stop the first numToShutdown SNs before deleting pieces - if i < numToShutdown { - require.NoError(t, planet.StopPeer(sn)) - } - } - - err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, 0.9999) - require.NoError(t, err) - - planet.WaitForStorageNodeDeleters(ctx) - - // Check that storage nodes which are online when deleting pieces don't - // hold any piece - var totalUsedSpace int64 - for i := numToShutdown; i < len(planet.StorageNodes); i++ { - piecesTotal, _, err := planet.StorageNodes[i].Storage2.Store.SpaceUsedForPieces(ctx) - require.NoError(t, err) - totalUsedSpace += piecesTotal - } - - require.Zero(t, totalUsedSpace, "totalUsedSpace online nodes") - }) -} - -func TestService_DeletePieces_AllNodesDown(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - // Use RSConfig for ensuring that we don't have long-tail cancellations - // and the upload doesn't leave garbage in the SNs - Reconfigure: testplanet.Reconfigure{ - Satellite: testplanet.Combine( - testplanet.ReconfigureRS(2, 2, 4, 4), - testplanet.MaxSegmentSize(15*memory.KiB), - ), - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - uplnk := planet.Uplinks[0] - satelliteSys := planet.Satellites[0] - - { - data := testrand.Bytes(10 * memory.KiB) - err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data) - require.NoError(t, err) - } - - var ( - expectedTotalUsedSpace int64 - requests []piecedeletion.Request - ) - for _, sn := range planet.StorageNodes { - // calculate the SNs total used space after data upload - piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) - require.NoError(t, err) - expectedTotalUsedSpace += piecesTotal - - // Get all the pieces of the storage node - nodePieces := piecedeletion.Request{Node: sn.NodeURL()} - err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), - func(store pieces.StoredPieceAccess) error { - nodePieces.Pieces = append(nodePieces.Pieces, store.PieceID()) - return nil - }, - ) - require.NoError(t, err) - - requests = append(requests, nodePieces) - require.NoError(t, planet.StopPeer(sn)) - } - - err := 
satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, 0.9999) - require.NoError(t, err) - - planet.WaitForStorageNodeDeleters(ctx) - - var totalUsedSpace int64 - for _, sn := range planet.StorageNodes { - // calculate the SNs total used space after data upload - piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) - require.NoError(t, err) - totalUsedSpace += piecesTotal - } - - require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace") - }) -} - -func TestService_DeletePieces_DisproportionateNumberOfRequestsAndNodes(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - satelliteSys := planet.Satellites[0] - - // make sure that the ratio of number of requests and number of nodes to - // be greater than the success threshold - percentExp := 0.75 - numberOfRequests := 20 - require.Less(t, float64(len(planet.StorageNodes))/float64(numberOfRequests), percentExp) - - // populate requests - requests := make([]piecedeletion.Request, numberOfRequests) - for i := range requests { - requests[i] = piecedeletion.Request{ - Node: planet.StorageNodes[i%2].NodeURL(), - Pieces: []storj.PieceID{testrand.PieceID()}, - } - } - - err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, percentExp) - require.NoError(t, err) - }) -} - -func TestService_DeletePieces_Invalid(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - service := planet.Satellites[0].API.Metainfo.PieceDeletion - - nodesPieces := []piecedeletion.Request{ - {Pieces: make([]storj.PieceID, 1)}, - {Pieces: make([]storj.PieceID, 1)}, - } - err := service.DeleteWithCustomThreshold(ctx, nodesPieces, 1) - require.Error(t, err) - assert.Contains(t, err.Error(), "request #0 is invalid") - }) -} - -func TestService_DeletePieces_Timeout(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - StorageNodeDB: func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error) { - return testblobs.NewSlowDB(log.Named("slowdb"), db), nil - }, - Satellite: testplanet.Combine( - func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.PieceDeletion.RequestTimeout = 200 * time.Millisecond - config.Metainfo.MaxSegmentSize = 15 * memory.KiB - }, - testplanet.ReconfigureRS(2, 2, 4, 4), - ), - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - uplnk := planet.Uplinks[0] - satelliteSys := planet.Satellites[0] - - { - data := testrand.Bytes(10 * memory.KiB) - err := uplnk.Upload(ctx, satelliteSys, "a-bucket", "object-filename", data) - require.NoError(t, err) - } - - var ( - expectedTotalUsedSpace int64 - requests []piecedeletion.Request - ) - for _, sn := range planet.StorageNodes { - // calculate the SNs total used space after data upload - piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx) - require.NoError(t, err) - expectedTotalUsedSpace += piecesTotal - - // Get all the pieces of the storage node - nodePieces := piecedeletion.Request{Node: sn.NodeURL()} - err = sn.Storage2.Store.WalkSatellitePieces(ctx, satelliteSys.ID(), - func(store pieces.StoredPieceAccess) error { - nodePieces.Pieces = append(nodePieces.Pieces, 
store.PieceID())
-					return nil
-				},
-			)
-			require.NoError(t, err)
-
-			requests = append(requests, nodePieces)
-
-			// make delete operation on storage nodes slow
-			storageNodeDB := sn.DB.(*testblobs.SlowDB)
-			delay := 500 * time.Millisecond
-			storageNodeDB.SetLatency(delay)
-		}
-
-		err := satelliteSys.API.Metainfo.PieceDeletion.DeleteWithCustomThreshold(ctx, requests, 0.75)
-		require.NoError(t, err)
-		// A timeout error won't be propagated up to the service level
-		// but we'll know that the deletes didn't happen based on usedSpace
-		// check below.
-
-		var totalUsedSpace int64
-		for _, sn := range planet.StorageNodes {
-			// calculate the SNs total used space after data upload
-			piecesTotal, _, err := sn.Storage2.Store.SpaceUsedForPieces(ctx)
-			require.NoError(t, err)
-			totalUsedSpace += piecesTotal
-		}
-
-		require.Equal(t, expectedTotalUsedSpace, totalUsedSpace, "totalUsedSpace")
-	})
-}
-
-type nodesDB struct{}
-
-func (n *nodesDB) GetNodes(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]*overlay.SelectedNode, err error) {
-	return nil, nil
-}
diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock
index 5fccb3eb6..03b0e9d70 100755
--- a/scripts/testdata/satellite-config.yaml.lock
+++ b/scripts/testdata/satellite-config.yaml.lock
@@ -652,30 +652,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key
 # toggle flag if overlay is enabled
 # metainfo.overlay: true
 
-# Which fraction of nodes should be contacted successfully until the delete of a batch of pieces is considered completed
-# metainfo.piece-deletion.delete-success-threshold: 0.75
-
-# timeout for dialing nodes (0 means satellite default)
-# metainfo.piece-deletion.dial-timeout: 3s
-
-# threshold for retrying a failed node
-# metainfo.piece-deletion.fail-threshold: 10m0s
-
-# maximum number of concurrent requests to storage nodes
-# metainfo.piece-deletion.max-concurrency: 100
-
-# maximum number of concurrent pieces can be processed
-# metainfo.piece-deletion.max-concurrent-pieces: 1000000
-
-# maximum number of pieces per batch
-# metainfo.piece-deletion.max-pieces-per-batch: 5000
-
-# maximum number pieces per single request
-# metainfo.piece-deletion.max-pieces-per-request: 1000
-
-# timeout for a single delete request
-# metainfo.piece-deletion.request-timeout: 15s
-
 # max bucket count for a project.
 # metainfo.project-limits.max-buckets: 100