From de46a999bcc816175fc1bc01e06f46df23b32d4d Mon Sep 17 00:00:00 2001 From: Cameron Date: Tue, 6 Nov 2018 09:52:11 -0500 Subject: [PATCH] Integrate SegmentStore Repair method with repair service (#582) * add storeConfig struct and getSegmentStore helper for creating a segment store * implement segment store in repairer, remove unnecessary repairer Repair method * change repair method parameter from int to int32 to match type being passed in * implement repairer service in captplanet * rework Config, set Config defaults in captplanet/setup --- cmd/captplanet/run.go | 4 +- cmd/captplanet/setup.go | 15 ++-- pkg/datarepair/repairer/config.go | 48 +++++++++++- pkg/datarepair/repairer/repairer.go | 15 ++-- pkg/storage/segments/mock_store.go | 113 ++++++++++++++-------------- pkg/storage/segments/store.go | 6 +- pkg/storage/segments/store_test.go | 4 +- 7 files changed, 125 insertions(+), 80 deletions(-) diff --git a/cmd/captplanet/run.go b/cmd/captplanet/run.go index 9c5fd38ef..1635b75dc 100644 --- a/cmd/captplanet/run.go +++ b/cmd/captplanet/run.go @@ -129,9 +129,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { runCfg.Satellite.PointerDB, runCfg.Satellite.Kademlia, o, - // TODO(coyle): re-enable the checker and repairer after we determine why it is panicing + // TODO(coyle): re-enable the checker after we determine why it is panicing // runCfg.Satellite.Checker, - // runCfg.Satellite.Repairer, + runCfg.Satellite.Repairer, ) }() diff --git a/cmd/captplanet/setup.go b/cmd/captplanet/setup.go index 3c2bee1f8..d963d361b 100644 --- a/cmd/captplanet/setup.go +++ b/cmd/captplanet/setup.go @@ -132,9 +132,8 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) { startingPort := setupCfg.StartingPort overrides := map[string]interface{}{ - "satellite.repairer.queue-address": "redis://127.0.0.1:6378?db=1&password=abc123", - "satellite.identity.cert-path": setupCfg.HCIdentity.CertPath, - "satellite.identity.key-path": setupCfg.HCIdentity.KeyPath, + "satellite.identity.cert-path": setupCfg.HCIdentity.CertPath, + "satellite.identity.key-path": setupCfg.HCIdentity.KeyPath, "satellite.identity.address": joinHostPort( setupCfg.ListenHost, startingPort+1), "satellite.kademlia.todo-listen-addr": joinHostPort( @@ -145,8 +144,14 @@ func cmdSetup(cmd *cobra.Command, args []string) (err error) { setupCfg.BasePath, "satellite", "pointerdb.db"), "satellite.overlay.database-url": "bolt://" + filepath.Join( setupCfg.BasePath, "satellite", "overlay.db"), - "uplink.cert-path": setupCfg.ULIdentity.CertPath, - "uplink.key-path": setupCfg.ULIdentity.KeyPath, + "satellite.repairer.queue-address": "redis://127.0.0.1:6378?db=1&password=abc123", + "satellite.repairer.overlay-addr": joinHostPort( + setupCfg.ListenHost, startingPort+1), + "satellite.repairer.pointer-db-addr": joinHostPort( + setupCfg.ListenHost, startingPort+1), + "satellite.repairer.api-key": setupCfg.APIKey, + "uplink.cert-path": setupCfg.ULIdentity.CertPath, + "uplink.key-path": setupCfg.ULIdentity.KeyPath, "uplink.address": joinHostPort( setupCfg.ListenHost, startingPort), "uplink.overlay-addr": joinHostPort( diff --git a/pkg/datarepair/repairer/config.go b/pkg/datarepair/repairer/config.go index bc8581d51..47bba3c2d 100644 --- a/pkg/datarepair/repairer/config.go +++ b/pkg/datarepair/repairer/config.go @@ -7,10 +7,18 @@ import ( "context" "time" + "github.com/vivint/infectious" "go.uber.org/zap" "storj.io/storj/pkg/datarepair/queue" + "storj.io/storj/pkg/eestream" + "storj.io/storj/pkg/miniogw" + "storj.io/storj/pkg/overlay" + "storj.io/storj/pkg/pointerdb/pdbclient" "storj.io/storj/pkg/provider" + ecclient "storj.io/storj/pkg/storage/ec" + segment "storj.io/storj/pkg/storage/segments" + "storj.io/storj/pkg/transport" "storj.io/storj/storage/redis" ) @@ -19,6 +27,8 @@ type Config struct { QueueAddress string `help:"data repair queue address" default:"redis://127.0.0.1:6378?db=1&password=abc123"` MaxRepair int `help:"maximum segments that can be repaired concurrently" default:"100"` Interval time.Duration `help:"how frequently checker should audit segments" default:"3600s"` + miniogw.ClientConfig + miniogw.RSConfig } // Run runs the repairer with configured values @@ -29,7 +39,13 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) } queue := queue.NewQueue(client) - repairer := newRepairer(queue, c.Interval, c.MaxRepair) + + ss, err := c.getSegmentStore(ctx, server.Identity()) + if err != nil { + return Error.Wrap(err) + } + + repairer := newRepairer(queue, ss, c.Interval, c.MaxRepair) // TODO(coyle): we need to figure out how to propagate the error up to cancel the service go func() { @@ -40,3 +56,33 @@ func (c Config) Run(ctx context.Context, server *provider.Provider) (err error) return server.Run(ctx) } + +// getSegmentStore creates a new segment store from storeConfig values +func (c Config) getSegmentStore(ctx context.Context, identity *provider.FullIdentity) (ss segment.Store, err error) { + defer mon.Task()(&ctx)(&err) + + t := transport.NewClient(identity) + + var oc overlay.Client + oc, err = overlay.NewOverlayClient(identity, c.OverlayAddr) + if err != nil { + return nil, err + } + + pdb, err := pdbclient.NewClient(identity, c.PointerDBAddr, c.APIKey) + if err != nil { + return nil, err + } + + ec := ecclient.NewClient(identity, t, c.MaxBufferMem) + fc, err := infectious.NewFEC(c.MinThreshold, c.MaxThreshold) + if err != nil { + return nil, err + } + rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fc, c.ErasureShareSize), c.RepairThreshold, c.SuccessThreshold) + if err != nil { + return nil, err + } + + return segment.NewSegmentStore(oc, ec, pdb, rs, c.MaxInlineSize), nil +} diff --git a/pkg/datarepair/repairer/repairer.go b/pkg/datarepair/repairer/repairer.go index 9c97fcf88..76496534f 100644 --- a/pkg/datarepair/repairer/repairer.go +++ b/pkg/datarepair/repairer/repairer.go @@ -12,6 +12,7 @@ import ( "storj.io/storj/internal/sync2" "storj.io/storj/pkg/datarepair/queue" "storj.io/storj/pkg/pb" + segment "storj.io/storj/pkg/storage/segments" ) // Repairer is the interface for the data repair queue @@ -23,13 +24,15 @@ type Repairer interface { // repairer holds important values for data repair type repairer struct { queue queue.RepairQueue + store segment.Store limiter *sync2.Limiter ticker *time.Ticker } -func newRepairer(queue queue.RepairQueue, interval time.Duration, concurrency int) *repairer { +func newRepairer(queue queue.RepairQueue, ss segment.Store, interval time.Duration, concurrency int) *repairer { return &repairer{ queue: queue, + store: ss, limiter: sync2.NewLimiter(concurrency), ticker: time.NewTicker(interval), } @@ -65,7 +68,7 @@ func (r *repairer) process(ctx context.Context) error { } r.limiter.Go(ctx, func() { - err := r.Repair(ctx, &seg) + err := r.store.Repair(ctx, seg.GetPath(), seg.GetLostPieces()) if err != nil { zap.L().Error("Repair failed", zap.Error(err)) } @@ -73,11 +76,3 @@ func (r *repairer) process(ctx context.Context) error { return nil } - -// Repair starts repair of the segment -func (r *repairer) Repair(ctx context.Context, seg *pb.InjuredSegment) (err error) { - defer mon.Task()(&ctx)(&err) - // TODO: - zap.L().Debug("Repairing", zap.Any("segment", seg)) - return err -} diff --git a/pkg/storage/segments/mock_store.go b/pkg/storage/segments/mock_store.go index 9b9a3e767..de84449df 100644 --- a/pkg/storage/segments/mock_store.go +++ b/pkg/storage/segments/mock_store.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: storj.io/storj/pkg/storage/segments (interfaces: Store) +// Source: pkg/storage/segments/store.go // Package segments is a generated GoMock package. package segments @@ -11,9 +11,8 @@ import ( time "time" gomock "github.com/golang/mock/gomock" - ranger "storj.io/storj/pkg/ranger" - "storj.io/storj/pkg/storj" + storj "storj.io/storj/pkg/storj" ) // MockStore is a mock of Store interface @@ -39,21 +38,22 @@ func (m *MockStore) EXPECT() *MockStoreMockRecorder { return m.recorder } -// Delete mocks base method -func (m *MockStore) Delete(arg0 context.Context, arg1 string) error { - ret := m.ctrl.Call(m, "Delete", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 +// Meta mocks base method +func (m *MockStore) Meta(ctx context.Context, path storj.Path) (Meta, error) { + ret := m.ctrl.Call(m, "Meta", ctx, path) + ret0, _ := ret[0].(Meta) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// Delete indicates an expected call of Delete -func (mr *MockStoreMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), arg0, arg1) +// Meta indicates an expected call of Meta +func (mr *MockStoreMockRecorder) Meta(ctx, path interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), ctx, path) } // Get mocks base method -func (m *MockStore) Get(arg0 context.Context, arg1 string) (ranger.Ranger, Meta, error) { - ret := m.ctrl.Call(m, "Get", arg0, arg1) +func (m *MockStore) Get(ctx context.Context, path storj.Path) (ranger.Ranger, Meta, error) { + ret := m.ctrl.Call(m, "Get", ctx, path) ret0, _ := ret[0].(ranger.Ranger) ret1, _ := ret[1].(Meta) ret2, _ := ret[2].(error) @@ -61,13 +61,50 @@ func (m *MockStore) Get(arg0 context.Context, arg1 string) (ranger.Ranger, Meta, } // Get indicates an expected call of Get -func (mr *MockStoreMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), arg0, arg1) +func (mr *MockStoreMockRecorder) Get(ctx, path interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockStore)(nil).Get), ctx, path) +} + +// Repair mocks base method +func (m *MockStore) Repair(ctx context.Context, path storj.Path, lostPieces []int32) error { + ret := m.ctrl.Call(m, "Repair", ctx, path, lostPieces) + ret0, _ := ret[0].(error) + return ret0 +} + +// Repair indicates an expected call of Repair +func (mr *MockStoreMockRecorder) Repair(ctx, path, lostPieces interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockStore)(nil).Repair), ctx, path, lostPieces) +} + +// Put mocks base method +func (m *MockStore) Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (Meta, error) { + ret := m.ctrl.Call(m, "Put", ctx, data, expiration, segmentInfo) + ret0, _ := ret[0].(Meta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Put indicates an expected call of Put +func (mr *MockStoreMockRecorder) Put(ctx, data, expiration, segmentInfo interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), ctx, data, expiration, segmentInfo) +} + +// Delete mocks base method +func (m *MockStore) Delete(ctx context.Context, path storj.Path) error { + ret := m.ctrl.Call(m, "Delete", ctx, path) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete +func (mr *MockStoreMockRecorder) Delete(ctx, path interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), ctx, path) } // List mocks base method -func (m *MockStore) List(arg0 context.Context, arg1, arg2, arg3 string, arg4 bool, arg5 int, arg6 uint32) ([]ListItem, bool, error) { - ret := m.ctrl.Call(m, "List", arg0, arg1, arg2, arg3, arg4, arg5, arg6) +func (m *MockStore) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) ([]ListItem, bool, error) { + ret := m.ctrl.Call(m, "List", ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags) ret0, _ := ret[0].([]ListItem) ret1, _ := ret[1].(bool) ret2, _ := ret[2].(error) @@ -75,44 +112,6 @@ func (m *MockStore) List(arg0 context.Context, arg1, arg2, arg3 string, arg4 boo } // List indicates an expected call of List -func (mr *MockStoreMockRecorder) List(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), arg0, arg1, arg2, arg3, arg4, arg5, arg6) -} - -// Meta mocks base method -func (m *MockStore) Meta(arg0 context.Context, arg1 string) (Meta, error) { - ret := m.ctrl.Call(m, "Meta", arg0, arg1) - ret0, _ := ret[0].(Meta) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Meta indicates an expected call of Meta -func (mr *MockStoreMockRecorder) Meta(arg0, arg1 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Meta", reflect.TypeOf((*MockStore)(nil).Meta), arg0, arg1) -} - -// Put mocks base method -func (m *MockStore) Put(arg0 context.Context, arg1 io.Reader, arg2 time.Time, arg3 func() (string, []byte, error)) (Meta, error) { - ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(Meta) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Put indicates an expected call of Put -func (mr *MockStoreMockRecorder) Put(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), arg0, arg1, arg2, arg3) -} - -// Repair mocks base method -func (m *MockStore) Repair(arg0 context.Context, arg1 storj.Path, arg2 []int) error { - ret := m.ctrl.Call(m, "Repair", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 -} - -// Repair indicates an expected call of Repair -func (mr *MockStoreMockRecorder) Repair(arg0, arg1, arg2 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockStore)(nil).Repair), arg0, arg1, arg2) +func (mr *MockStoreMockRecorder) List(ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockStore)(nil).List), ctx, prefix, startAfter, endBefore, recursive, limit, metaFlags) } diff --git a/pkg/storage/segments/store.go b/pkg/storage/segments/store.go index 81d1396ea..6921b8f31 100644 --- a/pkg/storage/segments/store.go +++ b/pkg/storage/segments/store.go @@ -50,7 +50,7 @@ type ListItem struct { type Store interface { Meta(ctx context.Context, path storj.Path) (meta Meta, err error) Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error) - Repair(ctx context.Context, path storj.Path, lostPieces []int) (err error) + Repair(ctx context.Context, path storj.Path, lostPieces []int32) (err error) Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (meta Meta, err error) Delete(ctx context.Context, path storj.Path) (err error) List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error) @@ -277,7 +277,7 @@ func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error) } // Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes -func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces []int) (err error) { +func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces []int32) (err error) { defer mon.Task()(&ctx)(&err) //Read the segment's pointer's info from the PointerDB @@ -313,7 +313,7 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [ //remove all lost pieces from the list to have only healthy pieces for i := range lostPieces { - if j == lostPieces[i] { + if j == int(lostPieces[i]) { totalNilNodes++ } } diff --git a/pkg/storage/segments/store_test.go b/pkg/storage/segments/store_test.go index 25cf42fc8..aea83b107 100644 --- a/pkg/storage/segments/store_test.go +++ b/pkg/storage/segments/store_test.go @@ -242,14 +242,14 @@ func TestSegmentStoreRepairRemote(t *testing.T) { pointerType pb.Pointer_DataType size int64 metadata []byte - lostPieces []int + lostPieces []int32 newNodes []*pb.Node data string strsize, offset, length int64 substr string meta Meta }{ - {"path/1/2/3", 10, pb.Pointer_REMOTE, int64(3), []byte("metadata"), []int{}, []*pb.Node{{Id: "1"}, {Id: "2"}}, "abcdefghijkl", 12, 1, 4, "bcde", Meta{}}, + {"path/1/2/3", 10, pb.Pointer_REMOTE, int64(3), []byte("metadata"), []int32{}, []*pb.Node{{Id: "1"}, {Id: "2"}}, "abcdefghijkl", 12, 1, 4, "bcde", Meta{}}, } { mockOC := mock_overlay.NewMockClient(ctrl) mockEC := mock_ecclient.NewMockClient(ctrl)