satellite/repair/repairer: attempt repair GETs using nodes' last IP and port first

Sometimes we see timeouts from DNS lookups when trying to do
repair GETs. Solution: try using node's last IP and port first.
If we can't connect, retry with DNS lookup.

Change-Id: I59e223aebb436118779fb18378f6e09d072f12be
This commit is contained in:
Cameron Ayer 2021-07-13 09:52:37 -04:00 committed by Cameron Ayer
parent b0d98b1c1a
commit 449c873681
4 changed files with 240 additions and 14 deletions

View File

@ -324,12 +324,12 @@ func (service *Service) CreateAuditOrderLimit(ctx context.Context, nodeID storj.
//
// The length of the returned orders slice is the total number of pieces of the
// segment, setting to null the ones which don't correspond to a healthy piece.
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, healthy metabase.Pieces) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, err error) {
func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket metabase.BucketLocation, segment metabase.Segment, healthy metabase.Pieces) (_ []*pb.AddressedOrderLimit, _ storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, err error) {
defer mon.Task()(&ctx)(&err)
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
pieceSize := eestream.CalcPieceSize(int64(segment.EncryptedSize), redundancy)
@ -343,14 +343,15 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
if err != nil {
service.log.Debug("error getting nodes from overlay", zap.Error(err))
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
signer, err := NewSignerRepairGet(service, segment.RootPieceID, time.Now(), pieceSize, bucket)
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
cachedIPsAndPorts = make(map[storj.NodeID]string)
var nodeErrors errs.Group
var limitsCount int
limits := make([]*pb.AddressedOrderLimit, totalPieces)
@ -361,12 +362,16 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
continue
}
if node.LastIPPort != "" {
cachedIPsAndPorts[piece.StorageNode] = node.LastIPPort
}
limit, err := signer.Sign(ctx, storj.NodeURL{
ID: piece.StorageNode,
Address: node.Address.Address,
}, int32(piece.Number))
if err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
limits[piece.Number] = limit
@ -375,14 +380,14 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucket m
if limitsCount < redundancy.RequiredCount() {
err = Error.New("not enough nodes available: got %d, required %d", limitsCount, redundancy.RequiredCount())
return nil, storj.PiecePrivateKey{}, errs.Combine(err, nodeErrors.Err())
return nil, storj.PiecePrivateKey{}, nil, errs.Combine(err, nodeErrors.Err())
}
if err := service.updateBandwidth(ctx, bucket, limits...); err != nil {
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
return nil, storj.PiecePrivateKey{}, nil, Error.Wrap(err)
}
return limits, signer.PrivateKey, nil
return limits, signer.PrivateKey, cachedIPsAndPorts, nil
}
// CreatePutRepairOrderLimits creates the order limits for uploading the repaired pieces of segment to newNodes.

View File

