82fede2132
Currently we have a significant number of tallies that need to be deleted together. Add a limit (by default 10k) to how many will be deleted at the same time. Change-Id: If530383f19b4d3bb83ed5fe956610a2e52f130a1
388 lines
15 KiB
Go
388 lines
15 KiB
Go
// Copyright (C) 2019 Storj Labs, Inc.
|
|
// See LICENSE for copying information.
|
|
|
|
package rollup_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/storj"
|
|
"storj.io/common/testcontext"
|
|
"storj.io/common/testrand"
|
|
"storj.io/storj/private/testplanet"
|
|
"storj.io/storj/satellite"
|
|
"storj.io/storj/satellite/accounting/rollup"
|
|
"storj.io/storj/satellite/orders"
|
|
"storj.io/storj/satellite/overlay"
|
|
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
|
)
|
|
|
|
func TestRollupNoDeletes(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
// In testplanet the setting config.Rollup.DeleteTallies defaults to false.
|
|
// That means if we do not delete any old tally data, then we expect that we
|
|
// can tally/rollup data from anytime in the past.
|
|
// To confirm, this test creates 5 days of tally and rollup data, then we check that all
|
|
// the data is present in the accounting rollup table and in the storage node storage tally table.
|
|
const (
|
|
days = 5
|
|
atRestAmount = 10
|
|
getAmount = 20
|
|
putAmount = 30
|
|
getAuditAmount = 40
|
|
getRepairAmount = 50
|
|
putRepairAmount = 60
|
|
)
|
|
|
|
var (
|
|
ordersDB = db.Orders()
|
|
snAccountingDB = db.StoragenodeAccounting()
|
|
storageNodes = createNodes(ctx, t, db)
|
|
)
|
|
|
|
rollupService := rollup.New(testplanet.NewLogger(t), snAccountingDB, rollup.Config{Interval: 120 * time.Second}, time.Hour)
|
|
|
|
// disqualifying nodes is unrelated to this test, but it is added here
|
|
// to confirm the disqualification shows up in the accounting CSVRow
|
|
dqedNodes, err := dqNodes(ctx, db.OverlayCache(), storageNodes)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, dqedNodes)
|
|
|
|
// Set initialTime back by the number of days we want to save
|
|
initialTime := time.Now().UTC().AddDate(0, 0, -days)
|
|
currentTime := initialTime
|
|
|
|
nodeData := map[storj.NodeID]float64{}
|
|
bwTotals := make(map[storj.NodeID][]int64)
|
|
for _, storageNodeID := range storageNodes {
|
|
nodeData[storageNodeID] = float64(atRestAmount)
|
|
bwTotals[storageNodeID] = []int64{putAmount, getAmount, getAuditAmount, getRepairAmount, putRepairAmount}
|
|
}
|
|
|
|
// Create 5 days worth of tally and rollup data.
|
|
// Add one additional day of data since the rollup service will truncate data from the most recent day.
|
|
for i := 0; i < days+1; i++ {
|
|
require.NoError(t, snAccountingDB.SaveTallies(ctx, currentTime, nodeData))
|
|
require.NoError(t, saveBWPhase3(ctx, ordersDB, bwTotals, currentTime))
|
|
|
|
require.NoError(t, rollupService.Rollup(ctx))
|
|
|
|
currentTime = currentTime.Add(24 * time.Hour)
|
|
}
|
|
|
|
accountingCSVRows, err := snAccountingDB.QueryPaymentInfo(ctx, initialTime.Add(-24*time.Hour), currentTime.Add(24*time.Hour))
|
|
require.NoError(t, err)
|
|
assert.Equal(t, len(storageNodes), len(accountingCSVRows))
|
|
|
|
// Confirm all the data saved over the 5 days is all summed in the accounting rollup table.
|
|
for _, row := range accountingCSVRows {
|
|
assert.Equal(t, int64(days*putAmount), row.PutTotal)
|
|
assert.Equal(t, int64(days*getAmount), row.GetTotal)
|
|
assert.Equal(t, int64(days*getAuditAmount), row.GetAuditTotal)
|
|
assert.Equal(t, int64(days*getRepairAmount), row.GetRepairTotal)
|
|
assert.Equal(t, float64(days*atRestAmount), row.AtRestTotal)
|
|
assert.NotEmpty(t, row.Wallet)
|
|
if dqedNodes[row.NodeID] {
|
|
assert.NotNil(t, row.Disqualified)
|
|
} else {
|
|
assert.Nil(t, row.Disqualified)
|
|
}
|
|
}
|
|
// Confirm there is a storage tally row for each time tally ran for each storage node.
|
|
// We ran tally for one additional day, so expect 6 days of tallies.
|
|
storagenodeTallies, err := snAccountingDB.GetTallies(ctx)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, (days+1)*len(storageNodes), len(storagenodeTallies))
|
|
})
|
|
}
|
|
|
|
func createNodes(ctx *testcontext.Context, t *testing.T, db satellite.DB) []storj.NodeID {
|
|
storageNodes := []storj.NodeID{}
|
|
for i := 0; i < 10; i++ {
|
|
id := testrand.NodeID()
|
|
storageNodes = append(storageNodes, id)
|
|
err := db.OverlayCache().UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
|
|
NodeID: id,
|
|
Address: &pb.NodeAddress{
|
|
Transport: pb.NodeTransport_TCP_TLS_GRPC,
|
|
Address: "127.0.0.1:1234",
|
|
},
|
|
Version: &pb.NodeVersion{
|
|
Version: "1.12.1",
|
|
},
|
|
Operator: &pb.NodeOperator{
|
|
Wallet: "wallet",
|
|
},
|
|
}, time.Now(), overlay.NodeSelectionConfig{})
|
|
require.NoError(t, err)
|
|
}
|
|
return storageNodes
|
|
}
|
|
|
|
func TestRollupDeletes(t *testing.T) {
|
|
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
|
// In this test config.Rollup.DeleteTallies is set to true.
|
|
// This means old tally data will be deleted when Rollup runs.
|
|
// To confirm, this test creates 5 days of tally and rollup data, then we check
|
|
// that the correct data is in the accounting rollup table and the storagenode storage tally table.
|
|
const (
|
|
days = 5
|
|
atRestAmount = 10
|
|
getAmount = 20
|
|
putAmount = 30
|
|
getAuditAmount = 40
|
|
getRepairAmount = 50
|
|
putRepairAmount = 60
|
|
)
|
|
|
|
var (
|
|
ordersDB = db.Orders()
|
|
snAccountingDB = db.StoragenodeAccounting()
|
|
storageNodes = createNodes(ctx, t, db)
|
|
)
|
|
|
|
rollupService := rollup.New(testplanet.NewLogger(t), snAccountingDB, rollup.Config{Interval: 120 * time.Second, DeleteTallies: true}, time.Hour)
|
|
|
|
// disqualifying nodes is unrelated to this test, but it is added here
|
|
// to confirm the disqualification shows up in the accounting CSVRow
|
|
dqedNodes, err := dqNodes(ctx, db.OverlayCache(), storageNodes)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, dqedNodes)
|
|
|
|
// Set timestamp back by the number of days we want to save
|
|
now := time.Now().UTC()
|
|
initialTime := now.AddDate(0, 0, -days)
|
|
|
|
// TODO: this only runs the test for times that are farther back than
|
|
// an hour from midnight UTC. something is wrong for the hour of
|
|
// 11pm-midnight UTC.
|
|
hour, _, _ := now.Clock()
|
|
if hour == 23 {
|
|
initialTime = initialTime.Add(-time.Hour)
|
|
}
|
|
|
|
currentTime := initialTime
|
|
|
|
nodeData := map[storj.NodeID]float64{}
|
|
bwTotals := make(map[storj.NodeID][]int64)
|
|
for _, storageNodeID := range storageNodes {
|
|
nodeData[storageNodeID] = float64(atRestAmount)
|
|
bwTotals[storageNodeID] = []int64{putAmount, getAmount, getAuditAmount, getRepairAmount, putRepairAmount}
|
|
}
|
|
|
|
// Create 5 days worth of tally and rollup data.
|
|
// Add one additional day of data since the rollup service will truncate data from the most recent day.
|
|
for i := 0; i < days+1; i++ {
|
|
require.NoError(t, snAccountingDB.SaveTallies(ctx, currentTime, nodeData))
|
|
require.NoError(t, saveBWPhase3(ctx, ordersDB, bwTotals, currentTime))
|
|
|
|
// Since the config.Rollup.DeleteTallies is set to true, at the end of the Rollup(),
|
|
// storagenode storage tallies that exist before the last rollup should be deleted.
|
|
require.NoError(t, rollupService.Rollup(ctx))
|
|
|
|
currentTime = currentTime.Add(24 * time.Hour)
|
|
}
|
|
|
|
accountingCSVRows, err := snAccountingDB.QueryPaymentInfo(ctx, initialTime.Add(-24*time.Hour), currentTime.Add(24*time.Hour))
|
|
require.NoError(t, err)
|
|
assert.Equal(t, len(storageNodes), len(accountingCSVRows))
|
|
|
|
// Confirm all the data saved over the 5 days is all summed in the accounting rollup table.
|
|
for _, row := range accountingCSVRows {
|
|
assert.Equal(t, int64(days*putAmount), row.PutTotal)
|
|
assert.Equal(t, int64(days*getAmount), row.GetTotal)
|
|
assert.Equal(t, int64(days*getAuditAmount), row.GetAuditTotal)
|
|
assert.Equal(t, int64(days*getRepairAmount), row.GetRepairTotal)
|
|
assert.Equal(t, float64(days*atRestAmount), row.AtRestTotal)
|
|
assert.NotEmpty(t, row.Wallet)
|
|
if dqedNodes[row.NodeID] {
|
|
assert.NotNil(t, row.Disqualified)
|
|
} else {
|
|
assert.Nil(t, row.Disqualified)
|
|
}
|
|
}
|
|
// Confirm there are only storage tally rows for the last time tally ran for each storage node.
|
|
storagenodeTallies, err := snAccountingDB.GetTallies(ctx)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, len(storageNodes), len(storagenodeTallies))
|
|
})
|
|
}
|
|
|
|
func TestRollupOldOrders(t *testing.T) {
|
|
testplanet.Run(t, testplanet.Config{
|
|
SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
|
|
},
|
|
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
// The purpose of this test is to ensure that running Rollup properly updates storagenode accounting data
|
|
// for a period of time which has already been accounted for in a previous call to Rollup.
|
|
// This is because orders can be added to the bandwidth settlement table in the past, so a previous rollup can become inaccurate.
|
|
// Test overview:
|
|
// We have 2 nodes (A, B).
|
|
// We start at t=now is the initial time, which is right at the beginning of a day.
|
|
// Phase 1:
|
|
// On node A, settle bandwidth {X} at t+2hr.
|
|
// Also settle bandwidth at t+26hr. This is necessary because rollup will truncate data from the most recent day, and we don't want to
|
|
// truncate the data from the day starting at t.
|
|
// Run rollup, expect data in storagenode accounting DB to match {X} for sn A, and have nothing for sn B.
|
|
// Phase 2:
|
|
// On nodes A and B, settle bandwidth {Y} at t+1hr.
|
|
// Run rollup, expect data in storagenode accounting DB to match {X}+{Y} for sn A, and to match {Y} for sn B.
|
|
|
|
var (
|
|
satellitePeer = planet.Satellites[0]
|
|
ordersDB = satellitePeer.DB.Orders()
|
|
snAccountingDB = satellitePeer.DB.StoragenodeAccounting()
|
|
)
|
|
// Run rollup once to start so we add the correct accounting timestamps to the db
|
|
satellitePeer.Accounting.Rollup.Loop.TriggerWait()
|
|
satellitePeer.Accounting.Rollup.Loop.Pause()
|
|
satellitePeer.Accounting.Tally.Loop.Pause()
|
|
|
|
nodeA := planet.StorageNodes[0]
|
|
nodeB := planet.StorageNodes[1]
|
|
// initialTime must start at the beginning of a day so that we can be sure
|
|
// that bandwidth data for both phases of the test is settled on the _same_ day.
|
|
// Subtract 48 hours so that when rollup discards the latest day, the data we care about is not ignored.
|
|
initialTime := time.Now().Truncate(24 * time.Hour)
|
|
|
|
const (
|
|
PutActionAmount1 = 100
|
|
GetActionAmount1 = 200
|
|
GetAuditActionAmount1 = 300
|
|
GetRepairActionAmount1 = 400
|
|
PutRepairActionAmount1 = 500
|
|
AtRestAmount1 = 600
|
|
|
|
PutActionAmount2 = 150
|
|
GetActionAmount2 = 250
|
|
GetAuditActionAmount2 = 350
|
|
GetRepairActionAmount2 = 450
|
|
PutRepairActionAmount2 = 550
|
|
AtRestAmount2 = 650
|
|
)
|
|
|
|
// Phase 1
|
|
storageTotalsPhase1 := make(map[storj.NodeID]float64)
|
|
storageTotalsPhase1[nodeA.ID()] = float64(AtRestAmount1)
|
|
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(2*time.Hour), storageTotalsPhase1))
|
|
// save tallies for the next day too, so that the period we are testing is not truncated by the rollup service.
|
|
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(26*time.Hour), storageTotalsPhase1))
|
|
|
|
bwTotalsPhase1 := make(map[storj.NodeID][]int64)
|
|
bwTotalsPhase1[nodeA.ID()] = []int64{PutActionAmount1, GetActionAmount1, GetAuditActionAmount1, GetRepairActionAmount1, PutRepairActionAmount1}
|
|
require.NoError(t, saveBWPhase3(ctx, ordersDB, bwTotalsPhase1, initialTime.Add(2*time.Hour)))
|
|
// save bandwidth for the next day too, so that the period we are testing is not truncated by the rollup service.
|
|
require.NoError(t, saveBWPhase3(ctx, ordersDB, bwTotalsPhase1, initialTime.Add(26*time.Hour)))
|
|
|
|
require.NoError(t, satellitePeer.Accounting.Rollup.Rollup(ctx))
|
|
|
|
accountingCSVRows, err := snAccountingDB.QueryPaymentInfo(ctx, initialTime.Add(-24*time.Hour), initialTime.Add(24*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
// there should only be data for node A
|
|
require.Len(t, accountingCSVRows, 1)
|
|
accountingCSVRow := accountingCSVRows[0]
|
|
require.Equal(t, nodeA.ID(), accountingCSVRow.NodeID)
|
|
|
|
// verify data is correct
|
|
require.EqualValues(t, PutActionAmount1, accountingCSVRow.PutTotal)
|
|
require.EqualValues(t, GetActionAmount1, accountingCSVRow.GetTotal)
|
|
require.EqualValues(t, GetAuditActionAmount1, accountingCSVRow.GetAuditTotal)
|
|
require.EqualValues(t, GetRepairActionAmount1, accountingCSVRow.GetRepairTotal)
|
|
require.EqualValues(t, PutRepairActionAmount1, accountingCSVRow.PutRepairTotal)
|
|
require.EqualValues(t, AtRestAmount1, accountingCSVRow.AtRestTotal)
|
|
|
|
// Phase 2
|
|
storageTotalsPhase2 := make(map[storj.NodeID]float64)
|
|
storageTotalsPhase2[nodeA.ID()] = float64(AtRestAmount2)
|
|
storageTotalsPhase2[nodeB.ID()] = float64(AtRestAmount2)
|
|
require.NoError(t, snAccountingDB.SaveTallies(ctx, initialTime.Add(-2*time.Hour), storageTotalsPhase2))
|
|
|
|
bwTotalsPhase2 := make(map[storj.NodeID][]int64)
|
|
bwTotalsPhase2[nodeA.ID()] = []int64{PutActionAmount2, GetActionAmount2, GetAuditActionAmount2, GetRepairActionAmount2, PutRepairActionAmount2}
|
|
bwTotalsPhase2[nodeB.ID()] = []int64{PutActionAmount2, GetActionAmount2, GetAuditActionAmount2, GetRepairActionAmount2, PutRepairActionAmount2}
|
|
require.NoError(t, saveBWPhase3(ctx, ordersDB, bwTotalsPhase2, initialTime.Add(time.Hour)))
|
|
|
|
require.NoError(t, satellitePeer.Accounting.Rollup.Rollup(ctx))
|
|
|
|
accountingCSVRows, err = snAccountingDB.QueryPaymentInfo(ctx, initialTime.Add(-24*time.Hour), initialTime.Add(24*time.Hour))
|
|
require.NoError(t, err)
|
|
|
|
// there should be data for both nodes
|
|
require.Len(t, accountingCSVRows, 2)
|
|
rA := accountingCSVRows[0]
|
|
rB := accountingCSVRows[1]
|
|
if rA.NodeID != nodeA.ID() {
|
|
rA = accountingCSVRows[1]
|
|
rB = accountingCSVRows[0]
|
|
}
|
|
require.Equal(t, nodeA.ID(), rA.NodeID)
|
|
require.Equal(t, nodeB.ID(), rB.NodeID)
|
|
|
|
// verify data is correct
|
|
require.EqualValues(t, PutActionAmount1+PutActionAmount2, rA.PutTotal)
|
|
require.EqualValues(t, GetActionAmount1+GetActionAmount2, rA.GetTotal)
|
|
require.EqualValues(t, GetAuditActionAmount1+GetAuditActionAmount2, rA.GetAuditTotal)
|
|
require.EqualValues(t, GetRepairActionAmount1+GetRepairActionAmount2, rA.GetRepairTotal)
|
|
require.EqualValues(t, PutRepairActionAmount1+PutRepairActionAmount2, rA.PutRepairTotal)
|
|
require.EqualValues(t, AtRestAmount1+AtRestAmount2, rA.AtRestTotal)
|
|
|
|
require.EqualValues(t, PutActionAmount2, rB.PutTotal)
|
|
require.EqualValues(t, GetActionAmount2, rB.GetTotal)
|
|
require.EqualValues(t, GetAuditActionAmount2, rB.GetAuditTotal)
|
|
require.EqualValues(t, GetRepairActionAmount2, rB.GetRepairTotal)
|
|
require.EqualValues(t, PutRepairActionAmount2, rB.PutRepairTotal)
|
|
require.EqualValues(t, AtRestAmount2, rB.AtRestTotal)
|
|
})
|
|
}
|
|
|
|
func saveBWPhase3(ctx context.Context, ordersDB orders.DB, bwTotals map[storj.NodeID][]int64, intervalStart time.Time) error {
|
|
pieceActions := []pb.PieceAction{pb.PieceAction_PUT,
|
|
pb.PieceAction_GET,
|
|
pb.PieceAction_GET_AUDIT,
|
|
pb.PieceAction_GET_REPAIR,
|
|
pb.PieceAction_PUT_REPAIR,
|
|
}
|
|
for nodeID, actions := range bwTotals {
|
|
var actionAmounts = map[int32]int64{}
|
|
for actionType, amount := range actions {
|
|
actionAmounts[int32(pieceActions[actionType])] = amount
|
|
}
|
|
|
|
_, _, err := ordersDB.UpdateStoragenodeBandwidthSettleWithWindow(ctx,
|
|
nodeID,
|
|
actionAmounts,
|
|
intervalStart.Truncate(1*time.Hour),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// dqNodes disqualifies half the nodes in the testplanet and returns a map of dqed nodes.
|
|
func dqNodes(ctx *testcontext.Context, overlayDB overlay.DB, storageNodes []storj.NodeID) (map[storj.NodeID]bool, error) {
|
|
dqed := make(map[storj.NodeID]bool)
|
|
|
|
for i, n := range storageNodes {
|
|
if i%2 == 0 {
|
|
continue
|
|
}
|
|
|
|
err := overlayDB.DisqualifyNode(ctx, n, time.Now().UTC(), overlay.DisqualificationReasonUnknown)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dqed[n] = true
|
|
}
|
|
|
|
return dqed, nil
|
|
}
|