diff --git a/pkg/eestream/decode.go b/pkg/eestream/decode.go
index 3dbc328ae..752c04f92 100644
--- a/pkg/eestream/decode.go
+++ b/pkg/eestream/decode.go
@@ -98,13 +98,19 @@ func (dr *decodedReader) Close() error {
 	dr.cancel()
 	// avoid double close of readers
 	dr.close.Do(func() {
-		errs := make([]error, len(dr.readers)+1)
+		var errs []error
 		// close the readers
-		for i, r := range dr.readers {
-			errs[i] = r.Close()
+		for _, r := range dr.readers {
+			err := r.Close()
+			if err != nil {
+				errs = append(errs, err)
+			}
 		}
 		// close the stripe reader
-		errs[len(dr.readers)] = dr.stripeReader.Close()
+		err := dr.stripeReader.Close()
+		if err != nil {
+			errs = append(errs, err)
+		}
 		dr.closeErr = utils.CombineErrors(errs...)
 	})
 	return dr.closeErr
diff --git a/pkg/eestream/stripe.go b/pkg/eestream/stripe.go
index e97f660ac..fa172cde3 100644
--- a/pkg/eestream/stripe.go
+++ b/pkg/eestream/stripe.go
@@ -41,13 +41,10 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int) *Strip
 		errmap: make(map[int]error, es.TotalCount()),
 	}
 
-	for i := 0; i < es.TotalCount(); i++ {
+	for i := range rs {
 		r.inbufs[i] = make([]byte, es.EncodedBlockSize())
 		r.bufs[i] = NewPieceBuffer(make([]byte, bufSize), es.EncodedBlockSize(), r.cond)
-	}
-
-	// Kick off a goroutine each reader to be copied into a PieceBuffer.
-	for i, buf := range r.bufs {
+		// Kick off a goroutine for each reader to be copied into a PieceBuffer.
 		go func(r io.Reader, buf *PieceBuffer) {
 			_, err := io.Copy(buf, r)
 			if err != nil {
@@ -55,7 +52,7 @@ func NewStripeReader(rs map[int]io.ReadCloser, es ErasureScheme, mbm int) *Strip
 				return
 			}
 			buf.SetError(io.EOF)
-		}(rs[i], buf)
+		}(rs[i], r.bufs[i])
 	}
 
 	return r
@@ -112,12 +109,12 @@ func (r *StripeReader) ReadStripe(num int64, p []byte) ([]byte, error) {
 // buffers without blocking. The return value n is the number of erasure shares
 // read.
 func (r *StripeReader) readAvailableShares(num int64) (n int) {
-	for i := 0; i < len(r.bufs); i++ {
+	for i, buf := range r.bufs {
 		if r.inmap[i] != nil || r.errmap[i] != nil {
 			continue
 		}
-		if r.bufs[i].HasShare(num) {
-			err := r.bufs[i].ReadShare(num, r.inbufs[i])
+		if buf.HasShare(num) {
+			err := buf.ReadShare(num, r.inbufs[i])
 			if err != nil {
 				r.errmap[i] = err
 			} else {
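Note on the `for i := range rs` change in `NewStripeReader`: once callers may pass a sparse reader map (missing pieces, introduced further down in this patch), iterating `0..TotalCount()-1` would read absent map keys and hand nil readers to the copy goroutines. A minimal standalone sketch of the difference, with a toy `map[int]string` standing in for `map[int]io.ReadCloser`:

```go
package main

import "fmt"

func main() {
	// With missing pieces the reader map is sparse: only live indices exist.
	rs := map[int]string{0: "piece-0", 2: "piece-2"} // indices 1 and 3 absent

	// Old counting loop: reads absent keys and gets zero values ("" here,
	// a nil io.ReadCloser in the real map) that must not be read from.
	for i := 0; i < 4; i++ {
		fmt.Printf("count: %d %q\n", i, rs[i])
	}

	// New range loop, as in the patched NewStripeReader: touches only the
	// readers that actually exist.
	for i, r := range rs {
		fmt.Printf("range: %d %q\n", i, r)
	}
}
```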
diff --git a/pkg/piecestore/rpc/client/client.go b/pkg/piecestore/rpc/client/client.go
index b5e6022f1..424e97c05 100644
--- a/pkg/piecestore/rpc/client/client.go
+++ b/pkg/piecestore/rpc/client/client.go
@@ -129,7 +129,10 @@ func (client *Client) Put(ctx context.Context, id PieceID, data io.Reader, ttl t
 	if err == io.ErrUnexpectedEOF {
 		_ = writer.Close()
 		zap.S().Infof("Node cut from upload due to slow connection. Deleting piece %s...", id)
-		return client.Delete(ctx, id)
+		deleteErr := client.Delete(ctx, id)
+		if deleteErr != nil {
+			return deleteErr
+		}
 	}
 	if err != nil {
 		return err
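The piecestore change above cleans up the partial piece but deliberately keeps returning `io.ErrUnexpectedEOF`, so the caller can tell a cut connection apart from a successful upload. A minimal sketch of that cleanup-then-propagate pattern (the two function parameters are hypothetical stand-ins, not the PSClient API):

```go
package main

import (
	"fmt"
	"io"
)

// uploadWithCleanup mirrors the control flow of Client.Put above: on an
// interrupted upload it deletes the partial piece, but still returns the
// original io.ErrUnexpectedEOF instead of masking it with a nil Delete result.
func uploadWithCleanup(upload, deletePiece func() error) error {
	err := upload()
	if err == io.ErrUnexpectedEOF {
		if deleteErr := deletePiece(); deleteErr != nil {
			return deleteErr // the cleanup itself failed
		}
	}
	return err // io.ErrUnexpectedEOF still reaches the caller
}

func main() {
	err := uploadWithCleanup(
		func() error { return io.ErrUnexpectedEOF }, // simulated slow node
		func() error { return nil },                 // cleanup succeeds
	)
	fmt.Println(err == io.ErrUnexpectedEOF) // true
}
```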
diff --git a/pkg/storage/ec/client.go b/pkg/storage/ec/client.go
index 5f5c299a2..6906e90bf 100644
--- a/pkg/storage/ec/client.go
+++ b/pkg/storage/ec/client.go
@@ -27,7 +27,7 @@ var mon = monkit.Package()
 // Client defines an interface for storing erasure coded data to piece store nodes
 type Client interface {
 	Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy,
-		pieceID client.PieceID, data io.Reader, expiration time.Time) error
+		pieceID client.PieceID, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, err error)
 	Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme,
 		pieceID client.PieceID, size int64) (ranger.Ranger, error)
 	Delete(ctx context.Context, nodes []*pb.Node, pieceID client.PieceID) error
@@ -44,6 +44,7 @@ type defaultDialer struct {
 
 func (d *defaultDialer) dial(ctx context.Context, node *pb.Node) (ps client.PSClient, err error) {
 	defer mon.Task()(&ctx)(&err)
+
 	c, err := d.t.DialNode(ctx, node)
 	if err != nil {
 		return nil, err
@@ -64,55 +65,72 @@ func NewClient(identity *provider.FullIdentity, t transport.Client, mbm int) Cli
 }
 
 func (ec *ecClient) Put(ctx context.Context, nodes []*pb.Node, rs eestream.RedundancyStrategy,
-	pieceID client.PieceID, data io.Reader, expiration time.Time) (err error) {
+	pieceID client.PieceID, data io.Reader, expiration time.Time) (successfulNodes []*pb.Node, err error) {
 	defer mon.Task()(&ctx)(&err)
+
 	if len(nodes) != rs.TotalCount() {
-		return Error.New("number of nodes (%d) do not match total count (%d) of erasure scheme",
-			len(nodes), rs.TotalCount())
+		return nil, Error.New("number of nodes (%d) do not match total count (%d) of erasure scheme", len(nodes), rs.TotalCount())
 	}
 	if !unique(nodes) {
-		return Error.New("duplicated nodes are not allowed")
+		return nil, Error.New("duplicated nodes are not allowed")
 	}
+
 	padded := eestream.PadReader(ioutil.NopCloser(data), rs.DecodedBlockSize())
 	readers, err := eestream.EncodeReader(ctx, padded, rs, ec.mbm)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	errs := make(chan error, len(readers))
+
+	type info struct {
+		i   int
+		err error
+	}
+	infos := make(chan info, len(nodes))
+
 	for i, n := range nodes {
 		go func(i int, n *pb.Node) {
 			derivedPieceID, err := pieceID.Derive([]byte(n.GetId()))
 			if err != nil {
 				zap.S().Errorf("Failed deriving piece id for %s: %v", pieceID, err)
-				errs <- err
+				infos <- info{i: i, err: err}
 				return
 			}
 			ps, err := ec.d.dial(ctx, n)
 			if err != nil {
 				zap.S().Errorf("Failed putting piece %s -> %s to node %s: %v", pieceID, derivedPieceID, n.GetId(), err)
-				errs <- err
+				infos <- info{i: i, err: err}
 				return
 			}
 			err = ps.Put(ctx, derivedPieceID, readers[i], expiration, &pb.PayerBandwidthAllocation{})
 			// normally the bellow call should be deferred, but doing so fails
 			// randomly the unit tests
 			utils.LogClose(ps)
-			if err != nil {
+			// io.ErrUnexpectedEOF means the piece upload was interrupted due to a slow connection.
+			// No error logging for this case.
+			if err != nil && err != io.ErrUnexpectedEOF {
 				zap.S().Errorf("Failed putting piece %s -> %s to node %s: %v", pieceID, derivedPieceID, n.GetId(), err)
 			}
-			errs <- err
+			infos <- info{i: i, err: err}
 		}(i, n)
 	}
-	allerrs := collectErrors(errs, len(readers))
-	sc := len(readers) - len(allerrs)
-	if sc < rs.RepairThreshold() {
-		return Error.New(
-			"successful puts (%d) less than repair threshold (%d)",
-			sc, rs.RepairThreshold())
+
+	successfulNodes = make([]*pb.Node, len(nodes))
+	var successfulCount int
+	for range nodes {
+		info := <-infos
+		if info.err == nil {
+			successfulNodes[info.i] = nodes[info.i]
+			successfulCount++
+		}
 	}
-	return nil
+
+	if successfulCount < rs.RepairThreshold() {
+		return nil, Error.New("successful puts (%d) less than repair threshold (%d)", successfulCount, rs.RepairThreshold())
+	}
+
+	return successfulNodes, nil
 }
 
 func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.ErasureScheme,
@@ -122,16 +140,24 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.Erasu
 	if len(nodes) != es.TotalCount() {
 		return nil, Error.New("number of nodes (%v) do not match total count (%v) of erasure scheme", len(nodes), es.TotalCount())
 	}
+
 	paddedSize := calcPadded(size, es.DecodedBlockSize())
 	pieceSize := paddedSize / int64(es.RequiredCount())
 	rrs := map[int]ranger.Ranger{}
+
 	type rangerInfo struct {
 		i   int
 		rr  ranger.Ranger
 		err error
 	}
 	ch := make(chan rangerInfo, len(nodes))
+
 	for i, n := range nodes {
+		if n == nil {
+			ch <- rangerInfo{i: i, rr: nil, err: nil}
+			continue
+		}
+
 		go func(i int, n *pb.Node) {
 			derivedPieceID, err := pieceID.Derive([]byte(n.GetId()))
 			if err != nil {
@@ -151,23 +177,33 @@ func (ec *ecClient) Get(ctx context.Context, nodes []*pb.Node, es eestream.Erasu
 			ch <- rangerInfo{i: i, rr: rr, err: nil}
 		}(i, n)
 	}
+
 	for range nodes {
 		rri := <-ch
-		if rri.err == nil {
+		if rri.err == nil && rri.rr != nil {
 			rrs[rri.i] = rri.rr
 		}
 	}
+
 	rr, err = eestream.Decode(rrs, es, ec.mbm)
 	if err != nil {
 		return nil, err
 	}
+
 	return eestream.Unpad(rr, int(paddedSize-size))
 }
 
 func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID client.PieceID) (err error) {
 	defer mon.Task()(&ctx)(&err)
+
 	errs := make(chan error, len(nodes))
+
 	for _, n := range nodes {
+		if n == nil {
+			errs <- nil
+			continue
+		}
+
 		go func(n *pb.Node) {
 			derivedPieceID, err := pieceID.Derive([]byte(n.GetId()))
 			if err != nil {
@@ -193,10 +229,13 @@ func (ec *ecClient) Delete(ctx context.Context, nodes []*pb.Node, pieceID client
 			errs <- err
 		}(n)
 	}
+
 	allerrs := collectErrors(errs, len(nodes))
+
 	if len(allerrs) > 0 && len(allerrs) == len(nodes) {
 		return allerrs[0]
 	}
+
 	return nil
 }
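The rewritten `Put` replaces the plain error channel with an indexed result channel, so successes can be recorded positionally: `successfulNodes[i]` is non-nil exactly when node `i` stored its piece. A runnable sketch of that fan-out pattern, with toy worker names in place of `*pb.Node`:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	workers := []string{"node-a", "node-b", "node-c"} // stand-ins for *pb.Node

	// Each goroutine reports its slot index along with its error, like the
	// info struct in ecClient.Put above.
	type info struct {
		i   int
		err error
	}
	infos := make(chan info, len(workers))

	for i, w := range workers {
		go func(i int, w string) {
			var err error
			if w == "node-b" {
				err = errors.New("dial failed") // simulate one failed upload
			}
			infos <- info{i: i, err: err}
		}(i, w)
	}

	// successful[i] stays "" for failed workers, mirroring the nil slots
	// that successfulNodes keeps for failed uploads.
	successful := make([]string, len(workers))
	for range workers {
		res := <-infos
		if res.err == nil {
			successful[res.i] = workers[res.i]
		}
	}
	fmt.Printf("%q\n", successful) // ["node-a" "" "node-c"]
}
```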
diff --git a/pkg/storage/ec/client_test.go b/pkg/storage/ec/client_test.go
index 84bfc45d8..e3317318a 100644
--- a/pkg/storage/ec/client_test.go
+++ b/pkg/storage/ec/client_test.go
@@ -185,12 +185,20 @@ TestLoop:
 		}
 		r := io.LimitReader(rand.Reader, int64(size))
 		ec := ecClient{d: &mockDialer{m: m}, mbm: tt.mbm}
-		err = ec.Put(ctx, tt.nodes, rs, id, r, ttl)
+		successfulNodes, err := ec.Put(ctx, tt.nodes, rs, id, r, ttl)
 		if tt.errString != "" {
 			assert.EqualError(t, err, tt.errString, errTag)
 		} else {
 			assert.NoError(t, err, errTag)
+			assert.Equal(t, len(tt.nodes), len(successfulNodes), errTag)
+			for i := range tt.nodes {
+				if tt.errs[i] != nil {
+					assert.Nil(t, successfulNodes[i], errTag)
+				} else {
+					assert.Equal(t, tt.nodes[i], successfulNodes[i], errTag)
+				}
+			}
 		}
 	}
 }
@@ -231,6 +239,8 @@ TestLoop:
 			[]error{ErrOpFailed, ErrDialFailed, nil, ErrDialFailed}, ""},
 		{[]*pb.Node{node0, node1, node2, node3}, 0,
 			[]error{ErrDialFailed, ErrOpFailed, ErrOpFailed, ErrDialFailed}, ""},
+		{[]*pb.Node{nil, nil, node2, node3}, 0,
+			[]error{nil, nil, nil, nil}, ""},
 	} {
 		errTag := fmt.Sprintf("Test case #%d", i)
 
@@ -288,6 +298,8 @@ TestLoop:
 		{[]*pb.Node{node0, node1}, []error{nil, ErrOpFailed}, ""},
 		{[]*pb.Node{node0, node1}, []error{ErrDialFailed, ErrDialFailed}, dialFailed},
 		{[]*pb.Node{node0, node1}, []error{ErrOpFailed, ErrOpFailed}, opFailed},
+		{[]*pb.Node{nil, node1}, []error{nil, nil}, ""},
+		{[]*pb.Node{nil, nil}, []error{nil, nil}, ""},
 	} {
 		errTag := fmt.Sprintf("Test case #%d", i)
 
@@ -300,7 +312,7 @@ TestLoop:
 
 		m := make(map[*pb.Node]client.PSClient, len(tt.nodes))
 		for _, n := range tt.nodes {
-			if errs[n] != ErrDialFailed {
+			if n != nil && errs[n] != ErrDialFailed {
 				derivedID, err := id.Derive([]byte(n.GetId()))
 				if !assert.NoError(t, err, errTag) {
 					continue TestLoop
diff --git a/pkg/storage/ec/mocks/mock_client.go b/pkg/storage/ec/mocks/mock_client.go
index 9cd0468a5..ad261be48 100644
--- a/pkg/storage/ec/mocks/mock_client.go
+++ b/pkg/storage/ec/mocks/mock_client.go
@@ -11,8 +11,9 @@ import (
 	time "time"
 
 	gomock "github.com/golang/mock/gomock"
+
 	eestream "storj.io/storj/pkg/eestream"
-	"storj.io/storj/pkg/pb"
+	pb "storj.io/storj/pkg/pb"
 	client "storj.io/storj/pkg/piecestore/rpc/client"
 	ranger "storj.io/storj/pkg/ranger"
 )
@@ -66,10 +67,11 @@ func (mr *MockClientMockRecorder) Get(arg0, arg1, arg2, arg3, arg4 interface{})
 }
 
 // Put mocks base method
-func (m *MockClient) Put(arg0 context.Context, arg1 []*pb.Node, arg2 eestream.RedundancyStrategy, arg3 client.PieceID, arg4 io.Reader, arg5 time.Time) error {
+func (m *MockClient) Put(arg0 context.Context, arg1 []*pb.Node, arg2 eestream.RedundancyStrategy, arg3 client.PieceID, arg4 io.Reader, arg5 time.Time) ([]*pb.Node, error) {
 	ret := m.ctrl.Call(m, "Put", arg0, arg1, arg2, arg3, arg4, arg5)
-	ret0, _ := ret[0].(error)
-	return ret0
+	ret0, _ := ret[0].([]*pb.Node)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
 }
 
 // Put indicates an expected call of Put
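A side note on the regenerated mock: it uses the two-value type assertion `ret0, _ := ret[0].([]*pb.Node)` because a return value the test never configured arrives as a nil `interface{}`, and the one-value form would panic. A minimal illustration, with `[]string` standing in for `[]*pb.Node`:

```go
package main

import "fmt"

func main() {
	// gomock's Call returns []interface{}; a value the test never set
	// shows up as a nil interface{}.
	var ret0 interface{}

	// Two-value assertion, as in the regenerated mock: no panic, yields
	// the type's zero value (a nil slice) plus ok == false.
	nodes, ok := ret0.([]string)
	fmt.Println(nodes == nil, ok) // true false

	// nodes = ret0.([]string) // the one-value form would panic here
}
```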
diff --git a/pkg/storage/segments/store.go b/pkg/storage/segments/store.go
index e15619779..c6f5b9447 100644
--- a/pkg/storage/segments/store.go
+++ b/pkg/storage/segments/store.go
@@ -120,11 +120,11 @@ func (s *segmentStore) Put(ctx context.Context, path paths.Path, data io.Reader,
 		sizedReader := SizeReader(peekReader)
 
 		// puts file to ecclient
-		err = s.ec.Put(ctx, nodes, s.rs, pieceID, sizedReader, expiration)
+		successfulNodes, err := s.ec.Put(ctx, nodes, s.rs, pieceID, sizedReader, expiration)
 		if err != nil {
 			return Meta{}, Error.Wrap(err)
 		}
-		p, err = s.makeRemotePointer(nodes, pieceID, sizedReader.Size(), exp, metadata)
+		p, err = s.makeRemotePointer(successfulNodes, pieceID, sizedReader.Size(), exp, metadata)
 		if err != nil {
 			return Meta{}, err
 		}
@@ -149,6 +149,9 @@ func (s *segmentStore) makeRemotePointer(nodes []*pb.Node, pieceID client.PieceI
 	exp *timestamp.Timestamp, metadata []byte) (pointer *pb.Pointer, err error) {
 	var remotePieces []*pb.RemotePiece
 	for i := range nodes {
+		if nodes[i] == nil {
+			continue
+		}
 		remotePieces = append(remotePieces, &pb.RemotePiece{
 			PieceNum: int32(i),
 			NodeId:   nodes[i].Id,
@@ -249,15 +252,22 @@ func (s *segmentStore) Delete(ctx context.Context, path paths.Path) (err error)
 
 // lookupNodes calls Lookup to get node addresses from the overlay
 func (s *segmentStore) lookupNodes(ctx context.Context, seg *pb.RemoteSegment) (nodes []*pb.Node, err error) {
-	pieces := seg.GetRemotePieces()
+	// Get the list of all node IDs storing a piece of the segment
 	var nodeIds []dht.NodeID
-	for _, p := range pieces {
+	for _, p := range seg.GetRemotePieces() {
 		nodeIds = append(nodeIds, kademlia.StringToNodeID(p.GetNodeId()))
 	}
-	nodes, err = s.oc.BulkLookup(ctx, nodeIds)
+	// Look up the node info for the node IDs
+	n, err := s.oc.BulkLookup(ctx, nodeIds)
 	if err != nil {
 		return nil, Error.Wrap(err)
 	}
 
+	// Create an indexed list of nodes based on the piece number.
+	// Missing pieces are represented by a nil node.
+	nodes = make([]*pb.Node, seg.GetRedundancy().GetTotal())
+	for i, p := range seg.GetRemotePieces() {
+		nodes[p.PieceNum] = n[i]
+	}
 	return nodes, nil
 }
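The reindexing loop at the end of `lookupNodes` is what makes the nil-node handling above work: `BulkLookup` returns results in request order, and re-slotting them by `PieceNum` guarantees that index `i` always corresponds to erasure share `i`, with nil marking a missing piece. A simplified sketch, with toy types standing in for `pb.RemotePiece` and `pb.Node`:

```go
package main

import "fmt"

// remotePiece is a simplified stand-in for pb.RemotePiece.
type remotePiece struct {
	PieceNum int
	NodeID   string
}

func main() {
	total := 4 // plays the role of seg.GetRedundancy().GetTotal()
	pieces := []remotePiece{{PieceNum: 0, NodeID: "a"}, {PieceNum: 3, NodeID: "d"}}
	looked := []string{"node-a", "node-d"} // BulkLookup results, in request order

	// Re-slot lookup results by piece number; "" plays the role of a nil
	// *pb.Node for the missing shares 1 and 2.
	nodes := make([]string, total)
	for i, p := range pieces {
		nodes[p.PieceNum] = looked[i]
	}
	fmt.Printf("%q\n", nodes) // ["node-a" "" "" "node-d"]
}
```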