diff --git a/go.mod b/go.mod index 22dc0d31f..7721b48b0 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e gopkg.in/segmentio/analytics-go.v3 v3.1.0 gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 - storj.io/common v0.0.0-20221215155610-3715c7f7ce66 + storj.io/common v0.0.0-20221223153333-f5b4455d9cbe storj.io/drpc v0.0.32 storj.io/monkit-jaeger v0.0.0-20220915074555-d100d7589f41 storj.io/private v0.0.0-20221108123115-3a27297f0b78 diff --git a/go.sum b/go.sum index 1004ab6fa..b4fb8f01b 100644 --- a/go.sum +++ b/go.sum @@ -951,8 +951,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8 sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck= sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0= storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ= -storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg= -storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= +storj.io/common v0.0.0-20221223153333-f5b4455d9cbe h1:8WUqW2mefWGiTp6LguJJmC+ZZP0JZxWv6T4clr3E54o= +storj.io/common v0.0.0-20221223153333-f5b4455d9cbe/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI= storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg= storj.io/monkit-jaeger v0.0.0-20220915074555-d100d7589f41 h1:SVuEocEhZfFc13J1AmlVLitdGXTVrvmbzN4Z9C9Ms40= diff --git a/storagenode/pieces/trashchore.go b/storagenode/pieces/trashchore.go index 506b5ed46..86b71f02e 100644 --- a/storagenode/pieces/trashchore.go +++ b/storagenode/pieces/trashchore.go @@ -9,7 +9,6 @@ import ( "time" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "storj.io/common/storj" "storj.io/common/sync2" @@ -25,11 +24,19 @@ type TrashChore struct { Cycle *sync2.Cycle - workers workersService - mu sync.Mutex - restoring map[storj.NodeID]bool + started sync2.Fence + root context.Context + + mu sync.Mutex + done bool + satellites map[storj.NodeID]*sync2.Workplace } +const ( + jobEmptyTrash = 1 + jobRestoreTrash = 2 +) + // NewTrashChore instantiates a new TrashChore. choreInterval is how often this // chore runs, and trashExpiryInterval is passed into the EmptyTrash method to // determine which trashed pieces should be deleted. @@ -40,8 +47,8 @@ func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Dura store: store, trust: trust, - Cycle: sync2.NewCycle(choreInterval), - restoring: map[storj.NodeID]bool{}, + Cycle: sync2.NewCycle(choreInterval), + satellites: map[storj.NodeID]*sync2.Workplace{}, } } @@ -49,47 +56,78 @@ func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Dura func (chore *TrashChore) Run(ctx context.Context) (err error) { defer mon.Task()(&ctx)(&err) - var group errgroup.Group - chore.Cycle.Start(ctx, &group, func(ctx context.Context) error { + chore.root = ctx + chore.started.Release() + + err = chore.Cycle.Run(ctx, func(ctx context.Context) error { chore.log.Debug("starting to empty trash") - for _, satelliteID := range chore.trust.GetSatellites(ctx) { - // ignore satellites that are being restored - chore.mu.Lock() - isRestoring := chore.restoring[satelliteID] - chore.mu.Unlock() - if isRestoring { - continue - } + var wg sync.WaitGroup + limiter := make(chan struct{}, 1) + for _, satellite := range chore.trust.GetSatellites(ctx) { + satellite := satellite + place := chore.ensurePlace(satellite) + wg.Add(1) + ok := place.Start(chore.root, jobEmptyTrash, nil, func(ctx context.Context) { + defer wg.Done() + // don't allow multiple trash jobs at the same time + select { + case <-ctx.Done(): + return + case limiter <- struct{}{}: + } + defer func() { <-limiter }() - trashedBefore := time.Now().Add(-chore.trashExpiryInterval) - err := chore.store.EmptyTrash(ctx, satelliteID, trashedBefore) - if err != nil { - chore.log.Error("emptying trash failed", zap.Error(err)) + chore.log.Info("restore trash started", zap.Stringer("Satellite ID", satellite)) + trashedBefore := time.Now().Add(-chore.trashExpiryInterval) + err := chore.store.EmptyTrash(ctx, satellite, trashedBefore) + if err != nil { + chore.log.Error("emptying trash failed", zap.Error(err)) + } + }) + if !ok { + wg.Done() } } - + wg.Wait() return nil }) - group.Go(func() error { - chore.workers.Run(ctx) - return nil - }) - return group.Wait() -} -// StartRestore starts restoring trash for the specified satellite. -func (chore *TrashChore) StartRestore(ctx context.Context, satellite storj.NodeID) { chore.mu.Lock() - isRestoring := chore.restoring[satellite] - if isRestoring { - chore.mu.Unlock() - return - } - chore.restoring[satellite] = true + chore.done = true chore.mu.Unlock() - ok := chore.workers.Go(ctx, func(ctx context.Context) { + for _, place := range chore.satellites { + place.Cancel() + } + for _, place := range chore.satellites { + <-place.Done() + } + + return err +} + +// Close closes the chore. +func (chore *TrashChore) Close() error { + chore.Cycle.Close() + return nil +} + +// StartRestore starts a satellite restore, if it hasn't already started and +// the chore is not shutting down. +func (chore *TrashChore) StartRestore(ctx context.Context, satellite storj.NodeID) error { + if !chore.started.Wait(ctx) { + return ctx.Err() + } + + place := chore.ensurePlace(satellite) + if place == nil { + return context.Canceled + } + + place.Start(chore.root, jobRestoreTrash, func(jobID interface{}) bool { + return jobID == jobEmptyTrash + }, func(ctx context.Context) { chore.log.Info("restore trash started", zap.Stringer("Satellite ID", satellite)) err := chore.store.RestoreTrash(ctx, satellite) if err != nil { @@ -97,70 +135,23 @@ func (chore *TrashChore) StartRestore(ctx context.Context, satellite storj.NodeI } else { chore.log.Info("restore trash finished", zap.Stringer("Satellite ID", satellite)) } - - chore.mu.Lock() - delete(chore.restoring, satellite) - chore.mu.Unlock() }) - if !ok { - chore.log.Info("failed to start restore trash", zap.Stringer("Satellite ID", satellite)) - } -} -// Close the chore. -func (chore *TrashChore) Close() error { - chore.Cycle.Close() return nil } -// workersService allows to start workers with a different context. -type workersService struct { - started sync2.Fence - root context.Context - active sync.WaitGroup - - mu sync.Mutex - closed bool -} - -// Run starts waiting for worker requests with the specified context. -func (workers *workersService) Run(ctx context.Context) { - // setup root context that the workers are bound to - workers.root = ctx - workers.started.Release() - - // wait until it's time to shut down: - <-workers.root.Done() - - // ensure we don't allow starting workers after it's time to shut down - workers.mu.Lock() - workers.closed = true - workers.mu.Unlock() - - // wait for any remaining workers - workers.active.Wait() -} - -// Go tries to start a worker. -func (workers *workersService) Go(ctx context.Context, work func(context.Context)) bool { - // Wait until we can use workers.root. - if !workers.started.Wait(ctx) { - return false +// ensurePlace creates a work place for the specified satellite. +func (chore *TrashChore) ensurePlace(satellite storj.NodeID) *sync2.Workplace { + chore.mu.Lock() + defer chore.mu.Unlock() + if chore.done { + return nil } - // check that we are still allowed to start new workers - workers.mu.Lock() - if workers.closed { - workers.mu.Unlock() - return false + place, ok := chore.satellites[satellite] + if !ok { + place = sync2.NewWorkPlace() + chore.satellites[satellite] = place } - workers.active.Add(1) - workers.mu.Unlock() - - go func() { - defer workers.active.Done() - work(workers.root) - }() - - return true + return place } diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 66cb6475e..ba3e4abbc 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -819,7 +819,10 @@ func (endpoint *Endpoint) RestoreTrash(ctx context.Context, restoreTrashReq *pb. return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "RestoreTrash called with untrusted ID") } - endpoint.trashChore.StartRestore(ctx, peer.ID) + err = endpoint.trashChore.StartRestore(ctx, peer.ID) + if err != nil { + return nil, rpcstatus.Error(rpcstatus.Internal, "failed to start restore") + } return &pb.RestoreTrashResponse{}, nil } diff --git a/testsuite/storjscan/go.mod b/testsuite/storjscan/go.mod index 592a0cb82..95dafe77b 100644 --- a/testsuite/storjscan/go.mod +++ b/testsuite/storjscan/go.mod @@ -10,7 +10,7 @@ require ( github.com/zeebo/errs v1.3.0 go.uber.org/zap v1.21.0 golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde - storj.io/common v0.0.0-20221215155610-3715c7f7ce66 + storj.io/common v0.0.0-20221223153333-f5b4455d9cbe storj.io/private v0.0.0-20221108123115-3a27297f0b78 storj.io/storj v1.63.1 storj.io/storjscan v0.0.0-20220926140643-1623c3b391b0 diff --git a/testsuite/storjscan/go.sum b/testsuite/storjscan/go.sum index c040b2f3e..11fa368dd 100644 --- a/testsuite/storjscan/go.sum +++ b/testsuite/storjscan/go.sum @@ -1263,8 +1263,8 @@ storj.io/common v0.0.0-20220802175255-aae0c09ec9d4/go.mod h1:+gF7jbVvpjVIVHhK+EJ storj.io/common v0.0.0-20220829171748-14b0a3c9565e/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= storj.io/common v0.0.0-20220915180246-7826900e2b06/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= storj.io/common v0.0.0-20221123115229-fed3e6651b63/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= -storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg= -storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= +storj.io/common v0.0.0-20221223153333-f5b4455d9cbe h1:8WUqW2mefWGiTp6LguJJmC+ZZP0JZxWv6T4clr3E54o= +storj.io/common v0.0.0-20221223153333-f5b4455d9cbe/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI= storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg= storj.io/monkit-jaeger v0.0.0-20220726162929-c3a9898b5bca/go.mod h1:iK+dmHZZXQlW7ahKdNSOo+raMk5BDL2wbD62FIeXLWs= diff --git a/testsuite/ui/go.mod b/testsuite/ui/go.mod index 3296e40ad..281f4296f 100644 --- a/testsuite/ui/go.mod +++ b/testsuite/ui/go.mod @@ -10,7 +10,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.17.0 - storj.io/common v0.0.0-20221215155610-3715c7f7ce66 + storj.io/common v0.0.0-20221223153333-f5b4455d9cbe storj.io/gateway-mt v1.18.1-0.20211210081136-cada9a567d31 storj.io/private v0.0.0-20221108123115-3a27297f0b78 storj.io/storj v0.12.1-0.20221125175451-ef4b564b82f7 diff --git a/testsuite/ui/go.sum b/testsuite/ui/go.sum index 2e92e0202..ef06c7e1f 100644 --- a/testsuite/ui/go.sum +++ b/testsuite/ui/go.sum @@ -1502,8 +1502,8 @@ storj.io/common v0.0.0-20211102144601-401a79f0706a/go.mod h1:a2Kw7Uipu929OFANfWK storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ= storj.io/common v0.0.0-20220915180246-7826900e2b06/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= storj.io/common v0.0.0-20221123115229-fed3e6651b63/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= -storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg= -storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= +storj.io/common v0.0.0-20221223153333-f5b4455d9cbe h1:8WUqW2mefWGiTp6LguJJmC+ZZP0JZxWv6T4clr3E54o= +storj.io/common v0.0.0-20221223153333-f5b4455d9cbe/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys= storj.io/dotworld v0.0.0-20210324183515-0d11aeccd840/go.mod h1:KU9YvEgRrMMiWLvH8pzn1UkoCoxggKIPvQxmNdx7aXQ= storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw= storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=