satellite/buckets: add new buckets service
The main motivation is to wrap the bucket DB and metainfo DB, so we could check if a bucket is empty before applying geofencing config. Change-Id: I8bac21555e01d51a663fb557bc1acfc8106bc2e1
This commit is contained in:
parent
09568b3e2b
commit
814e3126fa
@ -11,6 +11,7 @@ import (
|
||||
"storj.io/private/process"
|
||||
"storj.io/private/version"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/satellitedb"
|
||||
)
|
||||
|
||||
@ -37,7 +38,18 @@ func cmdAdminRun(cmd *cobra.Command, args []string) (err error) {
|
||||
err = errs.Combine(err, db.Close())
|
||||
}()
|
||||
|
||||
peer, err := satellite.NewAdmin(log, identity, db, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
|
||||
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL, metabase.Config{
|
||||
MinPartSize: runCfg.Config.Metainfo.MinPartSize,
|
||||
MaxNumberOfParts: runCfg.Config.Metainfo.MaxNumberOfParts,
|
||||
})
|
||||
if err != nil {
|
||||
return errs.New("Error creating metabase connection on satellite api: %+v", err)
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, metabaseDB.Close())
|
||||
}()
|
||||
|
||||
peer, err := satellite.NewAdmin(log, identity, db, metabaseDB, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -51,6 +63,12 @@ func cmdAdminRun(cmd *cobra.Command, args []string) (err error) {
|
||||
log.Warn("Failed to initialize telemetry batcher on satellite admin", zap.Error(err))
|
||||
}
|
||||
|
||||
err = metabaseDB.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
log.Error("Failed metabase database version check.", zap.Error(err))
|
||||
return errs.New("failed metabase version check: %+v", err)
|
||||
}
|
||||
|
||||
err = db.CheckVersion(ctx)
|
||||
if err != nil {
|
||||
log.Error("Failed satellite database version check.", zap.Error(err))
|
||||
|
@ -531,7 +531,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
|
||||
return nil, err
|
||||
}
|
||||
|
||||
adminPeer, err := planet.newAdmin(ctx, index, identity, db, config, versionInfo)
|
||||
adminPeer, err := planet.newAdmin(ctx, index, identity, db, metabaseDB, config, versionInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -649,13 +649,13 @@ func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.
|
||||
return satellite.NewAPI(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
|
||||
}
|
||||
|
||||
func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Admin, err error) {
|
||||
func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Admin, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
prefix := "satellite-admin" + strconv.Itoa(index)
|
||||
log := planet.log.Named(prefix)
|
||||
|
||||
return satellite.NewAdmin(log, identity, db, versionInfo, &config, nil)
|
||||
return satellite.NewAdmin(log, identity, db, metabaseDB, versionInfo, &config, nil)
|
||||
}
|
||||
|
||||
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.Repairer, err error) {
|
||||
|
@ -20,6 +20,8 @@ import (
|
||||
"storj.io/storj/private/lifecycle"
|
||||
"storj.io/storj/private/version/checker"
|
||||
"storj.io/storj/satellite/admin"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/payments"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
)
|
||||
@ -32,6 +34,7 @@ type Admin struct {
|
||||
Log *zap.Logger
|
||||
Identity *identity.FullIdentity
|
||||
DB DB
|
||||
MetabaseDB *metabase.DB
|
||||
|
||||
Servers *lifecycle.Group
|
||||
Services *lifecycle.Group
|
||||
@ -56,20 +59,29 @@ type Admin struct {
|
||||
Listener net.Listener
|
||||
Server *admin.Server
|
||||
}
|
||||
|
||||
Buckets struct {
|
||||
Service *buckets.Service
|
||||
}
|
||||
}
|
||||
|
||||
// NewAdmin creates a new satellite admin peer.
|
||||
func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB, metabaseDB *metabase.DB,
|
||||
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Admin, error) {
|
||||
peer := &Admin{
|
||||
Log: log,
|
||||
Identity: full,
|
||||
DB: db,
|
||||
MetabaseDB: metabaseDB,
|
||||
|
||||
Servers: lifecycle.NewGroup(log.Named("servers")),
|
||||
Services: lifecycle.NewGroup(log.Named("services")),
|
||||
}
|
||||
|
||||
{
|
||||
peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB)
|
||||
}
|
||||
|
||||
{ // setup debug
|
||||
var err error
|
||||
if config.Debug.Address != "" {
|
||||
@ -152,7 +164,7 @@ func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
adminConfig := config.Admin
|
||||
adminConfig.AuthorizationToken = config.Console.AuthToken
|
||||
|
||||
peer.Admin.Server = admin.NewServer(log.Named("admin"), peer.Admin.Listener, peer.DB, peer.Payments.Accounts, adminConfig)
|
||||
peer.Admin.Server = admin.NewServer(log.Named("admin"), peer.Admin.Listener, peer.DB, peer.Buckets.Service, peer.Payments.Accounts, adminConfig)
|
||||
peer.Servers.Add(lifecycle.Item{
|
||||
Name: "admin",
|
||||
Run: peer.Admin.Server.Run,
|
||||
|
@ -445,7 +445,7 @@ func (server *Server) deleteProject(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
options := storj.BucketListOptions{Limit: 1, Direction: storj.Forward}
|
||||
buckets, err := server.db.Buckets().ListBuckets(ctx, projectUUID, options, macaroon.AllowedBuckets{All: true})
|
||||
buckets, err := server.buckets.ListBuckets(ctx, projectUUID, options, macaroon.AllowedBuckets{All: true})
|
||||
if err != nil {
|
||||
sendJSONError(w, "unable to list buckets",
|
||||
err.Error(), http.StatusInternalServerError)
|
||||
|
@ -292,7 +292,7 @@ func TestProjectDelete(t *testing.T) {
|
||||
projectID := planet.Uplinks[0].Projects[0].ID
|
||||
|
||||
// Ensure there are no buckets left
|
||||
buckets, err := planet.Satellites[0].DB.Buckets().ListBuckets(ctx, projectID, storj.BucketListOptions{Limit: 1, Direction: storj.Forward}, macaroon.AllowedBuckets{All: true})
|
||||
buckets, err := planet.Satellites[0].API.Buckets.Service.ListBuckets(ctx, projectID, storj.BucketListOptions{Limit: 1, Direction: storj.Forward}, macaroon.AllowedBuckets{All: true})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, buckets.Items, 0)
|
||||
|
||||
|
@ -20,8 +20,8 @@ import (
|
||||
|
||||
"storj.io/common/errs2"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/payments"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
)
|
||||
@ -47,8 +47,6 @@ type DB interface {
|
||||
Console() console.DB
|
||||
// StripeCoinPayments returns database for satellite stripe coin payments
|
||||
StripeCoinPayments() stripecoinpayments.DB
|
||||
// Buckets returns database for satellite buckets
|
||||
Buckets() metainfo.BucketsDB
|
||||
}
|
||||
|
||||
// Server provides endpoints for administrative tasks.
|
||||
@ -60,6 +58,7 @@ type Server struct {
|
||||
|
||||
db DB
|
||||
payments payments.Accounts
|
||||
buckets *buckets.Service
|
||||
|
||||
nowFn func() time.Time
|
||||
|
||||
@ -67,7 +66,7 @@ type Server struct {
|
||||
}
|
||||
|
||||
// NewServer returns a new administration Server.
|
||||
func NewServer(log *zap.Logger, listener net.Listener, db DB, accounts payments.Accounts, config Config) *Server {
|
||||
func NewServer(log *zap.Logger, listener net.Listener, db DB, buckets *buckets.Service, accounts payments.Accounts, config Config) *Server {
|
||||
server := &Server{
|
||||
log: log,
|
||||
|
||||
@ -75,6 +74,7 @@ func NewServer(log *zap.Logger, listener net.Listener, db DB, accounts payments.
|
||||
|
||||
db: db,
|
||||
payments: accounts,
|
||||
buckets: buckets,
|
||||
|
||||
nowFn: time.Now,
|
||||
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"storj.io/storj/private/version/checker"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/analytics"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/console/consoleauth"
|
||||
"storj.io/storj/satellite/console/consoleweb"
|
||||
@ -161,6 +162,10 @@ type API struct {
|
||||
Analytics struct {
|
||||
Service *analytics.Service
|
||||
}
|
||||
|
||||
Buckets struct {
|
||||
Service *buckets.Service
|
||||
}
|
||||
}
|
||||
|
||||
// NewAPI creates a new satellite API process.
|
||||
@ -178,6 +183,10 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
Services: lifecycle.NewGroup(log.Named("services")),
|
||||
}
|
||||
|
||||
{ // setup buckets service
|
||||
peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB)
|
||||
}
|
||||
|
||||
{ // setup debug
|
||||
var err error
|
||||
if config.Debug.Address != "" {
|
||||
@ -337,7 +346,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
signing.SignerFromFullIdentity(peer.Identity),
|
||||
peer.Overlay.Service,
|
||||
peer.Orders.DB,
|
||||
peer.DB.Buckets(),
|
||||
peer.Buckets.Service,
|
||||
config.Orders,
|
||||
)
|
||||
if err != nil {
|
||||
@ -395,7 +404,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
|
||||
peer.Metainfo.Endpoint, err = metainfo.NewEndpoint(
|
||||
peer.Log.Named("metainfo:endpoint"),
|
||||
peer.DB.Buckets(),
|
||||
peer.Buckets.Service,
|
||||
peer.Metainfo.Metabase,
|
||||
peer.Metainfo.PieceDeletion,
|
||||
peer.Orders.Service,
|
||||
@ -567,7 +576,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
peer.DB.Console(),
|
||||
peer.DB.ProjectAccounting(),
|
||||
peer.Accounting.ProjectUsage,
|
||||
peer.DB.Buckets(),
|
||||
peer.Buckets.Service,
|
||||
peer.Marketing.PartnersService,
|
||||
peer.Payments.Accounts,
|
||||
peer.Analytics.Service,
|
||||
|
@ -1,7 +1,7 @@
|
||||
// Copyright (C) 2018 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package metainfo
|
||||
package buckets
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -19,13 +19,13 @@ type Bucket struct {
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// BucketsDB is the interface for the database to interact with buckets.
|
||||
// DB is the interface for the database to interact with buckets.
|
||||
//
|
||||
// architecture: Database
|
||||
type BucketsDB interface {
|
||||
// Create creates a new bucket
|
||||
type DB interface {
|
||||
// CreateBucket creates a new bucket
|
||||
CreateBucket(ctx context.Context, bucket storj.Bucket) (_ storj.Bucket, err error)
|
||||
// Get returns an existing bucket
|
||||
// GetBucket returns an existing bucket
|
||||
GetBucket(ctx context.Context, bucketName []byte, projectID uuid.UUID) (bucket storj.Bucket, err error)
|
||||
// GetMinimalBucket returns existing bucket with minimal number of fields.
|
||||
GetMinimalBucket(ctx context.Context, bucketName []byte, projectID uuid.UUID) (bucket Bucket, err error)
|
||||
@ -35,9 +35,9 @@ type BucketsDB interface {
|
||||
GetBucketID(ctx context.Context, bucket metabase.BucketLocation) (id uuid.UUID, err error)
|
||||
// UpdateBucket updates an existing bucket
|
||||
UpdateBucket(ctx context.Context, bucket storj.Bucket) (_ storj.Bucket, err error)
|
||||
// Delete deletes a bucket
|
||||
// DeleteBucket deletes a bucket
|
||||
DeleteBucket(ctx context.Context, bucketName []byte, projectID uuid.UUID) (err error)
|
||||
// List returns all buckets for a project
|
||||
// ListBuckets returns all buckets for a project
|
||||
ListBuckets(ctx context.Context, projectID uuid.UUID, listOpts storj.BucketListOptions, allowedBuckets macaroon.AllowedBuckets) (bucketList storj.BucketList, err error)
|
||||
// CountBuckets returns the number of buckets a project currently has
|
||||
CountBuckets(ctx context.Context, projectID uuid.UUID) (int, error)
|
@ -1,7 +1,7 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package metainfo_test
|
||||
package buckets_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
@ -13,9 +13,8 @@ import (
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func newTestBucket(name string, projectID uuid.UUID) storj.Bucket {
|
||||
@ -41,12 +40,15 @@ func newTestBucket(name string, projectID uuid.UUID) storj.Bucket {
|
||||
}
|
||||
|
||||
func TestBasicBucketOperations(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
testplanet.Run(t, testplanet.Config{SatelliteCount: 1}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
db := sat.DB
|
||||
consoleDB := db.Console()
|
||||
|
||||
project, err := consoleDB.Projects().Insert(ctx, &console.Project{Name: "testproject1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
bucketsDB := db.Buckets()
|
||||
bucketsDB := sat.API.Buckets.Service
|
||||
expectedBucket := newTestBucket("testbucket", project.ID)
|
||||
|
||||
count, err := bucketsDB.CountBuckets(ctx, project.ID)
|
||||
@ -109,12 +111,15 @@ func TestListBucketsAllAllowed(t *testing.T) {
|
||||
{"non matching cursor, more", "ccc", 3, 3, true},
|
||||
{"first bucket cursor, more", "0test", 5, 5, true},
|
||||
}
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
testplanet.Run(t, testplanet.Config{SatelliteCount: 1}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
db := sat.DB
|
||||
consoleDB := db.Console()
|
||||
|
||||
project, err := consoleDB.Projects().Insert(ctx, &console.Project{Name: "testproject1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
bucketsDB := db.Buckets()
|
||||
bucketsDB := sat.API.Buckets.Service
|
||||
|
||||
allowedBuckets := macaroon.AllowedBuckets{
|
||||
Buckets: map[string]struct{}{},
|
||||
@ -169,12 +174,15 @@ func TestListBucketsNotAllowed(t *testing.T) {
|
||||
{"last bucket cursor, allow all", "zzz", 2, 1, false, true, map[string]struct{}{"zzz": {}}, []string{"zzz"}},
|
||||
{"empty string cursor, allow all, more", "", 5, 5, true, true, map[string]struct{}{"": {}}, []string{"123", "0test", "999", "aaa", "bbb"}},
|
||||
}
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
testplanet.Run(t, testplanet.Config{SatelliteCount: 1}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
db := sat.DB
|
||||
consoleDB := db.Console()
|
||||
|
||||
project, err := consoleDB.Projects().Insert(ctx, &console.Project{Name: "testproject1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
bucketsDB := db.Buckets()
|
||||
bucketsDB := sat.API.Buckets.Service
|
||||
|
||||
{ // setup some test buckets
|
||||
var testBucketNames = []string{"aaa", "bbb", "mmm", "qqq", "zzz",
|
58
satellite/buckets/service.go
Normal file
58
satellite/buckets/service.go
Normal file
@ -0,0 +1,58 @@
|
||||
// Copyright (C) 2021 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package buckets
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zeebo/errs"
|
||||
|
||||
"storj.io/common/storj"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrBucketNotEmpty is returned when a caller attempts to change placement constraints.
|
||||
ErrBucketNotEmpty = errs.Class("bucket must be empty")
|
||||
)
|
||||
|
||||
// NewService converts the provided db and metabase calls into a single DB interface.
|
||||
func NewService(bucketsDB DB, metabase *metabase.DB) *Service {
|
||||
return &Service{
|
||||
DB: bucketsDB,
|
||||
metabase: metabase,
|
||||
}
|
||||
}
|
||||
|
||||
// Service encapsulates operations around buckets.
|
||||
type Service struct {
|
||||
DB
|
||||
metabase *metabase.DB
|
||||
}
|
||||
|
||||
// UpdateBucket overrides the default UpdateBucket behaviour by adding a check against MetabaseDB to ensure the bucket
|
||||
// is empty before attempting to change the placement constraint of a bucket. If the placement constraint is not being
|
||||
// changed, then this additional check is skipped.
|
||||
func (buckets *Service) UpdateBucket(ctx context.Context, bucket storj.Bucket) (storj.Bucket, error) {
|
||||
current, err := buckets.GetBucket(ctx, []byte(bucket.Name), bucket.ProjectID)
|
||||
if err != nil {
|
||||
return storj.Bucket{}, err
|
||||
}
|
||||
|
||||
if current.Placement != bucket.Placement {
|
||||
ok, err := buckets.metabase.BucketEmpty(ctx, metabase.BucketEmpty{
|
||||
ProjectID: bucket.ProjectID,
|
||||
BucketName: bucket.Name,
|
||||
})
|
||||
|
||||
switch {
|
||||
case err != nil:
|
||||
return storj.Bucket{}, err
|
||||
case !ok:
|
||||
return storj.Bucket{}, ErrBucketNotEmpty.New("cannot modify placement constraint for non-empty bucket")
|
||||
}
|
||||
}
|
||||
|
||||
return buckets.DB.UpdateBucket(ctx, bucket)
|
||||
}
|
@ -12,13 +12,15 @@ import (
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func TestUsers(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
testplanet.Run(t, testplanet.Config{SatelliteCount: 1}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
buckets := sat.API.Buckets.Service
|
||||
db := sat.DB
|
||||
consoleDB := db.Console()
|
||||
|
||||
// create user
|
||||
@ -75,7 +77,7 @@ func TestUsers(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// create a bucket with no partnerID
|
||||
_, err = db.Buckets().CreateBucket(ctx, storj.Bucket{
|
||||
_, err = buckets.CreateBucket(ctx, storj.Bucket{
|
||||
ID: testrand.UUID(),
|
||||
Name: "testbucket",
|
||||
ProjectID: proj.ID,
|
||||
@ -86,7 +88,7 @@ func TestUsers(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// update a bucket with partnerID
|
||||
bucket, err := db.Buckets().UpdateBucket(ctx, storj.Bucket{
|
||||
bucket, err := buckets.UpdateBucket(ctx, storj.Bucket{
|
||||
ID: testrand.UUID(),
|
||||
Name: "testbucket",
|
||||
ProjectID: proj.ID,
|
||||
@ -98,7 +100,7 @@ func TestUsers(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, proj.ID, bucket.PartnerID)
|
||||
|
||||
bucket, err = db.Buckets().GetBucket(ctx, []byte("testbucket"), proj.ID)
|
||||
bucket, err = buckets.GetBucket(ctx, []byte("testbucket"), proj.ID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, proj.ID, bucket.PartnerID)
|
||||
})
|
||||
|
@ -57,10 +57,10 @@ func Test_AllBucketNames(t *testing.T) {
|
||||
ProjectID: project.ID,
|
||||
}
|
||||
|
||||
_, err = sat.DB.Buckets().CreateBucket(ctx, bucket1)
|
||||
_, err = sat.API.Buckets.Service.CreateBucket(ctx, bucket1)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sat.DB.Buckets().CreateBucket(ctx, bucket2)
|
||||
_, err = sat.API.Buckets.Service.CreateBucket(ctx, bucket2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// we are using full name as a password
|
||||
|
@ -18,8 +18,8 @@ import (
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/private/post"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/private/testredis"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/accounting/live"
|
||||
"storj.io/storj/satellite/analytics"
|
||||
@ -30,7 +30,6 @@ import (
|
||||
"storj.io/storj/satellite/payments/paymentsconfig"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
"storj.io/storj/satellite/rewards"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
// discardSender discard sending of an actual email.
|
||||
@ -47,7 +46,9 @@ func (*discardSender) FromAddress() post.Address {
|
||||
}
|
||||
|
||||
func TestGraphqlMutation(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
testplanet.Run(t, testplanet.Config{SatelliteCount: 1}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
db := sat.DB
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
partnersService := rewards.NewPartnersService(
|
||||
@ -98,7 +99,7 @@ func TestGraphqlMutation(t *testing.T) {
|
||||
db.Console(),
|
||||
db.ProjectAccounting(),
|
||||
projectUsage,
|
||||
db.Buckets(),
|
||||
sat.API.Buckets.Service,
|
||||
partnersService,
|
||||
paymentsService.Accounts(),
|
||||
analyticsService,
|
||||
|
@ -15,8 +15,8 @@ import (
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/private/testredis"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/accounting/live"
|
||||
"storj.io/storj/satellite/analytics"
|
||||
@ -27,11 +27,12 @@ import (
|
||||
"storj.io/storj/satellite/payments/paymentsconfig"
|
||||
"storj.io/storj/satellite/payments/stripecoinpayments"
|
||||
"storj.io/storj/satellite/rewards"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
)
|
||||
|
||||
func TestGraphqlQuery(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
testplanet.Run(t, testplanet.Config{SatelliteCount: 1}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
db := sat.DB
|
||||
log := zaptest.NewLogger(t)
|
||||
|
||||
partnersService := rewards.NewPartnersService(
|
||||
@ -82,7 +83,7 @@ func TestGraphqlQuery(t *testing.T) {
|
||||
db.Console(),
|
||||
db.ProjectAccounting(),
|
||||
projectUsage,
|
||||
db.Buckets(),
|
||||
sat.API.Buckets.Service,
|
||||
partnersService,
|
||||
paymentsService.Accounts(),
|
||||
analyticsService,
|
||||
|
@ -246,10 +246,10 @@ func TestService(t *testing.T) {
|
||||
ProjectID: up2Pro1.ID,
|
||||
}
|
||||
|
||||
_, err := sat.DB.Buckets().CreateBucket(authCtx2, bucket1)
|
||||
_, err := sat.API.Buckets.Service.CreateBucket(authCtx2, bucket1)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sat.DB.Buckets().CreateBucket(authCtx2, bucket2)
|
||||
_, err = sat.API.Buckets.Service.CreateBucket(authCtx2, bucket2)
|
||||
require.NoError(t, err)
|
||||
|
||||
bucketNames, err := service.GetAllBucketNames(authCtx2, up2Pro1.ID)
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
"storj.io/storj/satellite/accounting/rolluparchive"
|
||||
"storj.io/storj/satellite/accounting/tally"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/gracefulexit"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metabase/segmentloop"
|
||||
@ -135,6 +136,10 @@ type Core struct {
|
||||
Metrics struct {
|
||||
Chore *metrics.Chore
|
||||
}
|
||||
|
||||
Buckets struct {
|
||||
Service *buckets.Service
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new satellite.
|
||||
@ -151,6 +156,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
Services: lifecycle.NewGroup(log.Named("services")),
|
||||
}
|
||||
|
||||
{ // setup buckets service
|
||||
peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB)
|
||||
}
|
||||
|
||||
{ // setup debug
|
||||
var err error
|
||||
if config.Debug.Address != "" {
|
||||
@ -240,7 +249,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
|
||||
signing.SignerFromFullIdentity(peer.Identity),
|
||||
peer.Overlay.Service,
|
||||
peer.Orders.DB,
|
||||
peer.DB.Buckets(),
|
||||
peer.Buckets.Service,
|
||||
config.Orders,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -114,7 +114,7 @@ func TestBucketAttribution(t *testing.T) {
|
||||
_, err = project.CreateBucket(ctx, "bucket")
|
||||
require.NoError(t, err, errTag)
|
||||
|
||||
bucketInfo, err := satellite.DB.Buckets().GetBucket(ctx, []byte("bucket"), satProject.ID)
|
||||
bucketInfo, err := satellite.API.Buckets.Service.GetBucket(ctx, []byte("bucket"), satProject.ID)
|
||||
require.NoError(t, err, errTag)
|
||||
assert.Equal(t, tt.expectedAttribution, bucketInfo.UserAgent, errTag)
|
||||
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/attribution"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/internalpb"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
@ -67,7 +68,7 @@ type Endpoint struct {
|
||||
pb.DRPCMetainfoUnimplementedServer
|
||||
|
||||
log *zap.Logger
|
||||
buckets BucketsDB
|
||||
buckets *buckets.Service
|
||||
metabase *metabase.DB
|
||||
deletePieces *piecedeletion.Service
|
||||
orders *orders.Service
|
||||
@ -88,7 +89,7 @@ type Endpoint struct {
|
||||
}
|
||||
|
||||
// NewEndpoint creates new metainfo endpoint instance.
|
||||
func NewEndpoint(log *zap.Logger, buckets BucketsDB, metabaseDB *metabase.DB,
|
||||
func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase.DB,
|
||||
deletePieces *piecedeletion.Service, orders *orders.Service, cache *overlay.Service,
|
||||
attributions attribution.DB, partners *rewards.PartnersService, peerIdentities overlay.PeerIdentities,
|
||||
apiKeys APIKeys, projectUsage *accounting.Service, projects console.Projects,
|
||||
@ -278,7 +279,7 @@ func (endpoint *Endpoint) CreateBucket(ctx context.Context, req *pb.BucketCreate
|
||||
}
|
||||
|
||||
// override RS to fit satellite settings
|
||||
convBucket, err := convertBucketToProto(Bucket{
|
||||
convBucket, err := convertBucketToProto(buckets.Bucket{
|
||||
Name: []byte(bucket.Name),
|
||||
CreatedAt: bucket.Created,
|
||||
}, endpoint.defaultRS, endpoint.config.MaxSegmentSize)
|
||||
@ -342,7 +343,7 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete
|
||||
}
|
||||
|
||||
var (
|
||||
bucket Bucket
|
||||
bucket buckets.Bucket
|
||||
convBucket *pb.Bucket
|
||||
)
|
||||
if canRead || canList {
|
||||
@ -549,7 +550,7 @@ func convertProtoToBucket(req *pb.BucketCreateRequest, projectID uuid.UUID) (buc
|
||||
}, nil
|
||||
}
|
||||
|
||||
func convertBucketToProto(bucket Bucket, rs *pb.RedundancyScheme, maxSegmentSize memory.Size) (pbBucket *pb.Bucket, err error) {
|
||||
func convertBucketToProto(bucket buckets.Bucket, rs *pb.RedundancyScheme, maxSegmentSize memory.Size) (pbBucket *pb.Bucket, err error) {
|
||||
if len(bucket.Name) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -622,7 +622,7 @@ func TestBeginCommit(t *testing.T) {
|
||||
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||
bucketsDB := planet.Satellites[0].DB.Buckets()
|
||||
bucketsDB := planet.Satellites[0].API.Buckets.Service
|
||||
|
||||
bucket := storj.Bucket{
|
||||
Name: "initial-bucket",
|
||||
@ -745,7 +745,7 @@ func TestInlineSegment(t *testing.T) {
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||
|
||||
bucketsDB := planet.Satellites[0].DB.Buckets()
|
||||
bucketsDB := planet.Satellites[0].API.Buckets.Service
|
||||
|
||||
// TODO maybe split into separate cases
|
||||
// Test:
|
||||
@ -1560,7 +1560,7 @@ func TestCommitObjectMetadataSize(t *testing.T) {
|
||||
},
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
|
||||
bucketsDB := planet.Satellites[0].DB.Buckets()
|
||||
bucketsDB := planet.Satellites[0].API.Buckets.Service
|
||||
|
||||
bucket := storj.Bucket{
|
||||
Name: "initial-bucket",
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"storj.io/storj/satellite/analytics"
|
||||
"storj.io/storj/satellite/attribution"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/compensation"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/console/consoleweb"
|
||||
@ -89,7 +90,7 @@ type DB interface {
|
||||
// Containment returns database for containment
|
||||
Containment() audit.Containment
|
||||
// Buckets returns the database to interact with buckets
|
||||
Buckets() metainfo.BucketsDB
|
||||
Buckets() buckets.DB
|
||||
// GracefulExit returns database for graceful exit
|
||||
GracefulExit() gracefulexit.DB
|
||||
// StripeCoinPayments returns stripecoinpayments database.
|
||||
|
@ -24,8 +24,8 @@ import (
|
||||
"storj.io/storj/private/lifecycle"
|
||||
version_checker "storj.io/storj/private/version/checker"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/repair/queue"
|
||||
@ -70,6 +70,10 @@ type Repairer struct {
|
||||
EcRepairer *repairer.ECRepairer
|
||||
SegmentRepairer *repairer.SegmentRepairer
|
||||
Repairer *repairer.Service
|
||||
|
||||
Buckets struct {
|
||||
Service *buckets.Service
|
||||
}
|
||||
}
|
||||
|
||||
// NewRepairer creates a new repairer peer.
|
||||
@ -77,7 +81,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
|
||||
metabaseDB *metabase.DB,
|
||||
revocationDB extensions.RevocationDB,
|
||||
repairQueue queue.RepairQueue,
|
||||
bucketsDB metainfo.BucketsDB,
|
||||
bucketsDB buckets.DB,
|
||||
overlayCache overlay.DB,
|
||||
reputationdb reputation.DB,
|
||||
containmentDB audit.Containment,
|
||||
@ -163,6 +167,10 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
|
||||
})
|
||||
}
|
||||
|
||||
{ // setup buckets service
|
||||
peer.Buckets.Service = buckets.NewService(bucketsDB, metabaseDB)
|
||||
}
|
||||
|
||||
{ // setup orders
|
||||
peer.Orders.DB = rollupsWriteCache
|
||||
peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
|
||||
@ -180,7 +188,7 @@ func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
|
||||
signing.SignerFromFullIdentity(peer.Identity),
|
||||
peer.Overlay,
|
||||
peer.Orders.DB,
|
||||
bucketsDB,
|
||||
peer.Buckets.Service,
|
||||
config.Orders,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -11,8 +11,8 @@ import (
|
||||
"storj.io/common/macaroon"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/metabase"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/satellitedb/dbx"
|
||||
)
|
||||
|
||||
@ -76,7 +76,7 @@ func (db *bucketsDB) GetBucket(ctx context.Context, bucketName []byte, projectID
|
||||
}
|
||||
|
||||
// GetMinimalBucket returns existing bucket with minimal number of fields.
|
||||
func (db *bucketsDB) GetMinimalBucket(ctx context.Context, bucketName []byte, projectID uuid.UUID) (_ metainfo.Bucket, err error) {
|
||||
func (db *bucketsDB) GetMinimalBucket(ctx context.Context, bucketName []byte, projectID uuid.UUID) (_ buckets.Bucket, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
row, err := db.db.Get_BucketMetainfo_CreatedAt_By_ProjectId_And_Name(ctx,
|
||||
dbx.BucketMetainfo_ProjectId(projectID[:]),
|
||||
@ -84,11 +84,11 @@ func (db *bucketsDB) GetMinimalBucket(ctx context.Context, bucketName []byte, pr
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return metainfo.Bucket{}, storj.ErrBucketNotFound.New("%s", bucketName)
|
||||
return buckets.Bucket{}, storj.ErrBucketNotFound.New("%s", bucketName)
|
||||
}
|
||||
return metainfo.Bucket{}, storj.ErrBucket.Wrap(err)
|
||||
return buckets.Bucket{}, storj.ErrBucket.Wrap(err)
|
||||
}
|
||||
return metainfo.Bucket{
|
||||
return buckets.Bucket{
|
||||
Name: bucketName,
|
||||
CreatedAt: row.CreatedAt,
|
||||
}, nil
|
||||
@ -130,10 +130,6 @@ func (db *bucketsDB) GetBucketID(ctx context.Context, bucket metabase.BucketLoca
|
||||
func (db *bucketsDB) UpdateBucket(ctx context.Context, bucket storj.Bucket) (_ storj.Bucket, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
if bucket.PartnerID.IsZero() && bucket.UserAgent == nil {
|
||||
return storj.Bucket{}, Error.New("no partner ID or user agent found")
|
||||
}
|
||||
|
||||
var updateFields dbx.BucketMetainfo_Update_Fields
|
||||
if !bucket.PartnerID.IsZero() {
|
||||
updateFields.PartnerId = dbx.BucketMetainfo_PartnerId(bucket.PartnerID[:])
|
||||
@ -143,6 +139,8 @@ func (db *bucketsDB) UpdateBucket(ctx context.Context, bucket storj.Bucket) (_ s
|
||||
updateFields.UserAgent = dbx.BucketMetainfo_UserAgent(bucket.UserAgent)
|
||||
}
|
||||
|
||||
updateFields.Placement = dbx.BucketMetainfo_Placement(int(bucket.Placement))
|
||||
|
||||
dbxBucket, err := db.db.Update_BucketMetainfo_By_ProjectId_And_Name(ctx, dbx.BucketMetainfo_ProjectId(bucket.ProjectID[:]), dbx.BucketMetainfo_Name([]byte(bucket.Name)), updateFields)
|
||||
if err != nil {
|
||||
return storj.Bucket{}, storj.ErrBucket.Wrap(err)
|
||||
@ -283,6 +281,10 @@ func convertDBXtoBucket(dbxBucket *dbx.BucketMetainfo) (bucket storj.Bucket, err
|
||||
},
|
||||
}
|
||||
|
||||
if dbxBucket.Placement != nil {
|
||||
bucket.Placement = storj.PlacementConstraint(*dbxBucket.Placement)
|
||||
}
|
||||
|
||||
if dbxBucket.PartnerId != nil {
|
||||
partnerID, err := uuid.FromBytes(dbxBucket.PartnerId)
|
||||
if err != nil {
|
||||
|
@ -19,10 +19,10 @@ import (
|
||||
"storj.io/storj/satellite/accounting"
|
||||
"storj.io/storj/satellite/attribution"
|
||||
"storj.io/storj/satellite/audit"
|
||||
"storj.io/storj/satellite/buckets"
|
||||
"storj.io/storj/satellite/compensation"
|
||||
"storj.io/storj/satellite/console"
|
||||
"storj.io/storj/satellite/gracefulexit"
|
||||
"storj.io/storj/satellite/metainfo"
|
||||
"storj.io/storj/satellite/nodeapiversion"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
@ -273,7 +273,7 @@ func (dbc *satelliteDBCollection) NodeAPIVersion() nodeapiversion.DB {
|
||||
}
|
||||
|
||||
// Buckets returns database for interacting with buckets.
|
||||
func (dbc *satelliteDBCollection) Buckets() metainfo.BucketsDB {
|
||||
func (dbc *satelliteDBCollection) Buckets() buckets.DB {
|
||||
return &bucketsDB{db: dbc.getByName("buckets")}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user