satellite/metabase: use db methods that pass context correctly
It turns out that some DB methods are not passing context to to lower levels even if have context as a parameter. Change-Id: I82f4fc1a47c362687a91364d85e4468057e53134
This commit is contained in:
parent
d3e51353ab
commit
ae345320fe
@ -57,7 +57,7 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
|
||||
opts.ZombieDeletionDeadline = &deadline
|
||||
}
|
||||
|
||||
row := db.db.QueryRow(ctx, `
|
||||
row := db.db.QueryRowContext(ctx, `
|
||||
INSERT INTO objects (
|
||||
project_id, bucket_name, object_key, version, stream_id,
|
||||
expires_at, encryption,
|
||||
@ -128,7 +128,7 @@ func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExact
|
||||
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
|
||||
}
|
||||
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
INSERT INTO objects (
|
||||
project_id, bucket_name, object_key, version, stream_id,
|
||||
expires_at, encryption,
|
||||
@ -190,7 +190,7 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
|
||||
err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
|
||||
// Verify that object exists and is partial.
|
||||
var value int
|
||||
err = tx.QueryRow(ctx, `
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
SELECT 1
|
||||
FROM objects WHERE
|
||||
project_id = $1 AND
|
||||
@ -208,7 +208,7 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
|
||||
}
|
||||
|
||||
// Verify that the segment does not exist.
|
||||
err = tx.QueryRow(ctx, `
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
SELECT 1
|
||||
FROM segments WHERE
|
||||
stream_id = $1 AND
|
||||
@ -475,7 +475,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec
|
||||
totalEncryptedSize += int64(seg.EncryptedSize)
|
||||
}
|
||||
|
||||
err = tx.QueryRow(ctx, `
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
UPDATE objects SET
|
||||
status =`+committedStatus+`,
|
||||
segment_count = $6,
|
||||
|
@ -87,7 +87,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit
|
||||
totalEncryptedSize += int64(seg.EncryptedSize)
|
||||
}
|
||||
|
||||
err = tx.QueryRow(ctx, `
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
UPDATE objects SET
|
||||
status =`+committedStatus+`,
|
||||
segment_count = $6,
|
||||
@ -183,7 +183,7 @@ type segmentInfoForCommit struct {
|
||||
func fetchSegmentsForCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID) (segments []segmentInfoForCommit, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
err = withRows(tx.Query(ctx, `
|
||||
err = withRows(tx.QueryContext(ctx, `
|
||||
SELECT position, encrypted_size, plain_offset, plain_size
|
||||
FROM segments
|
||||
WHERE stream_id = $1
|
||||
@ -289,7 +289,7 @@ func updateSegmentOffsets(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID,
|
||||
return nil
|
||||
}
|
||||
|
||||
updateResult, err := tx.Exec(ctx, `
|
||||
updateResult, err := tx.ExecContext(ctx, `
|
||||
UPDATE segments
|
||||
SET plain_offset = P.plain_offset
|
||||
FROM (SELECT unnest($2::INT8[]), unnest($3::INT8[])) as P(position, plain_offset)
|
||||
@ -323,7 +323,7 @@ func (db *DB) deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, strea
|
||||
}
|
||||
|
||||
// This potentially could be done together with the previous database call.
|
||||
err = withRows(tx.Query(ctx, `
|
||||
err = withRows(tx.QueryContext(ctx, `
|
||||
DELETE FROM segments
|
||||
WHERE stream_id = $1 AND position = ANY($2)
|
||||
RETURNING root_piece_id, remote_alias_pieces
|
||||
|
@ -133,11 +133,11 @@ func (db *DB) MigrateToLatest(ctx context.Context) error {
|
||||
|
||||
case dbutil.Cockroach:
|
||||
var dbName string
|
||||
if err := db.db.QueryRow(ctx, `SELECT current_database();`).Scan(&dbName); err != nil {
|
||||
if err := db.db.QueryRowContext(ctx, `SELECT current_database();`).Scan(&dbName); err != nil {
|
||||
return errs.New("error querying current database: %+v", err)
|
||||
}
|
||||
|
||||
_, err := db.db.Exec(ctx, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s;`,
|
||||
_, err := db.db.ExecContext(ctx, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s;`,
|
||||
pgutil.QuoteIdentifier(dbName)))
|
||||
if err != nil {
|
||||
return errs.Wrap(err)
|
||||
|
@ -98,7 +98,7 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa
|
||||
if err := opts.Verify(); err != nil {
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
err = withRows(db.db.Query(ctx, `
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
@ -168,7 +168,7 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
|
||||
err = withRows(db.db.Query(ctx, `
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
@ -307,7 +307,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa
|
||||
default:
|
||||
return DeleteObjectResult{}, Error.New("unhandled database: %v", db.impl)
|
||||
}
|
||||
err = withRows(db.db.Query(ctx, query, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)))(func(rows tagsql.Rows) error {
|
||||
err = withRows(db.db.QueryContext(ctx, query, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)))(func(rows tagsql.Rows) error {
|
||||
result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows)
|
||||
return err
|
||||
})
|
||||
@ -334,7 +334,7 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO
|
||||
return DeleteObjectResult{}, err
|
||||
}
|
||||
|
||||
err = withRows(db.db.Query(ctx, `
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
@ -409,7 +409,7 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl
|
||||
sort.Slice(objectKeys, func(i, j int) bool {
|
||||
return bytes.Compare(objectKeys[i], objectKeys[j]) < 0
|
||||
})
|
||||
err = withRows(db.db.Query(ctx, `
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
WITH deleted_objects AS (
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
|
@ -80,7 +80,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
|
||||
deletedSegmentsBatch = deletedSegmentsBatch[:0]
|
||||
batchDeletedObjects := 0
|
||||
deletedSegments := 0
|
||||
err = withRows(db.db.Query(ctx, query,
|
||||
err = withRows(db.db.QueryContext(ctx, query,
|
||||
opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize))(func(rows tagsql.Rows) error {
|
||||
ids := map[uuid.UUID]struct{}{} // TODO: avoid map here
|
||||
for rows.Next() {
|
||||
|
@ -72,7 +72,7 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers
|
||||
}
|
||||
|
||||
object := Object{}
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
stream_id,
|
||||
created_at, expires_at,
|
||||
@ -128,7 +128,7 @@ func (db *DB) GetObjectLatestVersion(ctx context.Context, opts GetObjectLatestVe
|
||||
}
|
||||
|
||||
object := Object{}
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
stream_id, version,
|
||||
created_at, expires_at,
|
||||
@ -183,7 +183,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio
|
||||
}
|
||||
|
||||
var aliasPieces AliasPieces
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
stream_id,
|
||||
created_at, expires_at, repaired_at,
|
||||
@ -251,7 +251,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
|
||||
}
|
||||
|
||||
var aliasPieces AliasPieces
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
created_at, expires_at, repaired_at,
|
||||
root_piece_id, encrypted_key_nonce, encrypted_key,
|
||||
@ -304,7 +304,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje
|
||||
}
|
||||
|
||||
var aliasPieces AliasPieces
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
stream_id, position,
|
||||
created_at, repaired_at,
|
||||
@ -369,7 +369,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) (
|
||||
}
|
||||
|
||||
var aliasPieces AliasPieces
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
stream_id, position,
|
||||
created_at, expires_at, repaired_at,
|
||||
@ -436,7 +436,7 @@ func (db *DB) BucketEmpty(ctx context.Context, opts BucketEmpty) (empty bool, er
|
||||
}
|
||||
|
||||
var value int
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
SELECT
|
||||
1
|
||||
FROM objects
|
||||
|
@ -252,7 +252,7 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
|
||||
}
|
||||
|
||||
if it.prefixLimit == "" {
|
||||
return it.db.db.Query(ctx, `
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, status,
|
||||
created_at, expires_at,
|
||||
@ -274,7 +274,7 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato
|
||||
|
||||
// TODO this query should use SUBSTRING(object_key from $8) but there is a problem how it
|
||||
// works with CRDB.
|
||||
return it.db.db.Query(ctx, `
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, status,
|
||||
created_at, expires_at,
|
||||
@ -307,7 +307,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
}
|
||||
|
||||
if it.prefixLimit == "" {
|
||||
return it.db.db.Query(ctx, `
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, status,
|
||||
created_at, expires_at,
|
||||
@ -332,7 +332,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
|
||||
|
||||
// TODO this query should use SUBSTRING(object_key from $8) but there is a problem how it
|
||||
// works with CRDB.
|
||||
return it.db.db.Query(ctx, `
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, status,
|
||||
created_at, expires_at,
|
||||
@ -367,7 +367,7 @@ func nextBucket(b []byte) []byte {
|
||||
func doNextQueryStreamsByKey(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return it.db.db.Query(ctx, `
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
object_key, stream_id, version, status,
|
||||
created_at, expires_at,
|
||||
|
@ -40,7 +40,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS
|
||||
|
||||
ListLimit.Ensure(&opts.Limit)
|
||||
|
||||
err = withRows(db.db.Query(ctx, `
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
position,
|
||||
created_at,
|
||||
@ -154,7 +154,7 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
|
||||
var rows tagsql.Rows
|
||||
var rowsErr error
|
||||
if opts.Range == nil {
|
||||
rows, rowsErr = db.db.Query(ctx, `
|
||||
rows, rowsErr = db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
position, plain_size, plain_offset, created_at,
|
||||
encrypted_etag, encrypted_key_nonce, encrypted_key
|
||||
@ -166,7 +166,7 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions)
|
||||
LIMIT $3
|
||||
`, opts.StreamID, opts.Cursor, opts.Limit+1)
|
||||
} else {
|
||||
rows, rowsErr = db.db.Query(ctx, `
|
||||
rows, rowsErr = db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
position, plain_size, plain_offset, created_at,
|
||||
encrypted_etag, encrypted_key_nonce, encrypted_key
|
||||
|
@ -166,7 +166,7 @@ func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool {
|
||||
func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return it.db.db.Query(ctx, `
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
project_id, bucket_name,
|
||||
object_key, stream_id, version,
|
||||
@ -250,7 +250,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h
|
||||
bytesIDs[i] = id[:]
|
||||
}
|
||||
|
||||
rows, err := db.db.Query(ctx, `
|
||||
rows, err := db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
stream_id, position,
|
||||
created_at, expires_at, repaired_at,
|
||||
@ -463,7 +463,7 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry)
|
||||
func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
return it.db.db.Query(ctx, `
|
||||
return it.db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
stream_id, position,
|
||||
created_at, expires_at, repaired_at,
|
||||
|
@ -106,7 +106,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
|
||||
func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) {
|
||||
objs := []RawObject{}
|
||||
|
||||
rows, err := db.db.Query(ctx, `
|
||||
rows, err := db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
project_id, bucket_name, object_key, version, stream_id,
|
||||
created_at, expires_at,
|
||||
@ -167,7 +167,7 @@ func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err erro
|
||||
func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error) {
|
||||
segs := []RawSegment{}
|
||||
|
||||
rows, err := db.db.Query(ctx, `
|
||||
rows, err := db.db.QueryContext(ctx, `
|
||||
SELECT
|
||||
stream_id, position,
|
||||
created_at, repaired_at, expires_at,
|
||||
|
@ -29,11 +29,11 @@ func (db *DB) GetTableStats(ctx context.Context, opts GetTableStats) (result Tab
|
||||
|
||||
var group errs2.Group
|
||||
group.Go(func() error {
|
||||
row := db.db.QueryRow(ctx, `SELECT count(*) FROM objects `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval))
|
||||
row := db.db.QueryRowContext(ctx, `SELECT count(*) FROM objects `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval))
|
||||
return Error.Wrap(row.Scan(&result.ObjectCount))
|
||||
})
|
||||
group.Go(func() error {
|
||||
row := db.db.QueryRow(ctx, `SELECT count(*) FROM segments `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval))
|
||||
row := db.db.QueryRowContext(ctx, `SELECT count(*) FROM segments `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval))
|
||||
return Error.Wrap(row.Scan(&result.SegmentCount))
|
||||
})
|
||||
err = errs.Combine(group.Wait()...)
|
||||
|
@ -28,7 +28,7 @@ func (db *DB) GetStreamPieceCountByNodeID(ctx context.Context, opts GetStreamPie
|
||||
|
||||
countByAlias := map[NodeAlias]int64{}
|
||||
result = map[storj.NodeID]int64{}
|
||||
err = withRows(db.db.Query(ctx, `
|
||||
err = withRows(db.db.QueryContext(ctx, `
|
||||
SELECT remote_alias_pieces
|
||||
FROM segments
|
||||
WHERE stream_id = $1 AND remote_alias_pieces IS NOT null
|
||||
|
@ -75,7 +75,7 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces)
|
||||
}
|
||||
|
||||
var resultPieces AliasPieces
|
||||
err = db.db.QueryRow(ctx, `
|
||||
err = db.db.QueryRowContext(ctx, `
|
||||
UPDATE segments SET
|
||||
remote_alias_pieces = CASE
|
||||
WHEN remote_alias_pieces = $3 THEN $4
|
||||
|
Loading…
Reference in New Issue
Block a user