storagenode/storagenodedb: faster test db init
Running all of the migrations necessary to initialize a storage node database takes a significant amount of time during runs. The package current supports initializing a database from manually coalesced migration data (i.e. snapshot) which improves the situation somewhat. This change takes things a bit further by changing the snapshot code to instead hydrate the database directory from a pre-generated snapshot zip file. name old time/op new time/op delta Run_StorageNodeCount_4/Postgres-16 2.50s ± 0% 0.16s ± 0% ~ (p=1.000 n=1+1) Change-Id: I213bbba5f9199497fbe8ce889b627e853f8b29a0
This commit is contained in:
parent
08c9d745f1
commit
4fdea51d5c
@ -39,7 +39,7 @@ import (
|
|||||||
"storj.io/storj/storagenode/piecestore"
|
"storj.io/storj/storagenode/piecestore"
|
||||||
"storj.io/storj/storagenode/preflight"
|
"storj.io/storj/storagenode/preflight"
|
||||||
"storj.io/storj/storagenode/retain"
|
"storj.io/storj/storagenode/retain"
|
||||||
"storj.io/storj/storagenode/storagenodedb"
|
"storj.io/storj/storagenode/storagenodedb/storagenodedbtest"
|
||||||
"storj.io/storj/storagenode/trust"
|
"storj.io/storj/storagenode/trust"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -227,7 +227,7 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
|
|||||||
verisonInfo := planet.NewVersionInfo()
|
verisonInfo := planet.NewVersionInfo()
|
||||||
|
|
||||||
var db storagenode.DB
|
var db storagenode.DB
|
||||||
db, err = storagenodedb.OpenNew(ctx, log.Named("db"), config.DatabaseConfig())
|
db, err = storagenodedbtest.OpenNew(ctx, log.Named("db"), config.DatabaseConfig())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -257,7 +257,7 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
|
|||||||
// Mark the peer's PieceDeleter as in testing mode, so it is easy to wait on the deleter
|
// Mark the peer's PieceDeleter as in testing mode, so it is easy to wait on the deleter
|
||||||
peer.Storage2.PieceDeleter.SetupTest()
|
peer.Storage2.PieceDeleter.SetupTest()
|
||||||
|
|
||||||
err = db.TestMigrateToLatest(ctx)
|
err = db.MigrateToLatest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -78,9 +78,6 @@ type DB interface {
|
|||||||
// MigrateToLatest initializes the database
|
// MigrateToLatest initializes the database
|
||||||
MigrateToLatest(ctx context.Context) error
|
MigrateToLatest(ctx context.Context) error
|
||||||
|
|
||||||
// TestMigrateToLatest is a fast migration with skipping test (not safe for production + old db state)
|
|
||||||
TestMigrateToLatest(ctx context.Context) error
|
|
||||||
|
|
||||||
// Close closes the database
|
// Close closes the database
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
|
@ -347,13 +347,6 @@ func (db *DB) MigrateToLatest(ctx context.Context) error {
|
|||||||
return migration.Run(ctx, db.log.Named("migration"))
|
return migration.Run(ctx, db.log.Named("migration"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestMigrateToLatest creates any necessary tables from snapshot.
|
|
||||||
// it's faster for the testing but should be the same as MigrateToLatest.
|
|
||||||
func (db *DB) TestMigrateToLatest(ctx context.Context) error {
|
|
||||||
migration := db.Snapshot(ctx)
|
|
||||||
return migration.Run(ctx, db.log.Named("migration"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Preflight conducts a pre-flight check to ensure correct schemas and minimal read+write functionality of the database tables.
|
// Preflight conducts a pre-flight check to ensure correct schemas and minimal read+write functionality of the database tables.
|
||||||
func (db *DB) Preflight(ctx context.Context) (err error) {
|
func (db *DB) Preflight(ctx context.Context) (err error) {
|
||||||
for dbName, dbContainer := range db.SQLDBs {
|
for dbName, dbContainer := range db.SQLDBs {
|
||||||
@ -566,6 +559,11 @@ func (db *DB) RawDatabases() map[string]DBContainer {
|
|||||||
return db.SQLDBs
|
return db.SQLDBs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DBDirectory returns the database directory for testing purposes.
|
||||||
|
func (db *DB) DBDirectory() string {
|
||||||
|
return db.dbDirectory
|
||||||
|
}
|
||||||
|
|
||||||
// migrateToDB is a helper method that performs the migration from the
|
// migrateToDB is a helper method that performs the migration from the
|
||||||
// deprecatedInfoDB to the specified new db. It first closes and deletes any
|
// deprecatedInfoDB to the specified new db. It first closes and deletes any
|
||||||
// existing database to guarantee idempotence. After migration it also closes
|
// existing database to guarantee idempotence. After migration it also closes
|
||||||
|
@ -1,394 +0,0 @@
|
|||||||
// Copyright (C) 2022 Storj Labs, Inc.
|
|
||||||
// See LICENSE for copying information.
|
|
||||||
|
|
||||||
package storagenodedb
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"storj.io/storj/private/migrate"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Snapshot supposed to generate the same database as the Migration but without the original steps.
|
|
||||||
func (db *DB) Snapshot(ctx context.Context) *migrate.Migration {
|
|
||||||
return &migrate.Migration{
|
|
||||||
Table: VersionTable,
|
|
||||||
Steps: []*migrate.Step{
|
|
||||||
{
|
|
||||||
DB: &db.deprecatedInfoDB.DB,
|
|
||||||
Description: "Initial setup",
|
|
||||||
Version: 0,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, DeprecatedInfoDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.bandwidthDB.DB,
|
|
||||||
Description: "bandwidth db snapshot",
|
|
||||||
Version: 2,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, BandwidthDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
// table for storing bandwidth usage
|
|
||||||
`CREATE TABLE bandwidth_usage (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
action INTEGER NOT NULL,
|
|
||||||
amount BIGINT NOT NULL,
|
|
||||||
created_at TIMESTAMP NOT NULL
|
|
||||||
)`,
|
|
||||||
`CREATE INDEX idx_bandwidth_usage_satellite ON bandwidth_usage(satellite_id)`,
|
|
||||||
`CREATE INDEX idx_bandwidth_usage_created ON bandwidth_usage(created_at)`,
|
|
||||||
`CREATE TABLE bandwidth_usage_rollups (
|
|
||||||
interval_start TIMESTAMP NOT NULL,
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
action INTEGER NOT NULL,
|
|
||||||
amount BIGINT NOT NULL,
|
|
||||||
PRIMARY KEY ( interval_start, satellite_id, action )
|
|
||||||
)`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.ordersDB.DB,
|
|
||||||
Description: "orders db snapshot",
|
|
||||||
Version: 3,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, OrdersDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
// table for storing all unsent orders
|
|
||||||
`CREATE TABLE unsent_order (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
serial_number BLOB NOT NULL,
|
|
||||||
|
|
||||||
order_limit_serialized BLOB NOT NULL, -- serialized pb.OrderLimit
|
|
||||||
order_serialized BLOB NOT NULL, -- serialized pb.Order
|
|
||||||
order_limit_expiration TIMESTAMP NOT NULL, -- when is the deadline for sending it
|
|
||||||
|
|
||||||
uplink_cert_id INTEGER NOT NULL,
|
|
||||||
|
|
||||||
FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
|
|
||||||
)`,
|
|
||||||
`CREATE UNIQUE INDEX idx_orders ON unsent_order(satellite_id, serial_number)`,
|
|
||||||
`CREATE TABLE order_archive_ (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
serial_number BLOB NOT NULL,
|
|
||||||
|
|
||||||
order_limit_serialized BLOB NOT NULL,
|
|
||||||
order_serialized BLOB NOT NULL,
|
|
||||||
|
|
||||||
uplink_cert_id INTEGER NOT NULL,
|
|
||||||
|
|
||||||
status INTEGER NOT NULL,
|
|
||||||
archived_at TIMESTAMP NOT NULL,
|
|
||||||
|
|
||||||
FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
|
|
||||||
)`,
|
|
||||||
`CREATE INDEX idx_order_archived_at ON order_archive_(archived_at)`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.pieceExpirationDB.DB,
|
|
||||||
Description: "pieceExpiration db snapshot",
|
|
||||||
Version: 4,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, PieceExpirationDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE piece_expirations (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
piece_id BLOB NOT NULL,
|
|
||||||
piece_expiration TIMESTAMP NOT NULL, -- date when it can be deleted
|
|
||||||
deletion_failed_at TIMESTAMP, trash INTEGER NOT NULL DEFAULT 0,
|
|
||||||
PRIMARY KEY (satellite_id, piece_id)
|
|
||||||
)`,
|
|
||||||
`CREATE INDEX idx_piece_expirations_piece_expiration ON piece_expirations(piece_expiration)`,
|
|
||||||
`CREATE INDEX idx_piece_expirations_deletion_failed_at ON piece_expirations(deletion_failed_at)`,
|
|
||||||
`CREATE INDEX idx_piece_expirations_trashed
|
|
||||||
ON piece_expirations(satellite_id, trash)
|
|
||||||
WHERE trash = 1`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.v0PieceInfoDB.DB,
|
|
||||||
Description: "v0PieceInfo db snapshot",
|
|
||||||
Version: 5,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, PieceInfoDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE pieceinfo_ (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
piece_id BLOB NOT NULL,
|
|
||||||
piece_size BIGINT NOT NULL,
|
|
||||||
piece_expiration TIMESTAMP,
|
|
||||||
|
|
||||||
order_limit BLOB NOT NULL,
|
|
||||||
uplink_piece_hash BLOB NOT NULL,
|
|
||||||
uplink_cert_id INTEGER NOT NULL,
|
|
||||||
|
|
||||||
deletion_failed_at TIMESTAMP,
|
|
||||||
piece_creation TIMESTAMP NOT NULL,
|
|
||||||
|
|
||||||
FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
|
|
||||||
)`,
|
|
||||||
`CREATE UNIQUE INDEX pk_pieceinfo_ ON pieceinfo_(satellite_id, piece_id)`,
|
|
||||||
`CREATE INDEX idx_pieceinfo__expiration ON pieceinfo_(piece_expiration) WHERE piece_expiration IS NOT NULL`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.pieceSpaceUsedDB.DB,
|
|
||||||
Description: "pieceSpaceUsed db snapshot",
|
|
||||||
Version: 6,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, PieceSpaceUsedDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
// new table to hold the most recent totals from the piece space used cache
|
|
||||||
`CREATE TABLE piece_space_used_new (
|
|
||||||
total INTEGER NOT NULL DEFAULT 0,
|
|
||||||
content_size INTEGER NOT NULL,
|
|
||||||
satellite_id BLOB
|
|
||||||
)`,
|
|
||||||
`ALTER TABLE piece_space_used_new RENAME TO piece_space_used;`,
|
|
||||||
`CREATE UNIQUE INDEX idx_piece_space_used_satellite_id ON piece_space_used(satellite_id)`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.reputationDB.DB,
|
|
||||||
Description: "reputation db snapshot",
|
|
||||||
Version: 7,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, ReputationDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
` CREATE TABLE reputation_new (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
audit_success_count INTEGER NOT NULL,
|
|
||||||
audit_total_count INTEGER NOT NULL,
|
|
||||||
audit_reputation_alpha REAL NOT NULL,
|
|
||||||
audit_reputation_beta REAL NOT NULL,
|
|
||||||
audit_reputation_score REAL NOT NULL,
|
|
||||||
audit_unknown_reputation_alpha REAL NOT NULL,
|
|
||||||
audit_unknown_reputation_beta REAL NOT NULL,
|
|
||||||
audit_unknown_reputation_score REAL NOT NULL,
|
|
||||||
online_score REAL NOT NULL,
|
|
||||||
audit_history BLOB,
|
|
||||||
disqualified_at TIMESTAMP,
|
|
||||||
updated_at TIMESTAMP NOT NULL,
|
|
||||||
suspended_at TIMESTAMP,
|
|
||||||
offline_suspended_at TIMESTAMP,
|
|
||||||
offline_under_review_at TIMESTAMP,
|
|
||||||
joined_at TIMESTAMP NOT NULL, vetted_at TIMESTAMP,
|
|
||||||
PRIMARY KEY (satellite_id)
|
|
||||||
);`,
|
|
||||||
`ALTER TABLE reputation_new RENAME TO reputation;`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.storageUsageDB.DB,
|
|
||||||
Description: "storageUsage db snapshot",
|
|
||||||
Version: 8,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, StorageUsageDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE storage_usage_new (
|
|
||||||
timestamp TIMESTAMP NOT NULL,
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
at_rest_total REAL NOT NULL,
|
|
||||||
interval_end_time TIMESTAMP NOT NULL,
|
|
||||||
PRIMARY KEY (timestamp, satellite_id)
|
|
||||||
);`,
|
|
||||||
`ALTER TABLE storage_usage_new RENAME TO storage_usage`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.usedSerialsDB.DB,
|
|
||||||
Description: "usedSerials db snapshot",
|
|
||||||
Version: 9,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, UsedSerialsDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.satellitesDB.DB,
|
|
||||||
Description: "satellites db snapshot",
|
|
||||||
Version: 10,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, SatellitesDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE satellites_new (
|
|
||||||
node_id BLOB NOT NULL,
|
|
||||||
added_at TIMESTAMP NOT NULL,
|
|
||||||
status INTEGER NOT NULL, address TEXT,
|
|
||||||
PRIMARY KEY (node_id)
|
|
||||||
);`,
|
|
||||||
`ALTER TABLE satellites_new RENAME TO satellites;`,
|
|
||||||
`CREATE TABLE satellite_exit_progress_new (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
initiated_at TIMESTAMP,
|
|
||||||
finished_at TIMESTAMP,
|
|
||||||
starting_disk_usage INTEGER NOT NULL,
|
|
||||||
bytes_deleted INTEGER NOT NULL,
|
|
||||||
completion_receipt BLOB,
|
|
||||||
FOREIGN KEY (satellite_id) REFERENCES satellites (node_id)
|
|
||||||
);`,
|
|
||||||
`ALTER TABLE satellite_exit_progress_new RENAME TO satellite_exit_progress`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
DB: &db.notificationsDB.DB,
|
|
||||||
Description: "notifications db snapshot",
|
|
||||||
Version: 11,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, NotificationsDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE notifications (
|
|
||||||
id BLOB NOT NULL,
|
|
||||||
sender_id BLOB NOT NULL,
|
|
||||||
type INTEGER NOT NULL,
|
|
||||||
title TEXT NOT NULL,
|
|
||||||
message TEXT NOT NULL,
|
|
||||||
read_at TIMESTAMP,
|
|
||||||
created_at TIMESTAMP NOT NULL,
|
|
||||||
PRIMARY KEY (id)
|
|
||||||
);`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
DB: &db.payoutDB.DB,
|
|
||||||
Description: "paystubs db snapshot",
|
|
||||||
Version: 12,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, HeldAmountDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE paystubs_new (
|
|
||||||
period text NOT NULL,
|
|
||||||
satellite_id bytea NOT NULL,
|
|
||||||
created_at timestamp NOT NULL,
|
|
||||||
codes text NOT NULL,
|
|
||||||
usage_at_rest double precision NOT NULL,
|
|
||||||
usage_get bigint NOT NULL,
|
|
||||||
usage_put bigint NOT NULL,
|
|
||||||
usage_get_repair bigint NOT NULL,
|
|
||||||
usage_put_repair bigint NOT NULL,
|
|
||||||
usage_get_audit bigint NOT NULL,
|
|
||||||
comp_at_rest bigint NOT NULL,
|
|
||||||
comp_get bigint NOT NULL,
|
|
||||||
comp_put bigint NOT NULL,
|
|
||||||
comp_get_repair bigint NOT NULL,
|
|
||||||
comp_put_repair bigint NOT NULL,
|
|
||||||
comp_get_audit bigint NOT NULL,
|
|
||||||
surge_percent bigint NOT NULL,
|
|
||||||
held bigint NOT NULL,
|
|
||||||
owed bigint NOT NULL,
|
|
||||||
disposed bigint NOT NULL,
|
|
||||||
paid bigint NOT NULL,
|
|
||||||
distributed bigint NOT NULL,
|
|
||||||
PRIMARY KEY ( period, satellite_id )
|
|
||||||
);`,
|
|
||||||
`CREATE TABLE payments (
|
|
||||||
id bigserial NOT NULL,
|
|
||||||
created_at timestamp NOT NULL,
|
|
||||||
satellite_id bytea NOT NULL,
|
|
||||||
period text,
|
|
||||||
amount bigint NOT NULL,
|
|
||||||
receipt text,
|
|
||||||
notes text,
|
|
||||||
PRIMARY KEY ( id )
|
|
||||||
);`,
|
|
||||||
`ALTER TABLE paystubs_new RENAME TO paystubs`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
DB: &db.pricingDB.DB,
|
|
||||||
Description: "pricing db snapshot",
|
|
||||||
Version: 13,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, PricingDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE pricing (
|
|
||||||
satellite_id BLOB NOT NULL,
|
|
||||||
egress_bandwidth_price bigint NOT NULL,
|
|
||||||
repair_bandwidth_price bigint NOT NULL,
|
|
||||||
audit_bandwidth_price bigint NOT NULL,
|
|
||||||
disk_space_price bigint NOT NULL,
|
|
||||||
PRIMARY KEY ( satellite_id )
|
|
||||||
);`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
DB: &db.apiKeysDB.DB,
|
|
||||||
Description: "scret db snapshot",
|
|
||||||
Version: 14,
|
|
||||||
CreateDB: func(ctx context.Context, log *zap.Logger) error {
|
|
||||||
if err := db.openDatabase(ctx, APIKeysDBName); err != nil {
|
|
||||||
return ErrDatabase.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
Action: migrate.SQL{
|
|
||||||
`CREATE TABLE secret (
|
|
||||||
token bytea NOT NULL,
|
|
||||||
created_at timestamp with time zone NOT NULL,
|
|
||||||
PRIMARY KEY ( token )
|
|
||||||
);`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
100
storagenode/storagenodedb/storagenodedbtest/gensnapshot/main.go
Normal file
100
storagenode/storagenodedb/storagenodedbtest/gensnapshot/main.go
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
// Copyright (C) 2022 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/zip"
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/storj/storagenode/storagenodedb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
outFile := "snapshot.zip"
|
||||||
|
if len(os.Args) > 1 {
|
||||||
|
outFile = os.Args[1]
|
||||||
|
}
|
||||||
|
if err := run(context.Background(), outFile); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Failed to generate snapshot database: %+v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func run(ctx context.Context, outFile string) error {
|
||||||
|
log, err := zap.NewDevelopment()
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tempDir, err := os.MkdirTemp("", "")
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := os.RemoveAll(tempDir); err != nil {
|
||||||
|
log.Warn("Could not remove temp directory", zap.Error(err), zap.String("directory", tempDir))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
cfg := storagenodedb.Config{
|
||||||
|
Storage: tempDir,
|
||||||
|
Info: filepath.Join(tempDir, "piecestore.db"),
|
||||||
|
Info2: filepath.Join(tempDir, "info.db"),
|
||||||
|
Pieces: tempDir,
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := storagenodedb.OpenNew(ctx, log, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.MigrateToLatest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
matches, err := filepath.Glob(filepath.Join(tempDir, "*.db"))
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
zipWriter := zip.NewWriter(buf)
|
||||||
|
for _, match := range matches {
|
||||||
|
data, err := os.ReadFile(match)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
w, err := zipWriter.Create(filepath.Base(match))
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
_, err = io.Copy(w, bytes.NewReader(data))
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := zipWriter.Close(); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.WriteFile(outFile, buf.Bytes(), 0644); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -34,6 +34,7 @@ func Run(t *testing.T, test func(ctx *testcontext.Context, t *testing.T, db stor
|
|||||||
log := zaptest.NewLogger(t)
|
log := zaptest.NewLogger(t)
|
||||||
|
|
||||||
storageDir := ctx.Dir("storage")
|
storageDir := ctx.Dir("storage")
|
||||||
|
|
||||||
cfg := storagenodedb.Config{
|
cfg := storagenodedb.Config{
|
||||||
Storage: storageDir,
|
Storage: storageDir,
|
||||||
Info: filepath.Join(storageDir, "piecestore.db"),
|
Info: filepath.Join(storageDir, "piecestore.db"),
|
||||||
@ -42,7 +43,7 @@ func Run(t *testing.T, test func(ctx *testcontext.Context, t *testing.T, db stor
|
|||||||
Pieces: storageDir,
|
Pieces: storageDir,
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := storagenodedb.OpenNew(ctx, log, cfg)
|
db, err := OpenNew(ctx, log, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
61
storagenode/storagenodedb/storagenodedbtest/snapshot.go
Normal file
61
storagenode/storagenodedb/storagenodedbtest/snapshot.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
// Copyright (C) 2022 Storj Labs, Inc.
|
||||||
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
|
package storagenodedbtest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/zip"
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
_ "embed"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/zeebo/errs"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"storj.io/storj/storagenode/storagenodedb"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:generate go run ./gensnapshot testdata/snapshot.zip
|
||||||
|
//go:embed testdata/snapshot.zip
|
||||||
|
var snapshotZip []byte
|
||||||
|
|
||||||
|
// OpenNew opens a new storage node database pre-populated with a newly
|
||||||
|
// initialized and migrated database snapshot.
|
||||||
|
func OpenNew(ctx context.Context, log *zap.Logger, config storagenodedb.Config) (*storagenodedb.DB, error) {
|
||||||
|
db, err := storagenodedb.OpenNew(ctx, log, config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := deploySnapshot(db.DBDirectory()); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deploySnapshot(storageDir string) error {
|
||||||
|
zipReader, err := zip.NewReader(bytes.NewReader(snapshotZip), int64(len(snapshotZip)))
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range zipReader.File {
|
||||||
|
rc, err := f.Open()
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
data, err := io.ReadAll(rc)
|
||||||
|
if err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
if err := os.WriteFile(filepath.Join(storageDir, f.Name), data, 0644); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
if err := rc.Close(); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -1,7 +1,7 @@
|
|||||||
// Copyright (C) 2022 Storj Labs, Inc.
|
// Copyright (C) 2022 Storj Labs, Inc.
|
||||||
// See LICENSE for copying information.
|
// See LICENSE for copying information.
|
||||||
|
|
||||||
package storagenodedb
|
package storagenodedbtest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -17,6 +17,7 @@ import (
|
|||||||
"storj.io/common/testcontext"
|
"storj.io/common/testcontext"
|
||||||
"storj.io/private/tagsql"
|
"storj.io/private/tagsql"
|
||||||
"storj.io/storj/storage/filestore"
|
"storj.io/storj/storage/filestore"
|
||||||
|
"storj.io/storj/storagenode/storagenodedb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestSnapshot tests if the snapshot migration (used for faster testplanet) is the same as the prod migration.
|
// TestSnapshot tests if the snapshot migration (used for faster testplanet) is the same as the prod migration.
|
||||||
@ -25,24 +26,25 @@ func TestSnapshot(t *testing.T) {
|
|||||||
ctx := testcontext.New(t)
|
ctx := testcontext.New(t)
|
||||||
defer ctx.Cleanup()
|
defer ctx.Cleanup()
|
||||||
|
|
||||||
fromMigrationSteps := getSchemeSnapshot(t, ctx, "migration", func(ctx context.Context, db *DB) error {
|
fromMigrationSteps := getSchemeSnapshot(t, ctx, "migration", func(ctx context.Context, db *storagenodedb.DB) error {
|
||||||
return db.MigrateToLatest(ctx)
|
return db.MigrateToLatest(ctx)
|
||||||
})
|
})
|
||||||
fromSnapshot := getSchemeSnapshot(t, ctx, "steps", func(ctx context.Context, db *DB) error {
|
fromSnapshot := getSchemeSnapshot(t, ctx, "steps", func(ctx context.Context, db *storagenodedb.DB) error {
|
||||||
return db.TestMigrateToLatest(ctx)
|
if err := deploySnapshot(db.DBDirectory()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return db.MigrateToLatest(ctx)
|
||||||
})
|
})
|
||||||
|
|
||||||
require.Equal(t, fromSnapshot, fromMigrationSteps, "storagenode/storagenodedb/snapshot.go produces different scheme compared to storagenode/storagenodedb/database.go. "+
|
require.Equal(t, fromSnapshot, fromMigrationSteps, "The database snapshot produces a different scheme than the current storagenodedb migrations. "+
|
||||||
"If you changed the database.go recently, please also change snapshot.go (but instead of adding new migration step, try to modify the existing steps. snapshot.go should have "+
|
"If you have introduced a new migration, please run go generate ./storagenode/storagenodedb/storagenodedbtest/testdata to update the snapshot.")
|
||||||
"just the minimal number of migrations to keep the unit test executions fast.)")
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSchemeSnapshot(t *testing.T, ctx *testcontext.Context, name string, init func(ctx context.Context, db *DB) error) schemeSnapshot {
|
func getSchemeSnapshot(t *testing.T, ctx *testcontext.Context, name string, init func(ctx context.Context, db *storagenodedb.DB) error) schemeSnapshot {
|
||||||
log := zaptest.NewLogger(t)
|
log := zaptest.NewLogger(t)
|
||||||
|
|
||||||
storageDir := ctx.Dir(name)
|
storageDir := ctx.Dir(name)
|
||||||
cfg := Config{
|
cfg := storagenodedb.Config{
|
||||||
Pieces: storageDir,
|
Pieces: storageDir,
|
||||||
Storage: storageDir,
|
Storage: storageDir,
|
||||||
Info: filepath.Join(storageDir, "piecestore.db"),
|
Info: filepath.Join(storageDir, "piecestore.db"),
|
||||||
@ -50,7 +52,7 @@ func getSchemeSnapshot(t *testing.T, ctx *testcontext.Context, name string, init
|
|||||||
Filestore: filestore.DefaultConfig,
|
Filestore: filestore.DefaultConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := OpenNew(ctx, log, cfg)
|
db, err := storagenodedb.OpenNew(ctx, log, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
@ -69,7 +71,7 @@ type schemeSnapshot map[string]dbScheme
|
|||||||
// dbScheme represents uniq id (table/name/...) -> sql.
|
// dbScheme represents uniq id (table/name/...) -> sql.
|
||||||
type dbScheme map[string]string
|
type dbScheme map[string]string
|
||||||
|
|
||||||
func getSerializedScheme(t *testing.T, ctx *testcontext.Context, db *DB) schemeSnapshot {
|
func getSerializedScheme(t *testing.T, ctx *testcontext.Context, db *storagenodedb.DB) schemeSnapshot {
|
||||||
dbs := schemeSnapshot{}
|
dbs := schemeSnapshot{}
|
||||||
for dbName, db := range db.SQLDBs {
|
for dbName, db := range db.SQLDBs {
|
||||||
s := dbScheme{}
|
s := dbScheme{}
|
BIN
storagenode/storagenodedb/storagenodedbtest/testdata/snapshot.zip
vendored
Normal file
BIN
storagenode/storagenodedb/storagenodedbtest/testdata/snapshot.zip
vendored
Normal file
Binary file not shown.
Loading…
Reference in New Issue
Block a user