Refactor accountingdb interface (#1897)

* splits accounting db into storagenodeaccounting and projectaccounting interfaces and renames methods to match
This commit is contained in:
Jennifer Li Johnson 2019-05-10 15:05:42 -04:00 committed by GitHub
parent 8c641563c4
commit 5395ff5fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1228 additions and 1149 deletions

View File

@ -29,7 +29,7 @@ func generateCSV(ctx context.Context, start time.Time, end time.Time, output io.
err = errs.Combine(err, db.Close())
}()
rows, err := db.Accounting().QueryPaymentInfo(ctx, start, end)
rows, err := db.StoragenodeAccounting().QueryPaymentInfo(ctx, start, end)
if err != nil {
return err
}

View File

@ -15,14 +15,12 @@ import (
// RollupStats is a convenience alias
type RollupStats map[time.Time]map[storj.NodeID]*Rollup
// Raw mirrors dbx.AccountingRaw, allowing us to use that struct without leaking dbx
type Raw struct {
// StoragenodeStorageTally mirrors dbx.StoragenodeStorageTally, allowing us to use that struct without leaking dbx
type StoragenodeStorageTally struct {
ID int64
NodeID storj.NodeID
IntervalEndTime time.Time
DataTotal float64
DataType int
CreatedAt time.Time
}
// StoragenodeBandwidthRollup mirrors dbx.StoragenodeBandwidthRollup, allowing us to use the struct without leaking dbx
@ -46,30 +44,34 @@ type Rollup struct {
AtRestTotal float64
}
// DB stores information about bandwidth and storage usage
type DB interface {
// LastTimestamp records the latest last tallied time.
LastTimestamp(ctx context.Context, timestampType string) (time.Time, error)
// SaveAtRestRaw records raw tallies of at-rest-data.
SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error
// GetRaw retrieves all raw tallies
GetRaw(ctx context.Context) ([]*Raw, error)
// GetRawSince retrieves all raw tallies since latestRollup
GetRawSince(ctx context.Context, latestRollup time.Time) ([]*Raw, error)
// GetStoragenodeBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup
GetStoragenodeBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeBandwidthRollup, error)
// SaveRollup records raw tallies of at rest data to the database
// StoragenodeAccounting stores information about bandwidth and storage usage for storage nodes
type StoragenodeAccounting interface {
// SaveTallies records tallies of data at rest
SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error
// GetTallies retrieves all tallies
GetTallies(ctx context.Context) ([]*StoragenodeStorageTally, error)
// GetTalliesSince retrieves all tallies since latestRollup
GetTalliesSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeStorageTally, error)
// GetBandwidthSince retrieves all bandwidth rollup entires since latestRollup
GetBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*StoragenodeBandwidthRollup, error)
// SaveRollup records tally and bandwidth rollup aggregations to the database
SaveRollup(ctx context.Context, latestTally time.Time, stats RollupStats) error
// SaveBucketTallies saves the latest bucket info
SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*BucketTally) ([]BucketTally, error)
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
// LastTimestamp records and returns the latest last tallied time.
LastTimestamp(ctx context.Context, timestampType string) (time.Time, error)
// QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID
QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error)
// DeleteRawBefore deletes all raw tallies prior to some time
DeleteRawBefore(ctx context.Context, latestRollup time.Time) error
// CreateBucketStorageTally creates a record for BucketStorageTally in the accounting DB table
CreateBucketStorageTally(ctx context.Context, tally BucketStorageTally) error
// ProjectAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
ProjectAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error)
// ProjectStorageTotals returns the current inline and remote storage usage for a projectID
ProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error)
// DeleteTalliesBefore deletes all tallies prior to some time
DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error
}
// ProjectAccounting stores information about bandwidth and storage usage for projects
type ProjectAccounting interface {
// SaveTallies saves the latest project info
SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*BucketTally) ([]BucketTally, error)
// CreateStorageTally creates a record for BucketStorageTally in the accounting DB table
CreateStorageTally(ctx context.Context, tally BucketStorageTally) error
// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
GetAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error)
// GetStorageTotals returns the current inline and remote storage usage for a projectID
GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error)
}

View File

