// Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. package accounting_test import ( "context" "encoding/binary" "errors" "fmt" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/sync/errgroup" "storj.io/common/memory" "storj.io/common/pb" "storj.io/common/sync2" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/common/uuid" "storj.io/storj/private/testplanet" "storj.io/storj/satellite" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/orders" "storj.io/storj/satellite/satellitedb/satellitedbtest" "storj.io/uplink" ) func TestProjectUsageStorage(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Console.UsageLimits.Storage.Free = 1 * memory.MB config.Console.UsageLimits.Bandwidth.Free = 1 * memory.MB }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { var uploaded uint32 checkctx, checkcancel := context.WithCancel(ctx) defer checkcancel() var group errgroup.Group group.Go(func() error { // wait things to be uploaded for atomic.LoadUint32(&uploaded) == 0 { if !sync2.Sleep(checkctx, time.Microsecond) { return nil } } for { if !sync2.Sleep(checkctx, time.Microsecond) { return nil } total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, planet.Uplinks[0].Projects[0].ID) if err != nil { return errs.Wrap(err) } if total == 0 { return errs.New("got 0 from GetProjectStorageTotals") } } }) data := testrand.Bytes(1 * memory.MB) // set limit manually to 1MB until column values can be nullable accountingDB := planet.Satellites[0].DB.ProjectAccounting() err := accountingDB.UpdateProjectUsageLimit(ctx, planet.Uplinks[0].Projects[0].ID, 1*memory.MB) require.NoError(t, err) // successful upload err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path/0", data) atomic.StoreUint32(&uploaded, 1) require.NoError(t, err) planet.Satellites[0].Accounting.Tally.Loop.TriggerWait() // upload fails due to storage limit err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path/1", data) require.Error(t, err) if !errors.Is(err, uplink.ErrBandwidthLimitExceeded) { t.Fatal("Expected resource exhausted error. Got", err.Error()) } checkcancel() if err := group.Wait(); err != nil { t.Fatal(err) } }) } func TestProjectUsageBandwidth(t *testing.T) { cases := []struct { name string expectedExceeded bool expectedResource string expectedError error }{ {name: "doesn't exceed storage or bandwidth project limit", expectedExceeded: false, expectedError: nil}, {name: "exceeds bandwidth project limit", expectedExceeded: true, expectedResource: "bandwidth", expectedError: uplink.ErrBandwidthLimitExceeded}, } for _, tt := range cases { testCase := tt t.Run(testCase.name, func(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.LiveAccounting.AsOfSystemInterval = -time.Millisecond }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { saDB := planet.Satellites[0].DB orderDB := saDB.Orders() now := time.Now() // make sure we don't end up with a flaky test if we are in the beginning of the month as we have to add expired bandwidth allocations if now.Day() < 5 { now = time.Date(now.Year(), now.Month(), 5, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), now.Location()) } bucket := metabase.BucketLocation{ProjectID: planet.Uplinks[0].Projects[0].ID, BucketName: "testbucket"} projectUsage := planet.Satellites[0].Accounting.ProjectUsage // Setup: create a BucketBandwidthRollup record to test exceeding bandwidth project limit if testCase.expectedResource == "bandwidth" { err := setUpBucketBandwidthAllocations(ctx, bucket.ProjectID, orderDB, now) require.NoError(t, err) } // Setup: create a BucketBandwidthRollup record that should not be taken into account as // it is expired. err := setUpBucketBandwidthAllocations(ctx, bucket.ProjectID, orderDB, now.Add(-72*time.Hour)) require.NoError(t, err) // Setup: create some bytes for the uplink to upload to test the download later expectedData := testrand.Bytes(50 * memory.KiB) filePath := "test/path" err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucket.BucketName, filePath, expectedData) require.NoError(t, err) projectUsage.SetNow(func() time.Time { return now }) actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID) require.NoError(t, err) require.Equal(t, testCase.expectedExceeded, actualExceeded) // Execute test: check that the uplink gets an error when they have exceeded bandwidth limits and try to download a file _, actualErr := planet.Uplinks[0].Download(ctx, planet.Satellites[0], bucket.BucketName, filePath) if testCase.expectedResource == "bandwidth" { require.True(t, errors.Is(actualErr, testCase.expectedError)) } else { require.NoError(t, actualErr) } }) }) } } func TestProjectBandwidthRollups(t *testing.T) { timeBuf := time.Second * 5 satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { p1 := testrand.UUID() p2 := testrand.UUID() b1 := testrand.Bytes(10) b2 := testrand.Bytes(20) now := time.Now().UTC() // could be flaky near next month if now.Month() != now.Add(timeBuf).Month() { time.Sleep(timeBuf) now = time.Now().UTC() } // make sure we don't end up with a flaky test if we are in the beginning of the month as we have to add expired bandwidth allocations if now.Day() < 5 { now = time.Date(now.Year(), now.Month(), 5, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), now.Location()) } hour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) expired := time.Date(now.Year(), now.Month(), now.Day()-3, now.Hour(), 0, 0, 0, now.Location()) // things that should be counted err := db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_GET, 1000, hour) require.NoError(t, err) rollups := []orders.BucketBandwidthRollup{ {ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000 /* counted */, Settled: 1000}, {ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET, Inline: 1000, Allocated: 1000 /* counted */, Settled: 1000}, } err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups) require.NoError(t, err) // things that shouldn't be counted err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_PUT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_PUT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_PUT_GRACEFUL_EXIT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_PUT_REPAIR, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET_AUDIT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_GET_REPAIR, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b1, pb.PieceAction_PUT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b2, pb.PieceAction_PUT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b1, pb.PieceAction_PUT_GRACEFUL_EXIT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b2, pb.PieceAction_PUT_REPAIR, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b1, pb.PieceAction_GET_AUDIT, 1000, hour) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p2, b2, pb.PieceAction_GET_REPAIR, 1000, hour) require.NoError(t, err) // these two should not be counted. They are expired and have no corresponding rollup err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b1, pb.PieceAction_GET, 1000, expired) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthAllocation(ctx, p1, b2, pb.PieceAction_GET, 1000, expired) require.NoError(t, err) rollups = []orders.BucketBandwidthRollup{ {ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_PUT_GRACEFUL_EXIT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_PUT_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p1, BucketName: string(b1), Action: pb.PieceAction_GET_AUDIT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p1, BucketName: string(b2), Action: pb.PieceAction_GET_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_PUT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_PUT_GRACEFUL_EXIT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_PUT_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p2, BucketName: string(b1), Action: pb.PieceAction_GET_AUDIT, Inline: 1000, Allocated: 1000, Settled: 1000}, {ProjectID: p2, BucketName: string(b2), Action: pb.PieceAction_GET_REPAIR, Inline: 1000, Allocated: 1000, Settled: 1000}, } err = db.Orders().UpdateBucketBandwidthBatch(ctx, hour, rollups) require.NoError(t, err) alloc, err := db.ProjectAccounting().GetProjectBandwidth(ctx, p1, now.Year(), now.Month(), now.Day(), 0) require.NoError(t, err) require.EqualValues(t, 4000, alloc) }) } func createBucketBandwidthRollupsForPast4Days(ctx *testcontext.Context, satelliteDB satellite.DB, projectID uuid.UUID) (int64, error) { var expectedSum int64 ordersDB := satelliteDB.Orders() amount := int64(1000) now := time.Now() for i := 0; i < 4; i++ { var bucketName string var intervalStart time.Time if i%2 == 0 { // When the bucket name and intervalStart is different, a new record is created bucketName = fmt.Sprintf("%s%d", "testbucket", i) // Use a intervalStart time in the past to test we get all records in past 30 days intervalStart = now.AddDate(0, 0, -i) } else { // When the bucket name and intervalStart is the same, we update the existing record bucketName = "testbucket" intervalStart = now } err := ordersDB.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, intervalStart, ) if err != nil { return expectedSum, err } err = ordersDB.UpdateBucketBandwidthSettle(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, intervalStart, ) if err != nil { return expectedSum, err } err = ordersDB.UpdateBucketBandwidthInline(ctx, projectID, []byte(bucketName), pb.PieceAction_GET, amount, intervalStart, ) if err != nil { return expectedSum, err } expectedSum += amount } return expectedSum, nil } func TestProjectBandwidthTotal(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { pdb := db.ProjectAccounting() projectID := testrand.UUID() // Setup: create bucket bandwidth rollup records expectedTotal, err := createBucketBandwidthRollupsForPast4Days(ctx, db, projectID) require.NoError(t, err) // Execute test: get project bandwidth total since := time.Now().AddDate(0, -1, 0) actualBandwidthTotal, err := pdb.GetAllocatedBandwidthTotal(ctx, projectID, since) require.NoError(t, err) require.Equal(t, expectedTotal, actualBandwidthTotal) }) } func setUpBucketBandwidthAllocations(ctx *testcontext.Context, projectID uuid.UUID, orderDB orders.DB, now time.Time) error { // Create many records that sum greater than project usage limit of 50GB for i := 0; i < 4; i++ { bucketName := fmt.Sprintf("%s%d", "testbucket", i) // In order to exceed the project limits, create bandwidth allocation records // that sum greater than the defaultMaxUsage amount := 15 * memory.GB.Int64() action := pb.PieceAction_GET err := orderDB.UpdateBucketBandwidthAllocation(ctx, projectID, []byte(bucketName), action, amount, now) if err != nil { return err } } return nil } func TestProjectUsageCustomLimit(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satDB := planet.Satellites[0].DB acctDB := satDB.ProjectAccounting() projectsDB := satDB.Console().Projects() projects, err := projectsDB.GetAll(ctx) require.NoError(t, err) project := projects[0] // set custom usage limit for project expectedLimit := memory.Size(memory.GiB.Int64() * 10) err = acctDB.UpdateProjectUsageLimit(ctx, project.ID, expectedLimit) require.NoError(t, err) projectUsage := planet.Satellites[0].Accounting.ProjectUsage // Setup: add data to live accounting to exceed new limit err = projectUsage.AddProjectStorageUsage(ctx, project.ID, expectedLimit.Int64()) require.NoError(t, err) actualExceeded, limit, err := projectUsage.ExceedsStorageUsage(ctx, project.ID) require.NoError(t, err) require.True(t, actualExceeded) require.Equal(t, expectedLimit.Int64(), limit.Int64()) // Setup: create some bytes for the uplink to upload expectedData := testrand.Bytes(50 * memory.KiB) // Execute test: check that the uplink gets an error when they have exceeded storage limits and try to upload a file actualErr := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData) require.Error(t, actualErr) }) } func TestUsageRollups(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 2, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { const ( numBuckets = 5 tallyIntervals = 10 tallyInterval = time.Hour ) now := time.Now() start := now.Add(tallyInterval * -tallyIntervals) db := planet.Satellites[0].DB project1 := planet.Uplinks[0].Projects[0].ID project2 := planet.Uplinks[1].Projects[0].ID p1base := binary.BigEndian.Uint64(project1[:8]) >> 48 p2base := binary.BigEndian.Uint64(project2[:8]) >> 48 getValue := func(i, j int, base uint64) int64 { a := uint64((i+1)*(j+1)) ^ base a &^= (1 << 63) return int64(a) } actions := []pb.PieceAction{ pb.PieceAction_GET, pb.PieceAction_GET_AUDIT, pb.PieceAction_GET_REPAIR, } var buckets []string for i := 0; i < numBuckets; i++ { bucketName := fmt.Sprintf("bucket-%d", i) err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], bucketName) require.NoError(t, err) // project 1 for _, action := range actions { value := getValue(0, i, p1base) err := db.Orders().UpdateBucketBandwidthAllocation(ctx, project1, []byte(bucketName), action, value*6, now) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthSettle(ctx, project1, []byte(bucketName), action, value*3, now) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthInline(ctx, project1, []byte(bucketName), action, value, now) require.NoError(t, err) } err = planet.Uplinks[1].CreateBucket(ctx, planet.Satellites[0], bucketName) require.NoError(t, err) // project 2 for _, action := range actions { value := getValue(1, i, p2base) err := db.Orders().UpdateBucketBandwidthAllocation(ctx, project2, []byte(bucketName), action, value*6, now) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthSettle(ctx, project2, []byte(bucketName), action, value*3, now) require.NoError(t, err) err = db.Orders().UpdateBucketBandwidthInline(ctx, project2, []byte(bucketName), action, value, now) require.NoError(t, err) } buckets = append(buckets, bucketName) } for i := 0; i < tallyIntervals; i++ { interval := start.Add(tallyInterval * time.Duration(i)) bucketTallies := make(map[metabase.BucketLocation]*accounting.BucketTally) for j, bucket := range buckets { bucketLoc1 := metabase.BucketLocation{ ProjectID: project1, BucketName: bucket, } bucketLoc2 := metabase.BucketLocation{ ProjectID: project2, BucketName: bucket, } value1 := getValue(i, j, p1base) * 10 value2 := getValue(i, j, p2base) * 10 tally1 := &accounting.BucketTally{ BucketLocation: bucketLoc1, ObjectCount: value1, InlineSegments: value1, RemoteSegments: value1, InlineBytes: value1, RemoteBytes: value1, MetadataSize: value1, } tally2 := &accounting.BucketTally{ BucketLocation: bucketLoc2, ObjectCount: value2, InlineSegments: value2, RemoteSegments: value2, InlineBytes: value2, RemoteBytes: value2, MetadataSize: value2, } bucketTallies[bucketLoc1] = tally1 bucketTallies[bucketLoc2] = tally2 } err := db.ProjectAccounting().SaveTallies(ctx, interval, bucketTallies) require.NoError(t, err) } usageRollups := db.ProjectAccounting() t.Run("test project total", func(t *testing.T) { projTotal1, err := usageRollups.GetProjectTotal(ctx, project1, start, now) require.NoError(t, err) require.NotNil(t, projTotal1) projTotal2, err := usageRollups.GetProjectTotal(ctx, project2, start, now) require.NoError(t, err) require.NotNil(t, projTotal2) }) t.Run("test bucket usage rollups", func(t *testing.T) { rollups1, err := usageRollups.GetBucketUsageRollups(ctx, project1, start, now) require.NoError(t, err) require.NotNil(t, rollups1) rollups2, err := usageRollups.GetBucketUsageRollups(ctx, project2, start, now) require.NoError(t, err) require.NotNil(t, rollups2) }) t.Run("test bucket totals", func(t *testing.T) { cursor := accounting.BucketUsageCursor{ Limit: 20, Page: 1, } totals1, err := usageRollups.GetBucketTotals(ctx, project1, cursor, start, now) require.NoError(t, err) require.NotNil(t, totals1) totals2, err := usageRollups.GetBucketTotals(ctx, project2, cursor, start, now) require.NoError(t, err) require.NotNil(t, totals2) }) t.Run("Get paged", func(t *testing.T) { // sql injection test. F.E '%SomeText%' = > ''%SomeText%' OR 'x' != '%'' will be true bucketsPage, err := usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "buck%' OR 'x' != '", Page: 1}, start, now) require.NoError(t, err) require.NotNil(t, bucketsPage) assert.Equal(t, uint64(0), bucketsPage.TotalCount) assert.Equal(t, uint(0), bucketsPage.CurrentPage) assert.Equal(t, uint(0), bucketsPage.PageCount) assert.Equal(t, 0, len(bucketsPage.BucketUsages)) bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 3, Search: "", Page: 1}, start, now) require.NoError(t, err) require.NotNil(t, bucketsPage) assert.Equal(t, uint64(5), bucketsPage.TotalCount) assert.Equal(t, uint(1), bucketsPage.CurrentPage) assert.Equal(t, uint(2), bucketsPage.PageCount) assert.Equal(t, 3, len(bucketsPage.BucketUsages)) bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "buck", Page: 1}, start, now) require.NoError(t, err) require.NotNil(t, bucketsPage) assert.Equal(t, uint64(5), bucketsPage.TotalCount) assert.Equal(t, uint(1), bucketsPage.CurrentPage) assert.Equal(t, uint(1), bucketsPage.PageCount) assert.Equal(t, 5, len(bucketsPage.BucketUsages)) bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "bucket-0", Page: 1}, start, now) require.NoError(t, err) require.NotNil(t, bucketsPage) assert.Equal(t, uint64(1), bucketsPage.TotalCount) assert.Equal(t, uint(1), bucketsPage.CurrentPage) assert.Equal(t, uint(1), bucketsPage.PageCount) assert.Equal(t, 1, len(bucketsPage.BucketUsages)) bucketsPage, err = usageRollups.GetBucketTotals(ctx, project1, accounting.BucketUsageCursor{Limit: 5, Search: "buck\xff", Page: 1}, start, now) require.NoError(t, err) require.NotNil(t, bucketsPage) assert.Equal(t, uint64(0), bucketsPage.TotalCount) assert.Equal(t, uint(0), bucketsPage.CurrentPage) assert.Equal(t, uint(0), bucketsPage.PageCount) assert.Equal(t, 0, len(bucketsPage.BucketUsages)) }) }) } func TestProjectUsage_FreeUsedStorageSpace(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satDB := planet.Satellites[0].DB accounting := planet.Satellites[0].Accounting project := planet.Uplinks[0].Projects[0] accounting.Tally.Loop.Pause() // set custom usage limit for project customLimit := 100 * memory.KiB err := satDB.ProjectAccounting().UpdateProjectUsageLimit(ctx, project.ID, customLimit) require.NoError(t, err) data := testrand.Bytes(50 * memory.KiB) err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "1", data) require.NoError(t, err) segments, err := planet.Satellites[0].Metainfo.Metabase.TestingAllSegments(ctx) require.NoError(t, err) usage, err := accounting.ProjectUsage.GetProjectStorageTotals(ctx, project.ID) require.NoError(t, err) require.EqualValues(t, segments[0].EncryptedSize, usage) err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "2", data) require.NoError(t, err) // we used limit so we should get error err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "3", data) require.Error(t, err) require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded)) // delete object to free some storage space err = planet.Uplinks[0].DeleteObject(ctx, planet.Satellites[0], "bucket", "2") require.NoError(t, err) // we need to wait for tally to update storage usage after delete accounting.Tally.Loop.TriggerWait() // try to upload object as we have free space now err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "4", data) require.NoError(t, err) // should fail because we once again used space up to limit err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket", "2", data) require.Error(t, err) require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded)) }) } func TestProjectUsageBandwidthResetAfter3days(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, Reconfigure: testplanet.Reconfigure{ Satellite: func(log *zap.Logger, index int, config *satellite.Config) { config.Console.UsageLimits.Storage.Free = 1 * memory.MB config.Console.UsageLimits.Bandwidth.Free = 1 * memory.MB config.LiveAccounting.AsOfSystemInterval = -time.Millisecond }, }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { orderDB := planet.Satellites[0].DB.Orders() bucket := metabase.BucketLocation{ProjectID: planet.Uplinks[0].Projects[0].ID, BucketName: "testbucket"} projectUsage := planet.Satellites[0].Accounting.ProjectUsage now := time.Now() allocationTime := time.Date(now.Year(), now.Month(), 2, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), now.Location()) amount := 1 * memory.MB.Int64() err := orderDB.UpdateBucketBandwidthAllocation(ctx, bucket.ProjectID, []byte(bucket.BucketName), pb.PieceAction_GET, amount, allocationTime) require.NoError(t, err) beforeResetDay := allocationTime.Add(2 * time.Hour * 24) resetDay := allocationTime.Add(3 * time.Hour * 24) endOfMonth := time.Date(now.Year(), now.Month()+1, 0, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), now.Location()) for _, tt := range []struct { description string now time.Time expectedExceeds bool }{ {"allocation day", allocationTime, true}, {"day before reset", beforeResetDay, true}, {"reset day", resetDay, false}, {"end of month", endOfMonth, false}, } { projectUsage.SetNow(func() time.Time { return tt.now }) actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID) require.NoError(t, err) require.Equal(t, tt.expectedExceeds, actualExceeded, tt.description) } }) } func TestProjectUsage_ResetLimitsFirstDayOfNextMonth(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satDB := planet.Satellites[0].DB project := planet.Uplinks[0].Projects[0] planet.Satellites[0].Orders.Chore.Loop.Pause() // set custom usage limit for project customLimit := 100 * memory.KiB err := satDB.ProjectAccounting().UpdateProjectUsageLimit(ctx, project.ID, customLimit) require.NoError(t, err) err = satDB.ProjectAccounting().UpdateProjectBandwidthLimit(ctx, project.ID, customLimit) require.NoError(t, err) data := testrand.Bytes(customLimit) err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path1", data) require.NoError(t, err) // verify that storage limit is all used err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path2", data) require.Error(t, err) require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded)) _, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1") require.NoError(t, err) require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx)) tomorrow := time.Now().Add(24 * time.Hour) for _, storageNode := range planet.StorageNodes { storageNode.Storage2.Orders.SendOrders(ctx, tomorrow) } planet.Satellites[0].Orders.Chore.Loop.TriggerWait() // verify that bandwidth limit is all used _, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1") require.Error(t, err) require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded)) now := time.Now() planet.Satellites[0].API.Accounting.ProjectUsage.SetNow(func() time.Time { return time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC) }) // verify that storage limit is all used even at the new billing cycle err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path3", data) require.Error(t, err) require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded)) // verify that new billing cycle reset bandwidth limit _, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1") require.NoError(t, err) }) } func TestProjectUsage_BandwidthCache(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { project := planet.Uplinks[0].Projects[0] projectUsage := planet.Satellites[0].Accounting.ProjectUsage badwidthUsed := int64(42) err := projectUsage.UpdateProjectBandwidthUsage(ctx, project.ID, badwidthUsed) require.NoError(t, err) // verify cache key creation. fromCache, err := projectUsage.GetProjectBandwidthUsage(ctx, project.ID) require.NoError(t, err) require.Equal(t, badwidthUsed, fromCache) // verify cache key increment. increment := int64(10) err = projectUsage.UpdateProjectBandwidthUsage(ctx, project.ID, increment) require.NoError(t, err) fromCache, err = projectUsage.GetProjectBandwidthUsage(ctx, project.ID) require.NoError(t, err) require.Equal(t, badwidthUsed+increment, fromCache) }) } func TestProjectUsage_BandwidthDownloadLimit(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { satDB := planet.Satellites[0].DB acctDB := satDB.ProjectAccounting() now := time.Now() project := planet.Uplinks[0].Projects[0] // set custom bandwidth limit for project 512 Kb bandwidthLimit := 500 * memory.KiB err := acctDB.UpdateProjectBandwidthLimit(ctx, project.ID, bandwidthLimit) require.NoError(t, err) dataSize := 100 * memory.KiB data := testrand.Bytes(dataSize) err = planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path1", data) require.NoError(t, err) // Let's calculate the maximum number of iterations // Bandwidth limit: 512 Kb // Pointer Segment size: 103.936 Kb // (5 x 103.936) = 519.68 // We'll be able to download 5X before reach the limit. for i := 0; i < 5; i++ { _, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1") require.NoError(t, err) } // send orders so we get the egress settled require.NoError(t, planet.WaitForStorageNodeEndpoints(ctx)) tomorrow := time.Now().Add(24 * time.Hour) for _, storageNode := range planet.StorageNodes { storageNode.Storage2.Orders.SendOrders(ctx, tomorrow) } planet.Satellites[0].Orders.Chore.Loop.TriggerWait() // An extra download should return 'Exceeded Usage Limit' error _, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1") require.Error(t, err) require.True(t, errors.Is(err, uplink.ErrBandwidthLimitExceeded)) planet.Satellites[0].Orders.Chore.Loop.TriggerWait() // Simulate new billing cycle (next month) planet.Satellites[0].API.Accounting.ProjectUsage.SetNow(func() time.Time { return time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC) }) // Should not return an error since it's a new month _, err = planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path1") require.NoError(t, err) }) }