storagenode/storageusage: add interval_end_time, rename interval_start to timestamp

The satellite now returns the last interval_end_time for each daily
storage usage entry, and we need to store that interval_end_time in
the storage usage cache.
Also, interval_start is renamed to timestamp to avoid ambiguity, since
it stores only the date/day returned by the satellite.

Updates https://github.com/storj/storj/issues/4178

Change-Id: I94138ba8a506eeedd6703787ee03ab3e072efa32
Clement Sam authored 2022-07-21 11:15:03 +00:00, committed by Storj Robot
parent 04c0e5d135
commit 9a539c4830
10 changed files with 224 additions and 47 deletions
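For illustration only (this sketch is not part of the commit and the variable names are made up), the relationship between the two stored values is: interval_end_time is the exact time of the satellite's last tally for a day, while timestamp is that same instant truncated to the day, which is how the time.Date(...) calls in the changed files derive it.

package main

import (
	"fmt"
	"time"
)

func main() {
	// interval_end_time as returned by the satellite: the last tally run of the day.
	intervalEndTime := time.Date(2022, time.July, 21, 16, 30, 0, 0, time.UTC)

	// timestamp: the same instant truncated to the start of the day,
	// mirroring how the changed code derives IntervalStart from intervalEndTime.
	timestamp := time.Date(intervalEndTime.Year(), intervalEndTime.Month(), intervalEndTime.Day(),
		0, 0, 0, 0, intervalEndTime.Location())

	fmt.Println(timestamp)       // 2022-07-21 00:00:00 +0000 UTC
	fmt.Println(intervalEndTime) // 2022-07-21 16:30:00 +0000 UTC
}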


@ -151,10 +151,12 @@ func makeStorageUsageStamps(satellites []storj.NodeID) ([]storageusage.Stamp, ma
for _, satellite := range satellites {
for i := 0; i < currentDay; i++ {
+ intervalEndTime := now.Add(time.Hour * -24 * time.Duration(i))
stamp := storageusage.Stamp{
- SatelliteID: satellite,
- AtRestTotal: 30000000000000,
- IntervalStart: now.Add(time.Hour * -24 * time.Duration(i)),
+ SatelliteID: satellite,
+ AtRestTotal: 30000000000000,
+ IntervalStart: time.Date(intervalEndTime.Year(), intervalEndTime.Month(), intervalEndTime.Day(), 0, 0, 0, 0, intervalEndTime.Location()),
+ IntervalEndTime: intervalEndTime,
}
summary[satellite] += stamp.AtRestTotal


@ -289,10 +289,12 @@ func makeStorageUsageStamps(satellites []storj.NodeID) ([]storageusage.Stamp, ma
for _, satellite := range satellites {
for i := 0; i < now; i++ {
+ intervalEndTime := time.Now().UTC().Add(time.Hour * -24 * time.Duration(i))
stamp := storageusage.Stamp{
- SatelliteID: satellite,
- AtRestTotal: 31234567891234,
- IntervalStart: time.Now().UTC().Add(time.Hour * -24 * time.Duration(i)),
+ SatelliteID: satellite,
+ AtRestTotal: 31234567891234,
+ IntervalStart: time.Date(intervalEndTime.Year(), intervalEndTime.Month(), intervalEndTime.Day(), 0, 0, 0, 0, intervalEndTime.Location()),
+ IntervalEndTime: intervalEndTime,
}
summary[satellite] += stamp.AtRestTotal


@ -168,9 +168,10 @@ func fromSpaceUsageResponse(resp *pb.DailyStorageUsageResponse, satelliteID stor
for _, pbUsage := range resp.GetDailyStorageUsage() {
stamps = append(stamps, storageusage.Stamp{
- SatelliteID: satelliteID,
- AtRestTotal: pbUsage.AtRestTotal,
- IntervalStart: pbUsage.Timestamp,
+ SatelliteID: satelliteID,
+ AtRestTotal: pbUsage.AtRestTotal,
+ IntervalStart: pbUsage.Timestamp,
+ IntervalEndTime: pbUsage.IntervalEndTime,
})
}


@ -2002,6 +2002,32 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
UPDATE satellites SET address = 'satellite.stefan-benten.de:7777' WHERE node_id = X'004ae89e970e703df42ba4ab1416a3b30b7e1d8e14aa0e558f7ee26800000000'`,
},
},
{
DB: &db.storageUsageDB.DB,
Description: "Add interval_end_time field to storage_usage db, backfill interval_end_time with interval_start, rename interval_start to timestamp",
Version: 54,
Action: migrate.Func(func(ctx context.Context, _ *zap.Logger, rdb tagsql.DB, rtx tagsql.Tx) error {
_, err := rtx.Exec(ctx, `
CREATE TABLE storage_usage_new (
timestamp TIMESTAMP NOT NULL,
satellite_id BLOB NOT NULL,
at_rest_total REAL NOT NULL,
interval_end_time TIMESTAMP NOT NULL,
PRIMARY KEY (timestamp, satellite_id)
);
INSERT INTO storage_usage_new SELECT
interval_start,
satellite_id,
at_rest_total,
interval_start
FROM storage_usage;
DROP TABLE storage_usage;
ALTER TABLE storage_usage_new RENAME TO storage_usage;
`)
return errs.Wrap(err)
}),
},
},
}
}


@ -710,7 +710,7 @@ func Schema() map[string]*dbschema.Schema {
Tables: []*dbschema.Table{
{
Name: "storage_usage",
- PrimaryKey: []string{"interval_start", "satellite_id"},
+ PrimaryKey: []string{"satellite_id", "timestamp"},
Columns: []*dbschema.Column{
{
Name: "at_rest_total",
@ -718,7 +718,7 @@ func Schema() map[string]*dbschema.Schema {
IsNullable: false,
},
{
- Name: "interval_start",
+ Name: "interval_end_time",
Type: "TIMESTAMP",
IsNullable: false,
},
@ -727,6 +727,11 @@ func Schema() map[string]*dbschema.Schema {
Type: "BLOB",
IsNullable: false,
},
+ {
+ Name: "timestamp",
+ Type: "TIMESTAMP",
+ IsNullable: false,
+ },
},
},
},


@ -31,12 +31,12 @@ func (db *storageUsageDB) Store(ctx context.Context, stamps []storageusage.Stamp
return nil
}
- query := `INSERT OR REPLACE INTO storage_usage(satellite_id, at_rest_total, interval_start)
- VALUES(?,?,?)`
+ query := `INSERT OR REPLACE INTO storage_usage(satellite_id, at_rest_total, interval_end_time, timestamp)
+ VALUES(?,?,?,?)`
return withTx(ctx, db.GetDB(), func(tx tagsql.Tx) error {
for _, stamp := range stamps {
- _, err = tx.ExecContext(ctx, query, stamp.SatelliteID, stamp.AtRestTotal, stamp.IntervalStart.UTC())
+ _, err = tx.ExecContext(ctx, query, stamp.SatelliteID, stamp.AtRestTotal, stamp.IntervalEndTime.UTC(), stamp.IntervalStart.UTC())
if err != nil {
return err
@ -52,14 +52,21 @@ func (db *storageUsageDB) Store(ctx context.Context, stamps []storageusage.Stamp
func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) {
defer mon.Task()(&ctx)(&err)
+ // the at_rest_total is in bytes*hours, so to find the total number
+ // of hours used to get the at_rest_total, we find the hour difference
+ // between the interval_end_time of a row and that of the previous row,
+ // divide the at_rest_total by that hour difference, and multiply by 24
+ // to estimate the value for a 24-hour time window,
+ // i.e. 24 * (at_rest_total/hour_difference), where
+ // hour_difference = current row interval_end_time - previous row interval_end_time.
+ // Rows with no previous interval_end_time are assumed to cover 24 hours.
query := `SELECT satellite_id,
- SUM(at_rest_total),
- interval_start
+ 24 * (at_rest_total / COALESCE((CAST(strftime('%s', interval_end_time) AS NUMERIC) - CAST(strftime('%s', LAG(interval_end_time) OVER (PARTITION BY satellite_id ORDER BY interval_end_time)) AS NUMERIC)) / 3600, 24)),
+ timestamp
FROM storage_usage
WHERE satellite_id = ?
- AND ? <= interval_start AND interval_start <= ?
- GROUP BY DATE(interval_start)
- ORDER BY interval_start`
+ AND ? <= timestamp AND timestamp <= ?
+ ORDER BY timestamp`
rows, err := db.QueryContext(ctx, query, satelliteID, from.UTC(), to.UTC())
if err != nil {
@ -71,9 +78,9 @@ func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID
for rows.Next() {
var satellite storj.NodeID
var atRestTotal float64
- var intervalStart time.Time
+ var timestamp time.Time
- err = rows.Scan(&satellite, &atRestTotal, &intervalStart)
+ err = rows.Scan(&satellite, &atRestTotal, &timestamp)
if err != nil {
return nil, err
}
@ -81,7 +88,7 @@ func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID
stamps = append(stamps, storageusage.Stamp{
SatelliteID: satellite,
AtRestTotal: atRestTotal,
- IntervalStart: intervalStart,
+ IntervalStart: timestamp,
})
}
@ -93,11 +100,23 @@ func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID
func (db *storageUsageDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []storageusage.Stamp, err error) {
defer mon.Task()(&ctx)(&err)
- query := `SELECT SUM(at_rest_total), interval_start
- FROM storage_usage
- WHERE ? <= interval_start AND interval_start <= ?
- GROUP BY DATE(interval_start)
- ORDER BY interval_start`
+ // the at_rest_total is in bytes*hours, so to find the total number
+ // of hours used to get the at_rest_total, we find the hour difference
+ // between the interval_end_time of a row and that of the previous row,
+ // divide the at_rest_total by that hour difference, and multiply by 24
+ // to estimate the value for a 24-hour time window,
+ // i.e. 24 * (at_rest_total/hour_difference), where
+ // hour_difference = current row interval_end_time - previous row interval_end_time.
+ // Rows with no previous interval_end_time are assumed to cover 24 hours.
+ query := `SELECT SUM(usages.at_rest_total), usages.timestamp
+ FROM (
+ SELECT timestamp,
+ 24 * (at_rest_total / COALESCE((CAST(strftime('%s', interval_end_time) AS NUMERIC) - CAST(strftime('%s', LAG(interval_end_time) OVER (PARTITION BY satellite_id ORDER BY interval_end_time)) AS NUMERIC)) / 3600, 24)) AS at_rest_total
+ FROM storage_usage
+ WHERE ? <= timestamp AND timestamp <= ?
+ ) as usages
+ GROUP BY usages.timestamp
+ ORDER BY usages.timestamp`
rows, err := db.QueryContext(ctx, query, from.UTC(), to.UTC())
if err != nil {
@ -110,16 +129,16 @@ func (db *storageUsageDB) GetDailyTotal(ctx context.Context, from, to time.Time)
var stamps []storageusage.Stamp
for rows.Next() {
var atRestTotal float64
- var intervalStart time.Time
+ var timestamp time.Time
- err = rows.Scan(&atRestTotal, &intervalStart)
+ err = rows.Scan(&atRestTotal, &timestamp)
if err != nil {
return nil, err
}
stamps = append(stamps, storageusage.Stamp{
AtRestTotal: atRestTotal,
- IntervalStart: intervalStart,
+ IntervalStart: timestamp,
})
}
@ -133,7 +152,7 @@ func (db *storageUsageDB) Summary(ctx context.Context, from, to time.Time) (_ fl
query := `SELECT SUM(at_rest_total)
FROM storage_usage
- WHERE ? <= interval_start AND interval_start <= ?`
+ WHERE ? <= timestamp AND timestamp <= ?`
err = db.QueryRowContext(ctx, query, from.UTC(), to.UTC()).Scan(&summary)
return summary.Float64, err
@ -147,7 +166,7 @@ func (db *storageUsageDB) SatelliteSummary(ctx context.Context, satelliteID stor
query := `SELECT SUM(at_rest_total)
FROM storage_usage
WHERE satellite_id = ?
- AND ? <= interval_start AND interval_start <= ?`
+ AND ? <= timestamp AND timestamp <= ?`
err = db.QueryRowContext(ctx, query, satelliteID, from.UTC(), to.UTC()).Scan(&summary)
return summary.Float64, err


@ -68,6 +68,7 @@ var States = MultiDBStates{
&v51,
&v52,
&v53,
+ &v54,
},
}


@ -0,0 +1,89 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v54 = MultiDBState{
Version: 54,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: v53.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: &DBState{
SQL: `
CREATE TABLE storage_usage (
timestamp TIMESTAMP NOT NULL,
satellite_id BLOB NOT NULL,
at_rest_total REAL NOT NULL,
interval_end_time TIMESTAMP NOT NULL,
PRIMARY KEY (timestamp, satellite_id)
);
INSERT INTO storage_usage (satellite_id, at_rest_total, interval_end_time, timestamp) VALUES
(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000', 5.0, '2019-07-19 20:00:00+00:00', '2019-07-19 20:00:00+00:00');
`,
},
storagenodedb.ReputationDBName: &DBState{
SQL: `
-- table to store nodestats cache
CREATE TABLE reputation (
satellite_id BLOB NOT NULL,
audit_success_count INTEGER NOT NULL,
audit_total_count INTEGER NOT NULL,
audit_reputation_alpha REAL NOT NULL,
audit_reputation_beta REAL NOT NULL,
audit_reputation_score REAL NOT NULL,
audit_unknown_reputation_alpha REAL NOT NULL,
audit_unknown_reputation_beta REAL NOT NULL,
audit_unknown_reputation_score REAL NOT NULL,
online_score REAL NOT NULL,
audit_history BLOB,
disqualified_at TIMESTAMP,
updated_at TIMESTAMP NOT NULL,
suspended_at TIMESTAMP,
offline_suspended_at TIMESTAMP,
offline_under_review_at TIMESTAMP,
vetted_at TIMESTAMP,
joined_at TIMESTAMP NOT NULL,
PRIMARY KEY (satellite_id)
);
INSERT INTO reputation (satellite_id, audit_success_count, audit_total_count, audit_reputation_alpha, audit_reputation_beta, audit_reputation_score, audit_unknown_reputation_alpha, audit_unknown_reputation_beta, audit_unknown_reputation_score, online_score, audit_history, disqualified_at, updated_at, suspended_at, offline_suspended_at, offline_under_review_at, vetted_at, joined_at) VALUES
(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000', 1, 1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, NULL, '2019-07-19 20:00:00+00:00', '2019-08-23 20:00:00+00:00', NULL, NULL, NULL, NULL, '1970-01-01 00:00:00+00:00'),
(X'953fdf144a088a4116a1f6acfc8475c78278c018849db050d894a89572e56d00', 1, 1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, NULL, '2019-07-19 20:00:00+00:00', '2019-08-23 20:00:00+00:00', NULL, NULL, NULL, '2019-06-25 20:00:00+00:00', '1970-01-01 00:00:00+00:00'),
(X'1a438a44e3cc9ab9faaacc1c034339f0ebec05f310f0ba270414dac753882f00', 1, 1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, NULL, NULL, '2019-08-23 20:00:00+00:00', NULL, NULL, NULL, NULL, '1970-01-01 00:00:00+00:00');
`,
},
storagenodedb.PieceSpaceUsedDBName: v53.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v53.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v53.DBStates[storagenodedb.PieceExpirationDBName],
storagenodedb.OrdersDBName: v53.DBStates[storagenodedb.OrdersDBName],
storagenodedb.BandwidthDBName: v53.DBStates[storagenodedb.BandwidthDBName],
storagenodedb.SatellitesDBName: &DBState{
SQL: `
CREATE TABLE satellites (
node_id BLOB NOT NULL,
address TEXT,
added_at TIMESTAMP NOT NULL,
status INTEGER NOT NULL,
PRIMARY KEY (node_id)
);
CREATE TABLE satellite_exit_progress (
satellite_id BLOB NOT NULL,
initiated_at TIMESTAMP,
finished_at TIMESTAMP,
starting_disk_usage INTEGER NOT NULL,
bytes_deleted INTEGER NOT NULL,
completion_receipt BLOB,
FOREIGN KEY (satellite_id) REFERENCES satellites (node_id)
);
INSERT INTO satellites (node_id, added_at, status) VALUES
(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000', '2019-09-10 20:00:00+00:00', 0);
INSERT INTO satellite_exit_progress VALUES(X'0ed28abb2813e184a1e98b0f6605c4911ea468c7e8433eb583e0fca7ceac3000','2019-09-10 20:00:00+00:00', null, 100, 0, null);
`,
},
storagenodedb.DeprecatedInfoDBName: v53.DBStates[storagenodedb.DeprecatedInfoDBName],
storagenodedb.NotificationsDBName: v53.DBStates[storagenodedb.NotificationsDBName],
storagenodedb.HeldAmountDBName: v53.DBStates[storagenodedb.HeldAmountDBName],
storagenodedb.PricingDBName: v53.DBStates[storagenodedb.PricingDBName],
storagenodedb.APIKeysDBName: v53.DBStates[storagenodedb.APIKeysDBName],
},
}


@ -30,7 +30,12 @@ type DB interface {
// Stamp is storage usage stamp for satellite from interval start till next interval.
type Stamp struct {
- SatelliteID storj.NodeID `json:"-"`
- AtRestTotal float64 `json:"atRestTotal"`
- IntervalStart time.Time `json:"intervalStart"`
+ SatelliteID storj.NodeID `json:"-"`
+ AtRestTotal float64 `json:"atRestTotal"`
+ // IntervalStart represents one tally day.
+ // TODO: rename to timestamp to match the DB column.
+ IntervalStart time.Time `json:"intervalStart"`
+ // IntervalEndTime is the time of the last tally run
+ // (i.e. the last interval_end_time) for that day.
+ IntervalEndTime time.Time `json:"-"`
}


@ -41,15 +41,33 @@ func TestStorageUsage(t *testing.T) {
totalSummary += summ
}
- dailyStamps := make(map[time.Time]storageusage.Stamp)
- dailyStampsTotals := make(map[time.Time]float64)
+ expectedDailyStamps := make(map[storj.NodeID]map[time.Time]storageusage.Stamp)
+ expectedDailyStampsTotals := make(map[time.Time]float64)
for _, stamp := range stamps {
- if stamp.SatelliteID == satelliteID {
- dailyStamps[stamp.IntervalStart.UTC()] = stamp
+ if expectedDailyStamps[stamp.SatelliteID] == nil {
+ expectedDailyStamps[stamp.SatelliteID] = map[time.Time]storageusage.Stamp{}
+ }
+ expectedDailyStamps[stamp.SatelliteID][stamp.IntervalStart.UTC()] = stamp
}
- dailyStampsTotals[stamp.IntervalStart.UTC()] += stamp.AtRestTotal
+ for _, satellite := range satellites {
+ for _, stamp := range expectedDailyStamps[satellite] {
+ intervalStart := stamp.IntervalStart.UTC()
+ prevTimestamp := intervalStart.AddDate(0, 0, -1)
+ atRestTotal := stamp.AtRestTotal
+ if prevStamp, ok := expectedDailyStamps[satellite][prevTimestamp]; ok {
+ diff := stamp.IntervalEndTime.UTC().Sub(prevStamp.IntervalEndTime.UTC()).Hours()
+ atRestTotal = (stamp.AtRestTotal / diff) * 24
+ }
+ expectedDailyStamps[satellite][intervalStart] = storageusage.Stamp{
+ SatelliteID: satellite,
+ AtRestTotal: atRestTotal,
+ IntervalStart: intervalStart,
+ IntervalEndTime: stamp.IntervalEndTime,
+ }
+ expectedDailyStampsTotals[intervalStart] += atRestTotal
+ }
+ }
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
@ -64,11 +82,10 @@ func TestStorageUsage(t *testing.T) {
res, err := storageUsageDB.GetDaily(ctx, satelliteID, time.Time{}, now)
assert.NoError(t, err)
assert.NotNil(t, res)
assert.Equal(t, days, len(res))
for _, stamp := range res {
assert.Equal(t, satelliteID, stamp.SatelliteID)
- assert.Equal(t, dailyStamps[stamp.IntervalStart].AtRestTotal, stamp.AtRestTotal)
+ assert.Equal(t, expectedDailyStamps[satelliteID][stamp.IntervalStart].AtRestTotal, stamp.AtRestTotal)
}
})
@ -79,7 +96,10 @@ func TestStorageUsage(t *testing.T) {
assert.Equal(t, days, len(res))
for _, stamp := range res {
- assert.Equal(t, dailyStampsTotals[stamp.IntervalStart], stamp.AtRestTotal)
+ // there can be small inconsistencies in the values due to rounding errors,
+ // which can make the test flaky, so round the values to 5 decimal places
+ // before comparing.
+ assert.Equal(t, roundFloat(expectedDailyStampsTotals[stamp.IntervalStart]), roundFloat(stamp.AtRestTotal))
}
})
@ -137,13 +157,15 @@ func makeStorageUsageStamps(satellites []storj.NodeID, days int, endDate time.Ti
summary := make(map[storj.NodeID]float64)
startDate := time.Date(endDate.Year(), endDate.Month(), endDate.Day()-days, 0, 0, 0, 0, endDate.Location())
for _, satellite := range satellites {
for i := 0; i < days; i++ {
+ h := testrand.Intn(24)
+ intervalEndTime := startDate.Add(time.Hour * 24 * time.Duration(i)).Add(time.Hour * time.Duration(h))
stamp := storageusage.Stamp{
- SatelliteID: satellite,
- AtRestTotal: math.Round(testrand.Float64n(1000)),
- IntervalStart: startDate.Add(time.Hour * 24 * time.Duration(i)),
+ SatelliteID: satellite,
+ AtRestTotal: math.Round(testrand.Float64n(1000)),
+ IntervalStart: time.Date(intervalEndTime.Year(), intervalEndTime.Month(), intervalEndTime.Day(), 0, 0, 0, 0, intervalEndTime.Location()),
+ IntervalEndTime: intervalEndTime,
}
summary[satellite] += stamp.AtRestTotal
@ -153,3 +175,8 @@ func makeStorageUsageStamps(satellites []storj.NodeID, days int, endDate time.Ti
return stamps, summary
}
+ // roundFloat rounds a float value to 5 decimal places.
+ func roundFloat(value float64) float64 {
+ return math.Round(value*100000) / 100000
+ }
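As a rough illustration of the flakiness the comment above guards against (a sketch, not part of the test): summing the same float64 values in a different order, as SQLite's SUM and the Go accumulation loop may do, can differ in the low-order bits, and rounding to 5 decimal places hides that difference.

package main

import (
	"fmt"
	"math"
)

func roundFloat(value float64) float64 {
	return math.Round(value*100000) / 100000
}

func main() {
	values := []float64{0.1, 0.2, 0.3}

	// Sum the values front to back.
	forward := 0.0
	for _, v := range values {
		forward += v
	}

	// Sum the same values back to front.
	backward := 0.0
	for i := len(values) - 1; i >= 0; i-- {
		backward += values[i]
	}

	fmt.Println(forward == backward)                         // false: 0.6000000000000001 vs 0.6
	fmt.Println(roundFloat(forward) == roundFloat(backward)) // true after rounding
}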