storagenode/collector: delete expired piece info if file does not exist

The collector keeps retrying the deletion of an expired piece, even
though the piece no longer exists on the storagenode's filesystem.
We need to delete the piece info from the expired db if the
targeted file does not exist.
This does not resolve the underlying problem of why the file
is deleted before the collector tries to delete it.
Instead, this change deletes the piece info from the
piece_expirations and pieceinfo DBs when the file does not exist,
since we are already trying to delete that piece anyway.

Closes https://github.com/storj/storj/issues/4192

Change-Id: If659185ca14f1cb29fd3c4237374df6fcd535df8
This commit is contained in:
Clement Sam 2022-09-08 11:08:49 +00:00 committed by Storj Robot
parent 90eded4d99
commit 07beef378d
3 changed files with 91 additions and 7 deletions

View File

@ -97,7 +97,13 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error)
err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID) err := service.pieces.Delete(ctx, expired.SatelliteID, expired.PieceID)
if err != nil { if err != nil {
if errs.Is(err, os.ErrNotExist) { if errs.Is(err, os.ErrNotExist) {
service.log.Info("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) service.log.Warn("file does not exist", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
err := service.pieces.DeleteExpired(ctx, expired.SatelliteID, expired.PieceID)
if err != nil {
service.log.Error("unable to delete expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err))
continue
}
service.log.Info("deleted expired piece info from DB", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
continue continue
} }
errfailed := service.pieces.DeleteFailed(ctx, expired, now) errfailed := service.pieces.DeleteFailed(ctx, expired, now)
@ -107,7 +113,7 @@ func (service *Service) Collect(ctx context.Context, now time.Time) (err error)
service.log.Error("unable to delete piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err)) service.log.Error("unable to delete piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID), zap.Error(err))
continue continue
} }
service.log.Info("delete expired", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID)) service.log.Info("deleted expired piece", zap.Stringer("Satellite ID", expired.SatelliteID), zap.Stringer("Piece ID", expired.PieceID))
count++ count++
} }

View File

@ -8,11 +8,15 @@ import (
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"storj.io/common/memory" "storj.io/common/memory"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/common/testrand" "storj.io/common/testrand"
"storj.io/storj/private/testplanet" "storj.io/storj/private/testplanet"
"storj.io/storj/storagenode/collector"
) )
func TestCollector(t *testing.T) { func TestCollector(t *testing.T) {
@ -108,3 +112,67 @@ func TestCollector(t *testing.T) {
require.Equal(t, 0, serialsPresent) require.Equal(t, 0, serialsPresent)
}) })
} }
// TestCollector_fileNotFound checks the collector's behavior when the blobs
// of an expired piece have already been removed from the storage node: the
// collection run must succeed and emit exactly two log entries, a warning
// that the file does not exist followed by an info entry reporting that the
// expired piece info was deleted from the DB.
func TestCollector_fileNotFound(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(1, 1, 2, 2),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		for _, storageNode := range planet.StorageNodes {
			// stop collector, so we can start a new service manually
			storageNode.Collector.Loop.Stop()
			// stop order sender because we will stop satellite later
			storageNode.Storage2.Orders.Sender.Pause()
		}

		expectedData := testrand.Bytes(5 * memory.KiB)

		// upload some data to exactly 2 nodes that expires in 1 day
		err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(1*24*time.Hour))
		require.NoError(t, err)

		// stop satellite to prevent audits
		require.NoError(t, planet.StopPeer(planet.Satellites[0]))

		collections := 0

		for _, storageNode := range planet.StorageNodes {
			pieceStore := storageNode.DB.Pieces()

			// verify that we actually have some data on storage nodes
			used, err := pieceStore.SpaceUsedForBlobs(ctx)
			require.NoError(t, err)
			if used == 0 {
				// this storage node didn't get picked for storing data
				continue
			}

			// delete file before collector service runs
			err = pieceStore.DeleteNamespace(ctx, planet.Satellites[0].Identity.ID.Bytes())
			require.NoError(t, err)

			// create new observed logger
			observedZapCore, observedLogs := observer.New(zap.InfoLevel)
			observedLogger := zap.New(observedZapCore)

			// start new collector service
			collectorService := collector.NewService(observedLogger, storageNode.Storage2.Store, storageNode.UsedSerials, storageNode.Config.Collector)

			// collect all the data, assuming we are 2 days in the future
			// so that the uploaded piece has expired
			err = collectorService.Collect(ctx, time.Now().Add(2*24*time.Hour))
			require.NoError(t, err)

			// snapshot the entries once; require takes (expected, actual)
			logs := observedLogs.All()
			require.Len(t, logs, 2)

			// check "file does not exist" log
			require.Equal(t, zapcore.WarnLevel, logs[0].Level)
			require.Equal(t, "file does not exist", logs[0].Message)

			// check piece info deleted from db log
			require.Equal(t, zapcore.InfoLevel, logs[1].Level)
			require.Equal(t, "deleted expired piece info from DB", logs[1].Message)

			collections++
		}

		// at least one node must have held data, otherwise nothing was tested
		require.NotZero(t, collections)
	})
}

View File

@ -304,8 +304,21 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID
return Error.Wrap(err) return Error.Wrap(err)
} }
// delete records in both the piece_expirations and pieceinfo DBs, wherever we find it. // delete expired piece records
// both of these calls should return no error if the requested record is not found. err = store.DeleteExpired(ctx, satellite, pieceID)
if err == nil {
store.log.Debug("deleted piece", zap.String("Satellite ID", satellite.String()),
zap.String("Piece ID", pieceID.String()))
}
return Error.Wrap(err)
}
// DeleteExpired deletes records in both the piece_expirations and pieceinfo DBs, wherever we find it.
// Should return no error if the requested record is not found in any of the DBs.
func (store *Store) DeleteExpired(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
defer mon.Task()(&ctx)(&err)
if store.expirationInfo != nil { if store.expirationInfo != nil {
_, err = store.expirationInfo.DeleteExpiration(ctx, satellite, pieceID) _, err = store.expirationInfo.DeleteExpiration(ctx, satellite, pieceID)
} }
@ -313,9 +326,6 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID
err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satellite, pieceID)) err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satellite, pieceID))
} }
store.log.Debug("deleted piece", zap.String("Satellite ID", satellite.String()),
zap.String("Piece ID", pieceID.String()))
return Error.Wrap(err) return Error.Wrap(err)
} }