diff --git a/storagenode/peer.go b/storagenode/peer.go
index fbcf97d09..26a3d2617 100644
--- a/storagenode/peer.go
+++ b/storagenode/peer.go
@@ -542,6 +542,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
 			peer.Storage2.RetainService,
 			peer.Contact.PingStats,
 			peer.Storage2.Store,
+			peer.Storage2.TrashChore,
 			peer.Storage2.PieceDeleter,
 			peer.OrdersStore,
 			peer.DB.Bandwidth(),
diff --git a/storagenode/pieces/store_test.go b/storagenode/pieces/store_test.go
index ec61e2bed..dd2b2b9ea 100644
--- a/storagenode/pieces/store_test.go
+++ b/storagenode/pieces/store_test.go
@@ -418,12 +418,14 @@ func TestTrashAndRestore(t *testing.T) {
 
 		// Empty trash by running the chore once
 		trashDur := 4 * 24 * time.Hour
+		chorectx, chorecancel := context.WithCancel(ctx)
 		chore := pieces.NewTrashChore(zaptest.NewLogger(t), 24*time.Hour, trashDur, trust, store)
 		ctx.Go(func() error {
-			return chore.Run(ctx)
+			return chore.Run(chorectx)
 		})
-		chore.TriggerWait(ctx)
+		chore.Cycle.TriggerWait()
 		require.NoError(t, chore.Close())
+		chorecancel()
 
 		// Restore all pieces in the first satellite
 		require.NoError(t, store.RestoreTrash(ctx, satellites[0].satelliteID))
diff --git a/storagenode/pieces/trashchore.go b/storagenode/pieces/trashchore.go
index e2f46e17e..506b5ed46 100644
--- a/storagenode/pieces/trashchore.go
+++ b/storagenode/pieces/trashchore.go
@@ -5,11 +5,13 @@ package pieces
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 
+	"storj.io/common/storj"
 	"storj.io/common/sync2"
 	"storj.io/storj/storagenode/trust"
 )
@@ -17,12 +19,15 @@ import (
 // TrashChore is the chore that periodically empties the trash.
 type TrashChore struct {
 	log                 *zap.Logger
-	interval            time.Duration
 	trashExpiryInterval time.Duration
 	store               *Store
 	trust               *trust.Pool
-	cycle               *sync2.Cycle
-	started             sync2.Fence
+
+	Cycle *sync2.Cycle
+
+	workers   workersService
+	mu        sync.Mutex
+	restoring map[storj.NodeID]bool
 }
 
 // NewTrashChore instantiates a new TrashChore. choreInterval is how often this
@@ -31,10 +36,12 @@ type TrashChore struct {
 func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Duration, trust *trust.Pool, store *Store) *TrashChore {
 	return &TrashChore{
 		log:                 log,
-		interval:            choreInterval,
 		trashExpiryInterval: trashExpiryInterval,
 		store:               store,
 		trust:               trust,
+
+		Cycle:     sync2.NewCycle(choreInterval),
+		restoring: map[storj.NodeID]bool{},
 	}
 }
 
@@ -42,11 +49,19 @@ func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Dura
 func (chore *TrashChore) Run(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	chore.cycle = sync2.NewCycle(chore.interval)
-	chore.cycle.Start(ctx, &errgroup.Group{}, func(ctx context.Context) error {
+	var group errgroup.Group
+	chore.Cycle.Start(ctx, &group, func(ctx context.Context) error {
 		chore.log.Debug("starting to empty trash")
 
 		for _, satelliteID := range chore.trust.GetSatellites(ctx) {
+			// ignore satellites that are being restored
+			chore.mu.Lock()
+			isRestoring := chore.restoring[satelliteID]
+			chore.mu.Unlock()
+			if isRestoring {
+				continue
+			}
+
 			trashedBefore := time.Now().Add(-chore.trashExpiryInterval)
 			err := chore.store.EmptyTrash(ctx, satelliteID, trashedBefore)
 			if err != nil {
@@ -56,22 +71,96 @@ func (chore *TrashChore) Run(ctx context.Context) (err error) {
 
 		return nil
 	})
-	chore.started.Release()
-	return err
+	group.Go(func() error {
+		chore.workers.Run(ctx)
+		return nil
+	})
+	return group.Wait()
 }
 
-// TriggerWait ensures that the cycle is done at least once and waits for
-// completion. If the cycle is currently running it waits for the previous to
-// complete and then runs.
-func (chore *TrashChore) TriggerWait(ctx context.Context) {
-	chore.started.Wait(ctx)
-	chore.cycle.TriggerWait()
+// StartRestore starts restoring trash for the specified satellite.
+func (chore *TrashChore) StartRestore(ctx context.Context, satellite storj.NodeID) {
+	chore.mu.Lock()
+	isRestoring := chore.restoring[satellite]
+	if isRestoring {
+		chore.mu.Unlock()
+		return
+	}
+	chore.restoring[satellite] = true
+	chore.mu.Unlock()
+
+	ok := chore.workers.Go(ctx, func(ctx context.Context) {
+		chore.log.Info("restore trash started", zap.Stringer("Satellite ID", satellite))
+		err := chore.store.RestoreTrash(ctx, satellite)
+		if err != nil {
+			chore.log.Error("restore trash failed", zap.Stringer("Satellite ID", satellite), zap.Error(err))
+		} else {
+			chore.log.Info("restore trash finished", zap.Stringer("Satellite ID", satellite))
+		}
+
+		chore.mu.Lock()
+		delete(chore.restoring, satellite)
+		chore.mu.Unlock()
+	})
+	if !ok {
+		chore.log.Info("failed to start restore trash", zap.Stringer("Satellite ID", satellite))
+	}
 }
 
 // Close the chore.
 func (chore *TrashChore) Close() error {
-	if chore.cycle != nil {
-		chore.cycle.Close()
-	}
+	chore.Cycle.Close()
 	return nil
 }
+
+// workersService allows to start workers with a different context.
+type workersService struct {
+	started sync2.Fence
+	root    context.Context
+	active  sync.WaitGroup
+
+	mu     sync.Mutex
+	closed bool
+}
+
+// Run starts waiting for worker requests with the specified context.
+func (workers *workersService) Run(ctx context.Context) {
+	// setup root context that the workers are bound to
+	workers.root = ctx
+	workers.started.Release()
+
+	// wait until it's time to shut down:
+	<-workers.root.Done()
+
+	// ensure we don't allow starting workers after it's time to shut down
+	workers.mu.Lock()
+	workers.closed = true
+	workers.mu.Unlock()
+
+	// wait for any remaining workers
+	workers.active.Wait()
+}
+
+// Go tries to start a worker.
+func (workers *workersService) Go(ctx context.Context, work func(context.Context)) bool {
+	// Wait until we can use workers.root.
+	if !workers.started.Wait(ctx) {
+		return false
+	}
+
+	// check that we are still allowed to start new workers
+	workers.mu.Lock()
+	if workers.closed {
+		workers.mu.Unlock()
+		return false
+	}
+	workers.active.Add(1)
+	workers.mu.Unlock()
+
+	go func() {
+		defer workers.active.Done()
+		work(workers.root)
+	}()
+
+	return true
+}
diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go
index 16cf7b877..96c1e0153 100644
--- a/storagenode/piecestore/endpoint.go
+++ b/storagenode/piecestore/endpoint.go
@@ -96,6 +97,7 @@ type Endpoint struct {
 	pingStats pingStatsSource
 
 	store       *pieces.Store
+	trashChore  *pieces.TrashChore
 	ordersStore *orders.FileStore
 	usage       bandwidth.DB
 	usedSerials *usedserials.Table
@@ -105,7 +106,7 @@
 }
 
 // NewEndpoint creates a new piecestore endpoint.
-func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, ordersStore *orders.FileStore, usage bandwidth.DB, usedSerials *usedserials.Table, config Config) (*Endpoint, error) {
+func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, trashChore *pieces.TrashChore, pieceDeleter *pieces.Deleter, ordersStore *orders.FileStore, usage bandwidth.DB, usedSerials *usedserials.Table, config Config) (*Endpoint, error) {
 	return &Endpoint{
 		log:    log,
 		config: config,
@@ -117,6 +118,7 @@ func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, moni
 		pingStats: pingStats,
 
 		store:       store,
+		trashChore:  trashChore,
 		ordersStore: ordersStore,
 		usage:       usage,
 		usedSerials: usedSerials,
@@ -750,13 +752,7 @@ func (endpoint *Endpoint) RestoreTrash(ctx context.Context, restoreTrashReq *pb.
 		return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "RestoreTrash called with untrusted ID")
 	}
 
-	endpoint.log.Info("restore trash started", zap.Stringer("Satellite ID", peer.ID))
-	err = endpoint.store.RestoreTrash(ctx, peer.ID)
-	if err != nil {
-		endpoint.log.Error("restore trash failed", zap.Stringer("Satellite ID", peer.ID), zap.Error(err))
-		return nil, rpcstatus.Wrap(rpcstatus.Internal, err)
-	}
-	endpoint.log.Info("restore trash finished", zap.Stringer("Satellite ID", peer.ID))
+	endpoint.trashChore.StartRestore(ctx, peer.ID)
 
 	return &pb.RestoreTrashResponse{}, nil
 }
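
A note on the pattern this patch introduces: RestoreTrash no longer performs the restore on the request path. The endpoint now calls TrashChore.StartRestore, and the chore's internal workersService binds each worker to the chore's own root context rather than the caller's, so the RPC returns immediately while the restore keeps running until node shutdown. This is also why the test above now runs the chore with a dedicated cancelable context: Run only returns once that context is canceled and in-flight workers have drained. Below is a minimal, self-contained sketch of the same worker-service idea, using only the standard library (a plain channel stands in for sync2.Fence); the names workerPool and newWorkerPool are illustrative and not part of the storj codebase.

// Standalone sketch of the worker-service pattern from trashchore.go above.
// Assumptions: names and the demo flow are illustrative only.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// workerPool runs callbacks bound to the pool's root context, not the caller's.
type workerPool struct {
	started chan struct{}   // closed once Run has published the root context
	root    context.Context // context every worker runs with
	active  sync.WaitGroup

	mu     sync.Mutex
	closed bool
}

func newWorkerPool() *workerPool {
	return &workerPool{started: make(chan struct{})}
}

// Run publishes the root context, waits for shutdown, then drains workers.
func (p *workerPool) Run(ctx context.Context) {
	p.root = ctx
	close(p.started)

	<-ctx.Done() // wait until it's time to shut down

	p.mu.Lock()
	p.closed = true // refuse any new workers from now on
	p.mu.Unlock()

	p.active.Wait() // let in-flight workers finish
}

// Go starts work bound to the root context; it reports false after shutdown.
func (p *workerPool) Go(ctx context.Context, work func(context.Context)) bool {
	select {
	case <-p.started:
	case <-ctx.Done():
		return false
	}

	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return false
	}
	p.active.Add(1)
	p.mu.Unlock()

	go func() {
		defer p.active.Done()
		work(p.root)
	}()
	return true
}

func main() {
	rootCtx, stop := context.WithCancel(context.Background())
	pool := newWorkerPool()

	done := make(chan struct{})
	go func() { pool.Run(rootCtx); close(done) }()

	// A short-lived caller context (like an RPC) does not bound the worker.
	reqCtx, cancelReq := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancelReq()
	ok := pool.Go(reqCtx, func(ctx context.Context) {
		time.Sleep(50 * time.Millisecond) // outlives reqCtx, bounded by rootCtx
		fmt.Println("worker finished, root ctx err:", ctx.Err())
	})
	fmt.Println("started:", ok)

	time.Sleep(100 * time.Millisecond)
	stop() // begin shutdown
	<-done // Run returns only after active workers complete
	fmt.Println("pool shut down; Go now returns:", pool.Go(context.Background(), func(context.Context) {}))
}

The key property the sketch demonstrates is the shutdown ordering the chore relies on: Go refuses new work once the root context is canceled, and Run returns only after every accepted worker has finished.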