storj/storagenode/storagenodedb/storageusage.go
Simon Guindon a2b1e9fa95
storagenode/storagenodedb: refactor both data access objects and migrations to support multiple DB connections (#3057)
* Split the info.db database into multiple DBs using Backup API.

* Remove location. Prev refactor assumed we would need this but don't.

* Added VACUUM to reclaim space after splitting storage node databases.

* Added unique names to SQLite3 connection hooks to fix testplanet.

* Moving DB closing to the migration step.

* Removing the closing of the versions DB. It's already getting closed.

* Swapping the database connection references on reconnect.

* Moved sqlite closing logic away from the boltdb closing logic.

* Moved sqlite closing logic away from the boltdb closing logic.

* Remove certificate and vouchers from DB split migration.

* Removed vouchers and bumped up the migration version.

* Use same constructor in tests for storage node databases.

* Use same constructor in tests for storage node databases.

* Adding method to access underlying SQL database connections and cleanup

* Adding logging for migration diagnostics.

* Moved migration closing database logic to minimize disk usage.

* Cleaning up error handling.

* Fix missing copyright.

* Fix linting error.

* Add test for migration 21 (#3012)

* Refactoring migration code into a nicer to use object.

* Refactoring migration code into a nicer to use object.

* Fixing broken migration test.

* Removed unnecessary code that is no longer needed now that we close DBs.

* Removed unnecessary code that is no longer needed now that we close DBs.

* Fixed bug where an invalid database path was being opened.

* Fixed linting errors.

* Renamed VersionsDB to LegacyInfoDB and refactored DB lookup keys.

* Renamed VersionsDB to LegacyInfoDB and refactored DB lookup keys.

* Fix migration test. NOTE: This change does not address new tables satellites and satellite_exit_progress

* Removing v22 migration to move into its own PR.

* Removing v22 migration to move into its own PR.

* Refactored schema, rebind and configure functions to be reusable.

* Renamed LegacyInfoDB to DeprecatedInfoDB.

* Cleaned up closeDatabase function.

* Renamed storageNodeSQLDB to migratableDB.

* Switched from using errs.Combine() to errs.Group in closeDatabases func.

* Removed constructors from storage node data access objects.

* Reformatted usage of const.

* Fixed broken test snapshots.

* Fixed linting error.
2019-09-18 12:17:28 -04:00

177 lines
4.4 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"database/sql"
"time"
"github.com/zeebo/errs"
"storj.io/storj/pkg/storj"
"storj.io/storj/storagenode/storageusage"
)
// StorageUsageDBName is the name that identifies the storage usage database.
// The same string is also the name of the table used by the queries in this file.
const StorageUsageDBName = "storage_usage"
// storageUsageDB is the data access object for the storage_usage table.
// It embeds migratableDB, through which all queries and transactions
// in this file are issued.
type storageUsageDB struct {
migratableDB
}
// Store persists the given storage usage stamps inside a single withTx call.
// Rows are written with INSERT OR REPLACE, so a stamp that conflicts with an
// existing entry replaces it. An empty stamps slice is a no-op and returns nil.
// Each stamp's IntervalStart is converted to UTC before being written.
func (db *storageUsageDB) Store(ctx context.Context, stamps []storageusage.Stamp) (err error) {
	defer mon.Task()(&ctx)(&err)

	if len(stamps) == 0 {
		return nil
	}

	const insertStamp = `INSERT OR REPLACE INTO storage_usage(satellite_id, at_rest_total, interval_start)
VALUES(?,?,?)`

	// insertAll writes every stamp using the supplied transaction and stops
	// at the first failure.
	insertAll := func(tx *sql.Tx) error {
		for i := range stamps {
			stamp := stamps[i]
			if _, execErr := tx.ExecContext(ctx, insertStamp, stamp.SatelliteID, stamp.AtRestTotal, stamp.IntervalStart.UTC()); execErr != nil {
				return execErr
			}
		}
		return nil
	}

	return db.withTx(ctx, insertAll)
}
// GetDaily returns daily storage usage stamps for a particular satellite
// for the provided time range.
//
// Rows of the given satellite whose interval_start lies within [from, to]
// (both bounds inclusive, converted to UTC before being bound) are grouped by
// DATE(interval_start); at_rest_total is summed per group and the result is
// ordered by interval_start.
//
// It returns an error if the query fails, a row cannot be scanned, iteration
// over the result set fails, or the rows cannot be closed.
func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) {
	defer mon.Task()(&ctx)(&err)

	query := `SELECT satellite_id,
SUM(at_rest_total),
interval_start
FROM storage_usage
WHERE satellite_id = ?
AND ? <= interval_start AND interval_start <= ?
GROUP BY DATE(interval_start)
ORDER BY interval_start`

	rows, err := db.QueryContext(ctx, query, satelliteID, from.UTC(), to.UTC())
	if err != nil {
		return nil, err
	}
	defer func() {
		// A failure to close the rows is reported together with any earlier error.
		err = errs.Combine(err, rows.Close())
	}()

	var stamps []storageusage.Stamp
	for rows.Next() {
		var satellite storj.NodeID
		var atRestTotal float64
		var intervalStart time.Time

		err = rows.Scan(&satellite, &atRestTotal, &intervalStart)
		if err != nil {
			return nil, err
		}

		stamps = append(stamps, storageusage.Stamp{
			SatelliteID:   satellite,
			AtRestTotal:   atRestTotal,
			IntervalStart: intervalStart,
		})
	}
	// rows.Next returns false both when the result set is exhausted and when
	// iteration fails; rows.Err tells the two apart so a truncated result is
	// never returned as success.
	if err = rows.Err(); err != nil {
		return nil, err
	}

	return stamps, nil
}
// GetDailyTotal returns daily storage usage stamps summed across all known
// satellites for the provided time range.
//
// Rows whose interval_start lies within [from, to] (both bounds inclusive,
// converted to UTC before being bound) are grouped by DATE(interval_start);
// at_rest_total is summed per group and the result is ordered by
// interval_start. The returned stamps have no SatelliteID set.
//
// It returns an error if the query fails, a row cannot be scanned, iteration
// over the result set fails, or the rows cannot be closed.
func (db *storageUsageDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []storageusage.Stamp, err error) {
	defer mon.Task()(&ctx)(&err)

	query := `SELECT SUM(at_rest_total), interval_start
FROM storage_usage
WHERE ? <= interval_start AND interval_start <= ?
GROUP BY DATE(interval_start)
ORDER BY interval_start`

	rows, err := db.QueryContext(ctx, query, from.UTC(), to.UTC())
	if err != nil {
		return nil, err
	}
	defer func() {
		// A failure to close the rows is reported together with any earlier error.
		err = errs.Combine(err, rows.Close())
	}()

	var stamps []storageusage.Stamp
	for rows.Next() {
		var atRestTotal float64
		var intervalStart time.Time

		err = rows.Scan(&atRestTotal, &intervalStart)
		if err != nil {
			return nil, err
		}

		stamps = append(stamps, storageusage.Stamp{
			AtRestTotal:   atRestTotal,
			IntervalStart: intervalStart,
		})
	}
	// rows.Next returns false both when the result set is exhausted and when
	// iteration fails; rows.Err tells the two apart so a truncated result is
	// never returned as success.
	if err = rows.Err(); err != nil {
		return nil, err
	}

	return stamps, nil
}
// Summary returns the sum of at_rest_total across all satellites for rows
// whose interval_start lies within [from, to]. Both bounds are inclusive and
// are converted to UTC before being bound to the query.
// The sum is scanned into a sql.NullFloat64 and its Float64 field is returned,
// so a NULL sum is reported as 0.
func (db *storageUsageDB) Summary(ctx context.Context, from, to time.Time) (_ float64, err error) {
	defer mon.Task()(&ctx, from, to)(&err)

	const sumQuery = `SELECT SUM(at_rest_total)
FROM storage_usage
WHERE ? <= interval_start AND interval_start <= ?`

	var total sql.NullFloat64
	row := db.QueryRowContext(ctx, sumQuery, from.UTC(), to.UTC())
	err = row.Scan(&total)

	return total.Float64, err
}
// SatelliteSummary returns the sum of at_rest_total for the given satellite
// over rows whose interval_start lies within [from, to]. Both bounds are
// inclusive and are converted to UTC before being bound to the query.
// The sum is scanned into a sql.NullFloat64 and its Float64 field is returned,
// so a NULL sum is reported as 0.
func (db *storageUsageDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ float64, err error) {
	defer mon.Task()(&ctx, satelliteID, from, to)(&err)

	const sumQuery = `SELECT SUM(at_rest_total)
FROM storage_usage
WHERE satellite_id = ?
AND ? <= interval_start AND interval_start <= ?`

	var total sql.NullFloat64
	row := db.QueryRowContext(ctx, sumQuery, satelliteID, from.UTC(), to.UTC())
	err = row.Scan(&total)

	return total.Float64, err
}
// withTx is a helper method which executes cb in transaction scope.
//
// A transaction is started with db.Begin and handed to cb. If cb returns an
// error the transaction is rolled back and that error is returned, combined
// with any rollback error. If cb succeeds the transaction is committed and the
// commit error, if any, is returned.
//
// NOTE(review): ctx is accepted but not used when starting the transaction.
func (db *storageUsageDB) withTx(ctx context.Context, cb func(tx *sql.Tx) error) (err error) {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	// err is the named result: by the time this deferred closure runs it
	// already holds the value returned by cb, so the closure can pick between
	// rollback and commit and its assignment is what the caller receives.
	defer func() {
		if err != nil {
			err = errs.Combine(err, tx.Rollback())
			return
		}

		err = tx.Commit()
	}()

	return cb(tx)
}