satellite/orders: don't store allocated bandwidth in bucket_bandwidth_rollups table

We have performance problems with updating bucket_bandwidth_rollups. To improve the situation we can stop storing allocated bandwidth in this table. This should reduce the large number of updates coming from the metainfo endpoints, repair workers, and audit.

The next step will be to drop the `allocated` column from bucket_bandwidth_rollups completely.

Allocated GET bandwidth is all we need, and we keep it in the project_bandwidth_daily_rollups table.

Change-Id: Ifdd26a89ba8262acbca6d794a6c02883ad0c0c9b
parent 2876d5f20b
commit a2a9dafa33
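For orientation, here is a minimal sketch (not part of the commit) of the write path after this change: the order-limit creation code below stops calling service.updateBandwidth, so bucket_bandwidth_rollups only receives settled (and inline) amounts, for example via orders.DB.UpdateBucketBandwidthSettle. The helper name recordSettledGet and its wiring are illustrative assumptions; the method and its signature match the updated tests in this diff, and the import paths are assumed to follow the repository layout at this commit.

package example // illustrative sketch only, not part of this commit

import (
	"context"
	"time"

	"storj.io/common/pb"
	"storj.io/common/uuid"
	"storj.io/storj/satellite/orders"
)

// recordSettledGet is a hypothetical helper showing the only bucket-level write
// that remains on the hot path after this change: settled GET bandwidth (with
// dead bandwidth passed as 0, as the updated tests do). Allocated bandwidth is
// no longer written to bucket_bandwidth_rollups from metainfo, audit, or repair.
func recordSettledGet(ctx context.Context, db orders.DB, projectID uuid.UUID, bucketName []byte, settled int64, intervalStart time.Time) error {
	return db.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_GET, settled, 0, intervalStart)
}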
@@ -209,8 +209,8 @@ type ProjectAccounting interface {
 	GetTallies(ctx context.Context) ([]BucketTally, error)
 	// CreateStorageTally creates a record for BucketStorageTally in the accounting DB table
 	CreateStorageTally(ctx context.Context, tally BucketStorageTally) error
-	// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
-	GetAllocatedBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (int64, error)
+	// GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame.
+	GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error)
 	// GetProjectBandwidth returns project allocated bandwidth for the specified year, month and day.
 	GetProjectBandwidth(ctx context.Context, projectID uuid.UUID, year int, month time.Month, day int, asOfSystemInterval time.Duration) (int64, error)
 	// GetProjectDailyBandwidth returns bandwidth (allocated and settled) for the specified day.
@@ -396,7 +396,7 @@ func TestProjectBandwidthRollups(t *testing.T) {
 	})
 }
 
-func createBucketBandwidthRollupsForPast4Days(ctx *testcontext.Context, satelliteDB satellite.DB, projectID uuid.UUID) (int64, error) {
+func createBucketSettleBandwidthRollupsForPast4Days(ctx *testcontext.Context, satelliteDB satellite.DB, projectID uuid.UUID) (int64, error) {
 	var expectedSum int64
 	ordersDB := satelliteDB.Orders()
 	amount := int64(1000)
@@ -416,42 +416,30 @@ func createBucketBandwidthRollupsForPast4Days(ctx *testcontext.Context, satellit
 			intervalStart = now
 		}
 
-		err := ordersDB.UpdateBucketBandwidthAllocation(ctx,
-			projectID, []byte(bucketName), pb.PieceAction_GET, amount, intervalStart,
-		)
-		if err != nil {
-			return expectedSum, err
-		}
-		err = ordersDB.UpdateBucketBandwidthSettle(ctx,
+		err := ordersDB.UpdateBucketBandwidthSettle(ctx,
 			projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, intervalStart,
 		)
 		if err != nil {
 			return expectedSum, err
 		}
-		err = ordersDB.UpdateBucketBandwidthInline(ctx,
-			projectID, []byte(bucketName), pb.PieceAction_GET, amount, intervalStart,
-		)
-		if err != nil {
-			return expectedSum, err
-		}
 		expectedSum += amount
 	}
 	return expectedSum, nil
 }
 
-func TestProjectBandwidthTotal(t *testing.T) {
+func TestGetProjectSettledBandwidthTotal(t *testing.T) {
 	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
 		pdb := db.ProjectAccounting()
 		projectID := testrand.UUID()
 
 		// Setup: create bucket bandwidth rollup records
-		expectedTotal, err := createBucketBandwidthRollupsForPast4Days(ctx, db, projectID)
+		expectedTotal, err := createBucketSettleBandwidthRollupsForPast4Days(ctx, db, projectID)
 		require.NoError(t, err)
 
 		// Execute test: get project bandwidth total
 		since := time.Now().AddDate(0, -1, 0)
 
-		actualBandwidthTotal, err := pdb.GetAllocatedBandwidthTotal(ctx, projectID, since)
+		actualBandwidthTotal, err := pdb.GetProjectSettledBandwidthTotal(ctx, projectID, since)
 		require.NoError(t, err)
 		require.Equal(t, expectedTotal, actualBandwidthTotal)
 	})
@@ -25,11 +25,8 @@ import (
 	"storj.io/storj/satellite/satellitedb/satellitedbtest"
 )
 
-// unfortunately, in GB is apparently the only way we can get this data, so we need to
-// arrange for the test to produce a total value that won't lose too much precision
-// in the conversion to GB.
-func getTotalBandwidthInGB(ctx context.Context, accountingDB accounting.ProjectAccounting, projectID uuid.UUID, since time.Time) (int64, error) {
-	total, err := accountingDB.GetAllocatedBandwidthTotal(ctx, projectID, since.Add(-time.Hour))
+func getSettledBandwidth(ctx context.Context, accountingDB accounting.ProjectAccounting, projectID uuid.UUID, since time.Time) (int64, error) {
+	total, err := accountingDB.GetProjectSettledBandwidthTotal(ctx, projectID, since.Add(-time.Hour))
 	if err != nil {
 		return 0, err
 	}
@@ -49,16 +46,19 @@ func TestRollupsWriteCacheBatchLimitReached(t *testing.T) {
 
 		accountingDB := db.ProjectAccounting()
 
+		expectedTotal := int64(0)
 		// use different bucketName for each write, so they don't get aggregated yet
 		for i := 0; i < useBatchSize-1; i++ {
 			bucketName := fmt.Sprintf("my_files_%d", i)
-			err := rwc.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
+			err := rwc.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
 			require.NoError(t, err)
 
 			// check that nothing was actually written since it should just be stored
-			total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
+			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
 			require.NoError(t, err)
-			require.Equal(t, int64(0), total)
+			require.Zero(t, total)
+
+			expectedTotal += amount
 		}
 
 		whenDone := rwc.OnNextFlush()
@@ -74,9 +74,9 @@ func TestRollupsWriteCacheBatchLimitReached(t *testing.T) {
 			t.Fatal(ctx.Err())
 		}
 
-		total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
+		total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
 		require.NoError(t, err)
-		require.Equal(t, amount*int64(useBatchSize), total)
+		require.Equal(t, expectedTotal, total)
 	})
 }
 
@@ -97,16 +97,18 @@ func TestRollupsWriteCacheBatchChore(t *testing.T) {
 		accountingDB := planet.Satellites[0].DB.ProjectAccounting()
 		ordersDB := planet.Satellites[0].Orders.DB
 
 		// use different pieceAction for each write, so they don't get aggregated yet
+		expectedTotal := int64(0)
 		for i := 0; i < useBatchSize-1; i++ {
 			bucketName := fmt.Sprintf("my_files_%d", i)
-			err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, startTime)
+			err := ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, 0, startTime)
 			require.NoError(t, err)
 
 			// check that nothing was actually written
-			total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
+			total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
 			require.NoError(t, err)
-			require.Equal(t, int64(0), total)
+			require.Zero(t, total)
+
+			expectedTotal += amount
 		}
 
 		rwc := ordersDB.(*orders.RollupsWriteCache)
@@ -122,9 +124,9 @@ func TestRollupsWriteCacheBatchChore(t *testing.T) {
 			t.Fatal(ctx.Err())
 		}
 
-		total, err := getTotalBandwidthInGB(ctx, accountingDB, projectID, startTime)
+		total, err := getSettledBandwidth(ctx, accountingDB, projectID, startTime)
 		require.NoError(t, err)
-		require.Equal(t, amount*int64(useBatchSize-1), total)
+		require.Equal(t, expectedTotal, total)
 		},
 	)
 }
@@ -149,7 +151,7 @@ func TestUpdateBucketBandwidth(t *testing.T) {
 		bucketName := []byte("testbucketname")
 		amount := (memory.MB * 500).Int64()
 		intervalStart := time.Now()
-		err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, bucketName, pb.PieceAction_GET, amount, intervalStart)
+		err := ordersDB.UpdateBucketBandwidthInline(ctx, projectID, bucketName, pb.PieceAction_GET, amount, intervalStart)
 		require.NoError(t, err)
 		err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, bucketName, pb.PieceAction_PUT, amount, 0, intervalStart)
 		require.NoError(t, err)
@@ -171,8 +173,8 @@ func TestUpdateBucketBandwidth(t *testing.T) {
 		IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
 		}
 		expectedCacheDataAllocated := orders.CacheData{
-			Inline:    0,
-			Allocated: amount,
+			Inline:    amount,
+			Allocated: 0,
 			Settled:   0,
 		}
 		expectedCacheDataSettled := orders.CacheData{
@@ -186,7 +188,7 @@ func TestUpdateBucketBandwidth(t *testing.T) {
 		// setup: add another item to the cache but with a different projectID
 		projectID2 := testrand.UUID()
 		amount2 := (memory.MB * 10).Int64()
-		err = ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, intervalStart)
+		err = ordersDB.UpdateBucketBandwidthInline(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, intervalStart)
 		require.NoError(t, err)
 		err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID2, bucketName, pb.PieceAction_GET, amount2, 0, intervalStart)
 		require.NoError(t, err)
@@ -202,8 +204,8 @@ func TestUpdateBucketBandwidth(t *testing.T) {
 		IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
 		}
 		expectedData := orders.CacheData{
-			Inline:    0,
-			Allocated: amount2,
+			Inline:    amount2,
+			Allocated: 0,
 			Settled:   amount2,
 		}
 		require.Equal(t, projectMap2[expectedKey], expectedData)
@@ -245,10 +245,6 @@ func (service *Service) CreatePutOrderLimits(ctx context.Context, bucket metabas
 		}
 	}
 
-	if err := service.updateBandwidth(ctx, bucket, signer.AddressedLimits...); err != nil {
-		return storj.PieceID{}, nil, storj.PiecePrivateKey{}, Error.Wrap(err)
-	}
-
 	return signer.RootPieceID, signer.AddressedLimits, signer.PrivateKey, nil
 }
 
@@ -437,10 +433,6 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
 		return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err())
 	}
 
-	if err := service.updateBandwidth(ctx, bucket, limits...); err != nil {
-		return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
-	}
-
 	return limits, signer.PrivateKey, cachedNodesInfo, nil
 }
 
@@ -510,10 +502,6 @@ func (service *Service) CreatePutRepairOrderLimits(ctx context.Context, bucket m
 		}
 	}
 
-	if err := service.updateBandwidth(ctx, bucket, limits...); err != nil {
-		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
-	}
-
 	return limits, signer.PrivateKey, nil
 }
 
@@ -548,10 +536,6 @@ func (service *Service) CreateGracefulExitPutOrderLimit(ctx context.Context, buc
 		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 	}
 
-	if err := service.updateBandwidth(ctx, bucket, limit); err != nil {
-		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
-	}
-
 	return limit, signer.PrivateKey, nil
 }
 
@@ -61,6 +61,9 @@ type bandwidthRollupKey struct {
 func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
+	// TODO: I wanted to remove this implementation, but it looks like it's heavily used in tests.
+	// We should do that cleanup as a separate change. (Michal)
+
 	return pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
 		var batch pgx.Batch
 
@@ -235,17 +238,21 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B
 
 	bucketRUMap := rollupBandwidth(rollups, toHourlyInterval, getBucketRollupKey)
 
+	// TODO: reorganize this code to make clear what we are inserting/updating in
+	// bucket_bandwidth_rollups and project_bandwidth_daily_rollups.
+
 	inlineSlice := make([]int64, 0, len(bucketRUMap))
-	allocatedSlice := make([]int64, 0, len(bucketRUMap))
 	settledSlice := make([]int64, 0, len(bucketRUMap))
 	bucketNames := make([][]byte, 0, len(bucketRUMap))
 	projectIDs := make([]uuid.UUID, 0, len(bucketRUMap))
 	intervalStartSlice := make([]time.Time, 0, len(bucketRUMap))
 	actionSlice := make([]int32, 0, len(bucketRUMap))
 
+	// allocated must be not-null, so let's keep this slice until we change the DB schema.
+	emptyAllocatedSlice := make([]int64, len(bucketRUMap))
+
 	for rollupInfo, usage := range bucketRUMap {
 		inlineSlice = append(inlineSlice, usage.Inline)
-		allocatedSlice = append(allocatedSlice, usage.Allocated)
 		settledSlice = append(settledSlice, usage.Settled)
 		bucketNames = append(bucketNames, []byte(rollupInfo.BucketName))
 		projectIDs = append(projectIDs, rollupInfo.ProjectID)
@@ -264,12 +271,11 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B
 			unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[])
 		ON CONFLICT(bucket_name, project_id, interval_start, action)
 		DO UPDATE SET
-			allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated,
 			inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline,
 			settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled`,
 		pgutil.ByteaArray(bucketNames), pgutil.UUIDArray(projectIDs), pgutil.TimestampTZArray(intervalStartSlice),
 		defaultIntervalSeconds,
-		pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice))
+		pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(emptyAllocatedSlice), pgutil.Int8Array(settledSlice))
 	if err != nil {
 		db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err))
 	}
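To make the emptyAllocatedSlice change concrete, here is a sketch of the upsert that UpdateBandwidthBatch flushes after this change. Only the second unnest row and the ON CONFLICT clause are visible in the hunk above; the INSERT column list and the casts for $1 through $3 are assumptions inferred from the bound parameters. New rows still write zeros into the NOT NULL allocated column ($7, fed from emptyAllocatedSlice), and allocated is no longer accumulated on conflict.

package example // illustrative sketch only, not part of this commit

// bucketBandwidthRollupUpsert assumes the column list and the $1–$3 casts;
// the placeholders and the ON CONFLICT clause mirror the hunk above.
const bucketBandwidthRollupUpsert = `
	INSERT INTO bucket_bandwidth_rollups (
		bucket_name, project_id, interval_start, interval_seconds,
		action, inline, allocated, settled)
	SELECT
		unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::timestamptz[]),
		$4,
		unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[])
	ON CONFLICT(bucket_name, project_id, interval_start, action)
	DO UPDATE SET
		inline  = bucket_bandwidth_rollups.inline  + EXCLUDED.inline,
		settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled`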
@@ -278,7 +284,7 @@ func (db *ordersDB) UpdateBandwidthBatch(ctx context.Context, rollups []orders.B
 
 	projectIDs = make([]uuid.UUID, 0, len(projectRUMap))
 	intervalStartSlice = make([]time.Time, 0, len(projectRUMap))
-	allocatedSlice = make([]int64, 0, len(projectRUMap))
+	allocatedSlice := make([]int64, 0, len(projectRUMap))
 	settledSlice = make([]int64, 0, len(projectRUMap))
 	deadSlice := make([]int64, 0, len(projectRUMap))
 
@@ -143,11 +143,11 @@ func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accou
 	return Error.Wrap(err)
 }
 
-// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame.
-func (db *ProjectAccounting) GetAllocatedBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) {
+// GetProjectSettledBandwidthTotal returns the sum of GET bandwidth usage settled for a projectID in the past time frame.
+func (db *ProjectAccounting) GetProjectSettledBandwidthTotal(ctx context.Context, projectID uuid.UUID, from time.Time) (_ int64, err error) {
 	defer mon.Task()(&ctx)(&err)
 	var sum *int64
-	query := `SELECT SUM(allocated) FROM bucket_bandwidth_rollups WHERE project_id = ? AND action = ? AND interval_start >= ?;`
+	query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE project_id = $1 AND action = $2 AND interval_start >= $3;`
 	err = db.db.QueryRow(ctx, db.db.Rebind(query), projectID[:], pb.PieceAction_GET, from.UTC()).Scan(&sum)
 	if errors.Is(err, sql.ErrNoRows) || sum == nil {
 		return 0, nil
@@ -78,19 +78,14 @@ func Test_DailyUsage(t *testing.T) {
 
 			err = satelliteSys.DB.ProjectAccounting().SaveTallies(ctx, now, tallies)
 			require.NoError(t, err)
-			err = satelliteSys.DB.Orders().UpdateBucketBandwidthAllocation(ctx, projectID, []byte(firstBucketName), pb.PieceAction_GET, segment, inFiveMinutes)
-			require.NoError(t, err)
 			err = satelliteSys.DB.Orders().UpdateBucketBandwidthSettle(ctx, projectID, []byte(firstBucketName), pb.PieceAction_GET, segment, 0, inFiveMinutes)
 			require.NoError(t, err)
-			err = satelliteSys.DB.Orders().UpdateBucketBandwidthAllocation(ctx, projectID, []byte(secondBucketName), pb.PieceAction_GET, segment, inFiveMinutes)
-			require.NoError(t, err)
 			err = planet.Satellites[0].DB.Orders().UpdateBucketBandwidthSettle(ctx, projectID, []byte(secondBucketName), pb.PieceAction_GET, segment, 0, inFiveMinutes)
 			require.NoError(t, err)
 
 			usage1, err := satelliteSys.DB.ProjectAccounting().GetProjectDailyUsageByDateRange(ctx, projectID, now, inFiveMinutes, 0)
 			require.NoError(t, err)
 			require.Equal(t, 2*segment, usage1.StorageUsage[0].Value)
-			require.Equal(t, 2*segment, usage1.AllocatedBandwidthUsage[0].Value)
+			require.Equal(t, 2*segment, usage1.SettledBandwidthUsage[0].Value)
 		},
 	)