satellite/orders;overlay: Consolidate order limit storage node lookups into 1 query.

https://storjlabs.atlassian.net/browse/SM-449
Change-Id: Idc62cc2978fba67cf48f7c98b27b0f996f9c58ac
Authored by Ethan on 2020-03-13 14:01:48 -04:00; committed by Jess G
parent 49a30ce4a7
commit bdbf764b86
4 changed files with 172 additions and 15 deletions
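
In short: each order-limit path used to call the overlay once per remote piece; it now collects the piece node IDs up front, resolves them with a single GetNodes call, and looks each piece up in the returned map, accumulating an error for any piece whose node is missing. A minimal, self-contained sketch of that pattern follows; NodeID, Piece, Dossier, Limit, the Overlay interface, and createLimits are illustrative stand-ins, not the satellite's real types or code.

// A minimal sketch of the batched-lookup pattern this commit introduces.
package sketch

import (
	"context"

	"github.com/zeebo/errs"
)

type (
	NodeID  [32]byte
	Piece   struct{ NodeID NodeID }
	Dossier struct{ Address string }
	Limit   struct{ Address string }
)

// Overlay models the one batched lookup this commit adds.
type Overlay interface {
	GetNodes(ctx context.Context, ids []NodeID) (map[NodeID]*Dossier, error)
}

func createLimits(ctx context.Context, overlay Overlay, pieces []Piece) ([]Limit, error) {
	// Gather every piece's node ID and resolve them in one query,
	// instead of calling the overlay once per piece.
	ids := make([]NodeID, len(pieces))
	for i, piece := range pieces {
		ids[i] = piece.NodeID
	}
	nodes, err := overlay.GetNodes(ctx, ids)
	if err != nil {
		return nil, err
	}

	var combinedErrs error
	var limits []Limit
	for _, piece := range pieces {
		node, ok := nodes[piece.NodeID]
		if !ok {
			// A missing node skips this piece and is recorded, not fatal.
			combinedErrs = errs.Combine(combinedErrs, errs.New("node ID %v does not exist in nodes map", piece.NodeID))
			continue
		}
		limits = append(limits, Limit{Address: node.Address})
	}
	if len(limits) == 0 {
		return nil, combinedErrs
	}
	return limits, nil
}

The sketch mirrors the diffs below: a missing node is recorded with errs.Combine and skipped, so limits are still issued for the nodes that were found.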

View File

@@ -144,12 +144,24 @@ func (service *Service) CreateGetOrderLimitsOld(ctx context.Context, bucketID []
 	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limits []*pb.AddressedOrderLimit
 	for _, piece := range pointer.GetRemote().GetRemotePieces() {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Debug("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -244,12 +256,24 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byt
 	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limits []*pb.AddressedOrderLimit
 	for _, piece := range pointer.GetRemote().GetRemotePieces() {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Debug("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -442,12 +466,24 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []
 		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 	}
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limits []*pb.AddressedOrderLimit
 	for _, piece := range pointer.GetRemote().GetRemotePieces() {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Error("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -529,6 +565,17 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []b
 		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 	}
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limitsCount int32
 	limits := make([]*pb.AddressedOrderLimit, totalPieces)
@@ -537,9 +584,10 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []b
 			continue
 		}
 
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Error("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -702,13 +750,25 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID
 		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 	}
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limitsCount int
 	limits := make([]*pb.AddressedOrderLimit, totalPieces)
 	for _, piece := range healthy {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Error("error getting node from the overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}

View File

@@ -43,6 +43,8 @@ type DB interface {
 	// Get looks up the node by nodeID
 	Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
+	// GetNodes returns a map of nodes for the supplied nodeIDs
+	GetNodes(ctx context.Context, nodeIDs []storj.NodeID) (map[storj.NodeID]*NodeDossier, error)
 	// KnownOffline filters a set of nodes to offline nodes
 	KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
@@ -269,6 +271,13 @@ func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDo
 	return service.db.Get(ctx, nodeID)
 }
 
+// GetNodes returns a map of nodes for the supplied nodeIDs.
+func (service *Service) GetNodes(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*NodeDossier, err error) {
+	defer mon.Task()(&ctx)(&err)
+	return service.db.GetNodes(ctx, nodeIDs)
+}
+
 // IsOnline checks if a node is 'online' based on the collected statistics.
 func (service *Service) IsOnline(node *NodeDossier) bool {
 	return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow
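
A hedged usage sketch of the new accessor: IDs the overlay does not know about are not an error, they are simply absent from the returned map (which TestGetNodes below relies on). The helper and its import paths are assumptions for illustration, not part of this commit.

package sketch

import (
	"context"

	"storj.io/common/storj"

	"storj.io/storj/satellite/overlay"
)

// missingNodes is an illustrative helper: one batched GetNodes call,
// then a pass over the input IDs to report which ones the overlay lacks.
func missingNodes(ctx context.Context, service *overlay.Service, ids []storj.NodeID) ([]storj.NodeID, error) {
	nodes, err := service.GetNodes(ctx, ids)
	if err != nil {
		return nil, err
	}
	var missing []storj.NodeID
	for _, id := range ids {
		if _, ok := nodes[id]; !ok {
			missing = append(missing, id)
		}
	}
	return missing, nil
}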

View File

@@ -5,6 +5,7 @@ package overlay_test
 import (
 	"context"
+	"reflect"
 	"sort"
 	"testing"
 	"time"
@@ -279,6 +280,46 @@ func TestNodeInfo(t *testing.T) {
 	})
 }
 
+func TestGetNodes(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		// pause chores that might update node data
+		planet.Satellites[0].Audit.Chore.Loop.Pause()
+		planet.Satellites[0].Repair.Checker.Loop.Pause()
+		planet.Satellites[0].Repair.Repairer.Loop.Pause()
+		planet.Satellites[0].DowntimeTracking.DetectionChore.Loop.Pause()
+		planet.Satellites[0].DowntimeTracking.EstimationChore.Loop.Pause()
+		for _, node := range planet.StorageNodes {
+			node.Contact.Chore.Pause(ctx)
+		}
+
+		// should not return anything if nodeIDs aren't in the nodes table
+		actualNodes, err := planet.Satellites[0].Overlay.Service.GetNodes(ctx, []storj.NodeID{})
+		require.NoError(t, err)
+		require.Equal(t, 0, len(actualNodes))
+
+		actualNodes, err = planet.Satellites[0].Overlay.Service.GetNodes(ctx, []storj.NodeID{testrand.NodeID()})
+		require.NoError(t, err)
+		require.Equal(t, 0, len(actualNodes))
+
+		expectedNodes := make(map[storj.NodeID]*overlay.NodeDossier, len(planet.StorageNodes))
+		nodeIDs := make([]storj.NodeID, len(planet.StorageNodes)+1)
+		for i, node := range planet.StorageNodes {
+			nodeIDs[i] = node.ID()
+			node, err := planet.Satellites[0].Overlay.Service.Get(ctx, node.ID())
+			require.NoError(t, err)
+			expectedNodes[node.Id] = node
+		}
+		// add a fake node ID to make sure GetNodes doesn't error and still returns the expected nodes.
+		nodeIDs[len(planet.StorageNodes)] = testrand.NodeID()
+
+		actualNodes, err = planet.Satellites[0].Overlay.Service.GetNodes(ctx, nodeIDs)
+		require.NoError(t, err)
+
+		require.True(t, reflect.DeepEqual(expectedNodes, actualNodes))
+	})
+}
+
 func TestKnownReliable(t *testing.T) {
 	onlineWindow := 500 * time.Millisecond

View File

@@ -307,6 +307,53 @@ func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (_ *overlay
 	return convertDBNode(ctx, node)
 }
 
+// GetNodes returns a map of nodes for the supplied nodeIDs
+func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*overlay.NodeDossier, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var rows *sql.Rows
+	rows, err = cache.db.Query(ctx, cache.db.Rebind(`
+		SELECT nodes.id, address, last_net, last_ip_port, protocol, type, email, wallet,
+			free_bandwidth, free_disk, piece_count, major, minor, patch, hash, timestamp,
+			release, latency_90, audit_success_count, total_audit_count, uptime_success_count, total_uptime_count,
+			created_at, updated_at, last_contact_success, last_contact_failure, contained, disqualified,
+			suspended, audit_reputation_alpha, audit_reputation_beta, unknown_audit_reputation_alpha,
+			unknown_audit_reputation_beta, uptime_reputation_alpha, uptime_reputation_beta,
+			exit_initiated_at, exit_loop_completed_at, exit_finished_at, exit_success
+		FROM nodes
+		WHERE id = any($1::bytea[])
+		`), postgresNodeIDList(nodeIDs),
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer func() { err = errs.Combine(err, rows.Close()) }()
+
+	nodes := make(map[storj.NodeID]*overlay.NodeDossier)
+	for rows.Next() {
+		dbNode := &dbx.Node{}
+		err = rows.Scan(&dbNode.Id, &dbNode.Address, &dbNode.LastNet, &dbNode.LastIpPort, &dbNode.Protocol, &dbNode.Type, &dbNode.Email, &dbNode.Wallet,
+			&dbNode.FreeBandwidth, &dbNode.FreeDisk, &dbNode.PieceCount, &dbNode.Major, &dbNode.Minor, &dbNode.Patch, &dbNode.Hash, &dbNode.Timestamp,
+			&dbNode.Release, &dbNode.Latency90, &dbNode.AuditSuccessCount, &dbNode.TotalAuditCount, &dbNode.UptimeSuccessCount, &dbNode.TotalUptimeCount,
+			&dbNode.CreatedAt, &dbNode.UpdatedAt, &dbNode.LastContactSuccess, &dbNode.LastContactFailure, &dbNode.Contained, &dbNode.Disqualified,
+			&dbNode.Suspended, &dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta, &dbNode.UnknownAuditReputationAlpha,
+			&dbNode.UnknownAuditReputationBeta, &dbNode.UptimeReputationAlpha, &dbNode.UptimeReputationBeta,
+			&dbNode.ExitInitiatedAt, &dbNode.ExitLoopCompletedAt, &dbNode.ExitFinishedAt, &dbNode.ExitSuccess,
+		)
+		if err != nil {
+			return nil, err
+		}
+
+		dossier, err := convertDBNode(ctx, dbNode)
+		if err != nil {
+			return nil, err
+		}
+
+		nodes[dossier.Id] = dossier
+	}
+
+	return nodes, Error.Wrap(rows.Err())
+}
+
 // KnownOffline filters a set of nodes to offline nodes
 func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (offlineNodes storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
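
The WHERE clause above binds the whole ID list as a single bytea[] parameter (via the internal postgresNodeIDList helper), so the node lookup is one round trip no matter how many pieces a segment has, and IDs without a matching row simply contribute nothing to the returned map. A minimal sketch of the same predicate using lib/pq's ByteaArray, shown only to illustrate the technique rather than the satellite's actual helper:

package sketch

import (
	"context"
	"database/sql"

	"github.com/lib/pq"
)

// selectNodesByIDs demonstrates the `id = any($1::bytea[])` pattern:
// pq.ByteaArray encodes [][]byte as one postgres bytea[] parameter,
// so the whole set of IDs travels in a single query.
func selectNodesByIDs(ctx context.Context, db *sql.DB, ids [][]byte) (*sql.Rows, error) {
	return db.QueryContext(ctx, `SELECT id, address FROM nodes WHERE id = any($1::bytea[])`, pq.ByteaArray(ids))
}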