storj/cmd/partnerid-to-useragent-migration/migrations.go
Cameron Ayer 9d3614fc1b cmd/partnerid-to-useragent-migration: add value_attributions migration and tests
Change-Id: I21b8a752720c1567840b58cc9380eff0b8abeece
2022-01-31 16:34:31 +00:00

593 lines
16 KiB
Go

// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
pgx "github.com/jackc/pgx/v4"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/private/dbutil/cockroachutil"
"storj.io/private/dbutil/pgutil"
)
// MigrateUsers updates the user_agent column to corresponding Partners.Names or partner_id if applicable.
func MigrateUsers(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *Partners, offset int) (err error) {
defer mon.Task()(&ctx)(&err)
startID := []byte{}
nextID := []byte{}
var total int
more := true
// select the next ID after startID and offset the result. We will update relevant rows from the range of startID to nextID.
_, err = conn.Prepare(ctx, "select-for-update", "SELECT id FROM users WHERE id > $1 ORDER BY id OFFSET $2")
if err != nil {
return errs.New("could not prepare select query")
}
// update range from startID to nextID. Return count of updated rows to log progress.
_, err = conn.Prepare(ctx, "update-limited", `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE users
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE users.id > $3 AND users.id <= $4
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`)
if err != nil {
return errs.New("could not prepare update statement")
}
for {
row := conn.QueryRow(ctx, "select-for-update", startID, offset)
err = row.Scan(&nextID)
if err != nil {
if errs.Is(err, pgx.ErrNoRows) {
more = false
} else {
return errs.New("unable to select row for update from users: %w", err)
}
}
var updated int
for {
var row pgx.Row
if more {
row = conn.QueryRow(ctx, "update-limited", pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID, nextID)
} else {
// if !more then the select statement reached the end of the table. Update to the end of the table.
row = conn.QueryRow(ctx, `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE users
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE users.id > $3
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`, pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID,
)
}
err := row.Scan(&updated)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
} else if errs.Is(err, pgx.ErrNoRows) {
break
}
return errs.New("updating users %w", err)
}
break
}
total += updated
if !more {
log.Info("batch update complete", zap.Int("rows updated", updated))
break
}
log.Info("batch update complete", zap.Int("rows updated", updated), zap.Binary("last id", nextID))
startID = nextID
}
log.Info("users migration complete", zap.Int("total rows updated", total))
return nil
}
// MigrateProjects updates the user_agent column to corresponding PartnerInfo.Names or partner_id if applicable.
func MigrateProjects(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *Partners, offset int) (err error) {
defer mon.Task()(&ctx)(&err)
startID := []byte{}
nextID := []byte{}
var total int
more := true
// select the next ID after startID and offset the result. We will update relevant rows from the range of startID to nextID.
_, err = conn.Prepare(ctx, "select-for-update", "SELECT id FROM projects WHERE id > $1 ORDER BY id OFFSET $2 LIMIT 1")
if err != nil {
return errs.New("could not prepare select query")
}
// update range from startID to nextID. Return count of updated rows to log progress.
_, err = conn.Prepare(ctx, "update-limited", `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE projects
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE projects.id > $3 AND projects.id <= $4
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`)
if err != nil {
return errs.New("could not prepare update statement")
}
for {
row := conn.QueryRow(ctx, "select-for-update", startID, offset)
err = row.Scan(&nextID)
if err != nil {
if errs.Is(err, pgx.ErrNoRows) {
more = false
} else {
return errs.New("unable to select row for update from projects: %w", err)
}
}
var updated int
for {
var row pgx.Row
if more {
row = conn.QueryRow(ctx, "update-limited", pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID, nextID)
} else {
// if !more then the select statement reached the end of the table. Update to the end of the table.
row = conn.QueryRow(ctx, `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE projects
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE projects.id > $3
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`, pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID,
)
}
err := row.Scan(&updated)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
} else if errs.Is(err, pgx.ErrNoRows) {
break
}
return errs.New("updating projects %w", err)
}
break
}
total += updated
if !more {
log.Info("batch update complete", zap.Int("rows updated", updated))
break
}
log.Info("batch update complete", zap.Int("rows updated", updated), zap.Binary("last id", nextID))
startID = nextID
}
log.Info("projects migration complete", zap.Int("total rows updated", total))
return nil
}
// MigrateAPIKeys updates the user_agent column to corresponding PartnerInfo.Names or partner_id if applicable.
func MigrateAPIKeys(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *Partners, offset int) (err error) {
defer mon.Task()(&ctx)(&err)
startID := []byte{}
nextID := []byte{}
var total int
more := true
// select the next ID after startID and offset the result. We will update relevant rows from the range of startID to nextID.
_, err = conn.Prepare(ctx, "select-for-update", "SELECT id FROM api_keys WHERE id > $1 ORDER BY id OFFSET $2 LIMIT 1")
if err != nil {
return errs.New("could not prepare select query")
}
// update range from startID to nextID. Return count of updated rows to log progress.
_, err = conn.Prepare(ctx, "update-limited", `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE api_keys
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE api_keys.id > $3 AND api_keys.id <= $4
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`)
if err != nil {
return errs.New("could not prepare update statement")
}
for {
row := conn.QueryRow(ctx, "select-for-update", startID, offset)
err = row.Scan(&nextID)
if err != nil {
if errs.Is(err, pgx.ErrNoRows) {
more = false
} else {
return errs.New("unable to select row for update from api_keys: %w", err)
}
}
var updated int
for {
var row pgx.Row
if more {
row = conn.QueryRow(ctx, "update-limited", pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID, nextID)
} else {
// if !more then the select statement reached the end of the table. Update to the end of the table.
row = conn.QueryRow(ctx, `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE api_keys
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE api_keys.id > $3
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`, pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID,
)
}
err := row.Scan(&updated)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
} else if errs.Is(err, pgx.ErrNoRows) {
break
}
return errs.New("updating api_keys %w", err)
}
break
}
total += updated
if !more {
log.Info("batch update complete", zap.Int("rows updated", updated))
break
}
log.Info("batch update complete", zap.Int("rows updated", updated), zap.Binary("last id", nextID))
startID = nextID
}
log.Info("api_keys migration complete", zap.Int("total rows updated", total))
return nil
}
// MigrateBucketMetainfos updates the user_agent column to corresponding Partners.Names or partner_id if applicable.
func MigrateBucketMetainfos(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *Partners, offset int) (err error) {
defer mon.Task()(&ctx)(&err)
startID := []byte{}
nextID := []byte{}
var total int
more := true
// select the next ID after startID and offset the result. We will update relevant rows from the range of startID to nextID.
_, err = conn.Prepare(ctx, "select-for-update", "SELECT id FROM bucket_metainfos WHERE id > $1 ORDER BY id OFFSET $2 LIMIT 1")
if err != nil {
return errs.New("could not prepare select query")
}
// update range from startID to nextID. Return count of updated rows to log progress.
_, err = conn.Prepare(ctx, "update-limited", `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE bucket_metainfos
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE bucket_metainfos.id > $3 AND bucket_metainfos.id <= $4
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`)
if err != nil {
return errs.New("could not prepare update statement")
}
for {
row := conn.QueryRow(ctx, "select-for-update", startID, offset)
err = row.Scan(&nextID)
if err != nil {
if errs.Is(err, pgx.ErrNoRows) {
more = false
} else {
return errs.New("unable to select row for update from bucket_metainfos: %w", err)
}
}
var updated int
for {
var row pgx.Row
if more {
row = conn.QueryRow(ctx, "update-limited", pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID, nextID)
} else {
// if !more then the select statement reached the end of the table. Update to the end of the table.
row = conn.QueryRow(ctx, `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE bucket_metainfos
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE bucket_metainfos.id > $3
AND partner_id IS NOT NULL
AND user_agent IS NULL
RETURNING 1
)
SELECT count(*)
FROM updated;
`, pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startID,
)
}
err := row.Scan(&updated)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
} else if errs.Is(err, pgx.ErrNoRows) {
break
}
return errs.New("updating bucket_metainfos %w", err)
}
break
}
total += updated
if !more {
log.Info("batch update complete", zap.Int("rows updated", updated))
break
}
log.Info("batch update complete", zap.Int("rows updated", updated), zap.Binary("last id", nextID))
startID = nextID
}
log.Info("bucket_metainfos migration complete", zap.Int("total rows updated", total))
return nil
}
// MigrateValueAttributions updates the user_agent column to corresponding Partners.Names or partner_id if applicable.
func MigrateValueAttributions(ctx context.Context, log *zap.Logger, conn *pgx.Conn, p *Partners, offset int) (err error) {
defer mon.Task()(&ctx)(&err)
startProjectID := []byte{}
startBucket := []byte{}
nextProjectID := []byte{}
nextBucket := []byte{}
var total int
more := true
// select the next ID after startID and offset the result. We will update relevant rows from the range of startID to nextID.
_, err = conn.Prepare(ctx, "select-for-update", `
SELECT project_id, bucket_name
FROM value_attributions
WHERE project_id > $1
OR (
project_id = $1 AND bucket_name > $2
)
ORDER BY project_id, bucket_name
OFFSET $3
LIMIT 1
`)
if err != nil {
return errs.New("could not prepare select query")
}
// update range from startID to nextID. Return count of updated rows to log progress.
_, err = conn.Prepare(ctx, "update-limited", `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE value_attributions
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE partner_id IS NOT NULL
AND user_agent IS NULL
AND (
value_attributions.project_id > $3
OR (
value_attributions.project_id = $3 AND value_attributions.bucket_name > $5
)
)
AND (
value_attributions.project_id < $4
OR (
value_attributions.project_id = $4 AND value_attributions.bucket_name <= $6
)
)
RETURNING 1
)
SELECT count(*)
FROM updated;
`)
if err != nil {
return errs.New("could not prepare update statement")
}
for {
row := conn.QueryRow(ctx, "select-for-update", startProjectID, startBucket, offset)
err = row.Scan(&nextProjectID, &nextBucket)
if err != nil {
if errs.Is(err, pgx.ErrNoRows) {
more = false
} else {
return errs.New("unable to select row for update from value_attributions: %w", err)
}
}
var updated int
for {
var row pgx.Row
if more {
row = conn.QueryRow(ctx, "update-limited", pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startProjectID, nextProjectID, startBucket, nextBucket)
} else {
// if !more then the select statement reached the end of the table. Update to the end of the table.
row = conn.QueryRow(ctx, `
WITH partners AS (
SELECT unnest($1::bytea[]) as id,
unnest($2::bytea[]) as name
),
updated as (
UPDATE value_attributions
SET user_agent = (
CASE
WHEN partner_id IN (SELECT id FROM partners)
THEN (SELECT name FROM partners WHERE partner_id = partners.id)
ELSE partner_id
END
)
FROM partners
WHERE partner_id IS NOT NULL
AND user_agent IS NULL
AND (
value_attributions.project_id > $3
OR (
value_attributions.project_id = $3 AND value_attributions.bucket_name > $4
)
)
RETURNING 1
)
SELECT count(*)
FROM updated;
`, pgutil.UUIDArray(p.UUIDs), pgutil.ByteaArray(p.Names), startProjectID, startBucket,
)
}
err := row.Scan(&updated)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
} else if errs.Is(err, pgx.ErrNoRows) {
break
}
return errs.New("updating value_attributions %w", err)
}
break
}
total += updated
if !more {
log.Info("batch update complete", zap.Int("rows updated", updated))
break
}
log.Info("batch update complete", zap.Int("rows updated", updated), zap.Binary("last project id", nextProjectID), zap.String("last bucket name", string(nextBucket)))
startProjectID = nextProjectID
startBucket = nextBucket
}
log.Info("value_attributions migration complete", zap.Int("total rows updated", total))
return nil
}