testplanet: support snapshot based migration for storagenode

Similar to the existing snapshot-based tests of the satellite/metabase db, we add a migration here which is:
 * dedicated to unit tests
 * faster (with less steps)
 * but safe: an additional unit test ensures that the snapshot-based migration and the normal production migration produce the same results.

Change-Id: Ie324b09f64b4553df02247a9461ece305a6cf832
This commit is contained in:
Márton Elek 2022-08-09 12:47:00 +02:00 committed by Storj Robot
parent 8ddd5557f5
commit 6b27c64833
5 changed files with 520 additions and 1 deletions

View File

@ -257,7 +257,7 @@ func (planet *Planet) newStorageNode(ctx context.Context, prefix string, index,
// Mark the peer's PieceDeleter as in testing mode, so it is easy to wait on the deleter
peer.Storage2.PieceDeleter.SetupTest()
err = db.MigrateToLatest(ctx)
err = db.TestMigrateToLatest(ctx)
if err != nil {
return nil, err
}

View File

@ -76,6 +76,10 @@ var (
type DB interface {
// MigrateToLatest initializes the database
MigrateToLatest(ctx context.Context) error
// TestMigrateToLatest is a fast migration with skipping test (not safe for production + old db state)
TestMigrateToLatest(ctx context.Context) error
// Close closes the database
Close() error

View File

@ -347,6 +347,13 @@ func (db *DB) MigrateToLatest(ctx context.Context) error {
return migration.Run(ctx, db.log.Named("migration"))
}
// TestMigrateToLatest creates any necessary tables from the snapshot.
// It is faster than MigrateToLatest (fewer steps) and intended for tests
// only; a dedicated unit test verifies that both produce the same schema.
func (db *DB) TestMigrateToLatest(ctx context.Context) error {
	return db.Snapshot(ctx).Run(ctx, db.log.Named("migration"))
}
// Preflight conducts a pre-flight check to ensure correct schemas and minimal read+write functionality of the database tables.
func (db *DB) Preflight(ctx context.Context) (err error) {
for dbName, dbContainer := range db.SQLDBs {

View File

@ -0,0 +1,394 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"go.uber.org/zap"
"storj.io/storj/private/migrate"
)
// Snapshot returns a migration that is supposed to generate the same database
// schema as the full production Migration, but collapsed into a minimal number
// of steps so that test setup stays fast.
//
// It is intended for tests only (via TestMigrateToLatest). The version numbers
// intentionally skip values: each step jumps straight to the final schema of
// its database. A dedicated unit test verifies that this snapshot and the
// production migration yield identical schemas, so any SQL change here must be
// mirrored in database.go and vice versa.
func (db *DB) Snapshot(ctx context.Context) *migrate.Migration {
	return &migrate.Migration{
		Table: VersionTable,
		Steps: []*migrate.Step{
			{
				DB:          &db.deprecatedInfoDB.DB,
				Description: "Initial setup",
				Version:     0,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, DeprecatedInfoDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				// no tables: this db only exists for backwards compatibility
				Action: migrate.SQL{},
			},
			{
				DB:          &db.bandwidthDB.DB,
				Description: "bandwidth db snapshot",
				Version:     2,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, BandwidthDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					// table for storing bandwidth usage
					`CREATE TABLE bandwidth_usage (
						satellite_id BLOB NOT NULL,
						action INTEGER NOT NULL,
						amount BIGINT NOT NULL,
						created_at TIMESTAMP NOT NULL
					)`,
					`CREATE INDEX idx_bandwidth_usage_satellite ON bandwidth_usage(satellite_id)`,
					`CREATE INDEX idx_bandwidth_usage_created ON bandwidth_usage(created_at)`,
					`CREATE TABLE bandwidth_usage_rollups (
						interval_start TIMESTAMP NOT NULL,
						satellite_id BLOB NOT NULL,
						action INTEGER NOT NULL,
						amount BIGINT NOT NULL,
						PRIMARY KEY ( interval_start, satellite_id, action )
					)`,
				},
			},
			{
				DB:          &db.ordersDB.DB,
				Description: "orders db snapshot",
				Version:     3,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, OrdersDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					// table for storing all unsent orders
					`CREATE TABLE unsent_order (
						satellite_id BLOB NOT NULL,
						serial_number BLOB NOT NULL,
						order_limit_serialized BLOB NOT NULL, -- serialized pb.OrderLimit
						order_serialized BLOB NOT NULL, -- serialized pb.Order
						order_limit_expiration TIMESTAMP NOT NULL, -- when is the deadline for sending it
						uplink_cert_id INTEGER NOT NULL,
						FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
					)`,
					`CREATE UNIQUE INDEX idx_orders ON unsent_order(satellite_id, serial_number)`,
					`CREATE TABLE order_archive_ (
						satellite_id BLOB NOT NULL,
						serial_number BLOB NOT NULL,
						order_limit_serialized BLOB NOT NULL,
						order_serialized BLOB NOT NULL,
						uplink_cert_id INTEGER NOT NULL,
						status INTEGER NOT NULL,
						archived_at TIMESTAMP NOT NULL,
						FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
					)`,
					`CREATE INDEX idx_order_archived_at ON order_archive_(archived_at)`,
				},
			},
			{
				DB:          &db.pieceExpirationDB.DB,
				Description: "pieceExpiration db snapshot",
				Version:     4,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, PieceExpirationDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE piece_expirations (
						satellite_id BLOB NOT NULL,
						piece_id BLOB NOT NULL,
						piece_expiration TIMESTAMP NOT NULL, -- date when it can be deleted
						deletion_failed_at TIMESTAMP, trash INTEGER NOT NULL DEFAULT 0,
						PRIMARY KEY (satellite_id, piece_id)
					)`,
					`CREATE INDEX idx_piece_expirations_piece_expiration ON piece_expirations(piece_expiration)`,
					`CREATE INDEX idx_piece_expirations_deletion_failed_at ON piece_expirations(deletion_failed_at)`,
					`CREATE INDEX idx_piece_expirations_trashed
						ON piece_expirations(satellite_id, trash)
						WHERE trash = 1`,
				},
			},
			{
				DB:          &db.v0PieceInfoDB.DB,
				Description: "v0PieceInfo db snapshot",
				Version:     5,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, PieceInfoDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE pieceinfo_ (
						satellite_id BLOB NOT NULL,
						piece_id BLOB NOT NULL,
						piece_size BIGINT NOT NULL,
						piece_expiration TIMESTAMP,
						order_limit BLOB NOT NULL,
						uplink_piece_hash BLOB NOT NULL,
						uplink_cert_id INTEGER NOT NULL,
						deletion_failed_at TIMESTAMP,
						piece_creation TIMESTAMP NOT NULL,
						FOREIGN KEY(uplink_cert_id) REFERENCES certificate(cert_id)
					)`,
					`CREATE UNIQUE INDEX pk_pieceinfo_ ON pieceinfo_(satellite_id, piece_id)`,
					`CREATE INDEX idx_pieceinfo__expiration ON pieceinfo_(piece_expiration) WHERE piece_expiration IS NOT NULL`,
				},
			},
			{
				DB:          &db.pieceSpaceUsedDB.DB,
				Description: "pieceSpaceUsed db snapshot",
				Version:     6,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, PieceSpaceUsedDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					// new table to hold the most recent totals from the piece space used cache
					`CREATE TABLE piece_space_used_new (
						total INTEGER NOT NULL DEFAULT 0,
						content_size INTEGER NOT NULL,
						satellite_id BLOB
					)`,
					`ALTER TABLE piece_space_used_new RENAME TO piece_space_used;`,
					`CREATE UNIQUE INDEX idx_piece_space_used_satellite_id ON piece_space_used(satellite_id)`,
				},
			},
			{
				DB:          &db.reputationDB.DB,
				Description: "reputation db snapshot",
				Version:     7,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, ReputationDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					` CREATE TABLE reputation_new (
						satellite_id BLOB NOT NULL,
						audit_success_count INTEGER NOT NULL,
						audit_total_count INTEGER NOT NULL,
						audit_reputation_alpha REAL NOT NULL,
						audit_reputation_beta REAL NOT NULL,
						audit_reputation_score REAL NOT NULL,
						audit_unknown_reputation_alpha REAL NOT NULL,
						audit_unknown_reputation_beta REAL NOT NULL,
						audit_unknown_reputation_score REAL NOT NULL,
						online_score REAL NOT NULL,
						audit_history BLOB,
						disqualified_at TIMESTAMP,
						updated_at TIMESTAMP NOT NULL,
						suspended_at TIMESTAMP,
						offline_suspended_at TIMESTAMP,
						offline_under_review_at TIMESTAMP,
						joined_at TIMESTAMP NOT NULL, vetted_at TIMESTAMP,
						PRIMARY KEY (satellite_id)
					);`,
					`ALTER TABLE reputation_new RENAME TO reputation;`,
				},
			},
			{
				DB:          &db.storageUsageDB.DB,
				Description: "storageUsage db snapshot",
				Version:     8,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, StorageUsageDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE storage_usage_new (
						timestamp TIMESTAMP NOT NULL,
						satellite_id BLOB NOT NULL,
						at_rest_total REAL NOT NULL,
						interval_end_time TIMESTAMP NOT NULL,
						PRIMARY KEY (timestamp, satellite_id)
					);`,
					`ALTER TABLE storage_usage_new RENAME TO storage_usage`,
				},
			},
			{
				DB:          &db.usedSerialsDB.DB,
				Description: "usedSerials db snapshot",
				Version:     9,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, UsedSerialsDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				// no tables: serial tracking moved out of the database
				Action: migrate.SQL{},
			},
			{
				DB:          &db.satellitesDB.DB,
				Description: "satellites db snapshot",
				Version:     10,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, SatellitesDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE satellites_new (
						node_id BLOB NOT NULL,
						added_at TIMESTAMP NOT NULL,
						status INTEGER NOT NULL, address TEXT,
						PRIMARY KEY (node_id)
					);`,
					`ALTER TABLE satellites_new RENAME TO satellites;`,
					`CREATE TABLE satellite_exit_progress_new (
						satellite_id BLOB NOT NULL,
						initiated_at TIMESTAMP,
						finished_at TIMESTAMP,
						starting_disk_usage INTEGER NOT NULL,
						bytes_deleted INTEGER NOT NULL,
						completion_receipt BLOB,
						FOREIGN KEY (satellite_id) REFERENCES satellites (node_id)
					);`,
					`ALTER TABLE satellite_exit_progress_new RENAME TO satellite_exit_progress`,
				},
			},
			{
				DB:          &db.notificationsDB.DB,
				Description: "notifications db snapshot",
				Version:     11,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, NotificationsDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE notifications (
						id BLOB NOT NULL,
						sender_id BLOB NOT NULL,
						type INTEGER NOT NULL,
						title TEXT NOT NULL,
						message TEXT NOT NULL,
						read_at TIMESTAMP,
						created_at TIMESTAMP NOT NULL,
						PRIMARY KEY (id)
					);`,
				},
			},
			{
				DB:          &db.payoutDB.DB,
				Description: "paystubs db snapshot",
				Version:     12,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, HeldAmountDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE paystubs_new (
						period text NOT NULL,
						satellite_id bytea NOT NULL,
						created_at timestamp NOT NULL,
						codes text NOT NULL,
						usage_at_rest double precision NOT NULL,
						usage_get bigint NOT NULL,
						usage_put bigint NOT NULL,
						usage_get_repair bigint NOT NULL,
						usage_put_repair bigint NOT NULL,
						usage_get_audit bigint NOT NULL,
						comp_at_rest bigint NOT NULL,
						comp_get bigint NOT NULL,
						comp_put bigint NOT NULL,
						comp_get_repair bigint NOT NULL,
						comp_put_repair bigint NOT NULL,
						comp_get_audit bigint NOT NULL,
						surge_percent bigint NOT NULL,
						held bigint NOT NULL,
						owed bigint NOT NULL,
						disposed bigint NOT NULL,
						paid bigint NOT NULL,
						distributed bigint NOT NULL,
						PRIMARY KEY ( period, satellite_id )
					);`,
					`CREATE TABLE payments (
						id bigserial NOT NULL,
						created_at timestamp NOT NULL,
						satellite_id bytea NOT NULL,
						period text,
						amount bigint NOT NULL,
						receipt text,
						notes text,
						PRIMARY KEY ( id )
					);`,
					`ALTER TABLE paystubs_new RENAME TO paystubs`,
				},
			},
			{
				DB:          &db.pricingDB.DB,
				Description: "pricing db snapshot",
				Version:     13,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, PricingDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE pricing (
						satellite_id BLOB NOT NULL,
						egress_bandwidth_price bigint NOT NULL,
						repair_bandwidth_price bigint NOT NULL,
						audit_bandwidth_price bigint NOT NULL,
						disk_space_price bigint NOT NULL,
						PRIMARY KEY ( satellite_id )
					);`,
				},
			},
			{
				DB: &db.apiKeysDB.DB,
				// fixed typo: was "scret db snapshot"
				Description: "secret db snapshot",
				Version:     14,
				CreateDB: func(ctx context.Context, log *zap.Logger) error {
					if err := db.openDatabase(ctx, APIKeysDBName); err != nil {
						return ErrDatabase.Wrap(err)
					}
					return nil
				},
				Action: migrate.SQL{
					`CREATE TABLE secret (
						token bytea NOT NULL,
						created_at timestamp with time zone NOT NULL,
						PRIMARY KEY ( token )
					);`,
				},
			},
		},
	}
}

View File

@ -0,0 +1,114 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package storagenodedb
import (
"context"
"database/sql"
"fmt"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/zeebo/errs"
"go.uber.org/zap/zaptest"
"storj.io/common/testcontext"
"storj.io/private/tagsql"
"storj.io/storj/storage/filestore"
)
// TestSnapshot verifies that the snapshot migration (used for faster
// testplanet setup) produces exactly the same database schema as the
// production migration steps.
func TestSnapshot(t *testing.T) {
	ctx := testcontext.New(t)
	defer ctx.Cleanup()

	// schema produced by the full production migration
	prodScheme := getSchemeSnapshot(t, ctx, "migration", func(ctx context.Context, db *DB) error {
		return db.MigrateToLatest(ctx)
	})

	// schema produced by the condensed test snapshot
	snapshotScheme := getSchemeSnapshot(t, ctx, "steps", func(ctx context.Context, db *DB) error {
		return db.TestMigrateToLatest(ctx)
	})

	mismatchHelp := "storagenode/storagenodedb/snapshot.go produces different scheme compared to storagenode/storagenodedb/database.go. " +
		"If you changed the database.go recently, please also change snapshot.go (but instead of adding new migration step, try to modify the existing steps. snapshot.go should have " +
		"just the minimal number of migrations to keep the unit test executions fast.)"
	require.Equal(t, snapshotScheme, prodScheme, mismatchHelp)
}
// getSchemeSnapshot opens a fresh storagenode database set under its own
// directory (named by name), runs init on it (either the production migration
// or the test snapshot) and returns the resulting schema of every underlying
// sqlite database.
func getSchemeSnapshot(t *testing.T, ctx *testcontext.Context, name string, init func(ctx context.Context, db *DB) error) schemeSnapshot {
	log := zaptest.NewLogger(t)

	storageDir := ctx.Dir(name)
	cfg := Config{
		Pieces:    storageDir,
		Storage:   storageDir,
		Info:      filepath.Join(storageDir, "piecestore.db"),
		Info2:     filepath.Join(storageDir, "info.db"),
		Filestore: filestore.DefaultConfig,
	}

	db, err := OpenNew(ctx, log, cfg)
	// was: if err != nil { require.NoError(t, err) } — the wrapper was redundant,
	// require.NoError is already a no-op on nil error.
	require.NoError(t, err)
	defer ctx.Check(db.Close)

	err = init(ctx, db)
	require.NoError(t, err)

	return getSerializedScheme(t, ctx, db)
}
// schemeSnapshot represents dbname -> scheme.
type schemeSnapshot map[string]dbScheme

// dbScheme represents a unique id (type/name/table) -> sql.
type dbScheme map[string]string
// getSerializedScheme reads the sqlite schema of every database handle in
// db.SQLDBs and returns them keyed by database name.
func getSerializedScheme(t *testing.T, ctx *testcontext.Context, db *DB) schemeSnapshot {
	dbs := schemeSnapshot{}
	// sqldb (not db) to avoid shadowing the outer *DB inside the loop body
	for dbName, sqldb := range db.SQLDBs {
		sqliteScheme, err := readSqliteScheme(ctx, sqldb.GetDB())
		// require.NoError instead of require.Nil: on failure it prints the
		// error message rather than just the non-nil value.
		require.NoError(t, err)

		s := make(dbScheme, len(sqliteScheme))
		for k, v := range sqliteScheme {
			s[k] = v
		}
		dbs[dbName] = s
	}
	return dbs
}
// readSqliteScheme reads all schema objects (tables, indexes, ...) of one
// sqlite database and returns them as "type.name.table" -> creation SQL.
// The per-database migration "versions" table is excluded so that schemas
// created by different migration paths can be compared directly.
func readSqliteScheme(ctx context.Context, db tagsql.DB) (_ map[string]string, err error) {
	schema, err := db.QueryContext(ctx, "select * from sqlite_schema")
	if err != nil {
		// NB: return before touching schema — on error the rows handle is nil
		// and the previous errs.Combine(err, schema.Close()) would panic.
		return nil, err
	}
	defer func() { err = errs.Combine(err, schema.Close()) }()

	res := map[string]string{}
	for schema.Next() {
		var root int
		var schemaType, name, table string
		var sqlContent sql.NullString
		if err := schema.Scan(&schemaType, &name, &table, &root, &sqlContent); err != nil {
			return nil, err
		}
		// due to the migration logic we will have a separate version table for each db
		if name != "versions" {
			res[fmt.Sprintf("%s.%s.%s", schemaType, name, table)] = sqlContent.String
		}
	}
	// Next() returns false on iteration error; surface it here.
	return res, schema.Err()
}