From dc57640d9ce66ffc160c298209e44778febe3467 Mon Sep 17 00:00:00 2001 From: Moby von Briesen Date: Wed, 27 May 2020 17:07:24 -0400 Subject: [PATCH] storagenode/piecestore: switch usedserials db for in-memory usedserials store Part 2 of moving usedserials in memory * Drop usedserials table in storagenodedb * Use in-memory usedserials store in place of db for order limit verification * Update order limit grace period to be only one hour - this means uplinks must send their order limits to storagenodes within an hour of receiving them Change-Id: I37a0e1d2ca6cb80854a3ef495af2d1d1f92e9f03 --- private/testplanet/storagenode.go | 1 + storagenode/collector/service.go | 10 +- storagenode/collector/service_test.go | 28 ++-- storagenode/peer.go | 15 ++- storagenode/piecestore/endpoint.go | 8 +- storagenode/piecestore/serials.go | 29 ----- storagenode/piecestore/serials_test.go | 121 ------------------ storagenode/piecestore/verification.go | 2 +- storagenode/preflight/db_test.go | 12 +- storagenode/storagenodedb/database.go | 21 ++- storagenode/storagenodedb/schema.go | 29 +---- .../storagenodedbtest/run_test.go | 4 +- .../storagenodedb/testdata/multidbsnapshot.go | 1 + storagenode/storagenodedb/testdata/v42.go | 27 ++++ storagenode/storagenodedb/usedserials.go | 61 +-------- 15 files changed, 82 insertions(+), 287 deletions(-) delete mode 100644 storagenode/piecestore/serials.go delete mode 100644 storagenode/piecestore/serials_test.go create mode 100644 storagenode/storagenodedb/testdata/v42.go diff --git a/private/testplanet/storagenode.go b/private/testplanet/storagenode.go index 8567bbab1..10e14b099 100644 --- a/private/testplanet/storagenode.go +++ b/private/testplanet/storagenode.go @@ -153,6 +153,7 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod CachePath: filepath.Join(storageDir, "trust-cache.json"), RefreshInterval: defaultInterval, }, + MaxUsedSerialsSize: memory.MiB, }, Pieces: pieces.DefaultConfig, Filestore: filestore.DefaultConfig, diff --git a/storagenode/collector/service.go b/storagenode/collector/service.go index 0ea4ea08b..1b66bafec 100644 --- a/storagenode/collector/service.go +++ b/storagenode/collector/service.go @@ -13,7 +13,7 @@ import ( "storj.io/common/sync2" "storj.io/storj/storagenode/pieces" - "storj.io/storj/storagenode/piecestore" + "storj.io/storj/storagenode/piecestore/usedserials" ) var mon = monkit.Package() @@ -29,13 +29,13 @@ type Config struct { type Service struct { log *zap.Logger pieces *pieces.Store - usedSerials piecestore.UsedSerials + usedSerials *usedserials.Table Loop *sync2.Cycle } // NewService creates a new collector service. -func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials piecestore.UsedSerials, config Config) *Service { +func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials *usedserials.Table, config Config) *Service { return &Service{ log: log, pieces: pieces, @@ -70,9 +70,7 @@ func (service *Service) Close() (err error) { func (service *Service) Collect(ctx context.Context, now time.Time) (err error) { defer mon.Task()(&ctx)(&err) - if deleteErr := service.usedSerials.DeleteExpired(ctx, now); err != nil { - service.log.Error("unable to delete expired used serials", zap.Error(deleteErr)) - } + service.usedSerials.DeleteExpired(now) const maxBatches = 100 const batchSize = 1000 diff --git a/storagenode/collector/service_test.go b/storagenode/collector/service_test.go index b07f35b84..3b5791757 100644 --- a/storagenode/collector/service_test.go +++ b/storagenode/collector/service_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" "storj.io/common/memory" - "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" @@ -45,7 +44,7 @@ func TestCollector(t *testing.T) { // imagine we are 30 minutes in the future for _, storageNode := range planet.StorageNodes { pieceStore := storageNode.DB.Pieces() - usedSerials := storageNode.DB.UsedSerials() + usedSerials := storageNode.UsedSerials // verify that we actually have some data on storage nodes used, err := pieceStore.SpaceUsedForBlobs(ctx) @@ -59,43 +58,37 @@ func TestCollector(t *testing.T) { err = storageNode.Collector.Collect(ctx, time.Now().Add(30*time.Minute)) require.NoError(t, err) - // ensure we haven't deleted used serials - err = usedSerials.IterateAll(ctx, func(_ storj.NodeID, _ storj.SerialNumber, _ time.Time) { - serialsPresent++ - }) - require.NoError(t, err) + serialsPresent += usedSerials.Count() collections++ } require.NotZero(t, collections) + // ensure we haven't deleted used serials require.Equal(t, 2, serialsPresent) serialsPresent = 0 // imagine we are 2 hours in the future for _, storageNode := range planet.StorageNodes { - usedSerials := storageNode.DB.UsedSerials() + usedSerials := storageNode.UsedSerials // collect all the data err = storageNode.Collector.Collect(ctx, time.Now().Add(2*time.Hour)) require.NoError(t, err) - // ensure we have deleted used serials - err = usedSerials.IterateAll(ctx, func(id storj.NodeID, serial storj.SerialNumber, expiration time.Time) { - serialsPresent++ - }) - require.NoError(t, err) + serialsPresent += usedSerials.Count() collections++ } + // ensure we have deleted used serials require.Equal(t, 0, serialsPresent) // imagine we are 10 days in the future for _, storageNode := range planet.StorageNodes { pieceStore := storageNode.DB.Pieces() - usedSerials := storageNode.DB.UsedSerials() + usedSerials := storageNode.UsedSerials // collect all the data err = storageNode.Collector.Collect(ctx, time.Now().Add(10*24*time.Hour)) @@ -106,15 +99,12 @@ func TestCollector(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(0), used) - // ensure we have deleted used serials - err = usedSerials.IterateAll(ctx, func(id storj.NodeID, serial storj.SerialNumber, expiration time.Time) { - serialsPresent++ - }) - require.NoError(t, err) + serialsPresent += usedSerials.Count() collections++ } + // ensure we have deleted used serials require.Equal(t, 0, serialsPresent) }) } diff --git a/storagenode/peer.go b/storagenode/peer.go index d5664a110..2989a82cc 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -46,6 +46,7 @@ import ( "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/piecestore" + "storj.io/storj/storagenode/piecestore/usedserials" "storj.io/storj/storagenode/preflight" "storj.io/storj/storagenode/pricing" "storj.io/storj/storagenode/reputation" @@ -77,7 +78,6 @@ type DB interface { PieceExpirationDB() pieces.PieceExpirationDB PieceSpaceUsedDB() pieces.PieceSpaceUsedDB Bandwidth() bandwidth.DB - UsedSerials() piecestore.UsedSerials Reputation() reputation.DB StorageUsage() storageusage.DB Satellites() satellites.DB @@ -181,9 +181,10 @@ func isAddressValid(addrstring string) error { // architecture: Peer type Peer struct { // core dependencies - Log *zap.Logger - Identity *identity.FullIdentity - DB DB + Log *zap.Logger + Identity *identity.FullIdentity + DB DB + UsedSerials *usedserials.Table Servers *lifecycle.Group Services *lifecycle.Group @@ -471,6 +472,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten Close: peer.Storage2.RetainService.Close, }) + peer.UsedSerials = usedserials.NewTable(config.Storage2.MaxUsedSerialsSize) + peer.Storage2.Endpoint, err = piecestore.NewEndpoint( peer.Log.Named("piecestore"), signing.SignerFromFullIdentity(peer.Identity), @@ -482,7 +485,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten peer.Storage2.PieceDeleter, peer.DB.Orders(), peer.DB.Bandwidth(), - peer.DB.UsedSerials(), + peer.UsedSerials, config.Storage2, ) if err != nil { @@ -655,7 +658,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop)) } - peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.UsedSerials(), config.Collector) + peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.UsedSerials, config.Collector) peer.Services.Add(lifecycle.Item{ Name: "collector", Run: peer.Collector.Run, diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 2957a846d..ca4b75b17 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -31,6 +31,7 @@ import ( "storj.io/storj/storagenode/monitor" "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/pieces" + "storj.io/storj/storagenode/piecestore/usedserials" "storj.io/storj/storagenode/retain" "storj.io/storj/storagenode/trust" ) @@ -55,11 +56,12 @@ type Config struct { MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"` DeleteWorkers int `help:"how many piece delete workers" default:"1"` DeleteQueueSize int `help:"size of the piece delete queue" default:"10000"` - OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"` + OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"` CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"` StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"` RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"` ReportCapacityThreshold memory.Size `help:"threshold below which to immediately notify satellite of capacity" default:"500MB" hidden:"true"` + MaxUsedSerialsSize memory.Size `help:"amount of memory allowed for used serials store - once surpassed, serials will be dropped at random" default:"1MB"` Trust trust.Config @@ -87,14 +89,14 @@ type Endpoint struct { store *pieces.Store orders orders.DB usage bandwidth.DB - usedSerials UsedSerials + usedSerials *usedserials.Table pieceDeleter *pieces.Deleter liveRequests int32 } // NewEndpoint creates a new piecestore endpoint. -func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) { +func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials *usedserials.Table, config Config) (*Endpoint, error) { return &Endpoint{ log: log, config: config, diff --git a/storagenode/piecestore/serials.go b/storagenode/piecestore/serials.go deleted file mode 100644 index 0430dbc17..000000000 --- a/storagenode/piecestore/serials.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecestore - -import ( - "context" - "time" - - "storj.io/common/storj" -) - -// SerialNumberFn is callback from IterateAll -type SerialNumberFn func(satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) - -// UsedSerials is a persistent store for serial numbers. -// TODO: maybe this should be in orders.UsedSerials -// -// architecture: Database -type UsedSerials interface { - // Add adds a serial to the database. - Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) error - // DeleteExpired deletes expired serial numbers - DeleteExpired(ctx context.Context, now time.Time) error - - // IterateAll iterates all serials. - // Note, this will lock the database and should only be used during startup. - IterateAll(ctx context.Context, fn SerialNumberFn) error -} diff --git a/storagenode/piecestore/serials_test.go b/storagenode/piecestore/serials_test.go deleted file mode 100644 index 637b96785..000000000 --- a/storagenode/piecestore/serials_test.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -package piecestore_test - -import ( - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "storj.io/common/identity/testidentity" - "storj.io/common/storj" - "storj.io/common/testcontext" - "storj.io/common/testrand" - "storj.io/storj/storagenode" - "storj.io/storj/storagenode/storagenodedb/storagenodedbtest" -) - -func TestUsedSerials(t *testing.T) { - storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { - usedSerials := db.UsedSerials() - - node0 := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion()) - node1 := testidentity.MustPregeneratedIdentity(1, storj.LatestIDVersion()) - - serial1 := testrand.SerialNumber() - serial2 := testrand.SerialNumber() - serial3 := testrand.SerialNumber() - - now := time.Now() - - // queries on empty table - err := usedSerials.DeleteExpired(ctx, now.Add(6*time.Minute)) - assert.NoError(t, err) - - err = usedSerials.IterateAll(ctx, func(satellite storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) {}) - assert.NoError(t, err) - - // let's start adding data - type Serial struct { - SatelliteID storj.NodeID - SerialNumber storj.SerialNumber - Expiration time.Time - } - - // use different timezones - location := time.FixedZone("XYZ", int((8 * time.Hour).Seconds())) - - serialNumbers := []Serial{ - {node0.ID, serial1, now.Add(time.Minute)}, - {node0.ID, serial2, now.Add(4 * time.Minute)}, - {node0.ID, serial3, now.In(location).Add(8 * time.Minute)}, - {node1.ID, serial1, now.In(location).Add(time.Minute)}, - {node1.ID, serial2, now.Add(4 * time.Minute)}, - {node1.ID, serial3, now.Add(8 * time.Minute)}, - } - - // basic adding - for _, serial := range serialNumbers { - err = usedSerials.Add(ctx, serial.SatelliteID, serial.SerialNumber, serial.Expiration) - assert.NoError(t, err) - } - - // duplicate adds should fail - for _, serial := range serialNumbers { - expirationDelta := time.Duration(testrand.Intn(10)-5) * time.Hour - err = usedSerials.Add(ctx, serial.SatelliteID, serial.SerialNumber, serial.Expiration.Add(expirationDelta)) - assert.Error(t, err) - } - - // ensure we can list all of them - listedNumbers := []Serial{} - err = usedSerials.IterateAll(ctx, func(satellite storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) { - listedNumbers = append(listedNumbers, Serial{satellite, serialNumber, expiration}) - }) - - require.NoError(t, err) - assert.Empty(t, cmp.Diff(serialNumbers, listedNumbers)) - - // ensure we can delete expired - err = usedSerials.DeleteExpired(ctx, now.Add(6*time.Minute)) - require.NoError(t, err) - - // ensure we can list after delete - listedAfterDelete := []Serial{} - err = usedSerials.IterateAll(ctx, func(satellite storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) { - listedAfterDelete = append(listedAfterDelete, Serial{satellite, serialNumber, expiration}) - }) - - // check that we have actually deleted things - require.NoError(t, err) - assert.Empty(t, cmp.Diff([]Serial{ - {node0.ID, serial3, now.Add(8 * time.Minute)}, - {node1.ID, serial3, now.Add(8 * time.Minute)}, - }, listedAfterDelete)) - }) -} - -func TestUsedSerials_Trivial(t *testing.T) { - storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) { - satelliteID, serial := testrand.NodeID(), testrand.SerialNumber() - - { // Ensure Add works at all - err := db.UsedSerials().Add(ctx, satelliteID, serial, time.Now()) - require.NoError(t, err) - } - - { // Ensure IterateAll works at all - err := db.UsedSerials().IterateAll(ctx, func(storj.NodeID, storj.SerialNumber, time.Time) {}) - require.NoError(t, err) - } - - { // Ensure DeleteExpired works at all - err := db.UsedSerials().DeleteExpired(ctx, time.Now()) - require.NoError(t, err) - } - }) -} diff --git a/storagenode/piecestore/verification.go b/storagenode/piecestore/verification.go index 1aeffc41d..237575b55 100644 --- a/storagenode/piecestore/verification.go +++ b/storagenode/piecestore/verification.go @@ -73,7 +73,7 @@ func (endpoint *Endpoint) verifyOrderLimit(ctx context.Context, limit *pb.OrderL serialExpiration = graceExpiration } - if err := endpoint.usedSerials.Add(ctx, limit.SatelliteId, limit.SerialNumber, serialExpiration); err != nil { + if err := endpoint.usedSerials.Add(limit.SatelliteId, limit.SerialNumber, serialExpiration); err != nil { return rpcstatus.Wrap(rpcstatus.Unauthenticated, err) } diff --git a/storagenode/preflight/db_test.go b/storagenode/preflight/db_test.go index 80742593f..a3dc9806d 100644 --- a/storagenode/preflight/db_test.go +++ b/storagenode/preflight/db_test.go @@ -28,8 +28,8 @@ func TestPreflightSchema(t *testing.T) { // add index to used serials db rawDBs := db.(*storagenodedb.DB).RawDatabases() - usedSerialsDB := rawDBs[storagenodedb.UsedSerialsDBName] - _, err = usedSerialsDB.GetDB().Exec(ctx, "CREATE INDEX a_new_index ON used_serial_(serial_number)") + satellitesDB := rawDBs[storagenodedb.SatellitesDBName] + _, err = satellitesDB.GetDB().Exec(ctx, "CREATE INDEX a_new_index ON satellites(status)") require.NoError(t, err) // expect error from preflight check for addition @@ -45,8 +45,8 @@ func TestPreflightSchema(t *testing.T) { // remove index from used serials db rawDBs := db.(*storagenodedb.DB).RawDatabases() - usedSerialsDB := rawDBs[storagenodedb.UsedSerialsDBName] - _, err = usedSerialsDB.GetDB().Exec(ctx, "DROP INDEX idx_used_serial_;") + bandwidthDB := rawDBs[storagenodedb.BandwidthDBName] + _, err = bandwidthDB.GetDB().Exec(ctx, "DROP INDEX idx_bandwidth_usage_created;") require.NoError(t, err) // expect error from preflight check for removal @@ -62,8 +62,8 @@ func TestPreflightSchema(t *testing.T) { // add test_table to used serials db rawDBs := db.(*storagenodedb.DB).RawDatabases() - usedSerialsDB := rawDBs[storagenodedb.UsedSerialsDBName] - _, err = usedSerialsDB.GetDB().Exec(ctx, "CREATE TABLE test_table(id int NOT NULL, name varchar(30), PRIMARY KEY (id));") + bandwidthDB := rawDBs[storagenodedb.BandwidthDBName] + _, err = bandwidthDB.GetDB().Exec(ctx, "CREATE TABLE test_table(id int NOT NULL, name varchar(30), PRIMARY KEY (id));") require.NoError(t, err) // expect no error from preflight check with added test_table diff --git a/storagenode/storagenodedb/database.go b/storagenode/storagenodedb/database.go index 981fbe494..bc9fbcaad 100644 --- a/storagenode/storagenodedb/database.go +++ b/storagenode/storagenodedb/database.go @@ -28,7 +28,6 @@ import ( "storj.io/storj/storagenode/notifications" "storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/pieces" - "storj.io/storj/storagenode/piecestore" "storj.io/storj/storagenode/pricing" "storj.io/storj/storagenode/reputation" "storj.io/storj/storagenode/satellites" @@ -436,11 +435,6 @@ func (db *DB) StorageUsage() storageusage.DB { return db.storageUsageDB } -// UsedSerials returns the instance of the UsedSerials database. -func (db *DB) UsedSerials() piecestore.UsedSerials { - return db.usedSerialsDB -} - // Satellites returns the instance of the Satellites database. func (db *DB) Satellites() satellites.DB { return db.satellitesDB @@ -1446,6 +1440,21 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration { return errs.Wrap(err) } + return nil + }), + }, + { + DB: db.usedSerialsDB, + Description: "Drop used serials table", + Version: 42, + Action: migrate.Func(func(ctx context.Context, _ *zap.Logger, rdb tagsql.DB, rtx tagsql.Tx) (err error) { + _, err = rtx.Exec(ctx, ` + DROP TABLE used_serial_; + `) + if err != nil { + return errs.Wrap(err) + } + return nil }), }, diff --git a/storagenode/storagenodedb/schema.go b/storagenode/storagenodedb/schema.go index bd93b4e3c..c84607341 100644 --- a/storagenode/storagenodedb/schema.go +++ b/storagenode/storagenodedb/schema.go @@ -660,34 +660,7 @@ func Schema() map[string]*dbschema.Schema { }, }, }, - "used_serial": &dbschema.Schema{ - Tables: []*dbschema.Table{ - &dbschema.Table{ - Name: "used_serial_", - Columns: []*dbschema.Column{ - &dbschema.Column{ - Name: "expiration", - Type: "TIMESTAMP", - IsNullable: false, - }, - &dbschema.Column{ - Name: "satellite_id", - Type: "BLOB", - IsNullable: false, - }, - &dbschema.Column{ - Name: "serial_number", - Type: "BLOB", - IsNullable: false, - }, - }, - }, - }, - Indexes: []*dbschema.Index{ - &dbschema.Index{Name: "idx_used_serial_", Table: "used_serial_", Columns: []string{"expiration"}, Unique: false, Partial: ""}, - &dbschema.Index{Name: "pk_used_serial_", Table: "used_serial_", Columns: []string{"satellite_id", "serial_number"}, Unique: false, Partial: ""}, - }, - }, + "used_serial": &dbschema.Schema{}, } } diff --git a/storagenode/storagenodedb/storagenodedbtest/run_test.go b/storagenode/storagenodedb/storagenodedbtest/run_test.go index 0d9703299..6dc5e1e16 100644 --- a/storagenode/storagenodedb/storagenodedbtest/run_test.go +++ b/storagenode/storagenodedb/storagenodedbtest/run_test.go @@ -35,8 +35,8 @@ func TestDatabase(t *testing.T) { canceledCtx, cancel := context.WithCancel(ctx) cancel() - serials := db.UsedSerials() - err := serials.Add(canceledCtx, testrand.NodeID(), testrand.SerialNumber(), time.Now().Add(time.Hour)) + bw := db.Bandwidth() + err := bw.Add(canceledCtx, testrand.NodeID(), pb.PieceAction_GET, 0, time.Now()) require.True(t, errs2.IsCanceled(err), err) }) } diff --git a/storagenode/storagenodedb/testdata/multidbsnapshot.go b/storagenode/storagenodedb/testdata/multidbsnapshot.go index 5db2c36ea..740f3360a 100644 --- a/storagenode/storagenodedb/testdata/multidbsnapshot.go +++ b/storagenode/storagenodedb/testdata/multidbsnapshot.go @@ -56,6 +56,7 @@ var States = MultiDBStates{ &v39, &v40, &v41, + &v42, }, } diff --git a/storagenode/storagenodedb/testdata/v42.go b/storagenode/storagenodedb/testdata/v42.go new file mode 100644 index 000000000..0f843f401 --- /dev/null +++ b/storagenode/storagenodedb/testdata/v42.go @@ -0,0 +1,27 @@ +// Copyright (C) 2020 Storj Labs, Inc. +// See LICENSE for copying information. + +package testdata + +import ( + "storj.io/storj/storagenode/storagenodedb" +) + +var v42 = MultiDBState{ + Version: 42, + DBStates: DBStates{ + storagenodedb.UsedSerialsDBName: &DBState{}, + storagenodedb.StorageUsageDBName: v41.DBStates[storagenodedb.StorageUsageDBName], + storagenodedb.ReputationDBName: v41.DBStates[storagenodedb.ReputationDBName], + storagenodedb.PieceSpaceUsedDBName: v41.DBStates[storagenodedb.PieceSpaceUsedDBName], + storagenodedb.PieceInfoDBName: v41.DBStates[storagenodedb.PieceInfoDBName], + storagenodedb.PieceExpirationDBName: v41.DBStates[storagenodedb.PieceExpirationDBName], + storagenodedb.OrdersDBName: v41.DBStates[storagenodedb.OrdersDBName], + storagenodedb.BandwidthDBName: v41.DBStates[storagenodedb.BandwidthDBName], + storagenodedb.SatellitesDBName: v41.DBStates[storagenodedb.SatellitesDBName], + storagenodedb.DeprecatedInfoDBName: v41.DBStates[storagenodedb.DeprecatedInfoDBName], + storagenodedb.NotificationsDBName: v41.DBStates[storagenodedb.NotificationsDBName], + storagenodedb.HeldAmountDBName: v41.DBStates[storagenodedb.HeldAmountDBName], + storagenodedb.PricingDBName: v41.DBStates[storagenodedb.PricingDBName], + }, +} diff --git a/storagenode/storagenodedb/usedserials.go b/storagenode/storagenodedb/usedserials.go index cfdd324eb..278bd8284 100644 --- a/storagenode/storagenodedb/usedserials.go +++ b/storagenode/storagenodedb/usedserials.go @@ -3,69 +3,10 @@ package storagenodedb -import ( - "context" - "time" - - "github.com/zeebo/errs" - - "storj.io/common/storj" - "storj.io/storj/storagenode/piecestore" -) - -// ErrUsedSerials represents errors from the used serials database. -var ErrUsedSerials = errs.Class("usedserialsdb error") - // UsedSerialsDBName represents the database name. const UsedSerialsDBName = "used_serial" +// usedSerialsDB is necessary for previous migration steps, even though the usedserials db is no longer used. type usedSerialsDB struct { dbContainerImpl } - -// Add adds a serial to the database. -func (db *usedSerialsDB) Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) (err error) { - defer mon.Task()(&ctx)(&err) - - _, err = db.ExecContext(ctx, ` - INSERT INTO - used_serial_(satellite_id, serial_number, expiration) - VALUES(?, ?, ?)`, satelliteID, serialNumber, expiration.UTC()) - - return ErrUsedSerials.Wrap(err) -} - -// DeleteExpired deletes expired serial numbers -func (db *usedSerialsDB) DeleteExpired(ctx context.Context, now time.Time) (err error) { - defer mon.Task()(&ctx)(&err) - - _, err = db.ExecContext(ctx, `DELETE FROM used_serial_ WHERE expiration < ?`, now.UTC()) - return ErrUsedSerials.Wrap(err) -} - -// IterateAll iterates all serials. -// Note, this will lock the database and should only be used during startup. -func (db *usedSerialsDB) IterateAll(ctx context.Context, fn piecestore.SerialNumberFn) (err error) { - defer mon.Task()(&ctx)(&err) - - rows, err := db.QueryContext(ctx, `SELECT satellite_id, serial_number, expiration FROM used_serial_`) - if err != nil { - return ErrUsedSerials.Wrap(err) - } - defer func() { err = errs.Combine(err, ErrUsedSerials.Wrap(rows.Close())) }() - - for rows.Next() { - var satelliteID storj.NodeID - var serialNumber storj.SerialNumber - var expiration time.Time - - err := rows.Scan(&satelliteID, &serialNumber, &expiration) - if err != nil { - return ErrUsedSerials.Wrap(err) - } - - fn(satelliteID, serialNumber, expiration) - } - - return ErrUsedSerials.Wrap(rows.Err()) -}