Keep track of 'irreparable' segments in a database (#686)
* initial irreparable development
* added Open, Insert, Update, Get, Delete, and Close support
* added unit test cases
* code review changes
Parent: ae790dfd9f
Commit: 9f6e5d6731
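The commit adds a small pkg/irreparabledb package whose surface is New, IncrementRepairAttempts, Get, Delete, and Close (the new files appear further down). A minimal usage sketch, reusing the sqlite3 connection string from the tests; the segment path and detail bytes are placeholders, not values from the commit:

package main

import (
    "context"
    "log"
    "time"

    "storj.io/storj/pkg/irreparabledb"
)

func main() {
    ctx := context.Background()

    // open the database (in-memory sqlite3, as used by the tests in this commit)
    irrdb, err := irreparabledb.New("sqlite3://file::memory:?mode=memory&cache=shared")
    if err != nil {
        log.Fatal(err)
    }
    defer func() { _ = irrdb.Close() }()

    // record a segment that fell below the minimum required pieces;
    // repeating the call for the same path bumps RepairAttemptCount
    segmentInfo := &irreparabledb.RemoteSegmentInfo{
        EncryptedSegmentPath:   []byte("placeholder/segment/path"),
        EncryptedSegmentDetail: []byte("placeholder marshaled pointer"),
        LostPiecesCount:        4,
        RepairUnixSec:          time.Now().Unix(),
        RepairAttemptCount:     1,
    }
    if err := irrdb.IncrementRepairAttempts(ctx, segmentInfo); err != nil {
        log.Fatal(err)
    }

    // read the entry back, then remove it
    info, err := irrdb.Get(ctx, segmentInfo.EncryptedSegmentPath)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("repair attempts: %d", info.RepairAttemptCount)

    if err := irrdb.Delete(ctx, segmentInfo.EncryptedSegmentPath); err != nil {
        log.Fatal(err)
    }
}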
@@ -11,6 +11,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/storj/pkg/datarepair/queue"
+	"storj.io/storj/pkg/irreparabledb"
 	"storj.io/storj/pkg/pb"
 	"storj.io/storj/pkg/pointerdb"
 	"storj.io/storj/pkg/storj"
@@ -27,17 +28,19 @@ type checker struct {
 	pointerdb   *pointerdb.Server
 	repairQueue *queue.Queue
 	overlay     pb.OverlayServer
+	irrdb       *irreparabledb.Database
 	limit       int
 	logger      *zap.Logger
 	ticker      *time.Ticker
 }
 
-// NewChecker creates a new instance of checker
-func newChecker(pointerdb *pointerdb.Server, repairQueue *queue.Queue, overlay pb.OverlayServer, limit int, logger *zap.Logger, interval time.Duration) *checker {
+// newChecker creates a new instance of checker
+func newChecker(pointerdb *pointerdb.Server, repairQueue *queue.Queue, overlay pb.OverlayServer, irrdb *irreparabledb.Database, limit int, logger *zap.Logger, interval time.Duration) *checker {
 	return &checker{
 		pointerdb:   pointerdb,
 		repairQueue: repairQueue,
 		overlay:     overlay,
+		irrdb:       irrdb,
 		limit:       limit,
 		logger:      logger,
 		ticker:      time.NewTicker(interval),
@@ -98,7 +101,7 @@ func (c *checker) identifyInjuredSegments(ctx context.Context) (err error) {
 			return Error.New("error getting offline nodes %s", err)
 		}
 		numHealthy := len(nodeIDs) - len(missingPieces)
-		if int32(numHealthy) < pointer.Remote.Redundancy.RepairThreshold {
+		if (int32(numHealthy) >= pointer.Remote.Redundancy.MinReq) && (int32(numHealthy) < pointer.Remote.Redundancy.RepairThreshold) {
 			err = c.repairQueue.Enqueue(&pb.InjuredSegment{
 				Path:       string(item.Key),
 				LostPieces: missingPieces,
@@ -106,6 +109,21 @@ func (c *checker) identifyInjuredSegments(ctx context.Context) (err error) {
 			if err != nil {
 				return Error.New("error adding injured segment to queue %s", err)
 			}
-		}
+		} else if int32(numHealthy) < pointer.Remote.Redundancy.MinReq {
+			// make an entry in to the irreparable table
+			segmentInfo := &irreparabledb.RemoteSegmentInfo{
+				EncryptedSegmentPath:   item.Key,
+				EncryptedSegmentDetail: item.Value,
+				LostPiecesCount:        int64(len(missingPieces)),
+				RepairUnixSec:          time.Now().Unix(),
+				RepairAttemptCount:     int64(1),
+			}
+
+			//add the entry if new or update attempt count if already exists
+			err := c.irrdb.IncrementRepairAttempts(ctx, segmentInfo)
+			if err != nil {
+				return Error.New("error handling irreparable segment to queue %s", err)
+			}
+		}
 	}
 	return nil
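Put plainly, the new branch above splits segments into two bands by healthy-piece count: at least MinReq but under RepairThreshold means "enqueue for repair", under MinReq means "record as irreparable". A self-contained toy sketch of that classification (the threshold values 20 and 30 are made up for illustration, not taken from the commit):

package main

import "fmt"

// classify mirrors the checker's new branch: counts in [minReq, repairThreshold)
// are still repairable, counts below minReq are treated as irreparable.
func classify(numHealthy, minReq, repairThreshold int32) string {
    switch {
    case numHealthy >= minReq && numHealthy < repairThreshold:
        return "enqueue for repair"
    case numHealthy < minReq:
        return "record in irreparabledb"
    default:
        return "healthy, nothing to do"
    }
}

func main() {
    for _, healthy := range []int32{35, 25, 10} {
        fmt.Printf("healthy=%d -> %s\n", healthy, classify(healthy, 20, 30))
    }
}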
@@ -18,6 +18,7 @@ import (
 	"storj.io/storj/internal/teststorj"
 	"storj.io/storj/pkg/auth"
 	"storj.io/storj/pkg/datarepair/queue"
+	"storj.io/storj/pkg/irreparabledb"
 	"storj.io/storj/pkg/overlay"
 	"storj.io/storj/pkg/overlay/mocks"
 	"storj.io/storj/pkg/pb"
@@ -88,8 +89,16 @@ func TestIdentifyInjuredSegments(t *testing.T) {
 	overlayServer := mocks.NewOverlay(nodes)
 	limit := 0
 	interval := time.Second
-	checker := newChecker(pointerdb, repairQueue, overlayServer, limit, logger, interval)
-	err := checker.identifyInjuredSegments(ctx)
+	irrdb, err := irreparabledb.New("sqlite3://file::memory:?mode=memory&cache=shared")
+	assert.NoError(t, err)
+	defer func() {
+		err := irrdb.Close()
+		assert.NoError(t, err)
+	}()
+	assert.NoError(t, err)
+	checker := newChecker(pointerdb, repairQueue, overlayServer, irrdb, limit, logger, interval)
+	assert.NoError(t, err)
+	err = checker.identifyInjuredSegments(ctx)
 	assert.NoError(t, err)
 
 	//check if the expected segments were added to the queue
@@ -130,7 +139,15 @@ func TestOfflineNodes(t *testing.T) {
 	overlayServer := mocks.NewOverlay(nodes)
 	limit := 0
 	interval := time.Second
-	checker := newChecker(pointerdb, repairQueue, overlayServer, limit, logger, interval)
+	irrdb, err := irreparabledb.New("sqlite3://file::memory:?mode=memory&cache=shared")
+	assert.NoError(t, err)
+	defer func() {
+		err := irrdb.Close()
+		assert.NoError(t, err)
+	}()
+	assert.NoError(t, err)
+	checker := newChecker(pointerdb, repairQueue, overlayServer, irrdb, limit, logger, interval)
+	assert.NoError(t, err)
 	offline, err := checker.offlineNodes(ctx, nodeIDs)
 	assert.NoError(t, err)
 	assert.Equal(t, expectedOffline, offline)
@@ -139,6 +156,12 @@ func TestOfflineNodes(t *testing.T) {
 func BenchmarkIdentifyInjuredSegments(b *testing.B) {
 	logger := zap.NewNop()
 	pointerdb := pointerdb.NewServer(teststore.New(), &overlay.Cache{}, logger, pointerdb.Config{}, nil)
+	irrdb, err := irreparabledb.New("sqlite3://file::memory:?mode=memory&cache=shared")
+	assert.NoError(b, err)
+	defer func() {
+		err := irrdb.Close()
+		assert.NoError(b, err)
+	}()
 
 	addr, cleanup, err := redisserver.Start()
 	defer cleanup()
@@ -200,7 +223,9 @@ func BenchmarkIdentifyInjuredSegments(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		interval := time.Second
-		checker := newChecker(pointerdb, repairQueue, overlayServer, limit, logger, interval)
+		assert.NoError(b, err)
+		checker := newChecker(pointerdb, repairQueue, overlayServer, irrdb, limit, logger, interval)
+		assert.NoError(b, err)
 		err = checker.identifyInjuredSegments(ctx)
 		assert.NoError(b, err)
 
@@ -10,6 +10,7 @@ import (
 	"go.uber.org/zap"
 
 	"storj.io/storj/pkg/datarepair/queue"
+	"storj.io/storj/pkg/irreparabledb"
 	"storj.io/storj/pkg/overlay"
 	mock "storj.io/storj/pkg/overlay/mocks"
 	"storj.io/storj/pkg/pb"
@@ -20,13 +21,18 @@ import (
 
 // Config contains configurable values for checker
 type Config struct {
-	QueueAddress string        `help:"data checker queue address" default:"redis://127.0.0.1:6378?db=1&password=abc123"`
-	Interval     time.Duration `help:"how frequently checker should audit segments" default:"30s"`
+	QueueAddress     string        `help:"data checker queue address" default:"redis://127.0.0.1:6378?db=1&password=abc123"`
+	Interval         time.Duration `help:"how frequently checker should audit segments" default:"30s"`
+	IrreparabledbURL string        `help:"the database connection string to use" default:"sqlite3://$CONFDIR/irreparabledb.db"`
 }
 
 // Initialize a Checker struct
 func (c Config) initialize(ctx context.Context) (Checker, error) {
 	pdb := pointerdb.LoadFromContext(ctx)
+	irrdb, err := irreparabledb.New(c.IrreparabledbURL)
+	if err != nil {
+		return nil, err
+	}
 	var o pb.OverlayServer
 	x := overlay.LoadServerFromContext(ctx)
 	if x == nil {
@@ -39,7 +45,7 @@ func (c Config) initialize(ctx context.Context) (Checker, error) {
 		return nil, Error.Wrap(err)
 	}
 	repairQueue := queue.NewQueue(redisQ)
-	return newChecker(pdb, repairQueue, o, 0, zap.L(), c.Interval), nil
+	return newChecker(pdb, repairQueue, o, irrdb, 0, zap.L(), c.Interval), nil
 }
 
 // Run runs the checker with configured values
pkg/irreparabledb/common.go (new file, 11 lines)

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package irreparabledb

import (
    "github.com/zeebo/errs"
)

// Error is the default boltdb errs class
var Error = errs.Class("irreparabaledb error")
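common.go gives the new package its own errs class; errors created or wrapped through it carry the class name as a prefix. A tiny standalone illustration, assuming only github.com/zeebo/errs (message formatting may differ slightly across errs versions):

package main

import (
    "fmt"

    "github.com/zeebo/errs"
)

// same declaration as pkg/irreparabledb/common.go
var Error = errs.Class("irreparabaledb error")

func main() {
    err := Error.New("error handling irreparable segment to queue %s", "underlying cause")
    fmt.Println(err) // the class name is prefixed to the formatted message
}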
pkg/irreparabledb/dbx/gen.go (new file, 7 lines)

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package irreparabledb

//go:generate dbx.v1 schema -d postgres -d sqlite3 irreparabledb.dbx .
//go:generate dbx.v1 golang -d postgres -d sqlite3 irreparabledb.dbx .
pkg/irreparabledb/dbx/irreparabledb.dbx (new file, 18 lines)

// dbx.v1 golang irreparabledb.dbx .
model irreparabledb (
    key segmentpath

    field segmentpath blob
    field segmentdetail blob (updatable)
    field pieces_lost_count int64 (updatable)
    field seg_damaged_unix_sec int64 (updatable)
    field repair_attempt_count int64 (updatable)
)

create irreparabledb ( )
update irreparabledb ( where irreparabledb.segmentpath = ? )
delete irreparabledb ( where irreparabledb.segmentpath = ? )
read one (
    select irreparabledb
    where irreparabledb.segmentpath = ?
)
pkg/irreparabledb/dbx/irreparabledb.dbx.go (new file, 1212 lines)

(file diff suppressed because it is too large)
pkg/irreparabledb/dbx/irreparabledb.dbx.postgres.sql (new file, 10 lines)

-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1
-- DO NOT EDIT
CREATE TABLE irreparabledbs (
    segmentpath bytea NOT NULL,
    segmentdetail bytea NOT NULL,
    pieces_lost_count bigint NOT NULL,
    seg_damaged_unix_sec bigint NOT NULL,
    repair_attempt_count bigint NOT NULL,
    PRIMARY KEY ( segmentpath )
);
pkg/irreparabledb/dbx/irreparabledb.dbx.sqlite3.sql (new file, 10 lines)

-- AUTOGENERATED BY gopkg.in/spacemonkeygo/dbx.v1
-- DO NOT EDIT
CREATE TABLE irreparabledbs (
    segmentpath BLOB NOT NULL,
    segmentdetail BLOB NOT NULL,
    pieces_lost_count INTEGER NOT NULL,
    seg_damaged_unix_sec INTEGER NOT NULL,
    repair_attempt_count INTEGER NOT NULL,
    PRIMARY KEY ( segmentpath )
);
pkg/irreparabledb/irreparabledb.go (new file, 115 lines)

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package irreparabledb

import (
    "context"

    "storj.io/storj/internal/migrate"
    dbx "storj.io/storj/pkg/irreparabledb/dbx"
    "storj.io/storj/pkg/utils"
)

// Database implements the irreparable RPC service
type Database struct {
    db *dbx.DB
}

// RemoteSegmentInfo is info about a single entry stored in the irreparable db
type RemoteSegmentInfo struct {
    EncryptedSegmentPath   []byte
    EncryptedSegmentDetail []byte //contains marshaled info of pb.Pointer
    LostPiecesCount        int64
    RepairUnixSec          int64
    RepairAttemptCount     int64
}

// New creates instance of Server
func New(source string) (*Database, error) {
    u, err := utils.ParseURL(source)
    if err != nil {
        return nil, err
    }

    db, err := dbx.Open(u.Scheme, u.Path)
    if err != nil {
        return nil, err
    }

    err = migrate.Create("irreparabledb", db)
    if err != nil {
        return nil, err
    }

    return &Database{
        db: db,
    }, nil
}

// IncrementRepairAttempts a db entry for to increment the repair attempts field
func (db *Database) IncrementRepairAttempts(ctx context.Context, segmentInfo *RemoteSegmentInfo) (err error) {
    tx, err := db.db.Begin()
    if err != nil {
        return err
    }

    dbxInfo, err := db.Get(ctx, segmentInfo.EncryptedSegmentPath)
    if err != nil {
        // no rows err, so create/insert an entry
        _, err = db.db.Create_Irreparabledb(
            ctx,
            dbx.Irreparabledb_Segmentpath(segmentInfo.EncryptedSegmentPath),
            dbx.Irreparabledb_Segmentdetail(segmentInfo.EncryptedSegmentDetail),
            dbx.Irreparabledb_PiecesLostCount(segmentInfo.LostPiecesCount),
            dbx.Irreparabledb_SegDamagedUnixSec(segmentInfo.RepairUnixSec),
            dbx.Irreparabledb_RepairAttemptCount(segmentInfo.RepairAttemptCount),
        )
        if err != nil {
            return utils.CombineErrors(err, tx.Rollback())
        }
    } else {
        // row exits increment the attempt counter
        dbxInfo.RepairAttemptCount++
        updateFields := dbx.Irreparabledb_Update_Fields{}
        updateFields.RepairAttemptCount = dbx.Irreparabledb_RepairAttemptCount(dbxInfo.RepairAttemptCount)
        _, err = db.db.Update_Irreparabledb_By_Segmentpath(
            ctx,
            dbx.Irreparabledb_Segmentpath(dbxInfo.EncryptedSegmentPath),
            updateFields,
        )
        if err != nil {
            return utils.CombineErrors(err, tx.Rollback())
        }
    }

    return tx.Commit()
}

// Get a irreparable's segment info from the db
func (db *Database) Get(ctx context.Context, segmentPath []byte) (resp *RemoteSegmentInfo, err error) {
    dbxInfo, err := db.db.Get_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentPath))
    if err != nil {
        return &RemoteSegmentInfo{}, err
    }

    return &RemoteSegmentInfo{
        EncryptedSegmentPath:   dbxInfo.Segmentpath,
        EncryptedSegmentDetail: dbxInfo.Segmentdetail,
        LostPiecesCount:        dbxInfo.PiecesLostCount,
        RepairUnixSec:          dbxInfo.SegDamagedUnixSec,
        RepairAttemptCount:     dbxInfo.RepairAttemptCount,
    }, nil
}

// Delete a irreparable's segment info from the db
func (db *Database) Delete(ctx context.Context, segmentPath []byte) (err error) {
    _, err = db.db.Delete_Irreparabledb_By_Segmentpath(ctx, dbx.Irreparabledb_Segmentpath(segmentPath))

    return err
}

// Close close db connection
func (db *Database) Close() (err error) {
    return db.db.Close()
}
pkg/irreparabledb/irreparabledb_test.go (new file, 90 lines)

// Copyright (C) 2018 Storj Labs, Inc.
// See LICENSE for copying information.

package irreparabledb

import (
    "context"
    "flag"
    "os"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "storj.io/storj/internal/testcontext"
)

const (
    // this connstring is expected to work under the storj-test docker-compose instance
    defaultPostgresConn = "postgres://storj:storj-pass@test-postgres/teststorj?sslmode=disable"
)

var (
    testPostgres = flag.String("postgres-test-db", os.Getenv("STORJ_POSTGRES_TEST"), "PostgreSQL test database connection string")
)

func TestPostgres(t *testing.T) {
    if *testPostgres == "" {
        t.Skipf("postgres flag missing, example:\n-postgres-test-db=%s", defaultPostgresConn)
    }

    ctx := testcontext.New(t)
    defer ctx.Cleanup()

    // creating in-memory db and opening connection
    irrdb, err := New(*testPostgres)
    if err != nil {
        t.Fatal(err)
    }
    defer ctx.Check(irrdb.db.Close)

    testDatabase(ctx, t, irrdb)
}

func TestSqlite3(t *testing.T) {
    ctx := testcontext.New(t)
    defer ctx.Cleanup()

    // creating in-memory db and opening connection
    irrdb, err := New("sqlite3://file::memory:?mode=memory&cache=shared")
    if err != nil {
        t.Fatal(err)
    }
    defer ctx.Check(irrdb.db.Close)

    testDatabase(ctx, t, irrdb)
}

func testDatabase(ctx context.Context, t *testing.T, irrdb *Database) {
    //testing variables
    segmentInfo := &RemoteSegmentInfo{
        EncryptedSegmentPath:   []byte("IamSegmentkeyinfo"),
        EncryptedSegmentDetail: []byte("IamSegmentdetailinfo"),
        LostPiecesCount:        int64(10),
        RepairUnixSec:          time.Now().Unix(),
        RepairAttemptCount:     int64(10),
    }

    { // New entry
        err := irrdb.IncrementRepairAttempts(ctx, segmentInfo)
        assert.NoError(t, err)
    }

    { //Increment the already existing entry
        err := irrdb.IncrementRepairAttempts(ctx, segmentInfo)
        assert.NoError(t, err)
        segmentInfo.RepairAttemptCount++

        dbxInfo, err := irrdb.Get(ctx, segmentInfo.EncryptedSegmentPath)
        assert.NoError(t, err)
        assert.Equal(t, segmentInfo, dbxInfo)
    }

    { //Delete existing entry
        err := irrdb.Delete(ctx, segmentInfo.EncryptedSegmentPath)
        assert.NoError(t, err)

        _, err = irrdb.Get(ctx, segmentInfo.EncryptedSegmentPath)
        assert.Error(t, err)
    }
}
@@ -33,7 +33,18 @@ func ParseURL(s string) (*url.URL, error) {
 			Path:   strings.TrimPrefix(s, "bolt://"),
 		}, nil
 	}
 
+	if strings.HasPrefix(s, "sqlite3://") {
+		return &url.URL{
+			Scheme: "sqlite3",
+			Path:   strings.TrimPrefix(s, "sqlite3://"),
+		}, nil
+	}
+	if strings.HasPrefix(s, "postgres://") {
+		return &url.URL{
+			Scheme: "postgres",
+			Path:   s,
+		}, nil
+	}
 	return url.Parse(s)
 }
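The two new ParseURL branches feed directly into dbx.Open(u.Scheme, u.Path) in irreparabledb.New: a sqlite3 source keeps only what follows the scheme prefix, while a postgres source is passed through whole so the driver sees the full connection string. A small sketch of that behavior, using the exact connection strings that appear elsewhere in this commit:

package main

import (
    "fmt"

    "storj.io/storj/pkg/utils"
)

func main() {
    for _, s := range []string{
        "sqlite3://file::memory:?mode=memory&cache=shared",
        "postgres://storj:storj-pass@test-postgres/teststorj?sslmode=disable",
    } {
        u, err := utils.ParseURL(s)
        if err != nil {
            panic(err)
        }
        // sqlite3  -> Scheme "sqlite3", Path "file::memory:?mode=memory&cache=shared"
        // postgres -> Scheme "postgres", Path is the full original string
        fmt.Println(u.Scheme, u.Path)
    }
}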