satellite/metainfo: remove MetabaseDB interface

Currently the interface is not useful. When we need to vary the
implementation for testing purposes we can introduce a local interface
for the service/chore that needs it, rather than using the large api.

Unfortunately, this requires adding a cleanup callback for tests, there
might be a better solution to this problem.

Change-Id: I079fe4dbe297b0ae08c10081a1cea4dfbc277682
This commit is contained in:
Egon Elbre 2021-05-13 11:14:18 +03:00
parent d32ae0459b
commit 910eec8eee
24 changed files with 103 additions and 209 deletions

View File

@ -16,7 +16,6 @@ import (
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
)
// Config defines configuration for migration.
@ -84,7 +83,7 @@ func Migrate(ctx context.Context, log *zap.Logger, metabaseDBStr string, config
}
defer func() { err = errs.Combine(err, rawMetabaseDB.Close(ctx)) }()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), metabaseDBStr)
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), metabaseDBStr)
if err != nil {
return errs.New("unable to connect %q: %w", metabaseDBStr, err)
}

View File

@ -20,7 +20,6 @@ import (
"storj.io/private/dbutil/tempdb"
migrator "storj.io/storj/cmd/metabase-createdat-migration"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
@ -39,11 +38,11 @@ var defaultTestEncryption = storj.EncryptionParameters{
}
func TestMigrator_NoSegments(t *testing.T) {
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
createObject(ctx, t, metabaseDB, 0)
}
check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 0)
@ -53,7 +52,7 @@ func TestMigrator_NoSegments(t *testing.T) {
func TestMigrator_SingleSegment(t *testing.T) {
var expectedCreatedAt time.Time
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
commitedObject := createObject(ctx, t, metabaseDB, 1)
expectedCreatedAt = commitedObject.CreatedAt
@ -71,7 +70,7 @@ func TestMigrator_SingleSegment(t *testing.T) {
require.Nil(t, segments[0].CreatedAt)
}
check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
@ -85,7 +84,7 @@ func TestMigrator_ManySegments(t *testing.T) {
numberOfObjects := 100
expectedCreatedAt := map[uuid.UUID]time.Time{}
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
for i := 0; i < numberOfObjects; i++ {
commitedObject := createObject(ctx, t, metabaseDB, 1)
expectedCreatedAt[commitedObject.StreamID] = commitedObject.CreatedAt
@ -109,7 +108,7 @@ func TestMigrator_ManySegments(t *testing.T) {
}
}
check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, numberOfObjects)
@ -126,7 +125,7 @@ func TestMigrator_ManySegments(t *testing.T) {
func TestMigrator_SegmentsWithAndWithoutCreatedAt(t *testing.T) {
var expectedCreatedAt time.Time
var segmentsBefore []metabase.Segment
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB) {
prepare := func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB) {
commitedObject := createObject(ctx, t, metabaseDB, 10)
expectedCreatedAt = commitedObject.CreatedAt
@ -153,7 +152,7 @@ func TestMigrator_SegmentsWithAndWithoutCreatedAt(t *testing.T) {
}
}
check := func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB) {
check := func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB) {
segments, err := metabaseDB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 10)
@ -170,8 +169,8 @@ func TestMigrator_SegmentsWithAndWithoutCreatedAt(t *testing.T) {
test(t, prepare, check)
}
func test(t *testing.T, prepare func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB metainfo.MetabaseDB),
check func(t *testing.T, ctx context.Context, metabaseDB metainfo.MetabaseDB)) {
func test(t *testing.T, prepare func(t *testing.T, ctx context.Context, rawDB *dbutil.TempDatabase, metabaseDB *metabase.DB),
check func(t *testing.T, ctx context.Context, metabaseDB *metabase.DB)) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
@ -218,7 +217,7 @@ func randObjectStream() metabase.ObjectStream {
}
}
func createObject(ctx context.Context, t *testing.T, metabaseDB metainfo.MetabaseDB, numberOfSegments int) metabase.Object {
func createObject(ctx context.Context, t *testing.T, metabaseDB *metabase.DB, numberOfSegments int) metabase.Object {
object, err := metabaseDB.BeginObjectExactVersion(ctx, metabase.BeginObjectExactVersion{
ObjectStream: randObjectStream(),
})

View File

@ -12,7 +12,7 @@ import (
"storj.io/private/process"
"storj.io/storj/cmd/metabase-verify/verify"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metabase"
)
// Error is the default error class for the package.
@ -59,7 +59,7 @@ func VerifyCommand(log *zap.Logger) *cobra.Command {
ctx, cancel := process.Ctx(cmd)
defer cancel()
mdb, err := metainfo.OpenMetabase(ctx, log.Named("mdb"), metabaseDB)
mdb, err := metabase.Open(ctx, log.Named("mdb"), metabaseDB)
if err != nil {
return Error.Wrap(err)
}

View File

@ -18,8 +18,8 @@ import (
"storj.io/common/errs2"
"storj.io/common/memory"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metainfo"
)
var mon = monkit.Package()
@ -82,7 +82,7 @@ func (bench *Bench) Run(ctx context.Context, log *zap.Logger) (err error) {
// setup databases
mdb, err := metainfo.OpenMetabase(ctx, log.Named("mdb"), bench.MetabaseDB)
mdb, err := metabase.Open(ctx, log.Named("mdb"), bench.MetabaseDB)
if err != nil {
return Error.Wrap(err)
}

View File

@ -15,7 +15,7 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -44,7 +44,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL)
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection on satellite api: %+v", err)
}

View File

@ -12,7 +12,7 @@ import (
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/satellitedb"
)
@ -36,7 +36,7 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}

View File

@ -37,7 +37,7 @@ import (
"storj.io/storj/satellite/accounting"
"storj.io/storj/satellite/accounting/live"
"storj.io/storj/satellite/compensation"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/payments/stripecoinpayments"
@ -375,7 +375,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}
@ -459,7 +459,7 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Error creating tables for master database on satellite: %+v", err)
}
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}

