From 07beef378d5ceec57bc1de546edad4981264c982 Mon Sep 17 00:00:00 2001 From: Clement Sam Date: Thu, 8 Sep 2022 11:08:49 +0000 Subject: [PATCH] storagenode/collector: delete expired piece info if file does not exist The collector tries deleting a piece over and over again, though the piece does not exist on the storagenode's filesystem. We need to delete the piece info from the expired db if the targeted file does not exist. This does not resolve the base problem of why the file is deleted before the collector tries deleting it. This change deletes the piece info from the expired db if the file does not exist, since we're already trying to delete that piece anyway. Closes https://github.com/storj/storj/issues/4192 Change-Id: If659185ca14f1cb29fd3c4237374df6fcd535df8 --- storagenode/collector/service.go | 10 +++- storagenode/collector/service_test.go | 68 +++++++++++++++++++++++++++ storagenode/pieces/store.go | 20 ++++++-- 3 files changed, 91 insertions(+), 7 deletions(-) diff --git a/storagenode/collector/service.go b/storagenode/collector/service.go index 6a72ab8b8..a6f6235dd 100644 --- a/storagenode/collector/service.go +++ b/storagenode/collector/service.go @@ -97,7 +97,13 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error) err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID) if err != nil { if errs.Is(err, os.ErrNotExist) { - service.log.Info("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) + service.log.Warn("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) + err := service.pieces.DeleteExpired(ctx, expired.SatelliteID, expired.PieceID) + if err != nil { + service.log.Error("unable to delete expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err)) + continue + } + service.log.Info("deleted expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) continue } errfailed := service.pieces.DeleteFailed(ctx, expired, now) @@ -107,7 +113,7 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error) service.log.Error("unable to delete piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err)) continue } - service.log.Info("delete expired", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) + service.log.Info("deleted expired piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) count++ } diff --git a/storagenode/collector/service_test.go b/storagenode/collector/service_test.go index 3b5791757..88bbafeeb 100644 --- a/storagenode/collector/service_test.go +++ b/storagenode/collector/service_test.go @@ -8,11 +8,15 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "storj.io/common/memory" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" + "storj.io/storj/storagenode/collector" ) func TestCollector(t *testing.T) { @@ -108,3 +112,67 @@ func TestCollector(t *testing.T) { require.Equal(t, 0, serialsPresent) }) } + +func TestCollector_fileNotFound(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.ReconfigureRS(1, 1, 2, 2), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + for _, storageNode := range planet.StorageNodes { + // stop collector, so we can start a new service manually + storageNode.Collector.Loop.Stop() + // stop order sender because we will stop satellite later + storageNode.Storage2.Orders.Sender.Pause() + } + + expectedData := testrand.Bytes(5 * memory.KiB) + + // upload some data to exactly 2 nodes that expires in 1 day + err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(1*24*time.Hour)) + require.NoError(t, err) + + // stop satellite to prevent audits + require.NoError(t, planet.StopPeer(planet.Satellites[0])) + + collections := 0 + + // assume we are 2 days in the future + for _, storageNode := range planet.StorageNodes { + pieceStore := storageNode.DB.Pieces() + + // verify that we actually have some data on storage nodes + used, err := pieceStore.SpaceUsedForBlobs(ctx) + require.NoError(t, err) + if used == 0 { + // this storage node didn't get picked for storing data + continue + } + + // delete file before collector service runs + err = pieceStore.DeleteNamespace(ctx, planet.Satellites[0].Identity.ID.Bytes()) + require.NoError(t, err) + + // create new observed logger + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + observedLogger := zap.New(observedZapCore) + // start new collector service + collectorService := collector.NewService(observedLogger, storageNode.Storage2.Store, storageNode.UsedSerials, storageNode.Config.Collector) + // collect all the data + err = collectorService.Collect(ctx, time.Now().Add(2*24*time.Hour)) + require.NoError(t, err) + require.Equal(t, 2, observedLogs.Len()) + // check "file does not exist" log + require.Equal(t, observedLogs.All()[0].Level, zapcore.WarnLevel) + require.Equal(t, observedLogs.All()[0].Message, "file does not exist") + // check piece info deleted from db log + require.Equal(t, observedLogs.All()[1].Level, zapcore.InfoLevel) + require.Equal(t, observedLogs.All()[1].Message, "deleted expired piece info from DB") + + collections++ + } + + require.NotZero(t, collections) + }) +} diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go index 6d0d31d8f..280e9c26a 100644 --- a/storagenode/pieces/store.go +++ b/storagenode/pieces/store.go @@ -304,8 +304,21 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID return Error.Wrap(err) } - // delete records in both the piece_expirations and pieceinfo DBs, wherever we find it. - // both of these calls should return no error if the requested record is not found. + // delete expired piece records + err = store.DeleteExpired(ctx, satellite, pieceID) + if err == nil { + store.log.Debug("deleted piece", zap.String("Satellite ID", satellite.String()), + zap.String("Piece ID", pieceID.String())) + } + + return Error.Wrap(err) +} + +// DeleteExpired deletes records in both the piece_expirations and pieceinfo DBs, wherever we find it. +// Should return no error if the requested record is not found in any of the DBs. +func (store *Store) DeleteExpired(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) { + defer mon.Task()(&ctx)(&err) + if store.expirationInfo != nil { _, err = store.expirationInfo.DeleteExpiration(ctx, satellite, pieceID) } @@ -313,9 +326,6 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satellite, pieceID)) } - store.log.Debug("deleted piece", zap.String("Satellite ID", satellite.String()), - zap.String("Piece ID", pieceID.String())) - return Error.Wrap(err) }