From c6f94ce9e4eb117e0127dec1bb1484b6c4c8cabf Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 22 Jan 2020 23:17:41 +0200 Subject: [PATCH] satellite/metainfo: remove support for boltdb based pointerDB By previous changes we can now remove testplanet.New and also remove metainfo boltdb support. Change-Id: I5bdfbbbb45967492728e705b34b2fedb4f28c381 --- cmd/segment-reaper/observer_test.go | 13 +++------- private/testplanet/planet.go | 38 ----------------------------- private/testplanet/satellite.go | 21 ++-------------- satellite/metainfo/config.go | 5 +--- storage/teststore/store.go | 7 +++++- 5 files changed, 13 insertions(+), 71 deletions(-) diff --git a/cmd/segment-reaper/observer_test.go b/cmd/segment-reaper/observer_test.go index 850b8ca1d..3226c155d 100644 --- a/cmd/segment-reaper/observer_test.go +++ b/cmd/segment-reaper/observer_test.go @@ -21,7 +21,6 @@ import ( "github.com/skyrings/skyring-common/tools/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "storj.io/common/pb" "storj.io/common/storj" @@ -29,6 +28,7 @@ import ( "storj.io/common/testrand" "storj.io/storj/satellite/metainfo" "storj.io/storj/storage" + "storj.io/storj/storage/teststore" ) func TestMain(m *testing.M) { @@ -524,10 +524,7 @@ func TestObserver_processSegment_switch_project(t *testing.T) { ctx := testcontext.New(t) defer ctx.Cleanup() - // need bolddb to have DB with concurrent access support - db, err := metainfo.NewStore(zaptest.NewLogger(t), "bolt://"+ctx.File("pointers.db")) - require.NoError(t, err) - defer ctx.Check(db.Close) + db := teststore.New() buffer := new(bytes.Buffer) writer := csv.NewWriter(buffer) @@ -601,9 +598,7 @@ func TestObserver_processSegment_single_project(t *testing.T) { tt := tt t.Run("#"+strconv.Itoa(i), func(t *testing.T) { // need boltdb to have DB with concurrent access support - db, err := metainfo.NewStore(zaptest.NewLogger(t), "bolt://"+ctx.File("pointers.db")) - require.NoError(t, err) - defer ctx.Check(db.Close) + db := teststore.New() for i, ttObject := range tt.objects { for _, segment := range ttObject.segments { @@ -632,7 +627,7 @@ func TestObserver_processSegment_single_project(t *testing.T) { objects: make(bucketsObjects), writer: csv.NewWriter(new(bytes.Buffer)), } - err = observer.detectZombieSegments(ctx) + err := observer.detectZombieSegments(ctx) require.NoError(t, err) for i, ttObject := range tt.objects { diff --git a/private/testplanet/planet.go b/private/testplanet/planet.go index 1eef2a864..67c99631c 100644 --- a/private/testplanet/planet.go +++ b/private/testplanet/planet.go @@ -17,7 +17,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" - "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" "storj.io/common/identity" @@ -113,43 +112,6 @@ func (peer *closablePeer) Close() error { return peer.err } -// New creates a new full system with the given number of nodes. -func New(t zaptest.TestingT, satelliteCount, storageNodeCount, uplinkCount int) (*Planet, error) { - var log *zap.Logger - if t == nil { - log = zap.NewNop() - } else { - log = zaptest.NewLogger(t) - } - - return NewCustom(log, Config{ - SatelliteCount: satelliteCount, - StorageNodeCount: storageNodeCount, - UplinkCount: uplinkCount, - - Name: t.Name(), - }) -} - -// NewWithIdentityVersion creates a new full system with the given version for node identities and the given number of nodes. -func NewWithIdentityVersion(t zaptest.TestingT, identityVersion *storj.IDVersion, satelliteCount, storageNodeCount, uplinkCount int) (*Planet, error) { - var log *zap.Logger - if t == nil { - log = zap.NewNop() - } else { - log = zaptest.NewLogger(t) - } - - return NewCustom(log, Config{ - SatelliteCount: satelliteCount, - StorageNodeCount: storageNodeCount, - UplinkCount: uplinkCount, - IdentityVersion: identityVersion, - - Name: t.Name(), - }) -} - // NewCustom creates a new full system with the specified configuration. func NewCustom(log *zap.Logger, config Config) (*Planet, error) { // Clear error in the beginning to avoid issues down the line. diff --git a/private/testplanet/satellite.go b/private/testplanet/satellite.go index f4142bbfc..b2a523a8b 100644 --- a/private/testplanet/satellite.go +++ b/private/testplanet/satellite.go @@ -5,7 +5,6 @@ package testplanet import ( "context" - "fmt" "net" "os" "path/filepath" @@ -25,9 +24,6 @@ import ( "storj.io/common/storj" "storj.io/storj/pkg/revocation" "storj.io/storj/pkg/server" - "storj.io/storj/private/dbutil" - "storj.io/storj/private/dbutil/pgutil/pgtest" - "storj.io/storj/private/dbutil/tempdb" "storj.io/storj/private/version" versionchecker "storj.io/storj/private/version/checker" "storj.io/storj/satellite" @@ -55,7 +51,6 @@ import ( "storj.io/storj/satellite/repair/checker" "storj.io/storj/satellite/repair/irreparable" "storj.io/storj/satellite/repair/repairer" - "storj.io/storj/satellite/satellitedb/satellitedbtest" "storj.io/storj/satellite/vouchers" "storj.io/storj/storage/redis/redisserver" ) @@ -241,19 +236,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) { if planet.config.Reconfigure.NewSatelliteDB != nil { db, err = planet.config.Reconfigure.NewSatelliteDB(log.Named("db"), i) } else { - // TODO: This is analogous to the way we worked prior to the advent of OpenUnique, - // but it seems wrong. Tests that use planet.Start() instead of testplanet.Run() - // will not get run against both types of DB. - connStr := *pgtest.ConnStr - if *pgtest.CrdbConnStr != "" { - connStr = *pgtest.CrdbConnStr - } - var tempDB *dbutil.TempDatabase - tempDB, err = tempdb.OpenUnique(context.TODO(), connStr, fmt.Sprintf("%s.%d", planet.id, i)) - if err != nil { - return nil, err - } - db, err = satellitedbtest.CreateMasterDBOnTopOf(log.Named("db"), tempDB) + return nil, errs.New("NewSatelliteDB not defined") } if err != nil { return nil, err @@ -263,7 +246,7 @@ func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) { if planet.config.Reconfigure.NewSatellitePointerDB != nil { pointerDB, err = planet.config.Reconfigure.NewSatellitePointerDB(log.Named("pointerdb"), i) } else { - pointerDB, err = metainfo.NewStore(log.Named("pointerdb"), "bolt://"+filepath.Join(storageDir, "pointers.db")) + return nil, errs.New("NewSatellitePointerDB not defined") } if err != nil { return nil, err diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index 0401ce912..db1e42e23 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -11,7 +11,6 @@ import ( "storj.io/common/memory" "storj.io/storj/private/dbutil" "storj.io/storj/storage" - "storj.io/storj/storage/boltdb" "storj.io/storj/storage/cockroachkv" "storj.io/storj/storage/postgreskv" ) @@ -47,7 +46,7 @@ type RateLimiterConfig struct { // Config is a configuration struct that is everything you need to start a metainfo type Config struct { - DatabaseURL string `help:"the database connection string to use" releaseDefault:"postgres://" devDefault:"bolt://$CONFDIR/pointerdb.db"` + DatabaseURL string `help:"the database connection string to use" default:"postgres://"` MinRemoteSegmentSize memory.Size `default:"1240" help:"minimum remote segment size"` MaxInlineSegmentSize memory.Size `default:"8000" help:"maximum inline segment size"` MaxCommitInterval time.Duration `default:"48h" help:"maximum time allowed to pass between creating and committing a segment"` @@ -72,8 +71,6 @@ func NewStore(logger *zap.Logger, dbURLString string) (db PointerDB, err error) } switch implementation { - case dbutil.Bolt: - db, err = boltdb.New(source, BoltPointerBucket) case dbutil.Postgres: db, err = postgreskv.New(source) case dbutil.Cockroach: diff --git a/storage/teststore/store.go b/storage/teststore/store.go index 6e50b0502..044589a11 100644 --- a/storage/teststore/store.go +++ b/storage/teststore/store.go @@ -202,12 +202,14 @@ func (store *Client) Close() error { // Iterate iterates over items based on opts func (store *Client) Iterate(ctx context.Context, opts storage.IterateOptions, fn func(context.Context, storage.Iterator) error) (err error) { defer mon.Task()(&ctx)(&err) - defer store.locked()() + store.mu.Lock() store.CallCount.Iterate++ if store.forcedError() { + store.mu.Unlock() return errInternal } + store.mu.Unlock() var cursor advancer = &forward{newCursor(store)} @@ -304,8 +306,10 @@ func (cursor *cursor) close() { // positionForward positions at key or the next item func (cursor *cursor) positionForward(key storage.Key) { store := cursor.store + store.mu.Lock() cursor.version = store.version cursor.nextIndex, _ = store.indexOf(key) + store.mu.Unlock() cursor.lastKey = storage.CloneKey(key) } @@ -314,6 +318,7 @@ func (cursor *cursor) next() (*storage.ListItem, bool) { if cursor.done { return nil, false } + defer store.locked()() if cursor.version != store.version { cursor.version = store.version