support value attribution endpoint (#2231)

Support Value attribution Endpoint
This commit is contained in:
aligeti 2019-06-21 15:14:34 -04:00 committed by GitHub
parent 9304817927
commit d8e62bc06e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 55 additions and 25 deletions

View File

@ -46,7 +46,7 @@ Our partners will have connectors that their customers will use to store data on
### Connector
Each partner will have a registered id, (which we will refer to as the partner id) that will identify a partners connector on the Storj network. When a user uploads data to a specified bucket through the connector, the connector will include the partner id in the context of the request. Before an upload occurs, the uplink will communicate the partner id and bucket name with the tardigrade satellite, checking for a previous attribution. If no attribution is found on the specified bucket and the bucket is currently void of data, the satellite will attribute the partners id to that bucket within the metadata struct. Concurrently to updating the metadata struct the satelitte will add the necessary data to the Attribution table.
Each partner will have a registered id, (which we will refer to as the partner id) that will identify a partners connector on the Storj network. When a user uploads data to a specified bucket through the connector, the connector will include the partner id in the content of the GRPC request. Before an upload occurs, the uplink will communicate the partner id and bucket name with the tardigrade satellite, checking for a previous attribution. If no attribution is found on the specified bucket and the bucket is currently void of data, the satellite will attribute the partners id to that bucket within the metadata struct. Concurrently to updating the metadata struct the satelitte will add the necessary data to the Attribution table.
### Database
@ -57,7 +57,6 @@ The attribution table will consist of data that allows for ease of calculating t
| --------------- | ------------- |
| project_id (pk) | uuid |
| bucket_name(pk) | bytes |
| user_id | uuid |
| partner_id | uuid |
| at_rest_data | integer |
| egress_data | integer |

View File

@ -9,8 +9,12 @@ import (
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/zeebo/errs"
)
// ErrBucketNotAttributed is returned if a requested bucket not attributed(entry not found)
var ErrBucketNotAttributed = errs.Class("bucket not attributed")
// Info describing value attribution from partner to bucket
type Info struct {
ProjectID uuid.UUID

View File

@ -23,6 +23,7 @@ import (
"storj.io/storj/pkg/overlay"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/attribution"
"storj.io/storj/satellite/console"
"storj.io/storj/satellite/orders"
"storj.io/storj/storage"
@ -55,6 +56,7 @@ type Endpoint struct {
metainfo *Service
orders *orders.Service
cache *overlay.Cache
partnerinfo attribution.DB
projectUsage *accounting.ProjectUsage
containment Containment
apiKeys APIKeys
@ -63,14 +65,15 @@ type Endpoint struct {
}
// NewEndpoint creates new metainfo endpoint instance
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, containment Containment,
apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig) *Endpoint {
func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, partnerinfo attribution.DB,
containment Containment, apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig) *Endpoint {
// TODO do something with too many params
return &Endpoint{
log: log,
metainfo: metainfo,
orders: orders,
cache: cache,
partnerinfo: partnerinfo,
containment: containment,
apiKeys: apiKeys,
projectUsage: projectUsage,
@ -513,24 +516,12 @@ func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bu
func (endpoint *Endpoint) SetAttribution(ctx context.Context, req *pb.SetAttributionRequest) (_ *pb.SetAttributionResponse, err error) {
defer mon.Task()(&ctx)(&err)
_, err = endpoint.checkBucketPointers(ctx, req)
// try to add an attribution that doesn't exist
partnerID, err := bytesToUUID(req.GetPartnerId())
if err != nil {
// TODO: return correct status code for GRPC
endpoint.log.Sugar().Debug("related bucket id already attributed \n")
return &pb.SetAttributionResponse{}, err
return nil, Error.Wrap(err)
}
// TODO: add valueattribution DB access functions added in new PR.
return &pb.SetAttributionResponse{}, nil
}
// checks if bucket has any pointers(entries)
func (endpoint *Endpoint) checkBucketPointers(ctx context.Context, req *pb.SetAttributionRequest) (resp bool, err error) {
//TODO: Logic of checking if bucket exists will be added in new PR.
// write into value attribution DB only if bucket exists but no segments or no bucket and no segments exits
defer mon.Task()(&ctx)(&err)
keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{
Op: macaroon.ActionList,
Bucket: req.BucketName,
@ -538,22 +529,53 @@ func (endpoint *Endpoint) checkBucketPointers(ctx context.Context, req *pb.SetAt
Time: time.Now(),
})
if err != nil {
return false, status.Errorf(codes.Unauthenticated, err.Error())
return nil, status.Errorf(codes.Unauthenticated, err.Error())
}
// check if attribution is set for given bucket
_, err = endpoint.partnerinfo.Get(ctx, keyInfo.ProjectID, req.GetBucketName())
if err == nil {
return nil, Error.New("Bucket(%s) , PartnerID(%s) cannot be attributed", string(req.BucketName), string(req.PartnerId))
}
if !attribution.ErrBucketNotAttributed.Has(err) {
// try only to set the attribution, when it's missing
return nil, Error.Wrap(err)
}
prefix, err := CreatePath(ctx, keyInfo.ProjectID, -1, req.BucketName, []byte(""))
if err != nil {
return false, err
return nil, Error.Wrap(err)
}
items, _, err := endpoint.metainfo.List(ctx, prefix, string(""), string(""), true, 1, 0)
items, _, err := endpoint.metainfo.List(ctx, prefix, "", "", true, 1, 0)
if err != nil {
return false, err
return nil, Error.Wrap(err)
}
if len(items) > 0 {
return false, errors.New("already attributed")
return nil, Error.New("Bucket(%q) , PartnerID(%s) cannot be attributed", req.BucketName, req.PartnerId)
}
return true, nil
_, err = endpoint.partnerinfo.Insert(ctx, &attribution.Info{
ProjectID: keyInfo.ProjectID,
BucketName: req.GetBucketName(),
PartnerID: partnerID,
})
if err != nil {
return nil, Error.Wrap(err)
}
return &pb.SetAttributionResponse{}, nil
}
// bytesToUUID is used to convert []byte to UUID
func bytesToUUID(data []byte) (uuid.UUID, error) {
var id uuid.UUID
copy(id[:], data)
if len(id) != len(data) {
return uuid.UUID{}, errs.New("Invalid uuid")
}
return id, nil
}

View File

@ -413,6 +413,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve
peer.Metainfo.Service,
peer.Orders.Service,
peer.Overlay.Service,
peer.DB.Attribution(),
peer.DB.Containment(),
peer.DB.Console().APIKeys(),
peer.Accounting.ProjectUsage,

View File

@ -5,6 +5,7 @@ package satellitedb
import (
"context"
"database/sql"
"github.com/skyrings/skyring-common/tools/uuid"
@ -24,6 +25,9 @@ func (keys *attributionDB) Get(ctx context.Context, projectID uuid.UUID, bucketN
dbx.ValueAttribution_ProjectId(projectID[:]),
dbx.ValueAttribution_BucketName(bucketName),
)
if err == sql.ErrNoRows {
return nil, attribution.ErrBucketNotAttributed.New(string(bucketName))
}
if err != nil {
return nil, Error.Wrap(err)
}