From bdbf764b869b23d26190d71a4c101878d868d03f Mon Sep 17 00:00:00 2001
From: Ethan
Date: Fri, 13 Mar 2020 14:01:48 -0400
Subject: [PATCH] satellite/orders;overlay: Consolidate order limit storage node lookups into 1 query.

https://storjlabs.atlassian.net/browse/SM-449

Change-Id: Idc62cc2978fba67cf48f7c98b27b0f996f9c58ac
---
 satellite/orders/service.go           | 90 ++++++++++++++++++++++-----
 satellite/overlay/service.go          |  9 +++
 satellite/overlay/service_test.go     | 41 ++++++++++++
 satellite/satellitedb/overlaycache.go | 47 ++++++++++++++
 4 files changed, 172 insertions(+), 15 deletions(-)

diff --git a/satellite/orders/service.go b/satellite/orders/service.go
index aa64feaf8..e38a49f93 100644
--- a/satellite/orders/service.go
+++ b/satellite/orders/service.go
@@ -144,12 +144,24 @@ func (service *Service) CreateGetOrderLimitsOld(ctx context.Context, bucketID []
 	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limits []*pb.AddressedOrderLimit
 	for _, piece := range pointer.GetRemote().GetRemotePieces() {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Debug("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -244,12 +256,24 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucketID []byt
 	pieceSize := eestream.CalcPieceSize(pointer.GetSegmentSize(), redundancy)
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limits []*pb.AddressedOrderLimit
 	for _, piece := range pointer.GetRemote().GetRemotePieces() {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Debug("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -442,12 +466,24 @@ func (service *Service) CreateDeleteOrderLimits(ctx context.Context, bucketID []
 		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 	}
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limits []*pb.AddressedOrderLimit
 	for _, piece := range pointer.GetRemote().GetRemotePieces() {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Error("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -529,6 +565,17 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []b
 		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 	}
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limitsCount int32
 	limits := make([]*pb.AddressedOrderLimit, totalPieces)
@@ -537,9 +584,10 @@ func (service *Service) CreateAuditOrderLimits(ctx context.Context, bucketID []b
 			continue
 		}
 
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Error("error getting node from overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
@@ -702,13 +750,25 @@ func (service *Service) CreateGetRepairOrderLimits(ctx context.Context, bucketID
 		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
 	}
 
+	nodeIDs := make([]storj.NodeID, len(pointer.GetRemote().GetRemotePieces()))
+	for i, piece := range pointer.GetRemote().GetRemotePieces() {
+		nodeIDs[i] = piece.NodeId
+	}
+
+	nodes, err := service.overlay.GetNodes(ctx, nodeIDs)
+	if err != nil {
+		service.log.Debug("error getting nodes from overlay", zap.Error(err))
+		return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
+	}
+
 	var combinedErrs error
 	var limitsCount int
 	limits := make([]*pb.AddressedOrderLimit, totalPieces)
 	for _, piece := range healthy {
-		node, err := service.overlay.Get(ctx, piece.NodeId)
-		if err != nil {
-			service.log.Error("error getting node from the overlay", zap.Error(err))
+		node, ok := nodes[piece.NodeId]
+		if !ok {
+			service.log.Debug("node does not exist in nodes map", zap.Stringer("ID", piece.NodeId))
+			err = errs.New("node ID %v does not exist in nodes map", piece.NodeId)
 			combinedErrs = errs.Combine(combinedErrs, err)
 			continue
 		}
diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go
index 9c8a451a2..0f69df69d 100644
--- a/satellite/overlay/service.go
+++ b/satellite/overlay/service.go
@@ -43,6 +43,8 @@ type DB interface {
 	// Get looks up the node by nodeID
 	Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
+	// GetNodes returns a map of nodes for the supplied nodeIDs
+	GetNodes(ctx context.Context, nodeIDs []storj.NodeID) (map[storj.NodeID]*NodeDossier, error)
 	// KnownOffline filters a set of nodes to offline nodes
 	KnownOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// KnownUnreliableOrOffline filters a set of nodes to unhealth or offlines node, independent of new
@@ -269,6 +271,13 @@ func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDo
 	return service.db.Get(ctx, nodeID)
 }
 
+// GetNodes returns a map of nodes for the supplied nodeIDs.
+func (service *Service) GetNodes(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*NodeDossier, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	return service.db.GetNodes(ctx, nodeIDs)
+}
+
 // IsOnline checks if a node is 'online' based on the collected statistics.
 func (service *Service) IsOnline(node *NodeDossier) bool {
 	return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow
diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go
index 077975416..f60ddc598 100644
--- a/satellite/overlay/service_test.go
+++ b/satellite/overlay/service_test.go
@@ -5,6 +5,7 @@ package overlay_test
 
 import (
 	"context"
+	"reflect"
 	"sort"
 	"testing"
 	"time"
@@ -279,6 +280,46 @@ func TestNodeInfo(t *testing.T) {
 	})
 }
 
+func TestGetNodes(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 2, UplinkCount: 0,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		// pause chores that might update node data
+		planet.Satellites[0].Audit.Chore.Loop.Pause()
+		planet.Satellites[0].Repair.Checker.Loop.Pause()
+		planet.Satellites[0].Repair.Repairer.Loop.Pause()
+		planet.Satellites[0].DowntimeTracking.DetectionChore.Loop.Pause()
+		planet.Satellites[0].DowntimeTracking.EstimationChore.Loop.Pause()
+		for _, node := range planet.StorageNodes {
+			node.Contact.Chore.Pause(ctx)
+		}
+
+		// should not return anything if nodeIDs aren't in the nodes table
+		actualNodes, err := planet.Satellites[0].Overlay.Service.GetNodes(ctx, []storj.NodeID{})
+		require.NoError(t, err)
+		require.Equal(t, 0, len(actualNodes))
+		actualNodes, err = planet.Satellites[0].Overlay.Service.GetNodes(ctx, []storj.NodeID{testrand.NodeID()})
+		require.NoError(t, err)
+		require.Equal(t, 0, len(actualNodes))
+
+		expectedNodes := make(map[storj.NodeID]*overlay.NodeDossier, len(planet.StorageNodes))
+		nodeIDs := make([]storj.NodeID, len(planet.StorageNodes)+1)
+		for i, node := range planet.StorageNodes {
+			nodeIDs[i] = node.ID()
+			node, err := planet.Satellites[0].Overlay.Service.Get(ctx, node.ID())
+			require.NoError(t, err)
+			expectedNodes[node.Id] = node
+		}
+		// add a fake node ID to make sure GetNodes doesn't error and still returns the expected nodes.
+		nodeIDs[len(planet.StorageNodes)] = testrand.NodeID()
+
+		actualNodes, err = planet.Satellites[0].Overlay.Service.GetNodes(ctx, nodeIDs)
+		require.NoError(t, err)
+
+		require.True(t, reflect.DeepEqual(expectedNodes, actualNodes))
+	})
+}
+
 func TestKnownReliable(t *testing.T) {
 	onlineWindow := 500 * time.Millisecond
diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go
index 4e67c3a2f..b074eca7e 100644
--- a/satellite/satellitedb/overlaycache.go
+++ b/satellite/satellitedb/overlaycache.go
@@ -307,6 +307,53 @@ func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (_ *overlay
 	return convertDBNode(ctx, node)
 }
 
+// GetNodes returns a map of nodes for the supplied nodeIDs
+func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*overlay.NodeDossier, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	var rows *sql.Rows
+	rows, err = cache.db.Query(ctx, cache.db.Rebind(`
+		SELECT nodes.id, address, last_net, last_ip_port, protocol, type, email, wallet,
+			free_bandwidth, free_disk, piece_count, major, minor, patch, hash, timestamp,
+			release, latency_90, audit_success_count, total_audit_count, uptime_success_count, total_uptime_count,
+			created_at, updated_at, last_contact_success, last_contact_failure, contained, disqualified,
+			suspended, audit_reputation_alpha, audit_reputation_beta, unknown_audit_reputation_alpha,
+			unknown_audit_reputation_beta, uptime_reputation_alpha, uptime_reputation_beta,
+			exit_initiated_at, exit_loop_completed_at, exit_finished_at, exit_success
+		FROM nodes
+		WHERE id = any($1::bytea[])
+	`), postgresNodeIDList(nodeIDs),
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer func() { err = errs.Combine(err, rows.Close()) }()
+
+	nodes := make(map[storj.NodeID]*overlay.NodeDossier)
+	for rows.Next() {
+		dbNode := &dbx.Node{}
+		err = rows.Scan(&dbNode.Id, &dbNode.Address, &dbNode.LastNet, &dbNode.LastIpPort, &dbNode.Protocol, &dbNode.Type, &dbNode.Email, &dbNode.Wallet,
+			&dbNode.FreeBandwidth, &dbNode.FreeDisk, &dbNode.PieceCount, &dbNode.Major, &dbNode.Minor, &dbNode.Patch, &dbNode.Hash, &dbNode.Timestamp,
+			&dbNode.Release, &dbNode.Latency90, &dbNode.AuditSuccessCount, &dbNode.TotalAuditCount, &dbNode.UptimeSuccessCount, &dbNode.TotalUptimeCount,
+			&dbNode.CreatedAt, &dbNode.UpdatedAt, &dbNode.LastContactSuccess, &dbNode.LastContactFailure, &dbNode.Contained, &dbNode.Disqualified,
+			&dbNode.Suspended, &dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta, &dbNode.UnknownAuditReputationAlpha,
+			&dbNode.UnknownAuditReputationBeta, &dbNode.UptimeReputationAlpha, &dbNode.UptimeReputationBeta,
+			&dbNode.ExitInitiatedAt, &dbNode.ExitLoopCompletedAt, &dbNode.ExitFinishedAt, &dbNode.ExitSuccess,
+		)
+		if err != nil {
+			return nil, err
+		}
+
+		dossier, err := convertDBNode(ctx, dbNode)
+		if err != nil {
+			return nil, err
+		}
+		nodes[dossier.Id] = dossier
+	}
+
+	return nodes, Error.Wrap(rows.Err())
+}
+
 // KnownOffline filters a set of nodes to offline nodes
 func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (offlineNodes storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
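
Note for reviewers (not part of the patch): a minimal sketch of the calling pattern this change introduces, collecting all piece node IDs and resolving them with a single overlay.GetNodes call instead of one overlay.Get per piece. The function name lookupPieceNodes and its arguments are hypothetical; the types and methods (overlay.Service, overlay.NodeDossier, pb.RemotePiece, storj.NodeID, GetNodes) are the ones used in the diff above, and the usual satellite imports are assumed.

	// lookupPieceNodes is an illustrative helper, not code from this change.
	func lookupPieceNodes(ctx context.Context, overlayService *overlay.Service, pieces []*pb.RemotePiece) (map[storj.NodeID]*overlay.NodeDossier, error) {
		nodeIDs := make([]storj.NodeID, len(pieces))
		for i, piece := range pieces {
			nodeIDs[i] = piece.NodeId
		}

		// one overlay (and database) round trip for all pieces
		nodes, err := overlayService.GetNodes(ctx, nodeIDs)
		if err != nil {
			return nil, err
		}

		// IDs that are not in the nodes table are simply absent from the map,
		// so callers check membership per piece, as the orders service does above.
		for _, piece := range pieces {
			if _, ok := nodes[piece.NodeId]; !ok {
				// handle the missing node here (skip it, log it, or collect an error)
				continue
			}
		}
		return nodes, nil
	}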