satellite/overlay: Add retry to all selects in overlaycache

Change-Id: I0356d71a35701f8e0ca04a34b2bb2aea666c1394
This commit is contained in:
Ethan 2020-11-29 15:54:03 -05:00
parent 6bce907cb0
commit 5dc013d3bd

View File

@ -19,6 +19,7 @@ import (
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/private/version"
"storj.io/storj/private/dbutil/cockroachutil"
"storj.io/storj/private/dbutil/pgutil"
"storj.io/storj/private/tagsql"
"storj.io/storj/satellite/internalpb"
@ -38,6 +39,21 @@ type overlaycache struct {
// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes.
func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
for {
reputable, new, err = cache.selectAllStorageNodesUpload(ctx, selectionCfg)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return reputable, new, err
}
break
}
return reputable, new, err
}
func (cache *overlaycache) selectAllStorageNodesUpload(ctx context.Context, selectionCfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
query := `
@ -103,6 +119,21 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
// GetNodesNetwork returns the /24 subnet for each storage node, order is not guaranteed.
func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) {
for {
nodeNets, err = cache.getNodesNetwork(ctx, nodeIDs)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return nodeNets, err
}
break
}
return nodeNets, err
}
func (cache *overlaycache) getNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error) {
defer mon.Task()(&ctx)(&err)
var rows tagsql.Rows
@ -128,7 +159,7 @@ func (cache *overlaycache) GetNodesNetwork(ctx context.Context, nodeIDs []storj.
}
// Get looks up the node by nodeID.
func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (_ *overlay.NodeDossier, err error) {
func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (dossier *overlay.NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)
if id.IsZero() {
@ -147,7 +178,22 @@ func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (_ *overlay
}
// GetOnlineNodesForGetDelete returns a map of nodes for the supplied nodeIDs.
func (cache *overlaycache) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (_ map[storj.NodeID]*overlay.SelectedNode, err error) {
func (cache *overlaycache) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (nodes map[storj.NodeID]*overlay.SelectedNode, err error) {
for {
nodes, err = cache.getOnlineNodesForGetDelete(ctx, nodeIDs, onlineWindow)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return nodes, err
}
break
}
return nodes, err
}
func (cache *overlaycache) getOnlineNodesForGetDelete(ctx context.Context, nodeIDs []storj.NodeID, onlineWindow time.Duration) (_ map[storj.NodeID]*overlay.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
var rows tagsql.Rows
@ -185,7 +231,22 @@ func (cache *overlaycache) GetOnlineNodesForGetDelete(ctx context.Context, nodeI
}
// KnownOffline filters a set of nodes to offline nodes.
func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (offlineNodes storj.NodeIDList, err error) {
func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (offlineNodes storj.NodeIDList, err error) {
for {
offlineNodes, err = cache.knownOffline(ctx, criteria, nodeIDs)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return offlineNodes, err
}
break
}
return offlineNodes, err
}
func (cache *overlaycache) knownOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (offlineNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIds) == 0 {
@ -217,10 +278,25 @@ func (cache *overlaycache) KnownOffline(ctx context.Context, criteria *overlay.N
}
// KnownUnreliableOrOffline filters a set of nodes to unreliable or offlines node, independent of new.
func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
for {
badNodes, err = cache.knownUnreliableOrOffline(ctx, criteria, nodeIDs)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return badNodes, err
}
break
}
return badNodes, err
}
func (cache *overlaycache) knownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIDs storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIds) == 0 {
if len(nodeIDs) == 0 {
return nil, Error.New("no ids provided")
}
@ -233,14 +309,14 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
AND unknown_audit_suspended IS NULL
AND exit_finished_at IS NULL
AND last_contact_success > $2
`), pgutil.NodeIDArray(nodeIds), time.Now().Add(-criteria.OnlineWindow),
`), pgutil.NodeIDArray(nodeIDs), time.Now().Add(-criteria.OnlineWindow),
)
if err != nil {
return nil, err
}
defer func() { err = errs.Combine(err, rows.Close()) }()
goodNodes := make(map[storj.NodeID]struct{}, len(nodeIds))
goodNodes := make(map[storj.NodeID]struct{}, len(nodeIDs))
for rows.Next() {
var id storj.NodeID
err = rows.Scan(&id)
@ -249,7 +325,7 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
}
goodNodes[id] = struct{}{}
}
for _, id := range nodeIds {
for _, id := range nodeIDs {
if _, ok := goodNodes[id]; !ok {
badNodes = append(badNodes, id)
}
@ -259,6 +335,21 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
for {
nodes, err = cache.knownReliable(ctx, onlineWindow, nodeIDs)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return nodes, err
}
break
}
return nodes, err
}
func (cache *overlaycache) knownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) (nodes []*pb.Node, err error) {
defer mon.Task()(&ctx)(&err)
if len(nodeIDs) == 0 {
@ -298,6 +389,21 @@ func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.
// Reliable returns all reliable nodes.
func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
for {
nodes, err = cache.reliable(ctx, criteria)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return nodes, err
}
break
}
return nodes, err
}
func (cache *overlaycache) reliable(ctx context.Context, criteria *overlay.NodeCriteria) (nodes storj.NodeIDList, err error) {
// get reliable and online nodes
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
@ -702,6 +808,21 @@ func (cache *overlaycache) UpdatePieceCounts(ctx context.Context, pieceCounts ma
// GetExitingNodes returns nodes who have initiated a graceful exit and is not disqualified, but have not completed it.
func (cache *overlaycache) GetExitingNodes(ctx context.Context) (exitingNodes []*overlay.ExitStatus, err error) {
for {
exitingNodes, err = cache.getExitingNodes(ctx)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return exitingNodes, err
}
break
}
return exitingNodes, err
}
func (cache *overlaycache) getExitingNodes(ctx context.Context) (exitingNodes []*overlay.ExitStatus, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
@ -727,7 +848,22 @@ func (cache *overlaycache) GetExitingNodes(ctx context.Context) (exitingNodes []
}
// GetExitStatus returns a node's graceful exit status.
func (cache *overlaycache) GetExitStatus(ctx context.Context, nodeID storj.NodeID) (_ *overlay.ExitStatus, err error) {
func (cache *overlaycache) GetExitStatus(ctx context.Context, nodeID storj.NodeID) (exitStatus *overlay.ExitStatus, err error) {
for {
exitStatus, err = cache.getExitStatus(ctx, nodeID)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return exitStatus, err
}
break
}
return exitStatus, err
}
func (cache *overlaycache) getExitStatus(ctx context.Context, nodeID storj.NodeID) (_ *overlay.ExitStatus, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
@ -753,6 +889,21 @@ func (cache *overlaycache) GetExitStatus(ctx context.Context, nodeID storj.NodeI
// GetGracefulExitCompletedByTimeFrame returns nodes who have completed graceful exit within a time window (time window is around graceful exit completion).
func (cache *overlaycache) GetGracefulExitCompletedByTimeFrame(ctx context.Context, begin, end time.Time) (exitedNodes storj.NodeIDList, err error) {
for {
exitedNodes, err = cache.getGracefulExitCompletedByTimeFrame(ctx, begin, end)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return exitedNodes, err
}
break
}
return exitedNodes, err
}
func (cache *overlaycache) getGracefulExitCompletedByTimeFrame(ctx context.Context, begin, end time.Time) (exitedNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
@ -782,6 +933,21 @@ func (cache *overlaycache) GetGracefulExitCompletedByTimeFrame(ctx context.Conte
// GetGracefulExitIncompleteByTimeFrame returns nodes who have initiated, but not completed graceful exit within a time window (time window is around graceful exit initiation).
func (cache *overlaycache) GetGracefulExitIncompleteByTimeFrame(ctx context.Context, begin, end time.Time) (exitingNodes storj.NodeIDList, err error) {
for {
exitingNodes, err = cache.getGracefulExitIncompleteByTimeFrame(ctx, begin, end)
if err != nil {
if cockroachutil.NeedsRetry(err) {
continue
}
return exitingNodes, err
}
break
}
return exitingNodes, err
}
func (cache *overlaycache) getGracefulExitIncompleteByTimeFrame(ctx context.Context, begin, end time.Time) (exitingNodes storj.NodeIDList, err error) {
defer mon.Task()(&ctx)(&err)
rows, err := cache.db.Query(ctx, cache.db.Rebind(`