@ -5,16 +5,22 @@ package repair_test
import (
"context"
"crypto/tls"
"fmt"
"io"
"math"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
@ -25,7 +31,9 @@ import (
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/storage"
"storj.io/uplink/private/eestream"
)
// TestDataRepair does the following:
@ -1525,3 +1533,203 @@ func corruptPieceData(ctx context.Context, t *testing.T, planet *testplanet.Plan
err = writer.Commit(ctx)
require.NoError(t, err)
}
type mockConnector struct {
realConnector rpc.Connector
addressesDialed []string
dialInstead map[string]string
}
func (m *mockConnector) DialContext(ctx context.Context, tlsConfig *tls.Config, address string) (rpc.ConnectorConn, error) {
m.addressesDialed = append(m.addressesDialed, address)
replacement := m.dialInstead[address]
if replacement == "" {
// allow numeric ip addresses through, return errors for unexpected dns lookups
host, _, err := net.SplitHostPort(address)
if err != nil {
return nil, err
}
if net.ParseIP(host) == nil {
return nil, &net.DNSError{
Err: "unexpected lookup",
Name: address,
Server: "a.totally.real.dns.server.i.promise",
IsNotFound: true,
}
}
replacement = address
}
return m.realConnector.DialContext(ctx, tlsConfig, replacement)
}
func ecRepairerWithMockConnector(t testing.TB, sat *testplanet.Satellite, mock *mockConnector) *repairer.ECRepairer {
tlsOptions := sat.Dialer.TLSOptions
newDialer := rpc.NewDefaultDialer(tlsOptions)
mock.realConnector = newDialer.Connector
newDialer.Connector = mock
ec := repairer.NewECRepairer(
zaptest.NewLogger(t).Named("a-special-repairer"),
newDialer,
signing.SigneeFromPeerIdentity(sat.Identity.PeerIdentity()),
sat.Config.Repairer.DownloadTimeout,
sat.Config.Repairer.InMemoryRepair,
)
return ec
}
func TestECRepairerGetDoesNameLookupIfNecessary(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
testSatellite := planet.Satellites[0]
audits := testSatellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
require.NoError(t, err)
segment, err := testSatellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
require.True(t, len(segment.Pieces) > 1)
limits, privateKey, _, err := testSatellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
cachedIPsAndPorts := make(map[storj.NodeID]string)
for i, l := range limits {
if l == nil {
continue
}
cachedIPsAndPorts[l.Limit.StorageNodeId] = fmt.Sprintf("garbageXXX#:%d", i)
}
mock := &mockConnector{}
ec := ecRepairerWithMockConnector(t, testSatellite, mock)
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
require.NoError(t, err)
readCloser, failed, err := ec.Get(ctx, limits, cachedIPsAndPorts, privateKey, redundancy, int64(segment.EncryptedSize))
require.NoError(t, err)
require.Len(t, failed, 0)
require.NotNil(t, readCloser)
// repair will only download minimum required
minReq := redundancy.RequiredCount()
var numDialed int
for _, ip := range cachedIPsAndPorts {
for _, dialed := range mock.addressesDialed {
if dialed == ip {
numDialed++
if numDialed == minReq {
break
}
}
}
if numDialed == minReq {
break
}
}
require.True(t, numDialed == minReq)
})
}
func TestECRepairerGetPrefersCachedIPPort(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
testSatellite := planet.Satellites[0]
audits := testSatellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
require.NoError(t, err)
audits.Chore.Loop.TriggerWait()
queue := audits.Queues.Fetch()
queueSegment, err := queue.Next()
require.NoError(t, err)
segment, err := testSatellite.Metainfo.Metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
StreamID: queueSegment.StreamID,
Position: queueSegment.Position,
})
require.NoError(t, err)
require.True(t, len(segment.Pieces) > 1)
limits, privateKey, _, err := testSatellite.Orders.Service.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, segment.Pieces)
require.NoError(t, err)
// make it so that when the cached IP is dialed, we dial the "right" address,
// but when the "right" address is dialed (meaning it came from the OrderLimit,
// we dial something else!
cachedIPsAndPorts := make(map[storj.NodeID]string)
mock := &mockConnector{
dialInstead: make(map[string]string),
}
var realAddresses []string
for i, l := range limits {
if l == nil {
continue
}
ip := fmt.Sprintf("garbageXXX#:%d", i)
cachedIPsAndPorts[l.Limit.StorageNodeId] = ip
address := l.StorageNodeAddress.Address
mock.dialInstead[ip] = address
mock.dialInstead[address] = "utter.failure?!*"
realAddresses = append(realAddresses, address)
}
ec := ecRepairerWithMockConnector(t, testSatellite, mock)
redundancy, err := eestream.NewRedundancyStrategyFromStorj(segment.Redundancy)
require.NoError(t, err)
readCloser, failed, err := ec.Get(ctx, limits, cachedIPsAndPorts, privateKey, redundancy, int64(segment.EncryptedSize))
require.NoError(t, err)
require.Len(t, failed, 0)
require.NotNil(t, readCloser)
// repair will only download minimum required.
minReq := redundancy.RequiredCount()
var numDialed int
for _, ip := range cachedIPsAndPorts {
for _, dialed := range mock.addressesDialed {
if dialed == ip {
numDialed++
if numDialed == minReq {
break
}
}
}
if numDialed == minReq {
break
}
}
require.True(t, numDialed == minReq)
// and that the right address was never dialed directly
require.NotContains(t, mock.addressesDialed, realAddresses)
})
}

