handle nil nodes in ec Put (#454)
* handle nil nodes in ec Put * read and discard readers for nil nodes * test 2 nil nodes, unique wont return false with nil nodes * Discard reader data for nil nodes * edit control flow
This commit is contained in:
parent
69b1307bd4
commit
118e9bec64
@ -88,13 +88,15 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
|
||||
infos := make(chan info, len(nodes))
|
||||
|
||||
for i, n := range nodes {
|
||||
|
||||
go func(i int, n *pb.Node) {
|
||||
if n == nil {
|
||||
_, err := io.Copy(ioutil.Discard, readers[i])
|
||||
infos <- info{i: i, err: err}
|
||||
return
|
||||
}
|
||||
|
||||
derivedPieceID, err := pieceID.Derive([]byte(n.GetId()))
|
||||
|
||||
if err != nil {
|
||||
zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
|
||||
infos <- info{i: i, err: err}
|
||||
@ -107,6 +109,7 @@ func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.Redun
|
||||
infos <- info{i: i, err: err}
|
||||
return
|
||||
}
|
||||
|
||||
err = ps.Put(ctx, derivedPieceID, readers[i], expiration, &pb.PayerBandwidthAllocation{})
|
||||
// normally the bellow call should be deferred, but doing so fails
|
||||
// randomly the unit tests
|
||||
@ -280,7 +283,7 @@ func unique(nodes []*pb.Node) bool {
|
||||
// sort the ids and check for identical neighbors
|
||||
sort.Strings(ids)
|
||||
for i := 1; i < len(ids); i++ {
|
||||
if ids[i] == ids[i-1] {
|
||||
if ids[i] != "" && ids[i] == ids[i-1] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -153,6 +154,8 @@ TestLoop:
|
||||
{[]*pb.Node{node0, node1, node2, node3}, 2, 0, false,
|
||||
[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed},
|
||||
"ecclient error: successful puts (1) less than repair threshold (2)"},
|
||||
{[]*pb.Node{nil, nil, node2, node3}, 0, 0, false,
|
||||
[]error{nil, nil, nil, nil}, ""},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
@ -166,25 +169,32 @@ TestLoop:
|
||||
|
||||
m := make(map[*pb.Node]client.PSClient, len(tt.nodes))
|
||||
for _, n := range tt.nodes {
|
||||
if !tt.badInput {
|
||||
if n == nil || tt.badInput {
|
||||
continue
|
||||
}
|
||||
derivedID, err := id.Derive([]byte(n.GetId()))
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue TestLoop
|
||||
}
|
||||
ps := NewMockPSClient(ctrl)
|
||||
gomock.InOrder(
|
||||
ps.EXPECT().Put(gomock.Any(), derivedID, gomock.Any(), ttl, gomock.Any()).Return(errs[n]),
|
||||
ps.EXPECT().Put(gomock.Any(), derivedID, gomock.Any(), ttl, gomock.Any()).Return(errs[n]).
|
||||
Do(func(ctx context.Context, id client.PieceID, data io.Reader, ttl time.Time, ba *pb.PayerBandwidthAllocation) {
|
||||
// simulate that the mocked piece store client is reading the data
|
||||
_, err := io.Copy(ioutil.Discard, data)
|
||||
assert.NoError(t, err, errTag)
|
||||
}),
|
||||
ps.EXPECT().Close().Return(nil),
|
||||
)
|
||||
m[n] = ps
|
||||
}
|
||||
}
|
||||
rs, err := eestream.NewRedundancyStrategy(es, tt.min, 0)
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue
|
||||
}
|
||||
r := io.LimitReader(rand.Reader, int64(size))
|
||||
ec := ecClient{d: &mockDialer{m: m}, mbm: tt.mbm}
|
||||
|
||||
successfulNodes, err := ec.Put(ctx, tt.nodes, rs, id, r, ttl)
|
||||
|
||||
if tt.errString != "" {
|
||||
|
Loading…
Reference in New Issue
Block a user