moved invalid/offline back into SQL (#1838)
* moved invalid/offline back into SQL, removed GetAll()
parent ecb81144a1
commit 6ece4f11ad
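
The change replaces the overlay cache's GetAll() plus per-node IsOnline/IsHealthy checks with a single KnownUnreliableOrOffline() call, so the unreliable/offline filtering happens in one SQL query instead of in Go. Not part of the diff: a minimal, self-contained sketch of the new caller-side pattern, using stand-in types rather than the real storj.NodeID/pb.RemotePiece.

package main

import "fmt"

// Stand-in types for illustration only; the satellite code uses
// storj.NodeID, pb.RemotePiece and the overlay cache from this diff.
type nodeID string

type piece struct {
	nodeID   nodeID
	pieceNum int32
}

// knownUnreliableOrOffline mimics the new overlay method: given a list of
// IDs, it returns the subset the database considers unreliable or offline.
func knownUnreliableOrOffline(ids []nodeID) []nodeID {
	bad := map[nodeID]bool{"node-2": true} // pretend the SQL query flagged node-2
	var out []nodeID
	for _, id := range ids {
		if bad[id] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	pieces := []piece{{"node-1", 0}, {"node-2", 1}, {"node-3", 2}}

	var ids []nodeID
	for _, p := range pieces {
		ids = append(ids, p.nodeID)
	}

	// One call instead of GetAll() followed by IsOnline/IsHealthy per node.
	badIDs := knownUnreliableOrOffline(ids)

	// Same nested filtering as the checker hunk below.
	var missingPieces []int32
	for _, p := range pieces {
		for _, id := range badIDs {
			if id == p.nodeID {
				missingPieces = append(missingPieces, p.pieceNum)
			}
		}
	}
	fmt.Println(missingPieces) // prints [1]
}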
@@ -175,15 +175,16 @@ func (checker *Checker) getMissingPieces(ctx context.Context, pieces []*pb.Remot
 	for _, p := range pieces {
 		nodeIDs = append(nodeIDs, p.NodeId)
 	}
-	nodes, err := checker.overlay.GetAll(ctx, nodeIDs)
+
+	badNodeIDs, err := checker.overlay.KnownUnreliableOrOffline(ctx, nodeIDs)
 	if err != nil {
 		return nil, Error.New("error getting nodes %s", err)
 	}
 
-	for i, node := range nodes {
-		if node == nil || !checker.overlay.IsOnline(node) || !checker.overlay.IsHealthy(node) {
-			missingPieces = append(missingPieces, pieces[i].GetPieceNum())
+	for _, p := range pieces {
+		for _, nodeID := range badNodeIDs {
+			if nodeID == p.NodeId {
+				missingPieces = append(missingPieces, p.GetPieceNum())
+			}
 		}
 	}
 	return missingPieces, nil
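
Not part of the commit: the new loop rescans badNodeIDs for every piece. An equivalent set-based version, written as a hypothetical helper against the same pb.RemotePiece and storj.NodeID types, would look like this.

// missingPieceNums is a hypothetical helper, not in this commit: the same
// filtering as the loop above, but with the bad IDs indexed in a map so each
// piece is checked in constant time instead of rescanning badNodeIDs.
func missingPieceNums(pieces []*pb.RemotePiece, badNodeIDs storj.NodeIDList) (missing []int32) {
	badSet := make(map[storj.NodeID]struct{}, len(badNodeIDs))
	for _, id := range badNodeIDs {
		badSet[id] = struct{}{}
	}
	for _, p := range pieces {
		if _, ok := badSet[p.NodeId]; ok {
			missing = append(missing, p.GetPieceNum())
		}
	}
	return missing
}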
@@ -40,8 +40,9 @@ type DB interface {
 	// Get looks up the node by nodeID
 	Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
-	// GetAll looks up nodes based on the ids from the overlay cache
-	GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*NodeDossier, error)
+	// KnownUnreliableOrOffline filters a set of nodes to the unreliable or offline ones, independent of the new-node criteria
+	// Note that KnownUnreliableOrOffline will not return node IDs which are not in the database at all
+	KnownUnreliableOrOffline(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
 	// Paginate will page through the database nodes
 	Paginate(ctx context.Context, offset int64, limit int) ([]*NodeDossier, bool, error)
@@ -150,24 +151,12 @@ func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDossie
 	return cache.db.Get(ctx, nodeID)
 }
 
-// IsNew checks if a node is 'new' based on the collected statistics.
-func (cache *Cache) IsNew(node *NodeDossier) bool {
-	return node.Reputation.AuditCount < cache.preferences.AuditCount
-}
-
 // IsOnline checks if a node is 'online' based on the collected statistics.
 func (cache *Cache) IsOnline(node *NodeDossier) bool {
 	return time.Now().Sub(node.Reputation.LastContactSuccess) < cache.preferences.OnlineWindow &&
 		node.Reputation.LastContactSuccess.After(node.Reputation.LastContactFailure)
 }
 
-// IsHealthy checks if a node is 'valid' based on the collected statistics.
-func (cache *Cache) IsHealthy(node *NodeDossier) bool {
-	r, p := node.Reputation, cache.preferences
-	return r.AuditCount >= p.AuditCount && r.UptimeCount >= p.UptimeCount &&
-		r.AuditSuccessRatio >= p.AuditSuccessRatio && r.UptimeRatio >= p.UptimeRatio
-}
-
 // FindStorageNodes searches the overlay network for nodes that meet the provided requirements
 func (cache *Cache) FindStorageNodes(ctx context.Context, req FindStorageNodesRequest) ([]*pb.Node, error) {
 	return cache.FindStorageNodesWithPreferences(ctx, req, &cache.preferences)
@@ -207,8 +196,6 @@ func (cache *Cache) FindStorageNodesWithPreferences(ctx context.Context, req Fin
 		}
 	}
 
-	auditCount := preferences.AuditCount
-
 	// add selected new nodes to the excluded list for reputable node selection
 	for _, newNode := range newNodes {
 		excluded = append(excluded, newNode.Id)
@@ -217,7 +204,7 @@ func (cache *Cache) FindStorageNodesWithPreferences(ctx context.Context, req Fin
 	reputableNodes, err := cache.db.SelectStorageNodes(ctx, reputableNodeCount-len(newNodes), &NodeCriteria{
 		FreeBandwidth:      req.FreeBandwidth,
 		FreeDisk:           req.FreeDisk,
-		AuditCount:         auditCount,
+		AuditCount:         preferences.AuditCount,
 		AuditSuccessRatio:  preferences.AuditSuccessRatio,
 		UptimeCount:        preferences.UptimeCount,
 		UptimeSuccessRatio: preferences.UptimeRatio,
@@ -239,15 +226,18 @@ func (cache *Cache) FindStorageNodesWithPreferences(ctx context.Context, req Fin
 	return nodes, nil
 }
 
-// GetAll looks up the provided ids from the overlay cache
-func (cache *Cache) GetAll(ctx context.Context, ids storj.NodeIDList) (_ []*NodeDossier, err error) {
+// KnownUnreliableOrOffline filters a set of nodes to the unreliable or offline ones, independent of the new-node criteria
+// Note that KnownUnreliableOrOffline will not return node IDs which are not in the database at all
+func (cache *Cache) KnownUnreliableOrOffline(ctx context.Context, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
 	defer mon.Task()(&ctx)(&err)
 
-	if len(ids) == 0 {
-		return nil, OverlayError.New("no ids provided")
+	criteria := &NodeCriteria{
+		AuditCount:         cache.preferences.AuditCount,
+		AuditSuccessRatio:  cache.preferences.AuditSuccessRatio,
+		OnlineWindow:       cache.preferences.OnlineWindow,
+		UptimeCount:        cache.preferences.UptimeCount,
+		UptimeSuccessRatio: cache.preferences.UptimeRatio,
 	}
 
-	return cache.db.GetAll(ctx, ids)
+	return cache.db.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
 }
 
 // Put adds a node id and proto definition into the overlay cache
@@ -78,29 +78,6 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
 		// TODO: add erroring database test
 	}
 
-	{ // GetAll
-		nodes, err := cache.GetAll(ctx, storj.NodeIDList{valid2ID, valid1ID, valid2ID})
-		assert.NoError(t, err)
-		assert.Equal(t, nodes[0].Id, valid2ID)
-		assert.Equal(t, nodes[1].Id, valid1ID)
-		assert.Equal(t, nodes[2].Id, valid2ID)
-
-		nodes, err = cache.GetAll(ctx, storj.NodeIDList{valid1ID, missingID})
-		assert.NoError(t, err)
-		assert.Equal(t, nodes[0].Id, valid1ID)
-		assert.Nil(t, nodes[1])
-
-		nodes, err = cache.GetAll(ctx, make(storj.NodeIDList, 2))
-		assert.NoError(t, err)
-		assert.Nil(t, nodes[0])
-		assert.Nil(t, nodes[1])
-
-		_, err = cache.GetAll(ctx, storj.NodeIDList{})
-		assert.True(t, overlay.OverlayError.Has(err))
-
-		// TODO: add erroring database test
-	}
-
 	{ // Paginate
 
 		// should return two nodes
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"github.com/zeebo/errs"
 
 	"storj.io/storj/internal/testcontext"
@@ -16,6 +17,38 @@ import (
 	"storj.io/storj/pkg/storj"
 )
 
+func TestOffline(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+		satellite := planet.Satellites[0]
+		service := satellite.Overlay.Service
+		// TODO: handle cleanup
+
+		result, err := service.KnownUnreliableOrOffline(ctx, []storj.NodeID{
+			planet.StorageNodes[0].ID(),
+		})
+		require.NoError(t, err)
+		require.Empty(t, result)
+
+		result, err = service.KnownUnreliableOrOffline(ctx, []storj.NodeID{
+			planet.StorageNodes[0].ID(),
+			planet.StorageNodes[1].ID(),
+			planet.StorageNodes[2].ID(),
+		})
+		require.NoError(t, err)
+		require.Empty(t, result)
+
+		result, err = service.KnownUnreliableOrOffline(ctx, []storj.NodeID{
+			planet.StorageNodes[0].ID(),
+			storj.NodeID{1, 2, 3, 4}, //note that this succeeds by design
+			planet.StorageNodes[2].ID(),
+		})
+		require.NoError(t, err)
+		require.Len(t, result, 0)
+	})
+}
+
 func TestNodeSelection(t *testing.T) {
 	t.Skip("flaky")
 	testplanet.Run(t, testplanet.Config{
@@ -6,6 +6,7 @@ package overlay_test
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -78,6 +79,62 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
 		assert.Error(t, err)
 	}
 
+	{ // TestKnownUnreliableOrOffline
+		for _, tt := range []struct {
+			nodeID             storj.NodeID
+			auditSuccessCount  int64
+			auditCount         int64
+			auditSuccessRatio  float64
+			uptimeSuccessCount int64
+			uptimeCount        int64
+			uptimeRatio        float64
+		}{
+			{storj.NodeID{1}, 20, 20, 1.0, 20, 20, 1.0},  // good ratios => good
+			{storj.NodeID{2}, 5, 20, 0.25, 20, 20, 1},    // bad audit success, good uptime => bad
+			{storj.NodeID{3}, 20, 20, 1.0, 5, 20, 0.25},  // good audit success, bad uptime => bad
+			{storj.NodeID{4}, 0, 0, 0.0, 20, 20, 1.0},    // "bad" audit success, no audits => now considered bad
+			{storj.NodeID{5}, 20, 20, 1.0, 0, 0, 0.25},   // "bad" uptime success, no checks => now considered bad
+			{storj.NodeID{6}, 0, 1, 0.0, 5, 5, .01},      // bad audit success exactly one audit => bad
+			{storj.NodeID{7}, 0, 20, 0.0, 20, 20, 1.0},   // impossible math, but good ratios => good
+		} {
+			nodeStats := &overlay.NodeStats{
+				AuditSuccessRatio:  tt.auditSuccessRatio,
+				UptimeRatio:        tt.uptimeRatio,
+				AuditCount:         tt.auditCount,
+				AuditSuccessCount:  tt.auditSuccessCount,
+				UptimeCount:        tt.uptimeCount,
+				UptimeSuccessCount: tt.uptimeSuccessCount,
+			}
+
+			err := cache.UpdateAddress(ctx, &pb.Node{Id: tt.nodeID})
+			require.NoError(t, err)
+
+			_, err = cache.CreateStats(ctx, tt.nodeID, nodeStats)
+			require.NoError(t, err)
+		}
+
+		nodeIds := storj.NodeIDList{
+			storj.NodeID{1}, storj.NodeID{2},
+			storj.NodeID{3}, storj.NodeID{4},
+			storj.NodeID{5}, storj.NodeID{6},
+		}
+		criteria := &overlay.NodeCriteria{
+			AuditSuccessRatio:  0.5,
+			UptimeSuccessRatio: 0.5,
+			OnlineWindow:       time.Hour,
+		}
+
+		invalid, err := cache.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
+		require.NoError(t, err)
+
+		assert.Contains(t, invalid, storj.NodeID{2})
+		assert.Contains(t, invalid, storj.NodeID{3})
+		assert.Contains(t, invalid, storj.NodeID{4})
+		assert.Contains(t, invalid, storj.NodeID{5})
+		assert.Contains(t, invalid, storj.NodeID{6})
+		assert.Len(t, invalid, 5)
+	}
+
 	{ // TestUpdateOperator
 		nodeID := storj.NodeID{10}
 		err := cache.UpdateAddress(ctx, &pb.Node{Id: nodeID})
@@ -137,19 +137,11 @@ func (endpoint *Endpoint) SegmentHealth(ctx context.Context, in *pb.SegmentHealt
 		nodeIDs = append(nodeIDs, piece.NodeId)
 	}
 
-	nodes, err := endpoint.cache.GetAll(ctx, nodeIDs)
+	badNodes, err := endpoint.cache.KnownUnreliableOrOffline(ctx, nodeIDs)
 	if err != nil {
 		return nil, Error.Wrap(err)
 	}
-
-	onlineNodeCount := int32(0)
-	for _, n := range nodes {
-		if endpoint.cache.IsOnline(n) && endpoint.cache.IsHealthy(n) {
-			onlineNodeCount++
-		}
-	}
-
-	health.OnlineNodes = onlineNodeCount
+	health.OnlineNodes = int32(len(nodeIDs) - len(badNodes))
 
 	if in.GetSegmentIndex() > -1 {
 		health.Segment = []byte("s" + strconv.FormatInt(in.GetSegmentIndex(), 10))
@@ -703,13 +703,6 @@ func (m *lockedOverlayCache) Get(ctx context.Context, nodeID storj.NodeID) (*ove
 	return m.db.Get(ctx, nodeID)
 }
 
-// GetAll looks up nodes based on the ids from the overlay cache
-func (m *lockedOverlayCache) GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*overlay.NodeDossier, error) {
-	m.Lock()
-	defer m.Unlock()
-	return m.db.GetAll(ctx, nodeIDs)
-}
-
 // Paginate will page through the database nodes
 func (m *lockedOverlayCache) Paginate(ctx context.Context, offset int64, limit int) ([]*overlay.NodeDossier, bool, error) {
 	m.Lock()
@@ -731,6 +724,14 @@ func (m *lockedOverlayCache) SelectStorageNodes(ctx context.Context, count int,
 	return m.db.SelectStorageNodes(ctx, count, criteria)
 }
 
+// KnownUnreliableOrOffline filters a set of nodes to the unreliable or offline ones, independent of the new-node criteria
+// Note that KnownUnreliableOrOffline will not return node IDs which are not in the database at all
+func (m *lockedOverlayCache) KnownUnreliableOrOffline(ctx context.Context, a1 *overlay.NodeCriteria, a2 storj.NodeIDList) (storj.NodeIDList, error) {
+	m.Lock()
+	defer m.Unlock()
+	return m.db.KnownUnreliableOrOffline(ctx, a1, a2)
+}
+
 // Update updates node address
 func (m *lockedOverlayCache) UpdateAddress(ctx context.Context, value *pb.Node) error {
 	m.Lock()
@@ -68,8 +68,7 @@ func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int,
 
 	safeQuery := `
 		WHERE type = ? AND free_bandwidth >= ? AND free_disk >= ?
-		AND total_audit_count < ?
-		AND (audit_success_ratio >= ? OR total_audit_count = 0)
+		AND total_audit_count < ? AND audit_success_ratio >= ?
 		AND last_contact_success > ?
 		AND last_contact_success > last_contact_failure`
 	args := append(make([]interface{}, 0, 10),
@@ -155,18 +154,38 @@ func (cache *overlaycache) Get(ctx context.Context, id storj.NodeID) (*overlay.N
 	return convertDBNode(node)
 }
 
-// GetAll looks up nodes based on the ids from the overlay cache
-func (cache *overlaycache) GetAll(ctx context.Context, ids storj.NodeIDList) ([]*overlay.NodeDossier, error) {
-	infos := make([]*overlay.NodeDossier, len(ids))
-	for i, id := range ids {
-		// TODO: abort on canceled context
-		info, err := cache.Get(ctx, id)
-		if err != nil {
-			continue
-		}
-		infos[i] = info
+// KnownUnreliableOrOffline filters a set of nodes to the unreliable or offline ones, independent of the new-node criteria
+// Note that KnownUnreliableOrOffline will not return node IDs which are not in the database at all
+func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteria *overlay.NodeCriteria, nodeIds storj.NodeIDList) (badNodes storj.NodeIDList, err error) {
+	if len(nodeIds) == 0 {
+		return nil, Error.New("no ids provided")
 	}
-	return infos, nil
+	args := make([]interface{}, len(nodeIds))
+	for i, id := range nodeIds {
+		args[i] = id.Bytes()
+	}
+	args = append(args, criteria.AuditSuccessRatio, criteria.UptimeSuccessRatio, time.Now().Add(-criteria.OnlineWindow))
+
+	rows, err := cache.db.Query(cache.db.Rebind(`
+		SELECT id FROM nodes
+		WHERE id IN (?`+strings.Repeat(", ?", len(nodeIds)-1)+`)
+		AND ( audit_success_ratio < ? OR uptime_ratio < ?
+		OR last_contact_success <= ? OR last_contact_success <= last_contact_failure )`), args...)
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		err = errs.Combine(err, rows.Close())
+	}()
+	for rows.Next() {
+		var id storj.NodeID
+		err = rows.Scan(&id)
+		if err != nil {
+			return nil, err
+		}
+		badNodes = append(badNodes, id)
+	}
+	return badNodes, nil
 }
 
 // Paginate will run through
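
Not part of the diff: a standalone sketch of the placeholder expansion used above, with hypothetical values. It shows why the len(nodeIds) == 0 guard is needed (strings.Repeat with a negative count would panic) and how the args line up with the placeholders: IDs first, then the three criteria values.

package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	// Hypothetical input: three node IDs (the real code passes id.Bytes()).
	nodeIds := []interface{}{"id-1", "id-2", "id-3"}

	// Mirrors the guard above: with zero IDs, len(nodeIds)-1 would be
	// negative and strings.Repeat would panic.
	if len(nodeIds) == 0 {
		fmt.Println("no ids provided")
		return
	}

	// IDs first, then the criteria values, matching the "?" order in the query.
	args := append([]interface{}{}, nodeIds...)
	args = append(args, 0.5, 0.5, time.Now().Add(-time.Hour))

	query := `
		SELECT id FROM nodes
		WHERE id IN (?` + strings.Repeat(", ?", len(nodeIds)-1) + `)
		AND ( audit_success_ratio < ? OR uptime_ratio < ?
		OR last_contact_success <= ? OR last_contact_success <= last_contact_failure )`

	fmt.Println(query)
	fmt.Println("placeholders:", strings.Count(query, "?"), "args:", len(args)) // both 6
}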
@@ -238,10 +257,10 @@ func (cache *overlaycache) UpdateAddress(ctx context.Context, info *pb.Node) (er
 		dbx.Node_Latency90(0),
 		dbx.Node_AuditSuccessCount(0),
 		dbx.Node_TotalAuditCount(0),
-		dbx.Node_AuditSuccessRatio(0),
+		dbx.Node_AuditSuccessRatio(1),
 		dbx.Node_UptimeSuccessCount(0),
 		dbx.Node_TotalUptimeCount(0),
-		dbx.Node_UptimeRatio(0),
+		dbx.Node_UptimeRatio(1),
 		dbx.Node_LastContactSuccess(time.Now()),
 		dbx.Node_LastContactFailure(time.Time{}),
 	)
@@ -159,7 +159,7 @@ func (endpoint *Endpoint) Upload(stream pb.Piecestore_UploadServer) (err error)
 		if err != nil {
 			endpoint.log.Info("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Node ID", limit.StorageNodeId), zap.Error(err))
 		} else {
-			endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId))
+			endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Action", limit.Action))
 		}
 	}()
@@ -327,7 +327,7 @@ func (endpoint *Endpoint) Download(stream pb.Piecestore_DownloadServer) (err err
 		if err != nil {
 			endpoint.log.Info("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Error(err))
 		} else {
-			endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId))
+			endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Action", limit.Action))
 		}
 	}()