satellite/metainfo: use user PartnerID for bucket attribution

Change-Id: I20f1bd432333f9b37ca8fb457c349eff94ffb392
This commit is contained in:
Michal Niewrzal 2020-07-24 11:40:17 +02:00 committed by Kaloyan Raev
parent 68a2726e2a
commit 88dcc93f3c
8 changed files with 224 additions and 85 deletions

View File

@ -201,18 +201,16 @@ func (system *Satellite) NodeURL() storj.NodeURL {
return storj.NodeURL{ID: system.API.ID(), Address: system.API.Addr()}
}
// AddUser adds user to a satellite.
func (system *Satellite) AddUser(ctx context.Context, fullName, email string, maxNumberOfProjects int) (*console.User, error) {
// AddUser adds user to a satellite. Password from newUser will be always overridden by FullName to have
// known password which can be used automatically.
func (system *Satellite) AddUser(ctx context.Context, newUser console.CreateUser, maxNumberOfProjects int) (*console.User, error) {
regToken, err := system.API.Console.Service.CreateRegToken(ctx, maxNumberOfProjects)
if err != nil {
return nil, err
}
user, err := system.API.Console.Service.CreateUser(ctx, console.CreateUser{
FullName: fullName,
Email: email,
Password: fullName,
}, regToken.Secret, "")
newUser.Password = newUser.FullName
user, err := system.API.Console.Service.CreateUser(ctx, newUser, regToken.Secret, "")
if err != nil {
return nil, err
}
@ -227,7 +225,7 @@ func (system *Satellite) AddUser(ctx context.Context, fullName, email string, ma
return nil, err
}
authCtx, err := system.authenticatedContext(ctx, user.ID)
authCtx, err := system.AuthenticatedContext(ctx, user.ID)
if err != nil {
return nil, err
}
@ -241,7 +239,7 @@ func (system *Satellite) AddUser(ctx context.Context, fullName, email string, ma
// AddProject adds project to a satellite and makes specified user an owner.
func (system *Satellite) AddProject(ctx context.Context, ownerID uuid.UUID, name string) (*console.Project, error) {
authCtx, err := system.authenticatedContext(ctx, ownerID)
authCtx, err := system.AuthenticatedContext(ctx, ownerID)
if err != nil {
return nil, err
}
@ -254,7 +252,8 @@ func (system *Satellite) AddProject(ctx context.Context, ownerID uuid.UUID, name
return project, nil
}
func (system *Satellite) authenticatedContext(ctx context.Context, userID uuid.UUID) (context.Context, error) {
// AuthenticatedContext creates context with authentication date for given user.
func (system *Satellite) AuthenticatedContext(ctx context.Context, userID uuid.UUID) (context.Context, error) {
user, err := system.API.Console.Service.GetUser(ctx, userID)
if err != nil {
return nil, err

View File

@ -11,13 +11,17 @@ import (
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/console"
)
func TestSatellite_AddProject(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
user, err := planet.Satellites[0].AddUser(ctx, "test user", "test-email@test", 4)
user, err := planet.Satellites[0].AddUser(ctx, console.CreateUser{
FullName: "test user",
Email: "test-email@test",
}, 4)
require.NoError(t, err)
limit, err := planet.Satellites[0].DB.Console().Users().GetProjectLimit(ctx, user.ID)

View File

@ -21,6 +21,7 @@ import (
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/console"
"storj.io/uplink"
"storj.io/uplink/private/metainfo"
"storj.io/uplink/private/piecestore"
@ -108,15 +109,13 @@ func (planet *Planet) newUplink(name string) (*Uplink, error) {
planetUplink.Dialer = rpc.NewDefaultDialer(tlsOptions)
for j, satellite := range planet.Satellites {
console := satellite.API.Console
consoleAPI := satellite.API.Console
projectName := fmt.Sprintf("%s_%d", name, j)
user, err := satellite.AddUser(
ctx,
fmt.Sprintf("User %s", projectName),
fmt.Sprintf("user@%s.test", projectName),
10,
)
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: fmt.Sprintf("User %s", projectName),
Email: fmt.Sprintf("user@%s.test", projectName),
}, 10)
if err != nil {
return nil, err
}
@ -126,11 +125,11 @@ func (planet *Planet) newUplink(name string) (*Uplink, error) {
return nil, err
}
authCtx, err := satellite.authenticatedContext(ctx, user.ID)
authCtx, err := satellite.AuthenticatedContext(ctx, user.ID)
if err != nil {
return nil, err
}
_, apiKey, err := console.Service.CreateAPIKey(authCtx, project.ID, "root")
_, apiKey, err := consoleAPI.Service.CreateAPIKey(authCtx, project.ID, "root")
if err != nil {
return nil, err
}

View File

@ -18,16 +18,18 @@ import (
"storj.io/common/uuid"
"storj.io/drpc/drpccache"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/console"
)
// ensureAttribution ensures that the bucketName has the partner information specified by the header.
// ensureAttribution ensures that the bucketName has the partner information specified by keyInfo partner ID or the header user agent.
// PartnerID from keyInfo is a value associated with registered user and prevails over header user agent.
//
// Assumes that the user has permissions sufficient for authenticating.
func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.RequestHeader, bucketName []byte) error {
func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.RequestHeader, keyInfo *console.APIKeyInfo, bucketName []byte) error {
if header == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "header is nil")
}
if len(header.UserAgent) == 0 {
if len(header.UserAgent) == 0 && keyInfo.PartnerID.IsZero() {
return nil
}
@ -41,17 +43,16 @@ func (endpoint *Endpoint) ensureAttribution(ctx context.Context, header *pb.Requ
}
}
partnerID, err := endpoint.ResolvePartnerID(ctx, header)
if err != nil {
return err
}
var err error
partnerID := keyInfo.PartnerID
if partnerID.IsZero() {
return nil
}
keyInfo, err := endpoint.getKeyInfo(ctx, header)
if err != nil {
return err
partnerID, err = endpoint.ResolvePartnerID(ctx, header)
if err != nil {
return err
}
if partnerID.IsZero() {
return nil
}
}
err = endpoint.tryUpdateBucketAttribution(ctx, header, keyInfo.ProjectID, bucketName, partnerID)

View File

@ -4,6 +4,9 @@
package metainfo_test
import (
"fmt"
"io/ioutil"
"strconv"
"testing"
"time"
@ -16,6 +19,8 @@ import (
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/console"
"storj.io/uplink"
)
@ -70,45 +75,174 @@ func TestResolvePartnerID(t *testing.T) {
})
}
func TestUserAgentAttribution(t *testing.T) {
func TestBucketAttribution(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 1,
UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
config := uplink.Config{
UserAgent: "Zenko",
for i, tt := range []struct {
signupPartner string
userAgent string
expectedAttribution string
}{
{signupPartner: "", userAgent: "", expectedAttribution: ""},
{signupPartner: "Minio", userAgent: "", expectedAttribution: "Minio"},
{signupPartner: "Minio", userAgent: "Minio", expectedAttribution: "Minio"},
{signupPartner: "Minio", userAgent: "Zenko", expectedAttribution: "Minio"},
{signupPartner: "", userAgent: "Zenko", expectedAttribution: "Zenko"},
} {
errTag := fmt.Sprintf("%d. %+v", i, tt)
satellite := planet.Satellites[0]
var signupPartnerID string
if tt.signupPartner != "" {
partner, err := satellite.API.Marketing.PartnersService.ByName(ctx, tt.signupPartner)
require.NoError(t, err, errTag)
signupPartnerID = partner.ID
}
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "Test User " + strconv.Itoa(i),
Email: "user@test" + strconv.Itoa(i),
PartnerID: signupPartnerID,
}, 1)
require.NoError(t, err, errTag)
satProject, err := satellite.AddProject(ctx, user.ID, "test"+strconv.Itoa(i))
require.NoError(t, err, errTag)
authCtx, err := satellite.AuthenticatedContext(ctx, user.ID)
require.NoError(t, err, errTag)
_, apiKeyInfo, err := satellite.API.Console.Service.CreateAPIKey(authCtx, satProject.ID, "root")
require.NoError(t, err, errTag)
config := uplink.Config{
UserAgent: tt.userAgent,
}
access, err := config.RequestAccessWithPassphrase(ctx, satellite.NodeURL().String(), apiKeyInfo.Serialize(), "mypassphrase")
require.NoError(t, err, errTag)
project, err := config.OpenProject(ctx, access)
require.NoError(t, err, errTag)
_, err = project.CreateBucket(ctx, "bucket")
require.NoError(t, err, errTag)
var expectedPartnerID uuid.UUID
if tt.expectedAttribution != "" {
expectedPartner, err := planet.Satellites[0].API.Marketing.PartnersService.ByName(ctx, tt.expectedAttribution)
require.NoError(t, err, errTag)
expectedPartnerID = expectedPartner.UUID
}
bucketInfo, err := satellite.DB.Buckets().GetBucket(ctx, []byte("bucket"), satProject.ID)
require.NoError(t, err, errTag)
assert.Equal(t, expectedPartnerID, bucketInfo.PartnerID, errTag)
attributionInfo, err := planet.Satellites[0].DB.Attribution().Get(ctx, satProject.ID, []byte("bucket"))
if tt.expectedAttribution == "" {
assert.True(t, attribution.ErrBucketNotAttributed.Has(err), errTag)
} else {
require.NoError(t, err, errTag)
assert.Equal(t, expectedPartnerID, attributionInfo.PartnerID, errTag)
}
}
})
}
func TestQueryAttribution(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(2, 3, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
const (
bucketName = "test"
objectKey = "test-key"
)
satellite := planet.Satellites[0]
partner, err := satellite.API.Marketing.PartnersService.ByName(ctx, "Minio")
require.NoError(t, err)
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "user@test",
Email: "user@test",
PartnerID: partner.ID,
}, 1)
require.NoError(t, err)
satProject, err := satellite.AddProject(ctx, user.ID, "test")
require.NoError(t, err)
authCtx, err := satellite.AuthenticatedContext(ctx, user.ID)
require.NoError(t, err)
_, apiKeyInfo, err := satellite.API.Console.Service.CreateAPIKey(authCtx, satProject.ID, "root")
require.NoError(t, err)
access, err := uplink.RequestAccessWithPassphrase(ctx, satellite.NodeURL().String(), apiKeyInfo.Serialize(), "mypassphrase")
require.NoError(t, err)
project, err := uplink.OpenProject(ctx, access)
require.NoError(t, err)
_, err = project.CreateBucket(ctx, bucketName)
require.NoError(t, err)
{ // upload and download should be accounted for Minio
upload, err := project.UploadObject(ctx, bucketName, objectKey, nil)
require.NoError(t, err)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
require.NoError(t, err)
err = upload.Commit()
require.NoError(t, err)
download, err := project.DownloadObject(ctx, bucketName, objectKey, nil)
require.NoError(t, err)
_, err = ioutil.ReadAll(download)
require.NoError(t, err)
err = download.Close()
require.NoError(t, err)
}
satellite, uplink := planet.Satellites[0], planet.Uplinks[0]
{ // Flush all the pending information through the system.
// Calculate the usage used for upload
for _, sn := range planet.StorageNodes {
sn.Storage2.Orders.Sender.TriggerWait()
}
access, err := config.RequestAccessWithPassphrase(ctx, satellite.URL(), uplink.Projects[0].APIKey, "mypassphrase")
require.NoError(t, err)
rollout := planet.Satellites[0].Core.Accounting.ReportedRollupChore
require.NoError(t, rollout.RunOnce(ctx, time.Now()))
project, err := config.OpenProject(ctx, access)
require.NoError(t, err)
defer ctx.Check(project.Close)
// Trigger tally so it gets all set up and can return a storage usage
planet.Satellites[0].Accounting.Tally.Loop.TriggerWait()
}
_, err = project.EnsureBucket(ctx, "bucket")
require.NoError(t, err)
{
before := time.Now().Add(-time.Hour)
after := before.Add(2 * time.Hour)
upload, err := project.UploadObject(ctx, "bucket", "alpha", nil)
require.NoError(t, err)
usage, err := planet.Satellites[0].DB.ProjectAccounting().GetProjectTotal(ctx, satProject.ID, before, after)
require.NoError(t, err)
require.NotZero(t, usage.Egress)
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
require.NoError(t, err)
require.NoError(t, upload.Commit())
partner, err := planet.Satellites[0].API.Marketing.PartnersService.ByName(ctx, "Minio")
require.NoError(t, err)
partnerID, err := uuid.FromString("8cd605fa-ad00-45b6-823e-550eddc611d6")
require.NoError(t, err)
bucketInfo, err := satellite.DB.Buckets().GetBucket(ctx, []byte("bucket"), uplink.Projects[0].ID)
require.NoError(t, err)
assert.Equal(t, partnerID, bucketInfo.PartnerID)
attribution, err := satellite.DB.Attribution().Get(ctx, uplink.Projects[0].ID, []byte("bucket"))
require.NoError(t, err)
assert.Equal(t, partnerID, attribution.PartnerID)
rows, err := planet.Satellites[0].DB.Attribution().QueryAttribution(ctx, partner.UUID, before, after)
require.NoError(t, err)
require.NotZero(t, rows[0].RemoteBytesPerHour)
require.Equal(t, rows[0].EgressData, usage.Egress)
}
})
}

View File

@ -314,7 +314,7 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
_, err = endpoint.metainfo.GetBucket(ctx, req.GetName(), keyInfo.ProjectID)
if err == nil {
// When the bucket exists, try to set the attribution.
if err := endpoint.ensureAttribution(ctx, req.Header, req.GetName()); err != nil {
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.GetName()); err != nil {
return nil, err
}
return nil, rpcstatus.Error(rpcstatus.AlreadyExists, "bucket already exists")
@ -351,7 +351,7 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
}
// Once we have created the bucket, we can try setting the attribution.
if err := endpoint.ensureAttribution(ctx, req.Header, req.GetName()); err != nil {
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.GetName()); err != nil {
return nil, err
}
@ -611,7 +611,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if err := endpoint.ensureAttribution(ctx, req.Header, req.Bucket); err != nil {
if err := endpoint.ensureAttribution(ctx, req.Header, keyInfo, req.Bucket); err != nil {
return nil, err
}

View File

@ -202,23 +202,6 @@ func (endpoint *Endpoint) validateRevoke(ctx context.Context, header *pb.Request
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized attempt to revoke macaroon")
}
// getKeyInfo returns key info based on the header.
func (endpoint *Endpoint) getKeyInfo(ctx context.Context, header *pb.RequestHeader) (_ *console.APIKeyInfo, err error) {
defer mon.Task()(&ctx)(&err)
key, err := getAPIKey(ctx, header)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "Invalid API credentials")
}
keyInfo, err := endpoint.apiKeys.GetByHead(ctx, key.Head())
if err != nil {
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "Unauthorized API credentials")
}
return keyInfo, nil
}
func (endpoint *Endpoint) checkRate(ctx context.Context, projectID uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)
if !endpoint.config.RateLimiter.Enabled {

View File

@ -44,7 +44,10 @@ func TestService_InvoiceElementsProcessing(t *testing.T) {
numberOfProjects := 19
// generate test data, each user has one project, one coupon and some credits
for i := 0; i < numberOfProjects; i++ {
user, err := satellite.AddUser(ctx, "testuser"+strconv.Itoa(i), "user@test"+strconv.Itoa(i), 1)
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "testuser" + strconv.Itoa(i),
Email: "user@test" + strconv.Itoa(i),
}, 1)
require.NoError(t, err)
project, err := satellite.AddProject(ctx, user.ID, "testproject-"+strconv.Itoa(i))
@ -117,7 +120,10 @@ func TestService_InvoiceUserWithManyProjects(t *testing.T) {
numberOfProjects := 5
storageHours := 24
user, err := satellite.AddUser(ctx, "testuser", "user@test", numberOfProjects)
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "testuser",
Email: "user@test",
}, numberOfProjects)
require.NoError(t, err)
projects := make([]*console.Project, numberOfProjects)
@ -213,7 +219,10 @@ func TestService_InvoiceUserWithManyCoupons(t *testing.T) {
storageHours := 24
user, err := satellite.AddUser(ctx, "testuser", "user@test", 5)
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "testuser",
Email: "user@test",
}, 5)
require.NoError(t, err)
project, err := satellite.AddProject(ctx, user.ID, "testproject")
@ -320,7 +329,10 @@ func TestService_ApplyCouponsInTheOrder(t *testing.T) {
})
start := time.Date(period.Year(), period.Month(), 1, 0, 0, 0, 0, time.UTC)
user, err := satellite.AddUser(ctx, "testuser", "user@test", 5)
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "testuser",
Email: "user@test",
}, 5)
require.NoError(t, err)
project, err := satellite.AddProject(ctx, user.ID, "testproject")
@ -446,7 +458,10 @@ func TestService_CouponStatus(t *testing.T) {
} {
errTag := fmt.Sprintf("%d. %+v", i, tt)
user, err := satellite.AddUser(ctx, "testuser"+strconv.Itoa(i), "test@test"+strconv.Itoa(i), 1)
user, err := satellite.AddUser(ctx, console.CreateUser{
FullName: "testuser" + strconv.Itoa(i),
Email: "test@test" + strconv.Itoa(i),
}, 1)
require.NoError(t, err, errTag)
project, err := satellite.AddProject(ctx, user.ID, "testproject-"+strconv.Itoa(i))
@ -516,7 +531,11 @@ func TestService_ProjectsWithMembers(t *testing.T) {
projects := make([]*console.Project, numberOfUsers)
for i := 0; i < numberOfUsers; i++ {
var err error
users[i], err = satellite.AddUser(ctx, "testuser"+strconv.Itoa(i), "user@test"+strconv.Itoa(i), 1)
users[i], err = satellite.AddUser(ctx, console.CreateUser{
FullName: "testuser" + strconv.Itoa(i),
Email: "user@test" + strconv.Itoa(i),
}, 1)
require.NoError(t, err)
projects[i], err = satellite.AddProject(ctx, users[i].ID, "testproject-"+strconv.Itoa(i))