storj/satellite/metainfo/attribution.go
Cameron e8fcdc10a4 satellite/metainfo: set user_agent in bucket_metainfos on bucket recreation
Before this change, if a user creates a bucket with a user_agent attributed then deletes and recreates it, the row in bucket_metainfos
will not have the user_agent. This is because we skip setting the field
in bucket_metainfos if the bucket already exists in value_attributions.
This can be problematic, as we return the bucket's user agent during the
ListBuckets operation, and the client may be expecting this value to be
populated.

This change ensures the bucket table user_agent is set when (re)creating a bucket. To avoid decreasing BeginObject performance, which also
updates attribution, a flag has been added to determine whether to
make sure the buckets table is updated: `forceBucketUpdate`.

Change-Id: Iada2f233b327b292ad9f98c73ea76a1b0113c926
2023-07-12 21:48:05 +00:00

223 lines
7.0 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package metainfo
import (
"bytes"
"context"
"sync"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/useragent"
"storj.io/common/uuid"
"storj.io/drpc/drpccache"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/buckets"
"storj.io/storj/satellite/console"
)
// MaxUserAgentLength is the maximum allowable length of the User Agent.
// TrimUserAgent compares it against the byte length of the encoded user agent.
const MaxUserAgentLength = 500
// ensureAttribution ensures that the bucketName has the partner information specified by the project-level
// user agent, the API key user agent, or the header user agent, in that order of precedence.
//
// If `forceBucketUpdate` is true, the per-connection check cache is bypassed and the buckets table will be
// updated if necessary (needed for bucket creation). Otherwise, it is sufficient to only ensure the
// attribution exists in the value attributions db.
//
// It returns an InvalidArgument error when header is nil, and nil when no user agent is available from any
// source. NotFound and AlreadyExists results from the attribution update are treated as success.
//
// Assumes that the user has permissions sufficient for authenticating; keyInfo must not be nil.
func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.RequestHeader, keyInfo *console.APIKeyInfo, bucketName, projectUserAgent []byte, forceBucketUpdate bool) (err error) {
	defer mon.Task()(&ctx)(&err)

	if header == nil {
		return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
	}
	// nothing to attribute when no source provides a user agent
	if len(header.UserAgent) == 0 && len(keyInfo.UserAgent) == 0 && len(projectUserAgent) == 0 {
		return nil
	}

	if !forceBucketUpdate {
		// skip the database round trips for buckets already checked on this connection
		if conncache := drpccache.FromContext(ctx); conncache != nil {
			cache := conncache.LoadOrCreate(attributionCheckCacheKey{},
				func() interface{} {
					return &attributionCheckCache{}
				}).(*attributionCheckCache)
			if !cache.needsCheck(string(bucketName)) {
				return nil
			}
		}
	}

	// start from the API key (user) attribution; a project-level user agent overrides it
	userAgent := keyInfo.UserAgent
	if len(projectUserAgent) > 0 {
		userAgent = projectUserAgent
	}
	// Otherwise, use header (partner tool) as attribution. Emptiness is tested with len so that an
	// empty but non-nil key user agent does not suppress the fallback and record an empty attribution.
	if len(userAgent) == 0 {
		userAgent = header.UserAgent
	}

	userAgent, err = TrimUserAgent(userAgent)
	if err != nil {
		return err
	}

	err = endpoint.tryUpdateBucketAttribution(ctx, header, keyInfo.ProjectID, bucketName, userAgent, forceBucketUpdate)
	// a missing bucket or a non-empty bucket is not a failure of the calling operation
	if errs2.IsRPC(err, rpcstatus.NotFound) || errs2.IsRPC(err, rpcstatus.AlreadyExists) {
		return nil
	}
	return err
}
// TrimUserAgent returns userAgent reduced to only the product portion of its entries and bounded by
// MaxUserAgentLength bytes.
//
// Entries whose product is "uplink", "common", "drpc", "Gateway-ST" or empty are dropped, and the comment
// of every remaining entry is removed. If the encoded result is longer than MaxUserAgentLength, only the
// first remaining entry is kept; if that entry alone is still too long, its encoding is truncated to
// MaxUserAgentLength bytes.
//
// An empty userAgent is returned unchanged. An error is returned when the user agent cannot be parsed or
// the remaining entries cannot be encoded.
func TrimUserAgent(userAgent []byte) ([]byte, error) {
	if len(userAgent) == 0 {
		return userAgent, nil
	}

	userAgentEntries, err := useragent.ParseEntries(userAgent)
	if err != nil {
		return userAgent, Error.New("error while parsing user agent: %w", err)
	}

	// strip comments, libraries, and empty products from the user agent;
	// the filter reuses the backing array of the parsed entries
	newEntries := userAgentEntries[:0]
	for _, e := range userAgentEntries {
		switch e.Product {
		case "uplink", "common", "drpc", "Gateway-ST", "":
			continue
		}
		e.Comment = ""
		newEntries = append(newEntries, e)
	}

	userAgent, err = useragent.EncodeEntries(newEntries)
	if err != nil {
		return userAgent, Error.New("error while encoding user agent entries: %w", err)
	}

	// bound the user agent length
	if len(userAgent) > MaxUserAgentLength && len(newEntries) > 0 {
		// Try to preserve the first entry. The bound is checked on the encoded form, because the
		// encoding adds a separator between product and version that a sum of the two field lengths
		// would miss.
		userAgent, err = useragent.EncodeEntries(newEntries[:1])
		if err != nil {
			return userAgent, Error.New("error while encoding first user agent entry: %w", err)
		}
		if len(userAgent) > MaxUserAgentLength {
			// first entry is too large, truncate
			userAgent = userAgent[:MaxUserAgentLength]
		}
	}

	return userAgent, nil
}
// tryUpdateBucketAttribution stores userAgent as the attribution of the bucket in the value attributions db
// and on the bucket itself.
//
// When an attribution already exists and forceBucketUpdate is false, nothing is changed. When an attribution
// already exists and forceBucketUpdate is true, the bucket's user agent is brought in line with the existing
// attribution rather than with the userAgent argument.
//
// It returns an RPC error with status InvalidArgument when header is nil, NotFound when the bucket does not
// exist, AlreadyExists when the bucket is not empty, and Internal when a lookup or write fails.
func (endpoint *Endpoint) tryUpdateBucketAttribution(ctx context.Context, header *pb.RequestHeader, projectID uuid.UUID, bucketName []byte, userAgent []byte, forceBucketUpdate bool) (err error) {
	defer mon.Task()(&ctx)(&err)

	if header == nil {
		return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
	}

	// look up any attribution already recorded for the bucket
	existing, err := endpoint.attributions.Get(ctx, projectID, bucketName)
	switch {
	case err == nil:
		if !forceBucketUpdate {
			// the attribution is in place and the bucket row does not have to be synced
			return nil
		}
	case attribution.ErrBucketNotAttributed.Has(err):
		// not attributed yet, carry on and create the attribution
	default:
		endpoint.log.Error("error while getting attribution from DB", zap.Error(err))
		return rpcstatus.Error(rpcstatus.Internal, err.Error())
	}

	// the bucket has to exist before anything is written
	bucket, err := endpoint.buckets.GetBucket(ctx, bucketName, projectID)
	if err != nil {
		if !buckets.ErrBucketNotFound.Has(err) {
			endpoint.log.Error("error while getting bucket", zap.ByteString("bucketName", bucketName), zap.Error(err))
			return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution")
		}
		return rpcstatus.Errorf(rpcstatus.NotFound, "bucket %q does not exist", bucketName)
	}

	alreadyAttributed := existing != nil
	if alreadyAttributed {
		if bytes.Equal(bucket.UserAgent, existing.UserAgent) {
			// the bucket and the value attribution agree, nothing left to do
			return nil
		}
		// the recorded value attribution wins over the requested user agent
		userAgent = existing.UserAgent
	}

	// only empty buckets may have their attribution set
	empty, err := endpoint.isBucketEmpty(ctx, projectID, bucketName)
	if err != nil {
		endpoint.log.Error("internal", zap.Error(err))
		return rpcstatus.Error(rpcstatus.Internal, Error.Wrap(err).Error())
	}
	if !empty {
		return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q is not empty, Partner %q cannot be attributed", bucketName, userAgent)
	}

	if !alreadyAttributed {
		// create the value attribution entry
		info := &attribution.Info{
			ProjectID:  projectID,
			BucketName: bucketName,
			UserAgent:  userAgent,
		}
		if _, err = endpoint.attributions.Insert(ctx, info); err != nil {
			endpoint.log.Error("error while inserting attribution to DB", zap.Error(err))
			return rpcstatus.Error(rpcstatus.Internal, err.Error())
		}
	}

	// store the user agent on the bucket as well
	bucket.UserAgent = userAgent
	if _, err = endpoint.buckets.UpdateBucket(ctx, bucket); err != nil {
		endpoint.log.Error("error while updating bucket", zap.ByteString("bucketName", bucketName), zap.Error(err))
		return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution")
	}

	return nil
}
// maxAttributionCacheSize determines how many buckets attributionCheckCache remembers.
const maxAttributionCacheSize = 10

