satellite/accounting/tally: save tallies in batches
Because we save all tallies with a single SQL statement, we eventually hit the maximum message size. With this change we call SaveTallies multiple times, in batches.

https://github.com/storj/storj/issues/5977

Change-Id: I0c7dd27779b1743ede66448fb891e65c361aa3b0
parent 355ea2133b
commit 2b2bca8e81
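In outline, the change buffers bucket tallies and calls SaveTallies whenever the buffer reaches the configured SaveTalliesBatchSize, then flushes the remainder after the loop. Below is a minimal, self-contained sketch of that batching pattern; flushBatch and saveInBatches are illustrative stand-ins, not the satellite's actual ProjectAccounting.SaveTallies API.

package main

import (
	"context"
	"fmt"
)

// flushBatch stands in for ProjectAccounting.SaveTallies: one INSERT per call.
func flushBatch(ctx context.Context, batch map[string]int64) error {
	fmt.Printf("saving %d tallies in one statement\n", len(batch))
	return nil
}

// saveInBatches writes tallies in chunks of batchSize so that no single SQL
// statement has to carry every bucket at once.
func saveInBatches(ctx context.Context, tallies map[string]int64, batchSize int) error {
	buffer := map[string]int64{}
	for bucket, total := range tallies {
		buffer[bucket] = total
		if len(buffer) >= batchSize {
			if err := flushBatch(ctx, buffer); err != nil {
				return err
			}
			buffer = map[string]int64{}
		}
	}
	if len(buffer) > 0 {
		// flush whatever is left over after the loop.
		return flushBatch(ctx, buffer)
	}
	return nil
}

func main() {
	tallies := map[string]int64{"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
	_ = saveInBatches(context.Background(), tallies, 2)
}

The actual change additionally keeps going when a flush fails, collecting errors in an errs.Group ("don't stop on error, we would like to store as much as possible"), as shown in the Tally hunk below.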
@@ -6,6 +6,7 @@ storj.io/storj/satellite/accounting."bucket_segments" IntVal
 storj.io/storj/satellite/accounting."total_bytes" IntVal
 storj.io/storj/satellite/accounting."total_objects" IntVal
 storj.io/storj/satellite/accounting."total_segments" IntVal
+storj.io/storj/satellite/accounting/tally."bucket_tally_error" Event
 storj.io/storj/satellite/accounting/tally."nodetallies.totalsum" IntVal
 storj.io/storj/satellite/audit."audit_contained_nodes" IntVal
 storj.io/storj/satellite/audit."audit_contained_nodes_global" Meter
@@ -26,10 +26,11 @@ var (
 // Config contains configurable values for the tally service.
 type Config struct {
 	Interval             time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"`
 	SaveRollupBatchSize  int           `help:"how large of batches SaveRollup should process at a time" default:"1000"`
 	ReadRollupBatchSize  int           `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
 	UseRangedLoop        bool          `help:"whether to enable node tally with ranged loop" default:"true"`
+	SaveTalliesBatchSize int           `help:"how large should be insert into tallies" default:"10000"`
 
 	ListLimit          int           `help:"how many buckets to query in a batch" default:"2500"`
 	AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
@@ -75,6 +76,8 @@ func (service *Service) Run(ctx context.Context) (err error) {
 		err := service.Tally(ctx)
 		if err != nil {
 			service.log.Error("tally failed", zap.Error(err))
+
+			mon.Event("bucket_tally_error") //mon:locked
 		}
 		return nil
 	})
@@ -198,45 +201,65 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 	if err != nil {
 		return Error.Wrap(err)
 	}
-	finishTime := service.nowFn()
+
+	if len(collector.Bucket) == 0 {
+		return nil
+	}
 
 	// save the new results
-	var errAtRest error
-	if len(collector.Bucket) > 0 {
-		// record bucket tallies to DB
-		err = service.projectAccountingDB.SaveTallies(ctx, finishTime, collector.Bucket)
-		if err != nil {
-			errAtRest = Error.New("ProjectAccounting.SaveTallies failed: %v", err)
-		}
+	var errAtRest errs.Group
 
-		updateLiveAccountingTotals(projectTotalsFromBuckets(collector.Bucket))
+	// record bucket tallies to DB
+	// TODO we should be able replace map with just slice
+	intervalStart := service.nowFn()
+	buffer := map[metabase.BucketLocation]*accounting.BucketTally{}
+	for location, tally := range collector.Bucket {
+		buffer[location] = tally
+
+		if len(buffer) >= service.config.SaveTalliesBatchSize {
+			// don't stop on error, we would like to store as much as possible
+			errAtRest.Add(service.flushTallies(ctx, intervalStart, buffer))
+
+			for key := range buffer {
+				delete(buffer, key)
+			}
+		}
 	}
 
-	if len(collector.Bucket) > 0 {
-		var total accounting.BucketTally
-		// TODO for now we don't have access to inline/remote stats per bucket
-		// but that may change in the future. To get back those stats we would
-		// most probably need to add inline/remote information to object in
-		// metabase. We didn't decide yet if that is really needed right now.
-		for _, bucket := range collector.Bucket {
-			monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked
-			monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked
-			// monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked
-			// monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked
+	errAtRest.Add(service.flushTallies(ctx, intervalStart, buffer))
 
-			monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked
-			// monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked
-			// monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked
-			total.Combine(bucket)
-		}
-		monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
-		monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
-		monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
-		monAccounting.IntVal("total_pending_objects").Observe(total.PendingObjectCount)
+	updateLiveAccountingTotals(projectTotalsFromBuckets(collector.Bucket))
+
+	var total accounting.BucketTally
+	// TODO for now we don't have access to inline/remote stats per bucket
+	// but that may change in the future. To get back those stats we would
+	// most probably need to add inline/remote information to object in
+	// metabase. We didn't decide yet if that is really needed right now.
+	for _, bucket := range collector.Bucket {
+		monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked
+		monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked
+		// monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked
+		// monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked
+
+		monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked
+		// monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked
+		// monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked
+		total.Combine(bucket)
 	}
+	monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
+	monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
+	monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
+	monAccounting.IntVal("total_pending_objects").Observe(total.PendingObjectCount)
 
 	// return errors if something went wrong.
-	return errAtRest
+	return errAtRest.Err()
+}
+
+func (service *Service) flushTallies(ctx context.Context, intervalStart time.Time, tallies map[metabase.BucketLocation]*accounting.BucketTally) error {
+	err := service.projectAccountingDB.SaveTallies(ctx, intervalStart, tallies)
+	if err != nil {
+		return Error.New("ProjectAccounting.SaveTallies failed: %v", err)
+	}
+	return nil
 }
 
 // BucketTallyCollector collects and adds up tallies for buckets.
@@ -346,7 +346,7 @@ func TestTallyOnCopiedObject(t *testing.T) {
 		},
 	}
 
-	findTally := func(bucket string, tallies []accounting.BucketTally) accounting.BucketTally {
+	findTally := func(t *testing.T, bucket string, tallies []accounting.BucketTally) accounting.BucketTally {
 		for _, v := range tallies {
 			if v.BucketName == bucket {
 				return v
@@ -378,7 +378,7 @@ func TestTallyOnCopiedObject(t *testing.T) {
 
 			tallies, err := planet.Satellites[0].DB.ProjectAccounting().GetTallies(ctx)
 			require.NoError(t, err)
-			lastTally := findTally(tc.name, tallies)
+			lastTally := findTally(t, tc.name, tallies)
 			require.Equal(t, tc.name, lastTally.BucketName)
 			require.Equal(t, tc.expectedTallyAfterCopy.ObjectCount, lastTally.ObjectCount)
 			require.Equal(t, tc.expectedTallyAfterCopy.TotalBytes, lastTally.TotalBytes)
@@ -392,7 +392,7 @@ func TestTallyOnCopiedObject(t *testing.T) {
 
 			tallies, err = planet.Satellites[0].DB.ProjectAccounting().GetTallies(ctx)
 			require.NoError(t, err)
-			lastTally = findTally(tc.name, tallies)
+			lastTally = findTally(t, tc.name, tallies)
 			require.Equal(t, tc.name, lastTally.BucketName)
 			require.Equal(t, tc.expectedTallyAfterDelete.ObjectCount, lastTally.ObjectCount)
 			require.Equal(t, tc.expectedTallyAfterDelete.TotalBytes, lastTally.TotalBytes)
@@ -402,7 +402,7 @@ func TestTallyOnCopiedObject(t *testing.T) {
 		})
 	}
 
-func TestTallyBatchSize(t *testing.T) {
+func TestBucketTallyCollectorListLimit(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
 		Reconfigure: testplanet.Reconfigure{
@@ -454,3 +454,58 @@ func TestTallyBatchSize(t *testing.T) {
 		}
 	})
 }
+
+func TestTallySaveTalliesBatchSize(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
+				config.Metainfo.ProjectLimits.MaxBuckets = 23
+			},
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		planet.Satellites[0].Accounting.Tally.Loop.Pause()
+
+		projectID := planet.Uplinks[0].Projects[0].ID
+
+		numberOfBuckets := 23
+		expectedBucketLocations := []metabase.BucketLocation{}
+		for i := 0; i < numberOfBuckets; i++ {
+			data := testrand.Bytes(1*memory.KiB + memory.Size(i))
+			err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], "bucket"+strconv.Itoa(i), "test", data)
+			require.NoError(t, err)
+
+			expectedBucketLocations = append(expectedBucketLocations, metabase.BucketLocation{
+				ProjectID:  projectID,
+				BucketName: "bucket" + strconv.Itoa(i),
+			})
+		}
+
+		satellite := planet.Satellites[0]
+		for _, batchSize := range []int{1, 2, 3, numberOfBuckets, 29, planet.Satellites[0].Config.Tally.SaveTalliesBatchSize} {
+			config := satellite.Config.Tally
+			config.SaveTalliesBatchSize = batchSize
+
+			tally := tally.New(zaptest.NewLogger(t), satellite.DB.StoragenodeAccounting(), satellite.DB.ProjectAccounting(),
+				satellite.LiveAccounting.Cache, satellite.Metabase.DB, satellite.DB.Buckets(), config)
+
+			// collect and store tallies in DB
+			err := tally.Tally(ctx)
+			require.NoError(t, err)
+
+			// verify we have in DB expected list of tallies
+			tallies, err := satellite.DB.ProjectAccounting().GetTallies(ctx)
+			require.NoError(t, err)
+
+			_, err = satellite.DB.Testing().RawDB().ExecContext(ctx, "DELETE FROM bucket_storage_tallies")
+			require.NoError(t, err)
+
+			bucketLocations := []metabase.BucketLocation{}
+			for _, tally := range tallies {
+				bucketLocations = append(bucketLocations, tally.BucketLocation)
+			}
+
+			require.ElementsMatch(t, expectedBucketLocations, bucketLocations)
+		}
+	})
+}
@@ -822,14 +822,17 @@ func TestCollectBucketTallies(t *testing.T) {
 	t.Run("invalid bucket name", func(t *testing.T) {
 		defer metabasetest.DeleteAll{}.Check(ctx, t, db)
 
+		projectA := uuid.UUID{1}
+		projectB := uuid.UUID{2}
+
 		metabasetest.CollectBucketTallies{
 			Opts: metabase.CollectBucketTallies{
 				From: metabase.BucketLocation{
-					ProjectID:  testrand.UUID(),
+					ProjectID:  projectA,
 					BucketName: "a\\",
 				},
 				To: metabase.BucketLocation{
-					ProjectID:  testrand.UUID(),
+					ProjectID:  projectB,
 					BucketName: "b\\",
 				},
 			},
scripts/testdata/satellite-config.yaml.lock
@@ -1099,6 +1099,9 @@ server.private-address: 127.0.0.1:7778
 # how large of batches SaveRollup should process at a time
 # tally.save-rollup-batch-size: 1000
 
+# how large should be insert into tallies
+# tally.save-tallies-batch-size: 10000
+
 # whether to enable node tally with ranged loop
 # tally.use-ranged-loop: true
 