satellite/nodestats: return storage usage in Byte*hours (#2858)
This commit is contained in:
parent
a4dddc65ba
commit
dadd7327df
@ -13,6 +13,7 @@ import (
|
||||
const (
|
||||
sqliteTimeLayout = "2006-01-02 15:04:05-07:00"
|
||||
sqliteTimeLayoutNoTimeZone = "2006-01-02 15:04:05"
|
||||
sqliteTimeLayoutDate = "2006-01-02"
|
||||
)
|
||||
|
||||
// ErrNullTime defines error class for NullTime.
|
||||
@ -61,7 +62,8 @@ func (nt NullTime) Value() (driver.Value, error) {
|
||||
|
||||
// parseSqliteTimeString parses sqlite times string.
|
||||
// It tries to process value as string with timezone first,
|
||||
// then fallback to parsing as string without timezone.
|
||||
// then fallback to parsing as string without timezone and
|
||||
// finally to parsing value as date.
|
||||
func parseSqliteTimeString(val string) (time.Time, error) {
|
||||
var times time.Time
|
||||
var err error
|
||||
@ -71,5 +73,10 @@ func parseSqliteTimeString(val string) (time.Time, error) {
|
||||
return times, nil
|
||||
}
|
||||
|
||||
return time.Parse(sqliteTimeLayoutNoTimeZone, val)
|
||||
times, err = time.Parse(sqliteTimeLayoutNoTimeZone, val)
|
||||
if err == nil {
|
||||
return times, nil
|
||||
}
|
||||
|
||||
return time.Parse(sqliteTimeLayoutDate, val)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ package accounting_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -20,10 +21,6 @@ import (
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
const (
|
||||
rollupsCount = 25
|
||||
)
|
||||
|
||||
func TestSaveBucketTallies(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
@ -51,7 +48,14 @@ func TestStorageNodeUsage(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
const (
|
||||
days = 30
|
||||
)
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
nodeID := testrand.NodeID()
|
||||
startDate := now.Add(time.Hour * 24 * -days)
|
||||
|
||||
var nodes storj.NodeIDList
|
||||
nodes = append(nodes, nodeID)
|
||||
@ -59,31 +63,73 @@ func TestStorageNodeUsage(t *testing.T) {
|
||||
nodes = append(nodes, testrand.NodeID())
|
||||
nodes = append(nodes, testrand.NodeID())
|
||||
|
||||
rollups, dates := createRollups(nodes)
|
||||
rollups, tallies, lastDate := makeRollupsAndStorageNodeStorageTallies(nodes, startDate, days)
|
||||
|
||||
// run 2 rollups for the same day
|
||||
err := db.StoragenodeAccounting().SaveRollup(ctx, time.Now(), rollups)
|
||||
require.NoError(t, err)
|
||||
err = db.StoragenodeAccounting().SaveRollup(ctx, time.Now(), rollups)
|
||||
lastRollup := rollups[lastDate]
|
||||
delete(rollups, lastDate)
|
||||
|
||||
accountingDB := db.StoragenodeAccounting()
|
||||
|
||||
// create last rollup timestamp
|
||||
_, err := accountingDB.LastTimestamp(ctx, accounting.LastRollup)
|
||||
require.NoError(t, err)
|
||||
|
||||
nodeStorageUsages, err := db.StoragenodeAccounting().QueryStorageNodeUsage(ctx, nodeID, time.Time{}, time.Now())
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, nodeStorageUsages)
|
||||
assert.Equal(t, rollupsCount-1, len(nodeStorageUsages))
|
||||
|
||||
for _, usage := range nodeStorageUsages {
|
||||
assert.Equal(t, nodeID, usage.NodeID)
|
||||
// save tallies
|
||||
for latestTally, tallies := range tallies {
|
||||
err = accountingDB.SaveTallies(ctx, latestTally, tallies)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
lastDate, prevDate := dates[len(dates)-1], dates[len(dates)-2]
|
||||
lastRollup, prevRollup := rollups[lastDate][nodeID], rollups[prevDate][nodeID]
|
||||
// save rollup
|
||||
err = accountingDB.SaveRollup(ctx, lastDate.Add(time.Hour*-24), rollups)
|
||||
require.NoError(t, err)
|
||||
|
||||
testValue := lastRollup.AtRestTotal - prevRollup.AtRestTotal
|
||||
testValue /= lastRollup.StartTime.Sub(prevRollup.StartTime).Hours()
|
||||
t.Run("usage with pending tallies", func(t *testing.T) {
|
||||
nodeStorageUsages, err := accountingDB.QueryStorageNodeUsage(ctx, nodeID, time.Time{}, now)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, nodeStorageUsages)
|
||||
assert.Equal(t, days, len(nodeStorageUsages))
|
||||
|
||||
assert.Equal(t, testValue, nodeStorageUsages[len(nodeStorageUsages)-1].StorageUsed)
|
||||
assert.Equal(t, lastDate, nodeStorageUsages[len(nodeStorageUsages)-1].Timestamp.UTC())
|
||||
// check usage from rollups
|
||||
for _, usage := range nodeStorageUsages[:len(nodeStorageUsages)-1] {
|
||||
assert.Equal(t, nodeID, usage.NodeID)
|
||||
assert.Equal(t, rollups[usage.Timestamp.UTC()][nodeID].AtRestTotal, usage.StorageUsed)
|
||||
}
|
||||
|
||||
// check last usage that calculated from tallies
|
||||
lastUsage := nodeStorageUsages[len(nodeStorageUsages)-1]
|
||||
|
||||
assert.Equal(t,
|
||||
nodeID,
|
||||
lastUsage.NodeID)
|
||||
assert.Equal(t,
|
||||
lastRollup[nodeID].StartTime,
|
||||
lastUsage.Timestamp.UTC())
|
||||
assert.Equal(t,
|
||||
lastRollup[nodeID].AtRestTotal,
|
||||
lastUsage.StorageUsed)
|
||||
})
|
||||
|
||||
t.Run("usage entirely from rollups", func(t *testing.T) {
|
||||
const (
|
||||
start = 10
|
||||
// should be greater than 2
|
||||
// not to include tallies into result
|
||||
end = 2
|
||||
)
|
||||
|
||||
startDate, endDate := now.Add(time.Hour*24*-start), now.Add(time.Hour*24*-end)
|
||||
|
||||
nodeStorageUsages, err := accountingDB.QueryStorageNodeUsage(ctx, nodeID, startDate, endDate)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, nodeStorageUsages)
|
||||
assert.Equal(t, start-end, len(nodeStorageUsages))
|
||||
|
||||
for _, usage := range nodeStorageUsages {
|
||||
assert.Equal(t, nodeID, usage.NodeID)
|
||||
assert.Equal(t, rollups[usage.Timestamp.UTC()][nodeID].AtRestTotal, usage.StorageUsed)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@ -113,37 +159,42 @@ func createBucketStorageTallies(projectID uuid.UUID) (map[string]*accounting.Buc
|
||||
return bucketTallies, expectedTallies, nil
|
||||
}
|
||||
|
||||
func createRollups(nodes storj.NodeIDList) (accounting.RollupStats, []time.Time) {
|
||||
var dates []time.Time
|
||||
// make rollups and tallies for specified nodes and date range.
|
||||
func makeRollupsAndStorageNodeStorageTallies(nodes []storj.NodeID, start time.Time, days int) (accounting.RollupStats, map[time.Time]map[storj.NodeID]float64, time.Time) {
|
||||
rollups := make(accounting.RollupStats)
|
||||
now := time.Now().UTC()
|
||||
tallies := make(map[time.Time]map[storj.NodeID]float64)
|
||||
|
||||
var rollupCounter int64
|
||||
for i := 0; i < rollupsCount; i++ {
|
||||
startDate := time.Date(now.Year(), now.Month()-1, 1+i, 0, 0, 0, 0, now.Location())
|
||||
if rollups[startDate] == nil {
|
||||
rollups[startDate] = make(map[storj.NodeID]*accounting.Rollup)
|
||||
const (
|
||||
hours = 12
|
||||
)
|
||||
|
||||
for i := 0; i < days; i++ {
|
||||
startDay := time.Date(start.Year(), start.Month(), start.Day()+i, 0, 0, 0, 0, start.Location())
|
||||
if rollups[startDay] == nil {
|
||||
rollups[startDay] = make(map[storj.NodeID]*accounting.Rollup)
|
||||
}
|
||||
|
||||
for _, nodeID := range nodes {
|
||||
for _, node := range nodes {
|
||||
rollup := &accounting.Rollup{
|
||||
ID: rollupCounter,
|
||||
NodeID: nodeID,
|
||||
StartTime: startDate,
|
||||
PutTotal: testrand.Int63n(10000),
|
||||
GetTotal: testrand.Int63n(10000),
|
||||
GetAuditTotal: testrand.Int63n(10000),
|
||||
GetRepairTotal: testrand.Int63n(10000),
|
||||
PutRepairTotal: testrand.Int63n(10000),
|
||||
AtRestTotal: testrand.Float64n(10000),
|
||||
NodeID: node,
|
||||
StartTime: startDay,
|
||||
}
|
||||
|
||||
rollupCounter++
|
||||
rollups[startDate][nodeID] = rollup
|
||||
}
|
||||
for h := 0; h < hours; h++ {
|
||||
startTime := startDay.Add(time.Hour * time.Duration(h))
|
||||
if tallies[startTime] == nil {
|
||||
tallies[startTime] = make(map[storj.NodeID]float64)
|
||||
}
|
||||
|
||||
dates = append(dates, startDate)
|
||||
tallieAtRest := math.Round(testrand.Float64n(1000))
|
||||
tallies[startTime][node] = tallieAtRest
|
||||
rollup.AtRestTotal += tallieAtRest
|
||||
}
|
||||
|
||||
rollups[startDay][node] = rollup
|
||||
}
|
||||
}
|
||||
|
||||
return rollups, dates
|
||||
return rollups, tallies,
|
||||
time.Date(start.Year(), start.Month(), start.Day()+days-1, 0, 0, 0, 0, start.Location())
|
||||
}
|
||||
|
@ -200,21 +200,36 @@ func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start tim
|
||||
func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, nodeID storj.NodeID, start time.Time, end time.Time) (_ []accounting.StorageNodeUsage, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
query := `SELECT at_rest_total, start_time,
|
||||
LAG(at_rest_total) OVER win AS prev_at_rest,
|
||||
LAG(start_time) OVER win AS prev_start_time
|
||||
FROM accounting_rollups
|
||||
WHERE id IN (
|
||||
SELECT MAX(id)
|
||||
lastRollup, err := db.db.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup))
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
if lastRollup == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
start, end = start.UTC(), end.UTC()
|
||||
|
||||
query := `WITH r AS (
|
||||
SELECT at_rest_total, start_time
|
||||
FROM accounting_rollups
|
||||
WHERE node_id = ?
|
||||
AND ? <= start_time AND start_time <= ?
|
||||
GROUP BY start_time
|
||||
ORDER BY start_time ASC
|
||||
)
|
||||
WINDOW win AS (ORDER BY start_time)`
|
||||
SELECT at_rest_total, start_time from r
|
||||
UNION
|
||||
SELECT SUM(data_total) AS at_rest_total, DATE(interval_end_time) AS start_time
|
||||
FROM storagenode_storage_tallies
|
||||
WHERE node_id = ?
|
||||
AND NOT EXISTS (SELECT 1 FROM r WHERE DATE(r.start_time) = DATE(interval_end_time))
|
||||
AND (SELECT value FROM accounting_timestamps WHERE name = ?) < interval_end_time AND interval_end_time <= ?
|
||||
GROUP BY start_time
|
||||
ORDER BY start_time`
|
||||
|
||||
rows, err := db.db.QueryContext(ctx, db.db.Rebind(query),
|
||||
nodeID, start, end,
|
||||
nodeID, accounting.LastRollup, end)
|
||||
|
||||
rows, err := db.db.QueryContext(ctx, db.db.Rebind(query), nodeID, start.UTC(), end.UTC())
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -226,32 +241,17 @@ func (db *StoragenodeAccounting) QueryStorageNodeUsage(ctx context.Context, node
|
||||
var nodeStorageUsages []accounting.StorageNodeUsage
|
||||
for rows.Next() {
|
||||
var atRestTotal float64
|
||||
var startTime time.Time
|
||||
var prevAtRestTotal sql.NullFloat64
|
||||
var prevStartTime dbutil.NullTime
|
||||
var startTime dbutil.NullTime
|
||||
|
||||
err = rows.Scan(&atRestTotal, &startTime, &prevAtRestTotal, &prevStartTime)
|
||||
err = rows.Scan(&atRestTotal, &startTime)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
// skip first entry as we can not extract hours
|
||||
// properly without storagenode storage tallies
|
||||
// which formed this value
|
||||
if !prevStartTime.Valid {
|
||||
continue
|
||||
}
|
||||
|
||||
atRest := atRestTotal - prevAtRestTotal.Float64
|
||||
hours := startTime.Sub(prevStartTime.Time).Hours()
|
||||
if hours != 0 {
|
||||
atRest /= hours
|
||||
}
|
||||
|
||||
nodeStorageUsages = append(nodeStorageUsages, accounting.StorageNodeUsage{
|
||||
NodeID: nodeID,
|
||||
StorageUsed: atRest,
|
||||
Timestamp: startTime,
|
||||
StorageUsed: atRestTotal,
|
||||
Timestamp: startTime.Time,
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user