storagenode: add in-memory tracking for bandwidth and disk usage (#2469)
* Add in-memory cache for bandwidth and space usage monitoring * moved some structs around and added error handling for get piece size query * added to existing bandwidth test. fixed typo * added test, updates from PR review, added monkit for new methods * PR review updates. renamed space used methods * changed bw cache so that only Add updates the cache and it only overwrites when the date moves forward * moved bandwidth usage to bw and space usage to pieceinfodb * fixed interface comment * removed pointer from sync.Once
This commit is contained in:
parent
f9696d6c5e
commit
ff6f1d1b32
@ -18,6 +18,26 @@ import (
|
||||
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
|
||||
)
|
||||
|
||||
var (
|
||||
actions = []pb.PieceAction{
|
||||
pb.PieceAction_INVALID,
|
||||
|
||||
pb.PieceAction_PUT,
|
||||
pb.PieceAction_GET,
|
||||
pb.PieceAction_GET_AUDIT,
|
||||
pb.PieceAction_GET_REPAIR,
|
||||
pb.PieceAction_PUT_REPAIR,
|
||||
pb.PieceAction_DELETE,
|
||||
|
||||
pb.PieceAction_PUT,
|
||||
pb.PieceAction_GET,
|
||||
pb.PieceAction_GET_AUDIT,
|
||||
pb.PieceAction_GET_REPAIR,
|
||||
pb.PieceAction_PUT_REPAIR,
|
||||
pb.PieceAction_DELETE,
|
||||
}
|
||||
)
|
||||
|
||||
func TestDB(t *testing.T) {
|
||||
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
@ -39,24 +59,6 @@ func TestDB(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, map[storj.NodeID]*bandwidth.Usage{}, usageBySatellite)
|
||||
|
||||
actions := []pb.PieceAction{
|
||||
pb.PieceAction_INVALID,
|
||||
|
||||
pb.PieceAction_PUT,
|
||||
pb.PieceAction_GET,
|
||||
pb.PieceAction_GET_AUDIT,
|
||||
pb.PieceAction_GET_REPAIR,
|
||||
pb.PieceAction_PUT_REPAIR,
|
||||
pb.PieceAction_DELETE,
|
||||
|
||||
pb.PieceAction_PUT,
|
||||
pb.PieceAction_GET,
|
||||
pb.PieceAction_GET_AUDIT,
|
||||
pb.PieceAction_GET_REPAIR,
|
||||
pb.PieceAction_PUT_REPAIR,
|
||||
pb.PieceAction_DELETE,
|
||||
}
|
||||
|
||||
expectedUsage := &bandwidth.Usage{}
|
||||
expectedUsageTotal := &bandwidth.Usage{}
|
||||
|
||||
@ -90,6 +92,10 @@ func TestDB(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedUsage, usage)
|
||||
|
||||
cachedBandwidthUsage, err := bandwidthdb.BandwidthUsed(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedUsageTotal.Total(), cachedBandwidthUsage)
|
||||
|
||||
// only range capturing second satellite
|
||||
expectedUsageBySatellite = map[storj.NodeID]*bandwidth.Usage{
|
||||
satellite1: expectedUsage,
|
||||
@ -99,3 +105,41 @@ func TestDB(t *testing.T) {
|
||||
require.Equal(t, expectedUsageBySatellite, usageBySatellite)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCachedBandwidthMonthRollover(t *testing.T) {
|
||||
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
bandwidthdb := db.Bandwidth()
|
||||
|
||||
satellite0 := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()).ID
|
||||
|
||||
y, m, _ := time.Now().Date()
|
||||
// Last second of the previous month
|
||||
previousMonth := time.Date(y, m, 0, 23, 59, 59, 0, time.Now().UTC().Location())
|
||||
|
||||
// Add data for the previous month.
|
||||
for _, action := range actions {
|
||||
err := bandwidthdb.Add(ctx, satellite0, action, int64(action), previousMonth)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
cached, err := bandwidthdb.BandwidthUsed(ctx)
|
||||
require.NoError(t, err)
|
||||
// Cached bandwidth for this month should still be 0 since CachedBandwidthUsed only looks up by the current month
|
||||
require.Equal(t, int64(0), cached)
|
||||
|
||||
thisMonth := previousMonth.Add(time.Second + 1)
|
||||
|
||||
var totalAmount int64
|
||||
for _, action := range actions {
|
||||
totalAmount += int64(action)
|
||||
err := bandwidthdb.Add(ctx, satellite0, action, int64(action), thisMonth)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
cached, err = bandwidthdb.BandwidthUsed(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, totalAmount, cached)
|
||||
})
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
// DB contains information about bandwidth usage.
|
||||
type DB interface {
|
||||
Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) error
|
||||
BandwidthUsed(ctx context.Context) (int64, error)
|
||||
Summary(ctx context.Context, from, to time.Time) (*Usage, error)
|
||||
SummaryBySatellite(ctx context.Context, from, to time.Time) (map[storj.NodeID]*Usage, error)
|
||||
}
|
||||
|
@ -171,11 +171,11 @@ func (service *Service) usedSpace(ctx context.Context) (_ int64, err error) {
|
||||
|
||||
func (service *Service) usedBandwidth(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB)
|
||||
usage, err := service.usageDB.BandwidthUsed(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return usage.Total(), nil
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
// AvailableSpace returns available disk space for upload
|
||||
@ -192,10 +192,10 @@ func (service *Service) AvailableSpace(ctx context.Context) (_ int64, err error)
|
||||
// AvailableBandwidth returns available bandwidth for upload/download
|
||||
func (service *Service) AvailableBandwidth(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
usage, err := bandwidth.TotalMonthlySummary(ctx, service.usageDB)
|
||||
usage, err := service.usageDB.BandwidthUsed(ctx)
|
||||
if err != nil {
|
||||
return 0, Error.Wrap(err)
|
||||
}
|
||||
allocatedBandwidth := service.allocatedBandwidth
|
||||
return allocatedBandwidth - usage.Total(), nil
|
||||
return allocatedBandwidth - usage, nil
|
||||
}
|
||||
|
@ -62,8 +62,10 @@ type DB interface {
|
||||
Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
|
||||
// DeleteFailed marks piece deletion from disk failed
|
||||
DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error
|
||||
// SpaceUsed calculates disk space used by all pieces
|
||||
// SpaceUsed returns the in memory value for disk space used by all pieces
|
||||
SpaceUsed(ctx context.Context) (int64, error)
|
||||
// CalculatedSpaceUsed calculates disk space used by all pieces
|
||||
CalculatedSpaceUsed(ctx context.Context) (int64, error)
|
||||
// SpaceUsedBySatellite calculates disk space used by all pieces by satellite
|
||||
SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error)
|
||||
// GetExpired gets orders that are expired and were created before some time
|
||||
|
@ -6,6 +6,7 @@ package storagenodedb
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
@ -15,13 +16,22 @@ import (
|
||||
"storj.io/storj/storagenode/bandwidth"
|
||||
)
|
||||
|
||||
type bandwidthdb struct{ *InfoDB }
|
||||
type bandwidthdb struct {
|
||||
*InfoDB
|
||||
bandwidth bandwidthUsed
|
||||
}
|
||||
|
||||
type bandwidthUsed struct {
|
||||
mu sync.RWMutex
|
||||
usedSince time.Time
|
||||
used int64
|
||||
}
|
||||
|
||||
// Bandwidth returns table for storing bandwidth usage.
|
||||
func (db *DB) Bandwidth() bandwidth.DB { return db.info.Bandwidth() }
|
||||
|
||||
// Bandwidth returns table for storing bandwidth usage.
|
||||
func (db *InfoDB) Bandwidth() bandwidth.DB { return &bandwidthdb{db} }
|
||||
func (db *InfoDB) Bandwidth() bandwidth.DB { return &db.bandwidthdb }
|
||||
|
||||
// Add adds bandwidth usage to the table
|
||||
func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) (err error) {
|
||||
@ -31,10 +41,44 @@ func (db *bandwidthdb) Add(ctx context.Context, satelliteID storj.NodeID, action
|
||||
INSERT INTO
|
||||
bandwidth_usage(satellite_id, action, amount, created_at)
|
||||
VALUES(?, ?, ?, ?)`, satelliteID, action, amount, created)
|
||||
if err == nil {
|
||||
db.bandwidth.mu.Lock()
|
||||
defer db.bandwidth.mu.Unlock()
|
||||
|
||||
beginningOfMonth := getBeginningOfMonth(created.UTC())
|
||||
if beginningOfMonth.Equal(db.bandwidth.usedSince) {
|
||||
db.bandwidth.used += amount
|
||||
} else if beginningOfMonth.After(db.bandwidth.usedSince) {
|
||||
usage, err := db.Summary(ctx, beginningOfMonth, time.Now().UTC())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.bandwidth.usedSince = beginningOfMonth
|
||||
db.bandwidth.used = usage.Total()
|
||||
}
|
||||
}
|
||||
return ErrInfo.Wrap(err)
|
||||
}
|
||||
|
||||
// BandwidthUsed returns summary of bandwidth usages
|
||||
func (db *bandwidthdb) BandwidthUsed(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
db.bandwidth.mu.RLock()
|
||||
beginningOfMonth := getBeginningOfMonth(time.Now().UTC())
|
||||
if beginningOfMonth.Equal(db.bandwidth.usedSince) {
|
||||
defer db.bandwidth.mu.RUnlock()
|
||||
return db.bandwidth.used, nil
|
||||
}
|
||||
db.bandwidth.mu.RUnlock()
|
||||
|
||||
usage, err := db.Summary(ctx, beginningOfMonth, time.Now())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Just return the usage, don't update the cache. Let add handle updates
|
||||
return usage.Total(), nil
|
||||
}
|
||||
|
||||
// Summary returns summary of bandwidth usages
|
||||
func (db *bandwidthdb) Summary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -107,3 +151,8 @@ func (db *bandwidthdb) SummaryBySatellite(ctx context.Context, from, to time.Tim
|
||||
|
||||
return entries, ErrInfo.Wrap(rows.Err())
|
||||
}
|
||||
|
||||
func getBeginningOfMonth(now time.Time) time.Time {
|
||||
y, m, _ := now.Date()
|
||||
return time.Date(y, m, 1, 0, 0, 0, 0, time.Now().UTC().Location())
|
||||
}
|
||||
|
@ -9,6 +9,8 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
@ -23,7 +25,9 @@ var ErrInfo = errs.Class("infodb")
|
||||
|
||||
// InfoDB implements information database for piecestore.
|
||||
type InfoDB struct {
|
||||
db *sql.DB
|
||||
db *sql.DB
|
||||
bandwidthdb bandwidthdb
|
||||
pieceinfo pieceinfo
|
||||
}
|
||||
|
||||
// newInfo creates or opens InfoDB at the specified path.
|
||||
@ -39,7 +43,11 @@ func newInfo(path string) (*InfoDB, error) {
|
||||
|
||||
dbutil.Configure(db, mon)
|
||||
|
||||
return &InfoDB{db: db}, nil
|
||||
infoDb := &InfoDB{db: db}
|
||||
infoDb.pieceinfo = pieceinfo{infoDb, spaceUsed{sync.Once{}, 0}}
|
||||
infoDb.bandwidthdb = bandwidthdb{infoDb, bandwidthUsed{sync.RWMutex{}, time.Time{}, 0}}
|
||||
|
||||
return infoDb, nil
|
||||
}
|
||||
|
||||
// NewInfoInMemory creates a new inmemory InfoDB.
|
||||
@ -61,7 +69,11 @@ func NewInfoInMemory() (*InfoDB, error) {
|
||||
monkit.StatSourceFromStruct(db.Stats()).Stats(cb)
|
||||
}))
|
||||
|
||||
return &InfoDB{db: db}, nil
|
||||
infoDb := &InfoDB{db: db}
|
||||
infoDb.pieceinfo = pieceinfo{infoDb, spaceUsed{sync.Once{}, 0}}
|
||||
infoDb.bandwidthdb = bandwidthdb{infoDb, bandwidthUsed{sync.RWMutex{}, time.Time{}, 0}}
|
||||
|
||||
return infoDb, nil
|
||||
}
|
||||
|
||||
// Close closes any resources.
|
||||
|
@ -6,6 +6,8 @@ package storagenodedb
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
@ -16,17 +18,26 @@ import (
|
||||
"storj.io/storj/storagenode/pieces"
|
||||
)
|
||||
|
||||
type pieceinfo struct{ *InfoDB }
|
||||
type pieceinfo struct {
|
||||
*InfoDB
|
||||
space spaceUsed
|
||||
}
|
||||
|
||||
type spaceUsed struct {
|
||||
once sync.Once
|
||||
used int64
|
||||
}
|
||||
|
||||
// PieceInfo returns database for storing piece information
|
||||
func (db *DB) PieceInfo() pieces.DB { return db.info.PieceInfo() }
|
||||
|
||||
// PieceInfo returns database for storing piece information
|
||||
func (db *InfoDB) PieceInfo() pieces.DB { return &pieceinfo{db} }
|
||||
func (db *InfoDB) PieceInfo() pieces.DB { return &db.pieceinfo }
|
||||
|
||||
// Add inserts piece information into the database.
|
||||
func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
certdb := db.CertDB()
|
||||
certid, err := certdb.Include(ctx, info.Uplink)
|
||||
if err != nil {
|
||||
@ -44,6 +55,10 @@ func (db *pieceinfo) Add(ctx context.Context, info *pieces.Info) (err error) {
|
||||
VALUES (?,?,?,?,?,?,?)
|
||||
`), info.SatelliteID, info.PieceID, info.PieceSize, info.PieceCreation, info.PieceExpiration, uplinkPieceHash, certid)
|
||||
|
||||
if err == nil {
|
||||
db.loadSpaceUsed(ctx)
|
||||
atomic.AddInt64(&db.space.used, info.PieceSize)
|
||||
}
|
||||
return ErrInfo.Wrap(err)
|
||||
}
|
||||
|
||||
@ -85,12 +100,28 @@ func (db *pieceinfo) Get(ctx context.Context, satelliteID storj.NodeID, pieceID
|
||||
func (db *pieceinfo) Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var pieceSize int64
|
||||
err = db.db.QueryRowContext(ctx, db.Rebind(`
|
||||
SELECT piece_size
|
||||
FROM pieceinfo
|
||||
WHERE satellite_id = ? AND piece_id = ?
|
||||
`), satelliteID, pieceID).Scan(&pieceSize)
|
||||
// Ignore no rows found errors
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return ErrInfo.Wrap(err)
|
||||
}
|
||||
_, err = db.db.ExecContext(ctx, db.Rebind(`
|
||||
DELETE FROM pieceinfo
|
||||
WHERE satellite_id = ?
|
||||
AND piece_id = ?
|
||||
`), satelliteID, pieceID)
|
||||
|
||||
if pieceSize != 0 && err == nil {
|
||||
db.loadSpaceUsed(ctx)
|
||||
|
||||
atomic.AddInt64(&db.space.used, -pieceSize)
|
||||
}
|
||||
|
||||
return ErrInfo.Wrap(err)
|
||||
}
|
||||
|
||||
@ -134,10 +165,25 @@ func (db *pieceinfo) GetExpired(ctx context.Context, expiredAt time.Time, limit
|
||||
return infos, nil
|
||||
}
|
||||
|
||||
// SpaceUsed calculates disk space used by all pieces
|
||||
// SpaceUsed returns disk space used by all pieces from cache
|
||||
func (db *pieceinfo) SpaceUsed(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
db.loadSpaceUsed(ctx)
|
||||
|
||||
return atomic.LoadInt64(&db.space.used), nil
|
||||
}
|
||||
|
||||
func (db *pieceinfo) loadSpaceUsed(ctx context.Context) {
|
||||
defer mon.Task()(&ctx)
|
||||
db.space.once.Do(func() {
|
||||
usedSpace, _ := db.CalculatedSpaceUsed(ctx)
|
||||
atomic.AddInt64(&db.space.used, usedSpace)
|
||||
})
|
||||
}
|
||||
|
||||
// CalculatedSpaceUsed calculates disk space used by all pieces
|
||||
func (db *pieceinfo) CalculatedSpaceUsed(ctx context.Context) (_ int64, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
var sum sql.NullInt64
|
||||
err = db.db.QueryRowContext(ctx, db.Rebind(`
|
||||
SELECT SUM(piece_size)
|
||||
|
Loading…
Reference in New Issue
Block a user