Randomize node selection during GETs (#827)

Kaloyan Raev 2018-12-11 18:05:14 +02:00 committed by GitHub
parent 175e25f93a
commit 252da15f0d
4 changed files with 138 additions and 79 deletions


@@ -62,8 +62,13 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
defer mon.Task()(&ctx)(&err)
if len(nodes) != rs.TotalCount() {
return nil, Error.New("number of nodes (%d) do not match total count (%d) of erasure scheme", len(nodes), rs.TotalCount())
return nil, Error.New("size of nodes slice (%d) does not match total count (%d) of erasure scheme", len(nodes), rs.TotalCount())
}
+ if nonNilCount(nodes) < rs.RepairThreshold() {
+ return nil, Error.New("number of non-nil nodes (%d) is less than repair threshold (%d) of erasure scheme", nonNilCount(nodes), rs.RepairThreshold())
+ }
if !unique(nodes) {
return nil, Error.New("duplicated nodes are not allowed")
}
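Put's validation now happens in three steps: the slice length must equal the scheme's total count, at least RepairThreshold entries must be non-nil, and no node may appear twice. The unique helper is outside this diff; a minimal sketch of what such a duplicate check could look like, assuming pb.Node carries a comparable Id field (uniqueSketch is a hypothetical name, not the real helper):

    // uniqueSketch is a hypothetical stand-in for the unique helper above;
    // it treats two non-nil nodes as duplicates when their IDs match.
    func uniqueSketch(nodes []*pb.Node) bool {
        seen := make(map[storj.NodeID]bool, len(nodes))
        for _, node := range nodes {
            if node == nil {
                continue // nil entries mark missing pieces and are allowed
            }
            if seen[node.Id] {
                return false
            }
            seen[node.Id] = true
        }
        return true
    }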
@@ -149,9 +154,12 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.Erasu
pieceID psclient.PieceID, size int64, pba *pb.PayerBandwidthAllocation, authorization *pb.SignedMessage) (rr ranger.Ranger, err error) {
defer mon.Task()(&ctx)(&err)
- validNodeCount := validCount(nodes)
- if validNodeCount < es.RequiredCount() {
- return nil, Error.New("number of nodes (%v) do not match minimum required count (%v) of erasure scheme", len(nodes), es.RequiredCount())
+ if len(nodes) != es.TotalCount() {
+ return nil, Error.New("size of nodes slice (%d) does not match total count (%d) of erasure scheme", len(nodes), es.TotalCount())
}
+ if nonNilCount(nodes) < es.RequiredCount() {
+ return nil, Error.New("number of non-nil nodes (%d) is less than required count (%d) of erasure scheme", nonNilCount(nodes), es.RequiredCount())
+ }
paddedSize := calcPadded(size, es.StripeSize())
@@ -327,7 +335,7 @@ func (lr *lazyPieceRanger) Range(ctx context.Context, offset, length int64) (io.
return lr.ranger.Range(ctx, offset, length)
}
- func validCount(nodes []*pb.Node) int {
+ func nonNilCount(nodes []*pb.Node) int {
total := 0
for _, node := range nodes {
if node != nil {
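The hunk cuts off mid-body; presumably the renamed helper completes by tallying the non-nil entries, along these lines (a sketch of the remainder, which falls outside the diff):

    func nonNilCount(nodes []*pb.Node) int {
        total := 0
        for _, node := range nodes {
            if node != nil {
                total++
            }
        }
        return total
    }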


@@ -89,7 +89,7 @@ TestLoop:
errString string
}{
{[]*pb.Node{}, 0, 0, true, []error{},
fmt.Sprintf("ecclient error: number of nodes (0) do not match total count (%v) of erasure scheme", n)},
fmt.Sprintf("ecclient error: size of nodes slice (0) does not match total count (%v) of erasure scheme", n)},
{[]*pb.Node{node0, node1, node2, node3}, 0, -1, true,
[]error{nil, nil, nil, nil},
"eestream error: negative max buffer memory"},
@@ -109,7 +109,7 @@ TestLoop:
{[]*pb.Node{node0, node1, node2, node3}, 2, 0, false,
[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed},
"ecclient error: successful puts (1) less than repair threshold (2)"},
- {[]*pb.Node{nil, nil, node2, node3}, 0, 0, false,
+ {[]*pb.Node{nil, nil, node2, node3}, 2, 0, false,
[]error{nil, nil, nil, nil}, ""},
} {
errTag := fmt.Sprintf("Test case #%d", i)
@@ -201,7 +201,7 @@ TestLoop:
errString string
}{
{[]*pb.Node{}, 0, []error{}, "ecclient error: " +
fmt.Sprintf("number of nodes (0) do not match minimum required count (%v) of erasure scheme", k)},
fmt.Sprintf("size of nodes slice (0) does not match total count (%v) of erasure scheme", n)},
{[]*pb.Node{node0, node1, node2, node3}, -1,
[]error{nil, nil, nil, nil},
"eestream error: negative max buffer memory"},


@@ -6,6 +6,7 @@ package segments
import (
"context"
"io"
"math/rand"
"time"
"github.com/golang/protobuf/ptypes"
@@ -199,16 +200,16 @@ func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Rang
return nil, Meta{}, Error.Wrap(err)
}
- if pr.GetType() == pb.Pointer_REMOTE {
+ switch pr.GetType() {
+ case pb.Pointer_INLINE:
+ rr = ranger.ByteRanger(pr.InlineSegment)
+ case pb.Pointer_REMOTE:
seg := pr.GetRemote()
pid := psclient.PieceID(seg.GetPieceId())
- // fall back if nodes are not available
- if nodes == nil {
- nodes, err = s.lookupNodes(ctx, seg)
- if err != nil {
- return nil, Meta{}, Error.Wrap(err)
- }
+ nodes, err = s.lookupAndAlignNodes(ctx, nodes, seg)
+ if err != nil {
+ return nil, Meta{}, Error.Wrap(err)
+ }
es, err := makeErasureScheme(pr.GetRemote().GetRedundancy())
@@ -216,27 +217,30 @@ func (s *segmentStore) Get(ctx context.Context, path storj.Path) (rr ranger.Rang
return nil, Meta{}, err
}
- // calculate how many minimum nodes needed based on t = k + (n-o)k/o
- rs := pr.GetRemote().GetRedundancy()
- needed := rs.GetMinReq() + ((rs.GetTotal()-rs.GetSuccessThreshold())*rs.GetMinReq())/rs.GetSuccessThreshold()
+ needed := calcNeededNodes(pr.GetRemote().GetRedundancy())
+ selected := make([]*pb.Node, es.TotalCount())
- for i, v := range nodes {
- if v != nil {
- needed--
- if needed <= 0 {
- nodes = nodes[:i+1]
- break
- }
+ for _, i := range rand.Perm(len(nodes)) {
+ node := nodes[i]
+ if node == nil {
+ continue
+ }
+ selected[i] = node
+ needed--
+ if needed <= 0 {
+ break
+ }
}
authorization := s.pdb.SignedMessage()
- rr, err = s.ec.Get(ctx, nodes, es, pid, pr.GetSegmentSize(), pba, authorization)
+ rr, err = s.ec.Get(ctx, selected, es, pid, pr.GetSegmentSize(), pba, authorization)
if err != nil {
return nil, Meta{}, Error.Wrap(err)
}
- } else {
- rr = ranger.ByteRanger(pr.InlineSegment)
+ default:
+ return nil, Meta{}, Error.New("unsupported pointer type: %d", pr.GetType())
}
return rr, convertMeta(pr), nil
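The core change of this commit is the selection loop: instead of always taking the first needed reachable nodes, rand.Perm visits the indices in random order, and winners are copied into a slice of the full scheme size so that each node stays at the index of the piece it holds. A self-contained sketch of the pattern with simplified types (not the real ecclient code):

    package main

    import (
        "fmt"
        "math/rand"
    )

    // pick keeps a random subset of `needed` non-nil entries at their
    // original indices and leaves every other slot nil.
    func pick(nodes []*string, needed int) []*string {
        selected := make([]*string, len(nodes))
        for _, i := range rand.Perm(len(nodes)) { // random visiting order
            if nodes[i] == nil {
                continue // missing piece, nothing to select
            }
            selected[i] = nodes[i] // piece i stays at index i
            needed--
            if needed <= 0 {
                break
            }
        }
        return selected
    }

    func main() {
        a, b, c, d := "n0", "n1", "n2", "n3"
        for _, n := range pick([]*string{&a, &b, &c, &d}, 2) {
            if n != nil {
                fmt.Println(*n)
            }
        }
    }

Because the output slice stays indexed by piece number, the erasure decoder can still tell which piece each node serves, while the load of serving GETs spreads across the whole node set instead of always hitting the first few nodes.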
@@ -251,6 +255,28 @@ func makeErasureScheme(rs *pb.RedundancyScheme) (eestream.ErasureScheme, error)
return es, nil
}
+ // calcNeededNodes calculates the minimum number of nodes needed for download,
+ // based on t = k + (n-o)k/o
+ func calcNeededNodes(rs *pb.RedundancyScheme) int32 {
+ extra := int32(1)
+ if rs.GetSuccessThreshold() > 0 {
+ extra = ((rs.GetTotal() - rs.GetSuccessThreshold()) * rs.GetMinReq()) / rs.GetSuccessThreshold()
+ if extra == 0 {
+ // ensure there is at least one extra node, so we can have error detection/correction
+ extra = 1
+ }
+ }
+ needed := rs.GetMinReq() + extra
+ if needed > rs.GetTotal() {
+ needed = rs.GetTotal()
+ }
+ return needed
+ }
// Delete tells piece stores to delete a segment and deletes pointer from pointerdb
func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error) {
defer mon.Task()(&ctx)(&err)
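Plugging in the schemes from the test table below: with k=20, o=40, n=50, extra = ((50-40)*20)/40 = 5, so needed = 25; with k=2, o=6, n=8, integer division gives ((8-6)*2)/6 = 0, so the one-extra-node floor applies and needed = 3. The final clamp keeps needed from ever exceeding the total n.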
@@ -264,12 +290,9 @@ func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error)
seg := pr.GetRemote()
pid := psclient.PieceID(seg.PieceId)
- // fall back if nodes are not available
- if nodes == nil {
- nodes, err = s.lookupNodes(ctx, seg)
- if err != nil {
- return Error.Wrap(err)
- }
+ nodes, err = s.lookupAndAlignNodes(ctx, nodes, seg)
+ if err != nil {
+ return Error.Wrap(err)
+ }
authorization := s.pdb.SignedMessage()
@@ -288,7 +311,7 @@ func (s *segmentStore) Delete(ctx context.Context, path storj.Path) (err error)
func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces []int32) (err error) {
defer mon.Task()(&ctx)(&err)
- //Read the segment's pointer's info from the PointerDB
+ // Read the segment's pointer's info from the PointerDB
pr, originalNodes, pba, err := s.pdb.Get(ctx, path)
if err != nil {
return Error.Wrap(err)
@@ -301,24 +324,20 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
seg := pr.GetRemote()
pid := psclient.PieceID(seg.GetPieceId())
- // fall back if nodes are not available
- if originalNodes == nil {
- // Get the list of remote pieces from the pointer
- originalNodes, err = s.lookupNodes(ctx, seg)
- if err != nil {
- return Error.Wrap(err)
- }
+ originalNodes, err = s.lookupAndAlignNodes(ctx, originalNodes, seg)
+ if err != nil {
+ return Error.Wrap(err)
+ }
- // get the nodes list that needs to be excluded
+ // Get the nodes list that needs to be excluded
var excludeNodeIDs storj.NodeIDList
- // count the number of nil nodes thats needs to be repaired
+ // Count the number of nil nodes that need to be repaired
totalNilNodes := 0
healthyNodes := make([]*pb.Node, len(originalNodes))
- // populate healthyNodes with all nodes from originalNodes except those correlating to indices in lostPieces
+ // Populate healthyNodes with all nodes from originalNodes except those correlating to indices in lostPieces
for i, v := range originalNodes {
if v == nil {
totalNilNodes++
@@ -335,7 +354,7 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
}
}
- //Request Overlay for n-h new storage nodes
+ // Request Overlay for n-h new storage nodes
op := overlay.Options{Amount: totalNilNodes, Space: 0, Excluded: excludeNodeIDs}
newNodes, err := s.oc.Choose(ctx, op)
if err != nil {
@@ -348,23 +367,23 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
totalRepairCount := len(newNodes)
- //make a repair nodes list just with new unique ids
- repairNodesList := make([]*pb.Node, len(healthyNodes))
- for j, vr := range healthyNodes {
- // check that totalRepairCount is non-negative
+ // Make a repair nodes list just with new unique ids
+ repairNodes := make([]*pb.Node, len(healthyNodes))
+ for i, vr := range healthyNodes {
+ // Check that totalRepairCount is non-negative
if totalRepairCount < 0 {
return Error.New("Total repair count (%d) less than zero", totalRepairCount)
}
- // find the nil in the node list
+ // Find the nil nodes in the healthyNodes list
if vr == nil {
- // replace the location with the newNode Node info
+ // Assign the item in repairNodes list with an item from the newNode list
totalRepairCount--
- repairNodesList[j] = newNodes[totalRepairCount]
+ repairNodes[i] = newNodes[totalRepairCount]
}
}
- // check that all nil nodes have a replacement prepared
+ // Check that all nil nodes have a replacement prepared
if totalRepairCount != 0 {
return Error.New("Failed to replace all nil nodes (%d). (%d) new nodes not inserted", len(newNodes), totalRepairCount)
}
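The bookkeeping here is worth spelling out: totalRepairCount starts at len(newNodes) and is decremented once per nil slot, consuming replacements from the back of newNodes, so the final totalRepairCount != 0 check proves that the number of lost pieces exactly matched the number of nodes the overlay returned. A compact sketch of that pattern with simplified types (hypothetical helper, not the real Repair code):

    // fillNils copies replacements into the nil slots of healthy (into a
    // separate output slice, leaving healthy slots nil) and reports how many
    // replacements were left unused; nonzero signals a count mismatch.
    func fillNils(healthy, repls []*string) ([]*string, int) {
        out := make([]*string, len(healthy))
        n := len(repls)
        for i, v := range healthy {
            if v != nil {
                continue
            }
            n--
            if n < 0 {
                return out, n // more nil slots than replacements
            }
            out[i] = repls[n] // consume from the back, as Repair does
        }
        return out, n
    }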
@@ -376,28 +395,25 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
signedMessage := s.pdb.SignedMessage()
- // download the segment using the nodes just with healthy nodes
+ // Download the segment using just the healthyNodes
rr, err := s.ec.Get(ctx, healthyNodes, es, pid, pr.GetSegmentSize(), pba, signedMessage)
if err != nil {
return Error.Wrap(err)
}
- // get io.Reader from ranger
r, err := rr.Range(ctx, 0, rr.Size())
if err != nil {
- return err
+ return Error.Wrap(err)
}
defer utils.LogClose(r)
- // puts file to ecclient
- exp := pr.GetExpirationDate()
- successfulNodes, err := s.ec.Put(ctx, repairNodesList, s.rs, pid, r, time.Unix(exp.GetSeconds(), 0), pba, signedMessage)
+ // Upload the repaired pieces to the repairNodes
+ successfulNodes, err := s.ec.Put(ctx, repairNodes, s.rs, pid, r, convertTime(pr.GetExpirationDate()), pba, signedMessage)
if err != nil {
return Error.Wrap(err)
}
- // merge the successful nodes list into the healthy nodes list
+ // Merge the successful nodes list into the healthy nodes list
for i, v := range healthyNodes {
if v == nil {
// copy the successfulNode info
@@ -406,7 +422,7 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
}
metadata := pr.GetMetadata()
- pointer, err := s.makeRemotePointer(healthyNodes, pid, rr.Size(), exp, metadata)
+ pointer, err := s.makeRemotePointer(healthyNodes, pid, rr.Size(), pr.GetExpirationDate(), metadata)
if err != nil {
return err
}
@@ -415,25 +431,32 @@ func (s *segmentStore) Repair(ctx context.Context, path storj.Path, lostPieces [
return s.pdb.Put(ctx, path, pointer)
}
- // lookupNodes calls Lookup to get node addresses from the overlay
- func (s *segmentStore) lookupNodes(ctx context.Context, seg *pb.RemoteSegment) (nodes []*pb.Node, err error) {
- // Get list of all nodes IDs storing a piece from the segment
- var nodeIds storj.NodeIDList
- for _, p := range seg.RemotePieces {
- nodeIds = append(nodeIds, p.NodeId)
+ // lookupAndAlignNodes, if necessary, calls Lookup to get node addresses from the overlay.
+ // It also realigns the nodes to an indexed list of nodes based on the piece number.
+ // Missing pieces are represented by a nil node.
+ func (s *segmentStore) lookupAndAlignNodes(ctx context.Context, nodes []*pb.Node, seg *pb.RemoteSegment) (result []*pb.Node, err error) {
+ defer mon.Task()(&ctx)(&err)
+ if nodes == nil {
+ // Get list of all nodes IDs storing a piece from the segment
+ var nodeIds storj.NodeIDList
+ for _, p := range seg.RemotePieces {
+ nodeIds = append(nodeIds, p.NodeId)
+ }
+ // Lookup the node info from node IDs
+ nodes, err = s.oc.BulkLookup(ctx, nodeIds)
+ if err != nil {
+ return nil, Error.Wrap(err)
+ }
+ }
- // Lookup the node info from node IDs
- n, err := s.oc.BulkLookup(ctx, nodeIds)
- if err != nil {
- return nil, Error.Wrap(err)
- }
- // Create an indexed list of nodes based on the piece number.
- // Missing pieces are represented by a nil node.
- nodes = make([]*pb.Node, seg.GetRedundancy().GetTotal())
+ // Realign the nodes
+ result = make([]*pb.Node, seg.GetRedundancy().GetTotal())
for i, p := range seg.GetRemotePieces() {
- nodes[p.PieceNum] = n[i]
+ result[p.PieceNum] = nodes[i]
}
- return nodes, nil
+ return result, nil
}
// List retrieves paths to segments and their metadata stored in the pointerdb
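Alignment by piece number is what the rest of the store relies on: callers can index the returned slice directly by piece number, with nil entries standing for missing pieces. A standalone sketch of the realignment step with simplified types (the real code reads piece numbers from seg.GetRemotePieces()):

    // realign places the node looked up for each piece at the index given
    // by that piece's number; indices with no surviving piece remain nil.
    func realign(nodes []*string, pieceNums []int, total int) []*string {
        result := make([]*string, total)
        for i, pn := range pieceNums {
            result[pn] = nodes[i]
        }
        return result
    }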


@@ -5,6 +5,7 @@ package segments
import (
"context"
"fmt"
"strings"
"testing"
"time"
@@ -555,3 +556,30 @@ func TestSegmentStoreList(t *testing.T) {
assert.NoError(t, err)
}
}
+ func TestCalcNeededNodes(t *testing.T) {
+ for i, tt := range []struct {
+ k, m, o, n int32
+ needed int32
+ }{
+ {k: 0, m: 0, o: 0, n: 0, needed: 0},
+ {k: 1, m: 1, o: 1, n: 1, needed: 1},
+ {k: 1, m: 1, o: 2, n: 2, needed: 2},
+ {k: 1, m: 2, o: 2, n: 2, needed: 2},
+ {k: 2, m: 3, o: 4, n: 4, needed: 3},
+ {k: 2, m: 4, o: 6, n: 8, needed: 3},
+ {k: 20, m: 30, o: 40, n: 50, needed: 25},
+ {k: 29, m: 35, o: 80, n: 95, needed: 34},
+ } {
+ tag := fmt.Sprintf("#%d. %+v", i, tt)
+ rs := pb.RedundancyScheme{
+ MinReq: tt.k,
+ RepairThreshold: tt.m,
+ SuccessThreshold: tt.o,
+ Total: tt.n,
+ }
+ assert.Equal(t, tt.needed, calcNeededNodes(&rs), tag)
+ }
+ }