07beef378d
The collector tries deleting a piece over and over again, though the piece does not exist on the storagenode's filesystem. We need to delete the piece info from the expired db if the targeted file does not exist. This does not resolve the base problem of why the file is deleted before the collector tries deleting it. This change deletes the piece info from the expired db if the file does not exist, since we're already trying to delete that piece anyway. Closes https://github.com/storj/storj/issues/4192 Change-Id: If659185ca14f1cb29fd3c4237374df6fcd535df8
179 lines
5.5 KiB
Go
179 lines
5.5 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package collector_test
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"go.uber.org/zap/zaptest/observer"
|
|
|
|
"storj.io/common/memory"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/storagenode/collector"
|
|
)
|
|
|
|
func TestCollector(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: testplanet.ReconfigureRS(1, 1, 2, 2),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
for _, storageNode := range planet.StorageNodes {
|
|
// stop collector, so we can run it manually
|
|
storageNode.Collector.Loop.Pause()
|
|
// stop order sender because we will stop satellite later
|
|
storageNode.Storage2.Orders.Sender.Pause()
|
|
}
|
|
|
|
expectedData := testrand.Bytes(100 * memory.KiB)
|
|
|
|
// upload some data to exactly 2 nodes that expires in 8 days
|
|
err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(8*24*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
// stop satellite to prevent audits
|
|
require.NoError(t, planet.StopPeer(planet.Satellites[0]))
|
|
|
|
collections := 0
|
|
serialsPresent := 0
|
|
|
|
// imagine we are 30 minutes in the future
|
|
for _, storageNode := range planet.StorageNodes {
|
|
pieceStore := storageNode.DB.Pieces()
|
|
usedSerials := storageNode.UsedSerials
|
|
|
|
// verify that we actually have some data on storage nodes
|
|
used, err := pieceStore.SpaceUsedForBlobs(ctx)
|
|
require.NoError(t, err)
|
|
if used == 0 {
|
|
// this storage node didn't get picked for storing data
|
|
continue
|
|
}
|
|
|
|
// collect all the data
|
|
err = storageNode.Collector.Collect(ctx, time.Now().Add(30*time.Minute))
|
|
require.NoError(t, err)
|
|
|
|
serialsPresent += usedSerials.Count()
|
|
|
|
collections++
|
|
}
|
|
|
|
require.NotZero(t, collections)
|
|
// ensure we haven't deleted used serials
|
|
require.Equal(t, 2, serialsPresent)
|
|
|
|
serialsPresent = 0
|
|
|
|
// imagine we are 2 hours in the future
|
|
for _, storageNode := range planet.StorageNodes {
|
|
usedSerials := storageNode.UsedSerials
|
|
|
|
// collect all the data
|
|
err = storageNode.Collector.Collect(ctx, time.Now().Add(2*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
serialsPresent += usedSerials.Count()
|
|
|
|
collections++
|
|
}
|
|
|
|
// ensure we have deleted used serials
|
|
require.Equal(t, 0, serialsPresent)
|
|
|
|
// imagine we are 10 days in the future
|
|
for _, storageNode := range planet.StorageNodes {
|
|
pieceStore := storageNode.DB.Pieces()
|
|
usedSerials := storageNode.UsedSerials
|
|
|
|
// collect all the data
|
|
err = storageNode.Collector.Collect(ctx, time.Now().Add(10*24*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
// verify that we deleted everything
|
|
used, err := pieceStore.SpaceUsedForBlobs(ctx)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(0), used)
|
|
|
|
serialsPresent += usedSerials.Count()
|
|
|
|
collections++
|
|
}
|
|
|
|
// ensure we have deleted used serials
|
|
require.Equal(t, 0, serialsPresent)
|
|
})
|
|
}
|
|
|
|
func TestCollector_fileNotFound(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
|
|
Reconfigure: testplanet.Reconfigure{
|
|
Satellite: testplanet.ReconfigureRS(1, 1, 2, 2),
|
|
},
|
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
for _, storageNode := range planet.StorageNodes {
|
|
// stop collector, so we can start a new service manually
|
|
storageNode.Collector.Loop.Stop()
|
|
// stop order sender because we will stop satellite later
|
|
storageNode.Storage2.Orders.Sender.Pause()
|
|
}
|
|
|
|
expectedData := testrand.Bytes(5 * memory.KiB)
|
|
|
|
// upload some data to exactly 2 nodes that expires in 1 day
|
|
err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(1*24*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
// stop satellite to prevent audits
|
|
require.NoError(t, planet.StopPeer(planet.Satellites[0]))
|
|
|
|
collections := 0
|
|
|
|
// assume we are 2 days in the future
|
|
for _, storageNode := range planet.StorageNodes {
|
|
pieceStore := storageNode.DB.Pieces()
|
|
|
|
// verify that we actually have some data on storage nodes
|
|
used, err := pieceStore.SpaceUsedForBlobs(ctx)
|
|
require.NoError(t, err)
|
|
if used == 0 {
|
|
// this storage node didn't get picked for storing data
|
|
continue
|
|
}
|
|
|
|
// delete file before collector service runs
|
|
err = pieceStore.DeleteNamespace(ctx, planet.Satellites[0].Identity.ID.Bytes())
|
|
require.NoError(t, err)
|
|
|
|
// create new observed logger
|
|
observedZapCore, observedLogs := observer.New(zap.InfoLevel)
|
|
observedLogger := zap.New(observedZapCore)
|
|
// start new collector service
|
|
collectorService := collector.NewService(observedLogger, storageNode.Storage2.Store, storageNode.UsedSerials, storageNode.Config.Collector)
|
|
// collect all the data
|
|
err = collectorService.Collect(ctx, time.Now().Add(2*24*time.Hour))
|
|
require.NoError(t, err)
|
|
require.Equal(t, 2, observedLogs.Len())
|
|
// check "file does not exist" log
|
|
require.Equal(t, observedLogs.All()[0].Level, zapcore.WarnLevel)
|
|
require.Equal(t, observedLogs.All()[0].Message, "file does not exist")
|
|
// check piece info deleted from db log
|
|
require.Equal(t, observedLogs.All()[1].Level, zapcore.InfoLevel)
|
|
require.Equal(t, observedLogs.All()[1].Message, "deleted expired piece info from DB")
|
|
|
|
collections++
|
|
}
|
|
|
|
require.NotZero(t, collections)
|
|
})
|
|
}
|