storj/satellite/satellitedb/attribution.go
Egon Elbre 00b2e1a7d7 all: enable staticcheck (#2849)
* by having megacheck in disable it also disabled staticcheck

* fix closing body

* keep interfacer disabled

* hide bodies

* don't use deprecated func

* fix dead code

* fix potential overrun

* keep stylecheck disabled

* don't pass nil as context

* fix infinite recursion

* remove extraneous return

* fix data race

* use correct func

* ignore unused var

* remove unused consts
2019-08-22 13:40:15 +02:00

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

package satellitedb

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	"github.com/lib/pq"
	sqlite3 "github.com/mattn/go-sqlite3"
	"github.com/skyrings/skyring-common/tools/uuid"
	"github.com/zeebo/errs"

	"storj.io/storj/satellite/attribution"
	dbx "storj.io/storj/satellite/satellitedb/dbx"
)

const (
	valueAttrQuery = `
	-- A union of both the storage tallies and the bandwidth rollups.
	-- Should produce one row per partner/project/bucket within the specified timeframe.
	SELECT
		o.partner_id as partner_id,
		o.project_id as project_id,
		o.bucket_name as bucket_name,
		SUM(o.remote) / SUM(o.hours) as remote,
		SUM(o.inline) / SUM(o.hours) as inline,
		SUM(o.settled) as settled
	FROM
		(
			-- SUM the storage and the hours.
			-- Hours are used to calculate the byte-hours above.
			SELECT
				bsti.partner_id as partner_id,
				bsto.project_id as project_id,
				bsto.bucket_name as bucket_name,
				SUM(bsto.remote) as remote,
				SUM(bsto.inline) as inline,
				0 as settled,
				count(1) as hours
			FROM
				(
					-- Collapse entries to the latest record in each hour.
					-- If there is more than one record within an hour, only the latest is considered.
					SELECT
						va.partner_id,
						%v as hours,
						bst.project_id,
						bst.bucket_name,
						MAX(bst.interval_start) as max_interval
					FROM
						bucket_storage_tallies bst
						LEFT OUTER JOIN value_attributions va ON (
							bst.project_id = va.project_id
							AND bst.bucket_name = va.bucket_name
						)
					WHERE
						va.partner_id = ?
						AND bst.interval_start >= ?
						AND bst.interval_start < ?
					GROUP BY
						va.partner_id,
						bst.project_id,
						bst.bucket_name,
						hours
					ORDER BY
						max_interval DESC
				) bsti
				LEFT JOIN bucket_storage_tallies bsto ON (
					bsto.project_id = bsti.project_id
					AND bsto.bucket_name = bsti.bucket_name
					AND bsto.interval_start = bsti.max_interval
				)
			GROUP BY
				bsti.partner_id,
				bsto.project_id,
				bsto.bucket_name
			UNION
			-- SUM the bandwidth for the specified timeframe, grouped by partner_id, project_id, and bucket_name.
			SELECT
				va.partner_id as partner_id,
				bbr.project_id as project_id,
				bbr.bucket_name as bucket_name,
				0 as remote,
				0 as inline,
				SUM(settled) as settled,
				NULL as hours
			FROM
				bucket_bandwidth_rollups bbr
				LEFT OUTER JOIN value_attributions va ON (
					bbr.project_id = va.project_id
					AND bbr.bucket_name = va.bucket_name
				)
			WHERE
				va.partner_id = ?
				AND bbr.interval_start >= ?
				AND bbr.interval_start < ?
				AND bbr.action = 2 -- GET, i.e. egress
			GROUP BY
				va.partner_id,
				bbr.project_id,
				bbr.bucket_name
		) AS o
	GROUP BY
		o.partner_id,
		o.project_id,
		o.bucket_name;
	`

	// Database-specific expressions truncating a timestamp to the hour;
	// one of these is substituted into the %v placeholder of valueAttrQuery.
	slHour = "datetime(strftime('%Y-%m-%dT%H:00:00', bst.interval_start))"
	pqHour = "date_trunc('hour', bst.interval_start)"
)
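
// Illustrative only: after fmt.Sprintf the inner SELECT's hours column reads,
// for Postgres,
//
//	date_trunc('hour', bst.interval_start) as hours,
//
// and, for SQLite,
//
//	datetime(strftime('%Y-%m-%dT%H:00:00', bst.interval_start)) as hours,
//
// so both databases collapse storage tallies onto wall-clock hour boundaries
// before MAX(bst.interval_start) picks the latest tally in each hour.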

// attributionDB provides value-attribution storage backed by dbx.
type attributionDB struct {
	db *dbx.DB
}

// Get reads the partner info for the given project and bucket.
func (keys *attributionDB) Get(ctx context.Context, projectID uuid.UUID, bucketName []byte) (info *attribution.Info, err error) {
	defer mon.Task()(&ctx)(&err)

	dbxInfo, err := keys.db.Get_ValueAttribution_By_ProjectId_And_BucketName(ctx,
		dbx.ValueAttribution_ProjectId(projectID[:]),
		dbx.ValueAttribution_BucketName(bucketName),
	)
	if err == sql.ErrNoRows {
		return nil, attribution.ErrBucketNotAttributed.New("%q", bucketName)
	}
	if err != nil {
		return nil, Error.Wrap(err)
	}
	return attributionFromDBX(dbxInfo)
}

// Insert creates a partner info record.
func (keys *attributionDB) Insert(ctx context.Context, info *attribution.Info) (_ *attribution.Info, err error) {
	defer mon.Task()(&ctx)(&err)

	dbxInfo, err := keys.db.Create_ValueAttribution(ctx,
		dbx.ValueAttribution_ProjectId(info.ProjectID[:]),
		dbx.ValueAttribution_BucketName(info.BucketName),
		dbx.ValueAttribution_PartnerId(info.PartnerID[:]),
	)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	return attributionFromDBX(dbxInfo)
}

// QueryAttribution queries partner bucket attribution data.
func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid.UUID, start time.Time, end time.Time) (_ []*attribution.CSVRow, err error) {
	defer mon.Task()(&ctx)(&err)

	var query string
	switch t := keys.db.Driver().(type) {
	case *sqlite3.SQLiteDriver:
		query = fmt.Sprintf(valueAttrQuery, slHour)
	case *pq.Driver:
		query = fmt.Sprintf(valueAttrQuery, pqHour)
	default:
		return nil, Error.New("unsupported database %T", t)
	}

	// valueAttrQuery's two halves (storage and bandwidth) each take the same
	// (partner_id, start, end) parameters, hence the doubled argument list.
	rows, err := keys.db.DB.QueryContext(ctx, keys.db.Rebind(query), partnerID[:], start.UTC(), end.UTC(), partnerID[:], start.UTC(), end.UTC())
	if err != nil {
		return nil, Error.Wrap(err)
	}
	defer func() { err = errs.Combine(err, rows.Close()) }()

	results := []*attribution.CSVRow{}
	for rows.Next() {
		r := &attribution.CSVRow{}
		err := rows.Scan(&r.PartnerID, &r.ProjectID, &r.BucketName, &r.RemoteBytesPerHour, &r.InlineBytesPerHour, &r.EgressData)
		if err != nil {
			return results, Error.Wrap(err)
		}
		results = append(results, r)
	}
	// Surface any error that terminated the iteration, not just scan errors.
	if err := rows.Err(); err != nil {
		return results, Error.Wrap(err)
	}
	return results, nil
}
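
// Usage sketch (illustrative only; ctx, attrDB, and partnerID are assumed to
// exist in the caller's scope):
//
//	rows, err := attrDB.QueryAttribution(ctx, partnerID,
//		time.Date(2019, 8, 1, 0, 0, 0, 0, time.UTC),
//		time.Date(2019, 9, 1, 0, 0, 0, 0, time.UTC))
//	if err != nil {
//		return err
//	}
//	for _, row := range rows {
//		// row.RemoteBytesPerHour and row.InlineBytesPerHour carry averaged
//		// storage; row.EgressData carries the settled GET bandwidth.
//	}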

// attributionFromDBX converts a dbx.ValueAttribution row into an attribution.Info.
func attributionFromDBX(info *dbx.ValueAttribution) (*attribution.Info, error) {
	partnerID, err := bytesToUUID(info.PartnerId)
	if err != nil {
		return nil, Error.Wrap(err)
	}
	projectID, err := bytesToUUID(info.ProjectId)
	if err != nil {
		return nil, Error.Wrap(err)
	}

	return &attribution.Info{
		ProjectID:  projectID,
		BucketName: info.BucketName,
		PartnerID:  partnerID,
		CreatedAt:  info.LastUpdated,
	}, nil
}
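
// For reference, the shapes this file assumes (fields inferred from the usage
// above; see storj.io/storj/satellite/attribution for the authoritative
// definitions):
//
//	type Info struct {
//		ProjectID  uuid.UUID
//		BucketName []byte
//		PartnerID  uuid.UUID
//		CreatedAt  time.Time
//	}
//
//	type CSVRow struct {
//		PartnerID          []byte
//		ProjectID          []byte
//		BucketName         []byte
//		RemoteBytesPerHour float64
//		InlineBytesPerHour float64
//		EgressData         int64
//	}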