@ -30,8 +30,8 @@ func TestSaveBucketTallies(t *testing.T) {
// Execute test: retrieve the save tallies and confirm they contains the expected data
intervalStart := time.Now()
accountingDB := db.Accounting()
actualTallies, err := accountingDB.SaveBucketTallies(ctx, intervalStart, bucketTallies)
pdb := db.ProjectAccounting()
actualTallies, err := pdb.SaveTallies(ctx, intervalStart, bucketTallies)
require.NoError(t, err)
for _, tally := range actualTallies {
require.Contains(t, expectedTallies, tally)

View File

@ -39,7 +39,7 @@ func TestProjectUsageStorage(t *testing.T) {
SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
saDB := planet.Satellites[0].DB
acctDB := saDB.Accounting()
acctDB := saDB.ProjectAccounting()
// Setup: create a new project to use the projectID
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
@ -57,7 +57,7 @@ func TestProjectUsageStorage(t *testing.T) {
}
// Execute test: get storage totals for a project, then check if that exceeds the max usage limit
inlineTotal, remoteTotal, err := acctDB.ProjectStorageTotals(ctx, projectID)
inlineTotal, remoteTotal, err := acctDB.GetStorageTotals(ctx, projectID)
require.NoError(t, err)
maxAlphaUsage := 25 * memory.GB
actualExceeded, actualResource := accounting.ExceedsAlphaUsage(0, inlineTotal, remoteTotal, maxAlphaUsage)
@ -97,7 +97,7 @@ func TestProjectUsageBandwidth(t *testing.T) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
saDB := planet.Satellites[0].DB
orderDB := saDB.Orders()
acctDB := saDB.Accounting()
acctDB := saDB.ProjectAccounting()
// Setup: get projectID and create bucketID
projects, err := planet.Satellites[0].DB.Console().Projects().GetAll(ctx)
@ -128,7 +128,7 @@ func TestProjectUsageBandwidth(t *testing.T) {
from := time.Now().AddDate(0, 0, -accounting.AverageDaysInMonth)
// Execute test: get bandwidth totals for a project, then check if that exceeds the max usage limit
bandwidthTotal, err := acctDB.ProjectAllocatedBandwidthTotal(ctx, bucketID, from)
bandwidthTotal, err := acctDB.GetAllocatedBandwidthTotal(ctx, bucketID, from)
require.NoError(t, err)
maxAlphaUsage := 25 * memory.GB
actualExceeded, actualResource := accounting.ExceedsAlphaUsage(bandwidthTotal, 0, 0, maxAlphaUsage)
@ -155,7 +155,7 @@ func createBucketID(projectID uuid.UUID, bucket []byte) []byte {
return []byte(storj.JoinPaths(entries...))
}
func setUpStorageTallies(ctx *testcontext.Context, projectID uuid.UUID, acctDB accounting.DB, time time.Time) error {
func setUpStorageTallies(ctx *testcontext.Context, projectID uuid.UUID, acctDB accounting.ProjectAccounting, time time.Time) error {
// Create many records that sum greater than project usage limit of 25GB
for i := 0; i < 4; i++ {
@ -169,7 +169,7 @@ func setUpStorageTallies(ctx *testcontext.Context, projectID uuid.UUID, acctDB a
// that sum greater than the maxAlphaUsage * expansionFactor
RemoteBytes: 10 * memory.GB.Int64() * accounting.ExpansionFactor,
}
err := acctDB.CreateBucketStorageTally(ctx, tally)
err := acctDB.CreateStorageTally(ctx, tally)
if err != nil {
return err
}
@ -226,7 +226,7 @@ func TestProjectBandwidthTotal(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
accountingDB := db.Accounting()
pdb := db.ProjectAccounting()
projectID, err := uuid.New()
require.NoError(t, err)
@ -237,7 +237,7 @@ func TestProjectBandwidthTotal(t *testing.T) {
// Execute test: get project bandwidth total
bucketID := createBucketID(*projectID, []byte("testbucket"))
from := time.Now().AddDate(0, 0, -accounting.AverageDaysInMonth) // past 30 days
actualBandwidthTotal, err := accountingDB.ProjectAllocatedBandwidthTotal(ctx, bucketID, from)
actualBandwidthTotal, err := pdb.GetAllocatedBandwidthTotal(ctx, bucketID, from)
require.NoError(t, err)
require.Equal(t, actualBandwidthTotal, expectedTotal)
})

View File

@ -26,16 +26,16 @@ type Config struct {
type Service struct {
logger *zap.Logger
ticker *time.Ticker
db accounting.DB
sdb accounting.StoragenodeAccounting
deleteTallies bool
}
// New creates a new rollup service
func New(logger *zap.Logger, db accounting.DB, interval time.Duration, deleteTallies bool) *Service {
func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, interval time.Duration, deleteTallies bool) *Service {
return &Service{
logger: logger,
ticker: time.NewTicker(interval),
db: db,
sdb: sdb,
deleteTallies: deleteTallies,
}
}
@ -60,7 +60,7 @@ func (r *Service) Run(ctx context.Context) (err error) {
// Rollup aggregates storage and bandwidth amounts for the time interval
func (r *Service) Rollup(ctx context.Context) error {
// only Rollup new things - get LastRollup
lastRollup, err := r.db.LastTimestamp(ctx, accounting.LastRollup)
lastRollup, err := r.sdb.LastTimestamp(ctx, accounting.LastRollup)
if err != nil {
return Error.Wrap(err)
}
@ -83,14 +83,14 @@ func (r *Service) Rollup(ctx context.Context) error {
return nil
}
err = r.db.SaveRollup(ctx, latestTally, rollupStats)
err = r.sdb.SaveRollup(ctx, latestTally, rollupStats)
if err != nil {
return Error.Wrap(err)
}
if r.deleteTallies {
// Delete already rolled up tallies
err = r.db.DeleteRawBefore(ctx, latestTally)
err = r.sdb.DeleteTalliesBefore(ctx, latestTally)
if err != nil {
return Error.Wrap(err)
}
@ -101,7 +101,7 @@ func (r *Service) Rollup(ctx context.Context) error {
// RollupStorage rolls up storage tally, modifies rollupStats map
func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) (latestTally time.Time, err error) {
tallies, err := r.db.GetRawSince(ctx, lastRollup)
tallies, err := r.sdb.GetTalliesSince(ctx, lastRollup)
if err != nil {
return time.Now(), Error.Wrap(err)
}
@ -126,12 +126,7 @@ func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollu
rollupStats[iDay][node] = &accounting.Rollup{NodeID: node, StartTime: iDay}
}
//increment data at rest sum
switch tallyRow.DataType {
case accounting.AtRest:
rollupStats[iDay][node].AtRestTotal += tallyRow.DataTotal
default:
r.logger.Info("rollupStorage no longer supports non-accounting.AtRest datatypes")
}
rollupStats[iDay][node].AtRestTotal += tallyRow.DataTotal
}
return latestTally, nil
@ -140,7 +135,7 @@ func (r *Service) RollupStorage(ctx context.Context, lastRollup time.Time, rollu
// RollupBW aggregates the bandwidth rollups, modifies rollupStats map
func (r *Service) RollupBW(ctx context.Context, lastRollup time.Time, rollupStats accounting.RollupStats) error {
var latestTally time.Time
bws, err := r.db.GetStoragenodeBandwidthSince(ctx, lastRollup.UTC())
bws, err := r.sdb.GetBandwidthSince(ctx, lastRollup.UTC())
if err != nil {
return Error.Wrap(err)
}

View File

@ -14,7 +14,6 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
@ -37,7 +36,7 @@ func TestRollupNoDeletes(t *testing.T) {
start := timestamp
for i := 0; i < days; i++ {
err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, timestamp, timestamp, testData[i].nodeData)
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, timestamp, testData[i].nodeData)
require.NoError(t, err)
err = saveBW(ctx, planet, testData[i].bwTotals, timestamp)
require.NoError(t, err)
@ -53,7 +52,7 @@ func TestRollupNoDeletes(t *testing.T) {
start = time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, start.Location())
end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, end.Location())
rows, err := planet.Satellites[0].DB.Accounting().QueryPaymentInfo(ctx, start, end)
rows, err := planet.Satellites[0].DB.StoragenodeAccounting().QueryPaymentInfo(ctx, start, end)
require.NoError(t, err)
if i == 0 { // we need at least two days for rollup to work
assert.Equal(t, 0, len(rows))
@ -74,7 +73,7 @@ func TestRollupNoDeletes(t *testing.T) {
assert.NotEmpty(t, r.Wallet)
}
}
raw, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx)
raw, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx)
require.NoError(t, err)
assert.Equal(t, days*len(planet.StorageNodes), len(raw))
})
@ -97,7 +96,7 @@ func TestRollupDeletes(t *testing.T) {
start := timestamp
for i := 0; i < days; i++ {
err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, timestamp, timestamp, testData[i].nodeData)
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, timestamp, testData[i].nodeData)
require.NoError(t, err)
err = saveBW(ctx, planet, testData[i].bwTotals, timestamp)
require.NoError(t, err)
@ -106,15 +105,12 @@ func TestRollupDeletes(t *testing.T) {
require.NoError(t, err)
// Assert that RollupStorage deleted all raws except for today's
raw, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx)
raw, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx)
require.NoError(t, err)
for _, r := range raw {
assert.Equal(t, r.IntervalEndTime.UTC().Truncate(time.Second), timestamp.Truncate(time.Second))
if r.DataType == accounting.AtRest {
assert.Equal(t, testData[i].nodeData[r.NodeID], r.DataTotal)
} else {
assert.Equal(t, testData[i].bwTotals[r.NodeID][r.DataType], int64(r.DataTotal))
}
assert.Equal(t, testData[i].nodeData[r.NodeID], r.DataTotal)
}
// Advance time by 24 hours
@ -125,7 +121,7 @@ func TestRollupDeletes(t *testing.T) {
start = time.Date(start.Year(), start.Month(), start.Day(), 0, 0, 0, 0, start.Location())
end = time.Date(end.Year(), end.Month(), end.Day(), 0, 0, 0, 0, end.Location())
rows, err := planet.Satellites[0].DB.Accounting().QueryPaymentInfo(ctx, start, end)
rows, err := planet.Satellites[0].DB.StoragenodeAccounting().QueryPaymentInfo(ctx, start, end)
require.NoError(t, err)
if i == 0 { // we need at least two days for rollup to work
assert.Equal(t, 0, len(rows))

View File

@ -27,25 +27,27 @@ type Config struct {
// Service is the tally service for data stored on each storage node
type Service struct {
logger *zap.Logger
metainfo *metainfo.Service
overlay *overlay.Cache
limit int
ticker *time.Ticker
accountingDB accounting.DB
liveAccounting live.Service
logger *zap.Logger
metainfo *metainfo.Service
overlay *overlay.Cache
limit int
ticker *time.Ticker
storagenodeAccountingDB accounting.StoragenodeAccounting
projectAccountingDB accounting.ProjectAccounting
liveAccounting live.Service
}
// New creates a new tally Service
func New(logger *zap.Logger, accountingDB accounting.DB, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Cache, limit int, interval time.Duration) *Service {
func New(logger *zap.Logger, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, metainfo *metainfo.Service, overlay *overlay.Cache, limit int, interval time.Duration) *Service {
return &Service{
logger: logger,
metainfo: metainfo,
overlay: overlay,
limit: limit,
ticker: time.NewTicker(interval),
accountingDB: accountingDB,
liveAccounting: liveAccounting,
logger: logger,
metainfo: metainfo,
overlay: overlay,
limit: limit,
ticker: time.NewTicker(interval),
storagenodeAccountingDB: sdb,
projectAccountingDB: pdb,
liveAccounting: liveAccounting,
}
}
@ -83,13 +85,13 @@ func (t *Service) Tally(ctx context.Context) error {
errAtRest = errs.New("Query for data-at-rest failed : %v", err)
} else {
if len(nodeData) > 0 {
err = t.SaveAtRestRaw(ctx, latestTally, time.Now().UTC(), nodeData)
err = t.storagenodeAccountingDB.SaveTallies(ctx, latestTally, nodeData)
if err != nil {
errAtRest = errs.New("Saving storage node data-at-rest failed : %v", err)
}
}
if len(bucketData) > 0 {
_, err = t.accountingDB.SaveBucketTallies(ctx, latestTally, bucketData)
_, err = t.projectAccountingDB.SaveTallies(ctx, latestTally, bucketData)
if err != nil {
errBucketInfo = errs.New("Saving bucket storage data failed")
}
@ -103,7 +105,7 @@ func (t *Service) Tally(ctx context.Context) error {
func (t *Service) CalculateAtRestData(ctx context.Context) (latestTally time.Time, nodeData map[storj.NodeID]float64, bucketTallies map[string]*accounting.BucketTally, err error) {
defer mon.Task()(&ctx)(&err)
latestTally, err = t.accountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
latestTally, err = t.storagenodeAccountingDB.LastTimestamp(ctx, accounting.LastAtRestTally)
if err != nil {
return latestTally, nodeData, bucketTallies, Error.Wrap(err)
}
@ -212,8 +214,3 @@ func (t *Service) CalculateAtRestData(ctx context.Context) (latestTally time.Tim
}
return latestTally, nodeData, bucketTallies, err
}
// SaveAtRestRaw records raw tallies of at-rest-data and updates the LastTimestamp
func (t *Service) SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error {
return t.accountingDB.SaveAtRestRaw(ctx, latestTally, created, nodeData)
}

View File

@ -20,19 +20,16 @@ import (
"storj.io/storj/pkg/storj"
)
func TestDeleteRawBefore(t *testing.T) {
func TestDeleteTalliesBefore(t *testing.T) {
tests := []struct {
createdAt time.Time
eraseBefore time.Time
expectedRaws int
}{
{
createdAt: time.Now(),
eraseBefore: time.Now(),
expectedRaws: 1,
},
{
createdAt: time.Now(),
eraseBefore: time.Now().Add(24 * time.Hour),
expectedRaws: 0,
},
@ -46,13 +43,13 @@ func TestDeleteRawBefore(t *testing.T) {
nodeData := make(map[storj.NodeID]float64)
nodeData[id] = float64(1000)
err := planet.Satellites[0].DB.Accounting().SaveAtRestRaw(ctx, tt.createdAt, tt.createdAt, nodeData)
err := planet.Satellites[0].DB.StoragenodeAccounting().SaveTallies(ctx, time.Now(), nodeData)
require.NoError(t, err)
err = planet.Satellites[0].DB.Accounting().DeleteRawBefore(ctx, tt.eraseBefore)
err = planet.Satellites[0].DB.StoragenodeAccounting().DeleteTalliesBefore(ctx, tt.eraseBefore)
require.NoError(t, err)
raws, err := planet.Satellites[0].DB.Accounting().GetRaw(ctx)
raws, err := planet.Satellites[0].DB.StoragenodeAccounting().GetTallies(ctx)
require.NoError(t, err)
assert.Len(t, raws, tt.expectedRaws)
})
@ -98,7 +95,7 @@ func TestOnlyInline(t *testing.T) {
require.NoError(t, err)
assert.Len(t, actualNodeData, 0)
_, err = planet.Satellites[0].DB.Accounting().SaveBucketTallies(ctx, latestTally, actualBucketData)
_, err = planet.Satellites[0].DB.ProjectAccounting().SaveTallies(ctx, latestTally, actualBucketData)
require.NoError(t, err)
// Confirm the correct bucket storage tally was created

View File

@ -226,8 +226,7 @@ func (cache *Cache) FindStorageNodesWithPreferences(ctx context.Context, req Fin
return nodes, nil
}
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
// Note that KnownUnreliableOrOffline will not return node ids which are not in the database at all
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new.
func (cache *Cache) KnownUnreliableOrOffline(ctx context.Context, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
criteria := &NodeCriteria{

View File

@ -44,28 +44,30 @@ type APIKeys interface {
// Endpoint metainfo endpoint
type Endpoint struct {
log *zap.Logger
metainfo *Service
orders *orders.Service
cache *overlay.Cache
apiKeys APIKeys
accountingDB accounting.DB
liveAccounting live.Service
maxAlphaUsage memory.Size
log *zap.Logger
metainfo *Service
orders *orders.Service
cache *overlay.Cache
apiKeys APIKeys
storagenodeAccountingDB accounting.StoragenodeAccounting
projectAccountingDB accounting.ProjectAccounting
liveAccounting live.Service
maxAlphaUsage memory.Size
}
// NewEndpoint creates new metainfo endpoint instance
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, apiKeys APIKeys, acctDB accounting.DB, liveAccounting live.Service, maxAlphaUsage memory.Size) *Endpoint {
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, apiKeys APIKeys, sdb accounting.StoragenodeAccounting, pdb accounting.ProjectAccounting, liveAccounting live.Service, maxAlphaUsage memory.Size) *Endpoint {
// TODO do something with too many params
return &Endpoint{
log: log,
metainfo: metainfo,
orders: orders,
cache: cache,
apiKeys: apiKeys,
accountingDB: acctDB,
liveAccounting: liveAccounting,
maxAlphaUsage: maxAlphaUsage,
log: log,
metainfo: metainfo,
orders: orders,
cache: cache,
apiKeys: apiKeys,
storagenodeAccountingDB: sdb,
projectAccountingDB: pdb,
liveAccounting: liveAccounting,
maxAlphaUsage: maxAlphaUsage,
}
}
@ -150,6 +152,7 @@ func (endpoint *Endpoint) CreateSegment(ctx context.Context, req *pb.SegmentWrit
inlineTotal, remoteTotal, err := endpoint.getProjectStorageTotals(ctx, keyInfo.ProjectID)
if err != nil {
endpoint.log.Error("retrieving project storage totals", zap.Error(err))
}
exceeded, resource := accounting.ExceedsAlphaUsage(0, inlineTotal, remoteTotal, endpoint.maxAlphaUsage)
if exceeded {
@ -192,7 +195,7 @@ func (endpoint *Endpoint) CreateSegment(ctx context.Context, req *pb.SegmentWrit
}
func (endpoint *Endpoint) getProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) {
lastCountInline, lastCountRemote, err := endpoint.accountingDB.ProjectStorageTotals(ctx, projectID)
lastCountInline, lastCountRemote, err := endpoint.projectAccountingDB.GetStorageTotals(ctx, projectID)
if err != nil {
return 0, 0, err
}
@ -296,7 +299,7 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
// Ref: https://storjlabs.atlassian.net/browse/V3-1274
bucketID := createBucketID(keyInfo.ProjectID, req.Bucket)
from := time.Now().AddDate(0, 0, -accounting.AverageDaysInMonth) // past 30 days
bandwidthTotal, err := endpoint.accountingDB.ProjectAllocatedBandwidthTotal(ctx, bucketID, from)
bandwidthTotal, err := endpoint.projectAccountingDB.GetAllocatedBandwidthTotal(ctx, bucketID, from)
if err != nil {
endpoint.log.Error("retrieving ProjectBandwidthTotal", zap.Error(err))
}

View File

@ -73,8 +73,10 @@ type DB interface {
CertDB() certdb.DB
// OverlayCache returns database for caching overlay information
OverlayCache() overlay.DB
// Accounting returns database for storing information about data use
Accounting() accounting.DB
// StoragenodeAccounting returns database for storing information about storagenode use
StoragenodeAccounting() accounting.StoragenodeAccounting
// ProjectAccounting returns database for storing information about project data use
ProjectAccounting() accounting.ProjectAccounting
// RepairQueue returns queue for segments that need repairing
RepairQueue() queue.RepairQueue
// Irreparable returns database for failed repairs
@ -354,7 +356,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
peer.Orders.Service,
peer.Overlay.Service,
peer.DB.Console().APIKeys(),
peer.DB.Accounting(),
peer.DB.StoragenodeAccounting(),
peer.DB.ProjectAccounting(),
peer.LiveAccounting.Service,
config.Rollup.MaxAlphaUsage,
)
@ -413,8 +416,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
{ // setup accounting
log.Debug("Setting up accounting")
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.Accounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval)
peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.Accounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
peer.Accounting.Tally = tally.New(peer.Log.Named("tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Service, peer.Metainfo.Service, peer.Overlay.Service, 0, config.Tally.Interval)
peer.Accounting.Rollup = rollup.New(peer.Log.Named("rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies)
}
{ // setup inspector

View File

@ -1,296 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"bytes"
"context"
"database/sql"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
dbx "storj.io/storj/satellite/satellitedb/dbx"
)
//database implements DB
type accountingDB struct {
db *dbx.DB
}
// ProjectAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame
func (db *accountingDB) ProjectAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) {
pathEl := bytes.Split(bucketID, []byte("/"))
_, projectID := pathEl[1], pathEl[0]
var sum *int64
query := `SELECT SUM(allocated) FROM bucket_bandwidth_rollups WHERE project_id = ? AND action = ? AND interval_start > ?;`
err := db.db.QueryRow(db.db.Rebind(query), projectID, pb.PieceAction_GET, from).Scan(&sum)
if err == sql.ErrNoRows || sum == nil {
return 0, nil
}
return *sum, err
}
// ProjectStorageTotals returns the current inline and remote storage usage for a projectID
func (db *accountingDB) ProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) {
var inlineSum, remoteSum sql.NullInt64
var intervalStart time.Time
// Sum all the inline and remote values for a project that all share the same interval_start.
// All records for a project that have the same interval start are part of the same tally run.
// This should represent the most recent calculation of a project's total at rest storage.
query := `SELECT interval_start, SUM(inline), SUM(remote)
FROM bucket_storage_tallies
WHERE project_id = ?
GROUP BY interval_start
ORDER BY interval_start DESC LIMIT 1;`
err := db.db.QueryRow(db.db.Rebind(query), projectID[:]).Scan(&intervalStart, &inlineSum, &remoteSum)
if err != nil || !inlineSum.Valid || !remoteSum.Valid {
return 0, 0, nil
}
return inlineSum.Int64, remoteSum.Int64, err
}
// CreateBucketStorageTally creates a record in the bucket_storage_tallies accounting table
func (db *accountingDB) CreateBucketStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error {
_, err := db.db.Create_BucketStorageTally(
ctx,
dbx.BucketStorageTally_BucketName([]byte(tally.BucketName)),
dbx.BucketStorageTally_ProjectId(tally.ProjectID[:]),
dbx.BucketStorageTally_IntervalStart(tally.IntervalStart),
dbx.BucketStorageTally_Inline(uint64(tally.InlineBytes)),
dbx.BucketStorageTally_Remote(uint64(tally.RemoteBytes)),
dbx.BucketStorageTally_RemoteSegmentsCount(uint(tally.RemoteSegmentCount)),
dbx.BucketStorageTally_InlineSegmentsCount(uint(tally.InlineSegmentCount)),
dbx.BucketStorageTally_ObjectCount(uint(tally.ObjectCount)),
dbx.BucketStorageTally_MetadataSize(uint64(tally.MetadataSize)),
)
if err != nil {
return err
}
return nil
}
// LastTimestamp records the greatest last tallied time
func (db *accountingDB) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
lastTally := time.Time{}
err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
if lt == nil {
update := dbx.AccountingTimestamps_Value(lastTally)
_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(timestampType), update)
return err
}
lastTally = lt.Value
return err
})
return lastTally, err
}
// SaveAtRestRaw records raw tallies of at rest data to the database
func (db *accountingDB) SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error {
if len(nodeData) == 0 {
return Error.New("In SaveAtRestRaw with empty nodeData")
}
err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for k, v := range nodeData {
nID := dbx.AccountingRaw_NodeId(k.Bytes())
end := dbx.AccountingRaw_IntervalEndTime(latestTally)
total := dbx.AccountingRaw_DataTotal(v)
dataType := dbx.AccountingRaw_DataType(accounting.AtRest)
timestamp := dbx.AccountingRaw_CreatedAt(created)
_, err := tx.Create_AccountingRaw(ctx, nID, end, total, dataType, timestamp)
if err != nil {
return err
}
}
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)}
_, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update)
return err
})
return Error.Wrap(err)
}
// GetRaw retrieves all raw tallies
func (db *accountingDB) GetRaw(ctx context.Context) ([]*accounting.Raw, error) {
raws, err := db.db.All_AccountingRaw(ctx)
out := make([]*accounting.Raw, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.Raw{
ID: r.Id,
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
DataType: r.DataType,
CreatedAt: r.CreatedAt,
}
}
return out, Error.Wrap(err)
}
// GetRawSince retrieves all raw tallies since latestRollup
func (db *accountingDB) GetRawSince(ctx context.Context, latestRollup time.Time) ([]*accounting.Raw, error) {
raws, err := db.db.All_AccountingRaw_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.AccountingRaw_IntervalEndTime(latestRollup))
out := make([]*accounting.Raw, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.Raw{
ID: r.Id,
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
DataType: r.DataType,
CreatedAt: r.CreatedAt,
}
}
return out, Error.Wrap(err)
}
// GetStoragenodeBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup
func (db *accountingDB) GetStoragenodeBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) {
rollups, err := db.db.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup))
out := make([]*accounting.StoragenodeBandwidthRollup, len(rollups))
for i, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
}
}
return out, Error.Wrap(err)
}
// SaveRollup records raw tallies of at rest data to the database
func (db *accountingDB) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) error {
if len(stats) == 0 {
return Error.New("In SaveRollup with empty nodeData")
}
err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, arsByDate := range stats {
for _, ar := range arsByDate {
nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes())
start := dbx.AccountingRollup_StartTime(ar.StartTime)
put := dbx.AccountingRollup_PutTotal(ar.PutTotal)
get := dbx.AccountingRollup_GetTotal(ar.GetTotal)
audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal)
getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal)
putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal)
atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal)
_, err := tx.Create_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest)
if err != nil {
return err
}
}
}
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)}
_, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update)
return err
})
return Error.Wrap(err)
}
// SaveBucketTallies saves the latest bucket info
func (db *accountingDB) SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) {
if len(bucketTallies) == 0 {
return nil, Error.New("In SaveBucketTallies with empty bucketTallies")
}
var result []accounting.BucketTally
for bucketID, info := range bucketTallies {
bucketIDComponents := storj.SplitPath(bucketID)
bucketName := dbx.BucketStorageTally_BucketName([]byte(bucketIDComponents[1]))
projectID := dbx.BucketStorageTally_ProjectId([]byte(bucketIDComponents[0]))
interval := dbx.BucketStorageTally_IntervalStart(intervalStart)
inlineBytes := dbx.BucketStorageTally_Inline(uint64(info.InlineBytes))
remoteBytes := dbx.BucketStorageTally_Remote(uint64(info.RemoteBytes))
rSegments := dbx.BucketStorageTally_RemoteSegmentsCount(uint(info.RemoteSegments))
iSegments := dbx.BucketStorageTally_InlineSegmentsCount(uint(info.InlineSegments))
objectCount := dbx.BucketStorageTally_ObjectCount(uint(info.Files))
meta := dbx.BucketStorageTally_MetadataSize(uint64(info.MetadataSize))
dbxTally, err := db.db.Create_BucketStorageTally(ctx, bucketName, projectID, interval, inlineBytes, remoteBytes, rSegments, iSegments, objectCount, meta)
if err != nil {
return nil, err
}
tally := accounting.BucketTally{
BucketName: dbxTally.BucketName,
ProjectID: dbxTally.ProjectId,
InlineSegments: int64(dbxTally.InlineSegmentsCount),
RemoteSegments: int64(dbxTally.RemoteSegmentsCount),
Files: int64(dbxTally.ObjectCount),
InlineBytes: int64(dbxTally.Inline),
RemoteBytes: int64(dbxTally.Remote),
MetadataSize: int64(dbxTally.MetadataSize),
}
result = append(result, tally)
}
return result, nil
}
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
func (db *accountingDB) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
var sqlStmt = `SELECT n.id, n.created_at, n.audit_success_ratio, r.at_rest_total, r.get_repair_total,
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet
FROM (
SELECT node_id, SUM(at_rest_total) AS at_rest_total, SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total,
SUM(put_total) AS put_total, SUM(get_total) AS get_total
FROM accounting_rollups
WHERE start_time >= ? AND start_time < ?
GROUP BY node_id
) r
LEFT JOIN nodes n ON n.id = r.node_id
ORDER BY n.id`
rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC())
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
csv := make([]*accounting.CSVRow, 0, 0)
for rows.Next() {
var nodeID []byte
r := &accounting.CSVRow{}
var wallet sql.NullString
err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AuditSuccessRatio, &r.AtRestTotal, &r.GetRepairTotal,
&r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &wallet)
if err != nil {
return csv, Error.Wrap(err)
}
if wallet.Valid {
r.Wallet = wallet.String
}
id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return csv, Error.Wrap(err)
}
r.NodeID = id
csv = append(csv, r)
}
return csv, nil
}
// DeleteRawBefore deletes all raw tallies prior to some time
func (db *accountingDB) DeleteRawBefore(ctx context.Context, latestRollup time.Time) error {
var deleteRawSQL = `DELETE FROM accounting_raws WHERE interval_end_time < ?`
_, err := db.db.DB.ExecContext(ctx, db.db.Rebind(deleteRawSQL), latestRollup)
return err
}

