From d8e62bc06e7fe21a23d4e7aafbfea0f3b5b8bcf1 Mon Sep 17 00:00:00 2001 From: aligeti <34487396+aligeti@users.noreply.github.com> Date: Fri, 21 Jun 2019 15:14:34 -0400 Subject: [PATCH] support value attribution endpoint (#2231) Support Value attribution Endpoint --- docs/design/value-attribution.md | 3 +- satellite/attribution/db.go | 4 ++ satellite/metainfo/metainfo.go | 68 ++++++++++++++++++---------- satellite/peer.go | 1 + satellite/satellitedb/attribution.go | 4 ++ 5 files changed, 55 insertions(+), 25 deletions(-) diff --git a/docs/design/value-attribution.md b/docs/design/value-attribution.md index 2bf0cb164..76d9915a9 100644 --- a/docs/design/value-attribution.md +++ b/docs/design/value-attribution.md @@ -46,7 +46,7 @@ Our partners will have connectors that their customers will use to store data on ### Connector -Each partner will have a registered id, (which we will refer to as the partner id) that will identify a partners connector on the Storj network. When a user uploads data to a specified bucket through the connector, the connector will include the partner id in the context of the request. Before an upload occurs, the uplink will communicate the partner id and bucket name with the tardigrade satellite, checking for a previous attribution. If no attribution is found on the specified bucket and the bucket is currently void of data, the satellite will attribute the partners id to that bucket within the metadata struct. Concurrently to updating the metadata struct the satelitte will add the necessary data to the Attribution table. +Each partner will have a registered id, (which we will refer to as the partner id) that will identify a partners connector on the Storj network. When a user uploads data to a specified bucket through the connector, the connector will include the partner id in the content of the GRPC request. Before an upload occurs, the uplink will communicate the partner id and bucket name with the tardigrade satellite, checking for a previous attribution. If no attribution is found on the specified bucket and the bucket is currently void of data, the satellite will attribute the partners id to that bucket within the metadata struct. Concurrently to updating the metadata struct the satelitte will add the necessary data to the Attribution table. ### Database @@ -57,7 +57,6 @@ The attribution table will consist of data that allows for ease of calculating t | --------------- | ------------- | | project_id (pk) | uuid | | bucket_name(pk) | bytes | -| user_id | uuid | | partner_id | uuid | | at_rest_data | integer | | egress_data | integer | diff --git a/satellite/attribution/db.go b/satellite/attribution/db.go index e64b70a3e..9792b817d 100644 --- a/satellite/attribution/db.go +++ b/satellite/attribution/db.go @@ -9,8 +9,12 @@ import ( "time" "github.com/skyrings/skyring-common/tools/uuid" + "github.com/zeebo/errs" ) +// ErrBucketNotAttributed is returned if a requested bucket not attributed(entry not found) +var ErrBucketNotAttributed = errs.Class("bucket not attributed") + // Info describing value attribution from partner to bucket type Info struct { ProjectID uuid.UUID diff --git a/satellite/metainfo/metainfo.go b/satellite/metainfo/metainfo.go index 4ffdfa904..87200c802 100644 --- a/satellite/metainfo/metainfo.go +++ b/satellite/metainfo/metainfo.go @@ -23,6 +23,7 @@ import ( "storj.io/storj/pkg/overlay" "storj.io/storj/pkg/pb" "storj.io/storj/pkg/storj" + "storj.io/storj/satellite/attribution" "storj.io/storj/satellite/console" "storj.io/storj/satellite/orders" "storj.io/storj/storage" @@ -55,6 +56,7 @@ type Endpoint struct { metainfo *Service orders *orders.Service cache *overlay.Cache + partnerinfo attribution.DB projectUsage *accounting.ProjectUsage containment Containment apiKeys APIKeys @@ -63,14 +65,15 @@ type Endpoint struct { } // NewEndpoint creates new metainfo endpoint instance -func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, containment Containment, - apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig) *Endpoint { +func NewEndpoint(log *zap.Logger, metainfo *Service, orders *orders.Service, cache *overlay.Cache, partnerinfo attribution.DB, + containment Containment, apiKeys APIKeys, projectUsage *accounting.ProjectUsage, rsConfig RSConfig) *Endpoint { // TODO do something with too many params return &Endpoint{ log: log, metainfo: metainfo, orders: orders, cache: cache, + partnerinfo: partnerinfo, containment: containment, apiKeys: apiKeys, projectUsage: projectUsage, @@ -513,24 +516,12 @@ func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bu func (endpoint *Endpoint) SetAttribution(ctx context.Context, req *pb.SetAttributionRequest) (_ *pb.SetAttributionResponse, err error) { defer mon.Task()(&ctx)(&err) - _, err = endpoint.checkBucketPointers(ctx, req) + // try to add an attribution that doesn't exist + partnerID, err := bytesToUUID(req.GetPartnerId()) if err != nil { - // TODO: return correct status code for GRPC - endpoint.log.Sugar().Debug("related bucket id already attributed \n") - return &pb.SetAttributionResponse{}, err + return nil, Error.Wrap(err) } - // TODO: add valueattribution DB access functions added in new PR. - - return &pb.SetAttributionResponse{}, nil -} - -// checks if bucket has any pointers(entries) -func (endpoint *Endpoint) checkBucketPointers(ctx context.Context, req *pb.SetAttributionRequest) (resp bool, err error) { - //TODO: Logic of checking if bucket exists will be added in new PR. - // write into value attribution DB only if bucket exists but no segments or no bucket and no segments exits - defer mon.Task()(&ctx)(&err) - keyInfo, err := endpoint.validateAuth(ctx, macaroon.Action{ Op: macaroon.ActionList, Bucket: req.BucketName, @@ -538,22 +529,53 @@ func (endpoint *Endpoint) checkBucketPointers(ctx context.Context, req *pb.SetAt Time: time.Now(), }) if err != nil { - return false, status.Errorf(codes.Unauthenticated, err.Error()) + return nil, status.Errorf(codes.Unauthenticated, err.Error()) + } + + // check if attribution is set for given bucket + _, err = endpoint.partnerinfo.Get(ctx, keyInfo.ProjectID, req.GetBucketName()) + if err == nil { + return nil, Error.New("Bucket(%s) , PartnerID(%s) cannot be attributed", string(req.BucketName), string(req.PartnerId)) + } + + if !attribution.ErrBucketNotAttributed.Has(err) { + // try only to set the attribution, when it's missing + return nil, Error.Wrap(err) } prefix, err := CreatePath(ctx, keyInfo.ProjectID, -1, req.BucketName, []byte("")) if err != nil { - return false, err + return nil, Error.Wrap(err) } - items, _, err := endpoint.metainfo.List(ctx, prefix, string(""), string(""), true, 1, 0) + items, _, err := endpoint.metainfo.List(ctx, prefix, "", "", true, 1, 0) if err != nil { - return false, err + return nil, Error.Wrap(err) } if len(items) > 0 { - return false, errors.New("already attributed") + return nil, Error.New("Bucket(%q) , PartnerID(%s) cannot be attributed", req.BucketName, req.PartnerId) } - return true, nil + _, err = endpoint.partnerinfo.Insert(ctx, &attribution.Info{ + ProjectID: keyInfo.ProjectID, + BucketName: req.GetBucketName(), + PartnerID: partnerID, + }) + if err != nil { + return nil, Error.Wrap(err) + } + return &pb.SetAttributionResponse{}, nil +} + +// bytesToUUID is used to convert []byte to UUID +func bytesToUUID(data []byte) (uuid.UUID, error) { + var id uuid.UUID + + copy(id[:], data) + if len(id) != len(data) { + return uuid.UUID{}, errs.New("Invalid uuid") + } + + return id, nil } diff --git a/satellite/peer.go b/satellite/peer.go index 22ac83791..0dbdbda42 100644 --- a/satellite/peer.go +++ b/satellite/peer.go @@ -413,6 +413,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, config *Config, ve peer.Metainfo.Service, peer.Orders.Service, peer.Overlay.Service, + peer.DB.Attribution(), peer.DB.Containment(), peer.DB.Console().APIKeys(), peer.Accounting.ProjectUsage, diff --git a/satellite/satellitedb/attribution.go b/satellite/satellitedb/attribution.go index bc41735d2..754b7d5bd 100644 --- a/satellite/satellitedb/attribution.go +++ b/satellite/satellitedb/attribution.go @@ -5,6 +5,7 @@ package satellitedb import ( "context" + "database/sql" "github.com/skyrings/skyring-common/tools/uuid" @@ -24,6 +25,9 @@ func (keys *attributionDB) Get(ctx context.Context, projectID uuid.UUID, bucketN dbx.ValueAttribution_ProjectId(projectID[:]), dbx.ValueAttribution_BucketName(bucketName), ) + if err == sql.ErrNoRows { + return nil, attribution.ErrBucketNotAttributed.New(string(bucketName)) + } if err != nil { return nil, Error.Wrap(err) }