cmd/satellite: add project ID, bucket name to partner attribution CSV

This change adds project ID and bucket name columns to the generated
partner attribution report. Attribution values are now summed based on
their project ID and bucket name in addition to their user agent.
Additionally, the command to generate the attribution report has been
modified to optionally include only certain user agents.

Change-Id: I61a1d854379134f26b31467d9e83a787beb451dd
This commit is contained in:
Jeremy Wharton 2022-05-25 18:31:42 -05:00 committed by Maximillian von Briesen
parent a61f0f6be3
commit dba28f1d61
3 changed files with 131 additions and 89 deletions

View File

@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"text/tabwriter"
"time"
@ -137,9 +138,9 @@ var (
RunE: cmdNodeUsage,
}
partnerAttributionCmd = &cobra.Command{
Use: "partner-attribution [start] [end]",
Use: "partner-attribution [start] [end] [user-agent,...]",
Short: "Generate a partner attribution report for a given period to use for payments",
Long: "Generate a partner attribution report for a given period to use for payments. Format dates using YYYY-MM-DD. The end date is exclusive.",
Long: "Generate a partner attribution report for a given period to use for payments. Format dates using YYYY-MM-DD. The end date is exclusive. Optionally filter using a comma-separated list of user agents.",
Args: cobra.MinimumNArgs(2),
RunE: cmdValueAttribution,
}
@ -675,9 +676,14 @@ func cmdValueAttribution(cmd *cobra.Command, args []string) (err error) {
return err
}
var userAgents []string
if len(args) > 2 {
userAgents = strings.Split(args[2], ",")
}
// send output to stdout
if partnerAttribtionCfg.Output == "" {
return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, os.Stdout)
return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, userAgents, os.Stdout)
}
// send output to file
@ -696,7 +702,7 @@ func cmdValueAttribution(cmd *cobra.Command, args []string) (err error) {
}
}()
return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, file)
return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, userAgents, file)
}
func cmdPrepareCustomerInvoiceRecords(cmd *cobra.Command, args []string) (err error) {

View File

@ -18,12 +18,15 @@ import (
"storj.io/common/memory"
"storj.io/common/useragent"
"storj.io/common/uuid"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/satellitedb"
)
var headers = []string{
"userAgent",
"projectID",
"bucketName",
"gbHours",
"segmentHours",
"objectHours",
@ -31,8 +34,15 @@ var headers = []string{
"gbEgress",
}
// AttributionTotals is a map of attribution totals per user agent.
type AttributionTotals map[string]Total
// AttributionTotals is a map of attribution totals per user agent, project ID, and bucket name combination.
type AttributionTotals map[AttributionTotalsIndex]Total
// AttributionTotalsIndex is a key into AttributionTotals.
type AttributionTotalsIndex struct {
UserAgent string
ProjectID string
BucketName string
}
// Total is the total attributable usage for a user agent over a period of time.
type Total struct {
@ -44,7 +54,7 @@ type Total struct {
}
// GenerateAttributionCSV creates a report with.
func GenerateAttributionCSV(ctx context.Context, database string, start time.Time, end time.Time, output io.Writer) error {
func GenerateAttributionCSV(ctx context.Context, database string, start time.Time, end time.Time, userAgents []string, output io.Writer) error {
log := zap.L().Named("attribution-report")
db, err := satellitedb.Open(ctx, log.Named("db"), database, satellitedb.Options{ApplicationName: "satellite-attribution"})
if err != nil {
@ -62,7 +72,7 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim
return errs.Wrap(err)
}
partnerAttributionTotals := SumAttributionByUserAgent(rows, log)
partnerAttributionTotals := ProcessAttributions(rows, userAgents, log)
w := csv.NewWriter(output)
defer func() {
@ -72,11 +82,13 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim
if err := w.Write(headers); err != nil {
return errs.Wrap(err)
}
for userAgent, totals := range partnerAttributionTotals {
for idx, totals := range partnerAttributionTotals {
gbHours := memory.Size(totals.ByteHours).GB()
egressGBData := memory.Size(totals.BytesEgress).GB()
record := []string{
userAgent,
idx.UserAgent,
idx.ProjectID,
idx.BucketName,
strconv.FormatFloat(gbHours, 'f', 4, 64),
strconv.FormatFloat(totals.SegmentHours, 'f', 4, 64),
strconv.FormatFloat(totals.ObjectHours, 'f', 4, 64),
@ -97,9 +109,9 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim
return errs.Wrap(err)
}
// SumAttributionByUserAgent sums all bucket attribution by the first entry in the user agent.
func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger) AttributionTotals {
attributionTotals := make(map[string]Total)
// ProcessAttributions sums all bucket attribution by the first entry in the user agent, project ID, and bucket name.
func ProcessAttributions(rows []*attribution.BucketUsage, userAgents []string, log *zap.Logger) AttributionTotals {
attributionTotals := make(AttributionTotals)
userAgentParseFailures := make(map[string]bool)
for _, row := range rows {
@ -115,9 +127,33 @@ func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger)
}
userAgent := strings.ToLower(userAgentEntries[0].Product)
if userAgents != nil {
uaFound := false
for _, allowed := range userAgents {
if userAgent == strings.ToLower(allowed) {
uaFound = true
break
}
}
if !uaFound {
continue
}
}
if _, ok := attributionTotals[userAgent]; !ok {
attributionTotals[userAgent] = Total{
projID, err := uuid.FromBytes(row.ProjectID)
if err != nil {
log.Error("malformed project ID", zap.String("project ID", fmt.Sprintf("%x", projID)), zap.Error(err))
continue
}
idx := AttributionTotalsIndex{
UserAgent: userAgent,
ProjectID: projID.String(),
BucketName: string(row.BucketName),
}
if _, ok := attributionTotals[idx]; !ok {
attributionTotals[idx] = Total{
ByteHours: row.ByteHours,
SegmentHours: row.SegmentHours,
ObjectHours: row.ObjectHours,
@ -125,7 +161,7 @@ func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger)
BytesEgress: row.EgressData,
}
} else {
partnerTotal := attributionTotals[userAgent]
partnerTotal := attributionTotals[idx]
partnerTotal.ByteHours += row.ByteHours
partnerTotal.SegmentHours += row.SegmentHours
@ -133,7 +169,7 @@ func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger)
partnerTotal.BucketHours += float64(row.Hours)
partnerTotal.BytesEgress += row.EgressData
attributionTotals[userAgent] = partnerTotal
attributionTotals[idx] = partnerTotal
}
}
return attributionTotals

View File

@ -9,22 +9,41 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"storj.io/common/uuid"
"storj.io/storj/cmd/satellite/reports"
"storj.io/storj/satellite/attribution"
)
func TestSumAttributionByUserAgent(t *testing.T) {
func TestProcessAttributions(t *testing.T) {
log := zaptest.NewLogger(t)
// test empty user agents
attributions := []*attribution.BucketUsage{
{
UserAgent: []byte{},
requireSum := func(total reports.Total, n int) {
require.Equal(t, float64(n), total.ByteHours)
require.Equal(t, float64(n), total.SegmentHours)
require.Equal(t, float64(n), total.ObjectHours)
require.Equal(t, float64(n), total.BucketHours)
require.Equal(t, int64(n), total.BytesEgress)
}
newUsage := func(userAgent string, projectID uuid.UUID, bucketName string) *attribution.BucketUsage {
return &attribution.BucketUsage{
UserAgent: []byte(userAgent),
ProjectID: projectID.Bytes(),
BucketName: []byte(bucketName),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
}
}
id, err := uuid.New()
require.NoError(t, err)
// test empty user agents
attributions := []*attribution.BucketUsage{
newUsage("", id, ""),
{
ByteHours: 1,
SegmentHours: 1,
@ -33,82 +52,63 @@ func TestSumAttributionByUserAgent(t *testing.T) {
EgressData: 1,
},
}
totals := reports.SumAttributionByUserAgent(attributions, log)
totals := reports.ProcessAttributions(attributions, nil, log)
require.Equal(t, 0, len(totals))
// test user agent with additional entries and uppercase letters is summed with
// the first one
attributions = []*attribution.BucketUsage{
{
UserAgent: []byte("teststorj"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("TESTSTORJ/other"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
newUsage("teststorj", id, ""),
newUsage("TESTSTORJ/other", id, ""),
}
totals = reports.SumAttributionByUserAgent(attributions, log)
totals = reports.ProcessAttributions(attributions, nil, log)
require.Equal(t, 1, len(totals))
require.Equal(t, float64(2), totals["teststorj"].ByteHours)
require.Equal(t, float64(2), totals["teststorj"].SegmentHours)
require.Equal(t, float64(2), totals["teststorj"].ObjectHours)
require.Equal(t, float64(2), totals["teststorj"].BucketHours)
require.Equal(t, int64(2), totals["teststorj"].BytesEgress)
requireSum(totals[reports.AttributionTotalsIndex{"teststorj", id.String(), ""}], 2)
// test two user agents are summed separately
attributions = []*attribution.BucketUsage{
{
UserAgent: []byte("teststorj1"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("teststorj1"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("teststorj2"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
{
UserAgent: []byte("teststorj2"),
ByteHours: 1,
SegmentHours: 1,
ObjectHours: 1,
Hours: 1,
EgressData: 1,
},
newUsage("teststorj1", id, ""),
newUsage("teststorj1", id, ""),
newUsage("teststorj2", id, ""),
newUsage("teststorj2", id, ""),
}
totals = reports.SumAttributionByUserAgent(attributions, log)
totals = reports.ProcessAttributions(attributions, nil, log)
require.Equal(t, 2, len(totals))
require.Equal(t, float64(2), totals["teststorj1"].ByteHours)
require.Equal(t, float64(2), totals["teststorj1"].SegmentHours)
require.Equal(t, float64(2), totals["teststorj1"].ObjectHours)
require.Equal(t, float64(2), totals["teststorj1"].BucketHours)
require.Equal(t, int64(2), totals["teststorj1"].BytesEgress)
require.Equal(t, float64(2), totals["teststorj2"].ByteHours)
require.Equal(t, float64(2), totals["teststorj2"].SegmentHours)
require.Equal(t, float64(2), totals["teststorj2"].ObjectHours)
require.Equal(t, float64(2), totals["teststorj2"].BucketHours)
require.Equal(t, int64(2), totals["teststorj2"].BytesEgress)
requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), ""}], 2)
requireSum(totals[reports.AttributionTotalsIndex{"teststorj2", id.String(), ""}], 2)
// Test that different project IDs are summed separately
id2, err := uuid.New()
require.NoError(t, err)
attributions = []*attribution.BucketUsage{
newUsage("teststorj1", id, ""),
newUsage("teststorj1", id, ""),
newUsage("teststorj1", id2, ""),
}
totals = reports.ProcessAttributions(attributions, nil, log)
require.Equal(t, 2, len(totals))
requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), ""}], 2)
requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id2.String(), ""}], 1)
// Test that different bucket names are summed separately
attributions = []*attribution.BucketUsage{
newUsage("teststorj1", id, "1"),
newUsage("teststorj1", id, "1"),
newUsage("teststorj1", id, "2"),
}
totals = reports.ProcessAttributions(attributions, nil, log)
require.Equal(t, 2, len(totals))
requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), "1"}], 2)
requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), "2"}], 1)
// Test that unspecified user agents are filtered out
attributions = []*attribution.BucketUsage{
newUsage("teststorj1", id, ""),
newUsage("teststorj2", id, ""),
newUsage("teststorj3", id, ""),
}
totals = reports.ProcessAttributions(attributions, []string{"teststorj1", "teststorj3"}, log)
require.Equal(t, 2, len(totals))
require.Contains(t, totals, reports.AttributionTotalsIndex{"teststorj1", id.String(), ""})
require.Contains(t, totals, reports.AttributionTotalsIndex{"teststorj3", id.String(), ""})
}