From 573ce712f23462fc49774469feb864b548a37edb Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Tue, 21 Nov 2023 10:51:27 +0100 Subject: [PATCH] satellite/bloomfilter: don't create BF for disqualified nodes Currently we have a large set of nodes that are already disqualified, and we are not sending bloom filters to them. The issue is that we are still generating filters for them during the garbage collection process, even if we have only a single segment with one piece stored on such a node. This consumes additional memory and processing power. This change adjusts the logic behind `AllPieceCounts` (renamed to ActiveNodesPieceCounts) to return piece counts for all nodes that are not disqualified (even those with piece count = 0). With this change we can modify the GC observers to skip nodes that were not returned by ActiveNodesPieceCounts. Change-Id: Ic75159135abe535084d8aeee560bb801a4a03e17 --- satellite/gc/bloomfilter/observer.go | 12 +++- satellite/gc/bloomfilter/observer_sync.go | 12 +++- satellite/gc/bloomfilter/observer_test.go | 49 +++++++++++++---- satellite/gc/piecetracker/observer_test.go | 8 +-- satellite/overlay/piececount_test.go | 58 +++++++++++--------- satellite/overlay/service.go | 5 +- satellite/overlay/uploadselection_test.go | 2 +- satellite/satellitedb/dbx/node.dbx | 2 +- satellite/satellitedb/dbx/satellitedb.dbx.go | 10 ++-- satellite/satellitedb/overlaycache.go | 11 ++-- 10 files changed, 108 insertions(+), 61 deletions(-) diff --git a/satellite/gc/bloomfilter/observer.go b/satellite/gc/bloomfilter/observer.go index ea5b412c6..8434fc991 100644 --- a/satellite/gc/bloomfilter/observer.go +++ b/satellite/gc/bloomfilter/observer.go @@ -67,7 +67,7 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error) obs.log.Debug("collecting bloom filters started") // load last piece counts from overlay db - lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx) + lastPieceCounts, err := obs.overlay.ActiveNodesPieceCounts(ctx) if err != nil { obs.log.Error("error getting last piece counts", zap.Error(err)) err = nil @@ -197,8 +197,14 @@ func (fork *observerFork) add(nodeID storj.NodeID, pieceID storj.PieceID) { if !ok { // If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := fork.config.InitialPieces - if pieceCounts := fork.pieceCounts[nodeID]; pieceCounts > 0 { - numPieces = pieceCounts + if pieceCounts, found := fork.pieceCounts[nodeID]; found { + if pieceCounts > 0 { + numPieces = pieceCounts + } + } else { + // node was not in pieceCounts, which means it was disqualified + // and we won't generate a bloom filter for it + return } hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, fork.config.FalsePositiveRate, 2*memory.MiB) diff --git a/satellite/gc/bloomfilter/observer_sync.go b/satellite/gc/bloomfilter/observer_sync.go index 24af81a65..274738a62 100644 --- a/satellite/gc/bloomfilter/observer_sync.go +++ b/satellite/gc/bloomfilter/observer_sync.go @@ -63,7 +63,7 @@ func (obs *SyncObserver) Start(ctx context.Context, startTime time.Time) (err er obs.log.Debug("collecting bloom filters started") // load last piece counts from overlay db - lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx) + lastPieceCounts, err := obs.overlay.ActiveNodesPieceCounts(ctx) if err != nil { obs.log.Error("error getting last piece counts", zap.Error(err)) err = nil @@ -147,8 +147,14 @@ func (obs *SyncObserver) add(nodeID storj.NodeID, pieceID storj.PieceID) { if !ok { // If we know how many pieces a node should be storing, use that number. Otherwise use default. numPieces := obs.config.InitialPieces - if pieceCounts := obs.lastPieceCounts[nodeID]; pieceCounts > 0 { - numPieces = pieceCounts + if pieceCounts, found := obs.lastPieceCounts[nodeID]; found { + if pieceCounts > 0 { + numPieces = pieceCounts + } + } else { + // node was not in lastPieceCounts, which means it was disqualified + // and we won't generate a bloom filter for it + return } hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, obs.config.FalsePositiveRate, 2*memory.MiB) diff --git a/satellite/gc/bloomfilter/observer_test.go b/satellite/gc/bloomfilter/observer_test.go index fb0558689..926932313 100644 --- a/satellite/gc/bloomfilter/observer_test.go +++ b/satellite/gc/bloomfilter/observer_test.go @@ -8,6 +8,7 @@ import ( "bytes" "fmt" "io" + "slices" "sort" "strconv" "testing" @@ -18,6 +19,7 @@ import ( "storj.io/common/memory" "storj.io/common/pb" + "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" @@ -26,6 +28,7 @@ import ( "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase/rangedloop" "storj.io/storj/satellite/metabase/rangedloop/rangedlooptest" + "storj.io/storj/satellite/overlay" "storj.io/uplink" ) @@ -50,16 +53,21 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) { defer ctx.Check(project.Close) type testCase struct { - Bucket string - ZipBatchSize int - ExpectedPacks int + Bucket string + ZipBatchSize int + ExpectedPacks int + DisqualifiedNodes []storj.NodeID } testCases := []testCase{ - {"bloomfilters-bucket-1", 1, 7}, - {"bloomfilters-bucket-2", 2, 4}, - {"bloomfilters-bucket-7", 7, 1}, - {"bloomfilters-bucket-100", 100, 1}, + {"bloomfilters-bucket-1", 1, 7, []storj.NodeID{}}, + {"bloomfilters-bucket-2", 2, 4, []storj.NodeID{}}, + {"bloomfilters-bucket-2-dq-nodes", 2, 3, []storj.NodeID{ + planet.StorageNodes[0].ID(), + planet.StorageNodes[3].ID(), + }}, + {"bloomfilters-bucket-7", 7, 1, []storj.NodeID{}}, + {"bloomfilters-bucket-100", 100, 1, []storj.NodeID{}}, } for _, tc := range testCases { @@ -78,6 +86,24 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) { bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB), }
+ expectedNodeIds := []string{} + for _, node := range planet.StorageNodes { + _, err := planet.Satellites[0].DB.Testing().RawDB().ExecContext(ctx, "UPDATE nodes SET disqualified = null WHERE id = $1", node.ID()) + require.NoError(t, err) + + expectedNodeIds = append(expectedNodeIds, node.ID().String()) + } + + for _, nodeID := range tc.DisqualifiedNodes { + require.NoError(t, planet.Satellites[0].Overlay.Service.DisqualifyNode(ctx, nodeID, overlay.DisqualificationReasonAuditFailure)) + + if index := slices.Index(expectedNodeIds, nodeID.String()); index != -1 { + expectedNodeIds = slices.Delete(expectedNodeIds, index, index+1) + } + } + + sort.Strings(expectedNodeIds) + for _, observer := range observers { name := fmt.Sprintf("%s-%T", tc.Bucket, observer) t.Run(name, func(t *testing.T) { @@ -134,6 +160,10 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) { require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String()) nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String()) + + nodeID, err := storj.NodeIDFromBytes(pbRetainInfo.StorageNodeId.Bytes()) + require.NoError(t, err) + require.NotContains(t, tc.DisqualifiedNodes, nodeID) } count++ @@ -149,11 +179,6 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) { sort.Strings(packNames) require.Equal(t, expectedPackNames, packNames) - expectedNodeIds := []string{} - for _, node := range planet.StorageNodes { - expectedNodeIds = append(expectedNodeIds, node.ID().String()) - } - sort.Strings(expectedNodeIds) sort.Strings(nodeIds) require.Equal(t, expectedNodeIds, nodeIds) }) diff --git a/satellite/gc/piecetracker/observer_test.go b/satellite/gc/piecetracker/observer_test.go index 0979aeabf..ddad49c68 100644 --- a/satellite/gc/piecetracker/observer_test.go +++ b/satellite/gc/piecetracker/observer_test.go @@ -35,9 +35,9 @@ func TestObserverPieceTracker(t *testing.T) { }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { // ensure that the piece counts are empty - pieceCounts, err := planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx) + pieceCounts, err := planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx) require.NoError(t, err) - require.Equal(t, 0, len(pieceCounts)) + require.Equal(t, 4, len(pieceCounts)) // Setup: create 50KiB of data for the uplink to upload testdata := testrand.Bytes(50 * memory.KiB) @@ -51,7 +51,7 @@ func TestObserverPieceTracker(t *testing.T) { require.NoError(t, err) // Check that the piece counts are correct - pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx) + pieceCounts, err = planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx) require.NoError(t, err) require.True(t, len(pieceCounts) > 0) @@ -71,7 +71,7 @@ func TestObserverPieceTracker(t *testing.T) { require.NoError(t, err) // Check that the piece counts are correct - pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx) + pieceCounts, err = planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx) require.NoError(t, err) require.True(t, len(pieceCounts) > 0) diff --git a/satellite/overlay/piececount_test.go b/satellite/overlay/piececount_test.go index 366301a31..6e9431636 100644 --- a/satellite/overlay/piececount_test.go +++ b/satellite/overlay/piececount_test.go @@ -24,56 +24,64 @@ func TestDB_PieceCounts(t *testing.T) { satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) { overlaydb := db.OverlayCache() - type TestNode struct { - ID storj.NodeID - PieceCount int64 + expectedNodePieces := make(map[storj.NodeID]int64, 
100) + + for i := 0; i < 100; i++ { + expectedNodePieces[testrand.NodeID()] = int64(math.Pow10(i + 1)) } - nodes := make([]TestNode, 100) - for i := range nodes { - nodes[i].ID = testrand.NodeID() - nodes[i].PieceCount = int64(math.Pow10(i + 1)) - } + var nodeToDisqualify storj.NodeID - for i, node := range nodes { + i := 0 + for nodeID := range expectedNodePieces { addr := fmt.Sprintf("127.0.%d.0:8080", i) lastNet := fmt.Sprintf("127.0.%d", i) d := overlay.NodeCheckInInfo{ - NodeID: node.ID, + NodeID: nodeID, Address: &pb.NodeAddress{Address: addr}, LastIPPort: addr, LastNet: lastNet, Version: &pb.NodeVersion{Version: "v1.0.0"}, + IsUp: true, } err := overlaydb.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{}) require.NoError(t, err) + i++ + + nodeToDisqualify = nodeID } // check that they are initialized to zero - initialCounts, err := overlaydb.AllPieceCounts(ctx) + initialCounts, err := overlaydb.ActiveNodesPieceCounts(ctx) require.NoError(t, err) - require.Empty(t, initialCounts) - // TODO: make AllPieceCounts return results for all nodes, - // since it will keep the logic slightly clearer. - - // update counts - counts := make(map[storj.NodeID]int64) - for _, node := range nodes { - counts[node.ID] = node.PieceCount + require.Equal(t, len(expectedNodePieces), len(initialCounts)) + for nodeID := range expectedNodePieces { + pieceCount, found := initialCounts[nodeID] + require.True(t, found) + require.Zero(t, pieceCount) } - err = overlaydb.UpdatePieceCounts(ctx, counts) + + err = overlaydb.UpdatePieceCounts(ctx, expectedNodePieces) require.NoError(t, err) // fetch new counts - updatedCounts, err := overlaydb.AllPieceCounts(ctx) + updatedCounts, err := overlaydb.ActiveNodesPieceCounts(ctx) require.NoError(t, err) // verify values - for _, node := range nodes { - count, ok := updatedCounts[node.ID] + for nodeID, pieceCount := range expectedNodePieces { + count, ok := updatedCounts[nodeID] require.True(t, ok) - require.Equal(t, count, node.PieceCount) + require.Equal(t, pieceCount, count) } + + // disqualify one node so it won't be returned by ActiveNodesPieceCounts + _, err = overlaydb.DisqualifyNode(ctx, nodeToDisqualify, time.Now(), overlay.DisqualificationReasonAuditFailure) + require.NoError(t, err) + + pieceCounts, err := overlaydb.ActiveNodesPieceCounts(ctx) + require.NoError(t, err) + require.NotContains(t, pieceCounts, nodeToDisqualify) }) } @@ -121,7 +129,7 @@ func BenchmarkDB_PieceCounts(b *testing.B) { b.Run("All", func(b *testing.B) { for i := 0; i < b.N; i++ { - _, err := overlaydb.AllPieceCounts(ctx) + _, err := overlaydb.ActiveNodesPieceCounts(ctx) if err != nil { b.Fatal(err) } diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 67a6cbde6..44c962ebe 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -80,8 +80,9 @@ type DB interface { // SetAllContainedNodes updates the contained field for all nodes, as necessary. SetAllContainedNodes(ctx context.Context, containedNodes []storj.NodeID) (err error) - // AllPieceCounts returns a map of node IDs to piece counts from the db. - AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) + // ActiveNodesPieceCounts returns a map of node IDs to piece counts from the db. + // Returns piece counts only for nodes that are not disqualified. + ActiveNodesPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) // UpdatePieceCounts sets the piece count field for the given node IDs.
UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int64) (err error) diff --git a/satellite/overlay/uploadselection_test.go b/satellite/overlay/uploadselection_test.go index c86d7e5eb..795ed3259 100644 --- a/satellite/overlay/uploadselection_test.go +++ b/satellite/overlay/uploadselection_test.go @@ -814,7 +814,7 @@ func (m *mockdb) SetAllContainedNodes(ctx context.Context, containedNodes []stor } // AllPieceCounts satisfies nodeevents.DB interface. -func (m *mockdb) AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) { +func (m *mockdb) ActiveNodesPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) { panic("implement me") } diff --git a/satellite/satellitedb/dbx/node.dbx b/satellite/satellitedb/dbx/node.dbx index e9fa0f2d1..596311a47 100644 --- a/satellite/satellitedb/dbx/node.dbx +++ b/satellite/satellitedb/dbx/node.dbx @@ -167,7 +167,7 @@ read paged ( read all ( select node.id node.piece_count - where node.piece_count != 0 + where node.disqualified = null ) // node_api_version is a table for storing the supported API. diff --git a/satellite/satellitedb/dbx/satellitedb.dbx.go b/satellite/satellitedb/dbx/satellitedb.dbx.go index 8a2259b1a..be6efbd7a 100644 --- a/satellite/satellitedb/dbx/satellitedb.dbx.go +++ b/satellite/satellitedb/dbx/satellitedb.dbx.go @@ -15096,11 +15096,11 @@ func (obj *pgxImpl) Paged_Node(ctx context.Context, } -func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) ( +func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) ( rows []*Id_PieceCount_Row, err error) { defer mon.Task()(&ctx)(&err) - var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0") + var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.disqualified is NULL") var __values []interface{} @@ -23469,11 +23469,11 @@ func (obj *pgxcockroachImpl) Paged_Node(ctx context.Context, } -func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) ( +func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) ( rows []*Id_PieceCount_Row, err error) { defer mon.Task()(&ctx)(&err) - var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0") + var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.disqualified is NULL") var __values []interface{} @@ -28896,7 +28896,7 @@ type Methods interface { All_Node_Id(ctx context.Context) ( rows []*Id_Row, err error) - All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) ( + All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) ( rows []*Id_PieceCount_Row, err error) All_Project(ctx context.Context) ( diff --git a/satellite/satellitedb/overlaycache.go b/satellite/satellitedb/overlaycache.go index b5a3421cf..d0e97ef52 100644 --- a/satellite/satellitedb/overlaycache.go +++ b/satellite/satellitedb/overlaycache.go @@ -757,14 +757,15 @@ func (cache *overlaycache) TestUnsuspendNodeUnknownAudit(ctx context.Context, no return nil } -// AllPieceCounts returns a map of node IDs to piece counts from the db. +// ActiveNodesPieceCounts returns a map of node IDs to piece counts from the db. Returns piece counts only for +// nodes that are not disqualified.
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned. -func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int64, err error) { +func (cache *overlaycache) ActiveNodesPieceCounts(ctx context.Context) (_ map[storj.NodeID]int64, err error) { defer mon.Task()(&ctx)(&err) - // NB: `All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number` selects node - // ID and piece count from the nodes table where piece count is not zero. - rows, err := cache.db.All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx) + // NB: `All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null` selects node + // ID and piece count from the nodes table for nodes that are not disqualified. + rows, err := cache.db.All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx) if err != nil { return nil, Error.Wrap(err) }
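For readers skimming the diff, here is a minimal standalone Go sketch of the behaviour this patch introduces in both GC observers. It is illustrative only and not part of the patch: nodeID and pickInitialPieceCount are hypothetical stand-ins for storj.NodeID and the observer code above, and the numbers are made up.

package main

import "fmt"

// nodeID stands in for storj.NodeID in this self-contained sketch.
type nodeID string

// pickInitialPieceCount mirrors the decision added to the GC observers:
// a node missing from the ActiveNodesPieceCounts result is treated as
// disqualified and skipped (ok=false), a node present with a zero count
// falls back to the configured initial estimate, and a positive count is
// used directly to size the bloom filter.
func pickInitialPieceCount(lastPieceCounts map[nodeID]int64, id nodeID, initialPieces int64) (numPieces int64, ok bool) {
	count, found := lastPieceCounts[id]
	if !found {
		// Not returned by ActiveNodesPieceCounts => disqualified => no bloom filter.
		return 0, false
	}
	if count > 0 {
		return count, true
	}
	return initialPieces, true
}

func main() {
	// Hypothetical map as returned by ActiveNodesPieceCounts: active nodes only,
	// including nodes that currently store zero pieces.
	counts := map[nodeID]int64{"active-with-data": 4000, "active-empty": 0}
	for _, id := range []nodeID{"active-with-data", "active-empty", "disqualified"} {
		n, ok := pickInitialPieceCount(counts, id, 400000)
		fmt.Println(id, n, ok)
	}
}

The key point is that presence in the map, not the count value, is what now distinguishes an active node from a disqualified one.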