cmd/partnerid-to-useragent-migration: add projects migration and tests
Change-Id: I23f931cb6ff8e047aa9b30a45b4e05c41d87c04e
This commit is contained in:
parent
d393f6094c
commit
6d6d6776d8
@ -123,10 +123,10 @@ func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) {
|
||||
if err != nil {
|
||||
return errs.New("error migrating users: %w", err)
|
||||
}
|
||||
// err = MigrateProjects(ctx, log, conn, &p, offset)
|
||||
// if err != nil {
|
||||
// return errs.New("error migrating projects: %w", err)
|
||||
// }
|
||||
err = MigrateProjects(ctx, log, conn, &p, offset)
|
||||
if err != nil {
|
||||
return errs.New("error migrating projects: %w", err)
|
||||
}
|
||||
// err = MigrateAPIKeys(ctx, log, conn, &p, offset)
|
||||
// if err != nil {
|
||||
// return errs.New("error migrating api_keys: %w", err)
|
||||
@ -139,5 +139,5 @@ func Migrate(ctx context.Context, log *zap.Logger, config Config) (err error) {
|
||||
// if err != nil {
|
||||
// return errs.New("error migrating value_attributions: %w", err)
|
||||
// }
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/common/uuid"
|
||||
"storj.io/private/dbutil"
|
||||
"storj.io/private/dbutil/tempdb"
|
||||
migrator "storj.io/storj/cmd/partnerid-to-useragent-migration"
|
||||
@ -57,6 +58,7 @@ func TestMigrateUsersUpdateNoRows(t *testing.T) {
|
||||
p.UUIDs = append(p.UUIDs, info.UUID)
|
||||
p.Names = append(p.Names, []byte(info.Name))
|
||||
}
|
||||
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB) {
|
||||
// insert an entry with no partner ID
|
||||
_, err = db.Console().Users().Insert(ctx, &console.User{
|
||||
@ -67,7 +69,12 @@ func TestMigrateUsersUpdateNoRows(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
check := func(t *testing.T, ctx context.Context, db satellite.DB) {}
|
||||
check := func(t *testing.T, ctx context.Context, db satellite.DB) {
|
||||
_, users, err := db.Console().Users().GetByEmailWithUnverified(ctx, "test@storj.test")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, users, 1)
|
||||
require.Nil(t, users[0].UserAgent)
|
||||
}
|
||||
test(t, 7, prepare, migrator.MigrateUsers, check, &p)
|
||||
}
|
||||
|
||||
@ -196,6 +203,179 @@ func TestMigrateUsers(t *testing.T) {
|
||||
test(t, 7, prepare, migrator.MigrateUsers, check, &p)
|
||||
}
|
||||
|
||||
// Test no entries in table doesn't error.
|
||||
func TestMigrateProjectsSelectNoRows(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
partnerDB := rewards.DefaultPartnersDB
|
||||
partnerInfo, err := partnerDB.All(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
var p migrator.Partners
|
||||
for _, info := range partnerInfo {
|
||||
p.UUIDs = append(p.UUIDs, info.UUID)
|
||||
p.Names = append(p.Names, []byte(info.Name))
|
||||
}
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB) {}
|
||||
check := func(t *testing.T, ctx context.Context, db satellite.DB) {}
|
||||
test(t, 7, prepare, migrator.MigrateProjects, check, &p)
|
||||
}
|
||||
|
||||
// Test no rows to update returns no error.
|
||||
func TestMigrateProjectsUpdateNoRows(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
partnerDB := rewards.DefaultPartnersDB
|
||||
partnerInfo, err := partnerDB.All(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
var p migrator.Partners
|
||||
for _, info := range partnerInfo {
|
||||
p.UUIDs = append(p.UUIDs, info.UUID)
|
||||
p.Names = append(p.Names, []byte(info.Name))
|
||||
}
|
||||
var id uuid.UUID
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB) {
|
||||
// insert an entry with no partner ID
|
||||
proj, err := db.Console().Projects().Insert(ctx, &console.Project{
|
||||
Name: "test",
|
||||
Description: "test",
|
||||
OwnerID: testrand.UUID(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
id = proj.ID
|
||||
}
|
||||
check := func(t *testing.T, ctx context.Context, db satellite.DB) {
|
||||
proj, err := db.Console().Projects().Get(ctx, id)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, proj.UserAgent)
|
||||
}
|
||||
test(t, 7, prepare, migrator.MigrateProjects, check, &p)
|
||||
}
|
||||
|
||||
// Test select offset beyond final row.
|
||||
// With only one row, selecting with an offset of 1 will return 0 rows.
|
||||
// Test that this is accounted for and updates the row correctly.
|
||||
func TestMigrateProjectsSelectOffsetBeyondRowCount(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
partnerDB := rewards.DefaultPartnersDB
|
||||
partnerInfo, err := partnerDB.All(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
var p migrator.Partners
|
||||
for _, info := range partnerInfo {
|
||||
p.UUIDs = append(p.UUIDs, info.UUID)
|
||||
p.Names = append(p.Names, []byte(info.Name))
|
||||
}
|
||||
var projID uuid.UUID
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB) {
|
||||
prj, err := db.Console().Projects().Insert(ctx, &console.Project{
|
||||
Name: "test",
|
||||
Description: "test",
|
||||
PartnerID: p.UUIDs[0],
|
||||
OwnerID: testrand.UUID(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
projID = prj.ID
|
||||
}
|
||||
check := func(t *testing.T, ctx context.Context, db satellite.DB) {
|
||||
proj, err := db.Console().Projects().Get(ctx, projID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, p.Names[0], proj.UserAgent)
|
||||
}
|
||||
test(t, 7, prepare, migrator.MigrateProjects, check, &p)
|
||||
}
|
||||
|
||||
func TestMigrateProjects(t *testing.T) {
|
||||
ctx := testcontext.New(t)
|
||||
defer ctx.Cleanup()
|
||||
|
||||
partnerDB := rewards.DefaultPartnersDB
|
||||
partnerInfo, err := partnerDB.All(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
var p migrator.Partners
|
||||
for _, info := range partnerInfo {
|
||||
p.UUIDs = append(p.UUIDs, info.UUID)
|
||||
p.Names = append(p.Names, []byte(info.Name))
|
||||
}
|
||||
|
||||
var n int
|
||||
|
||||
prepare := func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB) {
|
||||
// insert an entry with no partner ID
|
||||
_, err = db.Console().Projects().Insert(ctx, &console.Project{
|
||||
Name: "test",
|
||||
Description: "test",
|
||||
OwnerID: testrand.UUID(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
n++
|
||||
|
||||
// insert an entry with a partner ID which does not exist in the partnersDB
|
||||
_, err = db.Console().Projects().Insert(ctx, &console.Project{
|
||||
Name: "test",
|
||||
Description: "test",
|
||||
PartnerID: testrand.UUID(),
|
||||
OwnerID: testrand.UUID(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
n++
|
||||
|
||||
for _, p := range partnerInfo {
|
||||
id := testrand.UUID()
|
||||
|
||||
// The partner Kafka has no UUID and its ID is too short to convert to a UUID.
|
||||
// The Console.Projects API expects a UUID for inserting and getting.
|
||||
// Even if we insert its ID, OSPP005, directly into the DB, attempting to
|
||||
// retrieve the entry from the DB would result in an error when it tries to
|
||||
// convert the PartnerID bytes to a UUID.
|
||||
if p.UUID.IsZero() {
|
||||
continue
|
||||
}
|
||||
_, err = db.Console().Projects().Insert(ctx, &console.Project{
|
||||
Name: "test",
|
||||
Description: "test",
|
||||
PartnerID: p.UUID,
|
||||
OwnerID: id,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
n++
|
||||
}
|
||||
}
|
||||
|
||||
check := func(t *testing.T, ctx context.Context, db satellite.DB) {
|
||||
projects, err := db.Console().Projects().GetAll(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, projects, n)
|
||||
for _, prj := range projects {
|
||||
if prj.PartnerID.IsZero() {
|
||||
require.Nil(t, prj.UserAgent)
|
||||
continue
|
||||
}
|
||||
var expectedUA []byte
|
||||
for _, p := range partnerInfo {
|
||||
if prj.PartnerID == p.UUID {
|
||||
expectedUA = []byte(p.Name)
|
||||
break
|
||||
}
|
||||
}
|
||||
if expectedUA == nil {
|
||||
expectedUA = prj.PartnerID.Bytes()
|
||||
}
|
||||
require.Equal(t, expectedUA, prj.UserAgent)
|
||||
}
|
||||
// reset n for the subsequent CRDB test
|
||||
n = 0
|
||||
}
|
||||
test(t, 7, prepare, migrator.MigrateProjects, check, &p)
|
||||
}
|
||||
|
||||
func test(t *testing.T, offset int, prepare func(t *testing.T, ctx *testcontext.Context, rawDB *dbutil.TempDatabase, db satellite.DB),
|
||||
migrate func(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *migrator.Partners, limit int) (err error),
|
||||
check func(t *testing.T, ctx context.Context, db satellite.DB), p *migrator.Partners) {
|
||||
|
@ -123,3 +123,114 @@ func MigrateUsers(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *Partn
|
||||
log.Info("users migration complete", zap.Int("total rows updated", total))
|
||||
return nil
|
||||
}
|
||||
|
||||
// MigrateProjects updates the user_agent column to corresponding PartnerInfo.Names or partner_id if applicable.
|
||||
func MigrateProjects(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *Partners, offset int) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
startID := []byte{}
|
||||
nextID := []byte{}
|
||||
var total int
|
||||
more := true
|
||||
|
||||
// select the next ID after startID and offset the result. We will update relevant rows from the range of startID to nextID.
|
||||
_, err = conn.Prepare(ctx, "select-for-update", "SELECT id FROM projects WHERE id > $1 ORDER BY id OFFSET $2 LIMIT 1")
|
||||
if err != nil {
|
||||
return errs.New("could not prepare select query")
|
||||
}
|
||||
|
||||
// update range from startID to nextID. Return count of updated rows to log progress.
|
||||
_, err = conn.Prepare(ctx, "update-limited", `
|
||||
WITH partners AS (
|
||||
SELECT unnest($1::bytea[]) as id,
|
||||
unnest($2::bytea[]) as name
|
||||
),
|
||||
updated as (
|
||||
UPDATE projects
|
||||
SET user_agent = (
|
||||
CASE
|
||||
WHEN partner_id IN (SELECT id FROM partners)
|
||||
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
|
||||
ELSE partner_id
|
||||
END
|
||||
)
|
||||
FROM partners
|
||||
WHERE projects.id > $3 AND projects.id <= $4
|
||||
AND partner_id IS NOT NULL
|
||||
AND user_agent IS NULL
|
||||
RETURNING 1
|
||||
)
|
||||
SELECT count(*)
|
||||
FROM updated;
|
||||
`)
|
||||
if err != nil {
|
||||
return errs.New("could not prepare update statement")
|
||||
}
|
||||
|
||||
for {
|
||||
row := conn.QueryRow(ctx, "select-for-update", startID, offset)
|
||||
err = row.Scan(&nextID)
|
||||
if err != nil {
|
||||
if errs.Is(err, pgx.ErrNoRows) {
|
||||
more = false
|
||||
} else {
|
||||
return errs.New("unable to select row for update from projects: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var updated int
|
||||
for {
|
||||
var row pgx.Row
|
||||
if more {
|
||||
row = conn.QueryRow(ctx, "update-limited", pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID, nextID)
|
||||
} else {
|
||||
// if !more then the select statement reached the end of the table. Update to the end of the table.
|
||||
row = conn.QueryRow(ctx, `
|
||||
WITH partners AS (
|
||||
SELECT unnest($1::bytea[]) as id,
|
||||
unnest($2::bytea[]) as name
|
||||
),
|
||||
updated as (
|
||||
UPDATE projects
|
||||
SET user_agent = (
|
||||
CASE
|
||||
WHEN partner_id IN (SELECT id FROM partners)
|
||||
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
|
||||
ELSE partner_id
|
||||
END
|
||||
)
|
||||
FROM partners
|
||||
WHERE projects.id > $3
|
||||
AND partner_id IS NOT NULL
|
||||
AND user_agent IS NULL
|
||||
RETURNING 1
|
||||
)
|
||||
SELECT count(*)
|
||||
FROM updated;
|
||||
`, pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID,
|
||||
)
|
||||
}
|
||||
|
||||
err := row.Scan(&updated)
|
||||
if err != nil {
|
||||
if cockroachutil.NeedsRetry(err) {
|
||||
continue
|
||||
} else if errs.Is(err, pgx.ErrNoRows) {
|
||||
break
|
||||
}
|
||||
return errs.New("updating projects %w", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
total += updated
|
||||
if !more {
|
||||
log.Info("batch update complete", zap.Int("rows updated", updated))
|
||||
break
|
||||
}
|
||||
log.Info("batch update complete", zap.Int("rows updated", updated), zap.Binary("last id", nextID))
|
||||
startID = nextID
|
||||
}
|
||||
log.Info("projects migration complete", zap.Int("total rows updated", total))
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user