diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index a300eed73..832b3cefd 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync/atomic" "text/tabwriter" "time" @@ -137,9 +138,9 @@ var ( RunE: cmdNodeUsage, } partnerAttributionCmd = &cobra.Command{ - Use: "partner-attribution [start] [end]", + Use: "partner-attribution [start] [end] [user-agent,...]", Short: "Generate a partner attribution report for a given period to use for payments", - Long: "Generate a partner attribution report for a given period to use for payments. Format dates using YYYY-MM-DD. The end date is exclusive.", + Long: "Generate a partner attribution report for a given period to use for payments. Format dates using YYYY-MM-DD. The end date is exclusive. Optionally filter using a comma-separated list of user agents.", Args: cobra.MinimumNArgs(2), RunE: cmdValueAttribution, } @@ -675,9 +676,14 @@ func cmdValueAttribution(cmd *cobra.Command, args []string) (err error) { return err } + var userAgents []string + if len(args) > 2 { + userAgents = strings.Split(args[2], ",") + } + // send output to stdout if partnerAttribtionCfg.Output == "" { - return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, os.Stdout) + return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, userAgents, os.Stdout) } // send output to file @@ -696,7 +702,7 @@ func cmdValueAttribution(cmd *cobra.Command, args []string) (err error) { } }() - return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, file) + return reports.GenerateAttributionCSV(ctx, partnerAttribtionCfg.Database, start, end, userAgents, file) } func cmdPrepareCustomerInvoiceRecords(cmd *cobra.Command, args []string) (err error) { diff --git a/cmd/satellite/reports/attribution.go b/cmd/satellite/reports/attribution.go index 61b3f27e5..a1d24e127 100644 --- 
a/cmd/satellite/reports/attribution.go +++ b/cmd/satellite/reports/attribution.go @@ -18,12 +18,15 @@ import ( "storj.io/common/memory" "storj.io/common/useragent" + "storj.io/common/uuid" "storj.io/storj/satellite/attribution" "storj.io/storj/satellite/satellitedb" ) var headers = []string{ "userAgent", + "projectID", + "bucketName", "gbHours", "segmentHours", "objectHours", @@ -31,8 +34,15 @@ var headers = []string{ "gbEgress", } -// AttributionTotals is a map of attribution totals per user agent. -type AttributionTotals map[string]Total +// AttributionTotals is a map of attribution totals per user agent, project ID, and bucket name combination. +type AttributionTotals map[AttributionTotalsIndex]Total + +// AttributionTotalsIndex is a key into AttributionTotals. +type AttributionTotalsIndex struct { + UserAgent string + ProjectID string + BucketName string +} // Total is the total attributable usage for a user agent over a period of time. type Total struct { @@ -44,7 +54,7 @@ type Total struct { } // GenerateAttributionCSV creates a report with. 
-func GenerateAttributionCSV(ctx context.Context, database string, start time.Time, end time.Time, output io.Writer) error { +func GenerateAttributionCSV(ctx context.Context, database string, start time.Time, end time.Time, userAgents []string, output io.Writer) error { log := zap.L().Named("attribution-report") db, err := satellitedb.Open(ctx, log.Named("db"), database, satellitedb.Options{ApplicationName: "satellite-attribution"}) if err != nil { @@ -62,7 +72,7 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim return errs.Wrap(err) } - partnerAttributionTotals := SumAttributionByUserAgent(rows, log) + partnerAttributionTotals := ProcessAttributions(rows, userAgents, log) w := csv.NewWriter(output) defer func() { @@ -72,11 +82,13 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim if err := w.Write(headers); err != nil { return errs.Wrap(err) } - for userAgent, totals := range partnerAttributionTotals { + for idx, totals := range partnerAttributionTotals { gbHours := memory.Size(totals.ByteHours).GB() egressGBData := memory.Size(totals.BytesEgress).GB() record := []string{ - userAgent, + idx.UserAgent, + idx.ProjectID, + idx.BucketName, strconv.FormatFloat(gbHours, 'f', 4, 64), strconv.FormatFloat(totals.SegmentHours, 'f', 4, 64), strconv.FormatFloat(totals.ObjectHours, 'f', 4, 64), @@ -97,9 +109,9 @@ func GenerateAttributionCSV(ctx context.Context, database string, start time.Tim return errs.Wrap(err) } -// SumAttributionByUserAgent sums all bucket attribution by the first entry in the user agent. -func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger) AttributionTotals { - attributionTotals := make(map[string]Total) +// ProcessAttributions sums all bucket attribution by the first entry in the user agent, project ID, and bucket name. 
+func ProcessAttributions(rows []*attribution.BucketUsage, userAgents []string, log *zap.Logger) AttributionTotals { + attributionTotals := make(AttributionTotals) userAgentParseFailures := make(map[string]bool) for _, row := range rows { @@ -115,9 +127,33 @@ func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger) } userAgent := strings.ToLower(userAgentEntries[0].Product) + if userAgents != nil { + uaFound := false + for _, allowed := range userAgents { + if userAgent == strings.ToLower(allowed) { + uaFound = true + break + } + } + if !uaFound { + continue + } + } - if _, ok := attributionTotals[userAgent]; !ok { - attributionTotals[userAgent] = Total{ + projID, err := uuid.FromBytes(row.ProjectID) + if err != nil { + log.Error("malformed project ID", zap.String("project ID", fmt.Sprintf("%x", row.ProjectID)), zap.Error(err)) + continue + } + + idx := AttributionTotalsIndex{ + UserAgent: userAgent, + ProjectID: projID.String(), + BucketName: string(row.BucketName), + } + + if _, ok := attributionTotals[idx]; !ok { + attributionTotals[idx] = Total{ ByteHours: row.ByteHours, SegmentHours: row.SegmentHours, ObjectHours: row.ObjectHours, @@ -125,7 +161,7 @@ func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger) BytesEgress: row.EgressData, } } else { - partnerTotal := attributionTotals[userAgent] + partnerTotal := attributionTotals[idx] partnerTotal.ByteHours += row.ByteHours partnerTotal.SegmentHours += row.SegmentHours @@ -133,7 +169,7 @@ func SumAttributionByUserAgent(rows []*attribution.BucketUsage, log *zap.Logger) partnerTotal.BucketHours += float64(row.Hours) partnerTotal.BytesEgress += row.EgressData - attributionTotals[userAgent] = partnerTotal + attributionTotals[idx] = partnerTotal } } return attributionTotals diff --git a/cmd/satellite/reports/attribution_test.go b/cmd/satellite/reports/attribution_test.go index 33176c2dc..c8d952610 100644 --- a/cmd/satellite/reports/attribution_test.go +++ 
b/cmd/satellite/reports/attribution_test.go @@ -9,22 +9,41 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "storj.io/common/uuid" "storj.io/storj/cmd/satellite/reports" "storj.io/storj/satellite/attribution" ) -func TestSumAttributionByUserAgent(t *testing.T) { +func TestProcessAttributions(t *testing.T) { log := zaptest.NewLogger(t) - // test empty user agents - attributions := []*attribution.BucketUsage{ - { - UserAgent: []byte{}, + + requireSum := func(total reports.Total, n int) { + require.Equal(t, float64(n), total.ByteHours) + require.Equal(t, float64(n), total.SegmentHours) + require.Equal(t, float64(n), total.ObjectHours) + require.Equal(t, float64(n), total.BucketHours) + require.Equal(t, int64(n), total.BytesEgress) + } + + newUsage := func(userAgent string, projectID uuid.UUID, bucketName string) *attribution.BucketUsage { + return &attribution.BucketUsage{ + UserAgent: []byte(userAgent), + ProjectID: projectID.Bytes(), + BucketName: []byte(bucketName), ByteHours: 1, SegmentHours: 1, ObjectHours: 1, Hours: 1, EgressData: 1, - }, + } + } + + id, err := uuid.New() + require.NoError(t, err) + + // test empty user agents + attributions := []*attribution.BucketUsage{ + newUsage("", id, ""), { ByteHours: 1, SegmentHours: 1, @@ -33,82 +52,63 @@ func TestSumAttributionByUserAgent(t *testing.T) { EgressData: 1, }, } - totals := reports.SumAttributionByUserAgent(attributions, log) + totals := reports.ProcessAttributions(attributions, nil, log) require.Equal(t, 0, len(totals)) // test user agent with additional entries and uppercase letters is summed with // the first one attributions = []*attribution.BucketUsage{ - { - UserAgent: []byte("teststorj"), - ByteHours: 1, - SegmentHours: 1, - ObjectHours: 1, - Hours: 1, - EgressData: 1, - }, - { - UserAgent: []byte("TESTSTORJ/other"), - ByteHours: 1, - SegmentHours: 1, - ObjectHours: 1, - Hours: 1, - EgressData: 1, - }, + newUsage("teststorj", id, ""), + newUsage("TESTSTORJ/other", id, 
""), } - totals = reports.SumAttributionByUserAgent(attributions, log) + totals = reports.ProcessAttributions(attributions, nil, log) require.Equal(t, 1, len(totals)) - require.Equal(t, float64(2), totals["teststorj"].ByteHours) - require.Equal(t, float64(2), totals["teststorj"].SegmentHours) - require.Equal(t, float64(2), totals["teststorj"].ObjectHours) - require.Equal(t, float64(2), totals["teststorj"].BucketHours) - require.Equal(t, int64(2), totals["teststorj"].BytesEgress) + requireSum(totals[reports.AttributionTotalsIndex{"teststorj", id.String(), ""}], 2) // test two user agents are summed separately attributions = []*attribution.BucketUsage{ - { - UserAgent: []byte("teststorj1"), - ByteHours: 1, - SegmentHours: 1, - ObjectHours: 1, - Hours: 1, - EgressData: 1, - }, - { - UserAgent: []byte("teststorj1"), - ByteHours: 1, - SegmentHours: 1, - ObjectHours: 1, - Hours: 1, - EgressData: 1, - }, - { - UserAgent: []byte("teststorj2"), - ByteHours: 1, - SegmentHours: 1, - ObjectHours: 1, - Hours: 1, - EgressData: 1, - }, - { - UserAgent: []byte("teststorj2"), - ByteHours: 1, - SegmentHours: 1, - ObjectHours: 1, - Hours: 1, - EgressData: 1, - }, + newUsage("teststorj1", id, ""), + newUsage("teststorj1", id, ""), + newUsage("teststorj2", id, ""), + newUsage("teststorj2", id, ""), } - totals = reports.SumAttributionByUserAgent(attributions, log) + totals = reports.ProcessAttributions(attributions, nil, log) require.Equal(t, 2, len(totals)) - require.Equal(t, float64(2), totals["teststorj1"].ByteHours) - require.Equal(t, float64(2), totals["teststorj1"].SegmentHours) - require.Equal(t, float64(2), totals["teststorj1"].ObjectHours) - require.Equal(t, float64(2), totals["teststorj1"].BucketHours) - require.Equal(t, int64(2), totals["teststorj1"].BytesEgress) - require.Equal(t, float64(2), totals["teststorj2"].ByteHours) - require.Equal(t, float64(2), totals["teststorj2"].SegmentHours) - require.Equal(t, float64(2), totals["teststorj2"].ObjectHours) - require.Equal(t, 
float64(2), totals["teststorj2"].BucketHours) - require.Equal(t, int64(2), totals["teststorj2"].BytesEgress) + requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), ""}], 2) + requireSum(totals[reports.AttributionTotalsIndex{"teststorj2", id.String(), ""}], 2) + + // Test that different project IDs are summed separately + id2, err := uuid.New() + require.NoError(t, err) + attributions = []*attribution.BucketUsage{ + newUsage("teststorj1", id, ""), + newUsage("teststorj1", id, ""), + newUsage("teststorj1", id2, ""), + } + totals = reports.ProcessAttributions(attributions, nil, log) + require.Equal(t, 2, len(totals)) + requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), ""}], 2) + requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id2.String(), ""}], 1) + + // Test that different bucket names are summed separately + attributions = []*attribution.BucketUsage{ + newUsage("teststorj1", id, "1"), + newUsage("teststorj1", id, "1"), + newUsage("teststorj1", id, "2"), + } + totals = reports.ProcessAttributions(attributions, nil, log) + require.Equal(t, 2, len(totals)) + requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), "1"}], 2) + requireSum(totals[reports.AttributionTotalsIndex{"teststorj1", id.String(), "2"}], 1) + + // Test that unspecified user agents are filtered out + attributions = []*attribution.BucketUsage{ + newUsage("teststorj1", id, ""), + newUsage("teststorj2", id, ""), + newUsage("teststorj3", id, ""), + } + totals = reports.ProcessAttributions(attributions, []string{"teststorj1", "teststorj3"}, log) + require.Equal(t, 2, len(totals)) + require.Contains(t, totals, reports.AttributionTotalsIndex{"teststorj1", id.String(), ""}) + require.Contains(t, totals, reports.AttributionTotalsIndex{"teststorj3", id.String(), ""}) }