diff --git a/storagenode/collector/service.go b/storagenode/collector/service.go index 6a72ab8b8..a6f6235dd 100644 --- a/storagenode/collector/service.go +++ b/storagenode/collector/service.go @@ -97,7 +97,13 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error) err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID) if err != nil { if errs.Is(err, os.ErrNotExist) { - service.log.Info("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) + service.log.Warn("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) + err := service.pieces.DeleteExpired(ctx, expired.SatelliteID, expired.PieceID) + if err != nil { + service.log.Error("unable to delete expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err)) + continue + } + service.log.Info("deleted expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) continue } errfailed := service.pieces.DeleteFailed(ctx, expired, now) @@ -107,7 +113,7 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error) service.log.Error("unable to delete piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err)) continue } - service.log.Info("delete expired", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) + service.log.Info("deleted expired piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) count++ } diff --git a/storagenode/collector/service_test.go b/storagenode/collector/service_test.go index 3b5791757..88bbafeeb 100644 --- a/storagenode/collector/service_test.go +++ b/storagenode/collector/service_test.go @@ -8,11 +8,15 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "storj.io/common/memory" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" + "storj.io/storj/storagenode/collector" ) func TestCollector(t *testing.T) { @@ -108,3 +112,67 @@ func TestCollector(t *testing.T) { require.Equal(t, 0, serialsPresent) }) } + +func TestCollector_fileNotFound(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.ReconfigureRS(1, 1, 2, 2), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + for _, storageNode := range planet.StorageNodes { + // stop collector, so we can start a new service manually + storageNode.Collector.Loop.Stop() + // stop order sender because we will stop satellite later + storageNode.Storage2.Orders.Sender.Pause() + } + + expectedData := testrand.Bytes(5 * memory.KiB) + + // upload some data to exactly 2 nodes that expires in 1 day + err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(1*24*time.Hour)) + require.NoError(t, err) + + // stop satellite to prevent audits + require.NoError(t, planet.StopPeer(planet.Satellites[0])) + + collections := 0 + + // assume we are 2 days in the future + for _, storageNode := range planet.StorageNodes { + pieceStore := storageNode.DB.Pieces() + + // verify that we actually have some data on storage nodes + used, err := pieceStore.SpaceUsedForBlobs(ctx) + require.NoError(t, err) + if used == 0 { + // this storage node didn't get picked for storing data + continue + } + + // delete file before collector service runs + err = pieceStore.DeleteNamespace(ctx, planet.Satellites[0].Identity.ID.Bytes()) + require.NoError(t, err) + + // create new observed logger + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + observedLogger := zap.New(observedZapCore) + // start new collector service + collectorService := collector.NewService(observedLogger, storageNode.Storage2.Store, storageNode.UsedSerials, storageNode.Config.Collector) + // collect all the data + err = collectorService.Collect(ctx, time.Now().Add(2*24*time.Hour)) + require.NoError(t, err) + require.Equal(t, 2, observedLogs.Len()) + // check "file does not exist" log + require.Equal(t, observedLogs.All()[0].Level, zapcore.WarnLevel) + require.Equal(t, observedLogs.All()[0].Message, "file does not exist") + // check piece info deleted from db log + require.Equal(t, observedLogs.All()[1].Level, zapcore.InfoLevel) + require.Equal(t, observedLogs.All()[1].Message, "deleted expired piece info from DB") + + collections++ + } + + require.NotZero(t, collections) + }) +} diff --git a/storagenode/pieces/store.go b/storagenode/pieces/store.go index 6d0d31d8f..280e9c26a 100644 --- a/storagenode/pieces/store.go +++ b/storagenode/pieces/store.go @@ -304,8 +304,21 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID return Error.Wrap(err) } - // delete records in both the piece_expirations and pieceinfo DBs, wherever we find it. - // both of these calls should return no error if the requested record is not found. + // delete expired piece records + err = store.DeleteExpired(ctx, satellite, pieceID) + if err == nil { + store.log.Debug("deleted piece", zap.String("Satellite ID", satellite.String()), + zap.String("Piece ID", pieceID.String())) + } + + return Error.Wrap(err) +} + +// DeleteExpired deletes records in both the piece_expirations and pieceinfo DBs, wherever we find it. +// Should return no error if the requested record is not found in any of the DBs. +func (store *Store) DeleteExpired(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) { + defer mon.Task()(&ctx)(&err) + if store.expirationInfo != nil { _, err = store.expirationInfo.DeleteExpiration(ctx, satellite, pieceID) } @@ -313,9 +326,6 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satellite, pieceID)) } - store.log.Debug("deleted piece", zap.String("Satellite ID", satellite.String()), - zap.String("Piece ID", pieceID.String())) - return Error.Wrap(err) }