record bucket data into bucket_storage_tally table (#1595)

* add MetadataSize to stats

* add logic for accumulating bucket stats in calculateAtRestData

* rename stats to BucketTally, move to accounting package

* define method on accountingDB for inserting bucketTallies

* insert bucketTallies into bucket_storage_tally table
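
Taken together, these steps introduce accounting.BucketTally and thread it through the tally loop, as the diffs below show. To make the type's semantics concrete, here is a minimal, hedged sketch of tallying one inline segment and folding it into a running total; it assumes the type lives at storj.io/storj/pkg/accounting in this era of the repository, and the pointer contents are made up:

package main

import (
	"fmt"

	"storj.io/storj/pkg/accounting" // assumed import path
	"storj.io/storj/pkg/pb"
)

func main() {
	// Hypothetical inline segment: 12 bytes of data, 5 bytes of metadata.
	pointer := &pb.Pointer{
		Type:          pb.Pointer_INLINE,
		InlineSegment: []byte("hello, world"),
		Metadata:      []byte("meta!"),
	}

	var bucket accounting.BucketTally
	bucket.AddSegment(pointer, true) // true marks the object's last segment

	var total accounting.BucketTally
	total.Combine(&bucket) // fold the bucket's counters into the grand total

	// Expected: 12 5, assuming Combine sums every field.
	fmt.Println(total.InlineBytes, total.MetadataSize)
}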
Cameron 2019-04-01 09:42:17 -04:00 committed by GitHub
parent 42aad35b07
commit 6d43832c4f
7 changed files with 198 additions and 28 deletions


@@ -1,14 +1,18 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package tally
package accounting
import (
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/pkg/pb"
)
// stats all stats fields
type stats struct {
var mon = monkit.Package()
// BucketTally contains information about aggregate data stored in a bucket
type BucketTally struct {
Segments int64
InlineSegments int64
RemoteSegments int64
@@ -21,10 +25,12 @@ type stats struct {
Bytes int64
InlineBytes int64
RemoteBytes int64
MetadataSize int64
}
// Combine aggregates all the stats
func (s *stats) Combine(o *stats) {
// Combine aggregates all the tallies
func (s *BucketTally) Combine(o *BucketTally) {
s.Segments += o.Segments
s.InlineSegments += o.InlineSegments
s.RemoteSegments += o.RemoteSegments
@@ -40,18 +46,20 @@ func (s *stats) Combine(o *stats) {
}
// AddSegment groups all the data based on the passed pointer
func (s *stats) AddSegment(pointer *pb.Pointer, last bool) {
func (s *BucketTally) AddSegment(pointer *pb.Pointer, last bool) {
s.Segments++
switch pointer.GetType() {
case pb.Pointer_INLINE:
s.InlineSegments++
s.InlineBytes += int64(len(pointer.InlineSegment))
s.Bytes += int64(len(pointer.InlineSegment))
s.MetadataSize += int64(len(pointer.Metadata))
case pb.Pointer_REMOTE:
s.RemoteSegments++
s.RemoteBytes += pointer.GetSegmentSize()
s.Bytes += pointer.GetSegmentSize()
s.MetadataSize += int64(len(pointer.Metadata))
default:
s.UnknownSegments++
}
@@ -68,7 +76,7 @@ func (s *stats) AddSegment(pointer *pb.Pointer, last bool) {
}
// Report reports the stats through monkit
func (s *stats) Report(prefix string) {
func (s *BucketTally) Report(prefix string) {
mon.IntVal(prefix + ".segments").Observe(s.Segments)
mon.IntVal(prefix + ".inline_segments").Observe(s.InlineSegments)
mon.IntVal(prefix + ".remote_segments").Observe(s.RemoteSegments)
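
Report pushes each counter into monkit under a caller-supplied prefix; as the tally diff below shows, it is called once per bucket with "bucket" and once with "total" for the aggregate. A hedged sketch of that reporting pattern (reportAll and the map contents are illustrative, not part of the commit):

package main

import "storj.io/storj/pkg/accounting" // assumed import path

// reportAll mirrors the per-bucket/total pattern in calculateAtRestData:
// observe each bucket's counters, then the combined totals.
func reportAll(perBucket map[string]*accounting.BucketTally) {
	var total accounting.BucketTally
	for _, tally := range perBucket {
		tally.Report("bucket") // emits bucket.segments, bucket.inline_segments, ...
		total.Combine(tally)
	}
	total.Report("total") // emits total.segments, total.inline_segments, ...
}

func main() {
	reportAll(map[string]*accounting.BucketTally{
		"projectid/bucketname": {Segments: 1, InlineSegments: 1},
	})
}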


@@ -50,6 +50,8 @@ type DB interface {
GetRawSince(ctx context.Context, latestRollup time.Time) ([]*Raw, error)
// SaveRollup records raw tallies of at rest data to the database
SaveRollup(ctx context.Context, latestTally time.Time, stats RollupStats) error
// SaveBucketTallies saves the latest bucket info
SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*BucketTally) error
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error)
// DeleteRawBefore deletes all raw tallies prior to some time
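
For illustration, a hedged sketch of a caller driving the new interface method; the bucketID key format and all numbers are placeholders, and db can be any accounting.DB implementation:

package example

import (
	"context"
	"time"

	"storj.io/storj/pkg/accounting" // assumed import path
)

// saveExample records one bucket's tally for the current interval.
func saveExample(ctx context.Context, db accounting.DB) error {
	tallies := map[string]*accounting.BucketTally{
		"projectid/bucketname": {
			Segments: 2, InlineSegments: 1, RemoteSegments: 1,
			Bytes: 4096, InlineBytes: 96, RemoteBytes: 4000,
			MetadataSize: 42,
		},
	}
	return db.SaveBucketTallies(ctx, time.Now().UTC(), tallies)
}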


@@ -68,15 +68,23 @@ func (t *Tally) Run(ctx context.Context) (err error) {
// Tally calculates data-at-rest and bandwidth usage once
func (t *Tally) Tally(ctx context.Context) error {
//data at rest
var errAtRest, errBWA error
latestTally, nodeData, err := t.calculateAtRestData(ctx)
// data at rest
var errAtRest, errBWA, errBucketInfo error
latestTally, nodeData, bucketData, err := t.calculateAtRestData(ctx)
if err != nil {
errAtRest = errs.New("Query for data-at-rest failed : %v", err)
} else if len(nodeData) > 0 {
err = t.SaveAtRestRaw(ctx, latestTally, time.Now().UTC(), nodeData)
if err != nil {
errAtRest = errs.New("Saving data-at-rest failed : %v", err)
} else {
if len(nodeData) > 0 {
err = t.SaveAtRestRaw(ctx, latestTally, time.Now().UTC(), nodeData)
if err != nil {
errAtRest = errs.New("Saving storage node data-at-rest failed : %v", err)
}
}
if len(bucketData) > 0 {
err = t.accountingDB.SaveBucketTallies(ctx, latestTally, bucketData)
if err != nil {
errBucketInfo = errs.New("Saving bucket storage data failed : %v", err)
}
}
}
// bandwidth
@@ -104,23 +112,24 @@ func (t *Tally) Tally(ctx context.Context) error {
}
}
}
return errs.Combine(errAtRest, errBWA)
return errs.Combine(errAtRest, errBWA, errBucketInfo)
}
// calculateAtRestData iterates through the pieces on pointerdb and calculates
// the amount of at-rest data stored on each respective node
func (t *Tally) calculateAtRestData(ctx context.Context) (latestTally time.Time, nodeData map[storj.NodeID]float64, err error) {
// the amount of at-rest data stored in each bucket and on each respective node
func (t *Tally) calculateAtRestData(ctx context.Context) (latestTally time.Time, nodeData map[storj.NodeID]float64, bucketTallies map[string]*accounting.BucketTally, err error) {
defer mon.Task()(&ctx)(&err)
latestTally, err = t.accountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
if err != nil {
return latestTally, nodeData, Error.Wrap(err)
return latestTally, nodeData, bucketTallies, Error.Wrap(err)
}
nodeData = make(map[storj.NodeID]float64)
bucketTallies = make(map[string]*accounting.BucketTally)
var currentBucket string
var bucketCount int64
var totalStats, currentBucketStats stats
var totalTallies, currentBucketTally accounting.BucketTally
err = t.pointerdb.Iterate("", "", true, false,
func(it storage.Iterator) error {
@@ -152,14 +161,17 @@ func (t *Tally) calculateAtRestData(ctx context.Context) (latestTally time.Time,
if currentBucket != bucketID {
if currentBucket != "" {
// report the previous bucket and add to the totals
currentBucketStats.Report("bucket")
totalStats.Combine(&currentBucketStats)
currentBucketStats = stats{}
currentBucketTally.Report("bucket")
totalTallies.Combine(&currentBucketTally)
// add a copy of currentBucketTally to bucketTallies so the stored
// pointer is not aliased to the variable that is reset just below
bucketTally := currentBucketTally
bucketTallies[currentBucket] = &bucketTally
currentBucketTally = accounting.BucketTally{}
}
currentBucket = bucketID
}
currentBucketStats.AddSegment(pointer, segment == "l")
currentBucketTally.AddSegment(pointer, segment == "l")
}
remote := pointer.GetRemote()
@@ -191,18 +203,19 @@ func (t *Tally) calculateAtRestData(ctx context.Context) (latestTally time.Time,
},
)
if err != nil {
return latestTally, nodeData, Error.Wrap(err)
return latestTally, nodeData, bucketTallies, Error.Wrap(err)
}
if currentBucket != "" {
// wrap up the last bucket
totalStats.Combine(&currentBucketStats)
totalTallies.Combine(&currentBucketTally)
bucketTallies[currentBucket] = &currentBucketTally
}
totalStats.Report("total")
totalTallies.Report("total")
mon.IntVal("bucket_count").Observe(bucketCount)
if len(nodeData) == 0 {
return latestTally, nodeData, nil
return latestTally, nodeData, bucketTallies, nil
}
// store byte hours, not just bytes
@@ -214,7 +227,7 @@ func (t *Tally) calculateAtRestData(ctx context.Context) (latestTally time.Time,
for k := range nodeData {
nodeData[k] *= numHours //calculate byte hours
}
return latestTally, nodeData, err
return latestTally, nodeData, bucketTallies, err
}
// SaveAtRestRaw records raw tallies of at-rest-data and updates the LastTimestamp
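
The bucket grouping above relies on the pointerdb iterator returning keys in sorted order: all segments of a bucket are contiguous, so a change in the bucket prefix means the previous bucket is complete and its tally can be flushed. A stripped-down, runnable sketch of that boundary pattern (the key layout and bucketID extraction are simplifications, not the commit's parsing code):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Keys arrive sorted, so each bucket's segments are contiguous.
	keys := []string{"p1/bucketA/l", "p1/bucketA/s0", "p1/bucketB/s0"}

	tallies := map[string]int{} // stand-in for map[string]*accounting.BucketTally
	currentBucket, segments := "", 0
	for _, k := range keys {
		bucket := k[:strings.LastIndex(k, "/")] // hypothetical bucketID extraction
		if bucket != currentBucket {
			if currentBucket != "" {
				tallies[currentBucket] = segments // flush the finished bucket
			}
			currentBucket, segments = bucket, 0
		}
		segments++ // the real code calls currentBucketTally.AddSegment here
	}
	if currentBucket != "" {
		tallies[currentBucket] = segments // wrap up the last bucket
	}
	fmt.Println(tallies) // map[p1/bucketA:2 p1/bucketB:1]
}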


@@ -162,6 +162,28 @@ func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time,
return Error.Wrap(err)
}
// SaveBucketTallies saves the latest bucket info
func (db *accountingDB) SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) error {
if len(bucketTallies) == 0 {
return Error.New("In SaveBucketTallies with empty bucketTallies")
}
for bucketID, info := range bucketTallies {
bID := dbx.BucketStorageTally_BucketId([]byte(bucketID))
interval := dbx.BucketStorageTally_IntervalStart(intervalStart)
inlineBytes := dbx.BucketStorageTally_Inline(uint64(info.InlineBytes))
remoteBytes := dbx.BucketStorageTally_Remote(uint64(info.RemoteBytes))
rSegments := dbx.BucketStorageTally_RemoteSegmentsCount(uint(info.RemoteSegments))
iSegments := dbx.BucketStorageTally_InlineSegmentsCount(uint(info.InlineSegments))
objectCount := dbx.BucketStorageTally_ObjectCount(uint(info.Files))
meta := dbx.BucketStorageTally_MetadataSize(uint64(info.MetadataSize))
_, err := db.db.Create_BucketStorageTally(ctx, bID, interval, inlineBytes, remoteBytes, rSegments, iSegments, objectCount, meta)
if err != nil {
return err
}
}
return nil
}
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
func (db *accountingDB) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
var sqlStmt = `SELECT n.id, n.created_at, n.audit_success_ratio, r.at_rest_total, r.get_repair_total,


@@ -424,6 +424,8 @@ model bucket_storage_tally (
field metadata_size uint64
)
create bucket_storage_tally ( )
// --- storage node accounting tables --- //
model storagenode_bandwidth_rollup (


@@ -3953,6 +3953,39 @@ func (obj *postgresImpl) Create_UsedSerial(ctx context.Context,
}
func (obj *postgresImpl) Create_BucketStorageTally(ctx context.Context,
bucket_storage_tally_bucket_id BucketStorageTally_BucketId_Field,
bucket_storage_tally_interval_start BucketStorageTally_IntervalStart_Field,
bucket_storage_tally_inline BucketStorageTally_Inline_Field,
bucket_storage_tally_remote BucketStorageTally_Remote_Field,
bucket_storage_tally_remote_segments_count BucketStorageTally_RemoteSegmentsCount_Field,
bucket_storage_tally_inline_segments_count BucketStorageTally_InlineSegmentsCount_Field,
bucket_storage_tally_object_count BucketStorageTally_ObjectCount_Field,
bucket_storage_tally_metadata_size BucketStorageTally_MetadataSize_Field) (
bucket_storage_tally *BucketStorageTally, err error) {
__bucket_id_val := bucket_storage_tally_bucket_id.value()
__interval_start_val := bucket_storage_tally_interval_start.value()
__inline_val := bucket_storage_tally_inline.value()
__remote_val := bucket_storage_tally_remote.value()
__remote_segments_count_val := bucket_storage_tally_remote_segments_count.value()
__inline_segments_count_val := bucket_storage_tally_inline_segments_count.value()
__object_count_val := bucket_storage_tally_object_count.value()
__metadata_size_val := bucket_storage_tally_metadata_size.value()
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bucket_storage_tallies ( bucket_id, interval_start, inline, remote, remote_segments_count, inline_segments_count, object_count, metadata_size ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? ) RETURNING bucket_storage_tallies.bucket_id, bucket_storage_tallies.interval_start, bucket_storage_tallies.inline, bucket_storage_tallies.remote, bucket_storage_tallies.remote_segments_count, bucket_storage_tallies.inline_segments_count, bucket_storage_tallies.object_count, bucket_storage_tallies.metadata_size")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __bucket_id_val, __interval_start_val, __inline_val, __remote_val, __remote_segments_count_val, __inline_segments_count_val, __object_count_val, __metadata_size_val)
bucket_storage_tally = &BucketStorageTally{}
err = obj.driver.QueryRow(__stmt, __bucket_id_val, __interval_start_val, __inline_val, __remote_val, __remote_segments_count_val, __inline_segments_count_val, __object_count_val, __metadata_size_val).Scan(&bucket_storage_tally.BucketId, &bucket_storage_tally.IntervalStart, &bucket_storage_tally.Inline, &bucket_storage_tally.Remote, &bucket_storage_tally.RemoteSegmentsCount, &bucket_storage_tally.InlineSegmentsCount, &bucket_storage_tally.ObjectCount, &bucket_storage_tally.MetadataSize)
if err != nil {
return nil, obj.makeErr(err)
}
return bucket_storage_tally, nil
}
func (obj *postgresImpl) Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
@@ -6256,6 +6289,42 @@ func (obj *sqlite3Impl) Create_UsedSerial(ctx context.Context,
}
func (obj *sqlite3Impl) Create_BucketStorageTally(ctx context.Context,
bucket_storage_tally_bucket_id BucketStorageTally_BucketId_Field,
bucket_storage_tally_interval_start BucketStorageTally_IntervalStart_Field,
bucket_storage_tally_inline BucketStorageTally_Inline_Field,
bucket_storage_tally_remote BucketStorageTally_Remote_Field,
bucket_storage_tally_remote_segments_count BucketStorageTally_RemoteSegmentsCount_Field,
bucket_storage_tally_inline_segments_count BucketStorageTally_InlineSegmentsCount_Field,
bucket_storage_tally_object_count BucketStorageTally_ObjectCount_Field,
bucket_storage_tally_metadata_size BucketStorageTally_MetadataSize_Field) (
bucket_storage_tally *BucketStorageTally, err error) {
__bucket_id_val := bucket_storage_tally_bucket_id.value()
__interval_start_val := bucket_storage_tally_interval_start.value()
__inline_val := bucket_storage_tally_inline.value()
__remote_val := bucket_storage_tally_remote.value()
__remote_segments_count_val := bucket_storage_tally_remote_segments_count.value()
__inline_segments_count_val := bucket_storage_tally_inline_segments_count.value()
__object_count_val := bucket_storage_tally_object_count.value()
__metadata_size_val := bucket_storage_tally_metadata_size.value()
var __embed_stmt = __sqlbundle_Literal("INSERT INTO bucket_storage_tallies ( bucket_id, interval_start, inline, remote, remote_segments_count, inline_segments_count, object_count, metadata_size ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? )")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __bucket_id_val, __interval_start_val, __inline_val, __remote_val, __remote_segments_count_val, __inline_segments_count_val, __object_count_val, __metadata_size_val)
__res, err := obj.driver.Exec(__stmt, __bucket_id_val, __interval_start_val, __inline_val, __remote_val, __remote_segments_count_val, __inline_segments_count_val, __object_count_val, __metadata_size_val)
if err != nil {
return nil, obj.makeErr(err)
}
__pk, err := __res.LastInsertId()
if err != nil {
return nil, obj.makeErr(err)
}
return obj.getLastBucketStorageTally(ctx, __pk)
}
func (obj *sqlite3Impl) Create_CertRecord(ctx context.Context,
certRecord_publickey CertRecord_Publickey_Field,
certRecord_id CertRecord_Id_Field) (
@@ -8248,6 +8317,24 @@ func (obj *sqlite3Impl) getLastUsedSerial(ctx context.Context,
}
func (obj *sqlite3Impl) getLastBucketStorageTally(ctx context.Context,
pk int64) (
bucket_storage_tally *BucketStorageTally, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT bucket_storage_tallies.bucket_id, bucket_storage_tallies.interval_start, bucket_storage_tallies.inline, bucket_storage_tallies.remote, bucket_storage_tallies.remote_segments_count, bucket_storage_tallies.inline_segments_count, bucket_storage_tallies.object_count, bucket_storage_tallies.metadata_size FROM bucket_storage_tallies WHERE _rowid_ = ?")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, pk)
bucket_storage_tally = &BucketStorageTally{}
err = obj.driver.QueryRow(__stmt, pk).Scan(&bucket_storage_tally.BucketId, &bucket_storage_tally.IntervalStart, &bucket_storage_tally.Inline, &bucket_storage_tally.Remote, &bucket_storage_tally.RemoteSegmentsCount, &bucket_storage_tally.InlineSegmentsCount, &bucket_storage_tally.ObjectCount, &bucket_storage_tally.MetadataSize)
if err != nil {
return nil, obj.makeErr(err)
}
return bucket_storage_tally, nil
}
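
The two dialects return the created row differently: Postgres appends RETURNING to the INSERT, while SQLite executes the INSERT, reads LastInsertId, and re-selects by _rowid_, as getLastBucketStorageTally above shows. A generic, hedged sketch of that SQLite round trip with database/sql (the column list is trimmed for brevity; the real table has more NOT NULL columns, so this exact INSERT would be rejected):

package example

import "database/sql"

// insertAndFetch mirrors the generated SQLite pattern: INSERT, grab the
// rowid, then SELECT the row back by _rowid_.
func insertAndFetch(db *sql.DB, bucketID []byte) (got []byte, err error) {
	res, err := db.Exec("INSERT INTO bucket_storage_tallies ( bucket_id ) VALUES ( ? )", bucketID)
	if err != nil {
		return nil, err
	}
	pk, err := res.LastInsertId()
	if err != nil {
		return nil, err
	}
	err = db.QueryRow("SELECT bucket_id FROM bucket_storage_tallies WHERE _rowid_ = ?", pk).Scan(&got)
	return got, err
}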
func (obj *sqlite3Impl) getLastCertRecord(ctx context.Context,
pk int64) (
certRecord *CertRecord, err error) {
@@ -8685,6 +8772,24 @@ func (rx *Rx) Create_ApiKey(ctx context.Context,
}
func (rx *Rx) Create_BucketStorageTally(ctx context.Context,
bucket_storage_tally_bucket_id BucketStorageTally_BucketId_Field,
bucket_storage_tally_interval_start BucketStorageTally_IntervalStart_Field,
bucket_storage_tally_inline BucketStorageTally_Inline_Field,
bucket_storage_tally_remote BucketStorageTally_Remote_Field,
bucket_storage_tally_remote_segments_count BucketStorageTally_RemoteSegmentsCount_Field,
bucket_storage_tally_inline_segments_count BucketStorageTally_InlineSegmentsCount_Field,
bucket_storage_tally_object_count BucketStorageTally_ObjectCount_Field,
bucket_storage_tally_metadata_size BucketStorageTally_MetadataSize_Field) (
bucket_storage_tally *BucketStorageTally, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Create_BucketStorageTally(ctx, bucket_storage_tally_bucket_id, bucket_storage_tally_interval_start, bucket_storage_tally_inline, bucket_storage_tally_remote, bucket_storage_tally_remote_segments_count, bucket_storage_tally_inline_segments_count, bucket_storage_tally_object_count, bucket_storage_tally_metadata_size)
}
func (rx *Rx) Create_BucketUsage(ctx context.Context,
bucket_usage_id BucketUsage_Id_Field,
bucket_usage_bucket_id BucketUsage_BucketId_Field,
@@ -9346,6 +9451,17 @@ type Methods interface {
api_key_name ApiKey_Name_Field) (
api_key *ApiKey, err error)
Create_BucketStorageTally(ctx context.Context,
bucket_storage_tally_bucket_id BucketStorageTally_BucketId_Field,
bucket_storage_tally_interval_start BucketStorageTally_IntervalStart_Field,
bucket_storage_tally_inline BucketStorageTally_Inline_Field,
bucket_storage_tally_remote BucketStorageTally_Remote_Field,
bucket_storage_tally_remote_segments_count BucketStorageTally_RemoteSegmentsCount_Field,
bucket_storage_tally_inline_segments_count BucketStorageTally_InlineSegmentsCount_Field,
bucket_storage_tally_object_count BucketStorageTally_ObjectCount_Field,
bucket_storage_tally_metadata_size BucketStorageTally_MetadataSize_Field) (
bucket_storage_tally *BucketStorageTally, err error)
Create_BucketUsage(ctx context.Context,
bucket_usage_id BucketUsage_Id_Field,
bucket_usage_bucket_id BucketUsage_BucketId_Field,
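
A hedged example of invoking the generated method directly, written as if it sat alongside the generated code (so the BucketStorageTally_* field constructors need no package qualifier); the values and the bucketID format are placeholders:

package dbx // assumed package name of the generated code

import (
	"context"
	"time"
)

// createExample builds every argument with the field constructors used by
// SaveBucketTallies above and inserts a single placeholder row.
func createExample(ctx context.Context, db Methods) (*BucketStorageTally, error) {
	return db.Create_BucketStorageTally(ctx,
		BucketStorageTally_BucketId([]byte("projectid/bucketname")),
		BucketStorageTally_IntervalStart(time.Now().UTC()),
		BucketStorageTally_Inline(96),
		BucketStorageTally_Remote(4000),
		BucketStorageTally_RemoteSegmentsCount(1),
		BucketStorageTally_InlineSegmentsCount(1),
		BucketStorageTally_ObjectCount(1),
		BucketStorageTally_MetadataSize(42),
	)
}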


@@ -78,6 +78,13 @@ func (m *lockedAccounting) LastTimestamp(ctx context.Context, timestampType stri
return m.db.LastTimestamp(ctx, timestampType)
}
// SaveBucketTallies saves the latest bucket info
func (m *lockedAccounting) SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketInfo map[string]*accounting.BucketTally) error {
m.Lock()
defer m.Unlock()
return m.db.SaveBucketTallies(ctx, intervalStart, bucketInfo)
}
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
func (m *lockedAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
m.Lock()
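
lockedAccounting is a thin serializing wrapper: every method takes the same mutex before delegating, which makes the underlying accounting DB safe for concurrent use at the cost of serializing all accounting calls. The pattern in miniature (types and names here are illustrative, not from the commit):

package example

import "sync"

// lockedStore serializes access to an underlying store, mirroring the
// generated lockedAccounting wrapper.
type lockedStore struct {
	mu sync.Mutex
	db map[string]int64
}

func (l *lockedStore) Save(key string, value int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.db[key] = value
}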