satellite/{accounting, buckets}: add option to use a custom SQL query instead of the objects loop for bucket accounting

Since the number of objects keeps growing and looping through all of them
is starting to take a long time, we are switching to an SQL query that
collects tallies in chunks, one batch of buckets at a time. This is the
second part of the fix for the linked issue.

Closes https://github.com/storj/team-metainfo/issues/125

Change-Id: Ia26bcac0a7e2c6503df9ebbf4817a636841d3284
Qweder93 2022-10-05 13:53:02 +03:00 committed by Storj Robot
parent 9a09d8920e
commit 5e5d6ecf6c
7 changed files with 225 additions and 13 deletions

View File

@@ -14,6 +14,7 @@ import (
"storj.io/common/sync2"
"storj.io/common/uuid"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/metabase"
)
@@ -28,6 +29,7 @@ type Config struct {
Interval time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"`
SaveRollupBatchSize int `help:"how large of batches SaveRollup should process at a time" default:"1000"`
ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
UseObjectsLoop bool `help:"flag to switch between calculating bucket tallies using objects loop or custom query" default:"true"`
ListLimit int `help:"how many objects to query in a batch" default:"2500"`
AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
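The new UseObjectsLoop flag defaults to true, so existing deployments keep the objects-loop behavior until the flag is flipped; ListLimit bounds both the objects-loop batch size and the bucket-page size of the query path. A minimal, self-contained sketch of the relevant fields (a local copy for illustration only; the real type lives in satellite/accounting/tally):

package main

import (
	"fmt"
	"time"
)

// Config mirrors the tally fields touched by this change (illustration only).
type Config struct {
	Interval           time.Duration
	UseObjectsLoop     bool
	ListLimit          int
	AsOfSystemInterval time.Duration
}

func main() {
	// Query-based tally: disable the objects loop, keep the default page size.
	cfg := Config{
		Interval:           time.Hour,
		UseObjectsLoop:     false, // take the fillBucketTallies path below
		ListLimit:          2500,  // buckets (and objects) fetched per batch
		AsOfSystemInterval: -5 * time.Minute,
	}
	fmt.Printf("%+v\n", cfg)
}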
@@ -42,6 +44,7 @@ type Service struct {
Loop *sync2.Cycle
metabase *metabase.DB
bucketsDB buckets.DB
liveAccounting accounting.Cache
storagenodeAccountingDB accounting.StoragenodeAccounting
projectAccountingDB accounting.ProjectAccounting
@@ -49,13 +52,14 @@ type Service struct {
}
// New creates a new tally Service.
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metabase *metabase.DB, config Config) *Service {
func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metabase *metabase.DB, bucketsDB buckets.DB, config Config) *Service {
return &Service{
log: log,
config: config,
Loop: sync2.NewCycle(config.Interval),
metabase: metabase,
bucketsDB: bucketsDB,
liveAccounting: liveAccounting,
storagenodeAccountingDB: sdb,
projectAccountingDB: pdb,
@@ -189,7 +193,7 @@ func (service *Service) Tally(ctx context.Context) (err error) {
}
// add up all buckets
collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.config)
collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.bucketsDB, service.config)
err = collector.Run(ctx)
if err != nil {
return Error.Wrap(err)
@@ -243,20 +247,22 @@ type BucketTallyCollector struct {
Log *zap.Logger
Bucket map[metabase.BucketLocation]*accounting.BucketTally
metabase *metabase.DB
config Config
metabase *metabase.DB
bucketsDB buckets.DB
config Config
}
// NewBucketTallyCollector returns an collector that adds up totals for buckets.
// NewBucketTallyCollector returns a collector that adds up totals for buckets.
// The now argument controls when the collector considers objects to be expired.
func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, config Config) *BucketTallyCollector {
func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, bucketsDB buckets.DB, config Config) *BucketTallyCollector {
return &BucketTallyCollector{
Now: now,
Log: log,
Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally),
metabase: db,
config: config,
metabase: db,
bucketsDB: bucketsDB,
config: config,
}
}
@@ -269,6 +275,10 @@ func (observer *BucketTallyCollector) Run(ctx context.Context) (err error) {
return err
}
if !observer.config.UseObjectsLoop {
return observer.fillBucketTallies(ctx, startingTime)
}
return observer.metabase.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
BatchSize: observer.config.ListLimit,
AsOfSystemTime: startingTime,
@@ -285,6 +295,55 @@ func (observer *BucketTallyCollector) Run(ctx context.Context) (err error) {
})
}
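When UseObjectsLoop is false, Run short-circuits into fillBucketTallies below, which asks the metabase to aggregate whole buckets at once via CollectBucketTallies instead of visiting every object. The actual metabase query is not part of this diff; an illustrative (not authoritative) aggregation over a (project_id, bucket_name) range might look like:

-- Illustrative sketch only; the real CollectBucketTallies query lives in
-- satellite/metabase and may differ in schema and expiration handling.
SELECT project_id, bucket_name,
       count(*)                        AS object_count,
       sum(segment_count)              AS total_segments,
       sum(total_encrypted_size)       AS total_bytes,
       sum(length(encrypted_metadata)) AS metadata_size
FROM objects
WHERE (project_id, bucket_name) >= ($1, $2)
  AND (project_id, bucket_name) <= ($3, $4)
GROUP BY project_id, bucket_name
ORDER BY project_id, bucket_name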
// fillBucketTallies collects tallies for all buckets and fills the observer's Bucket map with the results.
func (observer *BucketTallyCollector) fillBucketTallies(ctx context.Context, startingTime time.Time) error {
var lastBucketLocation metabase.BucketLocation
var bucketLocationsSize int
for {
err := observer.bucketsDB.IterateBucketLocations(ctx, lastBucketLocation.ProjectID, lastBucketLocation.BucketName, observer.config.ListLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
bucketLocationsSize = len(bucketLocations)
if bucketLocationsSize == 0 {
// an empty page means there are no more buckets; the zero size stops the outer loop
return nil
}
tallies, err := observer.metabase.CollectBucketTallies(ctx, metabase.CollectBucketTallies{
From: bucketLocations[0],
To: bucketLocations[len(bucketLocations)-1],
AsOfSystemTime: startingTime,
AsOfSystemInterval: observer.config.AsOfSystemInterval,
})
if err != nil {
return err
}
for _, tally := range tallies {
bucket := observer.ensureBucket(metabase.ObjectLocation{
ProjectID: tally.ProjectID,
BucketName: tally.BucketName,
})
bucket.TotalSegments = tally.TotalSegments
bucket.TotalBytes = tally.TotalBytes
bucket.MetadataSize = tally.MetadataSize
bucket.ObjectCount = tally.ObjectCount
}
lastBucketLocation = bucketLocations[len(bucketLocations)-1]
return nil
})
if err != nil {
return err
}
if bucketLocationsSize < observer.config.ListLimit {
break
}
}
return nil
}
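The loop above is a standard keyset-pagination pattern: fetch a page of bucket locations, aggregate tallies over the page's [first, last] range, remember the last location as the cursor, and stop once a page comes back shorter than ListLimit (or empty). A self-contained sketch of the same pattern, with hypothetical loc and fetchPage helpers standing in for metabase.BucketLocation and the SQL query:

package main

import "fmt"

// loc stands in for a bucket location; key is its composite sort key.
type loc struct{ key string }

// fetchPage returns up to limit items with key > after, in ascending order
// (hypothetical stand-in for IterateBucketLocations' query).
func fetchPage(all []loc, after string, limit int) []loc {
	var page []loc
	for _, l := range all {
		if l.key > after && len(page) < limit {
			page = append(page, l)
		}
	}
	return page
}

func main() {
	all := []loc{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}
	cursor, limit := "", 2
	for {
		page := fetchPage(all, cursor, limit)
		if len(page) == 0 {
			break // nothing left
		}
		// Aggregate over the whole page range, like CollectBucketTallies(first, last).
		fmt.Printf("tally range %s..%s\n", page[0].key, page[len(page)-1].key)
		cursor = page[len(page)-1].key // keyset cursor: resume strictly after the last item
		if len(page) < limit {
			break // short page means the table is exhausted
		}
	}
}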
// ensureBucket returns the bucket tally corresponding to the passed-in object location.
func (observer *BucketTallyCollector) ensureBucket(location metabase.ObjectLocation) *accounting.BucketTally {
bucketLocation := location.Bucket()

View File

@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/storj"
@@ -17,6 +18,7 @@ import (
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/tally"
"storj.io/storj/satellite/metabase"
@@ -96,7 +98,7 @@ func TestOnlyInline(t *testing.T) {
// run multiple times to ensure we add tallies
for i := 0; i < 2; i++ {
collector := tally.NewBucketTallyCollector(planet.Satellites[0].Log.Named("bucket tally"), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].Config.Tally)
collector := tally.NewBucketTallyCollector(planet.Satellites[0].Log.Named("bucket tally"), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally)
err := collector.Run(ctx)
require.NoError(t, err)
@@ -171,7 +173,7 @@ func TestCalculateBucketAtRestData(t *testing.T) {
}
require.Len(t, expectedTotal, 3)
collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), time.Now(), satellite.Metabase.DB, planet.Satellites[0].Config.Tally)
collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), time.Now(), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally)
err = collector.Run(ctx)
require.NoError(t, err)
require.Equal(t, expectedTotal, collector.Bucket)
@@ -188,7 +190,7 @@ func TestIgnoresExpiredPointers(t *testing.T) {
err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour))
require.NoError(t, err)
collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), now.Add(24*time.Hour), satellite.Metabase.DB, planet.Satellites[0].Config.Tally)
collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), now.Add(24*time.Hour), satellite.Metabase.DB, planet.Satellites[0].DB.Buckets(), planet.Satellites[0].Config.Tally)
err = collector.Run(ctx)
require.NoError(t, err)
@@ -197,7 +199,7 @@
})
}
func TestLiveAccounting(t *testing.T) {
func TestLiveAccountingWithObjectsLoop(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
@@ -242,6 +244,53 @@
})
}
func TestLiveAccountingWithCustomSQLQuery(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Tally.UseObjectsLoop = false
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
tally := planet.Satellites[0].Accounting.Tally
projectID := planet.Uplinks[0].Projects[0].ID
tally.Loop.Pause()
expectedData := testrand.Bytes(19 * memory.KiB)
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
require.NoError(t, err)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
segmentSize := int64(segments[0].EncryptedSize)
tally.Loop.TriggerWait()
expectedSize := segmentSize
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
require.NoError(t, err)
require.Equal(t, expectedSize, total)
for i := 0; i < 3; i++ {
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", fmt.Sprintf("test/path/%d", i), expectedData)
require.NoError(t, err)
tally.Loop.TriggerWait()
expectedSize += segmentSize
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
require.NoError(t, err)
require.Equal(t, expectedSize, total)
}
})
}
func TestEmptyProjectUpdatesLiveAccounting(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 2,

View File

@@ -43,4 +43,6 @@ type DB interface {
ListBuckets(ctx context.Context, projectID uuid.UUID, listOpts storj.BucketListOptions, allowedBuckets macaroon.AllowedBuckets) (bucketList storj.BucketList, err error)
// CountBuckets returns the number of buckets a project currently has
CountBuckets(ctx context.Context, projectID uuid.UUID) (int, error)
// IterateBucketLocations iterates through all bucket locations, starting after the given project ID and bucket name, in batches of at most limit buckets.
IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (err error)
}
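A minimal usage sketch, assuming a wired-up db of this interface type; the zero project ID and empty bucket name start iteration from the very first bucket (db, ctx, and process are only for illustration):

// Sketch: fetch the first page (up to 2500) of bucket locations.
err := db.IterateBucketLocations(ctx, uuid.UUID{}, "", 2500,
	func(locations []metabase.BucketLocation) error {
		for _, location := range locations {
			process(location) // hypothetical per-bucket work
		}
		return nil
	})
if err != nil {
	return err
}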

View File

@@ -4,6 +4,8 @@
package buckets_test
import (
"sort"
"strconv"
"testing"
"github.com/stretchr/testify/require"
@@ -15,6 +17,7 @@ import (
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/metabase"
)
func newTestBucket(name string, projectID uuid.UUID) storj.Bucket {
@@ -235,3 +238,60 @@ func TestListBucketsNotAllowed(t *testing.T) {
}
})
}
func TestBatchBuckets(t *testing.T) {
testplanet.Run(t, testplanet.Config{SatelliteCount: 1}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sat := planet.Satellites[0]
db := sat.DB
consoleDB := db.Console()
var testBucketNames = []string{"aaa", "bbb", "mmm", "qqq", "zzz",
"test.bucket", "123", "0test", "999", "test-bucket.thing",
}
bucketsService := sat.API.Buckets.Service
var expectedBucketLocations []metabase.BucketLocation
for i := 1; i < 4; i++ {
project, err := consoleDB.Projects().Insert(ctx, &console.Project{Name: "testproject" + strconv.Itoa(i)})
require.NoError(t, err)
for _, bucket := range testBucketNames {
testBucket := newTestBucket(bucket, project.ID)
_, err := bucketsService.CreateBucket(ctx, testBucket)
require.NoError(t, err)
expectedBucketLocations = append(expectedBucketLocations, metabase.BucketLocation{
ProjectID: project.ID,
BucketName: bucket,
})
}
}
sortBucketLocations(expectedBucketLocations)
testLimits := []int{1, 3, 30, 1000}
for _, testLimit := range testLimits {
err := db.Buckets().IterateBucketLocations(ctx, uuid.UUID{}, "", testLimit, func(bucketLocations []metabase.BucketLocation) (err error) {
if testLimit > len(expectedBucketLocations) {
testLimit = len(expectedBucketLocations)
}
expectedResult := expectedBucketLocations[:testLimit]
require.Equal(t, expectedResult, bucketLocations)
return nil
})
require.NoError(t, err)
}
})
}
func sortBucketLocations(locations []metabase.BucketLocation) {
sort.Slice(locations, func(i, j int) bool {
if locations[i].ProjectID == locations[j].ProjectID {
return locations[i].BucketName < locations[j].BucketName
}
return locations[i].ProjectID.Less(locations[j].ProjectID)
})
}
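Note that sortBucketLocations orders the expected slice by (project_id, name) ascending, mirroring the ORDER BY of the SQL query below, so each returned page can be compared against a simple prefix of the expected slice.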

View File

@@ -446,7 +446,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
}
{ // setup accounting
peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Metabase, config.Tally)
peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Metabase, peer.DB.Buckets(), config.Tally)
peer.Services.Add(lifecycle.Item{
Name: "accounting:tally",
Run: peer.Accounting.Tally.Run,

View File

@@ -8,6 +8,8 @@ import (
"database/sql"
"errors"
"github.com/zeebo/errs"
"storj.io/common/macaroon"
"storj.io/common/storj"
"storj.io/common/uuid"
@@ -321,3 +323,40 @@ func convertDBXtoBucket(dbxBucket *dbx.BucketMetainfo) (bucket storj.Bucket, err
return bucket, nil
}
// IterateBucketLocations iterates through all bucket locations, starting after the given project ID and bucket name, in batches of at most limit buckets.
func (db *bucketsDB) IterateBucketLocations(ctx context.Context, projectID uuid.UUID, bucketName string, limit int, fn func([]metabase.BucketLocation) error) (err error) {
defer mon.Task()(&ctx)(&err)
var result []metabase.BucketLocation
rows, err := db.db.QueryContext(ctx, `
SELECT project_id, name
FROM bucket_metainfos
WHERE (project_id, name) > ($1, $2)
GROUP BY (project_id, name)
ORDER BY (project_id, name) ASC LIMIT $3
`, projectID, bucketName, limit)
if err != nil {
return storj.ErrBucket.New("BatchBuckets query error: %s", err)
}
defer func() {
err = errs.Combine(err, Error.Wrap(rows.Close()))
}()
for rows.Next() {
var bucketLocation metabase.BucketLocation
if err = rows.Scan(&bucketLocation.ProjectID, &bucketLocation.BucketName); err != nil {
return storj.ErrBucket.New("bucket location scan error: %s", err)
}
result = append(result, bucketLocation)
}
if err = rows.Err(); err != nil {
return storj.ErrBucket.Wrap(err)
}
return Error.Wrap(fn(result))
}
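The row-value comparison WHERE (project_id, name) > ($1, $2) is what makes this a keyset (cursor) scan rather than an OFFSET scan: it is equivalent to project_id > $1 OR (project_id = $1 AND name > $2), agrees with the ORDER BY (project_id, name) ASC ordering, and lets every call resume immediately after the last location of the previous page regardless of how many buckets precede it.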

View File

@@ -970,6 +970,9 @@ server.private-address: 127.0.0.1:7778
# how large of batches SaveRollup should process at a time
# tally.save-rollup-batch-size: 1000
# flag to switch between calculating bucket tallies using objects loop or custom query
# tally.use-objects-loop: true
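
# To switch a satellite onto the query-based tally path, override the
# commented default above in the config, e.g.:
# tally.use-objects-loop: false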
# address for jaeger agent
# tracing.agent-addr: agent.tracing.datasci.storj.io:5775