// storj/storagenode/collector/service_test.go
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package collector_test
import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"go.uber.org/zap/zaptest/observer"

	"storj.io/common/memory"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/storagenode/collector"
)
func TestCollector(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(1, 1, 2, 2),
},
2019-05-08 12:11:59 +01:00
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
for _, storageNode := range planet.StorageNodes {
// stop collector, so we can run it manually
storageNode.Collector.Loop.Pause()
// stop order sender because we will stop satellite later
storageNode.Storage2.Orders.Sender.Pause()
2019-05-08 12:11:59 +01:00
}
expectedData := testrand.Bytes(100 * memory.KiB)
2019-05-08 12:11:59 +01:00
// upload some data to exactly 2 nodes that expires in 8 days
err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(8*24*time.Hour))
2019-05-08 12:11:59 +01:00
require.NoError(t, err)
// stop satellite to prevent audits
2019-05-08 12:11:59 +01:00
require.NoError(t, planet.StopPeer(planet.Satellites[0]))
collections := 0
serialsPresent := 0
2019-05-08 12:11:59 +01:00
// imagine we are 30 minutes in the future
2019-05-08 12:11:59 +01:00
for _, storageNode := range planet.StorageNodes {
pieceStore := storageNode.DB.Pieces()
usedSerials := storageNode.UsedSerials
2019-05-08 12:11:59 +01:00
// verify that we actually have some data on storage nodes
storagenode: Include trash space when calculating space used This commit adds functionality to include the space used in the trash directory when calculating available space on the node. It also includes this trash value in the space used cache, with methods to keep the cache up-to-date as files are trashed, restored, and emptied. As part of the commit, the RestoreTrash and EmptyTrash methods have slightly changed signatures. RestoreTrash now also returns the keys that were restored, while EmptyTrash also returns the total disk space recovered. Each of these changes makes it possible to keep the cache up-to-date and know how much space is being used/recovered. Also changed is the signature of PieceStoreAccess.ContentSize method. Previously this method returns only the content size of the blob, removing the size of any header data. This method has been renamed `Size` and returns both the full disk size and content size of the blob. This allows us to only stat the file once, and in some instances (i.e. cache) knowing the full file size is useful. Note: This commit simply adds the trash size data to the piece size data we were already collecting. The piece size data is not accurate for all use-cases (e.g. because it does not contain piece header data); however, this commit does not fix that problem. Now that the ContentSize (Size) method returns the full size of the file, it should be easier to fix this problem in a future commit. Change-Id: I4a6cae09e262c8452a618116d1dc66b687f59f85
2019-12-21 13:11:24 +00:00
used, err := pieceStore.SpaceUsedForBlobs(ctx)
2019-05-08 12:11:59 +01:00
require.NoError(t, err)
if used == 0 {
// this storage node didn't get picked for storing data
continue
}
// collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(30*time.Minute))
2019-05-08 12:11:59 +01:00
require.NoError(t, err)
serialsPresent += usedSerials.Count()
2019-05-08 12:11:59 +01:00
collections++
}
require.NotZero(t, collections)
// ensure we haven't deleted used serials
require.Equal(t, 2, serialsPresent)
serialsPresent = 0
// imagine we are 2 hours in the future
for _, storageNode := range planet.StorageNodes {
usedSerials := storageNode.UsedSerials
// collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(2*time.Hour))
require.NoError(t, err)
serialsPresent += usedSerials.Count()
collections++
}
// ensure we have deleted used serials
require.Equal(t, 0, serialsPresent)
// imagine we are 10 days in the future
for _, storageNode := range planet.StorageNodes {
pieceStore := storageNode.DB.Pieces()
usedSerials := storageNode.UsedSerials
// collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(10*24*time.Hour))
require.NoError(t, err)
// verify that we deleted everything
storagenode: Include trash space when calculating space used This commit adds functionality to include the space used in the trash directory when calculating available space on the node. It also includes this trash value in the space used cache, with methods to keep the cache up-to-date as files are trashed, restored, and emptied. As part of the commit, the RestoreTrash and EmptyTrash methods have slightly changed signatures. RestoreTrash now also returns the keys that were restored, while EmptyTrash also returns the total disk space recovered. Each of these changes makes it possible to keep the cache up-to-date and know how much space is being used/recovered. Also changed is the signature of PieceStoreAccess.ContentSize method. Previously this method returns only the content size of the blob, removing the size of any header data. This method has been renamed `Size` and returns both the full disk size and content size of the blob. This allows us to only stat the file once, and in some instances (i.e. cache) knowing the full file size is useful. Note: This commit simply adds the trash size data to the piece size data we were already collecting. The piece size data is not accurate for all use-cases (e.g. because it does not contain piece header data); however, this commit does not fix that problem. Now that the ContentSize (Size) method returns the full size of the file, it should be easier to fix this problem in a future commit. Change-Id: I4a6cae09e262c8452a618116d1dc66b687f59f85
2019-12-21 13:11:24 +00:00
used, err := pieceStore.SpaceUsedForBlobs(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), used)
serialsPresent += usedSerials.Count()
collections++
}
// ensure we have deleted used serials
require.Equal(t, 0, serialsPresent)
2019-05-08 12:11:59 +01:00
})
}
// TestCollector_fileNotFound checks that when a piece file has already been
// removed from disk, the collector logs a warning for the missing file but
// still deletes the expired piece info from the database.
func TestCollector_fileNotFound(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: testplanet.ReconfigureRS(1, 1, 2, 2),
		},
	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
		for _, node := range planet.StorageNodes {
			// stop collector, so we can start a new service manually
			node.Collector.Loop.Stop()
			// stop order sender because we will stop satellite later
			node.Storage2.Orders.Sender.Pause()
		}

		expectedData := testrand.Bytes(5 * memory.KiB)

		// upload some data to exactly 2 nodes that expires in 1 day
		err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData, time.Now().Add(1*24*time.Hour))
		require.NoError(t, err)

		// stop satellite to prevent audits
		require.NoError(t, planet.StopPeer(planet.Satellites[0]))

		collected := 0

		// assume we are 2 days in the future
		for _, node := range planet.StorageNodes {
			pieces := node.DB.Pieces()

			// verify that we actually have some data on storage nodes
			used, err := pieces.SpaceUsedForBlobs(ctx)
			require.NoError(t, err)
			if used == 0 {
				// this storage node didn't get picked for storing data
				continue
			}

			// delete file before collector service runs
			require.NoError(t, pieces.DeleteNamespace(ctx, planet.Satellites[0].Identity.ID.Bytes()))

			// create new observed logger so we can inspect what got logged
			core, logs := observer.New(zap.InfoLevel)

			// start new collector service and collect all the data
			service := collector.NewService(zap.New(core), node.Storage2.Store, node.UsedSerials, node.Config.Collector)
			require.NoError(t, service.Collect(ctx, time.Now().Add(2*24*time.Hour)))

			require.Equal(t, 2, logs.Len())
			entries := logs.All()

			// check "file does not exist" log
			require.Equal(t, entries[0].Level, zapcore.WarnLevel)
			require.Equal(t, entries[0].Message, "file does not exist")

			// check piece info deleted from db log
			require.Equal(t, entries[1].Level, zapcore.InfoLevel)
			require.Equal(t, entries[1].Message, "deleted expired piece info from DB")

			collected++
		}
		require.NotZero(t, collected)
	})
}