Integrate SegmentStore Repair method with repair service (#582)
* add storeConfig struct and getSegmentStore helper for creating a segment store * implement segment store in repairer, remove unnecessary repairer Repair method * change repair method parameter from int to int32 to match type being passed in * implement repairer service in captplanet * rework Config, set Config defaults in captplanet/setup
This commit is contained in:
parent
de7358fb30
commit
de46a999bc
@ -129,9 +129,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
|
||||
runCfg.Satellite.PointerDB,
|
||||
runCfg.Satellite.Kademlia,
|
||||
o,
|
||||
// TODO(coyle): re-enable the checker and repairer after we determine why it is panicing
|
||||
// TODO(coyle): re-enable the checker after we determine why it is panicing
|
||||
// runCfg.Satellite.Checker,
|
||||
// runCfg.Satellite.Repairer,
|
||||
runCfg.Satellite.Repairer,
|
||||
)
|
||||
}()
|
||||
|
||||
|
@ -132,7 +132,6 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
startingPort := setupCfg.StartingPort
|
||||
|
||||
overrides := map[string]interface{}{
|
||||
"satellite.repairer.queue-address": "redis://127.0.0.1:6378?db=1&password=abc123",
|
||||
"satellite.identity.cert-path": setupCfg.HCIdentity.CertPath,
|
||||
"satellite.identity.key-path": setupCfg.HCIdentity.KeyPath,
|
||||
"satellite.identity.address": joinHostPort(
|
||||
@ -145,6 +144,12 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) {
|
||||
setupCfg.BasePath, "satellite", "pointerdb.db"),
|
||||
"satellite.overlay.database-url": "bolt://" + filepath.Join(
|
||||
setupCfg.BasePath, "satellite", "overlay.db"),
|
||||
"satellite.repairer.queue-address": "redis://127.0.0.1:6378?db=1&password=abc123",
|
||||
"satellite.repairer.overlay-addr": joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+1),
|
||||
"satellite.repairer.pointer-db-addr": joinHostPort(
|
||||
setupCfg.ListenHost, startingPort+1),
|
||||
"satellite.repairer.api-key": setupCfg.APIKey,
|
||||
"uplink.cert-path": setupCfg.ULIdentity.CertPath,
|
||||
"uplink.key-path": setupCfg.ULIdentity.KeyPath,
|
||||
"uplink.address": joinHostPort(
|
||||
|
@ -7,10 +7,18 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/vivint/infectious"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/pkg/datarepair/queue"
|
||||
"storj.io/storj/pkg/eestream"
|
||||
"storj.io/storj/pkg/miniogw"
|
||||
"storj.io/storj/pkg/overlay"
|
||||
"storj.io/storj/pkg/pointerdb/pdbclient"
|
||||
"storj.io/storj/pkg/provider"
|
||||
ecclient "storj.io/storj/pkg/storage/ec"
|
||||
segment "storj.io/storj/pkg/storage/segments"
|
||||
"storj.io/storj/pkg/transport"
|
||||
"storj.io/storj/storage/redis"
|
||||
)
|
||||
|
||||
@ -19,6 +27,8 @@ type Config struct {
|
||||
QueueAddress string `help:"data repair queue address" default:"redis://127.0.0.1:6378?db=1&password=abc123"`
|
||||
MaxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"`
|
||||
Interval time.Duration `help:"how frequently checker should audit segments" default:"3600s"`
|
||||
miniogw.ClientConfig
|
||||
miniogw.RSConfig
|
||||
}
|
||||
|
||||
// Run runs the repairer with configured values
|
||||
@ -29,7 +39,13 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
|
||||
}
|
||||
|
||||
queue := queue.NewQueue(client)
|
||||
repairer := newRepairer(queue, c.Interval, c.MaxRepair)
|
||||
|
||||
ss, err := c.getSegmentStore(ctx, server.Identity())
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
repairer := newRepairer(queue, ss, c.Interval, c.MaxRepair)
|
||||
|
||||
// TODO(coyle): we need to figure out how to propagate the error up to cancel the service
|
||||
go func() {
|
||||
@ -40,3 +56,33 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error)
|
||||
|
||||
return server.Run(ctx)
|
||||
}
|
||||
|
||||
// getSegmentStore creates a new segment store from storeConfig values
|
||||
func (c Config) getSegmentStore(ctx context.Context, identity *provider.FullIdentity) (ss segment.Store, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
t := transport.NewClient(identity)
|
||||
|
||||
var oc overlay.Client
|
||||
oc, err = overlay.NewOverlayClient(identity, c.OverlayAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pdb, err := pdbclient.NewClient(identity, c.PointerDBAddr, c.APIKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ec := ecclient.NewClient(identity, t, c.MaxBufferMem)
|
||||
fc, err := infectious.NewFEC(c.MinThreshold, c.MaxThreshold)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, c.ErasureShareSize), c.RepairThreshold, c.SuccessThreshold)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return segment.NewSegmentStore(oc, ec, pdb, rs, c.MaxInlineSize), nil
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"storj.io/storj/internal/sync2"
|
||||
"storj.io/storj/pkg/datarepair/queue"
|
||||
"storj.io/storj/pkg/pb"
|
||||
segment "storj.io/storj/pkg/storage/segments"
|
||||
)
|
||||
|
||||
// Repairer is the interface for the data repair queue
|
||||
@ -23,13 +24,15 @@ type Repairer interface {
|
||||
// repairer holds important values for data repair
|
||||
type repairer struct {
|
||||
queue queue.RepairQueue
|
||||
store segment.Store
|
||||
limiter *sync2.Limiter
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
func newRepairer(queue queue.RepairQueue, interval time.Duration, concurrency int) *repairer {
|
||||
func newRepairer(queue queue.RepairQueue, ss segment.Store, interval time.Duration, concurrency int) *repairer {
|
||||
return &repairer{
|
||||
queue: queue,
|
||||
store: ss,
|
||||
limiter: sync2.NewLimiter(concurrency),
|
||||
ticker: time.NewTicker(interval),
|
||||
}
|
||||
@ -65,7 +68,7 @@ func (r *repairer) process(ctx context.Context) error {
|
||||
}
|
||||
|
||||
r.limiter.Go(ctx, func() {
|
||||
err := r.Repair(ctx, &seg)
|
||||
err := r.store.Repair(ctx, seg.GetPath(), seg.GetLostPieces())
|
||||
if err != nil {
|
||||
zap.L().Error("Repair failed", zap.Error(err))
|
||||
}
|
||||
@ -73,11 +76,3 @@ func (r *repairer) process(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Repair starts repair of the segment
|
||||
func (r *repairer) Repair(ctx context.Context, seg *pb.InjuredSegment) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
// TODO:
|
||||
zap.L().Debug("Repairing", zap.Any("segment", seg))
|
||||
return err
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: storj.io/storj/pkg/storage/segments (interfaces: Store)
|
||||
// Source: pkg/storage/segments/store.go
|
||||
|
||||
// Package segments is a generated GoMock package.
|
||||
package segments
|
||||
@ -11,9 +11,8 @@ import (
|
||||
time "time"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
|
||||
ranger "storj.io/storj/pkg/ranger"
|
||||
"storj.io/storj/pkg/storj"
|
||||
storj "storj.io/storj/pkg/storj"
|
||||
)
|
||||
|
||||
// MockStore is a mock of Store interface
|
||||
@ -39,21 +38,22 @@ func (m *MockStore) EXPECT() *MockStoreMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Delete mocks base method
|
||||
func (m *MockStore) Delete(arg0 context.Context, arg1 string) error {
|
||||
ret := m.ctrl.Call(m, "Delete", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
// Meta mocks base method
|
||||
func (m *MockStore) Meta(ctx context.Context, path storj.Path) (Meta, error) {
|
||||
ret := m.ctrl.Call(m, "Meta", ctx, path)
|
||||
ret0, _ := ret[0].(Meta)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Delete indicates an expected call of Delete
|
||||
func (mr *MockStoreMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), arg0, arg1)
|
||||
// Meta indicates an expected call of Meta
|
||||
func (mr *MockStoreMockRecorder) Meta(ctx, path interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), ctx, path)
|
||||
}
|
||||
|
||||
// Get mocks base method
|
||||
func (m *MockStore) Get(arg0 context.Context, arg1 string) (ranger.Ranger, Meta, error) {
|
||||
ret := m.ctrl.Call(m, "Get", arg0, arg1)
|
||||
func (m *MockStore) Get(ctx context.Context, path storj.Path) (ranger.Ranger, Meta, error) {
|
||||
ret := m.ctrl.Call(m, "Get", ctx, path)
|
||||
ret0, _ := ret[0].(ranger.Ranger)
|
||||
ret1, _ := ret[1].(Meta)
|
||||
ret2, _ := ret[2].(error)
|
||||
@ -61,13 +61,50 @@ func (m *MockStore) Get(arg0 context.Context, arg1 string) (ranger.Ranger, Meta,
|
||||
}
|
||||
|
||||
// Get indicates an expected call of Get
|
||||
func (mr *MockStoreMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), arg0, arg1)
|
||||
func (mr *MockStoreMockRecorder) Get(ctx, path interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), ctx, path)
|
||||
}
|
||||
|
||||
// Repair mocks base method
|
||||
func (m *MockStore) Repair(ctx context.Context, path storj.Path, lostPieces []int32) error {
|
||||
ret := m.ctrl.Call(m, "Repair", ctx, path, lostPieces)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Repair indicates an expected call of Repair
|
||||
func (mr *MockStoreMockRecorder) Repair(ctx, path, lostPieces interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockStore)(nil).Repair), ctx, path, lostPieces)
|
||||
}
|
||||
|
||||
// Put mocks base method
|
||||
func (m *MockStore) Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (Meta, error) {
|
||||
ret := m.ctrl.Call(m, "Put", ctx, data, expiration, segmentInfo)
|
||||
ret0, _ := ret[0].(Meta)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Put indicates an expected call of Put
|
||||
func (mr *MockStoreMockRecorder) Put(ctx, data, expiration, segmentInfo interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), ctx, data, expiration, segmentInfo)
|
||||
}
|
||||
|
||||
// Delete mocks base method
|
||||
func (m *MockStore) Delete(ctx context.Context, path storj.Path) error {
|
||||
ret := m.ctrl.Call(m, "Delete", ctx, path)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Delete indicates an expected call of Delete
|
||||
func (mr *MockStoreMockRecorder) Delete(ctx, path interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), ctx, path)
|
||||
}
|
||||
|
||||
// List mocks base method
|
||||
func (m *MockStore) List(arg0 context.Context, arg1, arg2, arg3 string, arg4 bool, arg5 int, arg6 uint32) ([]ListItem, bool, error) {
|
||||
ret := m.ctrl.Call(m, "List", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
|
||||
func (m *MockStore) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) ([]ListItem, bool, error) {
|
||||
ret := m.ctrl.Call(m, "List", ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags)
|
||||
ret0, _ := ret[0].([]ListItem)
|
||||
ret1, _ := ret[1].(bool)
|
||||
ret2, _ := ret[2].(error)
|
||||
@ -75,44 +112,6 @@ func (m *MockStore) List(arg0 context.Context, arg1, arg2, arg3 string, arg4 boo
|
||||
}
|
||||
|
||||
// List indicates an expected call of List
|
||||
func (mr *MockStoreMockRecorder) List(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
|
||||
}
|
||||
|
||||
// Meta mocks base method
|
||||
func (m *MockStore) Meta(arg0 context.Context, arg1 string) (Meta, error) {
|
||||
ret := m.ctrl.Call(m, "Meta", arg0, arg1)
|
||||
ret0, _ := ret[0].(Meta)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Meta indicates an expected call of Meta
|
||||
func (mr *MockStoreMockRecorder) Meta(arg0, arg1 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), arg0, arg1)
|
||||
}
|
||||
|
||||
// Put mocks base method
|
||||
func (m *MockStore) Put(arg0 context.Context, arg1 io.Reader, arg2 time.Time, arg3 func() (string, []byte, error)) (Meta, error) {
|
||||
ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(Meta)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Put indicates an expected call of Put
|
||||
func (mr *MockStoreMockRecorder) Put(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// Repair mocks base method
|
||||
func (m *MockStore) Repair(arg0 context.Context, arg1 storj.Path, arg2 []int) error {
|
||||
ret := m.ctrl.Call(m, "Repair", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Repair indicates an expected call of Repair
|
||||
func (mr *MockStoreMockRecorder) Repair(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockStore)(nil).Repair), arg0, arg1, arg2)
|
||||
func (mr *MockStoreMockRecorder) List(ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags)
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ type ListItem struct {
|
||||
type Store interface {
|
||||
Meta(ctx context.Context, path storj.Path) (meta Meta, err error)
|
||||
Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error)
|
||||
Repair(ctx context.Context, path storj.Path, lostPieces []int) (err error)
|
||||
Repair(ctx context.Context, path storj.Path, lostPieces []int32) (err error)
|
||||
Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (meta Meta, err error)
|
||||
Delete(ctx context.Context, path storj.Path) (err error)
|
||||
List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
|
||||
@ -277,7 +277,7 @@ func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error)
|
||||
}
|
||||
|
||||
// Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes
|
||||
func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces []int) (err error) {
|
||||
func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces []int32) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
//Read the segment's pointer's info from the PointerDB
|
||||
@ -313,7 +313,7 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
|
||||
|
||||
//remove all lost pieces from the list to have only healthy pieces
|
||||
for i := range lostPieces {
|
||||
if j == lostPieces[i] {
|
||||
if j == int(lostPieces[i]) {
|
||||
totalNilNodes++
|
||||
}
|
||||
}
|
||||
|
@ -242,14 +242,14 @@ func TestSegmentStoreRepairRemote(t *testing.T) {
|
||||
pointerType pb.Pointer_DataType
|
||||
size int64
|
||||
metadata []byte
|
||||
lostPieces []int
|
||||
lostPieces []int32
|
||||
newNodes []*pb.Node
|
||||
data string
|
||||
strsize, offset, length int64
|
||||
substr string
|
||||
meta Meta
|
||||
}{
|
||||
{"path/1/2/3", 10, pb.Pointer_REMOTE, int64(3), []byte("metadata"), []int{}, []*pb.Node{{Id: "1"}, {Id: "2"}}, "abcdefghijkl", 12, 1, 4, "bcde", Meta{}},
|
||||
{"path/1/2/3", 10, pb.Pointer_REMOTE, int64(3), []byte("metadata"), []int32{}, []*pb.Node{{Id: "1"}, {Id: "2"}}, "abcdefghijkl", 12, 1, 4, "bcde", Meta{}},
|
||||
} {
|
||||
mockOC := mock_overlay.NewMockClient(ctrl)
|
||||
mockEC := mock_ecclient.NewMockClient(ctrl)
|
||||
|
Loading…
Reference in New Issue
Block a user