View File

@ -62,7 +62,7 @@ func (ec *ECRepairer) dialPiecestore(ctx context.Context, n storj.NodeURL) (*pie
// After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece.
// If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits.
// If piece hash verification fails, it will return all failed node IDs.
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedPieces []*pb.RemotePiece, err error) {
func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit, cachedIPsAndPorts map[storj.NodeID]string, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, failedPieces []*pb.RemotePiece, err error) {
defer mon.Task()(&ctx)(&err)
if len(limits) != es.TotalCount() {
@ -116,7 +116,20 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
inProgress++
cond.L.Unlock()
pieceReadCloser, err := ec.downloadAndVerifyPiece(ctx, limit, privateKey, pieceSize)
lastIPPort := cachedIPsAndPorts[limit.GetLimit().StorageNodeId]
address := limit.GetStorageNodeAddress().GetAddress()
var triedLastIPPort bool
if lastIPPort != "" && lastIPPort != address {
address = lastIPPort
triedLastIPPort = true
}
pieceReadCloser, err := ec.downloadAndVerifyPiece(ctx, limit, address, privateKey, pieceSize)
// if piecestore dial with last ip:port failed try again with node address
if triedLastIPPort && piecestore.Error.Has(err) {
pieceReadCloser, err = ec.downloadAndVerifyPiece(ctx, limit, limit.GetStorageNodeAddress().GetAddress(), privateKey, pieceSize)
}
cond.L.Lock()
inProgress--
if err != nil {
@ -170,7 +183,7 @@ func (ec *ECRepairer) Get(ctx context.Context, limits []*pb.AddressedOrderLimit,
// downloadAndVerifyPiece downloads a piece from a storagenode,
// expects the original order limit to have the correct piece public key,
// and expects the hash of the data to match the signed hash provided by the storagenode.
func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, pieceSize int64) (pieceReadCloser io.ReadCloser, err error) {
func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.AddressedOrderLimit, address string, privateKey storj.PiecePrivateKey, pieceSize int64) (pieceReadCloser io.ReadCloser, err error) {
defer mon.Task()(&ctx)(&err)
// contact node
@ -179,7 +192,7 @@ func (ec *ECRepairer) downloadAndVerifyPiece(ctx context.Context, limit *pb.Addr
ps, err := ec.dialPiecestore(downloadCtx, storj.NodeURL{
ID: limit.GetLimit().StorageNodeId,
Address: limit.GetStorageNodeAddress().Address,
Address: address,
})
if err != nil {
return nil, err

View File

@ -216,7 +216,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
}
// Create the order limits for the GET_REPAIR action
getOrderLimits, getPrivateKey, err := repairer.orders.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, healthyPieces)
getOrderLimits, getPrivateKey, cachedIPsAndPorts, err := repairer.orders.CreateGetRepairOrderLimits(ctx, metabase.BucketLocation{}, segment, healthyPieces)
if err != nil {
return false, orderLimitFailureError.New("could not create GET_REPAIR order limits: %w", err)
}
@ -258,7 +258,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
}
// Download the segment using just the healthy pieces
segmentReader, pbFailedPieces, err := repairer.ec.Get(ctx, getOrderLimits, getPrivateKey, redundancy, int64(segment.EncryptedSize))
segmentReader, pbFailedPieces, err := repairer.ec.Get(ctx, getOrderLimits, cachedIPsAndPorts, getPrivateKey, redundancy, int64(segment.EncryptedSize))
// Populate node IDs that failed piece hashes verification
var failedNodeIDs storj.NodeIDList