satellite/orders: use egress_dead for calculating allocated bandwidth
Change-Id: I8240d99d0cdaad4c5e059565e88ee9619b62526e
This commit is contained in:
parent
9fd091831d
commit
f77f61532a
@ -8,6 +8,7 @@ import (
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -20,6 +21,7 @@ import (
|
|||||||
|
|
||||||
"storj.io/common/memory"
|
"storj.io/common/memory"
|
||||||
"storj.io/common/pb"
|
"storj.io/common/pb"
|
||||||
|
"storj.io/common/storj"
|
||||||
"storj.io/common/sync2"
|
"storj.io/common/sync2"
|
||||||
"storj.io/common/testcontext"
|
"storj.io/common/testcontext"
|
||||||
"storj.io/common/testrand"
|
"storj.io/common/testrand"
|
||||||
@ -30,7 +32,9 @@ import (
|
|||||||
"storj.io/storj/satellite/metabase"
|
"storj.io/storj/satellite/metabase"
|
||||||
"storj.io/storj/satellite/orders"
|
"storj.io/storj/satellite/orders"
|
||||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||||
|
snorders "storj.io/storj/storagenode/orders"
|
||||||
"storj.io/uplink"
|
"storj.io/uplink"
|
||||||
|
"storj.io/uplink/private/eestream"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestProjectUsageStorage(t *testing.T) {
|
func TestProjectUsageStorage(t *testing.T) {
|
||||||
@ -205,6 +209,11 @@ func TestProjectBandwidthRollups(t *testing.T) {
|
|||||||
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
|
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// allocated bandwidth should correspond to the sum of bandwidth corresponding to GET action (4000 here)
|
||||||
|
alloc, err := db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 4000, alloc)
|
||||||
|
|
||||||
// things that shouldn't be counted
|
// things that shouldn't be counted
|
||||||
err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_PUT, 1000, hour)
|
err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_PUT, 1000, hour)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -253,9 +262,23 @@ func TestProjectBandwidthRollups(t *testing.T) {
|
|||||||
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
|
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
alloc, err := db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0)
|
// things that should be partially counted (settled amount lower than allocated amount)
|
||||||
|
err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET, 1000, hour)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 4000, alloc)
|
err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_GET, 1000, hour)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
rollups = []orders.BucketBandwidthRollup{
|
||||||
|
{ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 300, Dead: 700},
|
||||||
|
{ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000, Settled: 500, Dead: 500},
|
||||||
|
}
|
||||||
|
err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
alloc, err = db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// new allocated bandwidth: 4000 (from previously) + 4000 (from these rollups) - 1200 (dead bandwidth = 700+500)
|
||||||
|
require.EqualValues(t, 6800, alloc)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -814,3 +837,94 @@ func TestProjectUsage_BandwidthDownloadLimit(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProjectUsage_BandwidthDeadAllocation(t *testing.T) {
|
||||||
|
testplanet.Run(t, testplanet.Config{
|
||||||
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
|
Reconfigure: testplanet.Reconfigure{
|
||||||
|
Satellite: testplanet.ReconfigureRS(2, 2, 4, 4),
|
||||||
|
},
|
||||||
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
|
planet.Satellites[0].Orders.Chore.Loop.Pause()
|
||||||
|
|
||||||
|
for _, storageNode := range planet.StorageNodes {
|
||||||
|
storageNode.Storage2.Orders.Sender.Pause()
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
project := planet.Uplinks[0].Projects[0]
|
||||||
|
|
||||||
|
sat := planet.Satellites[0]
|
||||||
|
rs, err := eestream.NewRedundancyStrategyFromStorj(storj.RedundancyScheme{
|
||||||
|
RequiredShares: int16(sat.Config.Metainfo.RS.Min),
|
||||||
|
RepairShares: int16(sat.Config.Metainfo.RS.Repair),
|
||||||
|
OptimalShares: int16(sat.Config.Metainfo.RS.Success),
|
||||||
|
TotalShares: int16(sat.Config.Metainfo.RS.Total),
|
||||||
|
ShareSize: sat.Config.Metainfo.RS.ErasureShareSize.Int32(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
dataSize := 4 * memory.MiB
|
||||||
|
data := testrand.Bytes(dataSize)
|
||||||
|
|
||||||
|
err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path1", data)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, segments, 1)
|
||||||
|
|
||||||
|
pieceSize := eestream.CalcPieceSize(int64(segments[0].EncryptedSize), rs)
|
||||||
|
|
||||||
|
reader, cleanFn, err := planet.Uplinks[0].DownloadStream(ctx, planet.Satellites[0], "testbucket", "test/path1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// partially download the object
|
||||||
|
p := make([]byte, 1*memory.MiB)
|
||||||
|
total, err := io.ReadFull(reader, p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, total, len(p))
|
||||||
|
require.NoError(t, cleanFn())
|
||||||
|
require.NoError(t, reader.Close())
|
||||||
|
|
||||||
|
planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
|
||||||
|
|
||||||
|
bandwidthUsage, err := planet.Satellites[0].DB.ProjectAccounting().GetProjectBandwidth(ctx,
|
||||||
|
project.ID, now.Year(), now.Month(), now.Day(), 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, int64(segments[0].Redundancy.DownloadNodes())*pieceSize, bandwidthUsage)
|
||||||
|
|
||||||
|
initialBandwidthUsage := bandwidthUsage
|
||||||
|
var updatedBandwidthUsage int64
|
||||||
|
deadSum := int64(0)
|
||||||
|
for _, storageNode := range planet.StorageNodes {
|
||||||
|
storageNode.Storage2.Orders.SendOrders(ctx, now.Add(2*time.Hour))
|
||||||
|
require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx))
|
||||||
|
|
||||||
|
archivedOrders, err := storageNode.OrdersStore.ListArchived()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// submit orders storage node by storage node
|
||||||
|
for _, order := range archivedOrders {
|
||||||
|
if order.Status == snorders.StatusAccepted && order.Limit.Action == pb.PieceAction_GET {
|
||||||
|
deadSum += order.Limit.Limit - order.Order.Amount
|
||||||
|
}
|
||||||
|
}
|
||||||
|
planet.Satellites[0].Orders.Chore.Loop.TriggerWait()
|
||||||
|
|
||||||
|
// new bandwidth allocation should be decreased by dead amount
|
||||||
|
updatedBandwidthUsage, err = planet.Satellites[0].DB.ProjectAccounting().GetProjectBandwidth(ctx,
|
||||||
|
project.ID, now.Year(), now.Month(), now.Day(), 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, bandwidthUsage-deadSum, updatedBandwidthUsage)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, dead, err := planet.Satellites[0].DB.ProjectAccounting().GetProjectDailyBandwidth(ctx,
|
||||||
|
project.ID, now.Year(), now.Month(), now.Day())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotZero(t, dead)
|
||||||
|
require.Equal(t, initialBandwidthUsage, updatedBandwidthUsage+dead)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -279,7 +279,6 @@ func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStar
|
|||||||
var projectRUAllocated []int64
|
var projectRUAllocated []int64
|
||||||
var projectRUSettled []int64
|
var projectRUSettled []int64
|
||||||
var projectRUDead []int64
|
var projectRUDead []int64
|
||||||
|
|
||||||
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
|
dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
for projectID, v := range projectRUMap {
|
for projectID, v := range projectRUMap {
|
||||||
|
@ -171,7 +171,7 @@ func (db *ProjectAccounting) GetProjectBandwidth(ctx context.Context, projectID
|
|||||||
SELECT
|
SELECT
|
||||||
CASE WHEN interval_day < ?
|
CASE WHEN interval_day < ?
|
||||||
THEN egress_settled
|
THEN egress_settled
|
||||||
ELSE egress_allocated
|
ELSE egress_allocated-egress_dead
|
||||||
END AS amount
|
END AS amount
|
||||||
FROM project_bandwidth_daily_rollups
|
FROM project_bandwidth_daily_rollups
|
||||||
WHERE project_id = ? AND interval_day >= ? AND interval_day < ?
|
WHERE project_id = ? AND interval_day >= ? AND interval_day < ?
|
||||||
|
Loading…
Reference in New Issue
Block a user