View File

@ -114,9 +114,14 @@ func (db *DB) RepairQueue() queue.RepairQueue {
return &repairQueue{db: db.db}
}
// Accounting returns database for tracking bandwidth agreements over time
func (db *DB) Accounting() accounting.DB {
return &accountingDB{db: db.db}
// StoragenodeAccounting returns database for tracking storagenode usage
func (db *DB) StoragenodeAccounting() accounting.StoragenodeAccounting {
return &StoragenodeAccounting{db: db.db}
}
// ProjectAccounting returns database for tracking project data use
func (db *DB) ProjectAccounting() accounting.ProjectAccounting {
return &ProjectAccounting{db: db.db}
}
// Irreparable returns database for storing segments that failed repair

View File

@ -86,34 +86,6 @@ read all (
where accounting_rollup.start_time >= ?
)
model accounting_raw (
key id
field id serial64
field node_id blob
field interval_end_time timestamp
field data_total float64
field data_type int
field created_at timestamp
)
create accounting_raw ( )
delete accounting_raw ( where accounting_raw.id = ? )
read one (
select accounting_raw
where accounting_raw.id = ?
)
read all (
select accounting_raw
)
read all (
select accounting_raw
where accounting_raw.interval_end_time >= ?
)
//--- statdb ---//
model node (
@ -482,12 +454,26 @@ read all (
)
model storagenode_storage_tally (
key storagenode_id interval_start
key id
field id serial64
field node_id blob
field interval_end_time timestamp
field data_total float64
)
field storagenode_id blob
field interval_start utimestamp
field total uint64
create storagenode_storage_tally ()
delete storagenode_storage_tally ( where storagenode_storage_tally.id = ? )
read one (
select storagenode_storage_tally
where storagenode_storage_tally.id = ?
)
read all (
select storagenode_storage_tally
)
read all (
select storagenode_storage_tally
where storagenode_storage_tally.interval_end_time >= ?
)
//--- certRecord ---//

File diff suppressed because it is too large Load Diff

View File

@ -1,14 +1,5 @@
-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1
-- DO NOT EDIT
CREATE TABLE accounting_raws (
id bigserial NOT NULL,
node_id bytea NOT NULL,
interval_end_time timestamp with time zone NOT NULL,
data_total double precision NOT NULL,
data_type integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE accounting_rollups (
id bigserial NOT NULL,
node_id bytea NOT NULL,
@ -154,10 +145,11 @@ CREATE TABLE storagenode_bandwidth_rollups (
PRIMARY KEY ( storagenode_id, interval_start, action )
);
CREATE TABLE storagenode_storage_tallies (
storagenode_id bytea NOT NULL,
interval_start timestamp NOT NULL,
total bigint NOT NULL,
PRIMARY KEY ( storagenode_id, interval_start )
id bigserial NOT NULL,
node_id bytea NOT NULL,
interval_end_time timestamp with time zone NOT NULL,
data_total double precision NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE users (
id bytea NOT NULL,

View File

@ -1,14 +1,5 @@
-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1
-- DO NOT EDIT
CREATE TABLE accounting_raws (
id INTEGER NOT NULL,
node_id BLOB NOT NULL,
interval_end_time TIMESTAMP NOT NULL,
data_total REAL NOT NULL,
data_type INTEGER NOT NULL,
created_at TIMESTAMP NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE accounting_rollups (
id INTEGER NOT NULL,
node_id BLOB NOT NULL,
@ -154,10 +145,11 @@ CREATE TABLE storagenode_bandwidth_rollups (
PRIMARY KEY ( storagenode_id, interval_start, action )
);
CREATE TABLE storagenode_storage_tallies (
storagenode_id BLOB NOT NULL,
interval_start TIMESTAMP NOT NULL,
total INTEGER NOT NULL,
PRIMARY KEY ( storagenode_id, interval_start )
id INTEGER NOT NULL,
node_id BLOB NOT NULL,
interval_end_time TIMESTAMP NOT NULL,
data_total REAL NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE users (
id BLOB NOT NULL,

View File

@ -37,103 +37,6 @@ func newLocked(db satellite.DB) satellite.DB {
return &locked{&sync.Mutex{}, db}
}
// Accounting returns database for storing information about data use
func (m *locked) Accounting() accounting.DB {
m.Lock()
defer m.Unlock()
return &lockedAccounting{m.Locker, m.db.Accounting()}
}
// lockedAccounting implements locking wrapper for accounting.DB
type lockedAccounting struct {
sync.Locker
db accounting.DB
}
// CreateBucketStorageTally creates a record for BucketStorageTally in the accounting DB table
func (m *lockedAccounting) CreateBucketStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error {
m.Lock()
defer m.Unlock()
return m.db.CreateBucketStorageTally(ctx, tally)
}
// DeleteRawBefore deletes all raw tallies prior to some time
func (m *lockedAccounting) DeleteRawBefore(ctx context.Context, latestRollup time.Time) error {
m.Lock()
defer m.Unlock()
return m.db.DeleteRawBefore(ctx, latestRollup)
}
// GetRaw retrieves all raw tallies
func (m *lockedAccounting) GetRaw(ctx context.Context) ([]*accounting.Raw, error) {
m.Lock()
defer m.Unlock()
return m.db.GetRaw(ctx)
}
// GetRawSince retrieves all raw tallies since latestRollup
func (m *lockedAccounting) GetRawSince(ctx context.Context, latestRollup time.Time) ([]*accounting.Raw, error) {
m.Lock()
defer m.Unlock()
return m.db.GetRawSince(ctx, latestRollup)
}
// GetStoragenodeBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup
func (m *lockedAccounting) GetStoragenodeBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) {
m.Lock()
defer m.Unlock()
return m.db.GetStoragenodeBandwidthSince(ctx, latestRollup)
}
// LastTimestamp records the latest last tallied time.
func (m *lockedAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
m.Lock()
defer m.Unlock()
return m.db.LastTimestamp(ctx, timestampType)
}
// ProjectAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
func (m *lockedAccounting) ProjectAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) {
m.Lock()
defer m.Unlock()
return m.db.ProjectAllocatedBandwidthTotal(ctx, bucketID, from)
}
// ProjectStorageTotals returns the current inline and remote storage usage for a projectID
func (m *lockedAccounting) ProjectStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) {
m.Lock()
defer m.Unlock()
return m.db.ProjectStorageTotals(ctx, projectID)
}
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
func (m *lockedAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
m.Lock()
defer m.Unlock()
return m.db.QueryPaymentInfo(ctx, start, end)
}
// SaveAtRestRaw records raw tallies of at-rest-data.
func (m *lockedAccounting) SaveAtRestRaw(ctx context.Context, latestTally time.Time, created time.Time, nodeData map[storj.NodeID]float64) error {
m.Lock()
defer m.Unlock()
return m.db.SaveAtRestRaw(ctx, latestTally, created, nodeData)
}
// SaveBucketTallies saves the latest bucket info
func (m *lockedAccounting) SaveBucketTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) {
m.Lock()
defer m.Unlock()
return m.db.SaveBucketTallies(ctx, intervalStart, bucketTallies)
}
// SaveRollup records raw tallies of at rest data to the database
func (m *lockedAccounting) SaveRollup(ctx context.Context, latestTally time.Time, stats accounting.RollupStats) error {
m.Lock()
defer m.Unlock()
return m.db.SaveRollup(ctx, latestTally, stats)
}
// BandwidthAgreement returns database for storing bandwidth agreements
func (m *locked) BandwidthAgreement() bwagreement.DB {
m.Lock()
@ -703,6 +606,13 @@ func (m *lockedOverlayCache) Get(ctx context.Context, nodeID storj.NodeID) (*ove
return m.db.Get(ctx, nodeID)
}
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
func (m *lockedOverlayCache) KnownUnreliableOrOffline(ctx context.Context, a1 *overlay.NodeCriteria, a2 storj.NodeIDList) (storj.NodeIDList, error) {
m.Lock()
defer m.Unlock()
return m.db.KnownUnreliableOrOffline(ctx, a1, a2)
}
// Paginate will page through the database nodes
func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit int) ([]*overlay.NodeDossier, bool, error) {
m.Lock()
@ -724,14 +634,6 @@ func (m *lockedOverlayCache) SelectStorageNodes(ctx context.Context, count int,
return m.db.SelectStorageNodes(ctx, count, criteria)
}
// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
// Note that KnownUnreliableOrOffline will not return node ids which are not in the database at all
func (m *lockedOverlayCache) KnownUnreliableOrOffline(ctx context.Context, a1 *overlay.NodeCriteria, a2 storj.NodeIDList) (storj.NodeIDList, error) {
m.Lock()
defer m.Unlock()
return m.db.KnownUnreliableOrOffline(ctx, a1, a2)
}
// Update updates node address
func (m *lockedOverlayCache) UpdateAddress(ctx context.Context, value *pb.Node) error {
m.Lock()
@ -760,6 +662,47 @@ func (m *lockedOverlayCache) UpdateUptime(ctx context.Context, nodeID storj.Node
return m.db.UpdateUptime(ctx, nodeID, isUp)
}
// ProjectAccounting returns database for storing information about project data use
func (m *locked) ProjectAccounting() accounting.ProjectAccounting {
m.Lock()
defer m.Unlock()
return &lockedProjectAccounting{m.Locker, m.db.ProjectAccounting()}
}
// lockedProjectAccounting implements locking wrapper for accounting.ProjectAccounting
type lockedProjectAccounting struct {
sync.Locker
db accounting.ProjectAccounting
}
// CreateStorageTally creates a record for BucketStorageTally in the accounting DB table
func (m *lockedProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error {
m.Lock()
defer m.Unlock()
return m.db.CreateStorageTally(ctx, tally)
}
// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID in the past time frame
func (m *lockedProjectAccounting) GetAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) {
m.Lock()
defer m.Unlock()
return m.db.GetAllocatedBandwidthTotal(ctx, bucketID, from)
}
// GetStorageTotals returns the current inline and remote storage usage for a projectID
func (m *lockedProjectAccounting) GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) {
m.Lock()
defer m.Unlock()
return m.db.GetStorageTotals(ctx, projectID)
}
// SaveTallies saves the latest project info
func (m *lockedProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) {
m.Lock()
defer m.Unlock()
return m.db.SaveTallies(ctx, intervalStart, bucketTallies)
}
// RepairQueue returns queue for segments that need repairing
func (m *locked) RepairQueue() queue.RepairQueue {
m.Lock()
@ -800,3 +743,72 @@ func (m *lockedRepairQueue) SelectN(ctx context.Context, limit int) ([]pb.Injure
defer m.Unlock()
return m.db.SelectN(ctx, limit)
}
// StoragenodeAccounting returns database for storing information about storagenode use
func (m *locked) StoragenodeAccounting() accounting.StoragenodeAccounting {
m.Lock()
defer m.Unlock()
return &lockedStoragenodeAccounting{m.Locker, m.db.StoragenodeAccounting()}
}
// lockedStoragenodeAccounting implements locking wrapper for accounting.StoragenodeAccounting
type lockedStoragenodeAccounting struct {
sync.Locker
db accounting.StoragenodeAccounting
}
// DeleteTalliesBefore deletes all tallies prior to some time
func (m *lockedStoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error {
m.Lock()
defer m.Unlock()
return m.db.DeleteTalliesBefore(ctx, latestRollup)
}
// GetBandwidthSince retrieves all bandwidth rollup entires since latestRollup
func (m *lockedStoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) {
m.Lock()
defer m.Unlock()
return m.db.GetBandwidthSince(ctx, latestRollup)
}
// GetTallies retrieves all tallies
func (m *lockedStoragenodeAccounting) GetTallies(ctx context.Context) ([]*accounting.StoragenodeStorageTally, error) {
m.Lock()
defer m.Unlock()
return m.db.GetTallies(ctx)
}
// GetTalliesSince retrieves all tallies since latestRollup
func (m *lockedStoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeStorageTally, error) {
m.Lock()
defer m.Unlock()
return m.db.GetTalliesSince(ctx, latestRollup)
}
// LastTimestamp records and returns the latest last tallied time.
func (m *lockedStoragenodeAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
m.Lock()
defer m.Unlock()
return m.db.LastTimestamp(ctx, timestampType)
}
// QueryPaymentInfo queries Nodes and Accounting_Rollup on nodeID
func (m *lockedStoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
m.Lock()
defer m.Unlock()
return m.db.QueryPaymentInfo(ctx, start, end)
}
// SaveRollup records tally and bandwidth rollup aggregations to the database
func (m *lockedStoragenodeAccounting) SaveRollup(ctx context.Context, latestTally time.Time, stats accounting.RollupStats) error {
m.Lock()
defer m.Unlock()
return m.db.SaveRollup(ctx, latestTally, stats)
}
// SaveTallies records tallies of data at rest
func (m *lockedStoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error {
m.Lock()
defer m.Unlock()
return m.db.SaveTallies(ctx, latestTally, nodeData)
}

View File

@ -610,6 +610,16 @@ func (db *DB) PostgresMigration() *migrate.Migration {
UPDATE nodes SET uptime_ratio = 1 WHERE total_uptime_count = 0;`,
},
},
{
Description: "Drops storagenode_storage_tally table, Renames accounting_raws to storagenode_storage_tally, and Drops data_type and created_at columns",
Version: 18,
Action: migrate.SQL{
`DROP TABLE storagenode_storage_tallies CASCADE`,
`ALTER TABLE accounting_raws RENAME TO storagenode_storage_tallies`,
`ALTER TABLE storagenode_storage_tallies DROP COLUMN data_type`,
`ALTER TABLE storagenode_storage_tallies DROP COLUMN created_at`,
},
},
},
}
}

View File

@ -0,0 +1,116 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"bytes"
"context"
"database/sql"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
dbx "storj.io/storj/satellite/satellitedb/dbx"
)
// ProjectAccounting implements the accounting/db ProjectAccounting interface
type ProjectAccounting struct {
db *dbx.DB
}
// SaveTallies saves the latest bucket info
func (db *ProjectAccounting) SaveTallies(ctx context.Context, intervalStart time.Time, bucketTallies map[string]*accounting.BucketTally) ([]accounting.BucketTally, error) {
if len(bucketTallies) == 0 {
return nil, Error.New("In SaveTallies with empty bucketTallies")
}
var result []accounting.BucketTally
for bucketID, info := range bucketTallies {
bucketIDComponents := storj.SplitPath(bucketID)
bucketName := dbx.BucketStorageTally_BucketName([]byte(bucketIDComponents[1]))
projectID := dbx.BucketStorageTally_ProjectId([]byte(bucketIDComponents[0]))
interval := dbx.BucketStorageTally_IntervalStart(intervalStart)
inlineBytes := dbx.BucketStorageTally_Inline(uint64(info.InlineBytes))
remoteBytes := dbx.BucketStorageTally_Remote(uint64(info.RemoteBytes))
rSegments := dbx.BucketStorageTally_RemoteSegmentsCount(uint(info.RemoteSegments))
iSegments := dbx.BucketStorageTally_InlineSegmentsCount(uint(info.InlineSegments))
objectCount := dbx.BucketStorageTally_ObjectCount(uint(info.Files))
meta := dbx.BucketStorageTally_MetadataSize(uint64(info.MetadataSize))
dbxTally, err := db.db.Create_BucketStorageTally(ctx, bucketName, projectID, interval, inlineBytes, remoteBytes, rSegments, iSegments, objectCount, meta)
if err != nil {
return nil, err
}
tally := accounting.BucketTally{
BucketName: dbxTally.BucketName,
ProjectID: dbxTally.ProjectId,
InlineSegments: int64(dbxTally.InlineSegmentsCount),
RemoteSegments: int64(dbxTally.RemoteSegmentsCount),
Files: int64(dbxTally.ObjectCount),
InlineBytes: int64(dbxTally.Inline),
RemoteBytes: int64(dbxTally.Remote),
MetadataSize: int64(dbxTally.MetadataSize),
}
result = append(result, tally)
}
return result, nil
}
// CreateStorageTally creates a record in the bucket_storage_tallies accounting table
func (db *ProjectAccounting) CreateStorageTally(ctx context.Context, tally accounting.BucketStorageTally) error {
_, err := db.db.Create_BucketStorageTally(
ctx,
dbx.BucketStorageTally_BucketName([]byte(tally.BucketName)),
dbx.BucketStorageTally_ProjectId(tally.ProjectID[:]),
dbx.BucketStorageTally_IntervalStart(tally.IntervalStart),
dbx.BucketStorageTally_Inline(uint64(tally.InlineBytes)),
dbx.BucketStorageTally_Remote(uint64(tally.RemoteBytes)),
dbx.BucketStorageTally_RemoteSegmentsCount(uint(tally.RemoteSegmentCount)),
dbx.BucketStorageTally_InlineSegmentsCount(uint(tally.InlineSegmentCount)),
dbx.BucketStorageTally_ObjectCount(uint(tally.ObjectCount)),
dbx.BucketStorageTally_MetadataSize(uint64(tally.MetadataSize)),
)
if err != nil {
return err
}
return nil
}
// GetAllocatedBandwidthTotal returns the sum of GET bandwidth usage allocated for a projectID for a time frame
func (db *ProjectAccounting) GetAllocatedBandwidthTotal(ctx context.Context, bucketID []byte, from time.Time) (int64, error) {
pathEl := bytes.Split(bucketID, []byte("/"))
_, projectID := pathEl[1], pathEl[0]
var sum *int64
query := `SELECT SUM(allocated) FROM bucket_bandwidth_rollups WHERE project_id = ? AND action = ? AND interval_start > ?;`
err := db.db.QueryRow(db.db.Rebind(query), projectID, pb.PieceAction_GET, from).Scan(&sum)
if err == sql.ErrNoRows || sum == nil {
return 0, nil
}
return *sum, err
}
// GetStorageTotals returns the current inline and remote storage usage for a projectID
func (db *ProjectAccounting) GetStorageTotals(ctx context.Context, projectID uuid.UUID) (int64, int64, error) {
var inlineSum, remoteSum sql.NullInt64
var intervalStart time.Time
// Sum all the inline and remote values for a project that all share the same interval_start.
// All records for a project that have the same interval start are part of the same tally run.
// This should represent the most recent calculation of a project's total at rest storage.
query := `SELECT interval_start, SUM(inline), SUM(remote)
FROM bucket_storage_tallies
WHERE project_id = ?
GROUP BY interval_start
ORDER BY interval_start DESC LIMIT 1;`
err := db.db.QueryRow(db.db.Rebind(query), projectID[:]).Scan(&intervalStart, &inlineSum, &remoteSum)
if err != nil || !inlineSum.Valid || !remoteSum.Valid {
return 0, 0, nil
}
return inlineSum.Int64, remoteSum.Int64, err
}

View File

@ -0,0 +1,194 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"database/sql"
"time"
"github.com/zeebo/errs"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/storj"
dbx "storj.io/storj/satellite/satellitedb/dbx"
)
// StoragenodeAccounting implements the accounting/db StoragenodeAccounting interface
type StoragenodeAccounting struct {
db *dbx.DB
}
// SaveTallies records raw tallies of at rest data to the database
func (db *StoragenodeAccounting) SaveTallies(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]float64) error {
if len(nodeData) == 0 {
return Error.New("In SaveTallies with empty nodeData")
}
err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for k, v := range nodeData {
nID := dbx.StoragenodeStorageTally_NodeId(k.Bytes())
end := dbx.StoragenodeStorageTally_IntervalEndTime(latestTally)
total := dbx.StoragenodeStorageTally_DataTotal(v)
_, err := tx.Create_StoragenodeStorageTally(ctx, nID, end, total)
if err != nil {
return err
}
}
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestTally)}
_, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastAtRestTally), update)
return err
})
return Error.Wrap(err)
}
// GetTallies retrieves all raw tallies
func (db *StoragenodeAccounting) GetTallies(ctx context.Context) ([]*accounting.StoragenodeStorageTally, error) {
raws, err := db.db.All_StoragenodeStorageTally(ctx)
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeStorageTally{
ID: r.Id,
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
}
}
return out, Error.Wrap(err)
}
// GetTalliesSince retrieves all raw tallies since latestRollup
func (db *StoragenodeAccounting) GetTalliesSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeStorageTally, error) {
raws, err := db.db.All_StoragenodeStorageTally_By_IntervalEndTime_GreaterOrEqual(ctx, dbx.StoragenodeStorageTally_IntervalEndTime(latestRollup))
out := make([]*accounting.StoragenodeStorageTally, len(raws))
for i, r := range raws {
nodeID, err := storj.NodeIDFromBytes(r.NodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeStorageTally{
ID: r.Id,
NodeID: nodeID,
IntervalEndTime: r.IntervalEndTime,
DataTotal: r.DataTotal,
}
}
return out, Error.Wrap(err)
}
// GetBandwidthSince retrieves all storagenode_bandwidth_rollup entires since latestRollup
func (db *StoragenodeAccounting) GetBandwidthSince(ctx context.Context, latestRollup time.Time) ([]*accounting.StoragenodeBandwidthRollup, error) {
rollups, err := db.db.All_StoragenodeBandwidthRollup_By_IntervalStart_GreaterOrEqual(ctx, dbx.StoragenodeBandwidthRollup_IntervalStart(latestRollup))
out := make([]*accounting.StoragenodeBandwidthRollup, len(rollups))
for i, r := range rollups {
nodeID, err := storj.NodeIDFromBytes(r.StoragenodeId)
if err != nil {
return nil, Error.Wrap(err)
}
out[i] = &accounting.StoragenodeBandwidthRollup{
NodeID: nodeID,
IntervalStart: r.IntervalStart,
Action: r.Action,
Settled: r.Settled,
}
}
return out, Error.Wrap(err)
}
// SaveRollup records raw tallies of at rest data to the database
func (db *StoragenodeAccounting) SaveRollup(ctx context.Context, latestRollup time.Time, stats accounting.RollupStats) error {
if len(stats) == 0 {
return Error.New("In SaveRollup with empty nodeData")
}
err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
for _, arsByDate := range stats {
for _, ar := range arsByDate {
nID := dbx.AccountingRollup_NodeId(ar.NodeID.Bytes())
start := dbx.AccountingRollup_StartTime(ar.StartTime)
put := dbx.AccountingRollup_PutTotal(ar.PutTotal)
get := dbx.AccountingRollup_GetTotal(ar.GetTotal)
audit := dbx.AccountingRollup_GetAuditTotal(ar.GetAuditTotal)
getRepair := dbx.AccountingRollup_GetRepairTotal(ar.GetRepairTotal)
putRepair := dbx.AccountingRollup_PutRepairTotal(ar.PutRepairTotal)
atRest := dbx.AccountingRollup_AtRestTotal(ar.AtRestTotal)
_, err := tx.Create_AccountingRollup(ctx, nID, start, put, get, audit, getRepair, putRepair, atRest)
if err != nil {
return err
}
}
}
update := dbx.AccountingTimestamps_Update_Fields{Value: dbx.AccountingTimestamps_Value(latestRollup)}
_, err := tx.Update_AccountingTimestamps_By_Name(ctx, dbx.AccountingTimestamps_Name(accounting.LastRollup), update)
return err
})
return Error.Wrap(err)
}
// LastTimestamp records the greatest last tallied time
func (db *StoragenodeAccounting) LastTimestamp(ctx context.Context, timestampType string) (time.Time, error) {
lastTally := time.Time{}
err := db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
lt, err := tx.Find_AccountingTimestamps_Value_By_Name(ctx, dbx.AccountingTimestamps_Name(timestampType))
if lt == nil {
update := dbx.AccountingTimestamps_Value(lastTally)
_, err = tx.Create_AccountingTimestamps(ctx, dbx.AccountingTimestamps_Name(timestampType), update)
return err
}
lastTally = lt.Value
return err
})
return lastTally, err
}
// QueryPaymentInfo queries Overlay, Accounting Rollup on nodeID
func (db *StoragenodeAccounting) QueryPaymentInfo(ctx context.Context, start time.Time, end time.Time) ([]*accounting.CSVRow, error) {
var sqlStmt = `SELECT n.id, n.created_at, n.audit_success_ratio, r.at_rest_total, r.get_repair_total,
r.put_repair_total, r.get_audit_total, r.put_total, r.get_total, n.wallet
FROM (
SELECT node_id, SUM(at_rest_total) AS at_rest_total, SUM(get_repair_total) AS get_repair_total,
SUM(put_repair_total) AS put_repair_total, SUM(get_audit_total) AS get_audit_total,
SUM(put_total) AS put_total, SUM(get_total) AS get_total
FROM accounting_rollups
WHERE start_time >= ? AND start_time < ?
GROUP BY node_id
) r
LEFT JOIN nodes n ON n.id = r.node_id
ORDER BY n.id`
rows, err := db.db.DB.QueryContext(ctx, db.db.Rebind(sqlStmt), start.UTC(), end.UTC())
if err != nil {
return nil, Error.Wrap(err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
csv := make([]*accounting.CSVRow, 0, 0)
for rows.Next() {
var nodeID []byte
r := &accounting.CSVRow{}
var wallet sql.NullString
err := rows.Scan(&nodeID, &r.NodeCreationDate, &r.AuditSuccessRatio, &r.AtRestTotal, &r.GetRepairTotal,
&r.PutRepairTotal, &r.GetAuditTotal, &r.PutTotal, &r.GetTotal, &wallet)
if err != nil {
return csv, Error.Wrap(err)
}
if wallet.Valid {
r.Wallet = wallet.String
}
id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return csv, Error.Wrap(err)
}
r.NodeID = id
csv = append(csv, r)
}
return csv, nil
}
// DeleteTalliesBefore deletes all raw tallies prior to some time
func (db *StoragenodeAccounting) DeleteTalliesBefore(ctx context.Context, latestRollup time.Time) error {
var deleteRawSQL = `DELETE FROM storagenode_storage_tallies WHERE interval_end_time < ?`
_, err := db.db.DB.ExecContext(ctx, db.db.Rebind(deleteRawSQL), latestRollup)
return err
}

View File

@ -0,0 +1,233 @@
-- Copied from the corresponding version of dbx generated schema
CREATE TABLE accounting_rollups (
id bigserial NOT NULL,
node_id bytea NOT NULL,
start_time timestamp with time zone NOT NULL,
put_total bigint NOT NULL,
get_total bigint NOT NULL,
get_audit_total bigint NOT NULL,
get_repair_total bigint NOT NULL,
put_repair_total bigint NOT NULL,
at_rest_total double precision NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE accounting_timestamps (
name text NOT NULL,
value timestamp with time zone NOT NULL,
PRIMARY KEY ( name )
);
CREATE TABLE bucket_bandwidth_rollups (
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
inline bigint NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY ( bucket_name, project_id, interval_start, action )
);
CREATE TABLE bucket_storage_tallies (
bucket_name bytea NOT NULL,
project_id bytea NOT NULL,
interval_start timestamp NOT NULL,
inline bigint NOT NULL,
remote bigint NOT NULL,
remote_segments_count integer NOT NULL,
inline_segments_count integer NOT NULL,
object_count integer NOT NULL,
metadata_size bigint NOT NULL,
PRIMARY KEY ( bucket_name, project_id, interval_start )
);
CREATE TABLE bucket_usages (
id bytea NOT NULL,
bucket_id bytea NOT NULL,
rollup_end_time timestamp with time zone NOT NULL,
remote_stored_data bigint NOT NULL,
inline_stored_data bigint NOT NULL,
remote_segments integer NOT NULL,
inline_segments integer NOT NULL,
objects integer NOT NULL,
metadata_size bigint NOT NULL,
repair_egress bigint NOT NULL,
get_egress bigint NOT NULL,
audit_egress bigint NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE bwagreements (
serialnum text NOT NULL,
storage_node_id bytea NOT NULL,
uplink_id bytea NOT NULL,
action bigint NOT NULL,
total bigint NOT NULL,
created_at timestamp with time zone NOT NULL,
expires_at timestamp with time zone NOT NULL,
PRIMARY KEY ( serialnum )
);
CREATE TABLE certRecords (
publickey bytea NOT NULL,
id bytea NOT NULL,
update_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE injuredsegments (
path text NOT NULL,
data bytea NOT NULL,
attempted timestamp,
PRIMARY KEY ( path )
);
CREATE TABLE irreparabledbs (
segmentpath bytea NOT NULL,
segmentdetail bytea NOT NULL,
pieces_lost_count bigint NOT NULL,
seg_damaged_unix_sec bigint NOT NULL,
repair_attempt_count bigint NOT NULL,
PRIMARY KEY ( segmentpath )
);
CREATE TABLE nodes (
id bytea NOT NULL,
address text NOT NULL,
protocol integer NOT NULL,
type integer NOT NULL,
email text NOT NULL,
wallet text NOT NULL,
free_bandwidth bigint NOT NULL,
free_disk bigint NOT NULL,
major bigint NOT NULL,
minor bigint NOT NULL,
patch bigint NOT NULL,
hash text NOT NULL,
timestamp timestamp with time zone NOT NULL,
release boolean NOT NULL,
latency_90 bigint NOT NULL,
audit_success_count bigint NOT NULL,
total_audit_count bigint NOT NULL,
audit_success_ratio double precision NOT NULL,
uptime_success_count bigint NOT NULL,
total_uptime_count bigint NOT NULL,
uptime_ratio double precision NOT NULL,
created_at timestamp with time zone NOT NULL,
updated_at timestamp with time zone NOT NULL,
last_contact_success timestamp with time zone NOT NULL,
last_contact_failure timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE projects (
id bytea NOT NULL,
name text NOT NULL,
description text NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE registration_tokens (
secret bytea NOT NULL,
owner_id bytea,
project_limit integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( secret ),
UNIQUE ( owner_id )
);
CREATE TABLE serial_numbers (
id serial NOT NULL,
serial_number bytea NOT NULL,
bucket_id bytea NOT NULL,
expires_at timestamp NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE storagenode_bandwidth_rollups (
storagenode_id bytea NOT NULL,
interval_start timestamp NOT NULL,
interval_seconds integer NOT NULL,
action integer NOT NULL,
allocated bigint NOT NULL,
settled bigint NOT NULL,
PRIMARY KEY ( storagenode_id, interval_start, action )
);
CREATE TABLE storagenode_storage_tallies (
id bigserial NOT NULL,
node_id bytea NOT NULL,
interval_end_time timestamp with time zone NOT NULL,
data_total double precision NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE users (
id bytea NOT NULL,
full_name text NOT NULL,
short_name text,
email text NOT NULL,
password_hash bytea NOT NULL,
status integer NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id )
);
CREATE TABLE api_keys (
id bytea NOT NULL,
project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE,
key bytea NOT NULL,
name text NOT NULL,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( id ),
UNIQUE ( key ),
UNIQUE ( name, project_id )
);
CREATE TABLE project_members (
member_id bytea NOT NULL REFERENCES users( id ) ON DELETE CASCADE,
project_id bytea NOT NULL REFERENCES projects( id ) ON DELETE CASCADE,
created_at timestamp with time zone NOT NULL,
PRIMARY KEY ( member_id, project_id )
);
CREATE TABLE used_serials (
serial_number_id integer NOT NULL REFERENCES serial_numbers( id ) ON DELETE CASCADE,
storage_node_id bytea NOT NULL,
PRIMARY KEY ( serial_number_id, storage_node_id )
);
CREATE INDEX bucket_name_project_id_interval_start_interval_seconds ON bucket_bandwidth_rollups ( bucket_name, project_id, interval_start, interval_seconds );
CREATE UNIQUE INDEX bucket_id_rollup ON bucket_usages ( bucket_id, rollup_end_time );
CREATE UNIQUE INDEX serial_number ON serial_numbers ( serial_number );
CREATE INDEX serial_numbers_expires_at_index ON serial_numbers ( expires_at );
CREATE INDEX storagenode_id_interval_start_interval_seconds ON storagenode_bandwidth_rollups ( storagenode_id, interval_start, interval_seconds );
---
INSERT INTO "accounting_rollups"("id", "node_id", "start_time", "put_total", "get_total", "get_audit_total", "get_repair_total", "put_repair_total", "at_rest_total") VALUES (1, E'\\367M\\177\\251]t/\\022\\256\\214\\265\\025\\224\\204:\\217\\212\\0102<\\321\\374\\020&\\271Qc\\325\\261\\354\\246\\233'::bytea, '2019-02-09 00:00:00+00', 1000, 2000, 3000, 4000, 0, 5000);
INSERT INTO "accounting_timestamps" VALUES ('LastAtRestTally', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastRollup', '0001-01-01 00:00:00+00');
INSERT INTO "accounting_timestamps" VALUES ('LastBandwidthTally', '0001-01-01 00:00:00+00');
INSERT INTO "nodes"("id", "address", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001', '127.0.0.1:55516', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 5, 0, 0, 5, 0, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch');
INSERT INTO "nodes"("id", "address", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '127.0.0.1:55518', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 1, 3, 3, 1, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch');
INSERT INTO "nodes"("id", "address", "protocol", "type", "email", "wallet", "free_bandwidth", "free_disk", "major", "minor", "patch", "hash", "timestamp", "release","latency_90", "audit_success_count", "total_audit_count", "audit_success_ratio", "uptime_success_count", "total_uptime_count", "uptime_ratio", "created_at", "updated_at", "last_contact_success", "last_contact_failure") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014', '127.0.0.1:55517', 0, 4, '', '', -1, -1, 0, 1, 0, '', 'epoch', false, 0, 0, 0, 1, 0, 0, 1, '2019-02-14 08:07:31.028103+00', '2019-02-14 08:07:31.108963+00', 'epoch', 'epoch');
INSERT INTO "projects"("id", "name", "description", "created_at") VALUES (E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, 'ProjectName', 'projects description', '2019-02-14 08:28:24.254934+00');
INSERT INTO "api_keys"("id", "project_id", "key", "name", "created_at") VALUES (E'\\334/\\302;\\225\\355O\\323\\276f\\247\\354/6\\241\\033'::bytea, E'\\022\\217/\\014\\376!K\\023\\276\\031\\311}m\\236\\205\\300'::bytea, E'\\000]\\326N \\343\\270L\\327\\027\\337\\242\\240\\322mOl\\0318\\251.P I'::bytea, 'key 2', '2019-02-14 08:28:24.267934+00');
INSERT INTO "users"("id", "full_name", "short_name", "email", "password_hash", "status", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, 'Noahson', 'William', '1email1@ukr.net', E'some_readable_hash'::bytea, 1, '2019-02-14 08:28:24.614594+00');
INSERT INTO "projects"("id", "name", "description", "created_at") VALUES (E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, 'projName1', 'Test project 1', '2019-02-14 08:28:24.636949+00');
INSERT INTO "project_members"("member_id", "project_id", "created_at") VALUES (E'\\363\\311\\033w\\222\\303Ci\\265\\343U\\303\\312\\204",'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea, '2019-02-14 08:28:24.677953+00');
INSERT INTO "bwagreements"("serialnum", "storage_node_id", "action", "total", "created_at", "expires_at", "uplink_id") VALUES ('8fc0ceaa-984c-4d52-bcf4-b5429e1e35e812FpiifDbcJkePa12jxjDEutKrfLmwzT7sz2jfVwpYqgtM8B74c', E'\\245Z[/\\333\\022\\011\\001\\036\\003\\204\\005\\032.\\206\\333E\\261\\342\\227=y,}aRaH6\\240\\370\\000'::bytea, 1, 666, '2019-02-14 15:09:54.420181+00', '2019-02-14 16:09:54+00', E'\\253Z+\\374eFm\\245$\\036\\206\\335\\247\\263\\350x\\\\\\304+\\364\\343\\364+\\276fIJQ\\361\\014\\232\\000'::bytea);
INSERT INTO "irreparabledbs" ("segmentpath", "segmentdetail", "pieces_lost_count", "seg_damaged_unix_sec", "repair_attempt_count") VALUES ('\x49616d5365676d656e746b6579696e666f30', '\x49616d5365676d656e7464657461696c696e666f30', 10, 1550159554, 10);
INSERT INTO "injuredsegments" ("path", "data") VALUES ('0', '\x0a0130120100');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('here''s/a/great/path', '\x0a136865726527732f612f67726561742f70617468120a0102030405060708090a');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('yet/another/cool/path', '\x0a157965742f616e6f746865722f636f6f6c2f70617468120a0102030405060708090a');
INSERT INTO "injuredsegments" ("path", "data") VALUES ('so/many/iconic/paths/to/choose/from', '\x0a23736f2f6d616e792f69636f6e69632f70617468732f746f2f63686f6f73652f66726f6d120a0102030405060708090a');
INSERT INTO "certrecords" VALUES (E'0Y0\\023\\006\\007*\\206H\\316=\\002\\001\\006\\010*\\206H\\316=\\003\\001\\007\\003B\\000\\004\\360\\267\\227\\377\\253u\\222\\337Y\\324C:GQ\\010\\277v\\010\\315D\\271\\333\\337.\\203\\023=C\\343\\014T%6\\027\\362?\\214\\326\\017U\\334\\000\\260\\224\\260J\\221\\304\\331F\\304\\221\\236zF,\\325\\326l\\215\\306\\365\\200\\022', E'L\\301|\\200\\247}F|1\\320\\232\\037n\\335\\241\\206\\244\\242\\207\\204.\\253\\357\\326\\352\\033Dt\\202`\\022\\325', '2019-02-14 08:07:31.335028+00');
INSERT INTO "bucket_usages" ("id", "bucket_id", "rollup_end_time", "remote_stored_data", "inline_stored_data", "remote_segments", "inline_segments", "objects", "metadata_size", "repair_egress", "get_egress", "audit_egress") VALUES (E'\\153\\313\\233\\074\\327\\177\\136\\070\\346\\001",'::bytea, E'\\366\\146\\032\\321\\316\\161\\070\\133\\302\\271",'::bytea, '2019-03-06 08:28:24.677953+00', 10, 11, 12, 13, 14, 15, 16, 17, 18);
INSERT INTO "registration_tokens" ("secret", "owner_id", "project_limit", "created_at") VALUES (E'\\070\\127\\144\\013\\332\\344\\102\\376\\306\\056\\303\\130\\106\\132\\321\\276\\321\\274\\170\\264\\054\\333\\221\\116\\154\\221\\335\\070\\220\\146\\344\\216'::bytea, null, 1, '2019-02-14 08:28:24.677953+00');
INSERT INTO "serial_numbers" ("id", "serial_number", "bucket_id", "expires_at") VALUES (1, E'0123456701234567'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014/testbucket'::bytea, '2019-03-06 08:28:24.677953+00');
INSERT INTO "used_serials" ("serial_number_id", "storage_node_id") VALUES (1, E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n');
INSERT INTO "storagenode_bandwidth_rollups" ("storagenode_id", "interval_start", "interval_seconds", "action", "allocated", "settled") VALUES (E'\\006\\223\\250R\\221\\005\\365\\377v>0\\266\\365\\216\\255?\\347\\244\\371?2\\264\\262\\230\\007<\\001\\262\\263\\237\\247n', '2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024);
INSERT INTO "storagenode_storage_tallies" VALUES (1, E'\\3510\\323\\225"~\\036<\\342\\330m\\0253Jhr\\246\\233K\\246#\\2303\\351\\256\\275j\\212UM\\362\\207', '2019-02-14 08:16:57.812849+00', 1000);
INSERT INTO "bucket_bandwidth_rollups" ("bucket_name", "project_id", "interval_start", "interval_seconds", "action", "inline", "allocated", "settled") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 3600, 1, 1024, 2024, 3024);
INSERT INTO "bucket_storage_tallies" ("bucket_name", "project_id", "interval_start", "inline", "remote", "remote_segments_count", "inline_segments_count", "object_count", "metadata_size") VALUES (E'testbucket'::bytea, E'\\363\\342\\363\\371>+F\\256\\263\\300\\273|\\342N\\347\\014'::bytea,'2019-03-06 08:00:00.000000+00', 4024, 5024, 0, 0, 0, 0);
-- NEW DATA --