satellite/metainfo: attribution based on useragent

Automatically attach attribution information to the bucket during
BeginObject or CreateBucket when the UserAgent is set.

Change-Id: I405cb26c5a2f7394b30e3f2cf5d2214c8781eb8b
Egon Elbre 2020-04-14 14:50:50 +03:00
parent a6f0be2047
commit 7e0e74c65c
6 changed files with 179 additions and 18 deletions
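For context, the client side needs nothing beyond setting UserAgent on its uplink configuration; the satellite reads it from the request header of CreateBucket and BeginObject and attributes the bucket on first use. Below is a minimal client-side sketch modeled on the test added in this change ("Zenko" is the partner user agent the test resolves; the satellite address, API key, passphrase, and bucket name are placeholders, not real values):

package main

import (
	"context"
	"log"

	"storj.io/uplink"
)

func main() {
	ctx := context.Background()

	// The only client-side requirement: set a UserAgent on the config.
	config := uplink.Config{UserAgent: "Zenko"}

	// Placeholder credentials for illustration only.
	access, err := config.RequestAccessWithPassphrase(ctx,
		"satellite.example.test:7777", "my-api-key", "my-passphrase")
	if err != nil {
		log.Fatal(err)
	}

	project, err := config.OpenProject(ctx, access)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = project.Close() }()

	// CreateBucket/BeginObject requests made through this project carry the
	// user agent, so the satellite can attribute the bucket automatically.
	if _, err := project.EnsureBucket(ctx, "bucket"); err != nil {
		log.Fatal(err)
	}
}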

go.mod

@@ -42,7 +42,7 @@ require (
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/grpc v1.28.0
storj.io/common v0.0.0-20200416175331-40469cc6b6d5
storj.io/drpc v0.0.11
storj.io/drpc v0.0.12-0.20200413163255-debb458a7474
storj.io/monkit-jaeger v0.0.0-20200403204040-f5a746eeacca
storj.io/private v0.0.0-20200403212157-26f222c154f0
storj.io/uplink v1.0.4-0.20200406100056-baa89e6fe434

go.sum

@@ -362,6 +362,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -622,14 +623,14 @@ honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXe
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
storj.io/common v0.0.0-20200402141523-7780ee0cca0d/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/common v0.0.0-20200413160743-f212d3029dbf h1:3ltVLnAtkJ/nbHg1aStSRr99doJRqPLl7BmRV9XKWpg=
storj.io/common v0.0.0-20200413160743-f212d3029dbf/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/common v0.0.0-20200416175331-40469cc6b6d5 h1:ApFVw3uGVbUqPPkJbTVq8wcqJFmJRU24uhjdoedsj/M=
storj.io/common v0.0.0-20200416175331-40469cc6b6d5/go.mod h1:pZyXiIE7bGETIRXtfs0nICqMwp7PM8HqnDuyUeldNA0=
storj.io/drpc v0.0.11 h1:6vLxfpSbwCLtqzAoXzXx/SxBqBtbzbmquXPqfcWKqfw=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.12-0.20200413163255-debb458a7474 h1:ToKfIf2Lyz5/cEeoqdeL3xydqUhU2tWRYvsCB3aXsuY=
storj.io/drpc v0.0.12-0.20200413163255-debb458a7474/go.mod h1:82nfl+6YwRwF6UG31cEWWUqv/FaKvP5SGqUvoqTxCMA=
storj.io/monkit-jaeger v0.0.0-20200403204040-f5a746eeacca h1:tv0kEHQrb84M8KZs/fpjRLD3KjbmPvOOOih+i84oJD8=
storj.io/monkit-jaeger v0.0.0-20200403204040-f5a746eeacca/go.mod h1:DDyAU2mcmu8EAh2vSfvPwrKBOo5a1UBnoTG0dba0KTs=
storj.io/private v0.0.0-20200403212157-26f222c154f0 h1:7raQWpuP5poUd2vMYIRE4XcEC4M4I4m3rj5O4/+LNdU=


@@ -6,19 +6,63 @@ package metainfo
import (
"context"
"strings"
"sync"
"time"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/common/useragent"
"storj.io/common/uuid"
"storj.io/drpc/drpccache"
"storj.io/storj/pkg/macaroon"
"storj.io/storj/satellite/attribution"
)
// ensureAttribution ensures that the bucket identified by bucketName has the partner information specified by the header.
//
// Assumes that the user has sufficient permissions for authentication.
func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.RequestHeader, bucketName []byte) error {
if header == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
}
if len(header.UserAgent) == 0 {
return nil
}
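// Consult a small per-connection cache so the same bucket is not re-checked on every request over this connection.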
if conncache := drpccache.FromContext(ctx); conncache != nil {
cache := conncache.LoadOrCreate(attributionCheckCacheKey{},
func() interface{} {
return &attributionCheckCache{}
}).(*attributionCheckCache)
if !cache.needsCheck(string(bucketName)) {
return nil
}
}
partnerID, err := endpoint.ResolvePartnerID(ctx, header, nil)
if err != nil {
return err
}
if partnerID.IsZero() {
return nil
}
keyInfo, err := endpoint.getKeyInfo(ctx, header)
if err != nil {
return err
}
err = endpoint.tryUpdateBucketAttribution(ctx, header, keyInfo.ProjectID, bucketName, partnerID)
if errs2.IsRPC(err, rpcstatus.NotFound) || errs2.IsRPC(err, rpcstatus.AlreadyExists) {
return nil
}
return err
}
// ResolvePartnerID returns partnerIDBytes parsed as a UUID or the UUID corresponding to header.UserAgent.
// It returns an empty UUID when neither is defined.
func (endpoint *Endpoint) ResolvePartnerID(ctx context.Context, header *pb.RequestHeader, partnerIDBytes []byte) (uuid.UUID, error) {
@@ -61,7 +105,7 @@ func (endpoint *Endpoint) ResolvePartnerID(ctx context.Context, header *pb.Reque
}
}
return uuid.UUID{}, rpcstatus.Errorf(rpcstatus.InvalidArgument, "unable to resolve user agent %q", string(header.UserAgent))
return uuid.UUID{}, nil
}
func removeUplinkUserAgent(entries []useragent.Entry) []useragent.Entry {
@@ -116,26 +160,33 @@ func (endpoint *Endpoint) setBucketAttribution(ctx context.Context, header *pb.R
partnerID, err := endpoint.ResolvePartnerID(ctx, header, partnerIDBytes)
if err != nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
return err
}
if partnerID.IsZero() {
return rpcstatus.Error(rpcstatus.InvalidArgument, "unknown user agent or partner id")
}
// check if attribution is set for given bucket
_, err = endpoint.attributions.Get(ctx, keyInfo.ProjectID, bucketName)
if err == nil {
endpoint.log.Info("bucket already attributed", zap.ByteString("bucketName", bucketName), zap.Stringer("Partner ID", partnerID))
return nil
return endpoint.tryUpdateBucketAttribution(ctx, header, keyInfo.ProjectID, bucketName, partnerID)
}
func (endpoint *Endpoint) tryUpdateBucketAttribution(ctx context.Context, header *pb.RequestHeader, projectID uuid.UUID, bucketName []byte, partnerID uuid.UUID) error {
if header == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
}
// check if attribution is set for the given bucket
_, err := endpoint.attributions.Get(ctx, projectID, bucketName)
if err == nil {
// the bucket already has an attribution, no need to update
return nil
}
if !attribution.ErrBucketNotAttributed.Has(err) {
// only attempt to set the attribution when it is missing
endpoint.log.Error("error while getting attribution from DB", zap.Error(err))
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
empty, err := endpoint.metainfo.IsBucketEmpty(ctx, keyInfo.ProjectID, bucketName)
empty, err := endpoint.metainfo.IsBucketEmpty(ctx, projectID, bucketName)
if err != nil {
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
@@ -144,7 +195,7 @@ func (endpoint *Endpoint) setBucketAttribution(ctx context.Context, header *pb.R
}
// check if the bucket exists before updating it or creating a new entry
bucket, err := endpoint.metainfo.GetBucket(ctx, bucketName, keyInfo.ProjectID)
bucket, err := endpoint.metainfo.GetBucket(ctx, bucketName, projectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return rpcstatus.Errorf(rpcstatus.NotFound, "bucket %q does not exist", bucketName)
@@ -153,8 +204,7 @@ func (endpoint *Endpoint) setBucketAttribution(ctx context.Context, header *pb.R
return rpcstatus.Error(rpcstatus.Internal, "unable to set bucket attribution")
}
if !bucket.PartnerID.IsZero() {
endpoint.log.Info("bucket already attributed", zap.ByteString("bucketName", bucketName), zap.Stringer("Partner ID", partnerID))
return nil
return rpcstatus.Errorf(rpcstatus.AlreadyExists, "bucket %q already has attribution, PartnerID %q cannot be attributed", bucketName, partnerID)
}
// update bucket information
@@ -167,7 +217,7 @@ func (endpoint *Endpoint) setBucketAttribution(ctx context.Context, header *pb.R
// update attribution table
_, err = endpoint.attributions.Insert(ctx, &attribution.Info{
ProjectID: keyInfo.ProjectID,
ProjectID: projectID,
BucketName: bucketName,
PartnerID: partnerID,
})
@@ -178,3 +228,38 @@ func (endpoint *Endpoint) setBucketAttribution(ctx context.Context, header *pb.R
return nil
}
// maxAttributionCacheSize determines how many buckets attributionCheckCache remembers.
const maxAttributionCacheSize = 10
// attributionCheckCacheKey is used as a key for the connection cache.
type attributionCheckCacheKey struct{}
// attributionCheckCache implements a basic LRU cache with a constant size.
type attributionCheckCache struct {
mu sync.Mutex
pos int
buckets []string
}
// needsCheck returns true when the bucket should be checked for setting the user agent.
func (cache *attributionCheckCache) needsCheck(bucket string) bool {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, b := range cache.buckets {
if b == bucket {
return false
}
}
if len(cache.buckets) >= maxAttributionCacheSize {
cache.pos = (cache.pos + 1) % len(cache.buckets)
cache.buckets[cache.pos] = bucket
} else {
cache.pos = len(cache.buckets)
cache.buckets = append(cache.buckets, bucket)
}
return true
}
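The eviction above is ring-buffer style: once maxAttributionCacheSize names are stored, each new name overwrites the next slot in insertion order, so a bucket that has been pushed out is checked again. Below is a minimal sketch of the expected behavior, assuming it lives in the same package as the cache above; the test name and bucket names are invented for illustration and are not part of this change:

package metainfo

import (
	"fmt"
	"testing"
)

func TestAttributionCheckCacheSketch(t *testing.T) {
	cache := &attributionCheckCache{}

	// First sighting of a bucket on this connection: the attribution check runs.
	if !cache.needsCheck("photos") {
		t.Fatal("expected first lookup to require a check")
	}
	// Seen again on the same connection: the database round trip is skipped.
	if cache.needsCheck("photos") {
		t.Fatal("expected repeated lookup to be cached")
	}

	// After maxAttributionCacheSize other buckets, the oldest slot has been
	// overwritten in ring order, so "photos" needs a check again.
	for i := 0; i < maxAttributionCacheSize; i++ {
		cache.needsCheck(fmt.Sprintf("bucket-%d", i))
	}
	if !cache.needsCheck("photos") {
		t.Fatal("expected evicted bucket to require a check again")
	}
}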


@@ -9,11 +9,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/uplink"
"storj.io/uplink/private/metainfo"
)
@@ -41,12 +43,13 @@ func TestResolvePartnerID(t *testing.T) {
require.NoError(t, err)
require.Equal(t, randomUUID, result)
_, err = endpoint.ResolvePartnerID(ctx, &pb.RequestHeader{
partnerID, err := endpoint.ResolvePartnerID(ctx, &pb.RequestHeader{
UserAgent: []byte("not-a-partner"),
}, nil)
require.Error(t, err)
require.NoError(t, err)
require.Equal(t, uuid.UUID{}, partnerID)
partnerID, err := endpoint.ResolvePartnerID(ctx, &pb.RequestHeader{
partnerID, err = endpoint.ResolvePartnerID(ctx, &pb.RequestHeader{
UserAgent: []byte("Zenko"),
}, nil)
require.NoError(t, err)
@@ -137,3 +140,45 @@ func TestSetBucketAttribution(t *testing.T) {
}
})
}
func TestUserAgentAttribution(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 1,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
config := uplink.Config{
UserAgent: "Zenko",
}
satellite, uplink := planet.Satellites[0], planet.Uplinks[0]
access, err := config.RequestAccessWithPassphrase(ctx, satellite.URL(), uplink.Projects[0].APIKey, "mypassphrase")
require.NoError(t, err)
project, err := config.OpenProject(ctx, access)
require.NoError(t, err)
defer ctx.Check(project.Close)
_, err = project.EnsureBucket(ctx, "bucket")
require.NoError(t, err)
upload, err := project.UploadObject(ctx, "bucket", "alpha", nil)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
require.NoError(t, err)
require.NoError(t, upload.Commit())
partnerID, err := uuid.FromString("8cd605fa-ad00-45b6-823e-550eddc611d6")
require.NoError(t, err)
bucketInfo, err := satellite.DB.Buckets().GetBucket(ctx, []byte("bucket"), uplink.Projects[0].ID)
require.NoError(t, err)
assert.Equal(t, partnerID, bucketInfo.PartnerID)
attribution, err := satellite.DB.Attribution().Get(ctx, uplink.Projects[0].ID, []byte("bucket"))
require.NoError(t, err)
assert.Equal(t, partnerID, attribution.PartnerID)
})
}


@@ -686,6 +686,10 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
// check if the bucket exists before updating it or creating a new entry
_, err = endpoint.metainfo.GetBucket(ctx, req.GetName(), keyInfo.ProjectID)
if err == nil {
// When the bucket exists, try to set the attribution.
if err := endpoint.ensureAttribution(ctx, req.Header, req.GetName()); err != nil {
return nil, err
}
return nil, rpcstatus.Error(rpcstatus.AlreadyExists, "bucket already exists")
}
if !storj.ErrBucketNotFound.Has(err) {
@@ -703,6 +707,11 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
return nil, rpcstatus.Error(rpcstatus.Internal, "unable to create bucket")
}
// Once we have created the bucket, we can try setting the attribution.
if err := endpoint.ensureAttribution(ctx, req.Header, req.GetName()); err != nil {
return nil, err
}
// override RS to fit satellite settings
convBucket, err := convertBucketToProto(ctx, bucket, endpoint.redundancyScheme())
if err != nil {
@@ -948,6 +957,10 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if err := endpoint.ensureAttribution(ctx, req.Header, req.Bucket); err != nil {
return nil, err
}
// use only satellite values for Redundancy Scheme
pbRS := endpoint.redundancyScheme()


@@ -165,6 +165,23 @@ func (endpoint *Endpoint) validateAuth(ctx context.Context, header *pb.RequestHe
return keyInfo, nil
}
// getKeyInfo returns key info based on the header.
func (endpoint *Endpoint) getKeyInfo(ctx context.Context, header *pb.RequestHeader) (_ *console.APIKeyInfo, err error) {
defer mon.Task()(&ctx)(&err)
key, err := getAPIKey(ctx, header)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "Invalid API credentials")
}
keyInfo, err := endpoint.apiKeys.GetByHead(ctx, key.Head())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
}
return keyInfo, nil
}
func (endpoint *Endpoint) checkRate(ctx context.Context, projectID uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)
if !endpoint.config.RateLimiter.Enabled {