satellite/accounting/tally: remove objects loop
This change removes the use of the objects loop for calculating bucket tallies. It has been superseded by a custom query. Change-Id: I9ea4633006c9af3ea14d7de40871639e7b687c22
This commit is contained in:
parent
eabd9dd994
commit
d8f64326f5
@ -11,9 +11,7 @@ import (
|
|||||||
type BucketTally struct {
|
type BucketTally struct {
|
||||||
metabase.BucketLocation
|
metabase.BucketLocation
|
||||||
|
|
||||||
ObjectCount int64
|
ObjectCount int64
|
||||||
PendingObjectCount int64
|
|
||||||
|
|
||||||
TotalSegments int64
|
TotalSegments int64
|
||||||
TotalBytes int64
|
TotalBytes int64
|
||||||
|
|
||||||
@ -23,10 +21,7 @@ type BucketTally struct {
|
|||||||
// Combine aggregates all the tallies.
|
// Combine aggregates all the tallies.
|
||||||
func (s *BucketTally) Combine(o *BucketTally) {
|
func (s *BucketTally) Combine(o *BucketTally) {
|
||||||
s.ObjectCount += o.ObjectCount
|
s.ObjectCount += o.ObjectCount
|
||||||
s.PendingObjectCount += o.PendingObjectCount
|
|
||||||
|
|
||||||
s.TotalSegments += o.TotalSegments
|
s.TotalSegments += o.TotalSegments
|
||||||
|
|
||||||
s.TotalBytes += o.TotalBytes
|
s.TotalBytes += o.TotalBytes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,10 +29,9 @@ type Config struct {
|
|||||||
Interval time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"`
|
Interval time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"`
|
||||||
SaveRollupBatchSize int `help:"how large of batches SaveRollup should process at a time" default:"1000"`
|
SaveRollupBatchSize int `help:"how large of batches SaveRollup should process at a time" default:"1000"`
|
||||||
ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
|
ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
|
||||||
UseObjectsLoop bool `help:"flag to switch between calculating bucket tallies using objects loop or custom query" default:"false"`
|
|
||||||
UseRangedLoop bool `help:"whether to enable node tally with ranged loop" default:"true"`
|
UseRangedLoop bool `help:"whether to enable node tally with ranged loop" default:"true"`
|
||||||
|
|
||||||
ListLimit int `help:"how many objects to query in a batch" default:"2500"`
|
ListLimit int `help:"how many buckets to query in a batch" default:"2500"`
|
||||||
AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
|
AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,15 +232,12 @@ func (service *Service) Tally(ctx context.Context) (err error) {
|
|||||||
monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
|
monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
|
||||||
monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
|
monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
|
||||||
monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
|
monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
|
||||||
monAccounting.IntVal("total_pending_objects").Observe(total.PendingObjectCount)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// return errors if something went wrong.
|
// return errors if something went wrong.
|
||||||
return errAtRest
|
return errAtRest
|
||||||
}
|
}
|
||||||
|
|
||||||
var objectFunc = mon.Task()
|
|
||||||
|
|
||||||
// BucketTallyCollector collects and adds up tallies for buckets.
|
// BucketTallyCollector collects and adds up tallies for buckets.
|
||||||
type BucketTallyCollector struct {
|
type BucketTallyCollector struct {
|
||||||
Now time.Time
|
Now time.Time
|
||||||
@ -276,24 +272,7 @@ func (observer *BucketTallyCollector) Run(ctx context.Context) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !observer.config.UseObjectsLoop {
|
return observer.fillBucketTallies(ctx, startingTime)
|
||||||
return observer.fillBucketTallies(ctx, startingTime)
|
|
||||||
}
|
|
||||||
|
|
||||||
return observer.metabase.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
|
|
||||||
BatchSize: observer.config.ListLimit,
|
|
||||||
AsOfSystemTime: startingTime,
|
|
||||||
AsOfSystemInterval: observer.config.AsOfSystemInterval,
|
|
||||||
}, func(ctx context.Context, it metabase.LoopObjectsIterator) (err error) {
|
|
||||||
var entry metabase.LoopObjectEntry
|
|
||||||
for it.Next(ctx, &entry) {
|
|
||||||
err = observer.object(ctx, entry)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// fillBucketTallies collects all bucket tallies and fills observer's buckets map with results.
|
// fillBucketTallies collects all bucket tallies and fills observer's buckets map with results.
|
||||||
@ -352,26 +331,6 @@ func (observer *BucketTallyCollector) ensureBucket(location metabase.ObjectLocat
|
|||||||
return bucket
|
return bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object is called for each object once.
|
|
||||||
func (observer *BucketTallyCollector) object(ctx context.Context, object metabase.LoopObjectEntry) error {
|
|
||||||
defer objectFunc(&ctx)(nil)
|
|
||||||
|
|
||||||
if object.Expired(observer.Now) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
bucket := observer.ensureBucket(object.ObjectStream.Location())
|
|
||||||
bucket.TotalSegments += int64(object.SegmentCount)
|
|
||||||
bucket.TotalBytes += object.TotalEncryptedSize
|
|
||||||
bucket.MetadataSize += int64(object.EncryptedMetadataSize)
|
|
||||||
bucket.ObjectCount++
|
|
||||||
if object.Status == metabase.Pending {
|
|
||||||
bucket.PendingObjectCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func projectTotalsFromBuckets(buckets map[metabase.BucketLocation]*accounting.BucketTally) map[uuid.UUID]accounting.Usage {
|
func projectTotalsFromBuckets(buckets map[metabase.BucketLocation]*accounting.BucketTally) map[uuid.UUID]accounting.Usage {
|
||||||
projectTallyTotals := make(map[uuid.UUID]accounting.Usage)
|
projectTallyTotals := make(map[uuid.UUID]accounting.Usage)
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
|
@ -199,59 +199,9 @@ func TestIgnoresExpiredPointers(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLiveAccountingWithObjectsLoop(t *testing.T) {
|
|
||||||
testplanet.Run(t, testplanet.Config{
|
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
|
||||||
Reconfigure: testplanet.Reconfigure{
|
|
||||||
Satellite: testplanet.MaxSegmentSize(20 * memory.KiB),
|
|
||||||
},
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
|
||||||
tally := planet.Satellites[0].Accounting.Tally
|
|
||||||
projectID := planet.Uplinks[0].Projects[0].ID
|
|
||||||
tally.Loop.Pause()
|
|
||||||
|
|
||||||
expectedData := testrand.Bytes(19 * memory.KiB)
|
|
||||||
|
|
||||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", "test/path", expectedData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, segments, 1)
|
|
||||||
|
|
||||||
segmentSize := int64(segments[0].EncryptedSize)
|
|
||||||
|
|
||||||
tally.Loop.TriggerWait()
|
|
||||||
|
|
||||||
expectedSize := segmentSize
|
|
||||||
|
|
||||||
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, expectedSize, total)
|
|
||||||
|
|
||||||
for i := 0; i < 3; i++ {
|
|
||||||
err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "testbucket", fmt.Sprintf("test/path/%d", i), expectedData)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
tally.Loop.TriggerWait()
|
|
||||||
|
|
||||||
expectedSize += segmentSize
|
|
||||||
|
|
||||||
total, err := planet.Satellites[0].Accounting.ProjectUsage.GetProjectStorageTotals(ctx, projectID)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, expectedSize, total)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLiveAccountingWithCustomSQLQuery(t *testing.T) {
|
func TestLiveAccountingWithCustomSQLQuery(t *testing.T) {
|
||||||
testplanet.Run(t, testplanet.Config{
|
testplanet.Run(t, testplanet.Config{
|
||||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||||
Reconfigure: testplanet.Reconfigure{
|
|
||||||
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
|
|
||||||
config.Tally.UseObjectsLoop = false
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||||
tally := planet.Satellites[0].Accounting.Tally
|
tally := planet.Satellites[0].Accounting.Tally
|
||||||
projectID := planet.Uplinks[0].Projects[0].ID
|
projectID := planet.Uplinks[0].Projects[0].ID
|
||||||
@ -450,7 +400,6 @@ func TestTallyBatchSize(t *testing.T) {
|
|||||||
for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} {
|
for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 14, planet.Satellites[0].Config.Tally.ListLimit} {
|
||||||
collector := tally.NewBucketTallyCollector(zaptest.NewLogger(t), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), tally.Config{
|
collector := tally.NewBucketTallyCollector(zaptest.NewLogger(t), time.Now(), planet.Satellites[0].Metabase.DB, planet.Satellites[0].DB.Buckets(), tally.Config{
|
||||||
Interval: 1 * time.Hour,
|
Interval: 1 * time.Hour,
|
||||||
UseObjectsLoop: false,
|
|
||||||
ListLimit: batchSize,
|
ListLimit: batchSize,
|
||||||
AsOfSystemInterval: 1 * time.Microsecond,
|
AsOfSystemInterval: 1 * time.Microsecond,
|
||||||
})
|
})
|
||||||
|
5
scripts/testdata/satellite-config.yaml.lock
vendored
5
scripts/testdata/satellite-config.yaml.lock
vendored
@ -1099,7 +1099,7 @@ server.private-address: 127.0.0.1:7778
|
|||||||
# how frequently the tally service should run
|
# how frequently the tally service should run
|
||||||
# tally.interval: 1h0m0s
|
# tally.interval: 1h0m0s
|
||||||
|
|
||||||
# how many objects to query in a batch
|
# how many buckets to query in a batch
|
||||||
# tally.list-limit: 2500
|
# tally.list-limit: 2500
|
||||||
|
|
||||||
# how large of batches GetBandwidthSince should process at a time
|
# how large of batches GetBandwidthSince should process at a time
|
||||||
@ -1108,9 +1108,6 @@ server.private-address: 127.0.0.1:7778
|
|||||||
# how large of batches SaveRollup should process at a time
|
# how large of batches SaveRollup should process at a time
|
||||||
# tally.save-rollup-batch-size: 1000
|
# tally.save-rollup-batch-size: 1000
|
||||||
|
|
||||||
# flag to switch between calculating bucket tallies using objects loop or custom query
|
|
||||||
# tally.use-objects-loop: false
|
|
||||||
|
|
||||||
# whether to enable node tally with ranged loop
|
# whether to enable node tally with ranged loop
|
||||||
# tally.use-ranged-loop: true
|
# tally.use-ranged-loop: true
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user