storj/satellite/orders/rollups_write_cache_test.go
Michal Niewrzal 252c437b0e satellite/orders: ignore context canceled when updating bucket bandwidth
Orders from storage nodes are received by the SettlementWithWindowFinal method. A stream receives all orders, and once they have all arrived, storage node and bucket bandwidth are inserted into the DB. The problem is with bucket bandwidth, which is stored through a cache that often uses the context from the SettlementWithWindowFinal stream to perform DB inserts, and it does so in a separate goroutine. Because of that, SettlementWithWindowFinal can finish before the flush is done, and the context is canceled while the insert into the DB is still running.

Change-Id: I3a72c86390e9aedc060f6b082bb059f1406231ee
2023-02-08 13:21:42 +00:00
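
The commit title refers to a change in the cache's flush path, not in this test file itself. As a rough sketch of the pattern it describes (the package, function, and parameters below are illustrative stand-ins, not the actual RollupsWriteCache API): a background flush that was handed a request-scoped context treats context.Canceled as an expected, non-fatal outcome instead of a failure.

package flushsketch

import (
	"context"
	"errors"

	"go.uber.org/zap"
)

// flushIgnoringCancel is an illustrative, hypothetical helper showing the
// "ignore context canceled" idea from the commit title. The real cache code
// in satellite/orders differs; insert is a stand-in for the batched
// bucket-bandwidth DB write.
func flushIgnoringCancel(ctx context.Context, log *zap.Logger, insert func(context.Context) error) {
	if err := insert(ctx); err != nil {
		if errors.Is(err, context.Canceled) {
			// The stream handler that produced these rollups may already have
			// returned, canceling ctx; treat that as expected rather than an error.
			log.Debug("bucket bandwidth flush interrupted: context canceled", zap.Error(err))
			return
		}
		log.Error("bucket bandwidth flush failed", zap.Error(err))
	}
}

An alternative would be to detach the flush from the request context entirely (newer Go versions offer context.WithoutCancel for this); the test below only asserts that the flush completes and the settled bandwidth lands in the DB.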

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package orders_test

import (
	"context"
	"fmt"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"

	"storj.io/common/memory"
	"storj.io/common/pb"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/testcontext"
	"storj.io/common/testrand"
	"storj.io/common/uuid"
	"storj.io/storj/private/testplanet"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/accounting"
	"storj.io/storj/satellite/internalpb"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/orders"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
)

func getSettledBandwidth(ctx context.Context, accountingDB accounting.ProjectAccounting, projectID uuid.UUID, since time.Time) (int64, error) {
	total, err := accountingDB.GetProjectSettledBandwidthTotal(ctx, projectID, since.Add(-time.Hour))
	if err != nil {
		return 0, err
	}
	return total, nil
}

// TestRollupsWriteCacheBatchLimitReached makes sure bandwidth rollup values are not written to the
// db until the batch size is reached.
func TestRollupsWriteCacheBatchLimitReached(t *testing.T) {
	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
		useBatchSize := 10
		amount := (memory.MB * 500).Int64()
		projectID := testrand.UUID()
		startTime := time.Now()

		rwc := orders.NewRollupsWriteCache(zaptest.NewLogger(t), db.Orders(), useBatchSize)

		accountingDB := db.ProjectAccounting()

		expectedTotal := int64(0)
		// use different bucketName for each write, so they don't get aggregated yet
		for i := 0; i < useBatchSize-1; i++ {
			bucketName := fmt.Sprintf("my_files_%d", i)
			err := rwc.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
			require.NoError(t, err)

			// check that nothing was actually written since it should just be stored
			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
			require.NoError(t, err)
			require.Zero(t, total)

			expectedTotal += amount
		}

		whenDone := rwc.OnNextFlush()
		// write one more rollup record to hit the threshold
		err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte("my_files_last"), pb.PieceAction_GET, amount, startTime)
		require.NoError(t, err)

		// make sure flushing is done
		select {
		case <-whenDone:
			break
		case <-ctx.Done():
			t.Fatal(ctx.Err())
		}

		total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
		require.NoError(t, err)
		require.Equal(t, expectedTotal, total)
	})
}

// TestRollupsWriteCacheBatchChore makes sure bandwidth rollup values are not written to the
// db until the chore flushes the DB (assuming the batch size is not reached).
func TestRollupsWriteCacheBatchChore(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1,
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			useBatchSize := 10
			amount := (memory.MB * 500).Int64()
			projectID := testrand.UUID()
			startTime := time.Now()

			planet.Satellites[0].Orders.Chore.Loop.Pause()

			accountingDB := planet.Satellites[0].DB.ProjectAccounting()
			ordersDB := planet.Satellites[0].Orders.DB

			expectedTotal := int64(0)
			for i := 0; i < useBatchSize-1; i++ {
				bucketName := fmt.Sprintf("my_files_%d", i)
				err := ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
				require.NoError(t, err)

				// check that nothing was actually written
				total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
				require.NoError(t, err)
				require.Zero(t, total)

				expectedTotal += amount
			}

			rwc := ordersDB.(*orders.RollupsWriteCache)
			whenDone := rwc.OnNextFlush()
			// wait for Loop to complete
			planet.Satellites[0].Orders.Chore.Loop.TriggerWait()

			// make sure flushing is done
			select {
			case <-whenDone:
				break
			case <-ctx.Done():
				t.Fatal(ctx.Err())
			}

			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
			require.NoError(t, err)
			require.Equal(t, expectedTotal, total)
		},
	)
}

func TestUpdateBucketBandwidth(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1,
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			// don't let the loop flush our cache while we're checking it
			planet.Satellites[0].Orders.Chore.Loop.Pause()
			ordersDB := planet.Satellites[0].Orders.DB

			// setup: check there is nothing in the cache to start
			cache, ok := ordersDB.(*orders.RollupsWriteCache)
			require.True(t, ok)
			size := cache.CurrentSize()
			require.Equal(t, 0, size)

			// setup: add an inline and a settled item to the cache
			projectID := testrand.UUID()
			bucketName := []byte("testbucketname")
			amount := (memory.MB * 500).Int64()
			intervalStart := time.Now()
			err := ordersDB.UpdateBucketBandwidthInline(ctx, projectID, bucketName, pb.PieceAction_GET, amount, intervalStart)
			require.NoError(t, err)
			err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, 0, intervalStart)
			require.NoError(t, err)

			// test: confirm there are two items in the cache now (inline and settled)
			size = cache.CurrentSize()
			require.Equal(t, 2, size)
			projectMap := cache.CurrentData()
			expectedKeyAllocated := orders.CacheKey{
				ProjectID:     projectID,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_GET,
				IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
			}
			expectedKeySettled := orders.CacheKey{
				ProjectID:     projectID,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_PUT,
				IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
			}
			expectedCacheDataAllocated := orders.CacheData{
				Inline:    amount,
				Allocated: 0,
				Settled:   0,
			}
			expectedCacheDataSettled := orders.CacheData{
				Inline:    0,
				Allocated: 0,
				Settled:   amount,
			}
			require.Equal(t, expectedCacheDataAllocated, projectMap[expectedKeyAllocated])
			require.Equal(t, expectedCacheDataSettled, projectMap[expectedKeySettled])

			// setup: add another item to the cache but with a different projectID
			projectID2 := testrand.UUID()
			amount2 := (memory.MB * 10).Int64()
			err = ordersDB.UpdateBucketBandwidthInline(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, intervalStart)
			require.NoError(t, err)
			err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, 0, intervalStart)
			require.NoError(t, err)
			size = cache.CurrentSize()
			require.Equal(t, 3, size)
			projectMap2 := cache.CurrentData()

			// test: confirm there are 3 items in the cache now with different projectIDs
			expectedKey := orders.CacheKey{
				ProjectID:     projectID2,
				BucketName:    string(bucketName),
				Action:        pb.PieceAction_GET,
				IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
			}
			expectedData := orders.CacheData{
				Inline:    amount2,
				Allocated: 0,
				Settled:   amount2,
			}
			require.Equal(t, projectMap2[expectedKey], expectedData)
			require.Equal(t, len(projectMap2), 3)
		},
	)
}

func TestEndpointAndCacheContextCanceled(t *testing.T) {
	testplanet.Run(t, testplanet.Config{
		SatelliteCount: 1, StorageNodeCount: 1,
		Reconfigure: testplanet.Reconfigure{
			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
				config.Orders.FlushBatchSize = 3
			},
		},
	},
		func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			satellite := planet.Satellites[0]
			storagenode := planet.StorageNodes[0]
			ordersDB := planet.Satellites[0].Orders.DB

			now := time.Now()

			// create orders to trigger RollupsWriteCache flush
			projectID := testrand.UUID()
			requests := []*pb.SettlementRequest{}
			singleOrderAmount := int64(50)
			for i := 0; i < 3; i++ {
				piecePublicKey, piecePrivateKey, err := storj.NewPieceKey()
				require.NoError(t, err)

				bucketname := "testbucket" + strconv.Itoa(i)

				bucketLocation := metabase.BucketLocation{
					ProjectID:  projectID,
					BucketName: bucketname,
				}

				serialNumber := testrand.SerialNumber()
				key := satellite.Config.Orders.EncryptionKeys.Default
				encrypted, err := key.EncryptMetadata(
					serialNumber,
					&internalpb.OrderLimitMetadata{
						CompactProjectBucketPrefix: bucketLocation.CompactPrefix(),
					},
				)
				require.NoError(t, err)

				limit := &pb.OrderLimit{
					SerialNumber:           serialNumber,
					SatelliteId:            satellite.ID(),
					UplinkPublicKey:        piecePublicKey,
					StorageNodeId:          storagenode.ID(),
					PieceId:                storj.NewPieceID(),
					Action:                 pb.PieceAction_GET,
					Limit:                  1000,
					PieceExpiration:        time.Time{},
					OrderCreation:          now,
					OrderExpiration:        now.Add(24 * time.Hour),
					EncryptedMetadataKeyId: key.ID[:],
					EncryptedMetadata:      encrypted,
				}

				orderLimit, err := signing.SignOrderLimit(ctx, signing.SignerFromFullIdentity(satellite.Identity), limit)
				require.NoError(t, err)

				order, err := signing.SignUplinkOrder(ctx, piecePrivateKey, &pb.Order{
					SerialNumber: serialNumber,
					Amount:       singleOrderAmount,
				})
				require.NoError(t, err)

				requests = append(requests, &pb.SettlementRequest{
					Limit: orderLimit,
					Order: order,
				})
			}

			conn, err := storagenode.Dialer.DialNodeURL(ctx, storj.NodeURL{ID: satellite.ID(), Address: satellite.Addr()})
			require.NoError(t, err)
			defer ctx.Check(conn.Close)

			stream, err := pb.NewDRPCOrdersClient(conn).SettlementWithWindow(ctx)
			require.NoError(t, err)
			defer ctx.Check(stream.Close)

			for _, request := range requests {
				err := stream.Send(&pb.SettlementRequest{
					Limit: request.Limit,
					Order: request.Order,
				})
				require.NoError(t, err)
			}
			require.NoError(t, err)

			resp, err := stream.CloseAndRecv()
			require.NoError(t, err)
			require.Equal(t, pb.SettlementWithWindowResponse_ACCEPTED, resp.Status)

			rwc := ordersDB.(*orders.RollupsWriteCache)
			whenDone := rwc.OnNextFlush()

			// make sure flushing is done
			select {
			case <-whenDone:
				break
			case <-ctx.Done():
				t.Fatal(ctx.Err())
			}

			// verify that orders were stored in DB
			bucketBandwidth, err := getSettledBandwidth(ctx, planet.Satellites[0].DB.ProjectAccounting(), projectID, now)
			require.NoError(t, err)
			require.Equal(t, singleOrderAmount*int64(len(requests)), bucketBandwidth)
		},
	)
}