satellite/{overlay, satellitedb}: account for suspended field in overlay cache

Make sure that suspended nodes are treated appropriately by the overlay
cache. This means we should expect the following behavior:
* suspended nodes (vetted or not) should not be selected for uploading
new segments (see the selection sketch after this list)
* suspended nodes should be treated by the checker and repairer as
"unhealthy", and should be removed from the segment pointer upon
successful repair

This commit also removes unused overlay functionality (the Paginate and
PaginateQualified methods and the generated dbx code they relied on).

Fixes a bug introduced in commit 8b72181a1f where the audit reporter
automatically suspended nodes regardless of audit outcome (see the new
TestRecordAuditsCorrectOutcome test below).
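
A condensed sketch of the corrected outcome handling, taken from the
populateUpdateNodeStats hunks at the end of this diff (variable
declarations and the non-audit fields are omitted; updateReputation is
the existing helper that folds a single audit into an alpha/beta pair):

    switch updateReq.AuditOutcome {
    case overlay.AuditSuccess:
        // a success improves both the normal and the unknown-audit reputation
        auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
            true, auditAlpha, auditBeta,
            updateReq.AuditLambda, updateReq.AuditWeight, totalAuditCount)
        unknownAuditAlpha, unknownAuditBeta, _ = updateReputation(
            true, unknownAuditAlpha, unknownAuditBeta,
            updateReq.AuditLambda, updateReq.AuditWeight, totalAuditCount)
    case overlay.AuditFailure:
        // a failure only hurts the normal reputation (disqualification path)
        auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
            false, auditAlpha, auditBeta,
            updateReq.AuditLambda, updateReq.AuditWeight, totalAuditCount)
    case overlay.AuditUnknown:
        // only an unknown audit touches the unknown-audit reputation,
        // which is what drives suspension
        unknownAuditAlpha, unknownAuditBeta, updatedTotalAuditCount = updateReputation(
            false, unknownAuditAlpha, unknownAuditBeta,
            updateReq.AuditLambda, updateReq.AuditWeight, totalAuditCount)
    }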

Tests:
* updates repair tests to ensure that a suspended node is treated as
unhealthy and will be removed from the pointer on successful repair
* updates overlay tests for KnownUnreliableOrOffline and KnownReliable
to expect suspended nodes to be considered "unreliable" (see the usage
sketch after this list)
* adds a satellitedb test that ensures overlay.SelectStorageNodes and
overlay.SelectNewStorageNodes do not include suspended nodes
* adds an audit reporter test to ensure that different audit outcomes
result in the correct suspended/disqualified states
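
For the checker and repairer, the relevant contract is that
KnownUnreliableOrOffline now also reports suspended nodes; a minimal
usage sketch mirroring the TestKnownUnreliableOrOffline block below
(cache and segmentNodeIDs are placeholder names):

    // IDs returned here are treated as holding unhealthy pieces and are
    // dropped from the pointer on successful repair.
    criteria := &overlay.NodeCriteria{OnlineWindow: time.Hour}
    unreliable, err := cache.KnownUnreliableOrOffline(ctx, criteria, segmentNodeIDs)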

Change-Id: I40dba67278c8e8d2ce0bcec5e0a5cb6e4ce2f561
Authored by Moby von Briesen on 2020-03-11 17:11:46 -04:00; committed by Maximillian von Briesen
parent 80acf33abc
commit 2f991b6c56
8 changed files with 240 additions and 317 deletions

View File

@ -13,6 +13,7 @@ import (
"storj.io/common/pkcrypto"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/audit"
)
@ -76,3 +77,68 @@ func TestRecordAuditsAtLeastOnce(t *testing.T) {
require.EqualValues(t, 1, node.Reputation.AuditCount)
})
}
// TestRecordAuditsCorrectOutcome ensures that audit successes, failures, and unknown audits result in the correct disqualification/suspension state.
func TestRecordAuditsCorrectOutcome(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 0,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
goodNode := planet.StorageNodes[0].ID()
dqNode := planet.StorageNodes[1].ID()
suspendedNode := planet.StorageNodes[2].ID()
pendingNode := planet.StorageNodes[3].ID()
offlineNode := planet.StorageNodes[4].ID()
report := audit.Report{
Successes: []storj.NodeID{goodNode},
Fails: []storj.NodeID{dqNode},
Unknown: []storj.NodeID{suspendedNode},
PendingAudits: []*audit.PendingAudit{
{
NodeID: pendingNode,
PieceID: testrand.PieceID(),
StripeIndex: 0,
ShareSize: 10,
ExpectedShareHash: []byte{},
ReverifyCount: 0,
Path: "",
},
},
Offlines: []storj.NodeID{offlineNode},
}
failed, err := audits.Reporter.RecordAudits(ctx, report, "")
require.NoError(t, err)
require.Zero(t, failed)
overlay := satellite.Overlay.Service
node, err := overlay.Get(ctx, goodNode)
require.NoError(t, err)
require.Nil(t, node.Disqualified)
require.Nil(t, node.Suspended)
node, err = overlay.Get(ctx, dqNode)
require.NoError(t, err)
require.NotNil(t, node.Disqualified)
require.Nil(t, node.Suspended)
node, err = overlay.Get(ctx, suspendedNode)
require.NoError(t, err)
require.Nil(t, node.Disqualified)
require.NotNil(t, node.Suspended)
node, err = overlay.Get(ctx, pendingNode)
require.NoError(t, err)
require.Nil(t, node.Disqualified)
require.Nil(t, node.Suspended)
node, err = overlay.Get(ctx, offlineNode)
require.NoError(t, err)
require.Nil(t, node.Disqualified)
require.Nil(t, node.Suspended)
})
}

View File

@ -53,10 +53,6 @@ type DB interface {
KnownReliable(ctx context.Context, onlineWindow time.Duration, nodeIDs storj.NodeIDList) ([]*pb.Node, error)
// Reliable returns all nodes that are reliable
Reliable(context.Context, *NodeCriteria) (storj.NodeIDList, error)
// Paginate will page through the database nodes
Paginate(ctx context.Context, offset int64, limit int) ([]*NodeDossier, bool, error)
// PaginateQualified will page through the qualified nodes
PaginateQualified(ctx context.Context, offset int64, limit int) ([]*pb.Node, bool, error)
// Update updates node address
UpdateAddress(ctx context.Context, value *NodeDossier, defaults NodeSelectionConfig) error
// BatchUpdateStats updates multiple storagenode's stats in one transaction
@ -250,18 +246,6 @@ func (service *Service) Inspect(ctx context.Context) (_ storage.Keys, err error)
return nil, errors.New("not implemented")
}
// Paginate returns a list of `limit` nodes starting from `start` offset.
func (service *Service) Paginate(ctx context.Context, offset int64, limit int) (_ []*NodeDossier, _ bool, err error) {
defer mon.Task()(&ctx)(&err)
return service.db.Paginate(ctx, offset, limit)
}
// PaginateQualified returns a list of `limit` qualified nodes starting from `start` offset.
func (service *Service) PaginateQualified(ctx context.Context, offset int64, limit int) (_ []*pb.Node, _ bool, err error) {
defer mon.Task()(&ctx)(&err)
return service.db.PaginateQualified(ctx, offset, limit)
}
// Get looks up the provided nodeID from the overlay.
func (service *Service) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDossier, err error) {
defer mon.Task()(&ctx)(&err)

View File

@ -12,7 +12,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/pb"
@ -23,7 +22,6 @@ import (
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb/satellitedbtest"
"storj.io/storj/storagenode"
)
func TestCache_Database(t *testing.T) {
@ -100,30 +98,6 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
// TODO: add erroring database test
}
{ // Paginate
// should return two nodes
nodes, more, err := service.Paginate(ctx, 0, 2)
assert.NotNil(t, more)
assert.NoError(t, err)
assert.Equal(t, len(nodes), 2)
// should return no nodes
zero, more, err := service.Paginate(ctx, 0, 0)
assert.NoError(t, err)
assert.NotNil(t, more)
assert.NotEqual(t, len(zero), 0)
}
{ // PaginateQualified
// should return two nodes
nodes, more, err := service.PaginateQualified(ctx, 0, 3)
assert.NotNil(t, more)
assert.NoError(t, err)
assert.Equal(t, len(nodes), 2)
}
{ // Reputation
valid1, err := service.Get(ctx, valid1ID)
require.NoError(t, err)
@ -321,54 +295,46 @@ func TestGetNodes(t *testing.T) {
}
func TestKnownReliable(t *testing.T) {
onlineWindow := 500 * time.Millisecond
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.OnlineWindow = onlineWindow
},
StorageNode: func(index int, config *storagenode.Config) {
config.Contact.Interval = onlineWindow / 2
},
},
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
service := satellite.Overlay.Service
// Disqualify storage node #0
stats, err := service.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: planet.StorageNodes[0].ID(),
AuditOutcome: overlay.AuditFailure,
})
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, planet.StorageNodes[0].ID())
require.NoError(t, err)
require.NotNil(t, stats.Disqualified)
// Stop storage node #1
err = planet.StopPeer(planet.StorageNodes[1])
offlineNode := planet.StorageNodes[1]
err = planet.StopPeer(offlineNode)
require.NoError(t, err)
_, err = service.UpdateUptime(ctx, planet.StorageNodes[1].ID(), false)
// set last contact success to 1 hour ago to make node appear offline
checkInInfo := getNodeInfo(offlineNode.ID())
err = service.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-time.Hour))
require.NoError(t, err)
// Sleep for the duration of the online window and check that storage node #1 is offline
time.Sleep(onlineWindow)
node, err := service.Get(ctx, planet.StorageNodes[1].ID())
// Check that storage node #1 is offline
node, err := service.Get(ctx, offlineNode.ID())
require.NoError(t, err)
require.False(t, service.IsOnline(node))
// Check that only storage nodes #2 and #3 are reliable
// Suspend storage node #2
err = satellite.DB.OverlayCache().SuspendNode(ctx, planet.StorageNodes[2].ID(), time.Now())
require.NoError(t, err)
// Check that only storage nodes #3 and #4 are reliable
result, err := service.KnownReliable(ctx, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[1].ID(),
planet.StorageNodes[2].ID(),
planet.StorageNodes[3].ID(),
planet.StorageNodes[4].ID(),
})
require.NoError(t, err)
require.Len(t, result, 2)
// Sort the storage nodes for predictable checks
expectedReliable := []pb.Node{planet.StorageNodes[2].Local().Node, planet.StorageNodes[3].Local().Node}
expectedReliable := []pb.Node{planet.StorageNodes[3].Local().Node, planet.StorageNodes[4].Local().Node}
sort.Slice(expectedReliable, func(i, j int) bool { return expectedReliable[i].Id.Less(expectedReliable[j].Id) })
sort.Slice(result, func(i, j int) bool { return result[i].Id.Less(result[j].Id) })
@ -618,6 +584,80 @@ func TestGetSuccesfulNodesNotCheckedInSince(t *testing.T) {
})
}
// TestSuspendedSelection ensures that suspended nodes are not selected by SelectStorageNodes or SelectNewStorageNodes
func TestSuspendedSelection(t *testing.T) {
totalNodes := 10
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
cache := db.OverlayCache()
suspendedIDs := make(map[storj.NodeID]bool)
defaults := overlay.NodeSelectionConfig{
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
}
// put nodes in cache
for i := 0; i < totalNodes; i++ {
newID := testrand.NodeID()
n := pb.Node{Id: newID}
d := overlay.NodeDossier{Node: n, LastIPPort: "", LastNet: ""}
err := cache.UpdateAddress(ctx, &d, defaults)
require.NoError(t, err)
_, err = cache.UpdateNodeInfo(ctx, newID, &pb.InfoResponse{
Type: pb.NodeType_STORAGE,
Capacity: &pb.NodeCapacity{},
})
require.NoError(t, err)
if i%2 == 0 { // make half of nodes "new" and half "vetted"
_, err = cache.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: newID,
IsUp: true,
AuditOutcome: overlay.AuditSuccess,
AuditLambda: 1,
AuditWeight: 1,
AuditDQ: 0.5,
})
require.NoError(t, err)
}
// suspend the first four nodes (2 new, 2 vetted)
if i < 4 {
err = cache.SuspendNode(ctx, newID, time.Now())
require.NoError(t, err)
suspendedIDs[newID] = true
}
}
var nodes []*overlay.NodeDossier
var err error
numNodesToSelect := 10
// select 10 vetted nodes - 5 vetted, 2 suspended, so expect 3
nodes, err = cache.SelectStorageNodes(ctx, numNodesToSelect, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
require.NoError(t, err)
require.Len(t, nodes, 3)
for _, node := range nodes {
require.False(t, suspendedIDs[node.Id])
}
// select 10 new nodes - 5 new, 2 suspended, so expect 3
nodes, err = cache.SelectNewStorageNodes(ctx, numNodesToSelect, &overlay.NodeCriteria{
OnlineWindow: time.Hour,
AuditCount: 1,
})
require.NoError(t, err)
require.Len(t, nodes, 3)
for _, node := range nodes {
require.False(t, suspendedIDs[node.Id])
}
})
}
func getNodeInfo(nodeID storj.NodeID) overlay.NodeCheckInInfo {
return overlay.NodeCheckInInfo{
NodeID: nodeID,

View File

@ -31,16 +31,19 @@ func TestStatDB(t *testing.T) {
func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
{ // TestKnownUnreliableOrOffline
for _, tt := range []struct {
nodeID storj.NodeID
auditAlpha float64
auditBeta float64
nodeID storj.NodeID
suspended bool
disqualified bool
offline bool
}{
{storj.NodeID{1}, 20, 0}, // good reputations => good
{storj.NodeID{2}, 0, 20}, // bad audit rep
{storj.NodeID{1}, false, false, false}, // good
{storj.NodeID{2}, false, true, false}, // disqualified
{storj.NodeID{3}, true, false, false}, // suspended
{storj.NodeID{4}, false, false, true}, // offline
} {
startingRep := overlay.NodeSelectionConfig{
AuditReputationAlpha0: tt.auditAlpha,
AuditReputationBeta0: tt.auditBeta,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
}
n := pb.Node{Id: tt.nodeID}
d := overlay.NodeDossier{Node: n, LastIPPort: "", LastNet: ""}
@ -48,29 +51,36 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
err := cache.UpdateAddress(ctx, &d, startingRep)
require.NoError(t, err)
// update stats so node disqualification is triggered
_, err = cache.UpdateStats(ctx, &overlay.UpdateRequest{
NodeID: tt.nodeID,
AuditOutcome: overlay.AuditSuccess,
IsUp: true,
AuditLambda: 1, AuditWeight: 1,
AuditDQ: 0.9,
})
require.NoError(t, err)
if tt.suspended {
err = cache.SuspendNode(ctx, tt.nodeID, time.Now())
require.NoError(t, err)
}
if tt.disqualified {
err = cache.DisqualifyNode(ctx, tt.nodeID)
require.NoError(t, err)
}
if tt.offline {
checkInInfo := getNodeInfo(tt.nodeID)
err = cache.UpdateCheckIn(ctx, checkInInfo, time.Now().Add(-2*time.Hour), overlay.NodeSelectionConfig{})
require.NoError(t, err)
}
}
nodeIds := storj.NodeIDList{
storj.NodeID{1}, storj.NodeID{2},
storj.NodeID{3},
storj.NodeID{3}, storj.NodeID{4},
storj.NodeID{5},
}
criteria := &overlay.NodeCriteria{OnlineWindow: time.Hour}
invalid, err := cache.KnownUnreliableOrOffline(ctx, criteria, nodeIds)
require.NoError(t, err)
require.Contains(t, invalid, storj.NodeID{2}) // bad audit
require.Contains(t, invalid, storj.NodeID{3}) // not in db
require.Len(t, invalid, 2)
require.Contains(t, invalid, storj.NodeID{2}) // disqualified
require.Contains(t, invalid, storj.NodeID{3}) // suspended
require.Contains(t, invalid, storj.NodeID{4}) // offline
require.Contains(t, invalid, storj.NodeID{5}) // not in db
require.Len(t, invalid, 4)
}
{ // TestUpdateOperator

View File

@ -672,14 +672,15 @@ func TestIrreparableSegmentNodesOffline(t *testing.T) {
})
}
// TestRepairMultipleDisqualified does the following:
// TestRepairMultipleDisqualifiedAndSuspended does the following:
// - Uploads test data to 7 nodes
// - Disqualifies 3 nodes
// - Disqualifies 2 nodes and suspends 1 node
// - Triggers data repair, which repairs the data from the remaining 4 nodes to additional 3 new nodes
// - Shuts down the 4 nodes from which the data was repaired
// - Now we have just the 3 new nodes to which the data was repaired
// - Downloads the data from these 3 nodes (succeeds because 3 nodes are enough for download)
func TestRepairMultipleDisqualified(t *testing.T) {
// - Expect newly repaired pointer does not contain the disqualified or suspended nodes
func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 12,
@ -718,32 +719,33 @@ func TestRepairMultipleDisqualified(t *testing.T) {
// calculate how many storagenodes to disqualify
numStorageNodes := len(planet.StorageNodes)
redundancy := pointer.GetRemote().GetRedundancy()
remotePieces := pointer.GetRemote().GetRemotePieces()
minReq := redundancy.GetMinReq()
numPieces := len(remotePieces)
toDisqualify := numPieces - (int(minReq + 1))
// sanity check
require.EqualValues(t, numPieces, 7)
toDisqualify := 2
toSuspend := 1
// we should have enough storage nodes to repair on
require.True(t, (numStorageNodes-toDisqualify) >= numPieces)
require.True(t, (numStorageNodes-toDisqualify-toSuspend) >= numPieces)
// disqualify nodes and track lost pieces
nodesToDisqualify := make(map[storj.NodeID]bool)
nodesToSuspend := make(map[storj.NodeID]bool)
nodesToKeepAlive := make(map[storj.NodeID]bool)
for i, piece := range remotePieces {
if i >= toDisqualify {
nodesToKeepAlive[piece.NodeId] = true
continue
}
nodesToDisqualify[piece.NodeId] = true
// disqualify and suspend nodes
for i := 0; i < toDisqualify; i++ {
nodesToDisqualify[remotePieces[i].NodeId] = true
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].NodeId)
require.NoError(t, err)
}
for _, node := range planet.StorageNodes {
if nodesToDisqualify[node.ID()] {
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID())
require.NoError(t, err)
}
for i := toDisqualify; i < toDisqualify+toSuspend; i++ {
nodesToSuspend[remotePieces[i].NodeId] = true
err := satellite.DB.OverlayCache().SuspendNode(ctx, remotePieces[i].NodeId, time.Now())
require.NoError(t, err)
}
for i := toDisqualify + toSuspend; i < len(remotePieces); i++ {
nodesToKeepAlive[remotePieces[i].NodeId] = true
}
err = satellite.Repair.Checker.RefreshReliabilityCache(ctx)
@ -765,13 +767,14 @@ func TestRepairMultipleDisqualified(t *testing.T) {
require.NoError(t, err)
require.Equal(t, newData, testData)
// updated pointer should not contain any of the disqualified nodes
// updated pointer should not contain any of the disqualified or suspended nodes
pointer, err = metainfo.Get(ctx, path)
require.NoError(t, err)
remotePieces = pointer.GetRemote().GetRemotePieces()
for _, piece := range remotePieces {
require.False(t, nodesToDisqualify[piece.NodeId])
require.False(t, nodesToSuspend[piece.NodeId])
}
})
}

View File

@ -211,13 +211,6 @@ read limitoffset (
orderby asc node.id
)
read limitoffset (
select node.id node.last_net node.last_ip_port node.address node.protocol
where node.id >= ?
where node.disqualified = null
orderby asc node.id
)
read all (
select node.id node.piece_count
where node.piece_count != 0

View File

@ -8447,14 +8447,6 @@ type Id_Address_LastIpPort_LastContactSuccess_LastContactFailure_Row struct {
LastContactFailure time.Time
}
type Id_LastNet_LastIpPort_Address_Protocol_Row struct {
Id []byte
LastNet string
LastIpPort *string
Address string
Protocol int
}
type Id_PieceCount_Row struct {
Id []byte
PieceCount int64
@ -10222,43 +10214,6 @@ func (obj *postgresImpl) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx co
}
func (obj *postgresImpl) Limited_Node_Id_Node_LastNet_Node_LastIpPort_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
node_id_greater_or_equal Node_Id_Field,
limit int, offset int64) (
rows []*Id_LastNet_LastIpPort_Address_Protocol_Row, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.last_net, nodes.last_ip_port, nodes.address, nodes.protocol FROM nodes WHERE nodes.id >= ? AND nodes.disqualified is NULL ORDER BY nodes.id LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values, node_id_greater_or_equal.value())
__values = append(__values, limit, offset)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
row := &Id_LastNet_LastIpPort_Address_Protocol_Row{}
err = __rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, row)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *postgresImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)
@ -10941,7 +10896,7 @@ func (obj *postgresImpl) Paged_PendingSerialQueue(ctx context.Context,
rows []*PendingSerialQueue, next *Paged_PendingSerialQueue_Continuation, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number, pending_serial_queue.action, pending_serial_queue.settled, pending_serial_queue.expires_at, pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number FROM pending_serial_queue WHERE (pending_serial_queue.storage_node_id > ? OR (pending_serial_queue.storage_node_id = ? AND (pending_serial_queue.bucket_id > ? OR (pending_serial_queue.bucket_id = ? AND pending_serial_queue.serial_number > ?)))) ORDER BY pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number LIMIT ?")
var __embed_stmt = __sqlbundle_Literal("SELECT pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number, pending_serial_queue.action, pending_serial_queue.settled, pending_serial_queue.expires_at, pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number FROM pending_serial_queue WHERE (pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number) > (?, ?, ?) ORDER BY pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number LIMIT ?")
var __embed_first_stmt = __sqlbundle_Literal("SELECT pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number, pending_serial_queue.action, pending_serial_queue.settled, pending_serial_queue.expires_at, pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number FROM pending_serial_queue ORDER BY pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number LIMIT ?")
@ -10949,7 +10904,7 @@ func (obj *postgresImpl) Paged_PendingSerialQueue(ctx context.Context,
var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_storage_node_id, start._value_storage_node_id, start._value_bucket_id, start._value_bucket_id, start._value_serial_number, limit)
__values = append(__values, start._value_storage_node_id, start._value_bucket_id, start._value_serial_number, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
@ -16412,43 +16367,6 @@ func (obj *cockroachImpl) Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx c
}
func (obj *cockroachImpl) Limited_Node_Id_Node_LastNet_Node_LastIpPort_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
node_id_greater_or_equal Node_Id_Field,
limit int, offset int64) (
rows []*Id_LastNet_LastIpPort_Address_Protocol_Row, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.last_net, nodes.last_ip_port, nodes.address, nodes.protocol FROM nodes WHERE nodes.id >= ? AND nodes.disqualified is NULL ORDER BY nodes.id LIMIT ? OFFSET ?")
var __values []interface{}
__values = append(__values, node_id_greater_or_equal.value())
__values = append(__values, limit, offset)
var __stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
obj.logStmt(__stmt, __values...)
__rows, err := obj.driver.QueryContext(ctx, __stmt, __values...)
if err != nil {
return nil, obj.makeErr(err)
}
defer __rows.Close()
for __rows.Next() {
row := &Id_LastNet_LastIpPort_Address_Protocol_Row{}
err = __rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol)
if err != nil {
return nil, obj.makeErr(err)
}
rows = append(rows, row)
}
if err := __rows.Err(); err != nil {
return nil, obj.makeErr(err)
}
return rows, nil
}
func (obj *cockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)
@ -17131,7 +17049,7 @@ func (obj *cockroachImpl) Paged_PendingSerialQueue(ctx context.Context,
rows []*PendingSerialQueue, next *Paged_PendingSerialQueue_Continuation, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number, pending_serial_queue.action, pending_serial_queue.settled, pending_serial_queue.expires_at, pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number FROM pending_serial_queue WHERE (pending_serial_queue.storage_node_id > ? OR (pending_serial_queue.storage_node_id = ? AND (pending_serial_queue.bucket_id > ? OR (pending_serial_queue.bucket_id = ? AND pending_serial_queue.serial_number > ?)))) ORDER BY pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number LIMIT ?")
var __embed_stmt = __sqlbundle_Literal("SELECT pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number, pending_serial_queue.action, pending_serial_queue.settled, pending_serial_queue.expires_at, pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number FROM pending_serial_queue WHERE (pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number) > (?, ?, ?) ORDER BY pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number LIMIT ?")
var __embed_first_stmt = __sqlbundle_Literal("SELECT pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number, pending_serial_queue.action, pending_serial_queue.settled, pending_serial_queue.expires_at, pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number FROM pending_serial_queue ORDER BY pending_serial_queue.storage_node_id, pending_serial_queue.bucket_id, pending_serial_queue.serial_number LIMIT ?")
@ -17139,7 +17057,7 @@ func (obj *cockroachImpl) Paged_PendingSerialQueue(ctx context.Context,
var __stmt string
if start != nil && start._set {
__values = append(__values, start._value_storage_node_id, start._value_storage_node_id, start._value_bucket_id, start._value_bucket_id, start._value_serial_number, limit)
__values = append(__values, start._value_storage_node_id, start._value_bucket_id, start._value_serial_number, limit)
__stmt = __sqlbundle_Render(obj.dialect, __embed_stmt)
} else {
__values = append(__values, limit)
@ -22419,17 +22337,6 @@ func (rx *Rx) Limited_Node_Id_Node_Address_Node_LastIpPort_Node_LastContactSucce
return tx.Limited_Node_Id_Node_Address_Node_LastIpPort_Node_LastContactSuccess_Node_LastContactFailure_By_LastContactSuccess_Less_LastContactFailure_And_Disqualified_Is_Null_OrderBy_Asc_LastContactFailure(ctx, limit, offset)
}
func (rx *Rx) Limited_Node_Id_Node_LastNet_Node_LastIpPort_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
node_id_greater_or_equal Node_Id_Field,
limit int, offset int64) (
rows []*Id_LastNet_LastIpPort_Address_Protocol_Row, err error) {
var tx *Tx
if tx, err = rx.getTx(ctx); err != nil {
return
}
return tx.Limited_Node_Id_Node_LastNet_Node_LastIpPort_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx, node_id_greater_or_equal, limit, offset)
}
func (rx *Rx) Limited_ProjectMember_By_ProjectId(ctx context.Context,
project_member_project_id ProjectMember_ProjectId_Field,
limit int, offset int64) (
@ -23426,11 +23333,6 @@ type Methods interface {
limit int, offset int64) (
rows []*Id_Address_LastIpPort_LastContactSuccess_LastContactFailure_Row, err error)
Limited_Node_Id_Node_LastNet_Node_LastIpPort_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx context.Context,
node_id_greater_or_equal Node_Id_Field,
limit int, offset int64) (
rows []*Id_LastNet_LastIpPort_Address_Protocol_Row, err error)
Limited_ProjectMember_By_ProjectId(ctx context.Context,
project_member_project_id ProjectMember_ProjectId_Field,
limit int, offset int64) (

View File

@ -27,11 +27,6 @@ var (
mon = monkit.Package()
)
const (
// OverlayPaginateLimit defines how many nodes can be paginated at the same time.
OverlayPaginateLimit = 1000
)
var _ overlay.DB = (*overlaycache)(nil)
type overlaycache struct {
@ -45,6 +40,7 @@ func (cache *overlaycache) SelectStorageNodes(ctx context.Context, count int, cr
safeQuery := `
WHERE disqualified IS NULL
AND suspended IS NULL
AND exit_initiated_at IS NULL
AND type = ?
AND free_disk >= ?
@ -99,6 +95,7 @@ func (cache *overlaycache) SelectNewStorageNodes(ctx context.Context, count int,
safeQuery := `
WHERE disqualified IS NULL
AND suspended IS NULL
AND exit_initiated_at IS NULL
AND type = ?
AND free_disk >= ?
@ -190,8 +187,8 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj
var rows *sql.Rows
rows, err = cache.db.Query(ctx, cache.db.Rebind(`SELECT id, type, address, last_net, last_ip_port,
free_disk, total_audit_count, audit_success_count,
total_uptime_count, uptime_success_count, disqualified, audit_reputation_alpha,
audit_reputation_beta
total_uptime_count, uptime_success_count, disqualified, suspended,
audit_reputation_alpha, audit_reputation_beta
FROM nodes
`+safeQuery+safeExcludeNodes+`
ORDER BY RANDOM()
@ -207,8 +204,8 @@ func (cache *overlaycache) queryNodes(ctx context.Context, excludedNodes []storj
dbNode := &dbx.Node{}
err = rows.Scan(&dbNode.Id, &dbNode.Type, &dbNode.Address, &dbNode.LastNet, &dbNode.LastIpPort,
&dbNode.FreeDisk, &dbNode.TotalAuditCount, &dbNode.AuditSuccessCount,
&dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount, &dbNode.Disqualified, &dbNode.AuditReputationAlpha,
&dbNode.AuditReputationBeta,
&dbNode.TotalUptimeCount, &dbNode.UptimeSuccessCount, &dbNode.Disqualified, &dbNode.Suspended,
&dbNode.AuditReputationAlpha, &dbNode.AuditReputationBeta,
)
if err != nil {
return nil, err
@ -400,6 +397,7 @@ func (cache *overlaycache) KnownUnreliableOrOffline(ctx context.Context, criteri
SELECT id FROM nodes
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND suspended IS NULL
AND last_contact_success > $2
`), postgresNodeIDList(nodeIds), time.Now().Add(-criteria.OnlineWindow),
)
@ -439,6 +437,7 @@ func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.
FROM nodes
WHERE id = any($1::bytea[])
AND disqualified IS NULL
AND suspended IS NULL
AND last_contact_success > $2
`), postgresNodeIDList(nodeIDs), time.Now().Add(-onlineWindow),
)
@ -448,16 +447,16 @@ func (cache *overlaycache) KnownReliable(ctx context.Context, onlineWindow time.
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
row := &dbx.Id_LastNet_LastIpPort_Address_Protocol_Row{}
row := &dbx.Node{}
err = rows.Scan(&row.Id, &row.LastNet, &row.LastIpPort, &row.Address, &row.Protocol)
if err != nil {
return nil, err
}
node, err := convertDBNodeToPBNode(ctx, row)
node, err := convertDBNode(ctx, row)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
nodes = append(nodes, &node.Node)
}
return nodes, Error.Wrap(rows.Err())
}
@ -468,6 +467,7 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
rows, err := cache.db.Query(ctx, cache.db.Rebind(`
SELECT id FROM nodes
WHERE disqualified IS NULL
AND suspended IS NULL
AND last_contact_success > ?
`), time.Now().Add(-criteria.OnlineWindow))
if err != nil {
@ -488,69 +488,6 @@ func (cache *overlaycache) Reliable(ctx context.Context, criteria *overlay.NodeC
return nodes, Error.Wrap(rows.Err())
}
// Paginate will run through
func (cache *overlaycache) Paginate(ctx context.Context, offset int64, limit int) (_ []*overlay.NodeDossier, _ bool, err error) {
defer mon.Task()(&ctx)(&err)
cursor := storj.NodeID{}
// more represents end of table. If there are more rows in the database, more will be true.
more := true
if limit <= 0 || limit > OverlayPaginateLimit {
limit = OverlayPaginateLimit
}
dbxInfos, err := cache.db.Limited_Node_By_Id_GreaterOrEqual_OrderBy_Asc_Id(ctx, dbx.Node_Id(cursor.Bytes()), limit, offset)
if err != nil {
return nil, false, err
}
if len(dbxInfos) < limit {
more = false
}
infos := make([]*overlay.NodeDossier, len(dbxInfos))
for i, dbxInfo := range dbxInfos {
infos[i], err = convertDBNode(ctx, dbxInfo)
if err != nil {
return nil, false, err
}
}
return infos, more, nil
}
// PaginateQualified will retrieve all qualified nodes
func (cache *overlaycache) PaginateQualified(ctx context.Context, offset int64, limit int) (_ []*pb.Node, _ bool, err error) {
defer mon.Task()(&ctx)(&err)
cursor := storj.NodeID{}
// more represents end of table. If there are more rows in the database, more will be true.
more := true
if limit <= 0 || limit > OverlayPaginateLimit {
limit = OverlayPaginateLimit
}
dbxInfos, err := cache.db.Limited_Node_Id_Node_LastNet_Node_LastIpPort_Node_Address_Node_Protocol_By_Id_GreaterOrEqual_And_Disqualified_Is_Null_OrderBy_Asc_Id(ctx, dbx.Node_Id(cursor.Bytes()), limit, offset)
if err != nil {
return nil, false, err
}
if len(dbxInfos) < limit {
more = false
}
infos := make([]*pb.Node, len(dbxInfos))
for i, dbxInfo := range dbxInfos {
infos[i], err = convertDBNodeToPBNode(ctx, dbxInfo)
if err != nil {
return nil, false, err
}
}
return infos, more, nil
}
// Update updates node address
func (cache *overlaycache) UpdateAddress(ctx context.Context, info *overlay.NodeDossier, defaults overlay.NodeSelectionConfig) (err error) {
defer mon.Task()(&ctx)(&err)
@ -1218,25 +1155,6 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
return node, nil
}
func convertDBNodeToPBNode(ctx context.Context, info *dbx.Id_LastNet_LastIpPort_Address_Protocol_Row) (_ *pb.Node, err error) {
defer mon.Task()(&ctx)(&err)
if info == nil {
return nil, Error.New("missing info")
}
id, err := storj.NodeIDFromBytes(info.Id)
if err != nil {
return nil, err
}
return &pb.Node{
Id: id,
Address: &pb.NodeAddress{
Address: info.Address,
Transport: pb.NodeTransport(info.Protocol),
},
}, nil
}
func getNodeStats(dbNode *dbx.Node) *overlay.NodeStats {
nodeStats := &overlay.NodeStats{
Latency90: dbNode.Latency90,
@ -1313,7 +1231,11 @@ func buildUpdateStatement(update updateNodeStats) string {
sql += ","
}
atLeastOne = true
sql += fmt.Sprintf("suspended = '%v'", update.Suspended.value.Format(time.RFC3339Nano))
if update.Suspended.isNil {
sql += fmt.Sprintf("suspended = NULL")
} else {
sql += fmt.Sprintf("suspended = '%v'", update.Suspended.value.Format(time.RFC3339Nano))
}
}
if update.UptimeSuccessCount.set {
if atLeastOne {
@ -1412,10 +1334,12 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
unknownAuditBeta := dbNode.UnknownAuditReputationBeta
totalAuditCount := dbNode.TotalAuditCount
var updatedTotalAuditCount int64
switch updateReq.AuditOutcome {
case overlay.AuditSuccess:
// for a successful audit, increase reputation for normal *and* unknown audits
auditAlpha, auditBeta, totalAuditCount = updateReputation(
auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
true,
auditAlpha,
auditBeta,
@ -1423,17 +1347,18 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
updateReq.AuditWeight,
totalAuditCount,
)
unknownAuditAlpha, unknownAuditBeta, totalAuditCount = updateReputation(
// we will use updatedTotalAuditCount from the updateReputation call above
unknownAuditAlpha, unknownAuditBeta, _ = updateReputation(
true,
unknownAuditAlpha,
unknownAuditBeta,
updateReq.AuditLambda,
updateReq.AuditWeight,
totalAuditCount-1, // subtract one because this is still a single audit
totalAuditCount,
)
case overlay.AuditFailure:
// for audit failure, only update normal alpha/beta
auditAlpha, auditBeta, totalAuditCount = updateReputation(
auditAlpha, auditBeta, updatedTotalAuditCount = updateReputation(
false,
auditAlpha,
auditBeta,
@ -1443,7 +1368,7 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
)
case overlay.AuditUnknown:
// for audit unknown, only update unknown alpha/beta
unknownAuditAlpha, unknownAuditBeta, totalAuditCount = updateReputation(
unknownAuditAlpha, unknownAuditBeta, updatedTotalAuditCount = updateReputation(
false,
unknownAuditAlpha,
unknownAuditBeta,
@ -1465,7 +1390,7 @@ func populateUpdateNodeStats(dbNode *dbx.Node, updateReq *overlay.UpdateRequest)
updateFields := updateNodeStats{
NodeID: updateReq.NodeID,
TotalAuditCount: int64Field{set: true, value: totalAuditCount},
TotalAuditCount: int64Field{set: true, value: updatedTotalAuditCount},
TotalUptimeCount: int64Field{set: true, value: totalUptimeCount},
AuditReputationAlpha: float64Field{set: true, value: auditAlpha},
AuditReputationBeta: float64Field{set: true, value: auditBeta},