sn/sndb/payouts: migrate distributed == paid for periods < 2020-12

while satellites have also run this logic, old satellites that
no longer exist cannot and so the node cannot get the updated
data. this locally migrates it so that the calculations for
the undistributed amounts are correct.

there's also some tab/space whitespace and gofmt fixes.

Change-Id: I470879703314fe6541eaba5f21b47849781894f8
This commit is contained in:
Jeff Wendling 2021-02-17 13:07:06 -05:00
parent f969e26827
commit 9e7e753faf
7 changed files with 130 additions and 49 deletions

View File

@ -1971,6 +1971,14 @@ func (db *DB) Migration(ctx context.Context) *migrate.Migration {
return nil
}),
},
{
DB: &db.payoutDB.DB,
Description: "Assume distributed == paid for paystubs before 2020-12.",
Version: 51,
Action: migrate.SQL{
`UPDATE paystubs SET distributed = paid WHERE period < '2020-12'`,
},
},
},
}
}

View File

@ -21,12 +21,34 @@ import (
"storj.io/storj/storagenode/storagenodedb/testdata"
)
// insertOldData will insert any OldData from the MultiDBState into the
// appropriate rawDB. This prepares the rawDB for the test comparing schema and
// data and any changes to rows.
func insertOldData(ctx context.Context, mdbs *testdata.MultiDBState, rawDBs map[string]storagenodedb.DBContainer) error {
for dbName, dbState := range mdbs.DBStates {
if dbState.OldData == "" {
continue
}
rawDB, ok := rawDBs[dbName]
if !ok {
return errs.New("Failed to find DB %s", dbName)
}
_, err := rawDB.GetDB().ExecContext(ctx, dbState.OldData)
if err != nil {
return err
}
}
return nil
}
// insertNewData will insert any NewData from the MultiDBState into the
// appropriate rawDB. This prepares the rawDB for the test comparing schema and
// data.
// data. It will not insert NewData if OldData is set: the migration is expected
// to convert OldData into what NewData would insert.
func insertNewData(ctx context.Context, mdbs *testdata.MultiDBState, rawDBs map[string]storagenodedb.DBContainer) error {
for dbName, dbState := range mdbs.DBStates {
if dbState.NewData == "" {
if dbState.NewData == "" || dbState.OldData != "" {
continue
}
@ -104,6 +126,7 @@ func TestMigrate(t *testing.T) {
db, err := storagenodedb.OpenNew(ctx, log, cfg)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()
rawDBs := db.RawDatabases()
// get migration for this database
migrations := db.Migration(ctx)
@ -111,15 +134,17 @@ func TestMigrate(t *testing.T) {
// the schema is different when migration step is before the step, cannot test the layout
tag := fmt.Sprintf("#%d - v%d", i, step.Version)
// run migration up to a specific version
err := migrations.TargetVersion(step.Version).Run(ctx, log.Named("migrate"))
require.NoError(t, err, tag)
// find the matching expected version
expected, ok := testdata.States.FindVersion(step.Version)
require.True(t, ok)
rawDBs := db.RawDatabases()
// insert old data for any tables
err = insertOldData(ctx, expected, rawDBs)
require.NoError(t, err, tag)
// run migration up to a specific version
err := migrations.TargetVersion(step.Version).Run(ctx, log.Named("migrate"))
require.NoError(t, err, tag)
// insert data for new tables
err = insertNewData(ctx, expected, rawDBs)

View File

@ -155,29 +155,29 @@ func (db *payoutDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID, pe
func (db *payoutDB) AllPayStubs(ctx context.Context, period string) (_ []payouts.PayStub, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT
satellite_id,
created_at,
codes,
usage_at_rest,
usage_get,
usage_put,
usage_get_repair,
usage_put_repair,
usage_get_audit,
comp_at_rest,
comp_get,
comp_put,
comp_get_repair,
comp_put_repair,
comp_get_audit,
surge_percent,
held,
owed,
disposed,
paid,
distributed
FROM paystubs WHERE period = ?`
query := `SELECT
satellite_id,
created_at,
codes,
usage_at_rest,
usage_get,
usage_put,
usage_get_repair,
usage_put_repair,
usage_get_audit,
comp_at_rest,
comp_get,
comp_put,
comp_get_repair,
comp_put_repair,
comp_get_audit,
surge_percent,
held,
owed,
disposed,
paid,
distributed
FROM paystubs WHERE period = ?`
rows, err := db.QueryContext(ctx, query, period)
if err != nil {
@ -230,10 +230,10 @@ func (db *payoutDB) AllPayStubs(ctx context.Context, period string) (_ []payouts
func (db *payoutDB) SatellitesHeldbackHistory(ctx context.Context, id storj.NodeID) (_ []payouts.HoldForPeriod, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT
period,
held
FROM paystubs WHERE satellite_id = ? ORDER BY period ASC`
query := `SELECT
period,
held
FROM paystubs WHERE satellite_id = ? ORDER BY period ASC`
rows, err := db.QueryContext(ctx, query, id)
if err != nil {
@ -351,9 +351,9 @@ func (db *payoutDB) StorePayment(ctx context.Context, payment payouts.Payment) (
func (db *payoutDB) SatellitesDisposedHistory(ctx context.Context, satelliteID storj.NodeID) (_ int64, err error) {
defer mon.Task()(&ctx)(&err)
query := `SELECT
disposed
FROM paystubs WHERE satellite_id = ? ORDER BY period ASC`
query := `SELECT
disposed
FROM paystubs WHERE satellite_id = ? ORDER BY period ASC`
rows, err := db.QueryContext(ctx, query, satelliteID)
if err != nil {

View File

@ -65,6 +65,7 @@ var States = MultiDBStates{
&v48,
&v49,
&v50,
&v51,
},
}
@ -96,9 +97,12 @@ type DBStates map[string]*DBState
// DBState allows you to define the desired state of the DB using SQl commands.
// Both the SQl and NewData fields contains SQL that will be executed to create
// the expected DB. The NewData SQL additionally will be executed on the testDB
// to ensure data is consistent.
// to ensure data is consistent. If OldData is not empty, it is executed on the
// testDB before the migration is run, and NewData is not run on the testDB. This
// is used to assert that a migration that modifies data runs as expected.
type DBState struct {
SQL string
OldData string
NewData string
}
@ -125,7 +129,7 @@ type DBSnapshot struct {
}
// LoadMultiDBSnapshot converts a MultiDBState into a MultiDBSnapshot. It
// executes the SQL and stores the shema and data.
// executes the SQL and stores the schema and data.
func LoadMultiDBSnapshot(ctx context.Context, multiDBState *MultiDBState) (*MultiDBSnapshot, error) {
multiDBSnapshot := NewMultiDBSnapshot()
for dbName, dbState := range multiDBState.DBStates {

View File

@ -8,9 +8,9 @@ import "storj.io/storj/storagenode/storagenodedb"
var v49 = MultiDBState{
Version: 49,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: v47.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v47.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: v48.DBStates[storagenodedb.ReputationDBName],
storagenodedb.UsedSerialsDBName: v47.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v47.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: v48.DBStates[storagenodedb.ReputationDBName],
storagenodedb.PieceSpaceUsedDBName: v47.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v47.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v47.DBStates[storagenodedb.PieceExpirationDBName],
@ -58,6 +58,6 @@ var v49 = MultiDBState{
PRIMARY KEY ( id )
);`,
},
storagenodedb.PricingDBName: v47.DBStates[storagenodedb.PricingDBName],
storagenodedb.APIKeysDBName: v47.DBStates[storagenodedb.APIKeysDBName]},
storagenodedb.PricingDBName: v47.DBStates[storagenodedb.PricingDBName],
storagenodedb.APIKeysDBName: v47.DBStates[storagenodedb.APIKeysDBName]},
}

View File

@ -8,9 +8,9 @@ import "storj.io/storj/storagenode/storagenodedb"
var v50 = MultiDBState{
Version: 50,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: v47.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v47.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: v48.DBStates[storagenodedb.ReputationDBName],
storagenodedb.UsedSerialsDBName: v47.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v47.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: v48.DBStates[storagenodedb.ReputationDBName],
storagenodedb.PieceSpaceUsedDBName: v47.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v47.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v47.DBStates[storagenodedb.PieceExpirationDBName],
@ -58,6 +58,6 @@ var v50 = MultiDBState{
PRIMARY KEY ( id )
);`,
},
storagenodedb.PricingDBName: v47.DBStates[storagenodedb.PricingDBName],
storagenodedb.APIKeysDBName: v47.DBStates[storagenodedb.APIKeysDBName]},
storagenodedb.PricingDBName: v47.DBStates[storagenodedb.PricingDBName],
storagenodedb.APIKeysDBName: v47.DBStates[storagenodedb.APIKeysDBName]},
}

View File

@ -0,0 +1,44 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package testdata
import "storj.io/storj/storagenode/storagenodedb"
var v51 = MultiDBState{
Version: 51,
DBStates: DBStates{
storagenodedb.UsedSerialsDBName: v47.DBStates[storagenodedb.UsedSerialsDBName],
storagenodedb.StorageUsageDBName: v47.DBStates[storagenodedb.StorageUsageDBName],
storagenodedb.ReputationDBName: v48.DBStates[storagenodedb.ReputationDBName],
storagenodedb.PieceSpaceUsedDBName: v47.DBStates[storagenodedb.PieceSpaceUsedDBName],
storagenodedb.PieceInfoDBName: v47.DBStates[storagenodedb.PieceInfoDBName],
storagenodedb.PieceExpirationDBName: v47.DBStates[storagenodedb.PieceExpirationDBName],
storagenodedb.OrdersDBName: v47.DBStates[storagenodedb.OrdersDBName],
storagenodedb.BandwidthDBName: v47.DBStates[storagenodedb.BandwidthDBName],
storagenodedb.SatellitesDBName: v47.DBStates[storagenodedb.SatellitesDBName],
storagenodedb.DeprecatedInfoDBName: v47.DBStates[storagenodedb.DeprecatedInfoDBName],
storagenodedb.NotificationsDBName: v47.DBStates[storagenodedb.NotificationsDBName],
storagenodedb.HeldAmountDBName: &DBState{
SQL: v50.DBStates[storagenodedb.HeldAmountDBName].SQL,
OldData: `
-- distributed is 0 for two periods < 2020-12, and 2 periods >= 2020-12.
INSERT INTO paystubs (period, satellite_id, created_at, codes, usage_at_rest, usage_get, usage_put, usage_get_repair, usage_put_repair, usage_get_audit, comp_at_rest, comp_get, comp_put, comp_get_repair, comp_put_repair, comp_get_audit, surge_percent, held, owed, disposed, paid, distributed) VALUES
('2020-10', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 0),
('2020-11', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 101, 201, 301, 401, 501, 601, 701, 801, 901, 1010, 1101, 1201, 1301, 1401, 1501, 1601, 1701, 0),
('2020-12', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 102, 202, 302, 402, 502, 602, 702, 802, 902, 1020, 1102, 1202, 1302, 1402, 1502, 1602, 1702, 0),
('2021-01', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 103, 203, 303, 403, 503, 603, 703, 803, 903, 1030, 1103, 1203, 1303, 1403, 1503, 1603, 1703, 0)
`,
NewData: `
-- distributed has been updated for the periods < 2020-12.
INSERT INTO paystubs (period, satellite_id, created_at, codes, usage_at_rest, usage_get, usage_put, usage_get_repair, usage_put_repair, usage_get_audit, comp_at_rest, comp_get, comp_put, comp_get_repair, comp_put_repair, comp_get_audit, surge_percent, held, owed, disposed, paid, distributed) VALUES
('2020-10', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1700),
('2020-11', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 101, 201, 301, 401, 501, 601, 701, 801, 901, 1010, 1101, 1201, 1301, 1401, 1501, 1601, 1701, 1701),
('2020-12', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 102, 202, 302, 402, 502, 602, 702, 802, 902, 1020, 1102, 1202, 1302, 1402, 1502, 1602, 1702, 0),
('2021-01', 'foo', '2020-04-07T00:00:00.000000Z', 'X', 103, 203, 303, 403, 503, 603, 703, 803, 903, 1030, 1103, 1203, 1303, 1403, 1503, 1603, 1703, 0)
`,
},
storagenodedb.PricingDBName: v47.DBStates[storagenodedb.PricingDBName],
storagenodedb.APIKeysDBName: v47.DBStates[storagenodedb.APIKeysDBName],
},
}