From ae345320fecb1e00a049111239818307fbbc95ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?= Date: Wed, 28 Jul 2021 15:44:22 +0200 Subject: [PATCH] satellite/metabase: use db methods that pass context correctly It turns out that some DB methods are not passing context to to lower levels even if have context as a parameter. Change-Id: I82f4fc1a47c362687a91364d85e4468057e53134 --- satellite/metabase/commit.go | 10 +++++----- satellite/metabase/commit_object.go | 8 ++++---- satellite/metabase/db.go | 4 ++-- satellite/metabase/delete.go | 10 +++++----- satellite/metabase/delete_bucket.go | 2 +- satellite/metabase/get.go | 14 +++++++------- satellite/metabase/iterator.go | 10 +++++----- satellite/metabase/list_segments.go | 6 +++--- satellite/metabase/loop.go | 6 +++--- satellite/metabase/raw.go | 4 ++-- satellite/metabase/stats.go | 4 ++-- satellite/metabase/streamstat.go | 2 +- satellite/metabase/update.go | 2 +- 13 files changed, 41 insertions(+), 41 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index ea95e7862..a5b38e806 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -57,7 +57,7 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe opts.ZombieDeletionDeadline = &deadline } - row := db.db.QueryRow(ctx, ` + row := db.db.QueryRowContext(ctx, ` INSERT INTO objects ( project_id, bucket_name, object_key, version, stream_id, expires_at, encryption, @@ -128,7 +128,7 @@ func (db *DB) BeginObjectExactVersion(ctx context.Context, opts BeginObjectExact ZombieDeletionDeadline: opts.ZombieDeletionDeadline, } - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` INSERT INTO objects ( project_id, bucket_name, object_key, version, stream_id, expires_at, encryption, @@ -190,7 +190,7 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) { err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { // Verify that object exists and is partial. var value int - err = tx.QueryRow(ctx, ` + err = tx.QueryRowContext(ctx, ` SELECT 1 FROM objects WHERE project_id = $1 AND @@ -208,7 +208,7 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) { } // Verify that the segment does not exist. - err = tx.QueryRow(ctx, ` + err = tx.QueryRowContext(ctx, ` SELECT 1 FROM segments WHERE stream_id = $1 AND @@ -475,7 +475,7 @@ func (db *DB) CommitObject(ctx context.Context, opts CommitObject) (object Objec totalEncryptedSize += int64(seg.EncryptedSize) } - err = tx.QueryRow(ctx, ` + err = tx.QueryRowContext(ctx, ` UPDATE objects SET status =`+committedStatus+`, segment_count = $6, diff --git a/satellite/metabase/commit_object.go b/satellite/metabase/commit_object.go index 0b6431b6c..db2399072 100644 --- a/satellite/metabase/commit_object.go +++ b/satellite/metabase/commit_object.go @@ -87,7 +87,7 @@ func (db *DB) CommitObjectWithSegments(ctx context.Context, opts CommitObjectWit totalEncryptedSize += int64(seg.EncryptedSize) } - err = tx.QueryRow(ctx, ` + err = tx.QueryRowContext(ctx, ` UPDATE objects SET status =`+committedStatus+`, segment_count = $6, @@ -183,7 +183,7 @@ type segmentInfoForCommit struct { func fetchSegmentsForCommit(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID) (segments []segmentInfoForCommit, err error) { defer mon.Task()(&ctx)(&err) - err = withRows(tx.Query(ctx, ` + err = withRows(tx.QueryContext(ctx, ` SELECT position, encrypted_size, plain_offset, plain_size FROM segments WHERE stream_id = $1 @@ -289,7 +289,7 @@ func updateSegmentOffsets(ctx context.Context, tx tagsql.Tx, streamID uuid.UUID, return nil } - updateResult, err := tx.Exec(ctx, ` + updateResult, err := tx.ExecContext(ctx, ` UPDATE segments SET plain_offset = P.plain_offset FROM (SELECT unnest($2::INT8[]), unnest($3::INT8[])) as P(position, plain_offset) @@ -323,7 +323,7 @@ func (db *DB) deleteSegmentsNotInCommit(ctx context.Context, tx tagsql.Tx, strea } // This potentially could be done together with the previous database call. - err = withRows(tx.Query(ctx, ` + err = withRows(tx.QueryContext(ctx, ` DELETE FROM segments WHERE stream_id = $1 AND position = ANY($2) RETURNING root_piece_id, remote_alias_pieces diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go index c186e9776..2d06db162 100644 --- a/satellite/metabase/db.go +++ b/satellite/metabase/db.go @@ -133,11 +133,11 @@ func (db *DB) MigrateToLatest(ctx context.Context) error { case dbutil.Cockroach: var dbName string - if err := db.db.QueryRow(ctx, `SELECT current_database();`).Scan(&dbName); err != nil { + if err := db.db.QueryRowContext(ctx, `SELECT current_database();`).Scan(&dbName); err != nil { return errs.New("error querying current database: %+v", err) } - _, err := db.db.Exec(ctx, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s;`, + _, err := db.db.ExecContext(ctx, fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s;`, pgutil.QuoteIdentifier(dbName))) if err != nil { return errs.Wrap(err) diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index 537e22ca8..ab7225f0f 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -98,7 +98,7 @@ func (db *DB) DeleteObjectExactVersion(ctx context.Context, opts DeleteObjectExa if err := opts.Verify(); err != nil { return DeleteObjectResult{}, err } - err = withRows(db.db.Query(ctx, ` + err = withRows(db.db.QueryContext(ctx, ` WITH deleted_objects AS ( DELETE FROM objects WHERE @@ -168,7 +168,7 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject) return DeleteObjectResult{}, err } - err = withRows(db.db.Query(ctx, ` + err = withRows(db.db.QueryContext(ctx, ` WITH deleted_objects AS ( DELETE FROM objects WHERE @@ -307,7 +307,7 @@ func (db *DB) DeleteObjectLatestVersion(ctx context.Context, opts DeleteObjectLa default: return DeleteObjectResult{}, Error.New("unhandled database: %v", db.impl) } - err = withRows(db.db.Query(ctx, query, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)))(func(rows tagsql.Rows) error { + err = withRows(db.db.QueryContext(ctx, query, opts.ProjectID, []byte(opts.BucketName), []byte(opts.ObjectKey)))(func(rows tagsql.Rows) error { result.Objects, result.Segments, err = db.scanObjectDeletion(ctx, opts.ObjectLocation, rows) return err }) @@ -334,7 +334,7 @@ func (db *DB) DeleteObjectAnyStatusAllVersions(ctx context.Context, opts DeleteO return DeleteObjectResult{}, err } - err = withRows(db.db.Query(ctx, ` + err = withRows(db.db.QueryContext(ctx, ` WITH deleted_objects AS ( DELETE FROM objects WHERE @@ -409,7 +409,7 @@ func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAl sort.Slice(objectKeys, func(i, j int) bool { return bytes.Compare(objectKeys[i], objectKeys[j]) < 0 }) - err = withRows(db.db.Query(ctx, ` + err = withRows(db.db.QueryContext(ctx, ` WITH deleted_objects AS ( DELETE FROM objects WHERE diff --git a/satellite/metabase/delete_bucket.go b/satellite/metabase/delete_bucket.go index c01a927a0..4ff8f3185 100644 --- a/satellite/metabase/delete_bucket.go +++ b/satellite/metabase/delete_bucket.go @@ -80,7 +80,7 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects) deletedSegmentsBatch = deletedSegmentsBatch[:0] batchDeletedObjects := 0 deletedSegments := 0 - err = withRows(db.db.Query(ctx, query, + err = withRows(db.db.QueryContext(ctx, query, opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize))(func(rows tagsql.Rows) error { ids := map[uuid.UUID]struct{}{} // TODO: avoid map here for rows.Next() { diff --git a/satellite/metabase/get.go b/satellite/metabase/get.go index 48141a937..4c2b67e16 100644 --- a/satellite/metabase/get.go +++ b/satellite/metabase/get.go @@ -72,7 +72,7 @@ func (db *DB) GetObjectExactVersion(ctx context.Context, opts GetObjectExactVers } object := Object{} - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` SELECT stream_id, created_at, expires_at, @@ -128,7 +128,7 @@ func (db *DB) GetObjectLatestVersion(ctx context.Context, opts GetObjectLatestVe } object := Object{} - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` SELECT stream_id, version, created_at, expires_at, @@ -183,7 +183,7 @@ func (db *DB) GetSegmentByLocation(ctx context.Context, opts GetSegmentByLocatio } var aliasPieces AliasPieces - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` SELECT stream_id, created_at, expires_at, repaired_at, @@ -251,7 +251,7 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio } var aliasPieces AliasPieces - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` SELECT created_at, expires_at, repaired_at, root_piece_id, encrypted_key_nonce, encrypted_key, @@ -304,7 +304,7 @@ func (db *DB) GetLatestObjectLastSegment(ctx context.Context, opts GetLatestObje } var aliasPieces AliasPieces - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` SELECT stream_id, position, created_at, repaired_at, @@ -369,7 +369,7 @@ func (db *DB) GetSegmentByOffset(ctx context.Context, opts GetSegmentByOffset) ( } var aliasPieces AliasPieces - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` SELECT stream_id, position, created_at, expires_at, repaired_at, @@ -436,7 +436,7 @@ func (db *DB) BucketEmpty(ctx context.Context, opts BucketEmpty) (empty bool, er } var value int - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` SELECT 1 FROM objects diff --git a/satellite/metabase/iterator.go b/satellite/metabase/iterator.go index 6cd6f18b3..474596b31 100644 --- a/satellite/metabase/iterator.go +++ b/satellite/metabase/iterator.go @@ -252,7 +252,7 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato } if it.prefixLimit == "" { - return it.db.db.Query(ctx, ` + return it.db.db.QueryContext(ctx, ` SELECT object_key, stream_id, version, status, created_at, expires_at, @@ -274,7 +274,7 @@ func doNextQueryAllVersionsWithoutStatus(ctx context.Context, it *objectsIterato // TODO this query should use SUBSTRING(object_key from $8) but there is a problem how it // works with CRDB. - return it.db.db.Query(ctx, ` + return it.db.db.QueryContext(ctx, ` SELECT object_key, stream_id, version, status, created_at, expires_at, @@ -307,7 +307,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator) } if it.prefixLimit == "" { - return it.db.db.Query(ctx, ` + return it.db.db.QueryContext(ctx, ` SELECT object_key, stream_id, version, status, created_at, expires_at, @@ -332,7 +332,7 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator) // TODO this query should use SUBSTRING(object_key from $8) but there is a problem how it // works with CRDB. - return it.db.db.Query(ctx, ` + return it.db.db.QueryContext(ctx, ` SELECT object_key, stream_id, version, status, created_at, expires_at, @@ -367,7 +367,7 @@ func nextBucket(b []byte) []byte { func doNextQueryStreamsByKey(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) { defer mon.Task()(&ctx)(&err) - return it.db.db.Query(ctx, ` + return it.db.db.QueryContext(ctx, ` SELECT object_key, stream_id, version, status, created_at, expires_at, diff --git a/satellite/metabase/list_segments.go b/satellite/metabase/list_segments.go index 7ce77adac..81c76f42f 100644 --- a/satellite/metabase/list_segments.go +++ b/satellite/metabase/list_segments.go @@ -40,7 +40,7 @@ func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListS ListLimit.Ensure(&opts.Limit) - err = withRows(db.db.Query(ctx, ` + err = withRows(db.db.QueryContext(ctx, ` SELECT position, created_at, @@ -154,7 +154,7 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions) var rows tagsql.Rows var rowsErr error if opts.Range == nil { - rows, rowsErr = db.db.Query(ctx, ` + rows, rowsErr = db.db.QueryContext(ctx, ` SELECT position, plain_size, plain_offset, created_at, encrypted_etag, encrypted_key_nonce, encrypted_key @@ -166,7 +166,7 @@ func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions) LIMIT $3 `, opts.StreamID, opts.Cursor, opts.Limit+1) } else { - rows, rowsErr = db.db.Query(ctx, ` + rows, rowsErr = db.db.QueryContext(ctx, ` SELECT position, plain_size, plain_offset, created_at, encrypted_etag, encrypted_key_nonce, encrypted_key diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index d030bc2a9..380c4fa37 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -166,7 +166,7 @@ func (it *loopIterator) Next(ctx context.Context, item *LoopObjectEntry) bool { func (it *loopIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) { defer mon.Task()(&ctx)(&err) - return it.db.db.Query(ctx, ` + return it.db.db.QueryContext(ctx, ` SELECT project_id, bucket_name, object_key, stream_id, version, @@ -250,7 +250,7 @@ func (db *DB) IterateLoopStreams(ctx context.Context, opts IterateLoopStreams, h bytesIDs[i] = id[:] } - rows, err := db.db.Query(ctx, ` + rows, err := db.db.QueryContext(ctx, ` SELECT stream_id, position, created_at, expires_at, repaired_at, @@ -463,7 +463,7 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) { defer mon.Task()(&ctx)(&err) - return it.db.db.Query(ctx, ` + return it.db.db.QueryContext(ctx, ` SELECT stream_id, position, created_at, expires_at, repaired_at, diff --git a/satellite/metabase/raw.go b/satellite/metabase/raw.go index 71c4c2bf2..088b73d32 100644 --- a/satellite/metabase/raw.go +++ b/satellite/metabase/raw.go @@ -106,7 +106,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) { func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) { objs := []RawObject{} - rows, err := db.db.Query(ctx, ` + rows, err := db.db.QueryContext(ctx, ` SELECT project_id, bucket_name, object_key, version, stream_id, created_at, expires_at, @@ -167,7 +167,7 @@ func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err erro func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error) { segs := []RawSegment{} - rows, err := db.db.Query(ctx, ` + rows, err := db.db.QueryContext(ctx, ` SELECT stream_id, position, created_at, repaired_at, expires_at, diff --git a/satellite/metabase/stats.go b/satellite/metabase/stats.go index e03e5f71a..a3f1a411d 100644 --- a/satellite/metabase/stats.go +++ b/satellite/metabase/stats.go @@ -29,11 +29,11 @@ func (db *DB) GetTableStats(ctx context.Context, opts GetTableStats) (result Tab var group errs2.Group group.Go(func() error { - row := db.db.QueryRow(ctx, `SELECT count(*) FROM objects `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval)) + row := db.db.QueryRowContext(ctx, `SELECT count(*) FROM objects `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval)) return Error.Wrap(row.Scan(&result.ObjectCount)) }) group.Go(func() error { - row := db.db.QueryRow(ctx, `SELECT count(*) FROM segments `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval)) + row := db.db.QueryRowContext(ctx, `SELECT count(*) FROM segments `+db.impl.AsOfSystemInterval(opts.AsOfSystemInterval)) return Error.Wrap(row.Scan(&result.SegmentCount)) }) err = errs.Combine(group.Wait()...) diff --git a/satellite/metabase/streamstat.go b/satellite/metabase/streamstat.go index 4a22e52f4..e4531d31f 100644 --- a/satellite/metabase/streamstat.go +++ b/satellite/metabase/streamstat.go @@ -28,7 +28,7 @@ func (db *DB) GetStreamPieceCountByNodeID(ctx context.Context, opts GetStreamPie countByAlias := map[NodeAlias]int64{} result = map[storj.NodeID]int64{} - err = withRows(db.db.Query(ctx, ` + err = withRows(db.db.QueryContext(ctx, ` SELECT remote_alias_pieces FROM segments WHERE stream_id = $1 AND remote_alias_pieces IS NOT null diff --git a/satellite/metabase/update.go b/satellite/metabase/update.go index f105bcc62..bb97bde8a 100644 --- a/satellite/metabase/update.go +++ b/satellite/metabase/update.go @@ -75,7 +75,7 @@ func (db *DB) UpdateSegmentPieces(ctx context.Context, opts UpdateSegmentPieces) } var resultPieces AliasPieces - err = db.db.QueryRow(ctx, ` + err = db.db.QueryRowContext(ctx, ` UPDATE segments SET remote_alias_pieces = CASE WHEN remote_alias_pieces = $3 THEN $4