storj/storagenode/pieces/cache_test.go
Isaac Hess 4dafd03f11 storagenode: Prevent negative values in piece_space_used, migrate negatives to 0
Change-Id: Ibd663db087058c928190aa52c520f22e9338dd04
2020-01-30 13:03:18 -05:00

583 lines
17 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces_test
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/storage"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
func TestDBInit(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
spaceUsedDB := db.PieceSpaceUsedDB()
// Expect that no total record exists since we haven't
// initialized yet
piecesTotal, piecesContentSize, err := spaceUsedDB.GetPieceTotals(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), piecesTotal)
require.Equal(t, int64(0), piecesContentSize)
// Expect no record for trash total
trashTotal, err := spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), trashTotal)
// Now initialize the db to create the total record
err = spaceUsedDB.Init(ctx)
require.NoError(t, err)
// Now that a total record exists, we can update it
err = spaceUsedDB.UpdatePieceTotals(ctx, int64(100), int64(101))
require.NoError(t, err)
err = spaceUsedDB.UpdateTrashTotal(ctx, int64(150))
require.NoError(t, err)
// Confirm the total record has been updated
piecesTotal, piecesContentSize, err = spaceUsedDB.GetPieceTotals(ctx)
require.NoError(t, err)
require.Equal(t, int64(100), piecesTotal)
require.Equal(t, int64(101), piecesContentSize)
// Confirm the trash total record has been updated
trashTotal, err = spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
require.Equal(t, int64(150), trashTotal)
})
}
func TestCacheInit(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
spaceUsedDB := db.PieceSpaceUsedDB()
err := spaceUsedDB.Init(ctx)
require.NoError(t, err)
// setup the cache with zero values
cache := pieces.NewBlobsUsageCacheTest(nil, 0, 0, 0, nil)
cacheService := pieces.NewService(zap.L(),
cache,
pieces.NewStore(zap.L(), cache, nil, nil, spaceUsedDB),
1*time.Hour,
)
// Confirm that when we call init before the cache has been persisted.
// that the cache gets initialized with zero values
err = cacheService.Init(ctx)
require.NoError(t, err)
piecesTotal, piecesContentSize, err := cache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), piecesTotal)
require.Equal(t, int64(0), piecesContentSize)
satPiecesTotal, satPiecesContentSize, err := cache.SpaceUsedBySatellite(ctx, storj.NodeID{1})
require.NoError(t, err)
require.Equal(t, int64(0), satPiecesTotal)
require.Equal(t, int64(0), satPiecesContentSize)
trashTotal, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
require.Equal(t, int64(0), trashTotal)
// setup: update the cache then sync those cache values
// to the database
expectedPiecesTotal := int64(150)
expectedPiecesContentSize := int64(151)
expectedTotalBySA := map[storj.NodeID]pieces.SatelliteUsage{
{1}: {
Total: 100,
ContentSize: 101,
},
{2}: {
Total: 50,
ContentSize: 51,
},
}
expectedTrash := int64(127)
cache = pieces.NewBlobsUsageCacheTest(nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalBySA)
cacheService = pieces.NewService(zap.L(),
cache,
pieces.NewStore(zap.L(), cache, nil, nil, spaceUsedDB),
1*time.Hour,
)
err = cacheService.PersistCacheTotals(ctx)
require.NoError(t, err)
// Now create an empty cache. Values will be read later by the db.
cache = pieces.NewBlobsUsageCacheTest(nil, 0, 0, 0, nil)
cacheService = pieces.NewService(zap.L(),
cache,
pieces.NewStore(zap.L(), cache, nil, nil, spaceUsedDB),
1*time.Hour,
)
// Confirm that when we call Init after the cache has been persisted
// that the cache gets initialized with the values from the database
err = cacheService.Init(ctx)
require.NoError(t, err)
piecesTotal, piecesContentSize, err = cache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
require.Equal(t, expectedPiecesTotal, piecesTotal)
require.Equal(t, expectedPiecesContentSize, piecesContentSize)
sat1PiecesTotal, sat1PiecesContentSize, err := cache.SpaceUsedBySatellite(ctx, storj.NodeID{1})
require.NoError(t, err)
require.Equal(t, int64(100), sat1PiecesTotal)
require.Equal(t, int64(101), sat1PiecesContentSize)
sat2PiecesTotal, sat2PiecesContentSize, err := cache.SpaceUsedBySatellite(ctx, storj.NodeID{2})
require.NoError(t, err)
require.Equal(t, int64(50), sat2PiecesTotal)
require.Equal(t, int64(51), sat2PiecesContentSize)
actualTrash, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
require.Equal(t, int64(127), actualTrash)
})
}
func TestPersistCacheTotals(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
// The database should start out with 0 for all totals
var expectedPiecesTotal int64
var expectedPiecesContentSize int64
spaceUsedDB := db.PieceSpaceUsedDB()
err := spaceUsedDB.Init(ctx)
require.NoError(t, err)
piecesTotal, piecesContentSize, err := spaceUsedDB.GetPieceTotals(ctx)
require.NoError(t, err)
assert.Equal(t, expectedPiecesTotal, piecesTotal)
assert.Equal(t, expectedPiecesContentSize, piecesContentSize)
var expectedTrash int64
err = spaceUsedDB.Init(ctx)
require.NoError(t, err)
trashTotal, err := spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTrash, trashTotal)
var expectedTotalsBySA = map[storj.NodeID]pieces.SatelliteUsage{}
totalsBySA, err := spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotalsBySA, totalsBySA)
// setup: update the cache then sync those cache values
// to the database
// setup the cache with zero values
expectedPiecesTotal = 150
expectedPiecesContentSize = 151
expectedTotalsBySA = map[storj.NodeID]pieces.SatelliteUsage{
{1}: {
Total: 100,
ContentSize: 101,
},
{2}: {
Total: 50,
ContentSize: 51,
},
}
expectedTrash = 127
cache := pieces.NewBlobsUsageCacheTest(nil, expectedPiecesTotal, expectedPiecesContentSize, expectedTrash, expectedTotalsBySA)
cacheService := pieces.NewService(zap.L(),
cache,
pieces.NewStore(zap.L(), cache, nil, nil, spaceUsedDB),
1*time.Hour,
)
err = cacheService.PersistCacheTotals(ctx)
require.NoError(t, err)
// Confirm those cache values are now saved persistently in the database
piecesTotal, piecesContentSize, err = spaceUsedDB.GetPieceTotals(ctx)
require.NoError(t, err)
assert.Equal(t, expectedPiecesTotal, piecesTotal)
assert.Equal(t, expectedPiecesContentSize, piecesContentSize)
totalsBySA, err = spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotalsBySA, totalsBySA)
trashTotal, err = spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTrash, trashTotal)
// Change piece sizes
piecesTotalDelta := int64(35)
piecesContentSizeDelta := int64(30)
trashDelta := int64(35)
cache.Update(ctx, storj.NodeID{1}, -piecesTotalDelta, -piecesContentSizeDelta, trashDelta)
err = cacheService.PersistCacheTotals(ctx)
require.NoError(t, err)
// Confirm that the deleted stuff is not in the database anymore
piecesTotal, piecesContentSize, err = spaceUsedDB.GetPieceTotals(ctx)
require.NoError(t, err)
assert.Equal(t, expectedPiecesTotal-piecesTotalDelta, piecesTotal)
assert.Equal(t, expectedPiecesContentSize-piecesContentSizeDelta, piecesContentSize)
trashTotal, err = spaceUsedDB.GetTrashTotal(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTrash+trashDelta, trashTotal)
expectedTotalsBySA = map[storj.NodeID]pieces.SatelliteUsage{
{1}: {
Total: 65,
ContentSize: 71,
},
{2}: {
Total: 50,
ContentSize: 51,
},
}
totalsBySA, err = spaceUsedDB.GetPieceTotalsForAllSatellites(ctx)
require.NoError(t, err)
assert.Equal(t, expectedTotalsBySA, totalsBySA)
})
}
func TestRecalculateCache(t *testing.T) {
type testItem struct {
start, end, new, expected int64
}
type testCase struct {
name string
piecesTotal testItem
piecesContentSize testItem
trash testItem
}
testCases := []testCase{
{
name: "1",
piecesTotal: testItem{
start: 0,
end: 0,
new: 0,
expected: 0,
},
piecesContentSize: testItem{
start: 0,
end: 0,
new: 0,
expected: 0,
},
trash: testItem{
start: 0,
end: 0,
new: 0,
expected: 0,
},
},
{
name: "2",
piecesTotal: testItem{
start: 0,
end: 100,
new: 0,
expected: 50,
},
piecesContentSize: testItem{
start: 0,
end: 100,
new: 0,
expected: 50,
},
trash: testItem{
start: 100,
end: 110,
new: 50,
expected: 55,
},
},
{
name: "3",
piecesTotal: testItem{
start: 0,
end: 100,
new: 90,
expected: 140,
},
piecesContentSize: testItem{
start: 0,
end: 50,
new: 100,
expected: 125,
},
trash: testItem{
start: 0,
end: 100,
new: 50,
expected: 100,
},
},
{
name: "4",
piecesTotal: testItem{
start: 0,
end: 100,
new: -25,
expected: 25,
},
piecesContentSize: testItem{
start: 0,
end: 100,
new: -50,
expected: 0,
},
trash: testItem{
start: 0,
end: 10,
new: -3,
expected: 2,
},
},
{
name: "5",
piecesTotal: testItem{
start: 100,
end: 0,
new: 90,
expected: 40,
},
piecesContentSize: testItem{
start: 100,
end: 0,
new: 50,
expected: 0,
},
trash: testItem{
start: 100,
end: 0,
new: -25,
expected: 0,
},
},
}
for _, tt := range testCases {
tt := tt
t.Run(tt.name, func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
ID1 := storj.NodeID{1, 1}
cache := pieces.NewBlobsUsageCacheTest(nil,
tt.piecesTotal.end,
tt.piecesContentSize.end,
tt.trash.end,
map[storj.NodeID]pieces.SatelliteUsage{ID1: {Total: tt.piecesTotal.end, ContentSize: tt.piecesContentSize.end}},
)
cache.Recalculate(
tt.piecesTotal.new,
tt.piecesTotal.start,
tt.piecesContentSize.new,
tt.piecesContentSize.start,
tt.trash.new,
tt.trash.start,
map[storj.NodeID]pieces.SatelliteUsage{ID1: {Total: tt.piecesTotal.new, ContentSize: tt.piecesContentSize.new}},
map[storj.NodeID]pieces.SatelliteUsage{ID1: {Total: tt.piecesTotal.start, ContentSize: tt.piecesContentSize.start}},
)
// Test: confirm correct cache values
piecesTotal, piecesContentSize, err := cache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
assert.Equal(t, tt.piecesTotal.expected, piecesTotal)
assert.Equal(t, tt.piecesContentSize.expected, piecesContentSize)
sat1PiecesTotal, sat1PiecesContentSize, err := cache.SpaceUsedBySatellite(ctx, ID1)
require.NoError(t, err)
assert.Equal(t, tt.piecesTotal.expected, sat1PiecesTotal)
assert.Equal(t, tt.piecesContentSize.expected, sat1PiecesContentSize)
trashTotal, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
assert.Equal(t, tt.trash.expected, trashTotal)
})
}
}
func TestRecalculateCacheMissed(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
ID1 := storj.NodeID{1}
ID2 := storj.NodeID{2}
// setup: once we are done recalculating the pieces on disk,
// there are items in the cache that are not in the
// new recalculated values
cache := pieces.NewBlobsUsageCacheTest(nil,
150,
200,
100,
map[storj.NodeID]pieces.SatelliteUsage{ID1: {Total: 100, ContentSize: 50}, ID2: {Total: 100, ContentSize: 50}},
)
cache.Recalculate(
100,
0,
50,
25,
200,
0,
map[storj.NodeID]pieces.SatelliteUsage{ID1: {Total: 100, ContentSize: 50}},
map[storj.NodeID]pieces.SatelliteUsage{ID1: {Total: 0, ContentSize: 0}},
)
// Test: confirm correct cache values
piecesTotal, piecesContentSize, err := cache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
assert.Equal(t, int64(175), piecesTotal)
assert.Equal(t, int64(137), piecesContentSize)
piecesTotal, piecesContentSize, err = cache.SpaceUsedBySatellite(ctx, ID2)
require.NoError(t, err)
assert.Equal(t, int64(50), piecesTotal)
assert.Equal(t, int64(25), piecesContentSize)
trashTotal, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err)
assert.Equal(t, int64(250), trashTotal)
}
func TestCacheCreateDeleteAndTrash(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
cache := pieces.NewBlobsUsageCache(zap.L(), db.Pieces())
pieceContent := []byte("stuff")
satelliteID := testrand.NodeID()
refs := []storage.BlobRef{
{
Namespace: satelliteID.Bytes(),
Key: testrand.Bytes(32),
},
{
Namespace: satelliteID.Bytes(),
Key: testrand.Bytes(32),
},
}
for _, ref := range refs {
blob, err := cache.Create(ctx, ref, int64(4096))
require.NoError(t, err)
blobWriter, err := pieces.NewWriter(blob, cache, satelliteID)
require.NoError(t, err)
_, err = blobWriter.Write(pieceContent)
require.NoError(t, err)
header := pb.PieceHeader{}
err = blobWriter.Commit(ctx, &header)
require.NoError(t, err)
}
assertValues := func(msg string, satID storj.NodeID, expPiecesTotal, expPiecesContentSize, expTrash int) {
piecesTotal, piecesContentSize, err := cache.SpaceUsedForPieces(ctx)
require.NoError(t, err, msg)
assert.Equal(t, expPiecesTotal, int(piecesTotal), msg)
assert.Equal(t, expPiecesContentSize, int(piecesContentSize), msg)
piecesTotal, piecesContentSize, err = cache.SpaceUsedBySatellite(ctx, satelliteID)
require.NoError(t, err, msg)
assert.Equal(t, expPiecesTotal, int(piecesTotal), msg)
assert.Equal(t, expPiecesContentSize, int(piecesContentSize), msg)
trashTotal, err := cache.SpaceUsedForTrash(ctx)
require.NoError(t, err, msg)
assert.Equal(t, expTrash, int(trashTotal), msg)
}
expPieceSize := len(pieceContent) + pieces.V1PieceHeaderReservedArea
assertValues("first write", satelliteID, expPieceSize*len(refs), len(pieceContent)*len(refs), 0)
// Trash one piece
blobInfo, err := cache.Stat(ctx, refs[0])
require.NoError(t, err)
fileInfo, err := blobInfo.Stat(ctx)
require.NoError(t, err)
ref0Size := fileInfo.Size()
err = cache.Trash(ctx, refs[0])
require.NoError(t, err)
assertValues("trashed refs[0]", satelliteID, expPieceSize, len(pieceContent), int(ref0Size))
// Restore one piece
_, err = cache.RestoreTrash(ctx, satelliteID.Bytes())
require.NoError(t, err)
assertValues("restore trash for satellite", satelliteID, expPieceSize*len(refs), len(pieceContent)*len(refs), 0)
// Trash piece again
err = cache.Trash(ctx, refs[0])
require.NoError(t, err)
assertValues("trashed again", satelliteID, expPieceSize, len(pieceContent), int(ref0Size))
// Empty trash
_, _, err = cache.EmptyTrash(ctx, satelliteID.Bytes(), time.Now().Add(24*time.Hour))
require.NoError(t, err)
assertValues("emptied trash", satelliteID, expPieceSize, len(pieceContent), 0)
// Delete that piece and confirm the cache is updated
err = cache.Delete(ctx, refs[1])
require.NoError(t, err)
assertValues("delete item", satelliteID, 0, 0, 0)
})
}
func TestCacheCreateMultipleSatellites(t *testing.T) {
t.Skip("flaky: V3-2416")
testplanet.Run(t, testplanet.Config{
SatelliteCount: 2, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite1 := planet.Satellites[0]
satellite2 := planet.Satellites[1]
uplink := planet.Uplinks[0]
// Setup: create data for the uplink to upload
expectedData := testrand.Bytes(5 * memory.KiB)
err := uplink.Upload(ctx, satellite1, "testbucket", "test/path", expectedData)
require.NoError(t, err)
err = uplink.Upload(ctx, satellite2, "testbucket", "test/path", expectedData)
require.NoError(t, err)
var total, total1, total2 int64
for _, sn := range planet.StorageNodes {
piecesTotal, _, err := sn.Storage2.BlobsCache.SpaceUsedForPieces(ctx)
require.NoError(t, err)
total += piecesTotal
totalBySA1, _, err := sn.Storage2.BlobsCache.SpaceUsedBySatellite(ctx, satellite1.Identity.ID)
require.NoError(t, err)
total1 += totalBySA1
totalBySA2, _, err := sn.Storage2.BlobsCache.SpaceUsedBySatellite(ctx, satellite2.Identity.ID)
require.NoError(t, err)
total2 += totalBySA2
}
require.Equal(t, int64(47104), total)
require.Equal(t, int64(23552), total1)
require.Equal(t, int64(23552), total2)
})
}
func TestConcurrency(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
node := planet.StorageNodes[0]
satellite := planet.Satellites[0]
var group errgroup.Group
group.Go(func() error {
node.Storage2.BlobsCache.Update(ctx, satellite.ID(), 2000, 1000, 0)
return nil
})
err := node.Storage2.CacheService.PersistCacheTotals(ctx)
require.NoError(t, err)
require.NoError(t, group.Wait())
})
}