cmd/satellite/reports: sum attribution data by user agent

Attribution is attached to bucket usage, but that's more granular than
necessary for the attribution report. This change iterates over the
bucket attributions, parses the user agent, converts the first entry
to lower case, and uses that as the key to a map which holds the
attribution totals for each unique user agent.

Change-Id: Ib2962ba0f57daa8a7298f11fcb1ac44a8bb97875
This commit is contained in:
Cameron 2022-04-12 14:01:14 -04:00 committed by Cameron
parent d7cd3a0ff8
commit 48fb3e947c
4 changed files with 189 additions and 37 deletions

View File

@ -10,21 +10,20 @@ import (
"io"
"os"
"strconv"
"strings"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/uuid"
"storj.io/common/useragent"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/satellitedb"
)
var headers = []string{
"userAgent",
"projectID",
"bucketName",
"gbHours",
"segmentHours",
"objectHours",
@ -32,10 +31,20 @@ var headers = []string{
"gbEgress",
}
// UserAgentAttributions is a map of attribution totals per user agent.
type UserAgentAttributions map[string]attributionTotal
type attributionTotal struct {
ByteHours float64
SegmentHours float64
ObjectHours float64
BucketHours float64
BytesEgress int64
}
// GenerateAttributionCSV creates a report with.
func GenerateAttributionCSV(ctx context.Context, database string, start time.Time, end time.Time, output io.Writer) error {
log := zap.L().Named("db")
db, err := satellitedb.Open(ctx, log, database, satellitedb.Options{ApplicationName: "satellite-attribution"})
log := zap.L().Named("attribution-report")
db, err := satellitedb.Open(ctx, log.Named("db"), database, satellitedb.Options{ApplicationName: "satellite-attribution"})
if err != nil {
return errs.New("error connecting to master database on satellite: %+v", err)
}
@ -51,6 +60,8 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim
return errs.Wrap(err)
}
partnerAttributionTotals := SumAttributionByUserAgent(rows, log)
w := csv.NewWriter(output)
defer func() {
w.Flush()
@ -59,11 +70,16 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim
if err := w.Write(headers); err != nil {
return errs.Wrap(err)
}
for _, row := range rows {
record, err := csvRowToStringSlice(row)
if err != nil {
return errs.Wrap(err)
for userAgent, totals := range partnerAttributionTotals {
gbHours := memory.Size(totals.ByteHours).GB()
egressGBData := memory.Size(totals.BytesEgress).GB()
record := []string{
userAgent,
strconv.FormatFloat(gbHours, 'f', 4, 64),
strconv.FormatFloat(totals.SegmentHours, 'f', 4, 64),
strconv.FormatFloat(totals.ObjectHours, 'f', 4, 64),
strconv.FormatFloat(totals.BucketHours, 'f', 4, 64),
strconv.FormatFloat(egressGBData, 'f', 4, 64),
}
if err := w.Write(record); err != nil {
return errs.Wrap(err)
@ -79,22 +95,44 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim
return errs.Wrap(err)
}
func csvRowToStringSlice(p *attribution.CSVRow) ([]string, error) {
projectID, err := uuid.FromBytes(p.ProjectID)
if err != nil {
return nil, errs.New("Invalid Project ID")
// SumAttributionByUserAgent sums all bucket attribution by the first entry in the user agent.
func SumAttributionByUserAgent(rows []*attribution.BucketAttribution, log *zap.Logger) UserAgentAttributions {
partnerAttributionTotals := make(map[string]attributionTotal)
userAgentParseFailures := make(map[string]bool)
for _, row := range rows {
userAgentEntries, err := useragent.ParseEntries(row.UserAgent)
// also check the length of user agent for sanity.
// If the length of user agent is zero the parse method will not return an error.
if err != nil || len(row.UserAgent) == 0 {
if _, ok := userAgentParseFailures[string(row.UserAgent)]; !ok {
userAgentParseFailures[string(row.UserAgent)] = true
log.Error("error while parsing user agent", zap.String("user agent", string(row.UserAgent)), zap.Error(err))
}
continue
}
userAgent := strings.ToLower(userAgentEntries[0].Product)
if _, ok := partnerAttributionTotals[userAgent]; !ok {
partnerAttributionTotals[userAgent] = attributionTotal{
ByteHours: row.ByteHours,
SegmentHours: row.SegmentHours,
ObjectHours: row.ObjectHours,
BucketHours: float64(row.Hours),
BytesEgress: row.EgressData,
}
} else {
partnerTotal := partnerAttributionTotals[userAgent]
partnerTotal.ByteHours += row.ByteHours
partnerTotal.SegmentHours += row.SegmentHours
partnerTotal.ObjectHours += row.ObjectHours
partnerTotal.BucketHours += float64(row.Hours)
partnerTotal.BytesEgress += row.EgressData
partnerAttributionTotals[userAgent] = partnerTotal
}
}
gbHours := memory.Size(p.ByteHours).GB()
egressGBData := memory.Size(p.EgressData).GB()
record := []string{
string(p.UserAgent),
projectID.String(),
string(p.BucketName),
strconv.FormatFloat(gbHours, 'f', 4, 64),
strconv.FormatFloat(p.SegmentHours, 'f', 4, 64),
strconv.FormatFloat(p.ObjectHours, 'f', 4, 64),
strconv.FormatFloat(float64(p.Hours), 'f', 4, 64),
strconv.FormatFloat(egressGBData, 'f', 4, 64),
}
return record, nil
return partnerAttributionTotals
}

View File

@ -0,0 +1,114 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package reports_test
import (
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/storj/cmd/satellite/reports"
"storj.io/storj/satellite/attribution"
)
func TestSumAttributionByUserAgent(t *testing.T) {
log := zaptest.NewLogger(t)
// test empty user agents
attributions := []*attribution.BucketAttribution{
{
UserAgent: []byte{},
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
}
totals := reports.SumAttributionByUserAgent(attributions, log)
require.Equal(t, 0, len(totals))
// test user agent with additional entries and uppercase letters is summed with
// the first one
attributions = []*attribution.BucketAttribution{
{
UserAgent: []byte("teststorj"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("TESTSTORJ/other"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
}
totals = reports.SumAttributionByUserAgent(attributions, log)
require.Equal(t, 1, len(totals))
require.Equal(t, float64(2), totals["teststorj"].ByteHours)
require.Equal(t, float64(2), totals["teststorj"].SegmentHours)
require.Equal(t, float64(2), totals["teststorj"].ObjectHours)
require.Equal(t, float64(2), totals["teststorj"].BucketHours)
require.Equal(t, int64(2), totals["teststorj"].BytesEgress)
// test two user agents are summed separately
attributions = []*attribution.BucketAttribution{
{
UserAgent: []byte("teststorj1"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("teststorj1"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("teststorj2"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("teststorj2"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
}
totals = reports.SumAttributionByUserAgent(attributions, log)
require.Equal(t, 2, len(totals))
require.Equal(t, float64(2), totals["teststorj1"].ByteHours)
require.Equal(t, float64(2), totals["teststorj1"].SegmentHours)
require.Equal(t, float64(2), totals["teststorj1"].ObjectHours)
require.Equal(t, float64(2), totals["teststorj1"].BucketHours)
require.Equal(t, int64(2), totals["teststorj1"].BytesEgress)
require.Equal(t, float64(2), totals["teststorj2"].ByteHours)
require.Equal(t, float64(2), totals["teststorj2"].SegmentHours)
require.Equal(t, float64(2), totals["teststorj2"].ObjectHours)
require.Equal(t, float64(2), totals["teststorj2"].BucketHours)
require.Equal(t, int64(2), totals["teststorj2"].BytesEgress)
}

View File

@ -25,8 +25,8 @@ type Info struct {
CreatedAt time.Time
}
// CSVRow represents data from QueryAttribution without exposing dbx.
type CSVRow struct {
// BucketAttribution is attribution data for a single bucket.
type BucketAttribution struct {
PartnerID []byte
UserAgent []byte
ProjectID []byte
@ -47,7 +47,7 @@ type DB interface {
// Insert creates and stores new Info.
Insert(ctx context.Context, info *Info) (*Info, error)
// QueryAttribution queries partner bucket attribution data.
QueryAttribution(ctx context.Context, partnerID uuid.UUID, userAgent []byte, start time.Time, end time.Time) ([]*CSVRow, error)
QueryAttribution(ctx context.Context, partnerID uuid.UUID, userAgent []byte, start time.Time, end time.Time) ([]*BucketAttribution, error)
// QueryAllAttribution queries all partner bucket attribution data.
QueryAllAttribution(ctx context.Context, start time.Time, end time.Time) ([]*CSVRow, error)
QueryAllAttribution(ctx context.Context, start time.Time, end time.Time) ([]*BucketAttribution, error)
}

View File

@ -283,7 +283,7 @@ func (keys *attributionDB) Insert(ctx context.Context, info *attribution.Info) (
}
// QueryAttribution queries partner bucket attribution data.
func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid.UUID, userAgent []byte, start time.Time, end time.Time) (_ []*attribution.CSVRow, err error) {
func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid.UUID, userAgent []byte, start time.Time, end time.Time) (_ []*attribution.BucketAttribution, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := keys.db.DB.QueryContext(ctx, keys.db.Rebind(valueAttrQuery), partnerID[:], userAgent, start.UTC(), end.UTC(), partnerID[:], userAgent, start.UTC(), end.UTC())
@ -292,9 +292,9 @@ func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid.
}
defer func() { err = errs.Combine(err, rows.Close()) }()
results := []*attribution.CSVRow{}
results := []*attribution.BucketAttribution{}
for rows.Next() {
r := &attribution.CSVRow{}
r := &attribution.BucketAttribution{}
var inline, remote float64
err := rows.Scan(&r.PartnerID, &r.UserAgent, &r.ProjectID, &r.BucketName, &r.ByteHours, &inline, &remote, &r.SegmentHours, &r.ObjectHours, &r.EgressData, &r.Hours)
if err != nil {
@ -311,7 +311,7 @@ func (keys *attributionDB) QueryAttribution(ctx context.Context, partnerID uuid.
}
// QueryAllAttribution queries all partner bucket attribution data.
func (keys *attributionDB) QueryAllAttribution(ctx context.Context, start time.Time, end time.Time) (_ []*attribution.CSVRow, err error) {
func (keys *attributionDB) QueryAllAttribution(ctx context.Context, start time.Time, end time.Time) (_ []*attribution.BucketAttribution, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := keys.db.DB.QueryContext(ctx, keys.db.Rebind(allValueAttrQuery), start.UTC(), end.UTC())
@ -320,9 +320,9 @@ func (keys *attributionDB) QueryAllAttribution(ctx context.Context, start time.T
}
defer func() { err = errs.Combine(err, rows.Close()) }()
results := []*attribution.CSVRow{}
results := []*attribution.BucketAttribution{}
for rows.Next() {
r := &attribution.CSVRow{}
r := &attribution.BucketAttribution{}
var inline, remote float64
err := rows.Scan(&r.PartnerID, &r.UserAgent, &r.ProjectID, &r.BucketName, &r.ByteHours, &inline, &remote, &r.SegmentHours, &r.ObjectHours, &r.EgressData, &r.Hours)
if err != nil {