// attributionCheckCacheKey is used as a key for the connection cache.
type attributionCheckCacheKey struct{}

// attributionCheckCache remembers a constant number of bucket names that have already been checked.
// Once it holds maxAttributionCacheSize names, each new name overwrites the one that was inserted
// the longest time ago; looking up a remembered name does not change its position.
type attributionCheckCache struct {
	mu      sync.Mutex
	pos     int      // index of the most recently written slot in buckets
	buckets []string // remembered bucket names, at most maxAttributionCacheSize of them
}

// needsCheck returns true when the bucket should be tested for setting the useragent.
//
// A bucket that is already remembered yields false. Any other bucket is remembered by this call,
// replacing the oldest remembered bucket when the cache is full, and yields true.
func (cache *attributionCheckCache) needsCheck(bucket string) bool {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	for i := range cache.buckets {
		if cache.buckets[i] == bucket {
			return false
		}
	}

	if len(cache.buckets) < maxAttributionCacheSize {
		// still room left: grow and point pos at the freshly appended slot
		cache.buckets = append(cache.buckets, bucket)
		cache.pos = len(cache.buckets) - 1
		return true
	}

	// full: advance to the slot holding the oldest name and overwrite it
	cache.pos = (cache.pos + 1) % len(cache.buckets)
	cache.buckets[cache.pos] = bucket
	return true
}