satellite/accounting/tally: use objects iterator instead of metaloop

Bucket tally calculation will be removed from the metaloop and will
use the metabase objects iterator directly.

At the moment only bucket tally needs objects, so it makes no sense
to implement a separate objects loop.

Change-Id: Iee60059fc8b9a1bf64d01cafe9659b69b0e27eb1
Author: Michał Niewrzał, 2021-07-01 13:29:25 +02:00 (committed by Michal Niewrzal)
parent 4c7384ca2f
commit 27a714e8b0
17 changed files with 237 additions and 274 deletions
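
For orientation before the per-file diffs: the heart of the change is replacing the metaloop observer callbacks (Object, InlineSegment, RemoteSegment) with a direct scan over metabase objects. Below is a minimal sketch of the new pattern, not the committed code: it uses the metabase types that appear in the diff (IterateLoopObjects, LoopObjectsIterator, LoopObjectEntry), while sketchBucketTallies, metabaseDB, and cfg are illustrative names and the expiration check is omitted.

// Sketch only: accumulate per-bucket tallies from a single pass over objects.
func sketchBucketTallies(ctx context.Context, metabaseDB *metabase.DB, cfg Config) (map[metabase.BucketLocation]*accounting.BucketTally, error) {
	buckets := make(map[metabase.BucketLocation]*accounting.BucketTally)
	err := metabaseDB.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
		BatchSize:          cfg.ListLimit,          // objects per query batch (default 2500)
		AsOfSystemInterval: cfg.AsOfSystemInterval, // stale-read bound for the scan
	}, func(ctx context.Context, it metabase.LoopObjectsIterator) error {
		var entry metabase.LoopObjectEntry
		for it.Next(ctx, &entry) {
			loc := entry.ObjectStream.Location().Bucket()
			bucket, ok := buckets[loc]
			if !ok {
				bucket = &accounting.BucketTally{BucketLocation: loc}
				buckets[loc] = bucket
			}
			// Totals come straight off the object row; the old per-segment
			// InlineSegment/RemoteSegment callbacks are no longer needed.
			bucket.ObjectCount++
			bucket.TotalSegments += int64(entry.SegmentCount)
			bucket.TotalBytes += entry.TotalEncryptedSize
		}
		return nil
	})
	return buckets, err
}

Since only the tally service needs this iteration today, the commit threads ListLimit and AsOfSystemInterval through tally.Config rather than introducing a second shared objects loop.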

View File

@@ -24,8 +24,7 @@ import (
 var headers = []string{
 	"projectID",
 	"bucketName",
-	"byte-hours:Remote",
-	"byte-hours:Inline",
+	"byte-hours:Total",
 	"bytes:BWEgress",
 }
@@ -81,14 +80,12 @@ func csvRowToStringSlice(p *attribution.CSVRow) ([]string, error) {
 	if err != nil {
 		return nil, errs.New("Invalid Project ID")
 	}
-	remoteGBPerHour := memory.Size(p.RemoteBytesPerHour).GB()
-	inlineGBPerHour := memory.Size(p.InlineBytesPerHour).GB()
+	totalGBPerHour := memory.Size(p.TotalBytesPerHour).GB()
 	egressGBData := memory.Size(p.EgressData).GB()
 	record := []string{
 		projectID.String(),
 		string(p.BucketName),
-		strconv.FormatFloat(remoteGBPerHour, 'f', 4, 64),
-		strconv.FormatFloat(inlineGBPerHour, 'f', 4, 64),
+		strconv.FormatFloat(totalGBPerHour, 'f', 4, 64),
 		strconv.FormatFloat(egressGBData, 'f', 4, 64),
 	}
 	return record, nil

View File

@@ -1,18 +1,10 @@
 storj.io/storj/private/lifecycle."slow_shutdown" Event
 storj.io/storj/private/lifecycle."unexpected_shutdown" Event
 storj.io/storj/satellite/accounting."bucket_bytes" IntVal
-storj.io/storj/satellite/accounting."bucket_inline_bytes" IntVal
-storj.io/storj/satellite/accounting."bucket_inline_segments" IntVal
 storj.io/storj/satellite/accounting."bucket_objects" IntVal
-storj.io/storj/satellite/accounting."bucket_remote_bytes" IntVal
-storj.io/storj/satellite/accounting."bucket_remote_segments" IntVal
 storj.io/storj/satellite/accounting."bucket_segments" IntVal
 storj.io/storj/satellite/accounting."total_bytes" IntVal
-storj.io/storj/satellite/accounting."total_inline_bytes" IntVal
-storj.io/storj/satellite/accounting."total_inline_segments" IntVal
 storj.io/storj/satellite/accounting."total_objects" IntVal
-storj.io/storj/satellite/accounting."total_remote_bytes" IntVal
-storj.io/storj/satellite/accounting."total_remote_segments" IntVal
 storj.io/storj/satellite/accounting."total_segments" IntVal
 storj.io/storj/satellite/accounting/tally."nodetallies.totalsum" IntVal
 storj.io/storj/satellite/audit."audit_contained_nodes" IntVal

View File

@@ -13,13 +13,8 @@ type BucketTally struct {
 	ObjectCount int64
 
-	TotalSegments  int64
-	InlineSegments int64
-	RemoteSegments int64
-
-	TotalBytes  int64
-	InlineBytes int64
-	RemoteBytes int64
+	TotalSegments int64
+	TotalBytes    int64
 
 	MetadataSize int64
 }
@@ -29,26 +24,16 @@ func (s *BucketTally) Combine(o *BucketTally) {
 	s.ObjectCount += o.ObjectCount
 	s.TotalSegments += o.TotalSegments
-	s.InlineSegments += o.InlineSegments
-	s.RemoteSegments += o.RemoteSegments
 	s.TotalBytes += o.TotalBytes
-	s.InlineBytes += o.InlineBytes
-	s.RemoteBytes += o.RemoteBytes
 }
 
 // Segments returns total number of segments.
 func (s *BucketTally) Segments() int64 {
-	if s.TotalSegments != 0 {
-		return s.TotalSegments
-	}
-	return s.InlineSegments + s.RemoteSegments
+	return s.TotalSegments
 }
 
 // Bytes returns total bytes.
 func (s *BucketTally) Bytes() int64 {
-	if s.TotalBytes != 0 {
-		return s.TotalBytes
-	}
-	return s.InlineBytes + s.RemoteBytes
+	return s.TotalBytes
 }

View File

@@ -17,20 +17,13 @@ type BucketStorageTally struct {
 	ObjectCount int64
 
-	TotalSegmentCount  int64
-	InlineSegmentCount int64
-	RemoteSegmentCount int64
+	TotalSegmentCount int64
+	TotalBytes        int64
 
-	TotalBytes  int64
-	InlineBytes int64
-	RemoteBytes int64
-
 	MetadataSize int64
 }
 
 // Bytes returns total bytes.
 func (s *BucketStorageTally) Bytes() int64 {
-	if s.TotalBytes != 0 {
-		return s.TotalBytes
-	}
-	return s.InlineBytes + s.RemoteBytes
+	return s.TotalBytes
 }

View File

@@ -231,12 +231,10 @@ func createBucketStorageTallies(projectID uuid.UUID) (map[metabase.BucketLocatio
 				ProjectID:  projectID,
 				BucketName: bucketName,
 			},
-			ObjectCount:    int64(1),
-			InlineSegments: int64(1),
-			RemoteSegments: int64(1),
-			InlineBytes:    int64(1),
-			RemoteBytes:    int64(1),
-			MetadataSize:   int64(1),
+			ObjectCount:   int64(1),
+			TotalSegments: int64(2),
+			TotalBytes:    int64(2),
+			MetadataSize:  int64(1),
 		}
 		bucketTallies[bucketLocation] = &tally
 		expectedTallies = append(expectedTallies, tally)

View File

@@ -467,20 +467,16 @@ func TestUsageRollups(t *testing.T) {
 		tally1 := &accounting.BucketTally{
 			BucketLocation: bucketLoc1,
 			ObjectCount:    value1,
-			InlineSegments: value1,
-			RemoteSegments: value1,
-			InlineBytes:    value1,
-			RemoteBytes:    value1,
+			TotalSegments:  value1 + value1,
+			TotalBytes:     value1 + value1,
 			MetadataSize:   value1,
 		}
 
 		tally2 := &accounting.BucketTally{
 			BucketLocation: bucketLoc2,
 			ObjectCount:    value2,
-			InlineSegments: value2,
-			RemoteSegments: value2,
-			InlineBytes:    value2,
-			RemoteBytes:    value2,
+			TotalSegments:  value2 + value2,
+			TotalBytes:     value2 + value2,
 			MetadataSize:   value2,
 		}

View File

@@ -15,7 +15,6 @@ import (
 	"storj.io/common/uuid"
 	"storj.io/storj/satellite/accounting"
 	"storj.io/storj/satellite/metabase"
-	"storj.io/storj/satellite/metabase/metaloop"
 )
 
 // Error is a standard error class for this package.
@@ -29,16 +28,20 @@ type Config struct {
 	Interval            time.Duration `help:"how frequently the tally service should run" releaseDefault:"1h" devDefault:"30s" testDefault:"$TESTINTERVAL"`
 	SaveRollupBatchSize int           `help:"how large of batches SaveRollup should process at a time" default:"1000"`
 	ReadRollupBatchSize int           `help:"how large of batches GetBandwidthSince should process at a time" default:"10000"`
+	ListLimit           int           `help:"how many objects to query in a batch" default:"2500"`
+	AsOfSystemInterval  time.Duration `help:"as of system interval" releaseDefault:"-5m" devDefault:"-1us" testDefault:"-1us"`
 }
 
 // Service is the tally service for data stored on each storage node.
 //
 // architecture: Chore
 type Service struct {
-	log  *zap.Logger
-	Loop *sync2.Cycle
+	log    *zap.Logger
+	config Config
+	Loop   *sync2.Cycle
 
-	metainfoLoop            *metaloop.Service
+	metabase                *metabase.DB
 	liveAccounting          accounting.Cache
 	storagenodeAccountingDB accounting.StoragenodeAccounting
 	projectAccountingDB     accounting.ProjectAccounting
@@ -46,12 +49,13 @@ type Service struct {
 }
 
 // New creates a new tally Service.
-func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metainfoLoop *metaloop.Service, interval time.Duration) *Service {
+func New(log *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting accounting.Cache, metabase *metabase.DB, config Config) *Service {
 	return &Service{
-		log:  log,
-		Loop: sync2.NewCycle(interval),
+		log:    log,
+		config: config,
+		Loop:   sync2.NewCycle(config.Interval),
 
-		metainfoLoop:            metainfoLoop,
+		metabase:                metabase,
 		liveAccounting:          liveAccounting,
 		storagenodeAccountingDB: sdb,
 		projectAccountingDB:     pdb,
@@ -164,9 +168,9 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 		}
 	}
 
-	// add up all nodes and buckets
-	observer := NewObserver(service.log.Named("observer"), service.nowFn())
-	err = service.metainfoLoop.Join(ctx, observer)
+	// add up all buckets
+	collector := NewBucketTallyCollector(service.log.Named("observer"), service.nowFn(), service.metabase, service.config)
+	err = collector.Run(ctx)
 	if err != nil {
 		return Error.Wrap(err)
 	}
@@ -174,66 +178,97 @@ func (service *Service) Tally(ctx context.Context) (err error) {
 	// save the new results
 	var errAtRest error
-	if len(observer.Bucket) > 0 {
+	if len(collector.Bucket) > 0 {
 		// record bucket tallies to DB
-		err = service.projectAccountingDB.SaveTallies(ctx, finishTime, observer.Bucket)
+		err = service.projectAccountingDB.SaveTallies(ctx, finishTime, collector.Bucket)
 		if err != nil {
 			errAtRest = Error.New("ProjectAccounting.SaveTallies failed: %v", err)
 		}
 
-		updateLiveAccountingTotals(projectTotalsFromBuckets(observer.Bucket))
+		updateLiveAccountingTotals(projectTotalsFromBuckets(collector.Bucket))
 	}
 
+	// TODO: move the commented-out metrics to a different place, or wait
+	// until the metabase object contains such information.
 	// report bucket metrics
-	if len(observer.Bucket) > 0 {
+	if len(collector.Bucket) > 0 {
 		var total accounting.BucketTally
-		for _, bucket := range observer.Bucket {
-			monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount)            //mon:locked
-			monAccounting.IntVal("bucket_segments").Observe(bucket.Segments())            //mon:locked
-			monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked
-			monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked
+		for _, bucket := range collector.Bucket {
+			monAccounting.IntVal("bucket_objects").Observe(bucket.ObjectCount) //mon:locked
+			monAccounting.IntVal("bucket_segments").Observe(bucket.Segments()) //mon:locked
+			// monAccounting.IntVal("bucket_inline_segments").Observe(bucket.InlineSegments) //mon:locked
+			// monAccounting.IntVal("bucket_remote_segments").Observe(bucket.RemoteSegments) //mon:locked
 
-			monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes())            //mon:locked
-			monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked
-			monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked
+			monAccounting.IntVal("bucket_bytes").Observe(bucket.Bytes()) //mon:locked
+			// monAccounting.IntVal("bucket_inline_bytes").Observe(bucket.InlineBytes) //mon:locked
+			// monAccounting.IntVal("bucket_remote_bytes").Observe(bucket.RemoteBytes) //mon:locked
 
 			total.Combine(bucket)
 		}
 		monAccounting.IntVal("total_objects").Observe(total.ObjectCount) //mon:locked
-		monAccounting.IntVal("total_segments").Observe(total.Segments())            //mon:locked
-		monAccounting.IntVal("total_inline_segments").Observe(total.InlineSegments) //mon:locked
-		monAccounting.IntVal("total_remote_segments").Observe(total.RemoteSegments) //mon:locked
+		monAccounting.IntVal("total_segments").Observe(total.Segments()) //mon:locked
+		// monAccounting.IntVal("total_inline_segments").Observe(total.InlineSegments) //mon:locked
+		// monAccounting.IntVal("total_remote_segments").Observe(total.RemoteSegments) //mon:locked
 
-		monAccounting.IntVal("total_bytes").Observe(total.Bytes())            //mon:locked
-		monAccounting.IntVal("total_inline_bytes").Observe(total.InlineBytes) //mon:locked
-		monAccounting.IntVal("total_remote_bytes").Observe(total.RemoteBytes) //mon:locked
+		monAccounting.IntVal("total_bytes").Observe(total.Bytes()) //mon:locked
+		// monAccounting.IntVal("total_inline_bytes").Observe(total.InlineBytes) //mon:locked
+		// monAccounting.IntVal("total_remote_bytes").Observe(total.RemoteBytes) //mon:locked
 	}
 
 	// return errors if something went wrong.
 	return errAtRest
 }
-var _ metaloop.Observer = (*Observer)(nil)
-
-// Observer observes metainfo and adds up tallies for nodes and buckets.
-type Observer struct {
+// BucketTallyCollector collects and adds up tallies for buckets.
+type BucketTallyCollector struct {
 	Now    time.Time
 	Log    *zap.Logger
 	Bucket map[metabase.BucketLocation]*accounting.BucketTally
+
+	metabase *metabase.DB
+	config   Config
 }
 
-// NewObserver returns an metainfo loop observer that adds up totals for buckets and nodes.
-// The now argument controls when the observer considers pointers to be expired.
-func NewObserver(log *zap.Logger, now time.Time) *Observer {
-	return &Observer{
+// NewBucketTallyCollector returns a collector that adds up totals for buckets.
+// The now argument controls when the collector considers objects to be expired.
+func NewBucketTallyCollector(log *zap.Logger, now time.Time, db *metabase.DB, config Config) *BucketTallyCollector {
+	return &BucketTallyCollector{
 		Now:    now,
 		Log:    log,
 		Bucket: make(map[metabase.BucketLocation]*accounting.BucketTally),
+
+		metabase: db,
+		config:   config,
 	}
 }
 
+// Run runs collecting bucket tallies.
+func (observer *BucketTallyCollector) Run(ctx context.Context) (err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	startingTime, err := observer.metabase.Now(ctx)
+	if err != nil {
+		return err
+	}
+
+	return observer.metabase.IterateLoopObjects(ctx, metabase.IterateLoopObjects{
+		BatchSize:          observer.config.ListLimit,
+		AsOfSystemTime:     startingTime,
+		AsOfSystemInterval: observer.config.AsOfSystemInterval,
+	}, func(ctx context.Context, it metabase.LoopObjectsIterator) (err error) {
+		var entry metabase.LoopObjectEntry
+		for it.Next(ctx, &entry) {
+			err = observer.object(ctx, entry)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
 // ensureBucket returns bucket corresponding to the passed in path.
-func (observer *Observer) ensureBucket(ctx context.Context, location metabase.ObjectLocation) *accounting.BucketTally {
+func (observer *BucketTallyCollector) ensureBucket(ctx context.Context, location metabase.ObjectLocation) *accounting.BucketTally {
 	bucketLocation := location.Bucket()
 	bucket, exists := observer.Bucket[bucketLocation]
 	if !exists {
@@ -245,13 +280,8 @@ func (observer *Observer) ensureBucket(ctx context.Context, location metabase.Ob
 	return bucket
 }
 
-// LoopStarted is called at each start of a loop.
-func (observer *Observer) LoopStarted(context.Context, metaloop.LoopInfo) (err error) {
-	return nil
-}
-
 // Object is called for each object once.
-func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) (err error) {
+func (observer *BucketTallyCollector) object(ctx context.Context, object metabase.LoopObjectEntry) (err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	if object.Expired(observer.Now) {
@@ -259,46 +289,18 @@ func (observer *Observer) Object(ctx context.Context, object *metaloop.Object) (
 	}
 
 	bucket := observer.ensureBucket(ctx, object.ObjectStream.Location())
+	bucket.TotalSegments += int64(object.SegmentCount)
+	bucket.TotalBytes += object.TotalEncryptedSize
 	bucket.MetadataSize += int64(object.EncryptedMetadataSize)
 	bucket.ObjectCount++
 
 	return nil
 }
 
-// InlineSegment is called for each inline segment.
-func (observer *Observer) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	if segment.Expired(observer.Now) {
-		return nil
-	}
-
-	bucket := observer.ensureBucket(ctx, segment.Location.Object())
-	bucket.InlineSegments++
-	bucket.InlineBytes += int64(segment.EncryptedSize)
-
-	return nil
-}
-
-// RemoteSegment is called for each remote segment.
-func (observer *Observer) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error) {
-	defer mon.Task()(&ctx)(&err)
-
-	if segment.Expired(observer.Now) {
-		return nil
-	}
-
-	bucket := observer.ensureBucket(ctx, segment.Location.Object())
-	bucket.RemoteSegments++
-	bucket.RemoteBytes += int64(segment.EncryptedSize)
-
-	return nil
-}
-
 func projectTotalsFromBuckets(buckets map[metabase.BucketLocation]*accounting.BucketTally) map[uuid.UUID]int64 {
 	projectTallyTotals := make(map[uuid.UUID]int64)
 	for _, bucket := range buckets {
-		projectTallyTotals[bucket.ProjectID] += (bucket.InlineBytes + bucket.RemoteBytes)
+		projectTallyTotals[bucket.ProjectID] += bucket.TotalBytes
 	}
 	return projectTallyTotals
 }

View File

@@ -84,10 +84,10 @@ func TestOnlyInline(t *testing.T) {
 				ProjectID:  uplink.Projects[0].ID,
 				BucketName: expectedBucketName,
 			},
-			ObjectCount:    1,
-			InlineSegments: 1,
-			InlineBytes:    int64(expectedTotalBytes),
-			MetadataSize:   0,
+			ObjectCount:   1,
+			TotalSegments: 1,
+			TotalBytes:    int64(expectedTotalBytes),
+			MetadataSize:  0,
 		}
 
 		// Execute test: upload a file, then calculate at rest data
@@ -96,16 +96,16 @@ func TestOnlyInline(t *testing.T) {
 		// run multiple times to ensure we add tallies
 		for i := 0; i < 2; i++ {
-			obs := tally.NewObserver(planet.Satellites[0].Log.Named("observer"), time.Now())
-			err := planet.Satellites[0].Metainfo.Loop.Join(ctx, obs)
+			collector := tally.NewBucketTallyCollector(planet.Satellites[0].Log.Named("bucket tally"), time.Now(), planet.Satellites[0].Metainfo.Metabase, planet.Satellites[0].Config.Tally)
+			err := collector.Run(ctx)
 			require.NoError(t, err)
 
 			now := time.Now().Add(time.Duration(i) * time.Second)
-			err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, now, obs.Bucket)
+			err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, now, collector.Bucket)
 			require.NoError(t, err)
 
-			assert.Equal(t, 1, len(obs.Bucket))
-			for _, actualTally := range obs.Bucket {
+			assert.Equal(t, 1, len(collector.Bucket))
+			for _, actualTally := range collector.Bucket {
 				// checking the exact metadata size is brittle, instead, verify that it's not zero
 				assert.NotZero(t, actualTally.MetadataSize)
 				actualTally.MetadataSize = expectedTally.MetadataSize
@@ -166,20 +166,15 @@ func TestCalculateBucketAtRestData(t *testing.T) {
 		for _, segment := range segments {
 			loc := streamLocation[segment.StreamID]
 			t := ensure(loc)
-			if len(segment.Pieces) > 0 {
-				t.RemoteSegments++
-				t.RemoteBytes += int64(segment.EncryptedSize)
-			} else {
-				t.InlineSegments++
-				t.InlineBytes += int64(segment.EncryptedSize)
-			}
+			t.TotalSegments++
+			t.TotalBytes += int64(segment.EncryptedSize)
 		}
 		require.Len(t, expectedTotal, 3)
 
-		obs := tally.NewObserver(satellite.Log.Named("observer"), time.Now())
-		err = satellite.Metainfo.Loop.Join(ctx, obs)
+		collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), time.Now(), satellite.Metainfo.Metabase, planet.Satellites[0].Config.Tally)
+		err = collector.Run(ctx)
 		require.NoError(t, err)
-		require.Equal(t, expectedTotal, obs.Bucket)
+		require.Equal(t, expectedTotal, collector.Bucket)
 	})
 }
@@ -193,12 +188,12 @@ func TestTallyIgnoresExpiredPointers(t *testing.T) {
 		err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], "bucket", "path", []byte{1}, now.Add(12*time.Hour))
 		require.NoError(t, err)
 
-		obs := tally.NewObserver(satellite.Log.Named("observer"), now.Add(24*time.Hour))
-		err = satellite.Metainfo.Loop.Join(ctx, obs)
+		collector := tally.NewBucketTallyCollector(satellite.Log.Named("bucket tally"), now.Add(24*time.Hour), satellite.Metainfo.Metabase, planet.Satellites[0].Config.Tally)
+		err = collector.Run(ctx)
 		require.NoError(t, err)
 
-		// there should be no observed buckets because all of the pointers are expired
-		require.Equal(t, obs.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{})
+		// there should be no observed buckets because all of the objects are expired
+		require.Equal(t, collector.Bucket, map[metabase.BucketLocation]*accounting.BucketTally{})
 	})
 }

View File

@@ -326,28 +326,24 @@ func TestCheckUsageWithUsage(t *testing.T) {
 		now := time.Now().UTC()
 		// use fixed intervals to avoid issues at the beginning of the month
 		tally := accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC),
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC),
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)
 
 		tally = accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC),
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC),
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)
@@ -400,28 +396,24 @@ func TestCheckUsageLastMonthUnappliedInvoice(t *testing.T) {
 		// use fixed intervals to avoid issues at the beginning of the month
 		tally := accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC),
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC),
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)
 
 		tally = accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC),
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC),
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)
@@ -474,28 +466,24 @@ func TestDeleteProjectWithUsageCurrentMonth(t *testing.T) {
 		now := time.Now().UTC()
 		// use fixed intervals to avoid issues at the beginning of the month
 		tally := accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC),
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 1, time.UTC),
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)
 
 		tally = accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC),
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     time.Date(now.Year(), now.Month(), 1, 0, 1, 0, 1, time.UTC),
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)
@@ -544,28 +532,24 @@ func TestDeleteProjectWithUsagePreviousMonth(t *testing.T) {
 		// set fixed day to avoid failures at the end of the month
 		accTime := time.Date(now.Year(), now.Month()-1, 15, now.Hour(), now.Minute(), now.Second(), now.Nanosecond(), time.UTC)
 		tally := accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      accTime,
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     accTime,
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)
 
 		tally = accounting.BucketStorageTally{
-			BucketName:         "test",
-			ProjectID:          projectID,
-			IntervalStart:      accTime.AddDate(0, 0, 1),
-			ObjectCount:        1,
-			InlineSegmentCount: 1,
-			RemoteSegmentCount: 1,
-			InlineBytes:        10,
-			RemoteBytes:        640000,
-			MetadataSize:       2,
+			BucketName:        "test",
+			ProjectID:         projectID,
+			IntervalStart:     accTime.AddDate(0, 0, 1),
+			ObjectCount:       1,
+			TotalSegmentCount: 2,
+			TotalBytes:        640000,
+			MetadataSize:      2,
 		}
 		err = planet.Satellites[0].DB.ProjectAccounting().CreateStorageTally(ctx, tally)
 		require.NoError(t, err)

View File

@@ -26,12 +26,11 @@ type Info struct {
 
 // CSVRow represents data from QueryAttribution without exposing dbx.
 type CSVRow struct {
-	PartnerID          []byte
-	ProjectID          []byte
-	BucketName         []byte
-	RemoteBytesPerHour float64
-	InlineBytesPerHour float64
-	EgressData         int64
+	PartnerID         []byte
+	ProjectID         []byte
+	BucketName        []byte
+	TotalBytesPerHour float64
+	EgressData        int64
 }
 
 // DB implements the database for value attribution table.

View File

@@ -46,10 +46,9 @@ type AttributionTestData struct {
 	inlineSize int64
 	egressSize int64
 
-	dataCounter         int
-	expectedRemoteBytes int64
-	expectedInlineBytes int64
-	expectedEgress      int64
+	dataCounter        int
+	expectedTotalBytes int64
+	expectedEgress     int64
 }
 
 func (testData *AttributionTestData) init() {
@@ -187,8 +186,7 @@ func verifyData(ctx *testcontext.Context, t *testing.T, attributionDB attributio
 		assert.Equal(t, testData.partnerID[:], r.PartnerID, testData.name)
 		assert.Equal(t, testData.projectID[:], r.ProjectID, testData.name)
 		assert.Equal(t, testData.bucketName, r.BucketName, testData.name)
-		assert.Equal(t, float64(testData.expectedRemoteBytes/testData.hours), r.RemoteBytesPerHour, testData.name)
-		assert.Equal(t, float64(testData.expectedInlineBytes/testData.hours), r.InlineBytesPerHour, testData.name)
+		assert.Equal(t, float64(testData.expectedTotalBytes/testData.hours), r.TotalBytesPerHour, testData.name)
 		assert.Equal(t, testData.expectedEgress, r.EgressData, testData.name)
 	}
 	require.NotEqual(t, 0, count, "Results were returned, but did not match all of the projectIDs.")
@@ -218,8 +216,7 @@ func createData(ctx *testcontext.Context, t *testing.T, db satellite.DB, testDat
 		require.NoError(t, err)
 
 		if (testData.dataInterval.After(testData.start) || testData.dataInterval.Equal(testData.start)) && testData.dataInterval.Before(testData.end) {
-			testData.expectedRemoteBytes += tally.RemoteBytes
-			testData.expectedInlineBytes += tally.InlineBytes
+			testData.expectedTotalBytes += tally.TotalBytes
 			testData.expectedEgress += testData.egressSize
 		}
 	}
@@ -232,11 +229,9 @@ func createTallyData(ctx *testcontext.Context, projectAccoutingDB accounting.Pro
 		ObjectCount:        0,
-		InlineSegmentCount: 0,
-		RemoteSegmentCount: 0,
+		TotalSegmentCount:  0,
-		InlineBytes:        inline,
-		RemoteBytes:        remote,
+		TotalBytes:         inline + remote,
 		MetadataSize:       0,
 	}
 
 	err = projectAccoutingDB.CreateStorageTally(ctx, tally)

View File

@@ -357,7 +357,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
 	}
 
 	{ // setup accounting
-		peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Loop, config.Tally.Interval)
+		peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Metabase, config.Tally)
 		peer.Services.Add(lifecycle.Item{
 			Name: "accounting:tally",
 			Run:  peer.Accounting.Tally.Run,

View File

@@ -245,7 +245,7 @@ func TestQueryAttribution(t *testing.T) {
 			rows, err := planet.Satellites[0].DB.Attribution().QueryAttribution(ctx, partner.UUID, before, after)
 			require.NoError(t, err)
-			require.NotZero(t, rows[0].RemoteBytesPerHour)
+			require.NotZero(t, rows[0].TotalBytesPerHour)
 			require.Equal(t, rows[0].EgressData, usage.Egress)
 		}
 	})
@@ -316,7 +316,7 @@ func TestAttributionReport(t *testing.T) {
 			rows, err := planet.Satellites[0].DB.Attribution().QueryAttribution(ctx, partner.UUID, before, after)
 			require.NoError(t, err)
-			require.NotZero(t, rows[0].RemoteBytesPerHour)
+			require.NotZero(t, rows[0].TotalBytesPerHour)
 			require.Equal(t, rows[0].EgressData, usage.Egress)
 
 			// Minio should have no attribution because bucket was created by Zenko
View File

@@ -148,7 +148,7 @@ func TestService_InvoiceUserWithManyProjects(t *testing.T) {
 				ProjectID:  projects[i].ID,
 				BucketName: "testbucket",
 			},
-			RemoteBytes: projectsStorage[i],
+			TotalBytes:  projectsStorage[i],
 			ObjectCount: int64(i + 1),
 		}
 		tallies := map[metabase.BucketLocation]*accounting.BucketTally{
@@ -259,7 +259,7 @@ func TestService_InvoiceUserWithManyCoupons(t *testing.T) {
 				ProjectID:  project.ID,
 				BucketName: "testbucket",
 			},
-			RemoteBytes: memory.TiB.Int64(),
+			TotalBytes:  memory.TiB.Int64(),
 			ObjectCount: 45,
 		}
 		tallies := map[metabase.BucketLocation]*accounting.BucketTally{

View File

@@ -24,6 +24,7 @@ const (
 			o.partner_id as partner_id,
 			o.project_id as project_id,
 			o.bucket_name as bucket_name,
+			SUM(o.total) / SUM(o.hours) as total,
 			SUM(o.remote) / SUM(o.hours) as remote,
 			SUM(o.inline) / SUM(o.hours) as inline,
 			SUM(o.settled) as settled
@@ -34,7 +35,8 @@ const (
 				SELECT
 					bsti.partner_id as partner_id,
 					bsto.project_id as project_id,
-					bsto.bucket_name as bucket_name,
+					bsto.bucket_name as bucket_name,
+					SUM(bsto.total_bytes) as total,
 					SUM(bsto.remote) as remote,
 					SUM(bsto.inline) as inline,
 					0 as settled,
@@ -82,6 +84,7 @@ const (
 					va.partner_id as partner_id,
 					bbr.project_id as project_id,
 					bbr.bucket_name as bucket_name,
+					0 as total,
 					0 as remote,
 					0 as inline,
 					SUM(settled)::integer as settled,
@@ -165,10 +168,16 @@ func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid.
 	results := []*attribution.CSVRow{}
 	for rows.Next() {
 		r := &attribution.CSVRow{}
-		err := rows.Scan(&r.PartnerID, &r.ProjectID, &r.BucketName, &r.RemoteBytesPerHour, &r.InlineBytesPerHour, &r.EgressData)
+		var inline, remote float64
+		err := rows.Scan(&r.PartnerID, &r.ProjectID, &r.BucketName, &r.TotalBytesPerHour, &inline, &remote, &r.EgressData)
 		if err != nil {
 			return results, Error.Wrap(err)
 		}
+
+		if r.TotalBytesPerHour == 0 {
+			r.TotalBytesPerHour = inline + remote
+		}
+
 		results = append(results, r)
 	}
 
 	return results, Error.Wrap(rows.Err())

View File

@@ -40,17 +40,13 @@ func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time
 		return nil
 	}
 	var bucketNames, projectIDs [][]byte
-	var totalBytes, inlineBytes, remoteBytes, metadataSizes []int64
-	var totalSegments, remoteSegments, inlineSegments, objectCounts []int64
+	var totalBytes, metadataSizes []int64
+	var totalSegments, objectCounts []int64
 	for _, info := range bucketTallies {
 		bucketNames = append(bucketNames, []byte(info.BucketName))
 		projectIDs = append(projectIDs, info.ProjectID[:])
 		totalBytes = append(totalBytes, info.TotalBytes)
-		inlineBytes = append(inlineBytes, info.InlineBytes)
-		remoteBytes = append(remoteBytes, info.RemoteBytes)
 		totalSegments = append(totalSegments, info.TotalSegments)
-		remoteSegments = append(remoteSegments, info.RemoteSegments)
-		inlineSegments = append(inlineSegments, info.InlineSegments)
 		objectCounts = append(objectCounts, info.ObjectCount)
 		metadataSizes = append(metadataSizes, info.MetadataSize)
 	}
@@ -64,13 +60,13 @@ func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time
 		SELECT
 			$1,
 			unnest($2::bytea[]), unnest($3::bytea[]),
-			unnest($4::int8[]), unnest($5::int8[]), unnest($6::int8[]),
-			unnest($7::int8[]), unnest($8::int8[]), unnest($9::int8[]),
+			unnest($4::int8[]), $5, $6,
+			unnest($7::int8[]), $8, $9,
 			unnest($10::int8[]), unnest($11::int8[])`),
 		intervalStart,
 		pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs),
-		pgutil.Int8Array(totalBytes), pgutil.Int8Array(inlineBytes), pgutil.Int8Array(remoteBytes),
-		pgutil.Int8Array(totalSegments), pgutil.Int8Array(remoteSegments), pgutil.Int8Array(inlineSegments),
+		pgutil.Int8Array(totalBytes), 0, 0,
+		pgutil.Int8Array(totalSegments), 0, 0,
 		pgutil.Int8Array(objectCounts), pgutil.Int8Array(metadataSizes))
 
 	return Error.Wrap(err)
@@ -91,19 +87,25 @@ func (db *ProjectAccounting) GetTallies(ctx context.Context) (tallies []accounti
 			return nil, Error.Wrap(err)
 		}
 
+		totalBytes := dbxTally.TotalBytes
+		if totalBytes == 0 {
+			totalBytes = dbxTally.Inline + dbxTally.Remote
+		}
+
+		totalSegments := dbxTally.TotalSegmentsCount
+		if totalSegments == 0 {
+			totalSegments = dbxTally.InlineSegmentsCount + dbxTally.RemoteSegmentsCount
+		}
+
 		tallies = append(tallies, accounting.BucketTally{
 			BucketLocation: metabase.BucketLocation{
 				ProjectID:  projectID,
 				BucketName: string(dbxTally.BucketName),
 			},
-			ObjectCount:    int64(dbxTally.ObjectCount),
-			TotalSegments:  int64(dbxTally.TotalSegmentsCount),
-			InlineSegments: int64(dbxTally.InlineSegmentsCount),
-			RemoteSegments: int64(dbxTally.RemoteSegmentsCount),
-			TotalBytes:     int64(dbxTally.TotalBytes),
-			InlineBytes:    int64(dbxTally.Inline),
-			RemoteBytes:    int64(dbxTally.Remote),
-			MetadataSize:   int64(dbxTally.MetadataSize),
+			ObjectCount:   int64(dbxTally.ObjectCount),
+			TotalSegments: int64(totalSegments),
+			TotalBytes:    int64(totalBytes),
+			MetadataSize:  int64(dbxTally.MetadataSize),
 		})
 	}
@@ -129,8 +131,8 @@ func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accou
 			?, ?
 		)`), tally.IntervalStart,
 		[]byte(tally.BucketName), tally.ProjectID,
-		tally.TotalBytes, tally.InlineBytes, tally.RemoteBytes,
-		tally.TotalSegmentCount, tally.RemoteSegmentCount, tally.InlineSegmentCount,
+		tally.TotalBytes, 0, 0,
+		tally.TotalSegmentCount, 0, 0,
 		tally.ObjectCount, tally.MetadataSize,
 	)
@@ -301,10 +303,15 @@ func (db *ProjectAccounting) GetProjectTotal(ctx context.Context, projectID uuid
 		for storageTalliesRows.Next() {
 			tally := accounting.BucketStorageTally{}
 
-			err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &tally.InlineBytes, &tally.RemoteBytes, &tally.ObjectCount)
+			var inline, remote int64
+			err = storageTalliesRows.Scan(&tally.IntervalStart, &tally.TotalBytes, &inline, &remote, &tally.ObjectCount)
 			if err != nil {
 				return nil, errs.Combine(err, storageTalliesRows.Close())
 			}
+			if tally.TotalBytes == 0 {
+				tally.TotalBytes = inline + remote
+			}
+
 			tally.BucketName = bucket
 			storageTallies = append(storageTallies, &tally)
 		}
@@ -638,13 +645,18 @@ func (db *ProjectAccounting) GetBucketTotals(ctx context.Context, projectID uuid
 		storageRow := db.db.QueryRowContext(ctx, storageQuery, projectID[:], []byte(bucket), since, before)
 
 		var tally accounting.BucketStorageTally
-		err = storageRow.Scan(&tally.TotalBytes, &tally.InlineBytes, &tally.RemoteBytes, &tally.ObjectCount)
+		var inline, remote int64
+		err = storageRow.Scan(&tally.TotalBytes, &inline, &remote, &tally.ObjectCount)
 		if err != nil {
 			if !errors.Is(err, sql.ErrNoRows) {
 				return nil, err
 			}
 		}
 
+		if tally.TotalBytes == 0 {
+			tally.TotalBytes = inline + remote
+		}
+
 		// fill storage and object count
 		bucketUsage.Storage = memory.Size(tally.Bytes()).GB()
 		bucketUsage.ObjectCount = tally.ObjectCount

View File

@@ -718,9 +718,15 @@ server.private-address: 127.0.0.1:7778
 # length of time a node can go without contacting satellite before being disqualified
 # stray-nodes.max-duration-without-contact: 720h0m0s
 
+# as of system interval
+# tally.as-of-system-interval: -5m0s
+
 # how frequently the tally service should run
 # tally.interval: 1h0m0s
 
+# how many objects to query in a batch
+# tally.list-limit: 2500
+
 # how large of batches GetBandwidthSince should process at a time
 # tally.read-rollup-batch-size: 10000