3b6e1123b8
Fix sorting by primary key before inserting data into the DB. Previously we sorted the input slice of BucketBandwidthRollup, but we then put all entries into a map to roll up the input data. Iterating over a map with a range loop doesn't guarantee any specific order, so the sorted order was lost when the slices used for the DB insert were built from that map. The new code still uses a map, but when the map is full it sorts the map keys separately and iterates over them in order to read the data out of the map.

https://github.com/storj/storj/issues/5332

Change-Id: I5bf09489b0eecb6858bf854ab387b660124bf53f
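A minimal sketch of the sorted-key iteration pattern the commit message describes, assuming a simplified stand-in key type (rollupKey, its fields, and the rollups map below are illustrative only, not the actual storj types or functions): collect the map keys, sort them, then read the map in sorted-key order when building the rows for the DB insert.

// Sketch only: illustrative names, not the actual storj code.
package main

import (
	"fmt"
	"sort"
)

// rollupKey stands in for the cache/rollup key (project ID + bucket name in the real code).
type rollupKey struct {
	ProjectID  string
	BucketName string
}

func main() {
	// rollups stands in for the aggregation map built from the input slice.
	rollups := map[rollupKey]int64{
		{ProjectID: "p2", BucketName: "a"}: 20,
		{ProjectID: "p1", BucketName: "b"}: 10,
		{ProjectID: "p1", BucketName: "a"}: 30,
	}

	// Ranging over the map directly would yield an unspecified order,
	// so collect the keys and sort them first.
	keys := make([]rollupKey, 0, len(rollups))
	for k := range rollups {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].ProjectID != keys[j].ProjectID {
			return keys[i].ProjectID < keys[j].ProjectID
		}
		return keys[i].BucketName < keys[j].BucketName
	})

	// Read the data back from the map in sorted-key order, which is the
	// order the rows would then be handed to the DB insert.
	for _, k := range keys {
		fmt.Println(k.ProjectID, k.BucketName, rollups[k])
	}
}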
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package orders_test

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"

	"storj.io/common/memory"
	"storj.io/common/pb"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/common/uuid"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/accounting"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
)

// getSettledBandwidth returns the total settled bandwidth for the given project,
// starting one hour before the given time.
func getSettledBandwidth(ctx context.Context, accountingDB accounting.ProjectAccounting, projectID uuid.UUID, since time.Time) (int64, error) {
	total, err := accountingDB.GetProjectSettledBandwidthTotal(ctx, projectID, since.Add(-time.Hour))
	if err != nil {
		return 0, err
	}
	return total, nil
}

// TestRollupsWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
// db until the batch size is reached.
func TestRollupsWriteCacheBatchLimitReached(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		useBatchSize := 10
		amount := (memory.MB * 500).Int64()
		projectID := testrand.UUID()
		startTime := time.Now()

		rwc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)

		accountingDB := db.ProjectAccounting()

		expectedTotal := int64(0)
		// use different bucketName for each write, so they don't get aggregated yet
		for i := 0; i < useBatchSize-1; i++ {
			bucketName := fmt.Sprintf("my_files_%d", i)
			err := rwc.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
			require.NoError(t, err)

			// check that nothing was actually written since it should just be stored
			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
			require.NoError(t, err)
			require.Zero(t, total)

			expectedTotal += amount
		}

		whenDone := rwc.OnNextFlush()
		// write one more rollup record to hit the threshold
		err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
		require.NoError(t, err)

		// make sure flushing is done
		select {
		case <-whenDone:
			break
		case <-ctx.Done():
			t.Fatal(ctx.Err())
		}

		total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
		require.NoError(t, err)
		require.Equal(t, expectedTotal, total)
	})
}

// TestRollupsWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
// db until the chore flushes the DB (assuming the batch size is not reached).
func TestRollupsWriteCacheBatchChore(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1,
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			useBatchSize := 10
			amount := (memory.MB * 500).Int64()
			projectID := testrand.UUID()
			startTime := time.Now()

			planet.Satellites[0].Orders.Chore.Loop.Pause()

			accountingDB := planet.Satellites[0].DB.ProjectAccounting()
			ordersDB := planet.Satellites[0].Orders.DB

			expectedTotal := int64(0)
			for i := 0; i < useBatchSize-1; i++ {
				bucketName := fmt.Sprintf("my_files_%d", i)
				err := ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
				require.NoError(t, err)

				// check that nothing was actually written
				total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
				require.NoError(t, err)
				require.Zero(t, total)

				expectedTotal += amount
			}

			rwc := ordersDB.(*orders.RollupsWriteCache)
			whenDone := rwc.OnNextFlush()
			// wait for Loop to complete
			planet.Satellites[0].Orders.Chore.Loop.TriggerWait()

			// make sure flushing is done
			select {
			case <-whenDone:
				break
			case <-ctx.Done():
				t.Fatal(ctx.Err())
			}

			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
			require.NoError(t, err)
			require.Equal(t, expectedTotal, total)
		},
	)
}

func TestUpdateBucketBandwidth(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1,
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			// don't let the loop flush our cache while we're checking it
			planet.Satellites[0].Orders.Chore.Loop.Pause()
			ordersDB := planet.Satellites[0].Orders.DB

			// setup: check there is nothing in the cache to start
			cache, ok := ordersDB.(*orders.RollupsWriteCache)
			require.True(t, ok)
			size := cache.CurrentSize()
			require.Equal(t, 0, size)

			// setup: add an inline and a settled item to the cache
			projectID := testrand.UUID()
			bucketName := []byte("testbucketname")
			amount := (memory.MB * 500).Int64()
			intervalStart := time.Now()
			err := ordersDB.UpdateBucketBandwidthInline(ctx, projectID, bucketName, pb.PieceAction_GET, amount, intervalStart)
			require.NoError(t, err)
			err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, 0, intervalStart)
			require.NoError(t, err)

			// test: confirm there are two items in the cache now
			size = cache.CurrentSize()
			require.Equal(t, 2, size)
			projectMap := cache.CurrentData()
			expectedKeyAllocated := orders.CacheKey{
				ProjectID:     projectID,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_GET,
				IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
			}
			expectedKeySettled := orders.CacheKey{
				ProjectID:     projectID,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_PUT,
				IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
			}
			expectedCacheDataAllocated := orders.CacheData{
				Inline:    amount,
				Allocated: 0,
				Settled:   0,
			}
			expectedCacheDataSettled := orders.CacheData{
				Inline:    0,
				Allocated: 0,
				Settled:   amount,
			}
			require.Equal(t, expectedCacheDataAllocated, projectMap[expectedKeyAllocated])
			require.Equal(t, expectedCacheDataSettled, projectMap[expectedKeySettled])

			// setup: add another item to the cache but with a different projectID
			projectID2 := testrand.UUID()
			amount2 := (memory.MB * 10).Int64()
			err = ordersDB.UpdateBucketBandwidthInline(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, intervalStart)
			require.NoError(t, err)
			err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, 0, intervalStart)
			require.NoError(t, err)
			size = cache.CurrentSize()
			require.Equal(t, 3, size)
			projectMap2 := cache.CurrentData()

			// test: confirm there are 3 items in the cache now with different projectIDs
			expectedKey := orders.CacheKey{
				ProjectID:     projectID2,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_GET,
				IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
			}
			expectedData := orders.CacheData{
				Inline:    amount2,
				Allocated: 0,
				Settled:   amount2,
			}
			require.Equal(t, projectMap2[expectedKey], expectedData)
			require.Equal(t, len(projectMap2), 3)
		},
	)
}