storagenode/piecestore: switch usedserials db for in-memory usedserials store

Part 2 of moving usedserials in memory
* Drop usedserials table in storagenodedb
* Use in-memory usedserials store in place of db for order limit
verification
* Update order limit grace period to be only one hour - this means
uplinks must send their order limits to storagenodes within an hour of
receiving them

Change-Id: I37a0e1d2ca6cb80854a3ef495af2d1d1f92e9f03
This commit is contained in:
Moby von Briesen 2020-05-27 17:07:24 -04:00
parent 909d6d9668
commit dc57640d9c
15 changed files with 82 additions and 287 deletions

View File

@ -153,6 +153,7 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
CachePath: filepath.Join(storageDir, "trust-cache.json"), CachePath: filepath.Join(storageDir, "trust-cache.json"),
RefreshInterval: defaultInterval, RefreshInterval: defaultInterval,
}, },
MaxUsedSerialsSize: memory.MiB,
}, },
Pieces: pieces.DefaultConfig, Pieces: pieces.DefaultConfig,
Filestore: filestore.DefaultConfig, Filestore: filestore.DefaultConfig,

View File

@ -13,7 +13,7 @@ import (
"storj.io/common/sync2" "storj.io/common/sync2"
"storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore" "storj.io/storj/storagenode/piecestore/usedserials"
) )
var mon = monkit.Package() var mon = monkit.Package()
@ -29,13 +29,13 @@ type Config struct {
type Service struct { type Service struct {
log *zap.Logger log *zap.Logger
pieces *pieces.Store pieces *pieces.Store
usedSerials piecestore.UsedSerials usedSerials *usedserials.Table
Loop *sync2.Cycle Loop *sync2.Cycle
} }
// NewService creates a new collector service. // NewService creates a new collector service.
func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials piecestore.UsedSerials, config Config) *Service { func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials *usedserials.Table, config Config) *Service {
return &Service{ return &Service{
log: log, log: log,
pieces: pieces, pieces: pieces,
@ -70,9 +70,7 @@ func (service *Service) Close() (err error) {
func (service *Service) Collect(ctx context.Context, now time.Time) (err error) { func (service *Service) Collect(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err) defer mon.Task()(&ctx)(&err)
if deleteErr := service.usedSerials.DeleteExpired(ctx, now); err != nil { service.usedSerials.DeleteExpired(now)
service.log.Error("unable to delete expired used serials", zap.Error(deleteErr))
}
const maxBatches = 100 const maxBatches = 100
const batchSize = 1000 const batchSize = 1000

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"storj.io/common/memory" "storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext" "storj.io/common/testcontext"
"storj.io/common/testrand" "storj.io/common/testrand"
"storj.io/storj/private/testplanet" "storj.io/storj/private/testplanet"
@ -45,7 +44,7 @@ func TestCollector(t *testing.T) {
// imagine we are 30 minutes in the future // imagine we are 30 minutes in the future
for _, storageNode := range planet.StorageNodes { for _, storageNode := range planet.StorageNodes {
pieceStore := storageNode.DB.Pieces() pieceStore := storageNode.DB.Pieces()
usedSerials := storageNode.DB.UsedSerials() usedSerials := storageNode.UsedSerials
// verify that we actually have some data on storage nodes // verify that we actually have some data on storage nodes
used, err := pieceStore.SpaceUsedForBlobs(ctx) used, err := pieceStore.SpaceUsedForBlobs(ctx)
@ -59,43 +58,37 @@ func TestCollector(t *testing.T) {
err = storageNode.Collector.Collect(ctx, time.Now().Add(30*time.Minute)) err = storageNode.Collector.Collect(ctx, time.Now().Add(30*time.Minute))
require.NoError(t, err) require.NoError(t, err)
// ensure we haven't deleted used serials serialsPresent += usedSerials.Count()
err = usedSerials.IterateAll(ctx, func(_ storj.NodeID, _ storj.SerialNumber, _ time.Time) {
serialsPresent++
})
require.NoError(t, err)
collections++ collections++
} }
require.NotZero(t, collections) require.NotZero(t, collections)
// ensure we haven't deleted used serials
require.Equal(t, 2, serialsPresent) require.Equal(t, 2, serialsPresent)
serialsPresent = 0 serialsPresent = 0
// imagine we are 2 hours in the future // imagine we are 2 hours in the future
for _, storageNode := range planet.StorageNodes { for _, storageNode := range planet.StorageNodes {
usedSerials := storageNode.DB.UsedSerials() usedSerials := storageNode.UsedSerials
// collect all the data // collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(2*time.Hour)) err = storageNode.Collector.Collect(ctx, time.Now().Add(2*time.Hour))
require.NoError(t, err) require.NoError(t, err)
// ensure we have deleted used serials serialsPresent += usedSerials.Count()
err = usedSerials.IterateAll(ctx, func(id storj.NodeID, serial storj.SerialNumber, expiration time.Time) {
serialsPresent++
})
require.NoError(t, err)
collections++ collections++
} }
// ensure we have deleted used serials
require.Equal(t, 0, serialsPresent) require.Equal(t, 0, serialsPresent)
// imagine we are 10 days in the future // imagine we are 10 days in the future
for _, storageNode := range planet.StorageNodes { for _, storageNode := range planet.StorageNodes {
pieceStore := storageNode.DB.Pieces() pieceStore := storageNode.DB.Pieces()
usedSerials := storageNode.DB.UsedSerials() usedSerials := storageNode.UsedSerials
// collect all the data // collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(10*24*time.Hour)) err = storageNode.Collector.Collect(ctx, time.Now().Add(10*24*time.Hour))
@ -106,15 +99,12 @@ func TestCollector(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(0), used) require.Equal(t, int64(0), used)
// ensure we have deleted used serials serialsPresent += usedSerials.Count()
err = usedSerials.IterateAll(ctx, func(id storj.NodeID, serial storj.SerialNumber, expiration time.Time) {
serialsPresent++
})
require.NoError(t, err)
collections++ collections++
} }
// ensure we have deleted used serials
require.Equal(t, 0, serialsPresent) require.Equal(t, 0, serialsPresent)
}) })
} }

View File

@ -46,6 +46,7 @@ import (
"storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore" "storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/piecestore/usedserials"
"storj.io/storj/storagenode/preflight" "storj.io/storj/storagenode/preflight"
"storj.io/storj/storagenode/pricing" "storj.io/storj/storagenode/pricing"
"storj.io/storj/storagenode/reputation" "storj.io/storj/storagenode/reputation"
@ -77,7 +78,6 @@ type DB interface {
PieceExpirationDB() pieces.PieceExpirationDB PieceExpirationDB() pieces.PieceExpirationDB
PieceSpaceUsedDB() pieces.PieceSpaceUsedDB PieceSpaceUsedDB() pieces.PieceSpaceUsedDB
Bandwidth() bandwidth.DB Bandwidth() bandwidth.DB
UsedSerials() piecestore.UsedSerials
Reputation() reputation.DB Reputation() reputation.DB
StorageUsage() storageusage.DB StorageUsage() storageusage.DB
Satellites() satellites.DB Satellites() satellites.DB
@ -181,9 +181,10 @@ func isAddressValid(addrstring string) error {
// architecture: Peer // architecture: Peer
type Peer struct { type Peer struct {
// core dependencies // core dependencies
Log *zap.Logger Log *zap.Logger
Identity *identity.FullIdentity Identity *identity.FullIdentity
DB DB DB DB
UsedSerials *usedserials.Table
Servers *lifecycle.Group Servers *lifecycle.Group
Services *lifecycle.Group Services *lifecycle.Group
@ -471,6 +472,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
Close: peer.Storage2.RetainService.Close, Close: peer.Storage2.RetainService.Close,
}) })
peer.UsedSerials = usedserials.NewTable(config.Storage2.MaxUsedSerialsSize)
peer.Storage2.Endpoint, err = piecestore.NewEndpoint( peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
peer.Log.Named("piecestore"), peer.Log.Named("piecestore"),
signing.SignerFromFullIdentity(peer.Identity), signing.SignerFromFullIdentity(peer.Identity),
@ -482,7 +485,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Storage2.PieceDeleter, peer.Storage2.PieceDeleter,
peer.DB.Orders(), peer.DB.Orders(),
peer.DB.Bandwidth(), peer.DB.Bandwidth(),
peer.DB.UsedSerials(), peer.UsedSerials,
config.Storage2, config.Storage2,
) )
if err != nil { if err != nil {
@ -655,7 +658,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop)) debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
} }
peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.UsedSerials(), config.Collector) peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.UsedSerials, config.Collector)
peer.Services.Add(lifecycle.Item{ peer.Services.Add(lifecycle.Item{
Name: "collector", Name: "collector",
Run: peer.Collector.Run, Run: peer.Collector.Run,

View File

@ -31,6 +31,7 @@ import (
"storj.io/storj/storagenode/monitor" "storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore/usedserials"
"storj.io/storj/storagenode/retain" "storj.io/storj/storagenode/retain"
"storj.io/storj/storagenode/trust" "storj.io/storj/storagenode/trust"
) )
@ -55,11 +56,12 @@ type Config struct {
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"` MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
DeleteWorkers int `help:"how many piece delete workers" default:"1"` DeleteWorkers int `help:"how many piece delete workers" default:"1"`
DeleteQueueSize int `help:"size of the piece delete queue" default:"10000"` DeleteQueueSize int `help:"size of the piece delete queue" default:"10000"`
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"` OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"` CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"` StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"`
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"` RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"`
ReportCapacityThreshold memory.Size `help:"threshold below which to immediately notify satellite of capacity" default:"500MB" hidden:"true"` ReportCapacityThreshold memory.Size `help:"threshold below which to immediately notify satellite of capacity" default:"500MB" hidden:"true"`
MaxUsedSerialsSize memory.Size `help:"amount of memory allowed for used serials store - once surpassed, serials will be dropped at random" default:"1MB"`
Trust trust.Config Trust trust.Config
@ -87,14 +89,14 @@ type Endpoint struct {
store *pieces.Store store *pieces.Store
orders orders.DB orders orders.DB
usage bandwidth.DB usage bandwidth.DB
usedSerials UsedSerials usedSerials *usedserials.Table
pieceDeleter *pieces.Deleter pieceDeleter *pieces.Deleter
liveRequests int32 liveRequests int32
} }
// NewEndpoint creates a new piecestore endpoint. // NewEndpoint creates a new piecestore endpoint.
func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) { func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials *usedserials.Table, config Config) (*Endpoint, error) {
return &Endpoint{ return &Endpoint{
log: log, log: log,
config: config, config: config,

View File

@ -1,29 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package piecestore
import (
"context"
"time"
"storj.io/common/storj"
)
// SerialNumberFn is callback from IterateAll
type SerialNumberFn func(satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time)
// UsedSerials is a persistent store for serial numbers.
// TODO: maybe this should be in orders.UsedSerials
//
// architecture: Database
type UsedSerials interface {
// Add adds a serial to the database.
Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) error
// DeleteExpired deletes expired serial numbers
DeleteExpired(ctx context.Context, now time.Time) error
// IterateAll iterates all serials.
// Note, this will lock the database and should only be used during startup.
IterateAll(ctx context.Context, fn SerialNumberFn) error
}

View File

@ -1,121 +0,0 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package piecestore_test
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"storj.io/common/identity/testidentity"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
)
func TestUsedSerials(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
usedSerials := db.UsedSerials()
node0 := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion())
node1 := testidentity.MustPregeneratedIdentity(1, storj.LatestIDVersion())
serial1 := testrand.SerialNumber()
serial2 := testrand.SerialNumber()
serial3 := testrand.SerialNumber()
now := time.Now()
// queries on empty table
err := usedSerials.DeleteExpired(ctx, now.Add(6*time.Minute))
assert.NoError(t, err)
err = usedSerials.IterateAll(ctx, func(satellite storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) {})
assert.NoError(t, err)
// let's start adding data
type Serial struct {
SatelliteID storj.NodeID
SerialNumber storj.SerialNumber
Expiration time.Time
}
// use different timezones
location := time.FixedZone("XYZ", int((8 * time.Hour).Seconds()))
serialNumbers := []Serial{
{node0.ID, serial1, now.Add(time.Minute)},
{node0.ID, serial2, now.Add(4 * time.Minute)},
{node0.ID, serial3, now.In(location).Add(8 * time.Minute)},
{node1.ID, serial1, now.In(location).Add(time.Minute)},
{node1.ID, serial2, now.Add(4 * time.Minute)},
{node1.ID, serial3, now.Add(8 * time.Minute)},
}
// basic adding
for _, serial := range serialNumbers {
err = usedSerials.Add(ctx, serial.SatelliteID, serial.SerialNumber, serial.Expiration)
assert.NoError(t, err)
}
// duplicate adds should fail
for _, serial := range serialNumbers {
expirationDelta := time.Duration(testrand.Intn(10)-5) * time.Hour
err = usedSerials.Add(ctx, serial.SatelliteID, serial.SerialNumber, serial.Expiration.Add(expirationDelta))
assert.Error(t, err)
}
// ensure we can list all of them
listedNumbers := []Serial{}
err = usedSerials.IterateAll(ctx, func(satellite storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) {
listedNumbers = append(listedNumbers, Serial{satellite, serialNumber, expiration})
})
require.NoError(t, err)
assert.Empty(t, cmp.Diff(serialNumbers, listedNumbers))
// ensure we can delete expired
err = usedSerials.DeleteExpired(ctx, now.Add(6*time.Minute))
require.NoError(t, err)
// ensure we can list after delete
listedAfterDelete := []Serial{}
err = usedSerials.IterateAll(ctx, func(satellite storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) {
listedAfterDelete = append(listedAfterDelete, Serial{satellite, serialNumber, expiration})
})
// check that we have actually deleted things
require.NoError(t, err)
assert.Empty(t, cmp.Diff([]Serial{
{node0.ID, serial3, now.Add(8 * time.Minute)},
{node1.ID, serial3, now.Add(8 * time.Minute)},
}, listedAfterDelete))
})
}
func TestUsedSerials_Trivial(t *testing.T) {
storagenodedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db storagenode.DB) {
satelliteID, serial := testrand.NodeID(), testrand.SerialNumber()
{ // Ensure Add works at all
err := db.UsedSerials().Add(ctx, satelliteID, serial, time.Now())
require.NoError(t, err)
}
{ // Ensure IterateAll works at all
err := db.UsedSerials().IterateAll(ctx, func(storj.NodeID, storj.SerialNumber, time.Time) {})
require.NoError(t, err)
}
{ // Ensure DeleteExpired works at all
err := db.UsedSerials().DeleteExpired(ctx, time.Now())
require.NoError(t, err)
}
})
}

View File

@ -73,7 +73,7 @@ func (endpoint *Endpoint) verifyOrderLimit(ctx context.Context, limit *pb.OrderL
serialExpiration = graceExpiration serialExpiration = graceExpiration
} }
if err := endpoint.usedSerials.Add(ctx, limit.SatelliteId, limit.SerialNumber, serialExpiration); err != nil { if err := endpoint.usedSerials.Add(limit.SatelliteId, limit.SerialNumber, serialExpiration); err != nil {
return rpcstatus.Wrap(rpcstatus.Unauthenticated, err) return rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
} }

View File

@ -28,8 +28,8 @@ func TestPreflightSchema(t *testing.T) {
// add index to used serials db // add index to used serials db
rawDBs := db.(*storagenodedb.DB).RawDatabases() rawDBs := db.(*storagenodedb.DB).RawDatabases()
usedSerialsDB := rawDBs[storagenodedb.UsedSerialsDBName] satellitesDB := rawDBs[storagenodedb.SatellitesDBName]
_, err = usedSerialsDB.GetDB().Exec(ctx, "CREATE INDEX a_new_index ON used_serial_(serial_number)") _, err = satellitesDB.GetDB().Exec(ctx, "CREATE INDEX a_new_index ON satellites(status)")
require.NoError(t, err) require.NoError(t, err)
// expect error from preflight check for addition // expect error from preflight check for addition
@ -45,8 +45,8 @@ func TestPreflightSchema(t *testing.T) {
// remove index from used serials db // remove index from used serials db
rawDBs := db.(*storagenodedb.DB).RawDatabases() rawDBs := db.(*storagenodedb.DB).RawDatabases()
usedSerialsDB := rawDBs[storagenodedb.UsedSerialsDBName] bandwidthDB := rawDBs[storagenodedb.BandwidthDBName]
_, err = usedSerialsDB.GetDB().Exec(ctx, "DROP INDEX idx_used_serial_;") _, err = bandwidthDB.GetDB().Exec(ctx, "DROP INDEX idx_bandwidth_usage_created;")
require.NoError(t, err) require.NoError(t, err)
// expect error from preflight check for removal // expect error from preflight check for removal
@ -62,8 +62,8 @@ func TestPreflightSchema(t *testing.T) {
// add test_table to used serials db // add test_table to used serials db
rawDBs := db.(*storagenodedb.DB).RawDatabases() rawDBs := db.(*storagenodedb.DB).RawDatabases()
usedSerialsDB := rawDBs[storagenodedb.UsedSerialsDBName] bandwidthDB := rawDBs[storagenodedb.BandwidthDBName]
_, err = usedSerialsDB.GetDB().Exec(ctx, "CREATE TABLE test_table(id int NOT NULL, name varchar(30), PRIMARY KEY (id));") _, err = bandwidthDB.GetDB().Exec(ctx, "CREATE TABLE test_table(id int NOT NULL, name varchar(30), PRIMARY KEY (id));")
require.NoError(t, err) require.NoError(t, err)
// expect no error from preflight check with added test_table // expect no error from preflight check with added test_table

View File

@ -28,7 +28,6 @@ import (
"storj.io/storj/storagenode/notifications" "storj.io/storj/storagenode/notifications"
"storj.io/storj/storagenode/orders" "storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces" "storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/pricing" "storj.io/storj/storagenode/pricing"
"storj.io/storj/storagenode/reputation" "storj.io/storj/storagenode/reputation"
"storj.io/storj/storagenode/satellites" "storj.io/storj/storagenode/satellites"
@ -436,11 +435,6 @@ func (db *DB) StorageUsage() storageusage.DB {
return db.storageUsageDB return db.storageUsageDB
} }
// UsedSerials returns the instance of the UsedSerials database.
func (db *DB) UsedSerials() piecestore.UsedSerials {
return db.usedSerialsDB
}
// Satellites returns the instance of the Satellites database. // Satellites returns the instance of the Satellites database.
func (db *DB) Satellites() satellites.DB { func (db *DB) Satellites() satellites.DB {
return db.satellitesDB return db.satellitesDB
@ -1446,6 +1440,21 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
return errs.Wrap(err) return errs.Wrap(err)
} }
return nil
}),
},
{
DB: db.usedSerialsDB,
Description: "Drop used serials table",
Version: 42,
Action: migrate.Func(func(ctx context.Context, _ *zap.Logger, rdb tagsql.DB, rtx tagsql.Tx) (err error) {
_, err = rtx.Exec(ctx, `
DROP TABLE used_serial_;
`)
if err != nil {
return errs.Wrap(err)
}
return nil return nil
}), }),
}, },

View File

@ -660,34 +660,7 @@ func Schema() map[string]*dbschema.Schema {
}, },
}, },
}, },
"used_serial": &dbschema.Schema{ "used_serial": &dbschema.Schema{},
Tables: []*dbschema.Table{
&dbschema.Table{
Name: "used_serial_",
Columns: []*dbschema.Column{
&dbschema.Column{
Name: "expiration",
Type: "TIMESTAMP",
IsNullable: false,
},
&dbschema.Column{
Name: "satellite_id",
Type: "BLOB",
IsNullable: false,
},
&dbschema.Column{
Name: "serial_number",
Type: "BLOB",
IsNullable: false,
},
},
},
},
Indexes: []*dbschema.Index{
&dbschema.Index{Name: "idx_used_serial_", Table: "used_serial_", Columns: []string{"expiration"}, Unique: false, Partial: ""},
&dbschema.Index{Name: "pk_used_serial_", Table: "used_serial_", Columns: []string{"satellite_id", "serial_number"}, Unique: false, Partial: ""},
},
},
} }
} }

View File

@ -35,8 +35,8 @@ func TestDatabase(t *testing.T) {
canceledCtx, cancel := context.WithCancel(ctx) canceledCtx, cancel := context.WithCancel(ctx)
cancel() cancel()
serials := db.UsedSerials() bw := db.Bandwidth()
err := serials.Add(canceledCtx, testrand.NodeID(), testrand.SerialNumber(), time.Now().Add(time.Hour)) err := bw.Add(canceledCtx, testrand.NodeID(), pb.PieceAction_GET, 0, time.Now())
require.True(t, errs2.IsCanceled(err), err) require.True(t, errs2.IsCanceled(err), err)
}) })
} }

View File

@ -56,6 +56,7 @@ var States = MultiDBStates{
&v39, &v39,
&v40, &v40,
&v41, &v41,
&v42,
}, },
} }

View File

@ -0,0 +1,27 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package testdata
import (
"storj.io/storj/storagenode/storagenodedb"
)
var v42 = MultiDBState{
Version: 42,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: &DBState{},
storagenodedb.StorageUsageDBName: v41.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: v41.DBStates[storagenodedb.ReputationDBName],
storagenodedb.PieceSpaceUsedDBName: v41.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v41.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v41.DBStates[storagenodedb.PieceExpirationDBName],
storagenodedb.OrdersDBName: v41.DBStates[storagenodedb.OrdersDBName],
storagenodedb.BandwidthDBName: v41.DBStates[storagenodedb.BandwidthDBName],
storagenodedb.SatellitesDBName: v41.DBStates[storagenodedb.SatellitesDBName],
storagenodedb.DeprecatedInfoDBName: v41.DBStates[storagenodedb.DeprecatedInfoDBName],
storagenodedb.NotificationsDBName: v41.DBStates[storagenodedb.NotificationsDBName],
storagenodedb.HeldAmountDBName: v41.DBStates[storagenodedb.HeldAmountDBName],
storagenodedb.PricingDBName: v41.DBStates[storagenodedb.PricingDBName],
},
}

View File

@ -3,69 +3,10 @@
package storagenodedb package storagenodedb
import (
"context"
"time"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/storagenode/piecestore"
)
// ErrUsedSerials represents errors from the used serials database.
var ErrUsedSerials = errs.Class("usedserialsdb error")
// UsedSerialsDBName represents the database name. // UsedSerialsDBName represents the database name.
const UsedSerialsDBName = "used_serial" const UsedSerialsDBName = "used_serial"
// usedSerialsDB is necessary for previous migration steps, even though the usedserials db is no longer used.
type usedSerialsDB struct { type usedSerialsDB struct {
dbContainerImpl dbContainerImpl
} }
// Add adds a serial to the database.
func (db *usedSerialsDB) Add(ctx context.Context, satelliteID storj.NodeID, serialNumber storj.SerialNumber, expiration time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, `
INSERT INTO
used_serial_(satellite_id, serial_number, expiration)
VALUES(?, ?, ?)`, satelliteID, serialNumber, expiration.UTC())
return ErrUsedSerials.Wrap(err)
}
// DeleteExpired deletes expired serial numbers
func (db *usedSerialsDB) DeleteExpired(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)
_, err = db.ExecContext(ctx, `DELETE FROM used_serial_ WHERE expiration < ?`, now.UTC())
return ErrUsedSerials.Wrap(err)
}
// IterateAll iterates all serials.
// Note, this will lock the database and should only be used during startup.
func (db *usedSerialsDB) IterateAll(ctx context.Context, fn piecestore.SerialNumberFn) (err error) {
defer mon.Task()(&ctx)(&err)
rows, err := db.QueryContext(ctx, `SELECT satellite_id, serial_number, expiration FROM used_serial_`)
if err != nil {
return ErrUsedSerials.Wrap(err)
}
defer func() { err = errs.Combine(err, ErrUsedSerials.Wrap(rows.Close())) }()
for rows.Next() {
var satelliteID storj.NodeID
var serialNumber storj.SerialNumber
var expiration time.Time
err := rows.Scan(&satelliteID, &serialNumber, &expiration)
if err != nil {
return ErrUsedSerials.Wrap(err)
}
fn(satelliteID, serialNumber, expiration)
}
return ErrUsedSerials.Wrap(rows.Err())
}