satellite/metainfo: remove support for boltdb based pointerDB
By previous changes we can now remove testplanet.New and also remove metainfo boltdb support. Change-Id: I5bdfbbbb45967492728e705b34b2fedb4f28c381
This commit is contained in:
parent
5a4745eddb
commit
c6f94ce9e4
@ -21,7 +21,6 @@ import (
|
|||||||
"github.com/skyrings/skyring-common/tools/uuid"
|
"github.com/skyrings/skyring-common/tools/uuid"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
|
||||||
|
|
||||||
"storj.io/common/pb"
|
"storj.io/common/pb"
|
||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
@ -29,6 +28,7 @@ import (
|
|||||||
"storj.io/common/testrand"
|
"storj.io/common/testrand"
|
||||||
"storj.io/storj/satellite/metainfo"
|
"storj.io/storj/satellite/metainfo"
|
||||||
"storj.io/storj/storage"
|
"storj.io/storj/storage"
|
||||||
|
"storj.io/storj/storage/teststore"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestMain(m *testing.M) {
|
||||||
@ -524,10 +524,7 @@ func TestObserver_processSegment_switch_project(t *testing.T) {
|
|||||||
ctx := testcontext.New(t)
|
ctx := testcontext.New(t)
|
||||||
defer ctx.Cleanup()
|
defer ctx.Cleanup()
|
||||||
|
|
||||||
// need bolddb to have DB with concurrent access support
|
db := teststore.New()
|
||||||
db, err := metainfo.NewStore(zaptest.NewLogger(t), "bolt://"+ctx.File("pointers.db"))
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer ctx.Check(db.Close)
|
|
||||||
|
|
||||||
buffer := new(bytes.Buffer)
|
buffer := new(bytes.Buffer)
|
||||||
writer := csv.NewWriter(buffer)
|
writer := csv.NewWriter(buffer)
|
||||||
@ -601,9 +598,7 @@ func TestObserver_processSegment_single_project(t *testing.T) {
|
|||||||
tt := tt
|
tt := tt
|
||||||
t.Run("#"+strconv.Itoa(i), func(t *testing.T) {
|
t.Run("#"+strconv.Itoa(i), func(t *testing.T) {
|
||||||
// need boltdb to have DB with concurrent access support
|
// need boltdb to have DB with concurrent access support
|
||||||
db, err := metainfo.NewStore(zaptest.NewLogger(t), "bolt://"+ctx.File("pointers.db"))
|
db := teststore.New()
|
||||||
require.NoError(t, err)
|
|
||||||
defer ctx.Check(db.Close)
|
|
||||||
|
|
||||||
for i, ttObject := range tt.objects {
|
for i, ttObject := range tt.objects {
|
||||||
for _, segment := range ttObject.segments {
|
for _, segment := range ttObject.segments {
|
||||||
@ -632,7 +627,7 @@ func TestObserver_processSegment_single_project(t *testing.T) {
|
|||||||
objects: make(bucketsObjects),
|
objects: make(bucketsObjects),
|
||||||
writer: csv.NewWriter(new(bytes.Buffer)),
|
writer: csv.NewWriter(new(bytes.Buffer)),
|
||||||
}
|
}
|
||||||
err = observer.detectZombieSegments(ctx)
|
err := observer.detectZombieSegments(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
for i, ttObject := range tt.objects {
|
for i, ttObject := range tt.objects {
|
||||||
|
@ -17,7 +17,6 @@ import (
|
|||||||
|
|
||||||
"github.com/zeebo/errs"
|
"github.com/zeebo/errs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zaptest"
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"storj.io/common/identity"
|
"storj.io/common/identity"
|
||||||
@ -113,43 +112,6 @@ func (peer *closablePeer) Close() error {
|
|||||||
return peer.err
|
return peer.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new full system with the given number of nodes.
|
|
||||||
func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int) (*Planet, error) {
|
|
||||||
var log *zap.Logger
|
|
||||||
if t == nil {
|
|
||||||
log = zap.NewNop()
|
|
||||||
} else {
|
|
||||||
log = zaptest.NewLogger(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
return NewCustom(log, Config{
|
|
||||||
SatelliteCount: satelliteCount,
|
|
||||||
StorageNodeCount: storageNodeCount,
|
|
||||||
UplinkCount: uplinkCount,
|
|
||||||
|
|
||||||
Name: t.Name(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWithIdentityVersion creates a new full system with the given version for node identities and the given number of nodes.
|
|
||||||
func NewWithIdentityVersion(t zaptest.TestingT, identityVersion *storj.IDVersion, satelliteCount, storageNodeCount, uplinkCount int) (*Planet, error) {
|
|
||||||
var log *zap.Logger
|
|
||||||
if t == nil {
|
|
||||||
log = zap.NewNop()
|
|
||||||
} else {
|
|
||||||
log = zaptest.NewLogger(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
return NewCustom(log, Config{
|
|
||||||
SatelliteCount: satelliteCount,
|
|
||||||
StorageNodeCount: storageNodeCount,
|
|
||||||
UplinkCount: uplinkCount,
|
|
||||||
IdentityVersion: identityVersion,
|
|
||||||
|
|
||||||
Name: t.Name(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewCustom creates a new full system with the specified configuration.
|
// NewCustom creates a new full system with the specified configuration.
|
||||||
func NewCustom(log *zap.Logger, config Config) (*Planet, error) {
|
func NewCustom(log *zap.Logger, config Config) (*Planet, error) {
|
||||||
// Clear error in the beginning to avoid issues down the line.
|
// Clear error in the beginning to avoid issues down the line.
|
||||||
|
@ -5,7 +5,6 @@ package testplanet
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -25,9 +24,6 @@ import (
|
|||||||
"storj.io/common/storj"
|
"storj.io/common/storj"
|
||||||
"storj.io/storj/pkg/revocation"
|
"storj.io/storj/pkg/revocation"
|
||||||
"storj.io/storj/pkg/server"
|
"storj.io/storj/pkg/server"
|
||||||
"storj.io/storj/private/dbutil"
|
|
||||||
"storj.io/storj/private/dbutil/pgutil/pgtest"
|
|
||||||
"storj.io/storj/private/dbutil/tempdb"
|
|
||||||
"storj.io/storj/private/version"
|
"storj.io/storj/private/version"
|
||||||
versionchecker "storj.io/storj/private/version/checker"
|
versionchecker "storj.io/storj/private/version/checker"
|
||||||
"storj.io/storj/satellite"
|
"storj.io/storj/satellite"
|
||||||
@ -55,7 +51,6 @@ import (
|
|||||||
"storj.io/storj/satellite/repair/checker"
|
"storj.io/storj/satellite/repair/checker"
|
||||||
"storj.io/storj/satellite/repair/irreparable"
|
"storj.io/storj/satellite/repair/irreparable"
|
||||||
"storj.io/storj/satellite/repair/repairer"
|
"storj.io/storj/satellite/repair/repairer"
|
||||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
|
||||||
"storj.io/storj/satellite/vouchers"
|
"storj.io/storj/satellite/vouchers"
|
||||||
"storj.io/storj/storage/redis/redisserver"
|
"storj.io/storj/storage/redis/redisserver"
|
||||||
)
|
)
|
||||||
@ -241,19 +236,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
|
|||||||
if planet.config.Reconfigure.NewSatelliteDB != nil {
|
if planet.config.Reconfigure.NewSatelliteDB != nil {
|
||||||
db, err = planet.config.Reconfigure.NewSatelliteDB(log.Named("db"), i)
|
db, err = planet.config.Reconfigure.NewSatelliteDB(log.Named("db"), i)
|
||||||
} else {
|
} else {
|
||||||
// TODO: This is analogous to the way we worked prior to the advent of OpenUnique,
|
return nil, errs.New("NewSatelliteDB not defined")
|
||||||
// but it seems wrong. Tests that use planet.Start() instead of testplanet.Run()
|
|
||||||
// will not get run against both types of DB.
|
|
||||||
connStr := *pgtest.ConnStr
|
|
||||||
if *pgtest.CrdbConnStr != "" {
|
|
||||||
connStr = *pgtest.CrdbConnStr
|
|
||||||
}
|
|
||||||
var tempDB *dbutil.TempDatabase
|
|
||||||
tempDB, err = tempdb.OpenUnique(context.TODO(), connStr, fmt.Sprintf("%s.%d", planet.id, i))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
db, err = satellitedbtest.CreateMasterDBOnTopOf(log.Named("db"), tempDB)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -263,7 +246,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
|
|||||||
if planet.config.Reconfigure.NewSatellitePointerDB != nil {
|
if planet.config.Reconfigure.NewSatellitePointerDB != nil {
|
||||||
pointerDB, err = planet.config.Reconfigure.NewSatellitePointerDB(log.Named("pointerdb"), i)
|
pointerDB, err = planet.config.Reconfigure.NewSatellitePointerDB(log.Named("pointerdb"), i)
|
||||||
} else {
|
} else {
|
||||||
pointerDB, err = metainfo.NewStore(log.Named("pointerdb"), "bolt://"+filepath.Join(storageDir, "pointers.db"))
|
return nil, errs.New("NewSatellitePointerDB not defined")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
"storj.io/common/memory"
|
"storj.io/common/memory"
|
||||||
"storj.io/storj/private/dbutil"
|
"storj.io/storj/private/dbutil"
|
||||||
"storj.io/storj/storage"
|
"storj.io/storj/storage"
|
||||||
"storj.io/storj/storage/boltdb"
|
|
||||||
"storj.io/storj/storage/cockroachkv"
|
"storj.io/storj/storage/cockroachkv"
|
||||||
"storj.io/storj/storage/postgreskv"
|
"storj.io/storj/storage/postgreskv"
|
||||||
)
|
)
|
||||||
@ -47,7 +46,7 @@ type RateLimiterConfig struct {
|
|||||||
|
|
||||||
// Config is a configuration struct that is everything you need to start a metainfo
|
// Config is a configuration struct that is everything you need to start a metainfo
|
||||||
type Config struct {
|
type Config struct {
|
||||||
DatabaseURL string `help:"the database connection string to use" releaseDefault:"postgres://" devDefault:"bolt://$CONFDIR/pointerdb.db"`
|
DatabaseURL string `help:"the database connection string to use" default:"postgres://"`
|
||||||
MinRemoteSegmentSize memory.Size `default:"1240" help:"minimum remote segment size"`
|
MinRemoteSegmentSize memory.Size `default:"1240" help:"minimum remote segment size"`
|
||||||
MaxInlineSegmentSize memory.Size `default:"8000" help:"maximum inline segment size"`
|
MaxInlineSegmentSize memory.Size `default:"8000" help:"maximum inline segment size"`
|
||||||
MaxCommitInterval time.Duration `default:"48h" help:"maximum time allowed to pass between creating and committing a segment"`
|
MaxCommitInterval time.Duration `default:"48h" help:"maximum time allowed to pass between creating and committing a segment"`
|
||||||
@ -72,8 +71,6 @@ func NewStore(logger *zap.Logger, dbURLString string) (db PointerDB, err error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
switch implementation {
|
switch implementation {
|
||||||
case dbutil.Bolt:
|
|
||||||
db, err = boltdb.New(source, BoltPointerBucket)
|
|
||||||
case dbutil.Postgres:
|
case dbutil.Postgres:
|
||||||
db, err = postgreskv.New(source)
|
db, err = postgreskv.New(source)
|
||||||
case dbutil.Cockroach:
|
case dbutil.Cockroach:
|
||||||
|
@ -202,12 +202,14 @@ func (store *Client) Close() error {
|
|||||||
// Iterate iterates over items based on opts
|
// Iterate iterates over items based on opts
|
||||||
func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
|
func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) {
|
||||||
defer mon.Task()(&ctx)(&err)
|
defer mon.Task()(&ctx)(&err)
|
||||||
defer store.locked()()
|
|
||||||
|
|
||||||
|
store.mu.Lock()
|
||||||
store.CallCount.Iterate++
|
store.CallCount.Iterate++
|
||||||
if store.forcedError() {
|
if store.forcedError() {
|
||||||
|
store.mu.Unlock()
|
||||||
return errInternal
|
return errInternal
|
||||||
}
|
}
|
||||||
|
store.mu.Unlock()
|
||||||
|
|
||||||
var cursor advancer = &forward{newCursor(store)}
|
var cursor advancer = &forward{newCursor(store)}
|
||||||
|
|
||||||
@ -304,8 +306,10 @@ func (cursor *cursor) close() {
|
|||||||
// positionForward positions at key or the next item
|
// positionForward positions at key or the next item
|
||||||
func (cursor *cursor) positionForward(key storage.Key) {
|
func (cursor *cursor) positionForward(key storage.Key) {
|
||||||
store := cursor.store
|
store := cursor.store
|
||||||
|
store.mu.Lock()
|
||||||
cursor.version = store.version
|
cursor.version = store.version
|
||||||
cursor.nextIndex, _ = store.indexOf(key)
|
cursor.nextIndex, _ = store.indexOf(key)
|
||||||
|
store.mu.Unlock()
|
||||||
cursor.lastKey = storage.CloneKey(key)
|
cursor.lastKey = storage.CloneKey(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -314,6 +318,7 @@ func (cursor *cursor) next() (*storage.ListItem, bool) {
|
|||||||
if cursor.done {
|
if cursor.done {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
defer store.locked()()
|
||||||
|
|
||||||
if cursor.version != store.version {
|
if cursor.version != store.version {
|
||||||
cursor.version = store.version
|
cursor.version = store.version
|
||||||
|
Loading…
Reference in New Issue
Block a user