satellite/overlay: add disqualification reason
Add disqualification reason to NodeDossier. Extend DB.DisqualifyNode with disqualification reason. Extend reputation Service.TestDisqualifyNode with disqualification reason. Change-Id: I8611b6340c7f42ac1bb8bd0fd7f0648ad650ab2d
This commit is contained in:
parent
4223fa01f8
commit
3f47d19aa6
@ -18,6 +18,7 @@ import (
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/orders"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
func TestRollupNoDeletes(t *testing.T) {
|
||||
@ -366,7 +367,7 @@ func dqNodes(ctx *testcontext.Context, planet *testplanet.Planet) (map[storj.Nod
|
||||
continue
|
||||
}
|
||||
|
||||
err := planet.Satellites[0].Overlay.DB.DisqualifyNode(ctx, n.ID())
|
||||
err := planet.Satellites[0].Overlay.DB.DisqualifyNode(ctx, n.ID(), time.Now().UTC(), overlay.DisqualificationReasonUnknown)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ func TestDisqualifiedNodesGetNoDownload(t *testing.T) {
|
||||
segment := segments[0]
|
||||
disqualifiedNode := segment.Pieces[0].StorageNode
|
||||
|
||||
err = satellitePeer.Reputation.Service.TestDisqualifyNode(ctx, disqualifiedNode)
|
||||
err = satellitePeer.Reputation.Service.TestDisqualifyNode(ctx, disqualifiedNode, overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
|
||||
limits, _, err := satellitePeer.Orders.Service.CreateGetOrderLimits(ctx, bucket, segment, 0)
|
||||
@ -170,7 +170,7 @@ func TestDisqualifiedNodesGetNoUpload(t *testing.T) {
|
||||
disqualifiedNode := planet.StorageNodes[0]
|
||||
satellitePeer.Audit.Worker.Loop.Pause()
|
||||
|
||||
err := satellitePeer.Reputation.Service.TestDisqualifyNode(ctx, disqualifiedNode.ID())
|
||||
err := satellitePeer.Reputation.Service.TestDisqualifyNode(ctx, disqualifiedNode.ID(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
|
||||
request := overlay.FindStorageNodesRequest{
|
||||
@ -213,7 +213,7 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
|
||||
satellitePeer.Audit.Worker.Loop.Pause()
|
||||
|
||||
disqualifiedNode := planet.StorageNodes[0]
|
||||
err := satellitePeer.Reputation.Service.TestDisqualifyNode(ctx, disqualifiedNode.ID())
|
||||
err := satellitePeer.Reputation.Service.TestDisqualifyNode(ctx, disqualifiedNode.ID(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
|
||||
info := overlay.NodeCheckInInfo{
|
||||
|
@ -714,7 +714,7 @@ func (endpoint *Endpoint) getFinishedMessage(ctx context.Context, nodeID storj.N
|
||||
message = &pb.SatelliteMessage{Message: &pb.SatelliteMessage_ExitFailed{
|
||||
ExitFailed: signed,
|
||||
}}
|
||||
err = endpoint.overlay.DisqualifyNode(ctx, nodeID)
|
||||
err = endpoint.overlay.DisqualifyNode(ctx, nodeID, overlay.DisqualificationReasonUnknown)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
@ -409,7 +409,7 @@ func TestExitDisqualifiedNodeFailOnStart(t *testing.T) {
|
||||
satellite := planet.Satellites[0]
|
||||
exitingNode := planet.StorageNodes[0]
|
||||
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID())
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
|
||||
conn, err := exitingNode.Dialer.DialNodeURL(ctx, satellite.NodeURL())
|
||||
@ -452,7 +452,7 @@ func TestExitDisqualifiedNodeFailEventually(t *testing.T) {
|
||||
}
|
||||
|
||||
if !isDisqualified {
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID())
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, exitingNode.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -265,7 +265,7 @@ func BenchmarkNodeSelection(b *testing.B) {
|
||||
err := overlaydb.TestSuspendNodeUnknownAudit(ctx, nodeID, now)
|
||||
require.NoError(b, err)
|
||||
case 1:
|
||||
err := overlaydb.DisqualifyNode(ctx, nodeID)
|
||||
err := overlaydb.DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(b, err)
|
||||
case 2:
|
||||
err := overlaydb.UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
|
||||
|
@ -8,13 +8,18 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/common/pb"
|
||||
"storj.io/common/storj"
|
||||
"storj.io/common/testcontext"
|
||||
"storj.io/common/testrand"
|
||||
"storj.io/storj/private/testplanet"
|
||||
"storj.io/storj/private/teststorj"
|
||||
"storj.io/storj/satellite"
|
||||
"storj.io/storj/satellite/overlay"
|
||||
"storj.io/storj/satellite/satellitedb/satellitedbtest"
|
||||
"storj.io/storj/storagenode"
|
||||
)
|
||||
|
||||
@ -164,3 +169,72 @@ func TestOperatorConfig(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestDBDisqualifyNode(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
overlayDB := db.OverlayCache()
|
||||
now := time.Now().Truncate(time.Second).UTC()
|
||||
|
||||
cases := []struct {
|
||||
Name string
|
||||
NodeID storj.NodeID
|
||||
DisqualifiedAt time.Time
|
||||
Reason overlay.DisqualificationReason
|
||||
}{
|
||||
{
|
||||
Name: "Unknown",
|
||||
NodeID: testrand.NodeID(),
|
||||
DisqualifiedAt: now,
|
||||
Reason: overlay.DisqualificationReasonUnknown,
|
||||
},
|
||||
{
|
||||
Name: "Audit Failure",
|
||||
NodeID: testrand.NodeID(),
|
||||
DisqualifiedAt: now,
|
||||
Reason: overlay.DisqualificationReasonAuditFailure,
|
||||
},
|
||||
{
|
||||
Name: "Suspension",
|
||||
NodeID: testrand.NodeID(),
|
||||
DisqualifiedAt: now,
|
||||
Reason: overlay.DisqualificationReasonSuspension,
|
||||
},
|
||||
{
|
||||
Name: "Node Offline",
|
||||
NodeID: testrand.NodeID(),
|
||||
DisqualifiedAt: now,
|
||||
Reason: overlay.DisqualificationReasonNodeOffline,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testcase := range cases {
|
||||
t.Run(testcase.Name, func(t *testing.T) {
|
||||
checkIn := overlay.NodeCheckInInfo{
|
||||
NodeID: testcase.NodeID,
|
||||
Address: &pb.NodeAddress{
|
||||
Transport: 1,
|
||||
Address: "127.0.0.1:0",
|
||||
},
|
||||
IsUp: true,
|
||||
Version: &pb.NodeVersion{
|
||||
Version: "v0.0.0",
|
||||
CommitHash: "",
|
||||
Timestamp: now,
|
||||
},
|
||||
}
|
||||
|
||||
err := overlayDB.UpdateCheckIn(ctx, checkIn, now, overlay.NodeSelectionConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = overlayDB.DisqualifyNode(ctx, testcase.NodeID, testcase.DisqualifiedAt, testcase.Reason)
|
||||
require.NoError(t, err)
|
||||
|
||||
info, err := overlayDB.Get(ctx, testcase.NodeID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, info.Disqualified)
|
||||
assert.Equal(t, testcase.DisqualifiedAt, info.Disqualified.UTC())
|
||||
assert.Equal(t, testcase.Reason, *info.DisqualificationReason)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ type DB interface {
|
||||
GetNodesNetwork(ctx context.Context, nodeIDs []storj.NodeID) (nodeNets []string, err error)
|
||||
|
||||
// DisqualifyNode disqualifies a storage node.
|
||||
DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error)
|
||||
DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason DisqualificationReason) (err error)
|
||||
|
||||
// DQNodesLastSeenBefore disqualifies a limited number of nodes where last_contact_success < cutoff except those already disqualified
|
||||
// or gracefully exited or where last_contact_success = '0001-01-01 00:00:00+00'.
|
||||
@ -216,22 +216,23 @@ type ExitStatusRequest struct {
|
||||
// NodeDossier is the complete info that the satellite tracks for a storage node.
|
||||
type NodeDossier struct {
|
||||
pb.Node
|
||||
Type pb.NodeType
|
||||
Operator pb.NodeOperator
|
||||
Capacity pb.NodeCapacity
|
||||
Reputation NodeStats
|
||||
Version pb.NodeVersion
|
||||
Contained bool
|
||||
Disqualified *time.Time
|
||||
UnknownAuditSuspended *time.Time
|
||||
OfflineSuspended *time.Time
|
||||
OfflineUnderReview *time.Time
|
||||
PieceCount int64
|
||||
ExitStatus ExitStatus
|
||||
CreatedAt time.Time
|
||||
LastNet string
|
||||
LastIPPort string
|
||||
CountryCode location.CountryCode
|
||||
Type pb.NodeType
|
||||
Operator pb.NodeOperator
|
||||
Capacity pb.NodeCapacity
|
||||
Reputation NodeStats
|
||||
Version pb.NodeVersion
|
||||
Contained bool
|
||||
Disqualified *time.Time
|
||||
DisqualificationReason *DisqualificationReason
|
||||
UnknownAuditSuspended *time.Time
|
||||
OfflineSuspended *time.Time
|
||||
OfflineUnderReview *time.Time
|
||||
PieceCount int64
|
||||
ExitStatus ExitStatus
|
||||
CreatedAt time.Time
|
||||
LastNet string
|
||||
LastIPPort string
|
||||
CountryCode location.CountryCode
|
||||
}
|
||||
|
||||
// NodeStats contains statistics about a node.
|
||||
@ -641,9 +642,9 @@ func (service *Service) GetReliablePiecesInExcludedCountries(ctx context.Context
|
||||
}
|
||||
|
||||
// DisqualifyNode disqualifies a storage node.
|
||||
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||||
func (service *Service) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, reason DisqualificationReason) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return service.db.DisqualifyNode(ctx, nodeID)
|
||||
return service.db.DisqualifyNode(ctx, nodeID, time.Now().UTC(), reason)
|
||||
}
|
||||
|
||||
// ResolveIPAndNetwork resolves the target address and determines its IP and /24 subnet IPv4 or /64 subnet IPv6.
|
||||
|
@ -87,7 +87,7 @@ func testCache(ctx context.Context, t *testing.T, store overlay.DB) {
|
||||
err = store.UpdateCheckIn(ctx, d, time.Now().UTC(), nodeSelectionConfig)
|
||||
require.NoError(t, err)
|
||||
// disqualify one node
|
||||
err = service.DisqualifyNode(ctx, valid3ID)
|
||||
err = service.DisqualifyNode(ctx, valid3ID, overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -440,7 +440,7 @@ func TestKnownReliable(t *testing.T) {
|
||||
oc := satellite.DB.OverlayCache()
|
||||
|
||||
// Disqualify storage node #0
|
||||
err := oc.DisqualifyNode(ctx, planet.StorageNodes[0].ID())
|
||||
err := oc.DisqualifyNode(ctx, planet.StorageNodes[0].ID(), time.Now().UTC(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop storage node #1
|
||||
|
@ -76,7 +76,7 @@ func testDatabase(ctx context.Context, t *testing.T, cache overlay.DB) {
|
||||
}
|
||||
|
||||
if tt.disqualified {
|
||||
err = cache.DisqualifyNode(ctx, tt.nodeID)
|
||||
err = cache.DisqualifyNode(ctx, tt.nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
if tt.offline {
|
||||
|
@ -138,7 +138,7 @@ func testDataRepair(t *testing.T, inMemoryRepair bool) {
|
||||
|
||||
for _, node := range planet.StorageNodes {
|
||||
if nodesToDisqualify[node.ID()] {
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID())
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
continue
|
||||
}
|
||||
@ -305,7 +305,7 @@ func TestDataRepairPendingObject(t *testing.T) {
|
||||
|
||||
for _, node := range planet.StorageNodes {
|
||||
if nodesToDisqualify[node.ID()] {
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID())
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, node.ID(), time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
continue
|
||||
}
|
||||
@ -1311,7 +1311,7 @@ func TestRepairExpiredSegment(t *testing.T) {
|
||||
}
|
||||
|
||||
for nodeID := range nodesToDQ {
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID)
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
@ -1396,7 +1396,7 @@ func TestRemoveDeletedSegmentFromQueue(t *testing.T) {
|
||||
}
|
||||
|
||||
for nodeID := range nodesToDQ {
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID)
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, nodeID, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
@ -1659,7 +1659,7 @@ func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
|
||||
remotePieces := segment.Pieces
|
||||
|
||||
for i := 0; i < toDQ; i++ {
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode)
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -1671,7 +1671,7 @@ func TestIrreparableSegmentAccordingToOverlay(t *testing.T) {
|
||||
// Disqualify nodes so that online nodes < minimum threshold
|
||||
// This will make the segment irreparable
|
||||
for _, piece := range remotePieces {
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, piece.StorageNode)
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, piece.StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -1852,7 +1852,7 @@ func TestRepairMultipleDisqualifiedAndSuspended(t *testing.T) {
|
||||
// disqualify and suspend nodes
|
||||
for i := 0; i < toDisqualify; i++ {
|
||||
nodesToDisqualify[remotePieces[i].StorageNode] = true
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode)
|
||||
err := satellite.DB.OverlayCache().DisqualifyNode(ctx, remotePieces[i].StorageNode, time.Now(), overlay.DisqualificationReasonUnknown)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
for i := toDisqualify; i < toDisqualify+toSuspend; i++ {
|
||||
|
@ -137,13 +137,15 @@ func (service *Service) TestSuspendNodeUnknownAudit(ctx context.Context, nodeID
|
||||
}
|
||||
|
||||
// TestDisqualifyNode disqualifies a storage node.
|
||||
func (service *Service) TestDisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||||
err = service.db.DisqualifyNode(ctx, nodeID, time.Now())
|
||||
func (service *Service) TestDisqualifyNode(ctx context.Context, nodeID storj.NodeID, reason overlay.DisqualificationReason) (err error) {
|
||||
disqualifiedAt := time.Now()
|
||||
|
||||
err = service.db.DisqualifyNode(ctx, nodeID, disqualifiedAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return service.overlay.DisqualifyNode(ctx, nodeID)
|
||||
return service.overlay.DisqualifyNode(ctx, nodeID, disqualifiedAt, reason)
|
||||
}
|
||||
|
||||
// TestUnsuspendNodeUnknownAudit unsuspends a storage node for unknown audits.
|
||||
|
@ -642,10 +642,14 @@ func (cache *overlaycache) UpdateReputation(ctx context.Context, id storj.NodeID
|
||||
|
||||
updateFields := dbx.Node_Update_Fields{}
|
||||
updateFields.UnknownAuditSuspended = dbx.Node_UnknownAuditSuspended_Raw(request.UnknownAuditSuspended)
|
||||
updateFields.Disqualified = dbx.Node_Disqualified_Raw(request.Disqualified)
|
||||
updateFields.OfflineSuspended = dbx.Node_OfflineSuspended_Raw(request.OfflineSuspended)
|
||||
updateFields.VettedAt = dbx.Node_VettedAt_Raw(request.VettedAt)
|
||||
|
||||
updateFields.Disqualified = dbx.Node_Disqualified_Raw(request.Disqualified)
|
||||
if request.Disqualified != nil {
|
||||
updateFields.DisqualificationReason = dbx.Node_DisqualificationReason(int(request.DisqualificationReason))
|
||||
}
|
||||
|
||||
err = cache.db.UpdateNoReturn_Node_By_Id_And_Disqualified_Is_Null_And_ExitFinishedAt_Is_Null(ctx, dbx.Node_Id(id.Bytes()), updateFields)
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
@ -696,10 +700,11 @@ func (cache *overlaycache) UpdateNodeInfo(ctx context.Context, nodeID storj.Node
|
||||
}
|
||||
|
||||
// DisqualifyNode disqualifies a storage node.
|
||||
func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.NodeID) (err error) {
|
||||
func (cache *overlaycache) DisqualifyNode(ctx context.Context, nodeID storj.NodeID, disqualifiedAt time.Time, reason overlay.DisqualificationReason) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
updateFields := dbx.Node_Update_Fields{}
|
||||
updateFields.Disqualified = dbx.Node_Disqualified(time.Now().UTC())
|
||||
updateFields.Disqualified = dbx.Node_Disqualified(disqualifiedAt.UTC())
|
||||
updateFields.DisqualificationReason = dbx.Node_DisqualificationReason(int(reason))
|
||||
|
||||
dbNode, err := cache.db.Update_Node_By_Id(ctx, dbx.Node_Id(nodeID.Bytes()), updateFields)
|
||||
if err != nil {
|
||||
@ -1062,14 +1067,15 @@ func convertDBNode(ctx context.Context, info *dbx.Node) (_ *overlay.NodeDossier,
|
||||
Timestamp: info.Timestamp,
|
||||
Release: info.Release,
|
||||
},
|
||||
Disqualified: info.Disqualified,
|
||||
UnknownAuditSuspended: info.UnknownAuditSuspended,
|
||||
OfflineSuspended: info.OfflineSuspended,
|
||||
OfflineUnderReview: info.UnderReview,
|
||||
PieceCount: info.PieceCount,
|
||||
ExitStatus: exitStatus,
|
||||
CreatedAt: info.CreatedAt,
|
||||
LastNet: info.LastNet,
|
||||
Disqualified: info.Disqualified,
|
||||
DisqualificationReason: (*overlay.DisqualificationReason)(info.DisqualificationReason),
|
||||
UnknownAuditSuspended: info.UnknownAuditSuspended,
|
||||
OfflineSuspended: info.OfflineSuspended,
|
||||
OfflineUnderReview: info.UnderReview,
|
||||
PieceCount: info.PieceCount,
|
||||
ExitStatus: exitStatus,
|
||||
CreatedAt: info.CreatedAt,
|
||||
LastNet: info.LastNet,
|
||||
}
|
||||
if info.LastIpPort != nil {
|
||||
node.LastIPPort = *info.LastIpPort
|
||||
|
Loading…
Reference in New Issue
Block a user