View File

@ -14,7 +14,7 @@ import (
"storj.io/private/version"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/satellitedb"
)
@ -39,7 +39,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
}

View File

@ -12,7 +12,7 @@ import (
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metabase"
"storj.io/storj/storagenode"
"storj.io/storj/versioncontrol"
)
@ -20,7 +20,7 @@ import (
// Reconfigure allows to change node configurations.
type Reconfigure struct {
SatelliteDB func(log *zap.Logger, index int, db satellite.DB) (satellite.DB, error)
SatelliteMetabaseDB func(log *zap.Logger, index int, db metainfo.MetabaseDB) (metainfo.MetabaseDB, error)
SatelliteMetabaseDB func(log *zap.Logger, index int, db *metabase.DB) (*metabase.DB, error)
Satellite func(log *zap.Logger, index int, config *satellite.Config)
StorageNodeDB func(index int, db storagenode.DB, log *zap.Logger) (storagenode.DB, error)

View File

@ -48,6 +48,7 @@ import (
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
@ -98,7 +99,7 @@ type Satellite struct {
}
Metainfo struct {
Metabase metainfo.MetabaseDB
Metabase *metabase.DB
Service *metainfo.Service
Endpoint2 *metainfo.Endpoint
Loop *metaloop.Service
@ -361,7 +362,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
}
if planet.config.Reconfigure.SatelliteMetabaseDB != nil {
var newMetabaseDB metainfo.MetabaseDB
var newMetabaseDB *metabase.DB
newMetabaseDB, err = planet.config.Reconfigure.SatelliteMetabaseDB(log.Named("metabase"), index, metabaseDB)
if err != nil {
return nil, errs.Combine(err, metabaseDB.Close())
@ -740,7 +741,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
return system
}
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
prefix := "satellite-api" + strconv.Itoa(index)
log := planet.log.Named(prefix)
var err error
@ -770,7 +771,7 @@ func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identit
return satellite.NewAdmin(log, identity, db, versionInfo, &config, nil)
}
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
prefix := "satellite-repairer" + strconv.Itoa(index)
log := planet.log.Named(prefix)
@ -794,7 +795,7 @@ func (cache rollupsWriteCacheCloser) Close() error {
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
prefix := "satellite-gc" + strconv.Itoa(index)
log := planet.log.Named(prefix)

View File

@ -41,6 +41,7 @@ import (
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/mailservice"
"storj.io/storj/satellite/mailservice/simulate"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/piecedeletion"
"storj.io/storj/satellite/nodestats"
@ -97,7 +98,7 @@ type API struct {
}
Metainfo struct {
Metabase metainfo.MetabaseDB
Metabase *metabase.DB
Service *metainfo.Service
PieceDeletion *piecedeletion.Service
Endpoint2 *metainfo.Endpoint
@ -165,7 +166,7 @@ type API struct {
// NewAPI creates a new satellite API process.
func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
metabaseDB metainfo.MetabaseDB, revocationDB extensions.RevocationDB,
metabaseDB *metabase.DB, revocationDB extensions.RevocationDB,
liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
config *Config, versionInfo version.Info, atomicLogLevel *zap.AtomicLevel) (*API, error) {
peer := &API{

View File

@ -24,7 +24,6 @@ import (
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/uplink/private/piecestore"
@ -54,7 +53,7 @@ type Share struct {
// architecture: Worker
type Verifier struct {
log *zap.Logger
metabase metainfo.MetabaseDB
metabase *metabase.DB
orders *orders.Service
auditor *identity.PeerIdentity
dialer rpc.Dialer
@ -68,7 +67,7 @@ type Verifier struct {
}
// NewVerifier creates a Verifier.
func NewVerifier(log *zap.Logger, metabase metainfo.MetabaseDB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier {
func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier {
return &Verifier{
log: log,
metabase: metabase,

View File

@ -33,6 +33,7 @@ import (
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metainfo/expireddeletion"
@ -81,7 +82,7 @@ type Core struct {
}
Metainfo struct {
Metabase metainfo.MetabaseDB
Metabase *metabase.DB
Service *metainfo.Service
Loop *metaloop.Service
}
@ -139,7 +140,7 @@ type Core struct {
// New creates a new satellite.
func New(log *zap.Logger, full *identity.FullIdentity, db DB,
metabaseDB metainfo.MetabaseDB, revocationDB extensions.RevocationDB,
metabaseDB *metabase.DB, revocationDB extensions.RevocationDB,
liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Core, error) {
peer := &Core{

View File

@ -23,8 +23,8 @@ import (
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/overlay"
)
@ -66,7 +66,7 @@ type GarbageCollection struct {
// NewGarbageCollection creates a new satellite garbage collection process.
func NewGarbageCollection(log *zap.Logger, full *identity.FullIdentity, db DB,
metabaseDB metainfo.MetabaseDB, revocationDB extensions.RevocationDB,
metabaseDB *metabase.DB, revocationDB extensions.RevocationDB,
versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*GarbageCollection, error) {
peer := &GarbageCollection{
Log: log,

View File

@ -48,7 +48,7 @@ type Endpoint struct {
db DB
overlaydb overlay.DB
overlay *overlay.Service
metabase metainfo.MetabaseDB
metabase *metabase.DB
orders *orders.Service
connections *connectionsTracker
peerIdentities overlay.PeerIdentities
@ -91,7 +91,7 @@ func (pm *connectionsTracker) delete(nodeID storj.NodeID) {
}
// NewEndpoint creates a new graceful exit endpoint.
func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overlay.DB, overlay *overlay.Service, metabase metainfo.MetabaseDB, orders *orders.Service,
func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overlay.DB, overlay *overlay.Service, metabase *metabase.DB, orders *orders.Service,
peerIdentities overlay.PeerIdentities, config Config) *Endpoint {
return &Endpoint{
log: log,

View File

@ -16,7 +16,6 @@ import (
"storj.io/common/uuid"
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/overlay"
)
@ -31,17 +30,17 @@ var (
// architecture: Endpoint
type Endpoint struct {
internalpb.DRPCHealthInspectorUnimplementedServer
log *zap.Logger
overlay *overlay.Service
metabaseDB metainfo.MetabaseDB
log *zap.Logger
overlay *overlay.Service
metabase *metabase.DB
}
// NewEndpoint will initialize an Endpoint struct.
func NewEndpoint(log *zap.Logger, cache *overlay.Service, metabaseDB metainfo.MetabaseDB) *Endpoint {
func NewEndpoint(log *zap.Logger, cache *overlay.Service, metabase *metabase.DB) *Endpoint {
return &Endpoint{
log: log,
overlay: cache,
metabaseDB: metabaseDB,
log: log,
overlay: cache,
metabase: metabase,
}
}
@ -74,14 +73,14 @@ func (endpoint *Endpoint) ObjectHealth(ctx context.Context, in *internalpb.Objec
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
}
// TODO add version field to ObjectHealthRequest?
object, err := endpoint.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: objectLocation,
})
if err != nil {
return nil, Error.Wrap(err)
}
listResult, err := endpoint.metabaseDB.ListSegments(ctx, metabase.ListSegments{
listResult, err := endpoint.metabase.ListSegments(ctx, metabase.ListSegments{
StreamID: object.StreamID,
Cursor: startPosition,
Limit: limit,
@ -122,14 +121,14 @@ func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *internalpb.Segm
ObjectKey: metabase.ObjectKey(in.GetEncryptedPath()),
}
object, err := endpoint.metabaseDB.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
object, err := endpoint.metabase.GetObjectLatestVersion(ctx, metabase.GetObjectLatestVersion{
ObjectLocation: objectLocation,
})
if err != nil {
return nil, Error.Wrap(err)
}
segment, err := endpoint.metabaseDB.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
segment, err := endpoint.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: object.StreamID,
Position: metabase.SegmentPositionFromEncoded(uint64(in.GetSegmentIndex())),
})

View File

@ -34,23 +34,42 @@ type DB struct {
impl dbutil.Implementation
aliasCache *NodeAliasCache
testCleanup func() error
}
// Open opens a connection to metabase.
func Open(ctx context.Context, log *zap.Logger, driverName, connstr string) (*DB, error) {
func Open(ctx context.Context, log *zap.Logger, connstr string) (*DB, error) {
var driverName string
_, _, impl, err := dbutil.SplitConnStr(connstr)
if err != nil {
return nil, Error.Wrap(err)
}
switch impl {
case dbutil.Postgres:
driverName = "pgx"
case dbutil.Cockroach:
driverName = "cockroach"
default:
return nil, Error.New("unsupported implementation: %s", connstr)
}
rawdb, err := tagsql.Open(ctx, driverName, connstr)
if err != nil {
return nil, Error.Wrap(err)
}
dbutil.Configure(ctx, rawdb, "metabase", mon)
db := &DB{log: log, connstr: connstr, db: postgresRebind{rawdb}}
db := &DB{
log: log,
db: postgresRebind{rawdb},
connstr: connstr,
impl: impl,
testCleanup: func() error { return nil },
}
db.aliasCache = NewNodeAliasCache(db)
_, _, db.impl, err = dbutil.SplitConnStr(connstr)
if err != nil {
return nil, Error.Wrap(err)
}
log.Debug("Connected", zap.String("db source", connstr))
return db, nil
}
@ -68,9 +87,14 @@ func (db *DB) Ping(ctx context.Context) error {
return Error.Wrap(db.db.PingContext(ctx))
}
// TestingSetCleanup is used to set the callback for cleaning up test database.
func (db *DB) TestingSetCleanup(cleanup func() error) {
db.testCleanup = cleanup
}
// Close closes the connection to database.
func (db *DB) Close() error {
return Error.Wrap(db.db.Close())
return errs.Combine(Error.Wrap(db.db.Close()), db.testCleanup())
}
// DestroyTables deletes all tables.

View File

@ -4,20 +4,12 @@
package metainfo
import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/private/dbutil"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metainfo/piecedeletion"
)
@ -126,112 +118,3 @@ type Config struct {
ProjectLimits ProjectLimitConfig `help:"project limit configuration"`
PieceDeletion piecedeletion.Config `help:"piece deletion configuration"`
}
// MetabaseDB stores objects and segments.
type MetabaseDB interface {
io.Closer
// Now returns time on the database.
Now(ctx context.Context) (time.Time, error)
// MigrateToLatest migrates to latest schema version.
MigrateToLatest(ctx context.Context) error
// CheckVersion checks the database is the correct version
CheckVersion(ctx context.Context) error
// DeleteObjectAnyStatusAllVersions deletes all object versions.
DeleteObjectAnyStatusAllVersions(ctx context.Context, opts metabase.DeleteObjectAnyStatusAllVersions) (result metabase.DeleteObjectResult, err error)
// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
DeleteObjectsAllVersions(ctx context.Context, opts metabase.DeleteObjectsAllVersions) (result metabase.DeleteObjectResult, err error)
// DeletePendingObject deletes a pending object.
DeletePendingObject(ctx context.Context, opts metabase.DeletePendingObject) (result metabase.DeleteObjectResult, err error)
// DeleteBucketObjects deletes all objects in the specified bucket.
DeleteBucketObjects(ctx context.Context, opts metabase.DeleteBucketObjects) (deletedObjectCount int64, err error)
// DeleteExpiredObjects deletes all objects that expired before expiredBefore.
DeleteExpiredObjects(ctx context.Context, opts metabase.DeleteExpiredObjects) error
// DeleteObjectLatestVersion deletes latest object version.
DeleteObjectLatestVersion(ctx context.Context, opts metabase.DeleteObjectLatestVersion) (result metabase.DeleteObjectResult, err error)
// BeginObjectExactVersion adds a pending object to the database, with specific version.
BeginObjectExactVersion(ctx context.Context, opts metabase.BeginObjectExactVersion) (committed metabase.Object, err error)
// CommitObject adds a pending object to the database.
CommitObject(ctx context.Context, opts metabase.CommitObject) (object metabase.Object, err error)
// BeginSegment verifies whether a new segment upload can be started.
BeginSegment(ctx context.Context, opts metabase.BeginSegment) (err error)
// CommitSegment commits segment to the database.
CommitSegment(ctx context.Context, opts metabase.CommitSegment) (err error)
// CommitInlineSegment commits inline segment to the database.
CommitInlineSegment(ctx context.Context, opts metabase.CommitInlineSegment) (err error)
// GetObjectLatestVersion returns object information for latest version.
GetObjectLatestVersion(ctx context.Context, opts metabase.GetObjectLatestVersion) (_ metabase.Object, err error)
// GetSegmentByLocation returns a information about segment on the specified location.
GetSegmentByLocation(ctx context.Context, opts metabase.GetSegmentByLocation) (segment metabase.Segment, err error)
// GetSegmentByPosition returns a information about segment which covers specified offset.
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
// GetLatestObjectLastSegment returns an object last segment information.
GetLatestObjectLastSegment(ctx context.Context, opts metabase.GetLatestObjectLastSegment) (segment metabase.Segment, err error)
// GetStreamPieceCountByNodeID returns piece count by node id.
GetStreamPieceCountByNodeID(ctx context.Context, opts metabase.GetStreamPieceCountByNodeID) (result map[storj.NodeID]int64, err error)
// ListSegments lists specified stream segments.
ListSegments(ctx context.Context, opts metabase.ListSegments) (result metabase.ListSegmentsResult, err error)
// ListStreamPositions lists specified stream segment positions.
ListStreamPositions(ctx context.Context, opts metabase.ListStreamPositions) (result metabase.ListStreamPositionsResult, err error)
// IterateObjectsAllVersions iterates through all versions of all objects.
IterateObjectsAllVersions(ctx context.Context, opts metabase.IterateObjects, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
// IterateObjectsAllVersionsWithStatus iterates through all versions of all objects with specified status.
IterateObjectsAllVersionsWithStatus(ctx context.Context, opts metabase.IterateObjectsWithStatus, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
// IteratePendingObjectsByKey iterates through all StreamID for a given ObjectKey.
IteratePendingObjectsByKey(ctx context.Context, opts metabase.IteratePendingObjectsByKey, fn func(context.Context, metabase.ObjectsIterator) error) (err error)
// IterateLoopObjects iterates through all objects in metabase for metainfo loop purpose.
IterateLoopObjects(ctx context.Context, opts metabase.IterateLoopObjects, fn func(context.Context, metabase.LoopObjectsIterator) error) (err error)
// IterateLoopStreams iterates through all streams passed in as arguments.
IterateLoopStreams(ctx context.Context, opts metabase.IterateLoopStreams, handleStream func(ctx context.Context, streamID uuid.UUID, next metabase.SegmentIterator) error) (err error)
// BucketEmpty returns true if bucket does not contain objects (pending or committed).
// This method doesn't check bucket existence.
BucketEmpty(ctx context.Context, opts metabase.BucketEmpty) (empty bool, err error)
// UpdateSegmentPieces updates pieces for specified segment. If provided old pieces won't match current database state update will fail.
UpdateSegmentPieces(ctx context.Context, opts metabase.UpdateSegmentPieces) (err error)
// EnsureNodeAliases ensures that the supplied node ID-s have a alias.
// It's safe to concurrently try and create node ID-s for the same NodeID.
EnsureNodeAliases(ctx context.Context, opts metabase.EnsureNodeAliases) (err error)
// ListNodeAliases lists all node alias mappings.
ListNodeAliases(ctx context.Context) (_ []metabase.NodeAliasEntry, err error)
// TestingAllCommittedObjects gets all committed objects from bucket. Use only for testing purposes.
TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []metabase.ObjectEntry, err error)
// TestingAllPendingObjects gets all pending objects from bucket. Use only for testing purposes.
TestingAllPendingObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []metabase.ObjectEntry, err error)
// TestingAllObjectSegments gets all segments for given object. Use only for testing purposes.
TestingAllObjectSegments(ctx context.Context, objectLocation metabase.ObjectLocation) (segments []metabase.Segment, err error)
// TestingAllObjects gets all objects. Use only for testing purposes.
TestingAllObjects(ctx context.Context) (segments []metabase.Object, err error)
// TestingAllSegments gets all segments. Use only for testing purposes.
TestingAllSegments(ctx context.Context) (segments []metabase.Segment, err error)
// InternalImplementation returns *metabase.DB.
// TODO: remove.
InternalImplementation() interface{}
}
// OpenMetabase returns database for storing objects and segments.
func OpenMetabase(ctx context.Context, log *zap.Logger, dbURLString string) (db MetabaseDB, err error) {
_, source, implementation, err := dbutil.SplitConnStr(dbURLString)
if err != nil {
return nil, err
}
switch implementation {
case dbutil.Postgres:
db, err = metabase.Open(ctx, log, "pgx", dbURLString)
case dbutil.Cockroach:
db, err = metabase.Open(ctx, log, "cockroach", dbURLString)
default:
err = Error.New("unsupported db implementation: %s", dbURLString)
}
if err != nil {
return nil, err
}
log.Debug("Connected to:", zap.String("db source", source))
return db, nil
}

View File

@ -13,7 +13,6 @@ import (
"storj.io/common/sync2"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
)
var (
@ -35,14 +34,14 @@ type Config struct {
type Chore struct {
log *zap.Logger
config Config
metabase metainfo.MetabaseDB
metabase *metabase.DB
nowFn func() time.Time
Loop *sync2.Cycle
}
// NewChore creates a new instance of the expireddeletion chore.
func NewChore(log *zap.Logger, config Config, metabase metainfo.MetabaseDB) *Chore {
func NewChore(log *zap.Logger, config Config, metabase *metabase.DB) *Chore {
return &Chore{
log: log,
config: config,

View File

@ -26,11 +26,11 @@ var (
type Service struct {
logger *zap.Logger
bucketsDB BucketsDB
metabaseDB MetabaseDB
metabaseDB *metabase.DB
}
// NewService creates new metainfo service.
func NewService(logger *zap.Logger, bucketsDB BucketsDB, metabaseDB MetabaseDB) *Service {
func NewService(logger *zap.Logger, bucketsDB BucketsDB, metabaseDB *metabase.DB) *Service {
return &Service{
logger: logger,
bucketsDB: bucketsDB,

View File

@ -19,7 +19,6 @@ import (
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metaloop"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair"
"storj.io/storj/satellite/repair/irreparable"
@ -39,7 +38,7 @@ type Checker struct {
logger *zap.Logger
repairQueue queue.RepairQueue
irrdb irreparable.DB
metabase metainfo.MetabaseDB
metabase *metabase.DB
metaLoop *metaloop.Service
nodestate *ReliabilityCache
statsCollector *statsCollector
@ -50,7 +49,7 @@ type Checker struct {
}
// NewChecker creates a new instance of checker.
func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, irrdb irreparable.DB, metabase metainfo.MetabaseDB, metaLoop *metaloop.Service, overlay *overlay.Service, config Config) *Checker {
func NewChecker(logger *zap.Logger, repairQueue queue.RepairQueue, irrdb irreparable.DB, metabase *metabase.DB, metaLoop *metaloop.Service, overlay *overlay.Service, config Config) *Checker {
return &Checker{
logger: logger,

View File

@ -18,7 +18,6 @@ import (
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
@ -52,7 +51,7 @@ func (ie *irreparableError) Error() string {
type SegmentRepairer struct {
log *zap.Logger
statsCollector *statsCollector
metabase metainfo.MetabaseDB
metabase *metabase.DB
orders *orders.Service
overlay *overlay.Service
ec *ECRepairer
@ -75,7 +74,7 @@ type SegmentRepairer struct {
// threshould to determine the maximum limit of nodes to upload repaired pieces,
// when negative, 0 is applied.
func NewSegmentRepairer(
log *zap.Logger, metabase metainfo.MetabaseDB, orders *orders.Service,
log *zap.Logger, metabase *metabase.DB, orders *orders.Service,
overlay *overlay.Service, dialer rpc.Dialer, timeout time.Duration,
excessOptimalThreshold float64, repairOverrides checker.RepairOverrides,
downloadTimeout time.Duration, inMemoryRepair bool,

View File

@ -23,6 +23,7 @@ import (
"storj.io/private/version"
"storj.io/storj/private/lifecycle"
version_checker "storj.io/storj/private/version/checker"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
@ -66,7 +67,7 @@ type Repairer struct {
// NewRepairer creates a new repairer peer.
func NewRepairer(log *zap.Logger, full *identity.FullIdentity,
metabaseDB metainfo.MetabaseDB,
metabaseDB *metabase.DB,
revocationDB extensions.RevocationDB, repairQueue queue.RepairQueue,
bucketsDB metainfo.BucketsDB, overlayCache overlay.DB,
rollupsWriteCache *orders.RollupsWriteCache, irrDB irreparable.DB,

View File

@ -22,7 +22,7 @@ import (
"storj.io/private/dbutil/pgutil"
"storj.io/private/dbutil/tempdb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/satellitedb"
)
@ -130,19 +130,8 @@ func CreateMasterDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.
return &tempMasterDB{DB: masterDB, tempDB: tempDB}, err
}
// tempMetabaseDB is a metabase.DB-implementing type that cleans up after itself when closed.
type tempMetabaseDB struct {
metainfo.MetabaseDB
tempDB *dbutil.TempDatabase
}
// Close closes a tempMetabaseDB and cleans it up afterward.
func (db *tempMetabaseDB) Close() error {
return errs.Combine(db.MetabaseDB.Close(), db.tempDB.Close())
}
// CreateMetabaseDB creates a new satellite metabase for testing.
func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db metainfo.MetabaseDB, err error) {
func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, category string, index int, dbInfo Database) (db *metabase.DB, err error) {
if dbInfo.URL == "" {
return nil, fmt.Errorf("Database %s connection string not provided. %s", dbInfo.Name, dbInfo.Message)
}
@ -162,12 +151,13 @@ func CreateMetabaseDB(ctx context.Context, log *zap.Logger, name string, categor
// CreateMetabaseDBOnTopOf creates a new metabase on top of an already existing
// temporary database.
func CreateMetabaseDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (db metainfo.MetabaseDB, err error) {
metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), tempDB.ConnStr)
func CreateMetabaseDBOnTopOf(ctx context.Context, log *zap.Logger, tempDB *dbutil.TempDatabase) (*metabase.DB, error) {
db, err := metabase.Open(ctx, log.Named("metabase"), tempDB.ConnStr)
if err != nil {
return nil, err
}
return &tempMetabaseDB{MetabaseDB: metabaseDB, tempDB: tempDB}, err
db.TestingSetCleanup(tempDB.Close)
return db, nil
}
// Run method will iterate over all supported databases. Will establish