From f77f61532adbb8d6e29cd52c9f069aee015895e4 Mon Sep 17 00:00:00 2001 From: Fadila Khadar Date: Sun, 30 May 2021 00:16:12 +0200 Subject: [PATCH] satellite/orders: use egress_dead for calculating allocated bandwidth Change-Id: I8240d99d0cdaad4c5e059565e88ee9619b62526e --- satellite/accounting/projectusage_test.go | 118 ++++++++++++++++++++- satellite/satellitedb/orders.go | 1 - satellite/satellitedb/projectaccounting.go | 2 +- 3 files changed, 117 insertions(+), 4 deletions(-) diff --git a/satellite/accounting/projectusage_test.go b/satellite/accounting/projectusage_test.go index 476a4c4dc..570fa2003 100644 --- a/satellite/accounting/projectusage_test.go +++ b/satellite/accounting/projectusage_test.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "errors" "fmt" + "io" "sync/atomic" "testing" "time" @@ -20,6 +21,7 @@ import ( "storj.io/common/memory" "storj.io/common/pb" + "storj.io/common/storj" "storj.io/common/sync2" "storj.io/common/testcontext" "storj.io/common/testrand" @@ -30,7 +32,9 @@ import ( "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/orders" "storj.io/storj/satellite/satellitedb/satellitedbtest" + snorders "storj.io/storj/storagenode/orders" "storj.io/uplink" + "storj.io/uplink/private/eestream" ) func TestProjectUsageStorage(t *testing.T) { @@ -205,6 +209,11 @@ func TestProjectBandwidthRollups(t *testing.T) { err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups) require.NoError(t, err) + // allocated bandwidth should correspond to the sum of bandwidth corresponding to GET action (4000 here) + alloc, err := db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0) + require.NoError(t, err) + require.EqualValues(t, 4000, alloc) + // things that shouldn't be counted err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_PUT, 1000, hour) require.NoError(t, err) @@ -253,9 +262,23 @@ func TestProjectBandwidthRollups(t *testing.T) { err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups) require.NoError(t, err) - alloc, err := db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0) + // things that should be partially counted (settled amount lower than allocated amount) + err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET, 1000, hour) require.NoError(t, err) - require.EqualValues(t, 4000, alloc) + err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_GET, 1000, hour) + require.NoError(t, err) + + rollups = []orders.BucketBandwidthRollup{ + {ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 300, Dead: 700}, + {ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 500, Dead: 500}, + } + err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups) + require.NoError(t, err) + + alloc, err = db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0) + require.NoError(t, err) + // new allocated bandwidth: 4000 (from previously) + 4000 (from these rollups) - 1200 (dead bandwidth = 700+500) + require.EqualValues(t, 6800, alloc) }) } @@ -814,3 +837,94 @@ func TestProjectUsage_BandwidthDownloadLimit(t *testing.T) { require.NoError(t, err) }) } + +func TestProjectUsage_BandwidthDeadAllocation(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: testplanet.ReconfigureRS(2, 2, 4, 4), + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + planet.Satellites[0].Orders.Chore.Loop.Pause() + + for _, storageNode := range planet.StorageNodes { + storageNode.Storage2.Orders.Sender.Pause() + } + + now := time.Now() + project := planet.Uplinks[0].Projects[0] + + sat := planet.Satellites[0] + rs, err := eestream.NewRedundancyStrategyFromStorj(storj.RedundancyScheme{ + RequiredShares: int16(sat.Config.Metainfo.RS.Min), + RepairShares: int16(sat.Config.Metainfo.RS.Repair), + OptimalShares: int16(sat.Config.Metainfo.RS.Success), + TotalShares: int16(sat.Config.Metainfo.RS.Total), + ShareSize: sat.Config.Metainfo.RS.ErasureShareSize.Int32(), + }) + require.NoError(t, err) + + dataSize := 4 * memory.MiB + data := testrand.Bytes(dataSize) + + err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path1", data) + require.NoError(t, err) + + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + + pieceSize := eestream.CalcPieceSize(int64(segments[0].EncryptedSize), rs) + + reader, cleanFn, err := planet.Uplinks[0].DownloadStream(ctx, planet.Satellites[0], "testbucket", "test/path1") + require.NoError(t, err) + + // partially download the object + p := make([]byte, 1*memory.MiB) + total, err := io.ReadFull(reader, p) + require.NoError(t, err) + require.Equal(t, total, len(p)) + require.NoError(t, cleanFn()) + require.NoError(t, reader.Close()) + + planet.Satellites[0].Orders.Chore.Loop.TriggerWait() + + bandwidthUsage, err := planet.Satellites[0].DB.ProjectAccounting().GetProjectBandwidth(ctx, + project.ID, now.Year(), now.Month(), now.Day(), 0) + require.NoError(t, err) + + require.Equal(t, int64(segments[0].Redundancy.DownloadNodes())*pieceSize, bandwidthUsage) + + initialBandwidthUsage := bandwidthUsage + var updatedBandwidthUsage int64 + deadSum := int64(0) + for _, storageNode := range planet.StorageNodes { + storageNode.Storage2.Orders.SendOrders(ctx, now.Add(2*time.Hour)) + require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx)) + + archivedOrders, err := storageNode.OrdersStore.ListArchived() + require.NoError(t, err) + + // submit orders storage node by storage node + for _, order := range archivedOrders { + if order.Status == snorders.StatusAccepted && order.Limit.Action == pb.PieceAction_GET { + deadSum += order.Limit.Limit - order.Order.Amount + } + } + planet.Satellites[0].Orders.Chore.Loop.TriggerWait() + + // new bandwidth allocation should be decreased by dead amount + updatedBandwidthUsage, err = planet.Satellites[0].DB.ProjectAccounting().GetProjectBandwidth(ctx, + project.ID, now.Year(), now.Month(), now.Day(), 0) + require.NoError(t, err) + + require.Equal(t, bandwidthUsage-deadSum, updatedBandwidthUsage) + } + + _, _, dead, err := planet.Satellites[0].DB.ProjectAccounting().GetProjectDailyBandwidth(ctx, + project.ID, now.Year(), now.Month(), now.Day()) + require.NoError(t, err) + require.NotZero(t, dead) + require.Equal(t, initialBandwidthUsage, updatedBandwidthUsage+dead) + }) +} diff --git a/satellite/satellitedb/orders.go b/satellite/satellitedb/orders.go index 50c54ea78..a14f55ad9 100644 --- a/satellite/satellitedb/orders.go +++ b/satellite/satellitedb/orders.go @@ -279,7 +279,6 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar var projectRUAllocated []int64 var projectRUSettled []int64 var projectRUDead []int64 - dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC) for projectID, v := range projectRUMap { diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index 96c397332..868a31532 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -171,7 +171,7 @@ func (db *ProjectAccounting) GetProjectBandwidth(ctx context.Context, projectID SELECT CASE WHEN interval_day < ? THEN egress_settled - ELSE egress_allocated + ELSE egress_allocated-egress_dead END AS amount FROM project_bandwidth_daily_rollups WHERE project_id = ? AND interval_day >= ? AND interval_day < ?