satellitedb/repairqueue: runs a different implementation of the query within Select() for postgres vs cockroach
Change-Id: Ie34dbdb9d870d7d9f8f269702b6b3bad0c55b98e
This commit is contained in:
parent
7af42e3c10
commit
7c5f777a4f
1
go.mod
1
go.mod
@ -26,6 +26,7 @@ require (
|
||||
github.com/boltdb/bolt v1.3.1
|
||||
github.com/cheggaaa/pb v1.0.5-0.20160713104425-73ae1d68fe0b // indirect
|
||||
github.com/cheggaaa/pb/v3 v3.0.1
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c
|
||||
github.com/djherbis/atime v1.0.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/eclipse/paho.mqtt.golang v1.1.1 // indirect
|
||||
|
6
go.sum
6
go.sum
@ -50,7 +50,9 @@ github.com/cheggaaa/pb/v3 v3.0.1/go.mod h1:SqqeMF/pMOIu3xgGoxtPYhMNQP258xE4x/XRT
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cloudfoundry/gosigar v1.1.0 h1:V/dVCzhKOdIU3WRB5inQU20s4yIgL9Dxx/Mhi0SF8eM=
|
||||
github.com/cloudfoundry/gosigar v1.1.0/go.mod h1:3qLfc2GlfmwOx2+ZDaRGH3Y9fwQ0sQeaAleo2GV5pH0=
|
||||
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
|
||||
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c h1:2zRrJWIt/f9c9HhNHAgrRgq0San5gRRUJTBXLkchal0=
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk=
|
||||
github.com/containerd/containerd v1.2.7 h1:8lqLbl7u1j3MmiL9cJ/O275crSq7bfwUayvvatEupQk=
|
||||
github.com/containerd/containerd v1.2.7/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
|
||||
@ -211,7 +213,9 @@ github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf h1:WfD7V
|
||||
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf/go.mod h1:hyb9oH7vZsitZCiBt0ZvifOrB+qc8PS5IiilCIb87rg=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
|
||||
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
|
||||
github.com/jackc/pgx v3.2.0+incompatible h1:0Vihzu20St42/UDsvZGdNE6jak7oi/UOeMzwMPHkgFY=
|
||||
github.com/jackc/pgx v3.2.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
|
||||
@ -357,9 +361,11 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
|
||||
github.com/rs/cors v1.5.0 h1:dgSHE6+ia18arGOTIYQKKGWLvEbGvmbNE6NfxhoNHUY=
|
||||
github.com/rs/cors v1.5.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
|
||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
|
||||
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
|
||||
github.com/segmentio/go-prompt v1.2.1-0.20161017233205-f0d19b6901ad h1:EqOdoSJGI7CsBQczPcIgmpm3hJE7X8Hj3jrgI002whs=
|
||||
github.com/segmentio/go-prompt v1.2.1-0.20161017233205-f0d19b6901ad/go.mod h1:B3ehdD1xPoWDKgrQgUaGk+m8H1xb1J5TyYDfKpKNeEE=
|
||||
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
|
||||
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
|
||||
|
@ -99,7 +99,7 @@ func (db *DB) OverlayCache() overlay.DB {
|
||||
|
||||
// RepairQueue is a getter for RepairQueue repository
|
||||
func (db *DB) RepairQueue() queue.RepairQueue {
|
||||
return &repairQueue{db: db.db}
|
||||
return &repairQueue{db: db.db, dbType: db.implementation}
|
||||
}
|
||||
|
||||
// StoragenodeAccounting returns database for tracking storagenode usage
|
||||
|
@ -6,15 +6,20 @@ package satellitedb
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/cockroachdb/cockroach-go/crdb"
|
||||
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/private/dbutil"
|
||||
"storj.io/storj/private/dbutil/pgutil"
|
||||
dbx "storj.io/storj/satellite/satellitedb/dbx"
|
||||
"storj.io/storj/storage"
|
||||
)
|
||||
|
||||
type repairQueue struct {
|
||||
db *dbx.DB
|
||||
db *dbx.DB
|
||||
dbType dbutil.Implementation
|
||||
}
|
||||
|
||||
func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment) (err error) {
|
||||
@ -31,13 +36,26 @@ func (r *repairQueue) Insert(ctx context.Context, seg *pb.InjuredSegment) (err e
|
||||
|
||||
func (r *repairQueue) Select(ctx context.Context) (seg *pb.InjuredSegment, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
err = r.db.QueryRowContext(ctx, `
|
||||
UPDATE injuredsegments SET attempted = timezone('utc', now()) WHERE path = (
|
||||
SELECT path FROM injuredsegments
|
||||
WHERE attempted IS NULL OR attempted < timezone('utc', now()) - interval '1 hour'
|
||||
ORDER BY attempted NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
|
||||
) RETURNING data`).Scan(&seg)
|
||||
|
||||
switch r.dbType {
|
||||
case dbutil.Cockroach:
|
||||
err = crdb.ExecuteTx(ctx, r.db.DB, nil, func(tx *sql.Tx) error {
|
||||
return tx.QueryRowContext(ctx, `
|
||||
UPDATE injuredsegments SET attempted = now() AT TIME ZONE 'UTC' WHERE path = (
|
||||
SELECT path FROM injuredsegments
|
||||
WHERE attempted IS NULL OR attempted < now() AT TIME ZONE 'UTC' - interval '1 hour'
|
||||
ORDER BY attempted LIMIT 1
|
||||
) RETURNING data`).Scan(&seg)
|
||||
})
|
||||
case dbutil.Postgres:
|
||||
err = r.db.QueryRowContext(ctx, `
|
||||
UPDATE injuredsegments SET attempted = now() AT TIME ZONE 'UTC' WHERE path = (
|
||||
SELECT path FROM injuredsegments
|
||||
WHERE attempted IS NULL OR attempted < now() AT TIME ZONE 'UTC' - interval '1 hour'
|
||||
ORDER BY attempted NULLS FIRST FOR UPDATE SKIP LOCKED LIMIT 1
|
||||
) RETURNING data`).Scan(&seg)
|
||||
default:
|
||||
return seg, fmt.Errorf("invalid dbType: %v", r.dbType)
|
||||
}
|
||||
if err == sql.ErrNoRows {
|
||||
err = storage.ErrEmptyQueue.New("")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user