storagenode/bandwidth: remove bandwidth concerns from console, add satellite summary (#2923)

This commit is contained in:
Yaroslav Vorobiov 2019-09-04 17:01:55 +03:00 committed by GitHub
parent 83815ee7bf
commit 758f7cb3dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 435 additions and 282 deletions

View File

@ -6,13 +6,19 @@ package dbutil
import (
"database/sql/driver"
"time"
"github.com/zeebo/errs"
)
const (
sqliteTimeLayout = "2006-01-02 15:04:05-07:00"
sqliteTimeLayout = "2006-01-02 15:04:05-07:00"
sqliteTimeLayoutNoTimeZone = "2006-01-02 15:04:05"
)
// NullTime time helps convert nil to time.Time
// ErrNullTime defines error class for NullTime.
var ErrNullTime = errs.Class("null time error")
// NullTime time helps convert nil to time.Time.
type NullTime struct {
time.Time
Valid bool
@ -20,6 +26,10 @@ type NullTime struct {
// Scan implements the Scanner interface.
func (nt *NullTime) Scan(value interface{}) error {
if value == nil {
return nil
}
// check if it's time.Time which is what postgres returns
// for lagged time values
if nt.Time, nt.Valid = value.(time.Time); nt.Valid {
@ -29,12 +39,12 @@ func (nt *NullTime) Scan(value interface{}) error {
// try to parse time from bytes which is what sqlite returns
date, ok := value.([]byte)
if !ok {
return nil
return ErrNullTime.New("sql null time: scan received unsupported value type")
}
times, err := time.Parse(sqliteTimeLayout, string(date))
times, err := parseSqliteTimeString(string(date))
if err != nil {
return nil
return ErrNullTime.Wrap(err)
}
nt.Time, nt.Valid = times, true
@ -48,3 +58,18 @@ func (nt NullTime) Value() (driver.Value, error) {
}
return nt.Time, nil
}
// parseSqliteTimeString parses sqlite times string.
// It tries to process value as string with timezone first,
// then fallback to parsing as string without timezone.
func parseSqliteTimeString(val string) (time.Time, error) {
var times time.Time
var err error
times, err = time.Parse(sqliteTimeLayout, val)
if err == nil {
return times, nil
}
return time.Parse(sqliteTimeLayoutNoTimeZone, val)
}

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
@ -39,7 +40,7 @@ var (
}
)
func TestDB(t *testing.T) {
func TestBandwidthDB(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -51,15 +52,6 @@ func TestDB(t *testing.T) {
now := time.Now()
// ensure zero queries work
usage, err := bandwidthdb.Summary(ctx, now, now)
require.NoError(t, err)
require.Equal(t, &bandwidth.Usage{}, usage)
usageBySatellite, err := bandwidthdb.SummaryBySatellite(ctx, now, now)
require.NoError(t, err)
require.Equal(t, map[storj.NodeID]*bandwidth.Usage{}, usageBySatellite)
expectedUsage := &bandwidth.Usage{}
expectedUsageTotal := &bandwidth.Usage{}
@ -75,35 +67,227 @@ func TestDB(t *testing.T) {
require.NoError(t, err)
}
// test summarizing
usage, err = bandwidthdb.Summary(ctx, now.Add(-10*time.Hour), now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsageTotal, usage)
expectedUsageBySatellite := map[storj.NodeID]*bandwidth.Usage{
satellite0: expectedUsage,
satellite1: expectedUsage,
}
usageBySatellite, err = bandwidthdb.SummaryBySatellite(ctx, now.Add(-10*time.Hour), now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsageBySatellite, usageBySatellite)
// only range capturing second satellite
usage, err = bandwidthdb.Summary(ctx, now.Add(time.Hour), now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsage, usage)
// test summarizing
t.Run("test total summary", func(t *testing.T) {
usage, err := bandwidthdb.Summary(ctx, now.Add(-10*time.Hour), now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsageTotal, usage)
cachedBandwidthUsage, err := bandwidthdb.MonthSummary(ctx)
require.NoError(t, err)
require.Equal(t, expectedUsageTotal.Total(), cachedBandwidthUsage)
// only range capturing second satellite
usage, err = bandwidthdb.Summary(ctx, now.Add(time.Hour), now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsage, usage)
})
// only range capturing second satellite
expectedUsageBySatellite = map[storj.NodeID]*bandwidth.Usage{
satellite1: expectedUsage,
t.Run("test summary by satellite", func(t *testing.T) {
usageBySatellite, err := bandwidthdb.SummaryBySatellite(ctx, now.Add(-10*time.Hour), now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsageBySatellite, usageBySatellite)
// only range capturing second satellite
expectedUsageBySatellite := map[storj.NodeID]*bandwidth.Usage{
satellite1: expectedUsage,
}
usageBySatellite, err = bandwidthdb.SummaryBySatellite(ctx, now.Add(time.Hour), now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsageBySatellite, usageBySatellite)
})
t.Run("test satellite summary", func(t *testing.T) {
usage, err := bandwidthdb.SatelliteSummary(ctx, satellite0, time.Time{}, now)
require.NoError(t, err)
require.Equal(t, expectedUsageBySatellite[satellite0], usage)
usage, err = bandwidthdb.SatelliteSummary(ctx, satellite1, time.Time{}, now.Add(10*time.Hour))
require.NoError(t, err)
require.Equal(t, expectedUsageBySatellite[satellite1], usage)
})
t.Run("test cached bandwidth usage", func(t *testing.T) {
cachedBandwidthUsage, err := bandwidthdb.MonthSummary(ctx)
require.NoError(t, err)
require.Equal(t, expectedUsageTotal.Total(), cachedBandwidthUsage)
})
})
}
func TestEmptyBandwidthDB(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
bandwidthdb := db.Bandwidth()
now := time.Now()
t.Run("test total summary", func(t *testing.T) {
usage, err := bandwidthdb.Summary(ctx, now, now)
require.NoError(t, err)
require.Equal(t, &bandwidth.Usage{}, usage)
})
t.Run("test summary by satellite", func(t *testing.T) {
usageBySatellite, err := bandwidthdb.SummaryBySatellite(ctx, now, now)
require.NoError(t, err)
require.Equal(t, map[storj.NodeID]*bandwidth.Usage{}, usageBySatellite)
})
t.Run("test satellite summary", func(t *testing.T) {
usage, err := bandwidthdb.SatelliteSummary(ctx, storj.NodeID{}, now, now)
require.NoError(t, err)
require.Equal(t, &bandwidth.Usage{}, usage)
})
t.Run("test get daily rollups", func(t *testing.T) {
rollups, err := bandwidthdb.GetDailyRollups(ctx, now, now)
require.NoError(t, err)
require.Equal(t, []bandwidth.UsageRollup(nil), rollups)
})
t.Run("test get satellite daily rollups", func(t *testing.T) {
rollups, err := bandwidthdb.GetDailySatelliteRollups(ctx, storj.NodeID{}, now, now)
require.NoError(t, err)
require.Equal(t, []bandwidth.UsageRollup(nil), rollups)
})
})
}
func TestBandwidthDailyRollups(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
const (
numSatellites = 5
days = 30
hours = 12
)
now := time.Now().UTC()
startDate := time.Date(now.Year(), now.Month(), now.Day()-days, 0, 0, 0, 0, now.Location())
bandwidthDB := db.Bandwidth()
totalUsageRollups := make(map[time.Time]*bandwidth.UsageRollup)
addBandwidth := func(day time.Time, satellite storj.NodeID, r *bandwidth.UsageRollup) {
if totalUsageRollups[day] == nil {
totalUsageRollups[day] = &bandwidth.UsageRollup{
IntervalStart: day,
}
}
for h := 0; h < hours; h++ {
get := testrand.Int63n(1000)
getRepair := testrand.Int63n(1000)
getAudit := testrand.Int63n(1000)
put := testrand.Int63n(1000)
putRepair := testrand.Int63n(1000)
_delete := testrand.Int63n(1000)
// add bandwidth
err := bandwidthDB.Add(ctx, satellite, pb.PieceAction_GET, get, day.Add(time.Hour*time.Duration(h)))
require.NoError(t, err)
err = bandwidthDB.Add(ctx, satellite, pb.PieceAction_GET_REPAIR, getRepair, day.Add(time.Hour*time.Duration(h)))
require.NoError(t, err)
err = bandwidthDB.Add(ctx, satellite, pb.PieceAction_GET_AUDIT, getAudit, day.Add(time.Hour*time.Duration(h)))
require.NoError(t, err)
err = bandwidthDB.Add(ctx, satellite, pb.PieceAction_PUT, put, day.Add(time.Hour*time.Duration(h)))
require.NoError(t, err)
err = bandwidthDB.Add(ctx, satellite, pb.PieceAction_PUT_REPAIR, putRepair, day.Add(time.Hour*time.Duration(h)))
require.NoError(t, err)
err = bandwidthDB.Add(ctx, satellite, pb.PieceAction_DELETE, _delete, day.Add(time.Hour*time.Duration(h)))
require.NoError(t, err)
r.Egress.Usage += get
r.Egress.Repair += getRepair
r.Egress.Audit += getAudit
r.Ingress.Usage += put
r.Ingress.Repair += putRepair
r.Delete += _delete
totalUsageRollups[day].Egress.Usage += get
totalUsageRollups[day].Egress.Repair += getRepair
totalUsageRollups[day].Egress.Audit += getAudit
totalUsageRollups[day].Ingress.Usage += put
totalUsageRollups[day].Ingress.Repair += putRepair
totalUsageRollups[day].Delete += _delete
}
}
usageBySatellite, err = bandwidthdb.SummaryBySatellite(ctx, now.Add(time.Hour), now.Add(10*time.Hour))
satelliteID := testrand.NodeID()
var satellites []storj.NodeID
satellites = append(satellites, satelliteID)
for i := 0; i < numSatellites-1; i++ {
satellites = append(satellites, testrand.NodeID())
}
usageRollups := make(map[storj.NodeID]map[time.Time]*bandwidth.UsageRollup)
for _, satellite := range satellites {
usageRollups[satellite] = make(map[time.Time]*bandwidth.UsageRollup)
for d := 0; d < days-1; d++ {
day := startDate.Add(time.Hour * 24 * time.Duration(d))
usageRollup := &bandwidth.UsageRollup{
IntervalStart: day,
}
addBandwidth(day, satellite, usageRollup)
usageRollups[satellite][day] = usageRollup
}
}
// perform rollup for but last day
err := bandwidthDB.Rollup(ctx)
require.NoError(t, err)
require.Equal(t, expectedUsageBySatellite, usageBySatellite)
// last day add bandwidth that won't be rolled up
day := startDate.Add(time.Hour * 24 * time.Duration(days-1))
for _, satellite := range satellites {
usageRollup := &bandwidth.UsageRollup{
IntervalStart: day,
}
addBandwidth(day, satellite, usageRollup)
usageRollups[satellite][day] = usageRollup
}
t.Run("test satellite daily usage rollups", func(t *testing.T) {
rolls, err := bandwidthDB.GetDailySatelliteRollups(ctx, satelliteID, time.Time{}, now)
assert.NoError(t, err)
assert.NotNil(t, rolls)
assert.Equal(t, days, len(rolls))
for _, rollup := range rolls {
expected := *usageRollups[satelliteID][rollup.IntervalStart]
assert.Equal(t, expected, rollup)
}
})
t.Run("test daily usage rollups", func(t *testing.T) {
rolls, err := bandwidthDB.GetDailyRollups(ctx, time.Time{}, now)
assert.NoError(t, err)
assert.NotNil(t, rolls)
assert.Equal(t, days, len(rolls))
for _, rollup := range rolls {
assert.Equal(t, *totalUsageRollups[rollup.IntervalStart], rollup)
}
})
})
}

View File

@ -18,7 +18,15 @@ type DB interface {
MonthSummary(ctx context.Context) (int64, error)
Rollup(ctx context.Context) (err error)
Summary(ctx context.Context, from, to time.Time) (*Usage, error)
// SatelliteSummary returns aggregated bandwidth usage for a particular satellite.
SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (*Usage, error)
SummaryBySatellite(ctx context.Context, from, to time.Time) (map[storj.NodeID]*Usage, error)
// GetDailyRollups returns slice of daily bandwidth usage rollups for provided time range,
// sorted in ascending order.
GetDailyRollups(ctx context.Context, from, to time.Time) ([]UsageRollup, error)
// GetDailySatelliteRollups returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order for a particular satellite.
GetDailySatelliteRollups(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) ([]UsageRollup, error)
}
// Usage contains bandwidth usage information based on the type
@ -34,6 +42,27 @@ type Usage struct {
Delete int64
}
// Egress stores info about storage node egress usage.
type Egress struct {
Repair int64 `json:"repair"`
Audit int64 `json:"audit"`
Usage int64 `json:"usage"`
}
// Ingress stores info about storage node ingress usage.
type Ingress struct {
Repair int64 `json:"repair"`
Usage int64 `json:"usage"`
}
// UsageRollup contains rolluped bandwidth usage.
type UsageRollup struct {
Egress Egress `json:"egress"`
Ingress Ingress `json:"ingress"`
Delete int64 `json:"delete"`
IntervalStart time.Time `json:"intervalStart"`
}
// Include adds specified action to the appropriate field.
func (usage *Usage) Include(action pb.PieceAction, amount int64) {
switch action {

View File

@ -3,50 +3,8 @@
package console
import (
"context"
"time"
"storj.io/storj/pkg/storj"
)
// Bandwidth is interface for querying bandwidth from the db
type Bandwidth interface {
// GetDaily returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order for particular satellite
GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) ([]BandwidthUsed, error)
// GetDailyTotal returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order
GetDailyTotal(ctx context.Context, from, to time.Time) ([]BandwidthUsed, error)
}
// Egress stores info about storage node egress usage
type Egress struct {
Repair int64 `json:"repair"`
Audit int64 `json:"audit"`
Usage int64 `json:"usage"`
}
// Ingress stores info about storage node ingress usage
type Ingress struct {
Repair int64 `json:"repair"`
Usage int64 `json:"usage"`
}
// BandwidthInfo stores all info about storage node bandwidth usage
type BandwidthInfo struct {
Egress Egress `json:"egress"`
Ingress Ingress `json:"ingress"`
Used float64 `json:"used"`
Available float64 `json:"available"`
}
// BandwidthUsed stores bandwidth usage information
// over the period of time
type BandwidthUsed struct {
Egress Egress `json:"egress"`
Ingress Ingress `json:"ingress"`
From time.Time `json:"from"`
To time.Time `json:"to"`
}

View File

@ -1,36 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package console_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testrand"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
func TestDB_Trivial(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
satelliteID := testrand.NodeID()
now := time.Now()
{ // Ensure Bandwidth GetDailyTotal works at all
_, err := db.Console().Bandwidth().GetDailyTotal(ctx, now, now)
require.NoError(t, err)
}
{ // Ensure Bandwidth GetDaily works at all
_, err := db.Console().Bandwidth().GetDaily(ctx, satelliteID, now, now)
require.NoError(t, err)
}
})
}

