storj/satellite/orders/rollups_write_cache_test.go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package orders_test

import (
	"context"
	"fmt"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"

	"storj.io/common/memory"
	"storj.io/common/pb"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/common/uuid"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/accounting"
	"storj.io/storj/satellite/internalpb"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
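
// getSettledBandwidth sums the settled bandwidth rollups stored for the
// project. It queries from an hour before `since`, presumably because the
// cache truncates interval starts to the hour (see the CacheKey construction
// below), so a rollup for the current hour can start slightly before `since`.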
func getSettledBandwidth(ctx context.Context, accountingDB accounting.ProjectAccounting, projectID uuid.UUID, since time.Time) (int64, error) {
	total, err := accountingDB.GetProjectSettledBandwidthTotal(ctx, projectID, since.Add(-time.Hour))
	if err != nil {
		return 0, err
	}
	return total, nil
}

// TestRollupsWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
// db until the batch size is reached.
func TestRollupsWriteCacheBatchLimitReached(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		useBatchSize := 10
		amount := (memory.MB * 500).Int64()
		projectID := testrand.UUID()
		startTime := time.Now()

		rwc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)

		accountingDB := db.ProjectAccounting()
		expectedTotal := int64(0)
		// use different bucketName for each write, so they don't get aggregated yet
		for i := 0; i < useBatchSize-1; i++ {
			bucketName := fmt.Sprintf("my_files_%d", i)
			err := rwc.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
			require.NoError(t, err)

			// check that nothing was actually written since it should just be stored
			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
			require.NoError(t, err)
			require.Zero(t, total)

			expectedTotal += amount
		}
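
		// OnNextFlush returns a channel that is closed once the cache's next
		// flush to the database completes, so the test can wait on it below.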
		whenDone := rwc.OnNextFlush()
		// write one more rollup record to hit the threshold
		err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
		require.NoError(t, err)

		// make sure flushing is done
		select {
		case <-whenDone:
			break
		case <-ctx.Done():
			t.Fatal(ctx.Err())
		}

		total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
		require.NoError(t, err)
		require.Equal(t, expectedTotal, total)
	})
}

// TestRollupsWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
// db until the chore flushes the DB (assuming the batch size is not reached).
func TestRollupsWriteCacheBatchChore(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1,
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			useBatchSize := 10
			amount := (memory.MB * 500).Int64()
			projectID := testrand.UUID()
			startTime := time.Now()

			planet.Satellites[0].Orders.Chore.Loop.Pause()

			accountingDB := planet.Satellites[0].DB.ProjectAccounting()
			ordersDB := planet.Satellites[0].Orders.DB

			expectedTotal := int64(0)
			for i := 0; i < useBatchSize-1; i++ {
				bucketName := fmt.Sprintf("my_files_%d", i)
				err := ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
				require.NoError(t, err)

				// check that nothing was actually written
				total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
				require.NoError(t, err)
				require.Zero(t, total)

				expectedTotal += amount
			}
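
			// the satellite wires its orders DB through a RollupsWriteCache,
			// so the type assertion below exposes the cache to the test.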
			rwc := ordersDB.(*orders.RollupsWriteCache)
			whenDone := rwc.OnNextFlush()

			// wait for Loop to complete
			planet.Satellites[0].Orders.Chore.Loop.TriggerWait()

			// make sure flushing is done
			select {
			case <-whenDone:
				break
			case <-ctx.Done():
				t.Fatal(ctx.Err())
			}

			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
			require.NoError(t, err)
			require.Equal(t, expectedTotal, total)
		},
	)
}

func TestUpdateBucketBandwidth(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1,
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			// don't let the loop flush our cache while we're checking it
			planet.Satellites[0].Orders.Chore.Loop.Pause()
			ordersDB := planet.Satellites[0].Orders.DB

			// setup: check there is nothing in the cache to start
			cache, ok := ordersDB.(*orders.RollupsWriteCache)
			require.True(t, ok)
			size := cache.CurrentSize()
			require.Equal(t, 0, size)

			// setup: add an inline and a settled item to the cache
			projectID := testrand.UUID()
			bucketName := []byte("testbucketname")
			amount := (memory.MB * 500).Int64()
			intervalStart := time.Now()
			err := ordersDB.UpdateBucketBandwidthInline(ctx, projectID, bucketName, pb.PieceAction_GET, amount, intervalStart)
			require.NoError(t, err)
			err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, 0, intervalStart)
			require.NoError(t, err)

			// test: confirm there are two items in the cache now
			size = cache.CurrentSize()
			require.Equal(t, 2, size)
			projectMap := cache.CurrentData()
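
			// cache keys truncate the interval start to the hour, matching the
			// hourly granularity of the bandwidth rollups.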
			intervalStartHour := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix()
			expectedKeyInline := orders.CacheKey{
				ProjectID:     projectID,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_GET,
				IntervalStart: intervalStartHour,
			}
			expectedKeySettled := orders.CacheKey{
				ProjectID:     projectID,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_PUT,
				IntervalStart: intervalStartHour,
			}
			expectedCacheDataInline := orders.CacheData{
				Inline:    amount,
				Allocated: 0,
				Settled:   0,
			}
			expectedCacheDataSettled := orders.CacheData{
				Inline:    0,
				Allocated: 0,
				Settled:   amount,
			}
			require.Equal(t, expectedCacheDataInline, projectMap[expectedKeyInline])
			require.Equal(t, expectedCacheDataSettled, projectMap[expectedKeySettled])

			// setup: add another item to the cache but with a different projectID
			projectID2 := testrand.UUID()
			amount2 := (memory.MB * 10).Int64()
			err = ordersDB.UpdateBucketBandwidthInline(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, intervalStart)
			require.NoError(t, err)
			err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, 0, intervalStart)
			require.NoError(t, err)
			size = cache.CurrentSize()
			require.Equal(t, 3, size)
			projectMap2 := cache.CurrentData()

			// test: confirm there are 3 items in the cache now with different projectIDs
			expectedKey := orders.CacheKey{
				ProjectID:     projectID2,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_GET,
				IntervalStart: intervalStartHour,
			}
			expectedData := orders.CacheData{
				Inline:    amount2,
				Allocated: 0,
				Settled:   amount2,
			}
			require.Equal(t, expectedData, projectMap2[expectedKey])
			require.Len(t, projectMap2, 3)
		},
	)
}

func TestEndpointAndCacheContextCanceled(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.Orders.FlushBatchSize = 3
			},
		},
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			satellite := planet.Satellites[0]
			storagenode := planet.StorageNodes[0]
			ordersDB := planet.Satellites[0].Orders.DB
			now := time.Now()

			// create orders to trigger RollupsWriteCache flush
			projectID := testrand.UUID()
			requests := []*pb.SettlementRequest{}
			singleOrderAmount := int64(50)
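			// three orders match the FlushBatchSize configured above, so
			// settling them should force the cache to flush.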
			for i := 0; i < 3; i++ {
				piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
				require.NoError(t, err)

				bucketname := "testbucket" + strconv.Itoa(i)
				bucketLocation := metabase.BucketLocation{
					ProjectID:  projectID,
					BucketName: bucketname,
				}
				serialNumber := testrand.SerialNumber()
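
				// the bucket location travels inside the order limit as encrypted
				// metadata; the satellite decrypts it at settlement time to know
				// which project and bucket to attribute the bandwidth to.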
				key := satellite.Config.Orders.EncryptionKeys.Default
				encrypted, err := key.EncryptMetadata(
					serialNumber,
					&internalpb.OrderLimitMetadata{
						CompactProjectBucketPrefix: bucketLocation.CompactPrefix(),
					},
				)
				require.NoError(t, err)

				limit := &pb.OrderLimit{
					SerialNumber:           serialNumber,
					SatelliteId:            satellite.ID(),
					UplinkPublicKey:        piecePublicKey,
					StorageNodeId:          storagenode.ID(),
					PieceId:                storj.NewPieceID(),
					Action:                 pb.PieceAction_GET,
					Limit:                  1000,
					PieceExpiration:        time.Time{},
					OrderCreation:          now,
					OrderExpiration:        now.Add(24 * time.Hour),
					EncryptedMetadataKeyId: key.ID[:],
					EncryptedMetadata:      encrypted,
				}
				orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit)
				require.NoError(t, err)

				order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
					SerialNumber: serialNumber,
					Amount:       singleOrderAmount,
				})
				require.NoError(t, err)

				requests = append(requests, &pb.SettlementRequest{
					Limit: orderLimit,
					Order: order,
				})
			}

			conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()})
			require.NoError(t, err)
			defer ctx.Check(conn.Close)
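
			// settle all orders in one window over the storage node's DRPC
			// connection; the satellite should accept the whole batch.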
			stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx)
			require.NoError(t, err)
			defer ctx.Check(stream.Close)

			for _, request := range requests {
				err := stream.Send(&pb.SettlementRequest{
					Limit: request.Limit,
					Order: request.Order,
				})
				require.NoError(t, err)
			}

			resp, err := stream.CloseAndRecv()
			require.NoError(t, err)
			require.Equal(t, pb.SettlementWithWindowResponse_ACCEPTED, resp.Status)

			rwc := ordersDB.(*orders.RollupsWriteCache)
			whenDone := rwc.OnNextFlush()
			// make sure flushing is done
			select {
			case <-whenDone:
				break
			case <-ctx.Done():
				t.Fatal(ctx.Err())
			}

			// verify that orders were stored in DB
			bucketBandwidth, err := getSettledBandwidth(ctx, planet.Satellites[0].DB.ProjectAccounting(), projectID, now)
			require.NoError(t, err)
			require.Equal(t, singleOrderAmount*int64(len(requests)), bucketBandwidth)
		},
	)
}