2019-06-19 13:02:37 +01:00
|
|
|
// Copyright (C) 2019 Storj Labs, Inc.
|
|
|
|
// See LICENSE for copying information.
|
|
|
|
|
|
|
|
package satellitedb
|
|
|
|
|
|
|
|
import (
	"context"
	"database/sql"
	"errors"
	"time"

	"github.com/skyrings/skyring-common/tools/uuid"
	"github.com/zeebo/errs"

	"storj.io/storj/private/dbutil"
	"storj.io/storj/satellite/attribution"
	dbx "storj.io/storj/satellite/satellitedb/dbx"
)
|
|
|
|
|
2019-06-25 21:58:38 +01:00
|
|
|
const (
	// valueAttrQuery aggregates value-attribution usage per
	// partner/project/bucket over a time range. It is a union of the
	// bucket storage tallies (byte-hours, averaged via the hours count)
	// and the bucket bandwidth rollups (settled egress, action = 2).
	//
	// The query takes six positional parameters: (partner_id, start, end)
	// for the storage-tally branch followed by (partner_id, start, end)
	// for the bandwidth-rollup branch. Placeholders are `?` and are
	// rebound to the target database's syntax by the caller.
	//
	// NOTE(review): the WHERE clause filters on va.partner_id, which turns
	// the LEFT OUTER JOINs into effective inner joins — presumably
	// intentional, since only attributed buckets are of interest.
	valueAttrQuery = `
	-- A union of both the storage tally and bandwidth rollups.
	-- Should be 1 row per project/bucket by partner within the timeframe specified
	SELECT
		o.partner_id as partner_id,
		o.project_id as project_id,
		o.bucket_name as bucket_name,
		SUM(o.remote) / SUM(o.hours) as remote,
		SUM(o.inline) / SUM(o.hours) as inline,
		SUM(o.settled) as settled
	FROM
		(
			-- SUM the storage and hours
			-- Hours are used to calculate byte hours above
			SELECT
				bsti.partner_id as partner_id,
				bsto.project_id as project_id,
				bsto.bucket_name as bucket_name,
				SUM(bsto.remote) as remote,
				SUM(bsto.inline) as inline,
				0 as settled,
				count(1) as hours
			FROM
				(
					-- Collapse entries by the latest record in the hour
					-- If there are more than 1 records within the hour, only the latest will be considered
					SELECT
						va.partner_id,
						date_trunc('hour', bst.interval_start) as hours,
						bst.project_id,
						bst.bucket_name,
						MAX(bst.interval_start) as max_interval
					FROM
						bucket_storage_tallies bst
						LEFT OUTER JOIN value_attributions va ON (
							bst.project_id = va.project_id
							AND bst.bucket_name = va.bucket_name
						)
					WHERE
						va.partner_id = ?
						AND bst.interval_start >= ?
						AND bst.interval_start < ?
					GROUP BY
						va.partner_id,
						bst.project_id,
						bst.bucket_name,
						date_trunc('hour', bst.interval_start)
					ORDER BY
						max_interval DESC
				) bsti
				LEFT JOIN bucket_storage_tallies bsto ON (
					bsto.project_id = bsti.project_id
					AND bsto.bucket_name = bsti.bucket_name
					AND bsto.interval_start = bsti.max_interval
				)
			GROUP BY
				bsti.partner_id,
				bsto.project_id,
				bsto.bucket_name
			UNION
			-- SUM the bandwidth for the timeframe specified grouping by the partner_id, project_id, and bucket_name
			SELECT
				va.partner_id as partner_id,
				bbr.project_id as project_id,
				bbr.bucket_name as bucket_name,
				0 as remote,
				0 as inline,
				SUM(settled)::integer as settled,
				NULL as hours
			FROM
				bucket_bandwidth_rollups bbr
				LEFT OUTER JOIN value_attributions va ON (
					bbr.project_id = va.project_id
					AND bbr.bucket_name = va.bucket_name
				)
			WHERE
				va.partner_id = ?
				AND bbr.interval_start >= ?
				AND bbr.interval_start < ?
				AND bbr.action = 2
			GROUP BY
				va.partner_id,
				bbr.project_id,
				bbr.bucket_name
		) AS o
	GROUP BY
		o.partner_id,
		o.project_id,
		o.bucket_name;
	`
)
|
|
|
|
|
2019-06-19 13:02:37 +01:00
|
|
|
// attributionDB provides access to the value attribution records
// stored in the satellite database.
type attributionDB struct {
	db *satelliteDB // underlying satellite database handle
}
|
|
|
|
|
|
|
|
// Get reads the partner info
|
|
|
|
func (keys *attributionDB) Get(ctx context.Context, projectID uuid.UUID, bucketName []byte) (info *attribution.Info, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
dbxInfo, err := keys.db.Get_ValueAttribution_By_ProjectId_And_BucketName(ctx,
|
|
|
|
dbx.ValueAttribution_ProjectId(projectID[:]),
|
|
|
|
dbx.ValueAttribution_BucketName(bucketName),
|
|
|
|
)
|
2019-06-21 20:14:34 +01:00
|
|
|
if err == sql.ErrNoRows {
|
2019-08-21 17:30:29 +01:00
|
|
|
return nil, attribution.ErrBucketNotAttributed.New("%q", bucketName)
|
2019-06-21 20:14:34 +01:00
|
|
|
}
|
2019-06-19 13:02:37 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return attributionFromDBX(dbxInfo)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Insert implements create partner info
|
|
|
|
func (keys *attributionDB) Insert(ctx context.Context, info *attribution.Info) (_ *attribution.Info, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
|
|
|
dbxInfo, err := keys.db.Create_ValueAttribution(ctx,
|
|
|
|
dbx.ValueAttribution_ProjectId(info.ProjectID[:]),
|
|
|
|
dbx.ValueAttribution_BucketName(info.BucketName),
|
|
|
|
dbx.ValueAttribution_PartnerId(info.PartnerID[:]),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return nil, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return attributionFromDBX(dbxInfo)
|
|
|
|
}
|
|
|
|
|
2019-06-25 21:58:38 +01:00
|
|
|
// QueryAttribution queries partner bucket attribution data
|
|
|
|
func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid.UUID, start time.Time, end time.Time) (_ []*attribution.CSVRow, err error) {
|
|
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
|
2019-10-18 22:27:57 +01:00
|
|
|
rows, err := keys.db.DB.QueryContext(ctx, keys.db.Rebind(valueAttrQuery), partnerID[:], start.UTC(), end.UTC(), partnerID[:], start.UTC(), end.UTC())
|
2019-06-25 21:58:38 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
defer func() { err = errs.Combine(err, rows.Close()) }()
|
2019-08-22 12:40:15 +01:00
|
|
|
results := []*attribution.CSVRow{}
|
2019-06-25 21:58:38 +01:00
|
|
|
for rows.Next() {
|
|
|
|
r := &attribution.CSVRow{}
|
|
|
|
err := rows.Scan(&r.PartnerID, &r.ProjectID, &r.BucketName, &r.RemoteBytesPerHour, &r.InlineBytesPerHour, &r.EgressData)
|
|
|
|
if err != nil {
|
|
|
|
return results, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
results = append(results, r)
|
|
|
|
}
|
|
|
|
return results, nil
|
|
|
|
}
|
|
|
|
|
2019-06-19 13:02:37 +01:00
|
|
|
func attributionFromDBX(info *dbx.ValueAttribution) (*attribution.Info, error) {
|
2019-12-16 17:59:01 +00:00
|
|
|
partnerID, err := dbutil.BytesToUUID(info.PartnerId)
|
2019-06-19 13:02:37 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, Error.Wrap(err)
|
|
|
|
}
|
2019-12-16 17:59:01 +00:00
|
|
|
projectID, err := dbutil.BytesToUUID(info.ProjectId)
|
2019-06-19 13:02:37 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, Error.Wrap(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &attribution.Info{
|
|
|
|
ProjectID: projectID,
|
|
|
|
BucketName: info.BucketName,
|
|
|
|
PartnerID: partnerID,
|
|
|
|
CreatedAt: info.LastUpdated,
|
|
|
|
}, nil
|
|
|
|
}
|