From 27a714e8b01cbce5817fbaa13df863f7ae7de4be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?= Date: Thu, 1 Jul 2021 13:29:25 +0200 Subject: [PATCH] satellite/accounting/tally: use objects iterator instead metaloop Bucket tally calculation will be removed from metaloop and will use metabase objects iterator directly. At the moment only bucket tally needs objects so it make no sense to implement separate objects loop. Change-Id: Iee60059fc8b9a1bf64d01cafe9659b69b0e27eb1 --- cmd/satellite/reports/attribution.go | 9 +- monkit.lock | 8 - satellite/accounting/bucketstats.go | 23 +-- satellite/accounting/bucketusage.go | 13 +- satellite/accounting/db_test.go | 10 +- satellite/accounting/projectusage_test.go | 12 +- satellite/accounting/tally/tally.go | 152 +++++++++--------- satellite/accounting/tally/tally_test.go | 41 +++-- satellite/admin/project_test.go | 128 +++++++-------- satellite/attribution/db.go | 11 +- satellite/attribution/db_test.go | 19 +-- satellite/core.go | 2 +- satellite/metainfo/attribution_test.go | 4 +- .../stripecoinpayments/service_test.go | 4 +- satellite/satellitedb/attribution.go | 13 +- satellite/satellitedb/projectaccounting.go | 56 ++++--- scripts/testdata/satellite-config.yaml.lock | 6 + 17 files changed, 237 insertions(+), 274 deletions(-) diff --git a/cmd/satellite/reports/attribution.go b/cmd/satellite/reports/attribution.go index abe74fbe8..06b752c3a 100644 --- a/cmd/satellite/reports/attribution.go +++ b/cmd/satellite/reports/attribution.go @@ -24,8 +24,7 @@ import ( var headers = []string{ "projectID", "bucketName", - "byte-hours:Remote", - "byte-hours:Inline", + "byte-hours:Total", "bytes:BWEgress", } @@ -81,14 +80,12 @@ func csvRowToStringSlice(p *attribution.CSVRow) ([]string, error) { if err != nil { return nil, errs.New("Invalid Project ID") } - remoteGBPerHour := memory.Size(p.RemoteBytesPerHour).GB() - inlineGBPerHour := memory.Size(p.InlineBytesPerHour).GB() + totalGBPerHour := memory.Size(p.TotalBytesPerHour).GB() egressGBData := memory.Size(p.EgressData).GB() record := []string{ projectID.String(), string(p.BucketName), - strconv.FormatFloat(remoteGBPerHour, 'f', 4, 64), - strconv.FormatFloat(inlineGBPerHour, 'f', 4, 64), + strconv.FormatFloat(totalGBPerHour, 'f', 4, 64), strconv.FormatFloat(egressGBData, 'f', 4, 64), } return record, nil diff --git a/monkit.lock b/monkit.lock index ccb375ee3..8f8748692 100644 --- a/monkit.lock +++ b/monkit.lock @@ -1,18 +1,10 @@ storj.io/storj/private/lifecycle."slow_shutdown" Event storj.io/storj/private/lifecycle."unexpected_shutdown" Event storj.io/storj/satellite/accounting."bucket_bytes" IntVal -storj.io/storj/satellite/accounting."bucket_inline_bytes" IntVal -storj.io/storj/satellite/accounting."bucket_inline_segments" IntVal storj.io/storj/satellite/accounting."bucket_objects" IntVal -storj.io/storj/satellite/accounting."bucket_remote_bytes" IntVal -storj.io/storj/satellite/accounting."bucket_remote_segments" IntVal storj.io/storj/satellite/accounting."bucket_segments" IntVal storj.io/storj/satellite/accounting."total_bytes" IntVal -storj.io/storj/satellite/accounting."total_inline_bytes" IntVal -storj.io/storj/satellite/accounting."total_inline_segments" IntVal storj.io/storj/satellite/accounting."total_objects" IntVal -storj.io/storj/satellite/accounting."total_remote_bytes" IntVal -storj.io/storj/satellite/accounting."total_remote_segments" IntVal storj.io/storj/satellite/accounting."total_segments" IntVal storj.io/storj/satellite/accounting/tally."nodetallies.totalsum" IntVal storj.io/storj/satellite/audit."audit_contained_nodes" IntVal diff --git a/satellite/accounting/bucketstats.go b/satellite/accounting/bucketstats.go index 139b84998..19bcbee12 100644 --- a/satellite/accounting/bucketstats.go +++ b/satellite/accounting/bucketstats.go @@ -13,13 +13,8 @@ type BucketTally struct { ObjectCount int64 - TotalSegments int64 - InlineSegments int64 - RemoteSegments int64 - - TotalBytes int64 - InlineBytes int64 - RemoteBytes int64 + TotalSegments int64 + TotalBytes int64 MetadataSize int64 } @@ -29,26 +24,16 @@ func (s *BucketTally) Combine(o *BucketTally) { s.ObjectCount += o.ObjectCount s.TotalSegments += o.TotalSegments - s.InlineSegments += o.InlineSegments - s.RemoteSegments += o.RemoteSegments s.TotalBytes += o.TotalBytes - s.InlineBytes += o.InlineBytes - s.RemoteBytes += o.RemoteBytes } // Segments returns total number of segments. func (s *BucketTally) Segments() int64 { - if s.TotalSegments != 0 { - return s.TotalSegments - } - return s.InlineSegments + s.RemoteSegments + return s.TotalSegments } // Bytes returns total bytes. func (s *BucketTally) Bytes() int64 { - if s.TotalBytes != 0 { - return s.TotalBytes - } - return s.InlineBytes + s.RemoteBytes + return s.TotalBytes } diff --git a/satellite/accounting/bucketusage.go b/satellite/accounting/bucketusage.go index 575053030..6ada92adc 100644 --- a/satellite/accounting/bucketusage.go +++ b/satellite/accounting/bucketusage.go @@ -17,20 +17,13 @@ type BucketStorageTally struct { ObjectCount int64 - TotalSegmentCount int64 - InlineSegmentCount int64 - RemoteSegmentCount int64 + TotalSegmentCount int64 + TotalBytes int64 - TotalBytes int64 - InlineBytes int64 - RemoteBytes int64 MetadataSize int64 } // Bytes returns total bytes. func (s *BucketStorageTally) Bytes() int64 { - if s.TotalBytes != 0 { - return s.TotalBytes - } - return s.InlineBytes + s.RemoteBytes + return s.TotalBytes } diff --git a/satellite/accounting/db_test.go b/satellite/accounting/db_test.go index a1a36ec37..f2fac0ccf 100644 --- a/satellite/accounting/db_test.go +++ b/satellite/accounting/db_test.go @@ -231,12 +231,10 @@ func createBucketStorageTallies(projectID uuid.UUID) (map[metabase.BucketLocatio ProjectID: projectID, BucketName: bucketName, }, - ObjectCount: int64(1), - InlineSegments: int64(1), - RemoteSegments: int64(1), - InlineBytes: int64(1), - RemoteBytes: int64(1), - MetadataSize: int64(1), + ObjectCount: int64(1), + TotalSegments: int64(2), + TotalBytes: int64(2), + MetadataSize: int64(1), } bucketTallies[bucketLocation] = &tally expectedTallies = append(expectedTallies, tally) diff --git a/satellite/accounting/projectusage_test.go b/satellite/accounting/projectusage_test.go index fe8a0d33d..f3a9ceb05 100644 --- a/satellite/accounting/projectusage_test.go +++ b/satellite/accounting/projectusage_test.go @@ -467,20 +467,16 @@ func TestUsageRollups(t *testing.T) { tally1 := &accounting.BucketTally{ BucketLocation: bucketLoc1, ObjectCount: value1, - InlineSegments: value1, - RemoteSegments: value1, - InlineBytes: value1, - RemoteBytes: value1, + TotalSegments: value1 + value1, + TotalBytes: value1 + value1, MetadataSize: value1, } tally2 := &accounting.BucketTally{ BucketLocation: bucketLoc2, ObjectCount: value2, - InlineSegments: value2, - RemoteSegments: value2, - InlineBytes: value2, - RemoteBytes: value2, + TotalSegments: value2 + value2, + TotalBytes: value2 + value2, MetadataSize: value2, } diff --git a/satellite/accounting/tally/tally.go b/satellite/accounting/tally/tally.go index 86281fd65..c480a47ac 100644 --- a/satellite/accounting/tally/tally.go +++ b/satellite/accounting/tally/tally.go @@ -15,7 +15,6 @@ import ( "storj.io/common/uuid" "storj.io/storj/satellite/accounting" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/metaloop" ) // Error is a standard error class for this package. @@ -29,16 +28,20 @@ type Config struct { Interval time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"` SaveRollupBatchSize int `help:"how large of batches SaveRollup should process at a time" default:"1000"` ReadRollupBatchSize int `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"` + + ListLimit int `help:"how many objects to query in a batch" default:"2500"` + AsOfSystemInterval time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"` } // Service is the tally service for data stored on each storage node. // // architecture: Chore type Service struct { - log *zap.Logger - Loop *sync2.Cycle + log *zap.Logger + config Config + Loop *sync2.Cycle - metainfoLoop *metaloop.Service + metabase *metabase.DB liveAccounting accounting.Cache storagenodeAccountingDB accounting.StoragenodeAccounting projectAccountingDB accounting.ProjectAccounting @@ -46,12 +49,13 @@ type Service struct { } // New creates a new tally Service. -func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metainfoLoop *metaloop.Service, interval time.Duration) *Service { +func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metabase *metabase.DB, config Config) *Service { return &Service{ - log: log, - Loop: sync2.NewCycle(interval), + log: log, + config: config, + Loop: sync2.NewCycle(config.Interval), - metainfoLoop: metainfoLoop, + metabase: metabase, liveAccounting: liveAccounting, storagenodeAccountingDB: sdb, projectAccountingDB: pdb, @@ -164,9 +168,9 @@ func (service *Service) Tally(ctx context.Context) (err error) { } } - // add up all nodes and buckets - observer := NewObserver(service.log.Named("observer"), service.nowFn()) - err = service.metainfoLoop.Join(ctx, observer) + // add up all buckets + collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.config) + err = collector.Run(ctx) if err != nil { return Error.Wrap(err) } @@ -174,66 +178,97 @@ func (service *Service) Tally(ctx context.Context) (err error) { // save the new results var errAtRest error - if len(observer.Bucket) > 0 { + if len(collector.Bucket) > 0 { // record bucket tallies to DB - err = service.projectAccountingDB.SaveTallies(ctx, finishTime, observer.Bucket) + err = service.projectAccountingDB.SaveTallies(ctx, finishTime, collector.Bucket) if err != nil { errAtRest = Error.New("ProjectAccounting.SaveTallies failed: %v", err) } - updateLiveAccountingTotals(projectTotalsFromBuckets(observer.Bucket)) + updateLiveAccountingTotals(projectTotalsFromBuckets(collector.Bucket)) } + // TODO move commented metrics in a different place or wait until metabase + // object will contains such informations // report bucket metrics - if len(observer.Bucket) > 0 { + if len(collector.Bucket) > 0 { var total accounting.BucketTally - for _, bucket := range observer.Bucket { - monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked - monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked - monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked - monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked + for _, bucket := range collector.Bucket { + monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked + monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked + // monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked + // monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked - monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked - monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked - monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked + monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked + // monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked + // monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked total.Combine(bucket) } monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked - monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked - monAccounting.IntVal("total_inline_segments").Observe(total.InlineSegments) //mon:locked - monAccounting.IntVal("total_remote_segments").Observe(total.RemoteSegments) //mon:locked + monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked + // monAccounting.IntVal("total_inline_segments").Observe(total.InlineSegments) //mon:locked + // monAccounting.IntVal("total_remote_segments").Observe(total.RemoteSegments) //mon:locked - monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked - monAccounting.IntVal("total_inline_bytes").Observe(total.InlineBytes) //mon:locked - monAccounting.IntVal("total_remote_bytes").Observe(total.RemoteBytes) //mon:locked + monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked + // monAccounting.IntVal("total_inline_bytes").Observe(total.InlineBytes) //mon:locked + // monAccounting.IntVal("total_remote_bytes").Observe(total.RemoteBytes) //mon:locked } // return errors if something went wrong. return errAtRest } -var _ metaloop.Observer = (*Observer)(nil) - -// Observer observes metainfo and adds up tallies for nodes and buckets. -type Observer struct { +// BucketTallyCollector collects and adds up tallies for buckets. +type BucketTallyCollector struct { Now time.Time Log *zap.Logger Bucket map[metabase.BucketLocation]*accounting.BucketTally + + metabase *metabase.DB + config Config } -// NewObserver returns an metainfo loop observer that adds up totals for buckets and nodes. -// The now argument controls when the observer considers pointers to be expired. -func NewObserver(log *zap.Logger, now time.Time) *Observer { - return &Observer{ +// NewBucketTallyCollector returns an collector that adds up totals for buckets. +// The now argument controls when the collector considers objects to be expired. +func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, config Config) *BucketTallyCollector { + return &BucketTallyCollector{ Now: now, Log: log, Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally), + + metabase: db, + config: config, } } +// Run runs collecting bucket tallies. +func (observer *BucketTallyCollector) Run(ctx context.Context) (err error) { + defer mon.Task()(&ctx)(&err) + + startingTime, err := observer.metabase.Now(ctx) + if err != nil { + return err + } + + return observer.metabase.IterateLoopObjects(ctx, metabase.IterateLoopObjects{ + BatchSize: observer.config.ListLimit, + AsOfSystemTime: startingTime, + AsOfSystemInterval: observer.config.AsOfSystemInterval, + }, func(ctx context.Context, it metabase.LoopObjectsIterator) (err error) { + var entry metabase.LoopObjectEntry + for it.Next(ctx, &entry) { + err = observer.object(ctx, entry) + if err != nil { + return err + } + } + return nil + }) +} + // ensureBucket returns bucket corresponding to the passed in path. -func (observer *Observer) ensureBucket(ctx context.Context, location metabase.ObjectLocation) *accounting.BucketTally { +func (observer *BucketTallyCollector) ensureBucket(ctx context.Context, location metabase.ObjectLocation) *accounting.BucketTally { bucketLocation := location.Bucket() bucket, exists := observer.Bucket[bucketLocation] if !exists { @@ -245,13 +280,8 @@ func (observer *Observer) ensureBucket(ctx context.Context, location metabase.Ob return bucket } -// LoopStarted is called at each start of a loop. -func (observer *Observer) LoopStarted(context.Context, metaloop.LoopInfo) (err error) { - return nil -} - // Object is called for each object once. -func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) (err error) { +func (observer *BucketTallyCollector) object(ctx context.Context, object metabase.LoopObjectEntry) (err error) { defer mon.Task()(&ctx)(&err) if object.Expired(observer.Now) { @@ -259,46 +289,18 @@ func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) ( } bucket := observer.ensureBucket(ctx, object.ObjectStream.Location()) + bucket.TotalSegments += int64(object.SegmentCount) + bucket.TotalBytes += object.TotalEncryptedSize bucket.MetadataSize += int64(object.EncryptedMetadataSize) bucket.ObjectCount++ return nil } -// InlineSegment is called for each inline segment. -func (observer *Observer) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) { - defer mon.Task()(&ctx)(&err) - - if segment.Expired(observer.Now) { - return nil - } - - bucket := observer.ensureBucket(ctx, segment.Location.Object()) - bucket.InlineSegments++ - bucket.InlineBytes += int64(segment.EncryptedSize) - - return nil -} - -// RemoteSegment is called for each remote segment. -func (observer *Observer) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) { - defer mon.Task()(&ctx)(&err) - - if segment.Expired(observer.Now) { - return nil - } - - bucket := observer.ensureBucket(ctx, segment.Location.Object()) - bucket.RemoteSegments++ - bucket.RemoteBytes += int64(segment.EncryptedSize) - - return nil -} - func projectTotalsFromBuckets(buckets map[metabase.BucketLocation]*accounting.BucketTally) map[uuid.UUID]int64 { projectTallyTotals := make(map[uuid.UUID]int64) for _, bucket := range buckets { - projectTallyTotals[bucket.ProjectID] += (bucket.InlineBytes + bucket.RemoteBytes) + projectTallyTotals[bucket.ProjectID] += bucket.TotalBytes } return projectTallyTotals } diff --git a/satellite/accounting/tally/tally_test.go b/satellite/accounting/tally/tally_test.go index ac23ec48e..5efc1b20a 100644 --- a/satellite/accounting/tally/tally_test.go +++ b/satellite/accounting/tally/tally_test.go @@ -84,10 +84,10 @@ func TestOnlyInline(t *testing.T) { ProjectID: uplink.Projects[0].ID, BucketName: expectedBucketName, }, - ObjectCount: 1, - InlineSegments: 1, - InlineBytes: int64(expectedTotalBytes), - MetadataSize: 0, + ObjectCount: 1, + TotalSegments: 1, + TotalBytes: int64(expectedTotalBytes), + MetadataSize: 0, } // Execute test: upload a file, then calculate at rest data @@ -96,16 +96,16 @@ func TestOnlyInline(t *testing.T) { // run multiple times to ensure we add tallies for i := 0; i < 2; i++ { - obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now()) - err := planet.Satellites[0].Metainfo.Loop.Join(ctx, obs) + collector := tally.NewBucketTallyCollector(planet.Satellites[0].Log.Named("bucket tally"), time.Now(), planet.Satellites[0].Metainfo.Metabase, planet.Satellites[0].Config.Tally) + err := collector.Run(ctx) require.NoError(t, err) now := time.Now().Add(time.Duration(i) * time.Second) - err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, now, obs.Bucket) + err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, now, collector.Bucket) require.NoError(t, err) - assert.Equal(t, 1, len(obs.Bucket)) - for _, actualTally := range obs.Bucket { + assert.Equal(t, 1, len(collector.Bucket)) + for _, actualTally := range collector.Bucket { // checking the exact metadata size is brittle, instead, verify that it's not zero assert.NotZero(t, actualTally.MetadataSize) actualTally.MetadataSize = expectedTally.MetadataSize @@ -166,20 +166,15 @@ func TestCalculateBucketAtRestData(t *testing.T) { for _, segment := range segments { loc := streamLocation[segment.StreamID] t := ensure(loc) - if len(segment.Pieces) > 0 { - t.RemoteSegments++ - t.RemoteBytes += int64(segment.EncryptedSize) - } else { - t.InlineSegments++ - t.InlineBytes += int64(segment.EncryptedSize) - } + t.TotalSegments++ + t.TotalBytes += int64(segment.EncryptedSize) } require.Len(t, expectedTotal, 3) - obs := tally.NewObserver(satellite.Log.Named("observer"), time.Now()) - err = satellite.Metainfo.Loop.Join(ctx, obs) + collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), time.Now(), satellite.Metainfo.Metabase, planet.Satellites[0].Config.Tally) + err = collector.Run(ctx) require.NoError(t, err) - require.Equal(t, expectedTotal, obs.Bucket) + require.Equal(t, expectedTotal, collector.Bucket) }) } @@ -193,12 +188,12 @@ func TestTallyIgnoresExpiredPointers(t *testing.T) { err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour)) require.NoError(t, err) - obs := tally.NewObserver(satellite.Log.Named("observer"), now.Add(24*time.Hour)) - err = satellite.Metainfo.Loop.Join(ctx, obs) + collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), now.Add(24*time.Hour), satellite.Metainfo.Metabase, planet.Satellites[0].Config.Tally) + err = collector.Run(ctx) require.NoError(t, err) - // there should be no observed buckets because all of the pointers are expired - require.Equal(t, obs.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{}) + // there should be no observed buckets because all of the objects are expired + require.Equal(t, collector.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{}) }) } diff --git a/satellite/admin/project_test.go b/satellite/admin/project_test.go index 9a1e53bef..a620c7e88 100644 --- a/satellite/admin/project_test.go +++ b/satellite/admin/project_test.go @@ -326,28 +326,24 @@ func TestCheckUsageWithUsage(t *testing.T) { now := time.Now().UTC() // use fixed intervals to avoid issues at the beginning of the month tally := accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC), - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC), + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) tally = accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC), - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC), + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) @@ -400,28 +396,24 @@ func TestCheckUsageLastMonthUnappliedInvoice(t *testing.T) { // use fixed intervals to avoid issues at the beginning of the month tally := accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC), - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC), + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) tally = accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC), - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC), + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) @@ -474,28 +466,24 @@ func TestDeleteProjectWithUsageCurrentMonth(t *testing.T) { now := time.Now().UTC() // use fixed intervals to avoid issues at the beginning of the month tally := accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC), - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC), + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) tally = accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC), - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC), + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) @@ -544,28 +532,24 @@ func TestDeleteProjectWithUsagePreviousMonth(t *testing.T) { // set fixed day to avoid failures at the end of the month accTime := time.Date(now.Year(), now.Month()-1, 15, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), time.UTC) tally := accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: accTime, - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: accTime, + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) tally = accounting.BucketStorageTally{ - BucketName: "test", - ProjectID: projectID, - IntervalStart: accTime.AddDate(0, 0, 1), - ObjectCount: 1, - InlineSegmentCount: 1, - RemoteSegmentCount: 1, - InlineBytes: 10, - RemoteBytes: 640000, - MetadataSize: 2, + BucketName: "test", + ProjectID: projectID, + IntervalStart: accTime.AddDate(0, 0, 1), + ObjectCount: 1, + TotalSegmentCount: 2, + TotalBytes: 640000, + MetadataSize: 2, } err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally) require.NoError(t, err) diff --git a/satellite/attribution/db.go b/satellite/attribution/db.go index 3876ec1e1..e069b0eca 100644 --- a/satellite/attribution/db.go +++ b/satellite/attribution/db.go @@ -26,12 +26,11 @@ type Info struct { // CSVRow represents data from QueryAttribution without exposing dbx. type CSVRow struct { - PartnerID []byte - ProjectID []byte - BucketName []byte - RemoteBytesPerHour float64 - InlineBytesPerHour float64 - EgressData int64 + PartnerID []byte + ProjectID []byte + BucketName []byte + TotalBytesPerHour float64 + EgressData int64 } // DB implements the database for value attribution table. diff --git a/satellite/attribution/db_test.go b/satellite/attribution/db_test.go index 565bbe572..034c5b5f3 100644 --- a/satellite/attribution/db_test.go +++ b/satellite/attribution/db_test.go @@ -46,10 +46,9 @@ type AttributionTestData struct { inlineSize int64 egressSize int64 - dataCounter int - expectedRemoteBytes int64 - expectedInlineBytes int64 - expectedEgress int64 + dataCounter int + expectedTotalBytes int64 + expectedEgress int64 } func (testData *AttributionTestData) init() { @@ -187,8 +186,7 @@ func verifyData(ctx *testcontext.Context, t *testing.T, attributionDB attributio assert.Equal(t, testData.partnerID[:], r.PartnerID, testData.name) assert.Equal(t, testData.projectID[:], r.ProjectID, testData.name) assert.Equal(t, testData.bucketName, r.BucketName, testData.name) - assert.Equal(t, float64(testData.expectedRemoteBytes/testData.hours), r.RemoteBytesPerHour, testData.name) - assert.Equal(t, float64(testData.expectedInlineBytes/testData.hours), r.InlineBytesPerHour, testData.name) + assert.Equal(t, float64(testData.expectedTotalBytes/testData.hours), r.TotalBytesPerHour, testData.name) assert.Equal(t, testData.expectedEgress, r.EgressData, testData.name) } require.NotEqual(t, 0, count, "Results were returned, but did not match all of the the projectIDs.") @@ -218,8 +216,7 @@ func createData(ctx *testcontext.Context, t *testing.T, db satellite.DB, testDat require.NoError(t, err) if (testData.dataInterval.After(testData.start) || testData.dataInterval.Equal(testData.start)) && testData.dataInterval.Before(testData.end) { - testData.expectedRemoteBytes += tally.RemoteBytes - testData.expectedInlineBytes += tally.InlineBytes + testData.expectedTotalBytes += tally.TotalBytes testData.expectedEgress += testData.egressSize } } @@ -232,11 +229,9 @@ func createTallyData(ctx *testcontext.Context, projectAccoutingDB accounting.Pro ObjectCount: 0, - InlineSegmentCount: 0, - RemoteSegmentCount: 0, + TotalSegmentCount: 0, - InlineBytes: inline, - RemoteBytes: remote, + TotalBytes: inline + remote, MetadataSize: 0, } err = projectAccoutingDB.CreateStorageTally(ctx, tally) diff --git a/satellite/core.go b/satellite/core.go index c1e6b87ed..ab9780d8c 100644 --- a/satellite/core.go +++ b/satellite/core.go @@ -357,7 +357,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, } { // setup accounting - peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval) + peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Metabase, config.Tally) peer.Services.Add(lifecycle.Item{ Name: "accounting:tally", Run: peer.Accounting.Tally.Run, diff --git a/satellite/metainfo/attribution_test.go b/satellite/metainfo/attribution_test.go index 474668d95..41f757791 100644 --- a/satellite/metainfo/attribution_test.go +++ b/satellite/metainfo/attribution_test.go @@ -245,7 +245,7 @@ func TestQueryAttribution(t *testing.T) { rows, err := planet.Satellites[0].DB.Attribution().QueryAttribution(ctx, partner.UUID, before, after) require.NoError(t, err) - require.NotZero(t, rows[0].RemoteBytesPerHour) + require.NotZero(t, rows[0].TotalBytesPerHour) require.Equal(t, rows[0].EgressData, usage.Egress) } }) @@ -316,7 +316,7 @@ func TestAttributionReport(t *testing.T) { rows, err := planet.Satellites[0].DB.Attribution().QueryAttribution(ctx, partner.UUID, before, after) require.NoError(t, err) - require.NotZero(t, rows[0].RemoteBytesPerHour) + require.NotZero(t, rows[0].TotalBytesPerHour) require.Equal(t, rows[0].EgressData, usage.Egress) // Minio should have no attribution because bucket was created by Zenko diff --git a/satellite/payments/stripecoinpayments/service_test.go b/satellite/payments/stripecoinpayments/service_test.go index a7716142f..82aae1524 100644 --- a/satellite/payments/stripecoinpayments/service_test.go +++ b/satellite/payments/stripecoinpayments/service_test.go @@ -148,7 +148,7 @@ func TestService_InvoiceUserWithManyProjects(t *testing.T) { ProjectID: projects[i].ID, BucketName: "testbucket", }, - RemoteBytes: projectsStorage[i], + TotalBytes: projectsStorage[i], ObjectCount: int64(i + 1), } tallies := map[metabase.BucketLocation]*accounting.BucketTally{ @@ -259,7 +259,7 @@ func TestService_InvoiceUserWithManyCoupons(t *testing.T) { ProjectID: project.ID, BucketName: "testbucket", }, - RemoteBytes: memory.TiB.Int64(), + TotalBytes: memory.TiB.Int64(), ObjectCount: 45, } tallies := map[metabase.BucketLocation]*accounting.BucketTally{ diff --git a/satellite/satellitedb/attribution.go b/satellite/satellitedb/attribution.go index c190d14fd..619547627 100644 --- a/satellite/satellitedb/attribution.go +++ b/satellite/satellitedb/attribution.go @@ -24,6 +24,7 @@ const ( o.partner_id as partner_id, o.project_id as project_id, o.bucket_name as bucket_name, + SUM(o.total) / SUM(o.hours) as total, SUM(o.remote) / SUM(o.hours) as remote, SUM(o.inline) / SUM(o.hours) as inline, SUM(o.settled) as settled @@ -34,7 +35,8 @@ const ( SELECT bsti.partner_id as partner_id, bsto.project_id as project_id, - bsto.bucket_name as bucket_name, + bsto.bucket_name as bucket_name, + SUM(bsto.total_bytes) as total, SUM(bsto.remote) as remote, SUM(bsto.inline) as inline, 0 as settled, @@ -82,6 +84,7 @@ const ( va.partner_id as partner_id, bbr.project_id as project_id, bbr.bucket_name as bucket_name, + 0 as total, 0 as remote, 0 as inline, SUM(settled)::integer as settled, @@ -165,10 +168,16 @@ func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid. results := []*attribution.CSVRow{} for rows.Next() { r := &attribution.CSVRow{} - err := rows.Scan(&r.PartnerID, &r.ProjectID, &r.BucketName, &r.RemoteBytesPerHour, &r.InlineBytesPerHour, &r.EgressData) + var inline, remote float64 + err := rows.Scan(&r.PartnerID, &r.ProjectID, &r.BucketName, &r.TotalBytesPerHour, &inline, &remote, &r.EgressData) if err != nil { return results, Error.Wrap(err) } + + if r.TotalBytesPerHour == 0 { + r.TotalBytesPerHour = inline + remote + } + results = append(results, r) } return results, Error.Wrap(rows.Err()) diff --git a/satellite/satellitedb/projectaccounting.go b/satellite/satellitedb/projectaccounting.go index ecf238c0d..f043a15cb 100644 --- a/satellite/satellitedb/projectaccounting.go +++ b/satellite/satellitedb/projectaccounting.go @@ -40,17 +40,13 @@ func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time return nil } var bucketNames, projectIDs [][]byte - var totalBytes, inlineBytes, remoteBytes, metadataSizes []int64 - var totalSegments, remoteSegments, inlineSegments, objectCounts []int64 + var totalBytes, metadataSizes []int64 + var totalSegments, objectCounts []int64 for _, info := range bucketTallies { bucketNames = append(bucketNames, []byte(info.BucketName)) projectIDs = append(projectIDs, info.ProjectID[:]) totalBytes = append(totalBytes, info.TotalBytes) - inlineBytes = append(inlineBytes, info.InlineBytes) - remoteBytes = append(remoteBytes, info.RemoteBytes) totalSegments = append(totalSegments, info.TotalSegments) - remoteSegments = append(remoteSegments, info.RemoteSegments) - inlineSegments = append(inlineSegments, info.InlineSegments) objectCounts = append(objectCounts, info.ObjectCount) metadataSizes = append(metadataSizes, info.MetadataSize) } @@ -64,13 +60,13 @@ func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time SELECT $1, unnest($2::bytea[]), unnest($3::bytea[]), - unnest($4::int8[]), unnest($5::int8[]), unnest($6::int8[]), - unnest($7::int8[]), unnest($8::int8[]), unnest($9::int8[]), + unnest($4::int8[]), $5, $6, + unnest($7::int8[]), $8, $9, unnest($10::int8[]), unnest($11::int8[])`), intervalStart, pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs), - pgutil.Int8Array(totalBytes), pgutil.Int8Array(inlineBytes), pgutil.Int8Array(remoteBytes), - pgutil.Int8Array(totalSegments), pgutil.Int8Array(remoteSegments), pgutil.Int8Array(inlineSegments), + pgutil.Int8Array(totalBytes), 0, 0, + pgutil.Int8Array(totalSegments), 0, 0, pgutil.Int8Array(objectCounts), pgutil.Int8Array(metadataSizes)) return Error.Wrap(err) @@ -91,19 +87,25 @@ func (db *ProjectAccounting) GetTallies(ctx context.Context) (tallies []accounti return nil, Error.Wrap(err) } + totalBytes := dbxTally.TotalBytes + if totalBytes == 0 { + totalBytes = dbxTally.Inline + dbxTally.Remote + } + + totalSegments := dbxTally.TotalSegmentsCount + if totalSegments == 0 { + totalSegments = dbxTally.InlineSegmentsCount + dbxTally.RemoteSegmentsCount + } + tallies = append(tallies, accounting.BucketTally{ BucketLocation: metabase.BucketLocation{ ProjectID: projectID, BucketName: string(dbxTally.BucketName), }, - ObjectCount: int64(dbxTally.ObjectCount), - TotalSegments: int64(dbxTally.TotalSegmentsCount), - InlineSegments: int64(dbxTally.InlineSegmentsCount), - RemoteSegments: int64(dbxTally.RemoteSegmentsCount), - TotalBytes: int64(dbxTally.TotalBytes), - InlineBytes: int64(dbxTally.Inline), - RemoteBytes: int64(dbxTally.Remote), - MetadataSize: int64(dbxTally.MetadataSize), + ObjectCount: int64(dbxTally.ObjectCount), + TotalSegments: int64(totalSegments), + TotalBytes: int64(totalBytes), + MetadataSize: int64(dbxTally.MetadataSize), }) } @@ -129,8 +131,8 @@ func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accou ?, ? )`), tally.IntervalStart, []byte(tally.BucketName), tally.ProjectID, - tally.TotalBytes, tally.InlineBytes, tally.RemoteBytes, - tally.TotalSegmentCount, tally.RemoteSegmentCount, tally.InlineSegmentCount, + tally.TotalBytes, 0, 0, + tally.TotalSegmentCount, 0, 0, tally.ObjectCount, tally.MetadataSize, ) @@ -301,10 +303,15 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid for storageTalliesRows.Next() { tally := accounting.BucketStorageTally{} - err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &tally.InlineBytes, &tally.RemoteBytes, &tally.ObjectCount) + var inline, remote int64 + err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.ObjectCount) if err != nil { return nil, errs.Combine(err, storageTalliesRows.Close()) } + if tally.TotalBytes == 0 { + tally.TotalBytes = inline + remote + } + tally.BucketName = bucket storageTallies = append(storageTallies, &tally) } @@ -638,13 +645,18 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid storageRow := db.db.QueryRowContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before) var tally accounting.BucketStorageTally - err = storageRow.Scan(&tally.TotalBytes, &tally.InlineBytes, &tally.RemoteBytes, &tally.ObjectCount) + var inline, remote int64 + err = storageRow.Scan(&tally.TotalBytes, &inline, &remote, &tally.ObjectCount) if err != nil { if !errors.Is(err, sql.ErrNoRows) { return nil, err } } + if tally.TotalBytes == 0 { + tally.TotalBytes = inline + remote + } + // fill storage and object count bucketUsage.Storage = memory.Size(tally.Bytes()).GB() bucketUsage.ObjectCount = tally.ObjectCount diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index b89d7770e..b71bb07b4 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -718,9 +718,15 @@ server.private-address: 127.0.0.1:7778 # length of time a node can go without contacting satellite before being disqualified # stray-nodes.max-duration-without-contact: 720h0m0s +# as of system interval +# tally.as-of-system-interval: -5m0s + # how frequently the tally service should run # tally.interval: 1h0m0s +# how many objects to query in a batch +# tally.list-limit: 2500 + # how large of batches GetBandwidthSince should process at a time # tally.read-rollup-batch-size: 10000