Delete accounting raws after rollup (#1646)
* enable deletion of raws * assert deletion of raws during TestRollupRaws * add some unit tests for DeleteRawBefore
This commit is contained in:
parent
e4a70e3fac
commit
34a439a99c
@ -115,11 +115,6 @@ func (r *Rollup) RollupRaws(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return Error.Wrap(err)
|
return Error.Wrap(err)
|
||||||
}
|
}
|
||||||
//removed the rolled-up raws
|
|
||||||
var rolledUpRawsHaveBeenSaved bool
|
|
||||||
//todo: write files to disk or whatever we decide to do here
|
|
||||||
if rolledUpRawsHaveBeenSaved {
|
|
||||||
return Error.Wrap(r.db.DeleteRawBefore(ctx, latestTally))
|
return Error.Wrap(r.db.DeleteRawBefore(ctx, latestTally))
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -8,35 +8,51 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"storj.io/storj/internal/testcontext"
|
"storj.io/storj/internal/testcontext"
|
||||||
"storj.io/storj/internal/testplanet"
|
"storj.io/storj/internal/testplanet"
|
||||||
|
"storj.io/storj/pkg/accounting"
|
||||||
"storj.io/storj/pkg/storj"
|
"storj.io/storj/pkg/storj"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type testData struct {
|
||||||
|
nodeData map[storj.NodeID]float64
|
||||||
|
bwTotals map[storj.NodeID][]int64
|
||||||
|
}
|
||||||
|
|
||||||
func TestRollupRaws(t *testing.T) {
|
func TestRollupRaws(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 0,
|
SatelliteCount: 1, StorageNodeCount: 10, UplinkCount: 0,
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
days := 5
|
days := 5
|
||||||
atRest := float64(5000)
|
testData := createData(planet, days)
|
||||||
bw := []int64{1000, 2000, 3000, 4000}
|
|
||||||
|
|
||||||
nodeData, bwTotals := createData(planet, atRest, bw)
|
|
||||||
|
|
||||||
// Set timestamp back by the number of days we want to save
|
// Set timestamp back by the number of days we want to save
|
||||||
timestamp := time.Now().UTC().AddDate(0, 0, -1*days)
|
timestamp := time.Now().UTC().AddDate(0, 0, -1*days)
|
||||||
start := timestamp
|
start := timestamp
|
||||||
|
|
||||||
for i := 0; i <= days; i++ {
|
for i := 0; i < days; i++ {
|
||||||
err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, timestamp, timestamp, nodeData)
|
err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, timestamp, timestamp, testData[i].nodeData)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = planet.Satellites[0].DB.Accounting().SaveBWRaw(ctx, timestamp, timestamp, bwTotals)
|
err = planet.Satellites[0].DB.Accounting().SaveBWRaw(ctx, timestamp, timestamp, testData[i].bwTotals)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = planet.Satellites[0].Accounting.Rollup.RollupRaws(ctx)
|
err = planet.Satellites[0].Accounting.Rollup.RollupRaws(ctx)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Assert that RollupRaws deleted all raws except for today's
|
||||||
|
raw, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for _, r := range raw {
|
||||||
|
assert.Equal(t, r.IntervalEndTime.UTC().Truncate(time.Second), timestamp.Truncate(time.Second))
|
||||||
|
if r.DataType == accounting.AtRest {
|
||||||
|
assert.Equal(t, testData[i].nodeData[r.NodeID], r.DataTotal)
|
||||||
|
} else {
|
||||||
|
assert.Equal(t, testData[i].bwTotals[r.NodeID][r.DataType], int64(r.DataTotal))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Advance time by 24 hours
|
// Advance time by 24 hours
|
||||||
timestamp = timestamp.Add(time.Hour * 24)
|
timestamp = timestamp.Add(time.Hour * 24)
|
||||||
@ -47,36 +63,53 @@ func TestRollupRaws(t *testing.T) {
|
|||||||
end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, end.Location())
|
end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, end.Location())
|
||||||
|
|
||||||
rows, err := planet.Satellites[0].DB.Accounting().QueryPaymentInfo(ctx, start, end)
|
rows, err := planet.Satellites[0].DB.Accounting().QueryPaymentInfo(ctx, start, end)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if i == 0 { // we need at least two days for rollup to work
|
if i == 0 { // we need at least two days for rollup to work
|
||||||
assert.Equal(t, 0, len(rows))
|
assert.Equal(t, 0, len(rows))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// the number of rows should be number of nodes
|
// the number of rows should be number of nodes
|
||||||
|
|
||||||
assert.Equal(t, len(planet.StorageNodes), len(rows))
|
assert.Equal(t, len(planet.StorageNodes), len(rows))
|
||||||
|
|
||||||
// verify data is correct
|
// verify data is correct
|
||||||
for _, r := range rows {
|
for _, r := range rows {
|
||||||
i := int64(i)
|
totals := expectedTotals(testData, r.NodeID, i)
|
||||||
assert.Equal(t, i*bw[0], r.PutTotal)
|
assert.Equal(t, int64(totals[0]), r.PutTotal)
|
||||||
assert.Equal(t, i*bw[1], r.GetTotal)
|
assert.Equal(t, int64(totals[1]), r.GetTotal)
|
||||||
assert.Equal(t, i*bw[2], r.GetAuditTotal)
|
assert.Equal(t, int64(totals[2]), r.GetAuditTotal)
|
||||||
assert.Equal(t, i*bw[3], r.GetRepairTotal)
|
assert.Equal(t, int64(totals[3]), r.GetRepairTotal)
|
||||||
assert.Equal(t, float64(i)*atRest, r.AtRestTotal)
|
assert.Equal(t, totals[4], r.AtRestTotal)
|
||||||
assert.NotNil(t, nodeData[r.NodeID])
|
|
||||||
assert.NotEmpty(t, r.Wallet)
|
assert.NotEmpty(t, r.Wallet)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func createData(planet *testplanet.Planet, atRest float64, bw []int64) (nodeData map[storj.NodeID]float64, bwTotals map[storj.NodeID][]int64) {
|
// expectedTotals sums test data up to, but not including the current day's
|
||||||
nodeData = make(map[storj.NodeID]float64)
|
func expectedTotals(data []testData, id storj.NodeID, currentDay int) []float64 {
|
||||||
bwTotals = make(map[storj.NodeID][]int64)
|
totals := make([]float64, 5)
|
||||||
|
for i := 0; i < currentDay; i++ {
|
||||||
|
totals[0] += float64(data[i].bwTotals[id][0])
|
||||||
|
totals[1] += float64(data[i].bwTotals[id][1])
|
||||||
|
totals[2] += float64(data[i].bwTotals[id][2])
|
||||||
|
totals[3] += float64(data[i].bwTotals[id][3])
|
||||||
|
totals[4] += data[i].nodeData[id]
|
||||||
|
}
|
||||||
|
return totals
|
||||||
|
}
|
||||||
|
|
||||||
|
func createData(planet *testplanet.Planet, days int) []testData {
|
||||||
|
data := make([]testData, days)
|
||||||
|
for i := 0; i < days; i++ {
|
||||||
|
i := int64(i)
|
||||||
|
data[i].nodeData = make(map[storj.NodeID]float64)
|
||||||
|
data[i].bwTotals = make(map[storj.NodeID][]int64)
|
||||||
for _, n := range planet.StorageNodes {
|
for _, n := range planet.StorageNodes {
|
||||||
id := n.Identity.ID
|
id := n.Identity.ID
|
||||||
nodeData[id] = atRest
|
data[i].nodeData[id] = float64(i * 5000)
|
||||||
bwTotals[id] = bw
|
data[i].bwTotals[id] = []int64{i * 1000, i * 2000, i * 3000, i * 4000}
|
||||||
}
|
}
|
||||||
return nodeData, bwTotals
|
}
|
||||||
|
return data
|
||||||
}
|
}
|
||||||
|
@ -13,9 +13,11 @@ import (
|
|||||||
|
|
||||||
"storj.io/storj/internal/testcontext"
|
"storj.io/storj/internal/testcontext"
|
||||||
"storj.io/storj/internal/testplanet"
|
"storj.io/storj/internal/testplanet"
|
||||||
|
"storj.io/storj/internal/teststorj"
|
||||||
"storj.io/storj/pkg/bwagreement/testbwagreement"
|
"storj.io/storj/pkg/bwagreement/testbwagreement"
|
||||||
"storj.io/storj/pkg/pb"
|
"storj.io/storj/pkg/pb"
|
||||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||||
|
"storj.io/storj/pkg/storj"
|
||||||
"storj.io/storj/satellite"
|
"storj.io/storj/satellite"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -54,6 +56,45 @@ func TestQueryWithBw(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteRawBefore(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
createdAt time.Time
|
||||||
|
eraseBefore time.Time
|
||||||
|
expectedRaws int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
createdAt: time.Now(),
|
||||||
|
eraseBefore: time.Now(),
|
||||||
|
expectedRaws: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
createdAt: time.Now(),
|
||||||
|
eraseBefore: time.Now().Add(24 * time.Hour),
|
||||||
|
expectedRaws: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
id := teststorj.NodeIDFromBytes([]byte{})
|
||||||
|
nodeData := make(map[storj.NodeID]float64)
|
||||||
|
nodeData[id] = float64(1000)
|
||||||
|
|
||||||
|
err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, tt.createdAt, tt.createdAt, nodeData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = planet.Satellites[0].DB.Accounting().DeleteRawBefore(ctx, tt.eraseBefore)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
raws, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, raws, tt.expectedRaws)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func sendGeneratedAgreements(ctx context.Context, t *testing.T, db satellite.DB, planet *testplanet.Planet) {
|
func sendGeneratedAgreements(ctx context.Context, t *testing.T, db satellite.DB, planet *testplanet.Planet) {
|
||||||
satID := planet.Satellites[0].Identity
|
satID := planet.Satellites[0].Identity
|
||||||
upID := planet.Uplinks[0].Identity
|
upID := planet.Uplinks[0].Identity
|
||||||
|
Loading…
Reference in New Issue
Block a user