storj/satellite/metainfo/piecedeletion/handler.go
Egon Elbre 3d6518081a satellite/metainfo/piecedeletion: add Combiner
To handle concurrent deletion requests we need to combine them into a
single request.

To implement this we introduce a few concurrency ideas:

* Combiner, which takes a node id and a Job and combines
  multiple requests into a single batch.

* Job, which represents the deletion of multiple piece ids with a
  notification mechanism to the caller.

* Queue, which provides communication from Combiner to Handler.
  It can limit the number of requests per work queue.

* Handler, which takes an active Queue and processes it until it has
  consumed all the jobs.
  It can provide limits to handling concurrency.

Change-Id: I3299325534abad4bae66969ffa16c6ed95d5574f
2020-03-16 17:13:26 +00:00


// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package piecedeletion

import (
	"context"

	"golang.org/x/sync/semaphore"

	"storj.io/common/pb"
)

// LimitedHandler wraps handler with a concurrency limit.
type LimitedHandler struct {
	// active bounds the number of Handle calls running at the same time.
	active *semaphore.Weighted
	Handler
}

// NewLimitedHandler wraps handler with a concurrency limit.
func NewLimitedHandler(handler Handler, limit int) *LimitedHandler {
	return &LimitedHandler{
		active:  semaphore.NewWeighted(int64(limit)),
		Handler: handler,
	}
}

// Handle handles the job queue.
// It blocks until a slot is available and returns without handling the
// queue if ctx is done before a slot could be acquired.
func (handler *LimitedHandler) Handle(ctx context.Context, node *pb.Node, queue Queue) {
	if err := handler.active.Acquire(ctx, 1); err != nil {
		return
	}
	defer handler.active.Release(1)

	handler.Handler.Handle(ctx, node, queue)
}
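
The wrapping pattern used by LimitedHandler can be tried in isolation with the sketch below. It is an illustration under stated assumptions, not the package's code: the Handler interface is a simplified, hypothetical stand-in (a node name and a slice of jobs instead of *pb.Node and Queue), a buffered channel replaces semaphore.Weighted so that only the standard library is needed, and recorder and peakConcurrency are helpers invented here to observe how many calls run at once.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Handler is a hypothetical, simplified stand-in for the package's Handler.
type Handler interface {
	Handle(ctx context.Context, node string, jobs []string)
}

// limited wraps a Handler and allows at most cap(slots) concurrent Handle calls.
// The buffered channel plays the role of semaphore.Weighted in the original.
type limited struct {
	slots chan struct{}
	Handler
}

func newLimited(h Handler, limit int) *limited {
	return &limited{slots: make(chan struct{}, limit), Handler: h}
}

// Handle waits for a free slot, or gives up if ctx is done first.
func (l *limited) Handle(ctx context.Context, node string, jobs []string) {
	select {
	case l.slots <- struct{}{}:
	case <-ctx.Done():
		return
	}
	defer func() { <-l.slots }()

	l.Handler.Handle(ctx, node, jobs)
}

// recorder is a test handler that tracks the peak number of concurrent calls.
type recorder struct {
	active, peak, calls int64
}

func (r *recorder) Handle(ctx context.Context, node string, jobs []string) {
	n := atomic.AddInt64(&r.active, 1)
	for {
		p := atomic.LoadInt64(&r.peak)
		if n <= p || atomic.CompareAndSwapInt64(&r.peak, p, n) {
			break
		}
	}
	time.Sleep(5 * time.Millisecond) // simulate work
	atomic.AddInt64(&r.calls, 1)
	atomic.AddInt64(&r.active, -1)
}

// peakConcurrency issues `calls` concurrent Handle calls through a limiter
// and reports the observed peak concurrency and the number of completed calls.
func peakConcurrency(limit, calls int) (peak, handled int64) {
	rec := &recorder{}
	h := newLimited(rec, limit)

	var wg sync.WaitGroup
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			h.Handle(context.Background(), fmt.Sprintf("node-%d", i), nil)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&rec.peak), atomic.LoadInt64(&rec.calls)
}

func main() {
	peak, handled := peakConcurrency(2, 8)
	fmt.Println("peak:", peak, "handled:", handled)
}
```

With a limit of 2, the observed peak should never exceed 2 while all 8 calls still complete. As in the original, a call whose context is cancelled while waiting for a slot returns without invoking the wrapped handler, so callers that need to know about skipped work would have to check the context themselves.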