Implements Data Repair Interfaces in Stream Store (#455)
* add filter field to the OverlayOptions message
* add ChooseFiltered method and an excluded parameter to the populate method
* change excluded type to []dht.NodeID in ChooseFiltered; update comment
* rename filter to excluded_nodes in the proto
* implement helper function contains
* delete ChooseFiltered and fold its functionality into the Choose method (to keep the original author's history); add the excluded argument to Choose calls
* regenerate mock_client.go
* regenerate protobuf
* add the repair() func
* update test case to use the new IDFromString function
* modify repair() and update the streams mock
* add Options struct
* integrate the segment repair()
* development repair with hack working
* repair segment changes
* integrate with mini hacks and rig up test case with dev debug info
* integrate with ec and overlay
* add repair test case
* make getNewUniqueNodes() recursively go through Choose() to find the required number of unique nodes
* clean up code
parent d2933840d2
commit 7ce94627f1
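The overlay changes listed above add an Excluded field to overlay.Options and teach Choose to skip those nodes. The diff below only shows the caller side (segmentStore.Repair builds Options{Amount, Space, Excluded} and calls Choose), so here is a minimal sketch of how the exclusion might be applied inside the overlay package. The contains helper name comes from the commit message, but its signature, the filtering loop, and the import paths are assumptions, not code from this commit.

```go
package overlay

import (
	"context"

	"storj.io/storj/pkg/dht"
	"storj.io/storj/pkg/pb"
)

// Options mirrors the fields used by segmentStore.Repair in this commit;
// the Space type is assumed.
type Options struct {
	Amount   int
	Space    int64
	Excluded []dht.NodeID
}

// contains is the helper mentioned in the commit message; this body is an
// assumption (it assumes dht.NodeID exposes String()).
func contains(excluded []dht.NodeID, id string) bool {
	for _, ex := range excluded {
		if ex.String() == id {
			return true
		}
	}
	return false
}

// chooseFrom is a hypothetical illustration of filtering candidates with the
// excluded list; the real Choose also queries the overlay cache for space
// and bandwidth requirements.
func chooseFrom(ctx context.Context, candidates []*pb.Node, op Options) []*pb.Node {
	picked := make([]*pb.Node, 0, op.Amount)
	for _, n := range candidates {
		if n == nil || contains(op.Excluded, n.GetId()) {
			continue
		}
		picked = append(picked, n)
		if len(picked) == op.Amount {
			break
		}
	}
	return picked
}
```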
@@ -13,6 +13,7 @@ import (
	gomock "github.com/golang/mock/gomock"

	ranger "storj.io/storj/pkg/ranger"
	"storj.io/storj/pkg/storj"
)

// MockStore is a mock of Store interface
@@ -103,3 +104,15 @@ func (m *MockStore) Put(arg0 context.Context, arg1 io.Reader, arg2 time.Time, ar
func (mr *MockStoreMockRecorder) Put(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockStore)(nil).Put), arg0, arg1, arg2, arg3)
}

// Repair mocks base method
func (m *MockStore) Repair(arg0 context.Context, arg1 storj.Path, arg2 []int) error {
	ret := m.ctrl.Call(m, "Repair", arg0, arg1, arg2)
	ret0, _ := ret[0].(error)
	return ret0
}

// Repair indicates an expected call of Repair
func (mr *MockStoreMockRecorder) Repair(arg0, arg1, arg2 interface{}) *gomock.Call {
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockStore)(nil).Repair), arg0, arg1, arg2)
}
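For context, the regenerated mock is consumed in tests through gomock's usual expectation API. A minimal sketch, assuming it sits in the same package as the generated mock above; NewMockStore is the constructor gomock generates alongside MockStore, and the test name is hypothetical.

```go
package mocks // location assumed; same package as the generated mock above

import (
	"context"
	"testing"

	"github.com/golang/mock/gomock"
)

func TestRepairExpectation(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// NewMockStore is the constructor gomock generates next to MockStore.
	store := NewMockStore(ctrl)
	store.EXPECT().
		Repair(gomock.Any(), gomock.Any(), gomock.Any()).
		Return(nil)

	// The code under test would normally trigger this call.
	if err := store.Repair(context.Background(), "some/path", []int{2}); err != nil {
		t.Fatal(err)
	}
}
```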
@@ -22,8 +22,9 @@ import (
	"storj.io/storj/pkg/piecestore/rpc/client"
	"storj.io/storj/pkg/pointerdb/pdbclient"
	"storj.io/storj/pkg/ranger"
	"storj.io/storj/pkg/storage/ec"
	ecclient "storj.io/storj/pkg/storage/ec"
	"storj.io/storj/pkg/storj"
	"storj.io/storj/pkg/utils"
)

var (
@@ -49,6 +50,7 @@ type ListItem struct {
type Store interface {
	Meta(ctx context.Context, path storj.Path) (meta Meta, err error)
	Get(ctx context.Context, path storj.Path) (rr ranger.Ranger, meta Meta, err error)
	Repair(ctx context.Context, path storj.Path, lostPieces []int) (err error)
	Put(ctx context.Context, data io.Reader, expiration time.Time, segmentInfo func() (storj.Path, []byte, error)) (meta Meta, err error)
	Delete(ctx context.Context, path storj.Path) (err error)
	List(ctx context.Context, prefix, startAfter, endBefore storj.Path, recursive bool, limit int, metaFlags uint32) (items []ListItem, more bool, err error)
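The new Repair method slots into the Store interface next to Get and Put. A sketch of how a caller might drive it, assuming the package path pkg/storage/segments and a hypothetical repairSegment helper (neither appears in this hunk):

```go
package repairer // hypothetical caller package

import (
	"context"
	"fmt"

	segments "storj.io/storj/pkg/storage/segments"
	"storj.io/storj/pkg/storj"
)

// repairSegment asks the segment store to rebuild the pieces whose indices
// are listed in lostPieces and upload them to newly chosen nodes.
func repairSegment(ctx context.Context, store segments.Store, path storj.Path, lostPieces []int) error {
	if len(lostPieces) == 0 {
		return nil // nothing to repair
	}
	if err := store.Repair(ctx, path, lostPieces); err != nil {
		return fmt.Errorf("repair of %q failed: %v", path, err)
	}
	return nil
}
```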
@@ -204,7 +206,7 @@ func (s *segmentStore) Get(ctx context.Context, path storj.Path) (

	if pr.GetType() == pb.Pointer_REMOTE {
		seg := pr.GetRemote()
		pid := client.PieceID(seg.PieceId)
		pid := client.PieceID(seg.GetPieceId())
		nodes, err := s.lookupNodes(ctx, seg)
		if err != nil {
			return nil, Meta{}, Error.Wrap(err)
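The one-line change in this hunk swaps direct field access for the generated getter. The motivation, illustrated below with a snippet that is not part of the commit, is that protobuf-go getters are nil-safe:

```go
package example // illustrative only

import "storj.io/storj/pkg/pb"

// pieceIDOf shows why seg.GetPieceId() is preferred over seg.PieceId:
// generated getters return the zero value when the receiver is nil,
// so a missing remote segment yields "" instead of a panic.
func pieceIDOf(seg *pb.RemoteSegment) string {
	return seg.GetPieceId() // safe even when seg == nil
}
```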
@@ -272,6 +274,119 @@ func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error)
	return s.pdb.Delete(ctx, path)
}

// Repair retrieves an at-risk segment and repairs and stores lost pieces on new nodes
func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces []int) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Read the segment's pointer info from the PointerDB
	pr, err := s.pdb.Get(ctx, path)
	if err != nil {
		return Error.Wrap(err)
	}

	if pr.GetType() != pb.Pointer_REMOTE {
		return Error.New("Cannot repair inline segment %s", client.PieceID(pr.GetInlineSegment()))
	}

	seg := pr.GetRemote()
	pid := client.PieceID(seg.GetPieceId())

	// Get the list of remote pieces from the pointer
	originalNodes, err := s.lookupNodes(ctx, seg)
	if err != nil {
		return Error.Wrap(err)
	}

	// get the list of nodes that need to be excluded
	var excludeNodeIDs []dht.NodeID

	// count the number of nil nodes that need to be repaired
	totalNilNodes := 0
	for j, v := range originalNodes {
		if v != nil {
			excludeNodeIDs = append(excludeNodeIDs, node.IDFromString(v.GetId()))
		} else {
			totalNilNodes++
		}

		// remove all lost pieces from the list to have only healthy pieces
		for i := range lostPieces {
			if j == lostPieces[i] {
				totalNilNodes++
			}
		}
	}

	// Request Overlay for n-h new storage nodes
	op := overlay.Options{Amount: totalNilNodes, Space: 0, Excluded: excludeNodeIDs}
	newNodes, err := s.oc.Choose(ctx, op)
	if err != nil {
		return err
	}

	totalRepairCount := len(newNodes)

	// make a repair nodes list with just the new unique ids
	repairNodesList := make([]*pb.Node, len(originalNodes))
	for j, vr := range originalNodes {
		// find the nil in the original node list
		if vr == nil {
			// replace the location with the newNode Node info
			totalRepairCount--
			repairNodesList[j] = newNodes[totalRepairCount]
		}
	}

	es, err := makeErasureScheme(pr.GetRemote().GetRedundancy())
	if err != nil {
		return Error.Wrap(err)
	}

	signedMessage, err := s.pdb.SignedMessage()
	if err != nil {
		return Error.Wrap(err)
	}
	pba := s.pdb.PayerBandwidthAllocation()

	// download the segment using only the healthy nodes
	rr, err := s.ec.Get(ctx, originalNodes, es, pid, pr.GetSize(), pba, signedMessage)
	if err != nil {
		return Error.Wrap(err)
	}

	// get io.Reader from ranger
	r, err := rr.Range(ctx, 0, rr.Size())
	if err != nil {
		return err
	}
	defer utils.LogClose(r)

	// put the file to the ecclient
	exp := pr.GetExpirationDate()

	successfulNodes, err := s.ec.Put(ctx, repairNodesList, s.rs, pid, r, time.Unix(exp.GetSeconds(), 0), pba, signedMessage)
	if err != nil {
		return Error.Wrap(err)
	}

	// merge the successful nodes list into the originalNodes list
	for i, v := range originalNodes {
		if v == nil {
			// copy the successfulNode info
			originalNodes[i] = successfulNodes[i]
		}
	}

	metadata := pr.GetMetadata()
	pointer, err := s.makeRemotePointer(originalNodes, pid, rr.Size(), exp, metadata)
	if err != nil {
		return err
	}

	// update the segment info in the pointerDB
	return s.pdb.Put(ctx, path, pointer)
}

// lookupNodes calls Lookup to get node addresses from the overlay
func (s *segmentStore) lookupNodes(ctx context.Context, seg *pb.RemoteSegment) (nodes []*pb.Node, err error) {
	// Get list of all nodes IDs storing a piece from the segment
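The trickiest part of Repair above is the bookkeeping that places freshly chosen nodes only into the slots where the original node list has nils, consuming newNodes from the back. A standalone restatement of that loop with plain strings follows; it is illustrative only, not library code.

```go
package main

import "fmt"

// fillMissing mirrors the repairNodesList loop in Repair: replacement values
// are written only at the positions that were empty, taken from the end of
// the fresh list; healthy positions are left untouched (empty here, nil in
// the real code).
func fillMissing(original, fresh []string) []string {
	repaired := make([]string, len(original))
	remaining := len(fresh)
	for i, v := range original {
		if v == "" {
			remaining--
			repaired[i] = fresh[remaining]
		}
	}
	return repaired
}

func main() {
	// pieces 1 and 3 were lost; two replacement nodes were chosen
	fmt.Println(fillMissing([]string{"n0", "", "n2", ""}, []string{"new0", "new1"}))
	// only the previously empty slots receive replacement nodes
}
```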
@@ -19,6 +19,7 @@ import (
	"storj.io/storj/pkg/pb"
	pdb "storj.io/storj/pkg/pointerdb/pdbclient"
	mock_pointerdb "storj.io/storj/pkg/pointerdb/pdbclient/mocks"
	"storj.io/storj/pkg/ranger"
	mock_ecclient "storj.io/storj/pkg/storage/ec/mocks"
	"storj.io/storj/pkg/storage/meta"
	"storj.io/storj/pkg/storj"
@@ -227,6 +228,85 @@ func TestSegmentStoreGetInline(t *testing.T) {
	}
}

func TestSegmentStoreRepairRemote(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	ti := time.Unix(0, 0).UTC()
	someTime, err := ptypes.TimestampProto(ti)
	assert.NoError(t, err)

	for _, tt := range []struct {
		pathInput               string
		thresholdSize           int
		pointerType             pb.Pointer_DataType
		size                    int64
		metadata                []byte
		lostPieces              []int
		newNodes                []*pb.Node
		data                    string
		strsize, offset, length int64
		substr                  string
		meta                    Meta
	}{
		{"path/1/2/3", 10, pb.Pointer_REMOTE, int64(3), []byte("metadata"), []int{}, []*pb.Node{{Id: "1"}, {Id: "2"}}, "abcdefghijkl", 12, 1, 4, "bcde", Meta{}},
	} {
		mockOC := mock_overlay.NewMockClient(ctrl)
		mockEC := mock_ecclient.NewMockClient(ctrl)
		mockPDB := mock_pointerdb.NewMockClient(ctrl)
		mockES := mock_eestream.NewMockErasureScheme(ctrl)
		rs := eestream.RedundancyStrategy{
			ErasureScheme: mockES,
		}

		ss := segmentStore{mockOC, mockEC, mockPDB, rs, tt.thresholdSize}
		assert.NotNil(t, ss)

		calls := []*gomock.Call{
			mockPDB.EXPECT().Get(
				gomock.Any(), gomock.Any(),
			).Return(&pb.Pointer{
				Type: tt.pointerType,
				Remote: &pb.RemoteSegment{
					Redundancy: &pb.RedundancyScheme{
						Type:             pb.RedundancyScheme_RS,
						MinReq:           1,
						Total:            2,
						RepairThreshold:  1,
						SuccessThreshold: 2,
					},
					PieceId:      "here's my piece id",
					RemotePieces: []*pb.RemotePiece{},
				},
				CreationDate:   someTime,
				ExpirationDate: someTime,
				Size:           tt.size,
				Metadata:       tt.metadata,
			}, nil),
			mockOC.EXPECT().BulkLookup(gomock.Any(), gomock.Any()),
			mockOC.EXPECT().Choose(gomock.Any(), gomock.Any()).Return(tt.newNodes, nil),
			mockPDB.EXPECT().SignedMessage(),
			mockPDB.EXPECT().PayerBandwidthAllocation(),
			mockEC.EXPECT().Get(
				gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
			).Return(ranger.ByteRanger([]byte(tt.data)), nil),
			mockEC.EXPECT().Put(
				gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
			).Return(tt.newNodes, nil),
			mockES.EXPECT().RequiredCount().Return(1),
			mockES.EXPECT().TotalCount().Return(1),
			mockES.EXPECT().ErasureShareSize().Return(1),
			mockPDB.EXPECT().Put(
				gomock.Any(), gomock.Any(), gomock.Any(),
			).Return(nil),
		}
		gomock.InOrder(calls...)

		err := ss.Repair(ctx, tt.pathInput, tt.lostPieces)
		assert.NoError(t, err)
	}
}

func TestSegmentStoreGetRemote(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()