Pointer stores only nodes with successfully uploaded pieces (#390)
parent 2019803c5e
commit bc0f697929
@@ -98,13 +98,19 @@ func (dr *decodedReader) Close() error {
 	dr.cancel()
 	// avoid double close of readers
 	dr.close.Do(func() {
-		errs := make([]error, len(dr.readers)+1)
+		var errs []error
 		// close the readers
-		for i, r := range dr.readers {
-			errs[i] = r.Close()
+		for _, r := range dr.readers {
+			err := r.Close()
+			if err != nil {
+				errs = append(errs, err)
+			}
 		}
 		// close the stripe reader
-		errs[len(dr.readers)] = dr.stripeReader.Close()
+		err := dr.stripeReader.Close()
+		if err != nil {
+			errs = append(errs, err)
+		}
 		dr.closeErr = utils.CombineErrors(errs...)
	})
	return dr.closeErr
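Note: the Close change above collects only non-nil errors before combining them, instead of pre-sizing a slice that could contain nils. Below is a minimal, self-contained sketch of the same pattern; combineErrors is a hypothetical stand-in for the utils.CombineErrors call in the diff, not the real implementation.

package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// combineErrors is a stand-in for utils.CombineErrors: it joins the
// messages of the given errors, or returns nil when there are none.
func combineErrors(errs ...error) error {
	if len(errs) == 0 {
		return nil
	}
	msgs := make([]string, 0, len(errs))
	for _, err := range errs {
		msgs = append(msgs, err.Error())
	}
	return errors.New(strings.Join(msgs, "; "))
}

// closeAll closes every closer and keeps only the non-nil errors,
// mirroring the decodedReader.Close pattern above.
func closeAll(closers ...io.Closer) error {
	var errs []error
	for _, c := range closers {
		if err := c.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	return combineErrors(errs...)
}

func main() {
	fmt.Println(closeAll(io.NopCloser(strings.NewReader("ok")))) // <nil>
}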
@@ -41,13 +41,10 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int) *Strip
 		errmap: make(map[int]error, es.TotalCount()),
 	}
 
-	for i := 0; i < es.TotalCount(); i++ {
+	for i := range rs {
 		r.inbufs[i] = make([]byte, es.EncodedBlockSize())
 		r.bufs[i] = NewPieceBuffer(make([]byte, bufSize), es.EncodedBlockSize(), r.cond)
-	}
-
-	// Kick off a goroutine each reader to be copied into a PieceBuffer.
-	for i, buf := range r.bufs {
+		// Kick off a goroutine each reader to be copied into a PieceBuffer.
 		go func(r io.Reader, buf *PieceBuffer) {
 			_, err := io.Copy(buf, r)
 			if err != nil {
@@ -55,7 +52,7 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int) *Strip
 				return
 			}
 			buf.SetError(io.EOF)
-		}(rs[i], buf)
+		}(rs[i], r.bufs[i])
 	}
 
 	return r
@@ -112,12 +109,12 @@ func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
 // buffers without blocking. The return value n is the number of erasure shares
 // read.
 func (r *StripeReader) readAvailableShares(num int64) (n int) {
-	for i := 0; i < len(r.bufs); i++ {
+	for i, buf := range r.bufs {
 		if r.inmap[i] != nil || r.errmap[i] != nil {
 			continue
 		}
-		if r.bufs[i].HasShare(num) {
-			err := r.bufs[i].ReadShare(num, r.inbufs[i])
+		if buf.HasShare(num) {
+			err := buf.ReadShare(num, r.inbufs[i])
 			if err != nil {
 				r.errmap[i] = err
 			} else {
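NewStripeReader now ranges over the rs map rather than looping 0..TotalCount()-1, so a buffer and copy goroutine exist only for pieces that were actually handed in; indices with no reader are never visited. Here is a rough sketch of that idea with simplified stand-in types (the real code uses PieceBuffer and runs each copy in its own goroutine):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// Stand-in for the rs map[int]io.ReadCloser argument: pieces 0 and 2
	// are available, piece 1 is missing entirely.
	rs := map[int]io.Reader{
		0: bytes.NewReader([]byte("piece zero")),
		2: bytes.NewReader([]byte("piece two")),
	}
	total := 4 // what es.TotalCount() would report

	// Ranging over the map touches only existing indices, unlike the old
	// for i := 0; i < total; i++ loop. Copies are done synchronously here
	// for brevity; the real code kicks off one goroutine per reader.
	bufs := make(map[int]*bytes.Buffer, total)
	for i := range rs {
		bufs[i] = &bytes.Buffer{}
		if _, err := io.Copy(bufs[i], rs[i]); err != nil {
			fmt.Println("copy failed:", err)
		}
	}
	fmt.Printf("%d of %d pieces buffered\n", len(bufs), total) // 2 of 4
}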
@@ -129,7 +129,10 @@ func (client *Client) Put(ctx context.Context, id PieceID, data io.Reader, ttl t
 	if err == io.ErrUnexpectedEOF {
 		_ = writer.Close()
 		zap.S().Infof("Node cut from upload due to slow connection. Deleting piece %s...", id)
-		return client.Delete(ctx, id)
+		deleteErr := client.Delete(ctx, id)
+		if deleteErr != nil {
+			return deleteErr
+		}
 	}
 	if err != nil {
 		return err
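The storage-node client change above tightens the slow-connection path: the partially uploaded piece is still deleted, a failed delete is now surfaced, and the io.ErrUnexpectedEOF itself still reaches the caller through the following err check (the old code returned nil once the delete succeeded). That matters for the rest of this commit, because the erasure-coding client uses that error to keep the node out of successfulNodes. A hedged sketch of the control flow, where upload and deletePiece are hypothetical stand-ins for the real client calls:

package main

import (
	"errors"
	"fmt"
	"io"
)

// putPiece sketches the flow above: on a cut connection the partial
// piece is deleted, a delete failure wins, and otherwise the original
// io.ErrUnexpectedEOF is still returned so the node is not counted as
// a successful upload. upload and deletePiece are placeholders.
func putPiece(upload func() error, deletePiece func() error) error {
	err := upload()
	if err == io.ErrUnexpectedEOF {
		if deleteErr := deletePiece(); deleteErr != nil {
			return deleteErr
		}
	}
	if err != nil {
		return err
	}
	return nil
}

func main() {
	err := putPiece(
		func() error { return io.ErrUnexpectedEOF }, // node cut from upload
		func() error { return nil },                 // cleanup succeeds
	)
	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF)) // true
}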
@@ -27,7 +27,7 @@ var mon = monkit.Package()
 // Client defines an interface for storing erasure coded data to piece store nodes
 type Client interface {
 	Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy,
-		pieceID client.PieceID, data io.Reader, expiration time.Time) error
+		pieceID client.PieceID, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, err error)
 	Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme,
 		pieceID client.PieceID, size int64) (ranger.Ranger, error)
 	Delete(ctx context.Context, nodes []*pb.Node, pieceID client.PieceID) error
@@ -44,6 +44,7 @@ type defaultDialer struct {
 
 func (d *defaultDialer) dial(ctx context.Context, node *pb.Node) (ps client.PSClient, err error) {
 	defer mon.Task()(&ctx)(&err)
+
 	c, err := d.t.DialNode(ctx, node)
 	if err != nil {
 		return nil, err
@@ -64,55 +65,72 @@ func NewClient(identity *provider.FullIdentity, t transport.Client, mbm int) Cli
 }
 
 func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy,
-	pieceID client.PieceID, data io.Reader, expiration time.Time) (err error) {
+	pieceID client.PieceID, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, err error) {
 	defer mon.Task()(&ctx)(&err)
+
 	if len(nodes) != rs.TotalCount() {
-		return Error.New("number of nodes (%d) do not match total count (%d) of erasure scheme",
-			len(nodes), rs.TotalCount())
+		return nil, Error.New("number of nodes (%d) do not match total count (%d) of erasure scheme", len(nodes), rs.TotalCount())
 	}
 	if !unique(nodes) {
-		return Error.New("duplicated nodes are not allowed")
+		return nil, Error.New("duplicated nodes are not allowed")
 	}
+
 	padded := eestream.PadReader(ioutil.NopCloser(data), rs.DecodedBlockSize())
 	readers, err := eestream.EncodeReader(ctx, padded, rs, ec.mbm)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	errs := make(chan error, len(readers))
+
+	type info struct {
+		i   int
+		err error
+	}
+	infos := make(chan info, len(nodes))
+
 	for i, n := range nodes {
 		go func(i int, n *pb.Node) {
 			derivedPieceID, err := pieceID.Derive([]byte(n.GetId()))
 			if err != nil {
 				zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
-				errs <- err
+				infos <- info{i: i, err: err}
 				return
 			}
 			ps, err := ec.d.dial(ctx, n)
 			if err != nil {
 				zap.S().Errorf("Failed putting piece %s -> %s to node %s: %v",
 					pieceID, derivedPieceID, n.GetId(), err)
-				errs <- err
+				infos <- info{i: i, err: err}
 				return
 			}
 			err = ps.Put(ctx, derivedPieceID, readers[i], expiration, &pb.PayerBandwidthAllocation{})
 			// normally the bellow call should be deferred, but doing so fails
 			// randomly the unit tests
 			utils.LogClose(ps)
-			if err != nil {
+			// io.ErrUnexpectedEOF means the piece upload was interrupted due to slow connection.
+			// No error logging for this case.
+			if err != nil && err != io.ErrUnexpectedEOF {
 				zap.S().Errorf("Failed putting piece %s -> %s to node %s: %v",
 					pieceID, derivedPieceID, n.GetId(), err)
 			}
-			errs <- err
+			infos <- info{i: i, err: err}
 		}(i, n)
 	}
-	allerrs := collectErrors(errs, len(readers))
-	sc := len(readers) - len(allerrs)
-	if sc < rs.RepairThreshold() {
-		return Error.New(
-			"successful puts (%d) less than repair threshold (%d)",
-			sc, rs.RepairThreshold())
+
+	successfulNodes = make([]*pb.Node, len(nodes))
+	var successfulCount int
+	for range nodes {
+		info := <-infos
+		if info.err == nil {
+			successfulNodes[info.i] = nodes[info.i]
+			successfulCount++
+		}
 	}
-	return nil
+
+	if successfulCount < rs.RepairThreshold() {
+		return nil, Error.New("successful puts (%d) less than repair threshold (%d)", successfulCount, rs.RepairThreshold())
+	}
+
+	return successfulNodes, nil
 }
 
 func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme,
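This is the heart of the commit: each upload goroutine now reports an indexed info value instead of a bare error, and the results are folded into a successfulNodes slice that preserves node positions, with nil entries where the upload failed. Below is a compressed, self-contained sketch of that fan-out/fan-in shape; node and the upload callback are placeholders, not the real pb.Node or PSClient types.

package main

import (
	"errors"
	"fmt"
)

type node struct{ id string }

// putAll uploads to every node concurrently and returns a slice of the
// same length as nodes, where index i holds nodes[i] on success and nil
// on failure, the shape ecClient.Put now returns to the segment store.
func putAll(nodes []*node, upload func(i int, n *node) error) []*node {
	type info struct {
		i   int
		err error
	}
	infos := make(chan info, len(nodes))
	for i, n := range nodes {
		go func(i int, n *node) {
			infos <- info{i: i, err: upload(i, n)}
		}(i, n)
	}

	successful := make([]*node, len(nodes))
	for range nodes {
		res := <-infos
		if res.err == nil {
			successful[res.i] = nodes[res.i]
		}
	}
	return successful
}

func main() {
	nodes := []*node{{"a"}, {"b"}, {"c"}}
	got := putAll(nodes, func(i int, n *node) error {
		if n.id == "b" {
			return errors.New("upload failed")
		}
		return nil
	})
	for i, n := range got {
		fmt.Println(i, n != nil) // 0 true, 1 false, 2 true
	}
}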
@@ -122,16 +140,24 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.Erasu
 	if len(nodes) != es.TotalCount() {
 		return nil, Error.New("number of nodes (%v) do not match total count (%v) of erasure scheme", len(nodes), es.TotalCount())
 	}
+
 	paddedSize := calcPadded(size, es.DecodedBlockSize())
 	pieceSize := paddedSize / int64(es.RequiredCount())
 	rrs := map[int]ranger.Ranger{}
+
 	type rangerInfo struct {
 		i   int
 		rr  ranger.Ranger
 		err error
 	}
 	ch := make(chan rangerInfo, len(nodes))
+
 	for i, n := range nodes {
+		if (n == nil) {
+			ch <- rangerInfo{i: i, rr: nil, err: nil}
+			continue
+		}
+
 		go func(i int, n *pb.Node) {
 			derivedPieceID, err := pieceID.Derive([]byte(n.GetId()))
 			if err != nil {
@@ -151,23 +177,33 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.Erasu
 			ch <- rangerInfo{i: i, rr: rr, err: nil}
 		}(i, n)
 	}
+
 	for range nodes {
 		rri := <-ch
-		if rri.err == nil {
+		if rri.err == nil && rri.rr != nil {
 			rrs[rri.i] = rri.rr
 		}
 	}
+
 	rr, err = eestream.Decode(rrs, es, ec.mbm)
 	if err != nil {
 		return nil, err
 	}
+
 	return eestream.Unpad(rr, int(paddedSize-size))
 }
 
 func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID client.PieceID) (err error) {
 	defer mon.Task()(&ctx)(&err)
+
 	errs := make(chan error, len(nodes))
+
 	for _, n := range nodes {
+		if n == nil {
+			errs <- nil
+			continue
+		}
+
 		go func(n *pb.Node) {
 			derivedPieceID, err := pieceID.Derive([]byte(n.GetId()))
 			if err != nil {
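Since the pointer can now reference fewer pieces than the erasure scheme's total, Get and Delete accept node slices with nil gaps: a nil entry just pushes an empty result onto the channel, and Get additionally skips nil rangers when filling rrs, so decoding proceeds with whatever pieces exist. A small sketch of that skip-and-collect pattern with placeholder types (fetch stands in for dialing a node and building a ranger):

package main

import "fmt"

type node struct{ id string }

// fetchPresent fans out only to non-nil nodes and returns the results
// keyed by index, mirroring how Get fills the rrs map above.
func fetchPresent(nodes []*node, fetch func(n *node) (string, error)) map[int]string {
	type result struct {
		i   int
		val string
		err error
	}
	ch := make(chan result, len(nodes))
	for i, n := range nodes {
		if n == nil {
			ch <- result{i: i} // missing piece: nothing to fetch
			continue
		}
		go func(i int, n *node) {
			val, err := fetch(n)
			ch <- result{i: i, val: val, err: err}
		}(i, n)
	}

	out := map[int]string{}
	for range nodes {
		r := <-ch
		// An empty value plays the role of the nil ranger check above.
		if r.err == nil && r.val != "" {
			out[r.i] = r.val
		}
	}
	return out
}

func main() {
	nodes := []*node{{"a"}, nil, {"c"}}
	got := fetchPresent(nodes, func(n *node) (string, error) { return "piece-" + n.id, nil })
	fmt.Println(len(got), got[0], got[2]) // 2 piece-a piece-c
}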
@@ -193,10 +229,13 @@ func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID client
 			errs <- err
 		}(n)
 	}
+
 	allerrs := collectErrors(errs, len(nodes))
+
 	if len(allerrs) > 0 && len(allerrs) == len(nodes) {
 		return allerrs[0]
 	}
+
 	return nil
 }
 
|
@ -185,12 +185,20 @@ TestLoop:
|
||||
}
|
||||
r := io.LimitReader(rand.Reader, int64(size))
|
||||
ec := ecClient{d: &mockDialer{m: m}, mbm: tt.mbm}
|
||||
err = ec.Put(ctx, tt.nodes, rs, id, r, ttl)
|
||||
successfulNodes, err := ec.Put(ctx, tt.nodes, rs, id, r, ttl)
|
||||
|
||||
if tt.errString != "" {
|
||||
assert.EqualError(t, err, tt.errString, errTag)
|
||||
} else {
|
||||
assert.NoError(t, err, errTag)
|
||||
assert.Equal(t, len(tt.nodes), len(successfulNodes), errTag)
|
||||
for i := range tt.nodes {
|
||||
if tt.errs[i] != nil {
|
||||
assert.Nil(t, successfulNodes[i], errTag)
|
||||
} else {
|
||||
assert.Equal(t, tt.nodes[i], successfulNodes[i], errTag)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -231,6 +239,8 @@ TestLoop:
|
||||
[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed}, ""},
|
||||
{[]*pb.Node{node0, node1, node2, node3}, 0,
|
||||
[]error{ErrDialFailed, ErrOpFailed, ErrOpFailed, ErrDialFailed}, ""},
|
||||
{[]*pb.Node{nil, nil, node2, node3}, 0,
|
||||
[]error{nil, nil, nil, nil}, ""},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
@ -288,6 +298,8 @@ TestLoop:
|
||||
{[]*pb.Node{node0, node1}, []error{nil, ErrOpFailed}, ""},
|
||||
{[]*pb.Node{node0, node1}, []error{ErrDialFailed, ErrDialFailed}, dialFailed},
|
||||
{[]*pb.Node{node0, node1}, []error{ErrOpFailed, ErrOpFailed}, opFailed},
|
||||
{[]*pb.Node{nil, node1}, []error{nil, nil}, ""},
|
||||
{[]*pb.Node{nil, nil}, []error{nil, nil}, ""},
|
||||
} {
|
||||
errTag := fmt.Sprintf("Test case #%d", i)
|
||||
|
||||
@ -300,7 +312,7 @@ TestLoop:
|
||||
|
||||
m := make(map[*pb.Node]client.PSClient, len(tt.nodes))
|
||||
for _, n := range tt.nodes {
|
||||
if errs[n] != ErrDialFailed {
|
||||
if n != nil && errs[n] != ErrDialFailed {
|
||||
derivedID, err := id.Derive([]byte(n.GetId()))
|
||||
if !assert.NoError(t, err, errTag) {
|
||||
continue TestLoop
|
||||
|
@@ -11,8 +11,9 @@ import (
 	time "time"
 
 	gomock "github.com/golang/mock/gomock"
+
 	eestream "storj.io/storj/pkg/eestream"
-	"storj.io/storj/pkg/pb"
+	pb "storj.io/storj/pkg/pb"
 	client "storj.io/storj/pkg/piecestore/rpc/client"
 	ranger "storj.io/storj/pkg/ranger"
 )
@@ -66,10 +67,11 @@ func (mr *MockClientMockRecorder) Get(arg0, arg1, arg2, arg3, arg4 interface{})
 }
 
 // Put mocks base method
-func (m *MockClient) Put(arg0 context.Context, arg1 []*pb.Node, arg2 eestream.RedundancyStrategy, arg3 client.PieceID, arg4 io.Reader, arg5 time.Time) error {
+func (m *MockClient) Put(arg0 context.Context, arg1 []*pb.Node, arg2 eestream.RedundancyStrategy, arg3 client.PieceID, arg4 io.Reader, arg5 time.Time) ([]*pb.Node, error) {
 	ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2, arg3, arg4, arg5)
-	ret0, _ := ret[0].(error)
-	return ret0
+	ret0, _ := ret[0].([]*pb.Node)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
 }
 
 // Put indicates an expected call of Put
@@ -120,11 +120,11 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
 	sizedReader := SizeReader(peekReader)
 
 	// puts file to ecclient
-	err = s.ec.Put(ctx, nodes, s.rs, pieceID, sizedReader, expiration)
+	successfulNodes, err := s.ec.Put(ctx, nodes, s.rs, pieceID, sizedReader, expiration)
 	if err != nil {
 		return Meta{}, Error.Wrap(err)
 	}
-	p, err = s.makeRemotePointer(nodes, pieceID, sizedReader.Size(), exp, metadata)
+	p, err = s.makeRemotePointer(successfulNodes, pieceID, sizedReader.Size(), exp, metadata)
 	if err != nil {
 		return Meta{}, err
 	}
@@ -149,6 +149,9 @@ func (s *segmentStore) makeRemotePointer(nodes []*pb.Node, pieceID client.PieceI
 	exp *timestamp.Timestamp, metadata []byte) (pointer *pb.Pointer, err error) {
 	var remotePieces []*pb.RemotePiece
 	for i := range nodes {
+		if (nodes[i] == nil) {
+			continue
+		}
 		remotePieces = append(remotePieces, &pb.RemotePiece{
 			PieceNum: int32(i),
 			NodeId:   nodes[i].Id,
@@ -249,15 +252,22 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) (err error)
 
 // lookupNodes calls Lookup to get node addresses from the overlay
 func (s *segmentStore) lookupNodes(ctx context.Context, seg *pb.RemoteSegment) (nodes []*pb.Node, err error) {
-	pieces := seg.GetRemotePieces()
+	// Get list of all nodes IDs storing a piece from the segment
 	var nodeIds []dht.NodeID
-	for _, p := range pieces {
+	for _, p := range seg.GetRemotePieces() {
 		nodeIds = append(nodeIds, kademlia.StringToNodeID(p.GetNodeId()))
 	}
-	nodes, err = s.oc.BulkLookup(ctx, nodeIds)
+	// Lookup the node info from node IDs
+	n, err := s.oc.BulkLookup(ctx, nodeIds)
 	if err != nil {
 		return nil, Error.Wrap(err)
 	}
+	// Create an indexed list of nodes based on the piece number.
+	// Missing pieces are represented by a nil node.
+	nodes = make([]*pb.Node, seg.GetRedundancy().GetTotal())
+	for i, p := range seg.GetRemotePieces() {
+		nodes[p.PieceNum] = n[i]
+	}
 	return nodes, nil
 }
 
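On the segment-store side the two ends meet: makeRemotePointer drops nil nodes so the pointer records only pieces that really uploaded, and lookupNodes rebuilds a TotalCount-sized, piece-number-indexed node slice with nil gaps for the missing ones. A hedged sketch of that re-indexing step follows; remotePiece, node and the total parameter are simplified assumptions, not the real protobuf types.

package main

import "fmt"

type node struct{ id string }

// remotePiece mirrors the two fields used above: which erasure-share
// slot the piece occupies and which node stores it.
type remotePiece struct {
	pieceNum int
	nodeID   string
}

// indexByPieceNum places each looked-up node at its piece number, so
// callers can address nodes positionally and find nil where a piece is
// missing, the same shape lookupNodes now returns.
func indexByPieceNum(pieces []remotePiece, looked []*node, total int) []*node {
	nodes := make([]*node, total)
	for i, p := range pieces {
		nodes[p.pieceNum] = looked[i]
	}
	return nodes
}

func main() {
	pieces := []remotePiece{{pieceNum: 0, nodeID: "a"}, {pieceNum: 3, nodeID: "d"}}
	looked := []*node{{"a"}, {"d"}}
	nodes := indexByPieceNum(pieces, looked, 4)
	for i, n := range nodes {
		fmt.Println(i, n != nil) // 0 true, 1 false, 2 false, 3 true
	}
}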