View File

@ -30,18 +30,11 @@ var (
mon = monkit.Package()
)
// DB exposes methods for managing SNO dashboard related data.
type DB interface {
// Bandwidth is a getter for Bandwidth db.
Bandwidth() Bandwidth
}
// Service is handling storage node operator related logic.
type Service struct {
log *zap.Logger
trust *trust.Pool
consoleDB DB
bandwidthDB bandwidth.DB
reputationDB reputation.DB
storageUsageDB storageusage.DB
@ -57,17 +50,13 @@ type Service struct {
}
// NewService returns new instance of Service.
func NewService(log *zap.Logger, consoleDB DB, bandwidth bandwidth.DB, pieceStore *pieces.Store, kademlia *kademlia.Kademlia, version *version.Service,
func NewService(log *zap.Logger, bandwidth bandwidth.DB, pieceStore *pieces.Store, kademlia *kademlia.Kademlia, version *version.Service,
allocatedBandwidth, allocatedDiskSpace memory.Size, walletAddress string, versionInfo version.Info, trust *trust.Pool,
reputationDB reputation.DB, storageUsageDB storageusage.DB) (*Service, error) {
if log == nil {
return nil, errs.New("log can't be nil")
}
if consoleDB == nil {
return nil, errs.New("consoleDB can't be nil")
}
if bandwidth == nil {
return nil, errs.New("bandwidth can't be nil")
}
@ -87,7 +76,6 @@ func NewService(log *zap.Logger, consoleDB DB, bandwidth bandwidth.DB, pieceStor
return &Service{
log: log,
trust: trust,
consoleDB: consoleDB,
bandwidthDB: bandwidth,
reputationDB: reputationDB,
storageUsageDB: storageUsageDB,
@ -143,15 +131,6 @@ func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error
}
data.Bandwidth = BandwidthInfo{
Egress: Egress{
Repair: bandwidthUsage.GetRepair,
Audit: bandwidthUsage.GetAudit,
Usage: bandwidthUsage.Get,
},
Ingress: Ingress{
Repair: bandwidthUsage.PutRepair,
Usage: bandwidthUsage.Put,
},
Used: memory.Size(bandwidthUsage.Total()).GB(),
Available: s.allocatedBandwidth.GB(),
}
@ -161,11 +140,11 @@ func (s *Service) GetDashboardData(ctx context.Context) (_ *Dashboard, err error
// Satellite encapsulates satellite related data.
type Satellite struct {
ID storj.NodeID `json:"id"`
StorageDaily []storageusage.Stamp `json:"storageDaily"`
BandwidthDaily []BandwidthUsed `json:"bandwidthDaily"`
Audit reputation.Metric `json:"audit"`
Uptime reputation.Metric `json:"uptime"`
ID storj.NodeID `json:"id"`
StorageDaily []storageusage.Stamp `json:"storageDaily"`
BandwidthDaily []bandwidth.UsageRollup `json:"bandwidthDaily"`
Audit reputation.Metric `json:"audit"`
Uptime reputation.Metric `json:"uptime"`
}
// GetSatelliteData returns satellite related data.
@ -173,7 +152,7 @@ func (s *Service) GetSatelliteData(ctx context.Context, satelliteID storj.NodeID
defer mon.Task()(&ctx)(&err)
from, to := date.MonthBoundary(time.Now())
bandwidthDaily, err := s.consoleDB.Bandwidth().GetDaily(ctx, satelliteID, from, to)
bandwidthDaily, err := s.bandwidthDB.GetDailySatelliteRollups(ctx, satelliteID, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}
@ -199,8 +178,8 @@ func (s *Service) GetSatelliteData(ctx context.Context, satelliteID storj.NodeID
// Satellites represents consolidated data across all satellites.
type Satellites struct {
StorageDaily []storageusage.Stamp `json:"storageDaily"`
BandwidthDaily []BandwidthUsed `json:"bandwidthDaily"`
StorageDaily []storageusage.Stamp `json:"storageDaily"`
BandwidthDaily []bandwidth.UsageRollup `json:"bandwidthDaily"`
}
// GetAllSatellitesData returns bandwidth and storage daily usage consolidate
@ -209,7 +188,7 @@ func (s *Service) GetAllSatellitesData(ctx context.Context) (_ *Satellites, err
defer mon.Task()(&ctx)(nil)
from, to := date.MonthBoundary(time.Now())
bandwidthDaily, err := s.consoleDB.Bandwidth().GetDailyTotal(ctx, from, to)
bandwidthDaily, err := s.bandwidthDB.GetDailyRollups(ctx, from, to)
if err != nil {
return nil, SNOServiceErr.Wrap(err)
}

View File

@ -60,7 +60,6 @@ type DB interface {
PieceSpaceUsedDB() pieces.PieceSpaceUsedDB
Bandwidth() bandwidth.DB
UsedSerials() piecestore.UsedSerials
Console() console.DB
Reputation() reputation.DB
StorageUsage() storageusage.DB
@ -331,7 +330,6 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
{ // setup storage node operator dashboard
peer.Console.Service, err = console.NewService(
peer.Log.Named("console:service"),
peer.DB.Console(),
peer.DB.Bandwidth(),
peer.Storage2.Store,
peer.Kademlia.Service,

View File

@ -11,6 +11,8 @@ import (
"github.com/zeebo/errs"
"storj.io/storj/internal/date"
"storj.io/storj/internal/dbutil"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/bandwidth"
@ -124,6 +126,51 @@ func (db *bandwidthDB) Summary(ctx context.Context, from, to time.Time) (_ *band
return usage, ErrBandwidth.Wrap(rows.Err())
}
// SatelliteSummary returns aggregated bandwidth usage for a particular satellite.
func (db *bandwidthDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ *bandwidth.Usage, err error) {
defer mon.Task()(&ctx, satelliteID, from, to)(&err)
from, to = from.UTC(), to.UTC()
query := `SELECT action, sum(a) amount from(
SELECT action, sum(amount) a
FROM bandwidth_usage
WHERE datetime(?) <= datetime(created_at) AND datetime(created_at) <= datetime(?)
AND satellite_id = ?
GROUP BY action
UNION ALL
SELECT action, sum(amount) a
FROM bandwidth_usage_rollups
WHERE datetime(?) <= datetime(interval_start) AND datetime(interval_start) <= datetime(?)
AND satellite_id = ?
GROUP BY action
) GROUP BY action;`
rows, err := db.QueryContext(ctx, query, from, to, satelliteID, from, to, satelliteID)
if err != nil {
return nil, ErrBandwidth.Wrap(err)
}
defer func() {
err = ErrBandwidth.Wrap(errs.Combine(err, rows.Close()))
}()
usage := new(bandwidth.Usage)
for rows.Next() {
var action pb.PieceAction
var amount int64
err := rows.Scan(&action, &amount)
if err != nil {
return nil, err
}
usage.Include(action, amount)
}
return usage, nil
}
// SummaryBySatellite returns summary of bandwidth usage grouping by satellite.
func (db *bandwidthDB) SummaryBySatellite(ctx context.Context, from, to time.Time) (_ map[storj.NodeID]*bandwidth.Usage, err error) {
defer mon.Task()(&ctx)(&err)
@ -220,6 +267,111 @@ func (db *bandwidthDB) Rollup(ctx context.Context) (err error) {
return nil
}
// GetDailyRollups returns slice of daily bandwidth usage rollups for provided time range,
// sorted in ascending order.
func (db *bandwidthDB) GetDailyRollups(ctx context.Context, from, to time.Time) (_ []bandwidth.UsageRollup, err error) {
defer mon.Task()(&ctx, from, to)(&err)
since, _ := date.DayBoundary(from.UTC())
_, before := date.DayBoundary(to.UTC())
return db.getDailyUsageRollups(ctx,
"WHERE DATETIME(?) <= DATETIME(interval_start) AND DATETIME(interval_start) <= DATETIME(?)",
since, before)
}
// GetDailySatelliteRollups returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order for a particular satellite.
func (db *bandwidthDB) GetDailySatelliteRollups(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []bandwidth.UsageRollup, err error) {
defer mon.Task()(&ctx, satelliteID, from, to)(&err)
since, _ := date.DayBoundary(from.UTC())
_, before := date.DayBoundary(to.UTC())
return db.getDailyUsageRollups(ctx,
"WHERE satellite_id = ? AND DATETIME(?) <= DATETIME(interval_start) AND DATETIME(interval_start) <= DATETIME(?)",
satelliteID, since, before)
}
// getDailyUsageRollups returns slice of grouped by date bandwidth usage rollups
// sorted in ascending order and applied condition if any.
func (db *bandwidthDB) getDailyUsageRollups(ctx context.Context, cond string, args ...interface{}) (_ []bandwidth.UsageRollup, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT action, sum(a) as amount, DATETIME(DATE(interval_start)) as date FROM (
SELECT action, sum(amount) as a, created_at AS interval_start
FROM bandwidth_usage
` + cond + `
GROUP BY interval_start, action
UNION ALL
SELECT action, sum(amount) as a, interval_start
FROM bandwidth_usage_rollups
` + cond + `
GROUP BY interval_start, action
) GROUP BY date, action
ORDER BY interval_start`
// duplicate args as they are used twice
args = append(args, args...)
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
return nil, ErrBandwidth.Wrap(err)
}
defer func() {
err = ErrBandwidth.Wrap(errs.Combine(err, rows.Close()))
}()
var dates []time.Time
usageRollupsByDate := make(map[time.Time]*bandwidth.UsageRollup)
for rows.Next() {
var action int32
var amount int64
var intervalStartN dbutil.NullTime
err = rows.Scan(&action, &amount, &intervalStartN)
if err != nil {
return nil, err
}
intervalStart := intervalStartN.Time
rollup, ok := usageRollupsByDate[intervalStart]
if !ok {
rollup = &bandwidth.UsageRollup{
IntervalStart: intervalStart,
}
dates = append(dates, intervalStart)
usageRollupsByDate[intervalStart] = rollup
}
switch pb.PieceAction(action) {
case pb.PieceAction_GET:
rollup.Egress.Usage = amount
case pb.PieceAction_GET_AUDIT:
rollup.Egress.Audit = amount
case pb.PieceAction_GET_REPAIR:
rollup.Egress.Repair = amount
case pb.PieceAction_PUT:
rollup.Ingress.Usage = amount
case pb.PieceAction_PUT_REPAIR:
rollup.Ingress.Repair = amount
case pb.PieceAction_DELETE:
rollup.Delete = amount
}
}
var usageRollups []bandwidth.UsageRollup
for _, d := range dates {
usageRollups = append(usageRollups, *usageRollupsByDate[d])
}
return usageRollups, nil
}
func getBeginningOfMonth(now time.Time) time.Time {
y, m, _ := now.Date()
return time.Date(y, m, 1, 0, 0, 0, 0, time.Now().UTC().Location())

View File

@ -1,126 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"time"
"github.com/zeebo/errs"
"storj.io/storj/internal/date"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/console"
)
type consoleDB struct {
SQLDB
}
// Bandwidth returns consoledb as console.Bandwidth
func (db *consoleDB) Bandwidth() console.Bandwidth {
return db
}
// newConsoleDB returns a new instance of consoledb initialized with the specified database.
func newConsoleDB(db SQLDB) *consoleDB {
return &consoleDB{
SQLDB: db,
}
}
// GetDaily returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order for particular satellite
func (db *consoleDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []console.BandwidthUsed, err error) {
defer mon.Task()(&ctx)(&err)
since, _ := date.DayBoundary(from.UTC())
_, before := date.DayBoundary(to.UTC())
return db.getDailyBandwidthUsed(ctx,
"WHERE satellite_id = ? AND ? <= created_at AND created_at <= ?",
satelliteID, since, before)
}
// GetDaily returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order
func (db *consoleDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []console.BandwidthUsed, err error) {
defer mon.Task()(&ctx)(&err)
since, _ := date.DayBoundary(from.UTC())
_, before := date.DayBoundary(to.UTC())
return db.getDailyBandwidthUsed(ctx,
"WHERE ? <= created_at AND created_at <= ?",
since, before)
}
// getDailyBandwidthUsed returns slice of grouped by date bandwidth usage
// sorted in ascending order and applied condition if any
func (db *consoleDB) getDailyBandwidthUsed(ctx context.Context, cond string, args ...interface{}) (_ []console.BandwidthUsed, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT action, SUM(amount), created_at
FROM bandwidth_usage
` + cond + `
GROUP BY DATE(created_at), action
ORDER BY created_at ASC`
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer func() {
err = errs.Combine(err, rows.Close())
}()
var dates []time.Time
dailyBandwidth := make(map[time.Time]*console.BandwidthUsed)
for rows.Next() {
var action int32
var amount int64
var createdAt time.Time
err = rows.Scan(&action, &amount, &createdAt)
if err != nil {
return nil, err
}
from, to := date.DayBoundary(createdAt)
bandwidthUsed, ok := dailyBandwidth[from]
if !ok {
bandwidthUsed = &console.BandwidthUsed{
From: from,
To: to,
}
dates = append(dates, from)
dailyBandwidth[from] = bandwidthUsed
}
switch pb.PieceAction(action) {
case pb.PieceAction_GET:
bandwidthUsed.Egress.Usage = amount
case pb.PieceAction_GET_AUDIT:
bandwidthUsed.Egress.Audit = amount
case pb.PieceAction_GET_REPAIR:
bandwidthUsed.Egress.Repair = amount
case pb.PieceAction_PUT:
bandwidthUsed.Ingress.Usage = amount
case pb.PieceAction_PUT_REPAIR:
bandwidthUsed.Ingress.Repair = amount
}
}
var bandwidthUsedList []console.BandwidthUsed
for _, date := range dates {
bandwidthUsedList = append(bandwidthUsedList, *dailyBandwidth[date])
}
return bandwidthUsedList, nil
}

View File

@ -27,7 +27,6 @@ import (
"storj.io/storj/storage/teststore"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/console"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore"
@ -92,7 +91,6 @@ type DB struct {
versionsDB *versionsDB
v0PieceInfoDB *v0PieceInfoDB
bandwidthDB *bandwidthDB
consoleDB *consoleDB
ordersDB *ordersDB
pieceExpirationDB *pieceExpirationDB
pieceSpaceUsedDB *pieceSpaceUsedDB
@ -134,7 +132,6 @@ func New(log *zap.Logger, config Config) (*DB, error) {
versionsDB: newVersionsDB(versionsDB, versionsPath),
v0PieceInfoDB: newV0PieceInfoDB(versionsDB, versionsPath),
bandwidthDB: newBandwidthDB(versionsDB, versionsPath),
consoleDB: newConsoleDB(versionsDB),
ordersDB: newOrdersDB(versionsDB, versionsPath),
pieceExpirationDB: newPieceExpirationDB(versionsDB, versionsPath),
pieceSpaceUsedDB: newPieceSpaceUsedDB(versionsDB, versionsPath),
@ -172,7 +169,6 @@ func NewTest(log *zap.Logger, storageDir string) (*DB, error) {
versionsDB: newVersionsDB(versionsDB, versionsPath),
v0PieceInfoDB: newV0PieceInfoDB(versionsDB, versionsPath),
bandwidthDB: newBandwidthDB(versionsDB, versionsPath),
consoleDB: newConsoleDB(versionsDB),
ordersDB: newOrdersDB(versionsDB, versionsPath),
pieceExpirationDB: newPieceExpirationDB(versionsDB, versionsPath),
pieceSpaceUsedDB: newPieceSpaceUsedDB(versionsDB, versionsPath),
@ -236,7 +232,6 @@ func (db *DB) Close() error {
db.versionsDB.Close(),
db.v0PieceInfoDB.Close(),
db.bandwidthDB.Close(),
db.consoleDB.Close(),
db.ordersDB.Close(),
db.pieceExpirationDB.Close(),
db.pieceSpaceUsedDB.Close(),
@ -266,11 +261,6 @@ func (db *DB) Bandwidth() bandwidth.DB {
return db.bandwidthDB
}
// Console returns the instance of the Console database.
func (db *DB) Console() console.DB {
return db.consoleDB
}
// Orders returns the instance of the Orders database.
func (db *DB) Orders() orders.DB {
return db.ordersDB