Implement mutex around satellitedb (#932)

Egon Elbre 2018-12-27 11:56:25 +02:00 committed by GitHub
parent a9572b7169
commit 4346cd060f
24 changed files with 517 additions and 168 deletions

View File

@ -276,7 +276,7 @@ func cmdQDiag(cmd *cobra.Command, args []string) (err error) {
}
}()
list, err := database.RepairQueue().Peekqueue(qdiagCfg.QListLimit)
list, err := database.RepairQueue().Peekqueue(context.Background(), qdiagCfg.QListLimit)
if err != nil {
return err
}

View File

@ -23,6 +23,7 @@ import (
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/pkg/utils"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/storage/teststore"
)
@ -39,7 +40,7 @@ type Node struct {
Discovery *discovery.Discovery
StatDB statdb.DB
Overlay *overlay.Cache
Database *satellitedb.DB
Database satellite.DB
Dependencies []io.Closer
}

View File

@ -132,6 +132,7 @@ func (t *tally) queryBW(ctx context.Context) error {
if err != nil {
return Error.Wrap(err)
}
var bwAgreements []bwagreement.Agreement
if isNil {
t.logger.Info("Tally found no existing bandwith tracking data")
@ -142,6 +143,7 @@ func (t *tally) queryBW(ctx context.Context) error {
if err != nil {
return Error.Wrap(err)
}
if len(bwAgreements) == 0 {
t.logger.Info("Tally found no new bandwidth allocations")
return nil

View File

@ -51,6 +51,7 @@ func TestQueryWithBw(t *testing.T) {
db, err := satellitedb.NewInMemory()
assert.NoError(t, err)
defer ctx.Check(db.Close)
assert.NoError(t, db.CreateTables())
bwDb := db.BandwidthAgreement()
@ -61,12 +62,15 @@ func TestQueryWithBw(t *testing.T) {
assert.NoError(t, err)
k, ok := fiC.Key.(*ecdsa.PrivateKey)
assert.True(t, ok)
//generate an agreement with the key
pba, err := test.GeneratePayerBandwidthAllocation(pb.PayerBandwidthAllocation_GET, k)
assert.NoError(t, err)
rba, err := test.GenerateRenterBandwidthAllocation(pba, k)
assert.NoError(t, err)
//save to db
err = bwDb.CreateAgreement(ctx, bwagreement.Agreement{Signature: rba.GetSignature(), Agreement: rba.GetData()})
assert.NoError(t, err)

View File

@ -11,16 +11,16 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"storj.io/storj/internal/identity"
testidentity "storj.io/storj/internal/identity"
"storj.io/storj/internal/testcontext"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/pb"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestBandwidthAgreements(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

View File

@ -118,7 +118,7 @@ func (c *checker) identifyInjuredSegments(ctx context.Context) (err error) {
numHealthy := len(nodeIDs) - len(missingPieces)
if (int32(numHealthy) >= pointer.Remote.Redundancy.MinReq) && (int32(numHealthy) < pointer.Remote.Redundancy.RepairThreshold) {
err = c.repairQueue.Enqueue(&pb.InjuredSegment{
err = c.repairQueue.Enqueue(ctx, &pb.InjuredSegment{
Path: string(item.Key),
LostPieces: missingPieces,
})

View File

@ -107,7 +107,7 @@ func TestIdentifyInjuredSegments(t *testing.T) {
//check if the expected segments were added to the queue
dequeued := []*pb.InjuredSegment{}
for i := 0; i < len(segs); i++ {
injSeg, err := repairQueue.Dequeue()
injSeg, err := repairQueue.Dequeue(ctx)
assert.NoError(t, err)
dequeued = append(dequeued, &injSeg)
}
@ -243,7 +243,7 @@ func BenchmarkIdentifyInjuredSegments(b *testing.B) {
//check if the expected segments were added to the queue
dequeued := []*pb.InjuredSegment{}
for i := 0; i < len(segs); i++ {
injSeg, err := repairQueue.Dequeue()
injSeg, err := repairQueue.Dequeue(ctx)
assert.NoError(b, err)
dequeued = append(dequeued, &injSeg)
}

View File

@ -12,12 +12,12 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestIrreparable(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

View File

@ -4,6 +4,8 @@
package queue
import (
"context"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
@ -13,9 +15,9 @@ import (
// RepairQueue is the interface for the data repair queue
type RepairQueue interface {
Enqueue(qi *pb.InjuredSegment) error
Dequeue() (pb.InjuredSegment, error)
Peekqueue(limit int) ([]pb.InjuredSegment, error)
Enqueue(ctx context.Context, qi *pb.InjuredSegment) error
Dequeue(ctx context.Context) (pb.InjuredSegment, error)
Peekqueue(ctx context.Context, limit int) ([]pb.InjuredSegment, error)
}
// Queue implements the RepairQueue interface
@ -30,7 +32,7 @@ func NewQueue(client storage.Queue) *Queue {
}
// Enqueue adds a repair segment to the queue
func (q *Queue) Enqueue(qi *pb.InjuredSegment) error {
func (q *Queue) Enqueue(ctx context.Context, qi *pb.InjuredSegment) error {
val, err := proto.Marshal(qi)
if err != nil {
return Error.New("error marshalling injured seg %s", err)
@ -44,7 +46,7 @@ func (q *Queue) Enqueue(qi *pb.InjuredSegment) error {
}
// Dequeue returns the next repair segment and removes it from the queue
func (q *Queue) Dequeue() (pb.InjuredSegment, error) {
func (q *Queue) Dequeue(ctx context.Context) (pb.InjuredSegment, error) {
val, err := q.db.Dequeue()
if err != nil {
if err == storage.ErrEmptyQueue {
@ -61,7 +63,7 @@ func (q *Queue) Dequeue() (pb.InjuredSegment, error) {
}
// Peekqueue returns up to 'limit' entries from the repair queue
func (q *Queue) Peekqueue(limit int) ([]pb.InjuredSegment, error) {
func (q *Queue) Peekqueue(ctx context.Context, limit int) ([]pb.InjuredSegment, error) {
if limit < 0 || limit > storage.LookupLimit {
limit = storage.LookupLimit
}
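
Editor's note: a minimal caller-side sketch of the context-aware RepairQueue interface above. The drain helper is an assumption for illustration; only the Dequeue signature and the storage.ErrEmptyQueue sentinel (checked the same way in the repairer's process loop later in this diff) come from the code itself.

```go
package example

import (
	"context"

	"storj.io/storj/pkg/datarepair/queue"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/storage"
)

// drain pulls injured segments until the queue reports it is empty.
func drain(ctx context.Context, q queue.RepairQueue) ([]pb.InjuredSegment, error) {
	var segments []pb.InjuredSegment
	for {
		seg, err := q.Dequeue(ctx)
		if err != nil {
			if err == storage.ErrEmptyQueue {
				return segments, nil // nothing left to repair
			}
			return segments, err
		}
		segments = append(segments, seg)
	}
}
```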

View File

@ -15,7 +15,7 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/pb"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage/redis"
"storj.io/storj/storage/redis/redisserver"
@ -23,7 +23,9 @@ import (
)
func TestEnqueueDequeue(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
q := db.RepairQueue()
@ -31,28 +33,32 @@ func TestEnqueueDequeue(t *testing.T) {
Path: "abc",
LostPieces: []int32{int32(1), int32(3)},
}
err := q.Enqueue(seg)
err := q.Enqueue(ctx, seg)
assert.NoError(t, err)
s, err := q.Dequeue()
s, err := q.Dequeue(ctx)
assert.NoError(t, err)
assert.True(t, proto.Equal(&s, seg))
})
}
func TestDequeueEmptyQueue(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
q := db.RepairQueue()
s, err := q.Dequeue()
s, err := q.Dequeue(ctx)
assert.Error(t, err)
assert.Equal(t, pb.InjuredSegment{}, s)
})
}
func TestSequential(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
q := db.RepairQueue()
@ -63,79 +69,91 @@ func TestSequential(t *testing.T) {
Path: strconv.Itoa(i),
LostPieces: []int32{int32(i)},
}
err := q.Enqueue(seg)
err := q.Enqueue(ctx, seg)
assert.NoError(t, err)
addSegs = append(addSegs, seg)
}
list, err := q.Peekqueue(100)
list, err := q.Peekqueue(ctx, 100)
assert.NoError(t, err)
for i := 0; i < N; i++ {
assert.True(t, proto.Equal(addSegs[i], &list[i]))
}
// TODO: fix out of order issue
for i := 0; i < N; i++ {
dqSeg, err := q.Dequeue()
dequeued, err := q.Dequeue(ctx)
assert.NoError(t, err)
assert.True(t, proto.Equal(addSegs[i], &dqSeg))
expected := dequeued.LostPieces[0]
assert.True(t, proto.Equal(addSegs[expected], &dequeued))
}
})
}
func TestParallel(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
t.Skip("logic is broken on database side")
queue := queue.NewQueue(testqueue.New())
const N = 100
errs := make(chan error, N*2)
entries := make(chan *pb.InjuredSegment, N*2)
var wg sync.WaitGroup
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()
wg.Add(N)
// Add to queue concurrently
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
err := queue.Enqueue(&pb.InjuredSegment{
Path: strconv.Itoa(i),
LostPieces: []int32{int32(i)},
})
if err != nil {
errs <- err
}
}(i)
q := db.RepairQueue()
const N = 100
errs := make(chan error, N*2)
entries := make(chan *pb.InjuredSegment, N*2)
var wg sync.WaitGroup
}
wg.Wait()
wg.Add(N)
// Remove from queue concurrently
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
segment, err := queue.Dequeue()
if err != nil {
errs <- err
}
entries <- &segment
}(i)
}
wg.Wait()
close(errs)
close(entries)
wg.Add(N)
// Add to queue concurrently
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
err := q.Enqueue(ctx, &pb.InjuredSegment{
Path: strconv.Itoa(i),
LostPieces: []int32{int32(i)},
})
if err != nil {
errs <- err
}
}(i)
for err := range errs {
t.Error(err)
}
}
wg.Wait()
var items []*pb.InjuredSegment
for segment := range entries {
items = append(items, segment)
}
wg.Add(N)
// Remove from queue concurrently
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
segment, err := q.Dequeue(ctx)
if err != nil {
errs <- err
}
entries <- &segment
}(i)
}
wg.Wait()
close(errs)
close(entries)
sort.Slice(items, func(i, k int) bool { return items[i].LostPieces[0] < items[k].LostPieces[0] })
// check if the enqueued and dequeued elements match
for i := 0; i < N; i++ {
assert.Equal(t, items[i].LostPieces[0], int32(i))
}
for err := range errs {
t.Error(err)
}
var items []*pb.InjuredSegment
for segment := range entries {
items = append(items, segment)
}
sort.Slice(items, func(i, k int) bool {
return items[i].LostPieces[0] < items[k].LostPieces[0]
})
// check if the enqueued and dequeued elements match
for i := 0; i < N; i++ {
assert.Equal(t, items[i].LostPieces[0], int32(i))
}
})
}
func BenchmarkRedisSequential(b *testing.B) {
@ -154,6 +172,8 @@ func BenchmarkTeststoreSequential(b *testing.B) {
}
func benchmarkSequential(b *testing.B, q queue.RepairQueue) {
ctx := testcontext.New(b)
defer ctx.Cleanup()
b.ResetTimer()
for n := 0; n < b.N; n++ {
@ -164,12 +184,12 @@ func benchmarkSequential(b *testing.B, q queue.RepairQueue) {
Path: strconv.Itoa(i),
LostPieces: []int32{int32(i)},
}
err := q.Enqueue(seg)
err := q.Enqueue(ctx, seg)
assert.NoError(b, err)
addSegs = append(addSegs, seg)
}
for i := 0; i < N; i++ {
dqSeg, err := q.Dequeue()
dqSeg, err := q.Dequeue(ctx)
assert.NoError(b, err)
assert.True(b, proto.Equal(addSegs[i], &dqSeg))
}
@ -192,6 +212,8 @@ func BenchmarkTeststoreParallel(b *testing.B) {
}
func benchmarkParallel(b *testing.B, q queue.RepairQueue) {
ctx := testcontext.New(b)
defer ctx.Cleanup()
b.ResetTimer()
for n := 0; n < b.N; n++ {
@ -205,7 +227,7 @@ func benchmarkParallel(b *testing.B, q queue.RepairQueue) {
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
err := q.Enqueue(&pb.InjuredSegment{
err := q.Enqueue(ctx, &pb.InjuredSegment{
Path: strconv.Itoa(i),
LostPieces: []int32{int32(i)},
})
@ -221,7 +243,7 @@ func benchmarkParallel(b *testing.B, q queue.RepairQueue) {
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
segment, err := q.Dequeue()
segment, err := q.Dequeue(ctx)
if err != nil {
errs <- err
}

View File

@ -60,7 +60,7 @@ func (service *repairService) Run(ctx context.Context) (err error) {
// process picks an item from repair queue and spawns a repair worker
func (service *repairService) process(ctx context.Context) error {
seg, err := service.queue.Dequeue()
seg, err := service.queue.Dequeue(ctx)
if err != nil {
if err == storage.ErrEmptyQueue {
return nil

View File

@ -16,7 +16,7 @@ import (
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storage"
"storj.io/storj/storage/teststore"
@ -132,7 +132,7 @@ func TestCache_Masterdb(t *testing.T) {
defer ctx.Check(planet.Shutdown)
planet.Start(ctx)
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
testCache(ctx, t, db.OverlayCache(), db.StatDB())
})
}

View File

@ -12,7 +12,7 @@ import (
"storj.io/storj/internal/testcontext"
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
@ -22,7 +22,7 @@ func getRatio(success, total int64) (ratio float64) {
}
func TestStatdb(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

satellite/db.go Normal file
View File

@ -0,0 +1,27 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package satellite
import (
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/statdb"
"storj.io/storj/storage"
)
// DB is the master database for the satellite
type DB interface {
BandwidthAgreement() bwagreement.DB
// PointerDB() pointerdb.DB
StatDB() statdb.DB
OverlayCache() storage.KeyValueStore
RepairQueue() queue.RepairQueue
Accounting() accounting.DB
Irreparable() irreparable.DB
CreateTables() error
Close() error
}
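
Editor's sketch of how a component can now depend on this interface rather than on the concrete *satellitedb.DB. The report type and its method are hypothetical; only the satellite.DB methods it calls come from the interface above.

```go
package example

import (
	"context"

	"storj.io/storj/satellite"
)

// report is a hypothetical consumer that takes the master database as an
// interface, so it works the same against the plain implementation and
// the mutex-wrapped one introduced by this commit.
type report struct {
	db satellite.DB
}

func (r *report) agreementCount(ctx context.Context) (int, error) {
	agreements, err := r.db.BandwidthAgreement().GetAgreements(ctx)
	if err != nil {
		return 0, err
	}
	return len(agreements), nil
}
```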

View File

@ -13,6 +13,7 @@ import (
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/utils"
"storj.io/storj/satellite"
dbx "storj.io/storj/satellite/satellitedb/dbx"
"storj.io/storj/storage"
)
@ -28,21 +29,27 @@ type DB struct {
}
// New creates instance of database (supports: postgres, sqlite3)
func New(databaseURL string) (*DB, error) {
func New(databaseURL string) (satellite.DB, error) {
driver, source, err := utils.SplitDBURL(databaseURL)
if err != nil {
return nil, err
}
db, err := dbx.Open(driver, source)
if err != nil {
return nil, Error.New("failed opening database %q, %q: %v",
driver, source, err)
}
return &DB{db: db}, nil
core := &DB{db: db}
if driver == "sqlite3" {
return NewMutex(core), nil
}
return core, nil
}
// NewInMemory creates instance of Sqlite in memory satellite database
func NewInMemory() (*DB, error) {
func NewInMemory() (satellite.DB, error) {
return New("sqlite3://file::memory:?mode=memory&cache=shared")
}
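
A small usage sketch of the constructors above, as the editor reads them: sqlite3 URLs (including the in-memory one) come back wrapped in the Mutex type, postgres URLs come back as the plain implementation, and callers only ever see satellite.DB. The error handling here is illustrative.

```go
package example

import (
	"log"

	"storj.io/storj/satellite"
	"storj.io/storj/satellite/satellitedb"
)

func openForTest() satellite.DB {
	// NewInMemory uses the sqlite3 driver, so the returned satellite.DB
	// is the mutex-wrapped implementation.
	db, err := satellitedb.NewInMemory()
	if err != nil {
		log.Fatal(err)
	}
	if err := db.CreateTables(); err != nil {
		log.Fatal(err)
	}
	return db
}
```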
@ -63,12 +70,12 @@ func (db *DB) StatDB() statdb.DB {
// OverlayCache is a getter for overlay cache repository
func (db *DB) OverlayCache() storage.KeyValueStore {
return newOverlaycache(db.db)
return &overlaycache{db: db.db}
}
// RepairQueue is a getter for RepairQueue repository
func (db *DB) RepairQueue() queue.RepairQueue {
return newRepairQueue(db.db)
return &repairQueue{db: db.db}
}
// Accounting returns database for tracking bandwidth agreements over time

View File

@ -6,11 +6,11 @@ package satellitedb_test
import (
"testing"
"storj.io/storj/satellite/satellitedb"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
)
func TestDatabase(t *testing.T) {
satellitedbtest.Run(t, func(t *testing.T, db *satellitedb.DB) {
satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
})
}
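
For orientation, a rough example of a test written against the new satellitedbtest.Run signature; the segment values and assertions are the editor's assumptions, not part of this commit.

```go
package satellitedb_test

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"storj.io/storj/internal/testcontext"
	"storj.io/storj/pkg/pb"
	"storj.io/storj/satellite"
	"storj.io/storj/satellite/satellitedb/satellitedbtest"
)

func TestRepairQueueRoundTrip(t *testing.T) {
	satellitedbtest.Run(t, func(t *testing.T, db satellite.DB) {
		ctx := testcontext.New(t)
		defer ctx.Cleanup()

		seg := &pb.InjuredSegment{Path: "example", LostPieces: []int32{2}}
		assert.NoError(t, db.RepairQueue().Enqueue(ctx, seg))

		dequeued, err := db.RepairQueue().Dequeue(ctx)
		assert.NoError(t, err)
		assert.Equal(t, seg.Path, dequeued.Path)
	})
}
```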

View File

@ -171,8 +171,9 @@ delete overlay_cache_node ( where overlay_cache_node.key = ? )
//--- repairqueue ---//
model injuredsegment (
key info
key id
field id serial64
field info blob
)
@ -183,4 +184,4 @@ read first (
read limitoffset (
select injuredsegment
)
delete injuredsegment ( where injuredsegment.info = ? )
delete injuredsegment ( where injuredsegment.id = ? )

View File

@ -305,8 +305,9 @@ CREATE TABLE bwagreements (
PRIMARY KEY ( signature )
);
CREATE TABLE injuredsegments (
id bigserial NOT NULL,
info bytea NOT NULL,
PRIMARY KEY ( info )
PRIMARY KEY ( id )
);
CREATE TABLE irreparabledbs (
segmentpath bytea NOT NULL,
@ -429,8 +430,9 @@ CREATE TABLE bwagreements (
PRIMARY KEY ( signature )
);
CREATE TABLE injuredsegments (
id INTEGER NOT NULL,
info BLOB NOT NULL,
PRIMARY KEY ( info )
PRIMARY KEY ( id )
);
CREATE TABLE irreparabledbs (
segmentpath BLOB NOT NULL,
@ -934,6 +936,7 @@ func (f Bwagreement_CreatedAt_Field) value() interface{} {
func (Bwagreement_CreatedAt_Field) _Column() string { return "created_at" }
type Injuredsegment struct {
Id int64
Info []byte
}
@ -942,6 +945,25 @@ func (Injuredsegment) _Table() string { return "injuredsegments" }
type Injuredsegment_Update_Fields struct {
}
type Injuredsegment_Id_Field struct {
_set bool
_null bool
_value int64
}
func Injuredsegment_Id(v int64) Injuredsegment_Id_Field {
return Injuredsegment_Id_Field{_set: true, _value: v}
}
func (f Injuredsegment_Id_Field) value() interface{} {
if !f._set || f._null {
return nil
}
return f._value
}
func (Injuredsegment_Id_Field) _Column() string { return "id" }
type Injuredsegment_Info_Field struct {
_set bool
_null bool
@ -1679,13 +1701,13 @@ func (obj *postgresImpl) Create_Injuredsegment(ctx context.Context,
injuredsegment *Injuredsegment, err error) {
__info_val := injuredsegment_info.value()
var __embed_stmt = __sqlbundle_Literal("INSERT INTO injuredsegments ( info ) VALUES ( ? ) RETURNING injuredsegments.info")
var __embed_stmt = __sqlbundle_Literal("INSERT INTO injuredsegments ( info ) VALUES ( ? ) RETURNING injuredsegments.id, injuredsegments.info")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __info_val)
injuredsegment = &Injuredsegment{}
err = obj.driver.QueryRow(__stmt, __info_val).Scan(&injuredsegment.Info)
err = obj.driver.QueryRow(__stmt, __info_val).Scan(&injuredsegment.Id, &injuredsegment.Info)
if err != nil {
return nil, obj.makeErr(err)
}
@ -2083,7 +2105,7 @@ func (obj *postgresImpl) Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx cont
func (obj *postgresImpl) First_Injuredsegment(ctx context.Context) (
injuredsegment *Injuredsegment, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.info FROM injuredsegments LIMIT 1 OFFSET 0")
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.id, injuredsegments.info FROM injuredsegments LIMIT 1 OFFSET 0")
var __values []interface{}
__values = append(__values)
@ -2105,7 +2127,7 @@ func (obj *postgresImpl) First_Injuredsegment(ctx context.Context) (
}
injuredsegment = &Injuredsegment{}
err = __rows.Scan(&injuredsegment.Info)
err = __rows.Scan(&injuredsegment.Id, &injuredsegment.Info)
if err != nil {
return nil, obj.makeErr(err)
}
@ -2118,7 +2140,7 @@ func (obj *postgresImpl) Limited_Injuredsegment(ctx context.Context,
limit int, offset int64) (
rows []*Injuredsegment, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.info FROM injuredsegments LIMIT ? OFFSET ?")
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.id, injuredsegments.info FROM injuredsegments LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values)
@ -2136,7 +2158,7 @@ func (obj *postgresImpl) Limited_Injuredsegment(ctx context.Context,
for __rows.Next() {
injuredsegment := &Injuredsegment{}
err = __rows.Scan(&injuredsegment.Info)
err = __rows.Scan(&injuredsegment.Id, &injuredsegment.Info)
if err != nil {
return nil, obj.makeErr(err)
}
@ -2578,14 +2600,14 @@ func (obj *postgresImpl) Delete_OverlayCacheNode_By_Key(ctx context.Context,
}
func (obj *postgresImpl) Delete_Injuredsegment_By_Info(ctx context.Context,
injuredsegment_info Injuredsegment_Info_Field) (
func (obj *postgresImpl) Delete_Injuredsegment_By_Id(ctx context.Context,
injuredsegment_id Injuredsegment_Id_Field) (
deleted bool, err error) {
var __embed_stmt = __sqlbundle_Literal("DELETE FROM injuredsegments WHERE injuredsegments.info = ?")
var __embed_stmt = __sqlbundle_Literal("DELETE FROM injuredsegments WHERE injuredsegments.id = ?")
var __values []interface{}
__values = append(__values, injuredsegment_info.value())
__values = append(__values, injuredsegment_id.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
@ -3321,7 +3343,7 @@ func (obj *sqlite3Impl) Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx conte
func (obj *sqlite3Impl) First_Injuredsegment(ctx context.Context) (
injuredsegment *Injuredsegment, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.info FROM injuredsegments LIMIT 1 OFFSET 0")
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.id, injuredsegments.info FROM injuredsegments LIMIT 1 OFFSET 0")
var __values []interface{}
__values = append(__values)
@ -3343,7 +3365,7 @@ func (obj *sqlite3Impl) First_Injuredsegment(ctx context.Context) (
}
injuredsegment = &Injuredsegment{}
err = __rows.Scan(&injuredsegment.Info)
err = __rows.Scan(&injuredsegment.Id, &injuredsegment.Info)
if err != nil {
return nil, obj.makeErr(err)
}
@ -3356,7 +3378,7 @@ func (obj *sqlite3Impl) Limited_Injuredsegment(ctx context.Context,
limit int, offset int64) (
rows []*Injuredsegment, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.info FROM injuredsegments LIMIT ? OFFSET ?")
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.id, injuredsegments.info FROM injuredsegments LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values)
@ -3374,7 +3396,7 @@ func (obj *sqlite3Impl) Limited_Injuredsegment(ctx context.Context,
for __rows.Next() {
injuredsegment := &Injuredsegment{}
err = __rows.Scan(&injuredsegment.Info)
err = __rows.Scan(&injuredsegment.Id, &injuredsegment.Info)
if err != nil {
return nil, obj.makeErr(err)
}
@ -3876,14 +3898,14 @@ func (obj *sqlite3Impl) Delete_OverlayCacheNode_By_Key(ctx context.Context,
}
func (obj *sqlite3Impl) Delete_Injuredsegment_By_Info(ctx context.Context,
injuredsegment_info Injuredsegment_Info_Field) (
func (obj *sqlite3Impl) Delete_Injuredsegment_By_Id(ctx context.Context,
injuredsegment_id Injuredsegment_Id_Field) (
deleted bool, err error) {
var __embed_stmt = __sqlbundle_Literal("DELETE FROM injuredsegments WHERE injuredsegments.info = ?")
var __embed_stmt = __sqlbundle_Literal("DELETE FROM injuredsegments WHERE injuredsegments.id = ?")
var __values []interface{}
__values = append(__values, injuredsegment_info.value())
__values = append(__values, injuredsegment_id.value())
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
@ -4032,13 +4054,13 @@ func (obj *sqlite3Impl) getLastInjuredsegment(ctx context.Context,
pk int64) (
injuredsegment *Injuredsegment, err error) {
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.info FROM injuredsegments WHERE _rowid_ = ?")
var __embed_stmt = __sqlbundle_Literal("SELECT injuredsegments.id, injuredsegments.info FROM injuredsegments WHERE _rowid_ = ?")
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, pk)
injuredsegment = &Injuredsegment{}
err = obj.driver.QueryRow(__stmt, pk).Scan(&injuredsegment.Info)
err = obj.driver.QueryRow(__stmt, pk).Scan(&injuredsegment.Id, &injuredsegment.Info)
if err != nil {
return nil, obj.makeErr(err)
}
@ -4367,14 +4389,14 @@ func (rx *Rx) Delete_Bwagreement_By_Signature(ctx context.Context,
return tx.Delete_Bwagreement_By_Signature(ctx, bwagreement_signature)
}
func (rx *Rx) Delete_Injuredsegment_By_Info(ctx context.Context,
injuredsegment_info Injuredsegment_Info_Field) (
func (rx *Rx) Delete_Injuredsegment_By_Id(ctx context.Context,
injuredsegment_id Injuredsegment_Id_Field) (
deleted bool, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Delete_Injuredsegment_By_Info(ctx, injuredsegment_info)
return tx.Delete_Injuredsegment_By_Id(ctx, injuredsegment_id)
}
func (rx *Rx) Delete_Irreparabledb_By_Segmentpath(ctx context.Context,
@ -4672,8 +4694,8 @@ type Methods interface {
bwagreement_signature Bwagreement_Signature_Field) (
deleted bool, err error)
Delete_Injuredsegment_By_Info(ctx context.Context,
injuredsegment_info Injuredsegment_Info_Field) (
Delete_Injuredsegment_By_Id(ctx context.Context,
injuredsegment_id Injuredsegment_Id_Field) (
deleted bool, err error)
Delete_Irreparabledb_By_Segmentpath(ctx context.Context,

View File

@ -32,8 +32,9 @@ CREATE TABLE bwagreements (
PRIMARY KEY ( signature )
);
CREATE TABLE injuredsegments (
id bigserial NOT NULL,
info bytea NOT NULL,
PRIMARY KEY ( info )
PRIMARY KEY ( id )
);
CREATE TABLE irreparabledbs (
segmentpath bytea NOT NULL,

View File

@ -32,8 +32,9 @@ CREATE TABLE bwagreements (
PRIMARY KEY ( signature )
);
CREATE TABLE injuredsegments (
id INTEGER NOT NULL,
info BLOB NOT NULL,
PRIMARY KEY ( info )
PRIMARY KEY ( id )
);
CREATE TABLE irreparabledbs (
segmentpath BLOB NOT NULL,

View File

@ -0,0 +1,267 @@
// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
"context"
"errors"
"sync"
"time"
"storj.io/storj/pkg/accounting"
"storj.io/storj/pkg/bwagreement"
"storj.io/storj/pkg/datarepair/irreparable"
"storj.io/storj/pkg/datarepair/queue"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/statdb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
)
// Mutex wraps DB in a mutex for non-concurrent databases
type Mutex struct {
mu sync.Mutex
db *DB
}
// NewMutex returns a mutex around DB
func NewMutex(db *DB) *Mutex {
return &Mutex{db: db}
}
func (db *Mutex) locked() func() {
db.mu.Lock()
return db.mu.Unlock
}
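
The `defer db.mu.locked()()` idiom used by every wrapper method below is compact but easy to misread: locked() acquires the lock immediately and returns the unlock function, and defer schedules that returned function for when the method returns. A standalone sketch of the same idiom, with hypothetical names:

```go
package example

import "sync"

type guarded struct {
	mu    sync.Mutex
	value int
}

// locked takes the mutex right away and hands back the unlock, so
// `defer g.locked()()` locks now and unlocks when the caller returns.
func (g *guarded) locked() func() {
	g.mu.Lock()
	return g.mu.Unlock
}

func (g *guarded) Increment() int {
	defer g.locked()()
	g.value++
	return g.value
}
```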
// BandwidthAgreement is a getter for bandwidth agreement repository
func (db *Mutex) BandwidthAgreement() bwagreement.DB {
return &muBandwidthAgreement{mu: db, bandwidth: db.db.BandwidthAgreement()}
}
// // PointerDB is a getter for PointerDB repository
// func (db *Mutex) PointerDB() pointerdb.DB {
// return &pointerDB{db: db.db}
// }
// StatDB is a getter for StatDB repository
func (db *Mutex) StatDB() statdb.DB {
return &muStatDB{mu: db, statdb: db.db.StatDB()}
}
// OverlayCache is a getter for overlay cache repository
func (db *Mutex) OverlayCache() storage.KeyValueStore {
return &muOverlayCache{mu: db, overlay: db.db.OverlayCache()}
}
// RepairQueue is a getter for RepairQueue repository
func (db *Mutex) RepairQueue() queue.RepairQueue {
return &muRepairQueue{mu: db, repair: db.db.RepairQueue()}
}
// Accounting returns database for tracking bandwidth agreements over time
func (db *Mutex) Accounting() accounting.DB {
return &muAccounting{mu: db, accounting: db.db.Accounting()}
}
// Irreparable returns database for storing segments that failed repair
func (db *Mutex) Irreparable() irreparable.DB {
return &muIrreparable{mu: db, irreparable: db.db.Irreparable()}
}
// CreateTables is a method for creating all tables for database
func (db *Mutex) CreateTables() error {
return db.db.CreateTables()
}
// Close is used to close db connection
func (db *Mutex) Close() error {
return db.db.Close()
}
type muBandwidthAgreement struct {
mu *Mutex
bandwidth bwagreement.DB
}
func (db *muBandwidthAgreement) CreateAgreement(ctx context.Context, agreement bwagreement.Agreement) error {
defer db.mu.locked()()
return db.bandwidth.CreateAgreement(ctx, agreement)
}
func (db *muBandwidthAgreement) GetAgreements(ctx context.Context) ([]bwagreement.Agreement, error) {
defer db.mu.locked()()
return db.bandwidth.GetAgreements(ctx)
}
func (db *muBandwidthAgreement) GetAgreementsSince(ctx context.Context, since time.Time) ([]bwagreement.Agreement, error) {
defer db.mu.locked()()
return db.bandwidth.GetAgreementsSince(ctx, since)
}
// muStatDB implements mutex around statdb.DB
type muStatDB struct {
mu *Mutex
statdb statdb.DB
}
// Create a db entry for the provided storagenode
func (db *muStatDB) Create(ctx context.Context, nodeID storj.NodeID, startingStats *statdb.NodeStats) (stats *statdb.NodeStats, err error) {
defer db.mu.locked()()
return db.statdb.Create(ctx, nodeID, startingStats)
}
// Get a storagenode's stats from the db
func (db *muStatDB) Get(ctx context.Context, nodeID storj.NodeID) (stats *statdb.NodeStats, err error) {
defer db.mu.locked()()
return db.statdb.Get(ctx, nodeID)
}
// FindInvalidNodes finds a subset of storagenodes that have stats below provided reputation requirements
func (db *muStatDB) FindInvalidNodes(ctx context.Context, nodeIDs storj.NodeIDList, maxStats *statdb.NodeStats) (invalidIDs storj.NodeIDList, err error) {
defer db.mu.locked()()
return db.statdb.FindInvalidNodes(ctx, nodeIDs, maxStats)
}
// Update all parts of single storagenode's stats in the db
func (db *muStatDB) Update(ctx context.Context, updateReq *statdb.UpdateRequest) (stats *statdb.NodeStats, err error) {
defer db.mu.locked()()
return db.statdb.Update(ctx, updateReq)
}
// UpdateUptime updates a single storagenode's uptime stats in the db
func (db *muStatDB) UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool) (stats *statdb.NodeStats, err error) {
defer db.mu.locked()()
return db.statdb.UpdateUptime(ctx, nodeID, isUp)
}
// UpdateAuditSuccess updates a single storagenode's audit stats in the db
func (db *muStatDB) UpdateAuditSuccess(ctx context.Context, nodeID storj.NodeID, auditSuccess bool) (stats *statdb.NodeStats, err error) {
defer db.mu.locked()()
return db.statdb.UpdateAuditSuccess(ctx, nodeID, auditSuccess)
}
// UpdateBatch for updating multiple farmers' stats in the db
func (db *muStatDB) UpdateBatch(ctx context.Context, updateReqList []*statdb.UpdateRequest) (statsList []*statdb.NodeStats, failedUpdateReqs []*statdb.UpdateRequest, err error) {
defer db.mu.locked()()
return db.statdb.UpdateBatch(ctx, updateReqList)
}
// CreateEntryIfNotExists creates a statdb node entry and saves to statdb if it didn't already exist
func (db *muStatDB) CreateEntryIfNotExists(ctx context.Context, nodeID storj.NodeID) (stats *statdb.NodeStats, err error) {
defer db.mu.locked()()
return db.statdb.CreateEntryIfNotExists(ctx, nodeID)
}
// muOverlayCache implements a mutex around overlay cache
type muOverlayCache struct {
mu *Mutex
overlay storage.KeyValueStore
}
// Put adds a value to store
func (db *muOverlayCache) Put(key storage.Key, value storage.Value) error {
defer db.mu.locked()()
return db.overlay.Put(key, value)
}
// Get gets a value to store
func (db *muOverlayCache) Get(key storage.Key) (storage.Value, error) {
defer db.mu.locked()()
return db.overlay.Get(key)
}
// GetAll gets all values from the store
func (db *muOverlayCache) GetAll(keys storage.Keys) (storage.Values, error) {
defer db.mu.locked()()
return db.overlay.GetAll(keys)
}
// Delete deletes key and the value
func (db *muOverlayCache) Delete(key storage.Key) error {
defer db.mu.locked()()
return db.overlay.Delete(key)
}
// List lists all keys starting from start and up to limit items
func (db *muOverlayCache) List(start storage.Key, limit int) (keys storage.Keys, err error) {
defer db.mu.locked()()
return db.overlay.List(start, limit)
}
// ReverseList lists all keys in reverse order
func (db *muOverlayCache) ReverseList(start storage.Key, limit int) (storage.Keys, error) {
defer db.mu.locked()()
return db.overlay.ReverseList(start, limit)
}
// Iterate iterates over items based on opts
func (db *muOverlayCache) Iterate(opts storage.IterateOptions, fn func(storage.Iterator) error) error {
return errors.New("not implemented")
}
// Close closes the store
func (db *muOverlayCache) Close() error {
defer db.mu.locked()()
return db.overlay.Close()
}
// muRepairQueue implements mutex around repair queue
type muRepairQueue struct {
mu *Mutex
repair queue.RepairQueue
}
func (db *muRepairQueue) Enqueue(ctx context.Context, seg *pb.InjuredSegment) error {
defer db.mu.locked()()
return db.repair.Enqueue(ctx, seg)
}
func (db *muRepairQueue) Dequeue(ctx context.Context) (pb.InjuredSegment, error) {
defer db.mu.locked()()
return db.repair.Dequeue(ctx)
}
func (db *muRepairQueue) Peekqueue(ctx context.Context, limit int) ([]pb.InjuredSegment, error) {
defer db.mu.locked()()
return db.repair.Peekqueue(ctx, limit)
}
type muAccounting struct {
mu *Mutex
accounting accounting.DB
}
func (db *muAccounting) LastRawTime(ctx context.Context, timestampType string) (time.Time, bool, error) {
defer db.mu.locked()()
return db.accounting.LastRawTime(ctx, timestampType)
}
func (db *muAccounting) SaveBWRaw(ctx context.Context, latestBwa time.Time, bwTotals map[string]int64) (err error) {
defer db.mu.locked()()
return db.accounting.SaveBWRaw(ctx, latestBwa, bwTotals)
}
func (db *muAccounting) SaveAtRestRaw(ctx context.Context, latestTally time.Time, nodeData map[storj.NodeID]int64) error {
defer db.mu.locked()()
return db.accounting.SaveAtRestRaw(ctx, latestTally, nodeData)
}
type muIrreparable struct {
mu *Mutex
irreparable irreparable.DB
}
func (db *muIrreparable) IncrementRepairAttempts(ctx context.Context, segmentInfo *irreparable.RemoteSegmentInfo) (err error) {
defer db.mu.locked()()
return db.irreparable.IncrementRepairAttempts(ctx, segmentInfo)
}
func (db *muIrreparable) Get(ctx context.Context, segmentPath []byte) (resp *irreparable.RemoteSegmentInfo, err error) {
defer db.mu.locked()()
return db.irreparable.Get(ctx, segmentPath)
}
func (db *muIrreparable) Delete(ctx context.Context, segmentPath []byte) (err error) {
defer db.mu.locked()()
return db.irreparable.Delete(ctx, segmentPath)
}
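
To close out the wrapper: a hedged sketch of the kind of concurrent access it is meant to make safe when the database is SQLite-backed. The helper and its use of StatDB are illustrative; only the satellite.DB and statdb method signatures come from this diff.

```go
package example

import (
	"context"
	"sync"

	"storj.io/storj/pkg/storj"
	"storj.io/storj/satellite"
)

// hammerStatDB issues parallel reads; when db is the sqlite-backed Mutex
// wrapper, each call serializes on the single mutex, so the database only
// ever sees one statement at a time.
func hammerStatDB(ctx context.Context, db satellite.DB, ids []storj.NodeID) {
	var wg sync.WaitGroup
	for _, id := range ids {
		wg.Add(1)
		go func(id storj.NodeID) {
			defer wg.Done()
			_, _ = db.StatDB().Get(ctx, id)
		}(id)
	}
	wg.Wait()
}
```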

View File

@ -14,31 +14,24 @@ import (
)
type overlaycache struct {
db *dbx.DB
ctx context.Context
}
func newOverlaycache(db *dbx.DB) *overlaycache {
return &overlaycache{
db: db,
ctx: context.Background(),
}
db *dbx.DB
}
func (o *overlaycache) Put(key storage.Key, value storage.Value) error {
if key.IsZero() {
return storage.ErrEmptyKey.New("")
}
ctx := context.Background() // TODO: fix
tx, err := o.db.Open(o.ctx)
tx, err := o.db.Open(ctx)
if err != nil {
return Error.Wrap(err)
}
_, err = tx.Get_OverlayCacheNode_By_Key(o.ctx, dbx.OverlayCacheNode_Key(key))
_, err = tx.Get_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key))
if err != nil {
_, err = tx.Create_OverlayCacheNode(
o.ctx,
ctx,
dbx.OverlayCacheNode_Key(key),
dbx.OverlayCacheNode_Value(value),
)
@ -49,7 +42,7 @@ func (o *overlaycache) Put(key storage.Key, value storage.Value) error {
updateFields := dbx.OverlayCacheNode_Update_Fields{}
updateFields.Value = dbx.OverlayCacheNode_Value(value)
_, err := tx.Update_OverlayCacheNode_By_Key(
o.ctx,
ctx,
dbx.OverlayCacheNode_Key(key),
updateFields,
)
@ -65,7 +58,9 @@ func (o *overlaycache) Get(key storage.Key) (storage.Value, error) {
return nil, storage.ErrEmptyKey.New("")
}
node, err := o.db.Get_OverlayCacheNode_By_Key(o.ctx, dbx.OverlayCacheNode_Key(key))
ctx := context.Background() // TODO: fix
node, err := o.db.Get_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key))
if err == sql.ErrNoRows {
return nil, storage.ErrKeyNotFound.New(key.String())
}
@ -87,20 +82,22 @@ func (o *overlaycache) GetAll(keys storage.Keys) (storage.Values, error) {
}
func (o *overlaycache) Delete(key storage.Key) error {
_, err := o.db.Delete_OverlayCacheNode_By_Key(o.ctx, dbx.OverlayCacheNode_Key(key))
ctx := context.Background() // TODO: fix
_, err := o.db.Delete_OverlayCacheNode_By_Key(ctx, dbx.OverlayCacheNode_Key(key))
return err
}
func (o *overlaycache) List(start storage.Key, limit int) (keys storage.Keys, err error) {
ctx := context.Background() // TODO: fix
if limit <= 0 || limit > storage.LookupLimit {
limit = storage.LookupLimit
}
var rows []*dbx.OverlayCacheNode
if start == nil {
rows, err = o.db.Limited_OverlayCacheNode(o.ctx, limit, 0)
rows, err = o.db.Limited_OverlayCacheNode(ctx, limit, 0)
} else {
rows, err = o.db.Limited_OverlayCacheNode_By_Key_GreaterOrEqual(o.ctx, dbx.OverlayCacheNode_Key(start), limit, 0)
rows, err = o.db.Limited_OverlayCacheNode_By_Key_GreaterOrEqual(ctx, dbx.OverlayCacheNode_Key(start), limit, 0)
}
if err != nil {
return []storage.Key{}, err
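
The removed ctx field dropped a stored context from a long-lived struct; the `context.Background() // TODO: fix` placeholders above stand in until the storage.KeyValueStore interface itself accepts a context. A hedged sketch of what that eventual shape might look like; this signature is the editor's assumption, not part of the diff:

```go
package example

import (
	"context"

	"storj.io/storj/storage"
)

// contextKeyValueGetter is a hypothetical context-aware variant of the
// Get call; today's storage.KeyValueStore does not take a context, which
// is what the "TODO: fix" placeholders point at.
type contextKeyValueGetter interface {
	Get(ctx context.Context, key storage.Key) (storage.Value, error)
}
```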

View File

@ -7,6 +7,7 @@ import (
"context"
"github.com/golang/protobuf/proto"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/utils"
dbx "storj.io/storj/satellite/satellitedb/dbx"
@ -14,68 +15,61 @@ import (
)
type repairQueue struct {
db *dbx.DB
ctx context.Context
db *dbx.DB
}
func newRepairQueue(db *dbx.DB) *repairQueue {
return &repairQueue{
db: db,
ctx: context.Background(),
}
}
func (r *repairQueue) Enqueue(seg *pb.InjuredSegment) error {
func (r *repairQueue) Enqueue(ctx context.Context, seg *pb.InjuredSegment) error {
val, err := proto.Marshal(seg)
if err != nil {
return err
}
_, err = r.db.Create_Injuredsegment(
r.ctx,
ctx,
dbx.Injuredsegment_Info(val),
)
return err
}
func (r *repairQueue) Dequeue() (pb.InjuredSegment, error) {
tx, err := r.db.Open(r.ctx)
func (r *repairQueue) Dequeue(ctx context.Context) (pb.InjuredSegment, error) {
// TODO: fix out of order issue
tx, err := r.db.Open(ctx)
if err != nil {
return pb.InjuredSegment{}, Error.Wrap(err)
}
res, err := tx.First_Injuredsegment(r.ctx)
res, err := tx.First_Injuredsegment(ctx)
if err != nil {
return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
}
if res == nil {
} else if res == nil {
return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(storage.ErrEmptyQueue, tx.Rollback()))
}
deleted, err := tx.Delete_Injuredsegment_By_Info(
r.ctx,
dbx.Injuredsegment_Info(res.Info),
deleted, err := tx.Delete_Injuredsegment_By_Id(
ctx,
dbx.Injuredsegment_Id(res.Id),
)
if err != nil {
return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
}
if !deleted {
} else if !deleted {
return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(Error.New("Injured segment not deleted"), tx.Rollback()))
}
if err := tx.Commit(); err != nil {
return pb.InjuredSegment{}, Error.Wrap(err)
}
seg := &pb.InjuredSegment{}
err = proto.Unmarshal(res.Info, seg)
if err != nil {
return pb.InjuredSegment{}, Error.Wrap(utils.CombineErrors(err, tx.Rollback()))
if err = proto.Unmarshal(res.Info, seg); err != nil {
return pb.InjuredSegment{}, Error.Wrap(err)
}
return *seg, Error.Wrap(tx.Commit())
return *seg, nil
}
func (r *repairQueue) Peekqueue(limit int) ([]pb.InjuredSegment, error) {
func (r *repairQueue) Peekqueue(ctx context.Context, limit int) ([]pb.InjuredSegment, error) {
if limit <= 0 || limit > storage.LookupLimit {
limit = storage.LookupLimit
}
rows, err := r.db.Limited_Injuredsegment(r.ctx, limit, 0)
rows, err := r.db.Limited_Injuredsegment(ctx, limit, 0)
if err != nil {
return nil, err
}
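
One practical consequence of the implementation above, flagged by its own `TODO: fix out of order issue`: Dequeue does not guarantee FIFO order, which is also why the sequential test earlier in this diff now matches dequeued segments by their LostPieces value instead of by insertion index. A hedged consumer sketch that keys on segment contents rather than order; the helper itself is an assumption:

```go
package example

import (
	"context"

	"storj.io/storj/pkg/datarepair/queue"
	"storj.io/storj/pkg/pb"
)

// collect dequeues n segments and indexes them by path, since the order
// they come back in is not guaranteed by this implementation.
func collect(ctx context.Context, q queue.RepairQueue, n int) (map[string]pb.InjuredSegment, error) {
	byPath := make(map[string]pb.InjuredSegment, n)
	for i := 0; i < n; i++ {
		seg, err := q.Dequeue(ctx)
		if err != nil {
			return byPath, err
		}
		byPath[seg.Path] = seg
	}
	return byPath, nil
}
```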

View File

@ -10,6 +10,7 @@ import (
"os"
"testing"
"storj.io/storj/satellite"
"storj.io/storj/satellite/satellitedb"
)
@ -25,7 +26,7 @@ var (
// Run iterates over all supported databases, establishing a connection
// and creating tables for each one.
func Run(t *testing.T, test func(t *testing.T, db *satellitedb.DB)) {
func Run(t *testing.T, test func(t *testing.T, db satellite.DB)) {
for _, dbInfo := range []struct {
dbName string
dbURL string