Download is hanging when one node is not responsive (#1764)
Change closes download connection in parallel and sets a timeout for communication between uplink and storage node
This commit is contained in:
parent
4da66792dd
commit
a585b97363
@ -10,6 +10,7 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/zeebo/errs"
|
||||
@ -296,6 +297,7 @@ func (uplink *Uplink) GetConfig(satellite *satellite.Peer) uplink.Config {
|
||||
config := getDefaultConfig()
|
||||
config.Client.SatelliteAddr = satellite.Addr()
|
||||
config.Client.APIKey = uplink.APIKey[satellite.ID()]
|
||||
config.Client.Timeout = 10 * time.Second
|
||||
|
||||
config.RS.MinThreshold = atLeastOne(uplink.StorageNodeCount * 1 / 5) // 20% of storage nodes
|
||||
config.RS.RepairThreshold = atLeastOne(uplink.StorageNodeCount * 2 / 5) // 40% of storage nodes
|
||||
|
@ -4,9 +4,12 @@
|
||||
package testplanet_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -16,6 +19,9 @@ import (
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls/extensions"
|
||||
"storj.io/storj/pkg/peertls/tlsopts"
|
||||
"storj.io/storj/pkg/server"
|
||||
"storj.io/storj/pkg/storj"
|
||||
"storj.io/storj/uplink"
|
||||
)
|
||||
@ -195,3 +201,99 @@ func TestUploadDownloadMultipleUplinksInParallel(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
type piecestoreMock struct {
|
||||
}
|
||||
|
||||
func (mock *piecestoreMock) Upload(server pb.Piecestore_UploadServer) error {
|
||||
return nil
|
||||
}
|
||||
func (mock *piecestoreMock) Download(server pb.Piecestore_DownloadServer) error {
|
||||
timoutTicker := time.NewTicker(30 * time.Second)
|
||||
defer timoutTicker.Stop()
|
||||
|
||||
select {
|
||||
case <-timoutTicker.C:
|
||||
return nil
|
||||
case <-server.Context().Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
func (mock *piecestoreMock) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestDownloadFromUnresponsiveNode(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
|
||||
expectedData := make([]byte, 1*memory.MiB)
|
||||
_, err := rand.Read(expectedData)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = planet.Uplinks[0].UploadWithConfig(ctx, planet.Satellites[0], &uplink.RSConfig{
|
||||
MinThreshold: 2,
|
||||
RepairThreshold: 3,
|
||||
SuccessThreshold: 4,
|
||||
MaxThreshold: 5,
|
||||
}, "testbucket", "test/path", expectedData)
|
||||
require.NoError(t, err)
|
||||
|
||||
// get a remote segment from pointerdb
|
||||
pdb := planet.Satellites[0].Metainfo.Service
|
||||
listResponse, _, err := pdb.List("", "", "", true, 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
var path string
|
||||
var pointer *pb.Pointer
|
||||
for _, v := range listResponse {
|
||||
path = v.GetPath()
|
||||
pointer, err = pdb.Get(path)
|
||||
require.NoError(t, err)
|
||||
if pointer.GetType() == pb.Pointer_REMOTE {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
stopped := false
|
||||
// choose used storage node and replace it with fake listener
|
||||
unresponsiveNode := pointer.Remote.RemotePieces[0].NodeId
|
||||
for _, storageNode := range planet.StorageNodes {
|
||||
if storageNode.ID() == unresponsiveNode {
|
||||
err = planet.StopPeer(storageNode)
|
||||
require.NoError(t, err)
|
||||
|
||||
wl, err := planet.WriteWhitelist(storj.LatestIDVersion())
|
||||
require.NoError(t, err)
|
||||
options, err := tlsopts.NewOptions(storageNode.Identity, tlsopts.Config{
|
||||
RevocationDBURL: "bolt://" + filepath.Join(ctx.Dir("fakestoragenode"), "revocation.db"),
|
||||
UsePeerCAWhitelist: true,
|
||||
PeerCAWhitelistPath: wl,
|
||||
PeerIDVersions: "*",
|
||||
Extensions: extensions.Config{
|
||||
Revocation: false,
|
||||
WhitelistSignedLeaf: false,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
server, err := server.New(options, storageNode.Addr(), storageNode.PrivateAddr(), nil)
|
||||
require.NoError(t, err)
|
||||
pb.RegisterPiecestoreServer(server.GRPC(), &piecestoreMock{})
|
||||
go func() {
|
||||
err := server.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
stopped = true
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.True(t, stopped, "no storage node was altered")
|
||||
|
||||
data, err := planet.Uplinks[0].Download(ctx, planet.Satellites[0], "testbucket", "test/path")
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, expectedData, data)
|
||||
})
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"storj.io/storj/internal/errs2"
|
||||
"storj.io/storj/internal/readcloser"
|
||||
"storj.io/storj/pkg/encryption"
|
||||
"storj.io/storj/pkg/ranger"
|
||||
@ -98,21 +99,20 @@ func (dr *decodedReader) Read(p []byte) (n int, err error) {
|
||||
func (dr *decodedReader) Close() error {
|
||||
// cancel the context to terminate reader goroutines
|
||||
dr.cancel()
|
||||
// avoid double close of readers
|
||||
errorThreshold := len(dr.readers) - dr.scheme.RequiredCount()
|
||||
var closeGroup errs2.Group
|
||||
// avoid double close of readers
|
||||
dr.close.Do(func() {
|
||||
var errlist errs.Group
|
||||
// close the readers
|
||||
for _, r := range dr.readers {
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
errlist.Add(err)
|
||||
errorThreshold--
|
||||
}
|
||||
closeGroup.Go(r.Close)
|
||||
}
|
||||
|
||||
// close the stripe reader
|
||||
errlist.Add(dr.stripeReader.Close())
|
||||
dr.closeErr = errlist.Err()
|
||||
closeGroup.Go(dr.stripeReader.Close)
|
||||
|
||||
allErrors := closeGroup.Wait()
|
||||
errorThreshold -= len(allErrors)
|
||||
dr.closeErr = errs.Combine(allErrors...)
|
||||
})
|
||||
// TODO this is workaround, we need reorganize to return multiple errors or divide into fatal, non fatal
|
||||
if errorThreshold <= 0 {
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// InvokeTimeout enables timeouts for requests that take too long
|
||||
@ -22,3 +23,74 @@ func (it InvokeTimeout) Intercept(ctx context.Context, method string, req interf
|
||||
defer cancel()
|
||||
return invoker(timedCtx, method, req, reply, cc, opts...)
|
||||
}
|
||||
|
||||
// InvokeStreamTimeout enables timeouts for send/recv/close stream requests
|
||||
type InvokeStreamTimeout struct {
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
type clientStreamWrapper struct {
|
||||
timeout time.Duration
|
||||
stream grpc.ClientStream
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func (wrapper *clientStreamWrapper) Header() (metadata.MD, error) {
|
||||
return wrapper.stream.Header()
|
||||
}
|
||||
|
||||
func (wrapper *clientStreamWrapper) Trailer() metadata.MD {
|
||||
return wrapper.stream.Trailer()
|
||||
}
|
||||
|
||||
func (wrapper *clientStreamWrapper) Context() context.Context {
|
||||
return wrapper.stream.Context()
|
||||
}
|
||||
|
||||
func (wrapper *clientStreamWrapper) CloseSend() error {
|
||||
return wrapper.withTimeout(func() error {
|
||||
return wrapper.stream.CloseSend()
|
||||
})
|
||||
}
|
||||
|
||||
func (wrapper *clientStreamWrapper) SendMsg(m interface{}) error {
|
||||
return wrapper.withTimeout(func() error {
|
||||
return wrapper.stream.SendMsg(m)
|
||||
})
|
||||
}
|
||||
|
||||
func (wrapper *clientStreamWrapper) RecvMsg(m interface{}) error {
|
||||
return wrapper.withTimeout(func() error {
|
||||
return wrapper.stream.RecvMsg(m)
|
||||
})
|
||||
}
|
||||
|
||||
func (wrapper *clientStreamWrapper) withTimeout(f func() error) error {
|
||||
timoutTicker := time.NewTicker(wrapper.timeout)
|
||||
defer timoutTicker.Stop()
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-timoutTicker.C:
|
||||
wrapper.cancel()
|
||||
case <-wrapper.Context().Done():
|
||||
case <-doneCh:
|
||||
}
|
||||
}()
|
||||
|
||||
return f()
|
||||
}
|
||||
|
||||
// Intercept adds a timeout to a stream requests
|
||||
func (it InvokeStreamTimeout) Intercept(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (_ grpc.ClientStream, err error) {
|
||||
wrapper := &clientStreamWrapper{timeout: it.Timeout}
|
||||
ctx, wrapper.cancel = context.WithCancel(ctx)
|
||||
|
||||
wrapper.stream, err = streamer(ctx, desc, cc, method, opts...)
|
||||
if err != nil {
|
||||
return wrapper.stream, err
|
||||
}
|
||||
return wrapper, nil
|
||||
}
|
||||
|
@ -71,6 +71,7 @@ func (transport *Transport) DialNode(ctx context.Context, node *pb.Node, opts ..
|
||||
grpc.WithBlock(),
|
||||
grpc.FailOnNonTempDialError(true),
|
||||
grpc.WithUnaryInterceptor(InvokeTimeout{transport.requestTimeout}.Intercept),
|
||||
grpc.WithStreamInterceptor(InvokeStreamTimeout{transport.requestTimeout}.Intercept),
|
||||
}, opts...)
|
||||
|
||||
timedCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
|
||||
@ -103,6 +104,7 @@ func (transport *Transport) DialAddress(ctx context.Context, address string, opt
|
||||
grpc.WithBlock(),
|
||||
grpc.FailOnNonTempDialError(true),
|
||||
grpc.WithUnaryInterceptor(InvokeTimeout{transport.requestTimeout}.Intercept),
|
||||
grpc.WithStreamInterceptor(InvokeStreamTimeout{transport.requestTimeout}.Intercept),
|
||||
}, opts...)
|
||||
|
||||
timedCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
|
||||
|
@ -6,6 +6,7 @@ package uplink
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/vivint/infectious"
|
||||
"github.com/zeebo/errs"
|
||||
@ -53,6 +54,7 @@ type ClientConfig struct {
|
||||
SatelliteAddr string `releaseDefault:"127.0.0.1:7777" devDefault:"127.0.0.1:10000" help:"the address to use for the satellite" noprefix:"true"`
|
||||
MaxInlineSize memory.Size `help:"max inline segment size in bytes" default:"4KiB"`
|
||||
SegmentSize memory.Size `help:"the size of a segment in bytes" default:"64MiB"`
|
||||
Timeout time.Duration `help:"timeout for request" default:"0h0m20s"`
|
||||
}
|
||||
|
||||
// Config uplink configuration
|
||||
@ -81,7 +83,7 @@ func (c Config) GetMetainfo(ctx context.Context, identity *identity.FullIdentity
|
||||
|
||||
// ToDo: Handle Versioning for Uplinks here
|
||||
|
||||
tc := transport.NewClient(tlsOpts)
|
||||
tc := transport.NewClientWithTimeout(tlsOpts, c.Client.Timeout)
|
||||
|
||||
if c.Client.SatelliteAddr == "" {
|
||||
return nil, nil, errors.New("satellite address not specified")
|
||||
|
Loading…
Reference in New Issue
Block a user