satellite/bloomfilter: don't create BF for disqualified nodes

Currently we have a large set of nodes that are already disqualified and
we are not sending bloom filters to them. The issue is that the garbage
collection process is still generating filters for them, even when only
a single segment has one piece stored on such a node. This consumes
additional memory and processing power.

This change updates the logic behind `AllPieceCounts` (renamed to
`ActiveNodesPieceCounts`) to return piece counts for all nodes except
disqualified ones (including nodes with a piece count of 0). With this
change the GC observer can skip nodes that were not returned by
`ActiveNodesPieceCounts`.

Change-Id: Ic75159135abe535084d8aeee560bb801a4a03e17
Michal Niewrzal 2023-11-21 10:51:27 +01:00 committed by Storj Robot
parent 07cb8dc677
commit 573ce712f2
10 changed files with 108 additions and 61 deletions
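
In short, both GC observers now treat a node that is missing from the
piece-count map returned by ActiveNodesPieceCounts as disqualified and
return early without building a bloom filter for it. A minimal,
standalone sketch of that lookup (the helper name and string keys are
illustrative stand-ins; the real logic lives inside the observers' add
methods shown in the diff below):

package main

import "fmt"

// pieceCountForNode mirrors the new observer behavior: a node absent from
// the active piece-count map is treated as disqualified and skipped, an
// active node with a positive count uses that count, and an active node
// with no recorded pieces falls back to the configured default.
func pieceCountForNode(activeCounts map[string]int64, nodeID string, initialPieces int64) (numPieces int64, generateFilter bool) {
	count, found := activeCounts[nodeID]
	if !found {
		// not returned by ActiveNodesPieceCounts -> disqualified, skip
		return 0, false
	}
	if count > 0 {
		return count, true
	}
	return initialPieces, true
}

func main() {
	activeCounts := map[string]int64{"node-a": 1500, "node-b": 0}
	for _, id := range []string{"node-a", "node-b", "node-dq"} {
		n, ok := pieceCountForNode(activeCounts, id, 400000)
		fmt.Printf("%s: numPieces=%d generateFilter=%v\n", id, n, ok)
	}
}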

View File

@@ -67,7 +67,7 @@ func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)
obs.log.Debug("collecting bloom filters started")
// load last piece counts from overlay db
lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx)
lastPieceCounts, err := obs.overlay.ActiveNodesPieceCounts(ctx)
if err != nil {
obs.log.Error("error getting last piece counts", zap.Error(err))
err = nil
@@ -197,8 +197,14 @@ func (fork *observerFork) add(nodeID storj.NodeID, pieceID storj.PieceID) {
if !ok {
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := fork.config.InitialPieces
if pieceCounts := fork.pieceCounts[nodeID]; pieceCounts > 0 {
numPieces = pieceCounts
if pieceCounts, found := fork.pieceCounts[nodeID]; found {
if pieceCounts > 0 {
numPieces = pieceCounts
}
} else {
// node was not in pieceCounts which means it was disqualified
// and we won't generate a bloom filter for it
return
}
hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, fork.config.FalsePositiveRate, 2*memory.MiB)
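
For a sense of scale: each filter's parameters are derived from the node's
expected piece count (the recorded count, or config.InitialPieces when none
is known), and the call above additionally passes 2*memory.MiB (presumably
a size cap). A back-of-the-envelope sketch using the textbook bloom-filter
sizing formula; this is not necessarily what bloomfilter.OptimalParameters
computes internally, and the 400000-piece figure is only an illustrative
value, not the project's actual default:

package main

import (
	"fmt"
	"math"
)

// textbookBloomSize returns the bit count m and hash count k for n elements
// at false-positive rate p, using m = -n*ln(p)/(ln 2)^2 and k = (m/n)*ln 2.
func textbookBloomSize(n int64, p float64) (bits int64, hashes int) {
	m := math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2))
	k := int(math.Round(m / float64(n) * math.Ln2))
	return int64(m), k
}

func main() {
	// a node referenced by a single piece vs. nodes with many pieces, p = 10%
	for _, n := range []int64{1, 400000, 4000000} {
		bits, k := textbookBloomSize(n, 0.1)
		fmt.Printf("n=%d: %d bits (~%.0f KiB), %d hashes\n", n, bits, float64(bits)/8/1024, k)
	}
}

Skipping disqualified nodes therefore avoids allocating and serializing
filters for nodes that will never receive them.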

View File

@@ -63,7 +63,7 @@ func (obs *SyncObserver) Start(ctx context.Context, startTime time.Time) (err er
obs.log.Debug("collecting bloom filters started")
// load last piece counts from overlay db
lastPieceCounts, err := obs.overlay.AllPieceCounts(ctx)
lastPieceCounts, err := obs.overlay.ActiveNodesPieceCounts(ctx)
if err != nil {
obs.log.Error("error getting last piece counts", zap.Error(err))
err = nil
@@ -147,8 +147,14 @@ func (obs *SyncObserver) add(nodeID storj.NodeID, pieceID storj.PieceID) {
if !ok {
// If we know how many pieces a node should be storing, use that number. Otherwise use default.
numPieces := obs.config.InitialPieces
if pieceCounts := obs.lastPieceCounts[nodeID]; pieceCounts > 0 {
numPieces = pieceCounts
if pieceCounts, found := obs.lastPieceCounts[nodeID]; found {
if pieceCounts > 0 {
numPieces = pieceCounts
}
} else {
// node was not in lastPieceCounts which means it was disqualified
// and we won't generate a bloom filter for it
return
}
hashCount, tableSize := bloomfilter.OptimalParameters(numPieces, obs.config.FalsePositiveRate, 2*memory.MiB)

View File

@@ -8,6 +8,7 @@ import (
"bytes"
"fmt"
"io"
"slices"
"sort"
"strconv"
"testing"
@@ -18,6 +19,7 @@ import (
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
@@ -26,6 +28,7 @@ import (
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
"storj.io/storj/satellite/overlay"
"storj.io/uplink"
)
@@ -50,16 +53,21 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
defer ctx.Check(project.Close)
type testCase struct {
Bucket string
ZipBatchSize int
ExpectedPacks int
Bucket string
ZipBatchSize int
ExpectedPacks int
DisqualifiedNodes []storj.NodeID
}
testCases := []testCase{
{"bloomfilters-bucket-1", 1, 7},
{"bloomfilters-bucket-2", 2, 4},
{"bloomfilters-bucket-7", 7, 1},
{"bloomfilters-bucket-100", 100, 1},
{"bloomfilters-bucket-1", 1, 7, []storj.NodeID{}},
{"bloomfilters-bucket-2", 2, 4, []storj.NodeID{}},
{"bloomfilters-bucket-2-dq-nodes", 2, 3, []storj.NodeID{
planet.StorageNodes[0].ID(),
planet.StorageNodes[3].ID(),
}},
{"bloomfilters-bucket-7", 7, 1, []storj.NodeID{}},
{"bloomfilters-bucket-100", 100, 1, []storj.NodeID{}},
}
for _, tc := range testCases {
@@ -78,6 +86,24 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
bloomfilter.NewSyncObserver(zaptest.NewLogger(t), config, planet.Satellites[0].Overlay.DB),
}
expectedNodeIds := []string{}
for _, node := range planet.StorageNodes {
_, err := planet.Satellites[0].DB.Testing().RawDB().ExecContext(ctx, "UPDATE nodes SET disqualified = null WHERE id = $1", node.ID())
require.NoError(t, err)
expectedNodeIds = append(expectedNodeIds, node.ID().String())
}
for _, nodeID := range tc.DisqualifiedNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.DisqualifyNode(ctx, nodeID, overlay.DisqualificationReasonAuditFailure))
if index := slices.Index(expectedNodeIds, nodeID.String()); index != -1 {
expectedNodeIds = slices.Delete(expectedNodeIds, index, index+1)
}
}
sort.Strings(expectedNodeIds)
for _, observer := range observers {
name := fmt.Sprintf("%s-%T", tc.Bucket, observer)
t.Run(name, func(t *testing.T) {
@@ -134,6 +160,10 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
require.Equal(t, file.Name, pbRetainInfo.StorageNodeId.String())
nodeIds = append(nodeIds, pbRetainInfo.StorageNodeId.String())
nodeID, err := storj.NodeIDFromBytes(pbRetainInfo.StorageNodeId.Bytes())
require.NoError(t, err)
require.NotContains(t, tc.DisqualifiedNodes, nodeID)
}
count++
@@ -149,11 +179,6 @@ func TestObserverGarbageCollectionBloomFilters(t *testing.T) {
sort.Strings(packNames)
require.Equal(t, expectedPackNames, packNames)
expectedNodeIds := []string{}
for _, node := range planet.StorageNodes {
expectedNodeIds = append(expectedNodeIds, node.ID().String())
}
sort.Strings(expectedNodeIds)
sort.Strings(nodeIds)
require.Equal(t, expectedNodeIds, nodeIds)
})

View File

@@ -35,9 +35,9 @@ func TestObserverPieceTracker(t *testing.T) {
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// ensure that the piece counts are empty
pieceCounts, err := planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
pieceCounts, err := planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.Equal(t, 0, len(pieceCounts))
require.Equal(t, 4, len(pieceCounts))
// Setup: create 50KiB of data for the uplink to upload
testdata := testrand.Bytes(50 * memory.KiB)
@@ -51,7 +51,7 @@ func TestObserverPieceTracker(t *testing.T) {
require.NoError(t, err)
// Check that the piece counts are correct
pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
pieceCounts, err = planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.True(t, len(pieceCounts) > 0)
@@ -71,7 +71,7 @@ func TestObserverPieceTracker(t *testing.T) {
require.NoError(t, err)
// Check that the piece counts are correct
pieceCounts, err = planet.Satellites[0].Overlay.DB.AllPieceCounts(ctx)
pieceCounts, err = planet.Satellites[0].Overlay.DB.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.True(t, len(pieceCounts) > 0)

View File

@@ -24,56 +24,64 @@ func TestDB_PieceCounts(t *testing.T) {
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
overlaydb := db.OverlayCache()
type TestNode struct {
ID storj.NodeID
PieceCount int64
expectedNodePieces := make(map[storj.NodeID]int64, 100)
for i := 0; i < 100; i++ {
expectedNodePieces[testrand.NodeID()] = int64(math.Pow10(i + 1))
}
nodes := make([]TestNode, 100)
for i := range nodes {
nodes[i].ID = testrand.NodeID()
nodes[i].PieceCount = int64(math.Pow10(i + 1))
}
var nodeToDisqualify storj.NodeID
for i, node := range nodes {
i := 0
for nodeID := range expectedNodePieces {
addr := fmt.Sprintf("127.0.%d.0:8080", i)
lastNet := fmt.Sprintf("127.0.%d", i)
d := overlay.NodeCheckInInfo{
NodeID: node.ID,
NodeID: nodeID,
Address: &pb.NodeAddress{Address: addr},
LastIPPort: addr,
LastNet: lastNet,
Version: &pb.NodeVersion{Version: "v1.0.0"},
IsUp: true,
}
err := overlaydb.UpdateCheckIn(ctx, d, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
i++
nodeToDisqualify = nodeID
}
// check that they are initialized to zero
initialCounts, err := overlaydb.AllPieceCounts(ctx)
initialCounts, err := overlaydb.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.Empty(t, initialCounts)
// TODO: make AllPieceCounts return results for all nodes,
// since it will keep the logic slightly clearer.
// update counts
counts := make(map[storj.NodeID]int64)
for _, node := range nodes {
counts[node.ID] = node.PieceCount
require.Equal(t, len(expectedNodePieces), len(initialCounts))
for nodeID := range expectedNodePieces {
pieceCount, found := initialCounts[nodeID]
require.True(t, found)
require.Zero(t, pieceCount)
}
err = overlaydb.UpdatePieceCounts(ctx, counts)
err = overlaydb.UpdatePieceCounts(ctx, expectedNodePieces)
require.NoError(t, err)
// fetch new counts
updatedCounts, err := overlaydb.AllPieceCounts(ctx)
updatedCounts, err := overlaydb.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
// verify values
for _, node := range nodes {
count, ok := updatedCounts[node.ID]
for nodeID, pieceCount := range expectedNodePieces {
count, ok := updatedCounts[nodeID]
require.True(t, ok)
require.Equal(t, count, node.PieceCount)
require.Equal(t, pieceCount, count)
}
// disqualify one node so it won't be returned by ActiveNodesPieceCounts
_, err = overlaydb.DisqualifyNode(ctx, nodeToDisqualify, time.Now(), overlay.DisqualificationReasonAuditFailure)
require.NoError(t, err)
pieceCounts, err := overlaydb.ActiveNodesPieceCounts(ctx)
require.NoError(t, err)
require.NotContains(t, pieceCounts, nodeToDisqualify)
})
}
@@ -121,7 +129,7 @@ func BenchmarkDB_PieceCounts(b *testing.B) {
b.Run("All", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := overlaydb.AllPieceCounts(ctx)
_, err := overlaydb.ActiveNodesPieceCounts(ctx)
if err != nil {
b.Fatal(err)
}

View File

@@ -80,8 +80,9 @@ type DB interface {
// SetAllContainedNodes updates the contained field for all nodes, as necessary.
SetAllContainedNodes(ctx context.Context, containedNodes []storj.NodeID) (err error)
// AllPieceCounts returns a map of node IDs to piece counts from the db.
AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error)
// ActiveNodesPieceCounts returns a map of node IDs to piece counts from the db.
// Returns only piece counts for nodes that are not disqualified.
ActiveNodesPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error)
// UpdatePieceCounts sets the piece count field for the given node IDs.
UpdatePieceCounts(ctx context.Context, pieceCounts map[storj.NodeID]int64) (err error)

View File

@@ -814,7 +814,7 @@ func (m *mockdb) SetAllContainedNodes(ctx context.Context, containedNodes []stor
}
// AllPieceCounts satisfies nodeevents.DB interface.
func (m *mockdb) AllPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) {
func (m *mockdb) ActiveNodesPieceCounts(ctx context.Context) (pieceCounts map[storj.NodeID]int64, err error) {
panic("implement me")
}

View File

@@ -167,7 +167,7 @@ read paged (
read all (
select node.id node.piece_count
where node.piece_count != 0
where node.disqualified = null
)
// node_api_version is a table for storing the supported API.

View File

@@ -15096,11 +15096,11 @@ func (obj *pgxImpl) Paged_Node(ctx context.Context,
}
func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
func (obj *pgxImpl) All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0")
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.disqualified is NULL")
var __values []interface{}
@@ -23469,11 +23469,11 @@ func (obj *pgxcockroachImpl) Paged_Node(ctx context.Context,
}
func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
func (obj *pgxcockroachImpl) All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) (
rows []*Id_PieceCount_Row, err error) {
defer mon.Task()(&ctx)(&err)
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.piece_count != 0")
var __embed_stmt = __sqlbundle_Literal("SELECT nodes.id, nodes.piece_count FROM nodes WHERE nodes.disqualified is NULL")
var __values []interface{}
@@ -28896,7 +28896,7 @@ type Methods interface {
All_Node_Id(ctx context.Context) (
rows []*Id_Row, err error)
All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx context.Context) (
All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx context.Context) (
rows []*Id_PieceCount_Row, err error)
All_Project(ctx context.Context) (

View File

@@ -757,14 +757,15 @@ func (cache *overlaycache) TestUnsuspendNodeUnknownAudit(ctx context.Context, no
return nil
}
// AllPieceCounts returns a map of node IDs to piece counts from the db.
// ActiveNodesPieceCounts returns a map of node IDs to piece counts from the db. Returns only piece counts
// for nodes that are not disqualified.
// NB: a valid, partial piece map can be returned even if node ID parsing error(s) are returned.
func (cache *overlaycache) AllPieceCounts(ctx context.Context) (_ map[storj.NodeID]int64, err error) {
func (cache *overlaycache) ActiveNodesPieceCounts(ctx context.Context) (_ map[storj.NodeID]int64, err error) {
defer mon.Task()(&ctx)(&err)
// NB: `All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number` selects node
// ID and piece count from the nodes table where piece count is not zero.
rows, err := cache.db.All_Node_Id_Node_PieceCount_By_PieceCount_Not_Number(ctx)
// NB: `All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null` selects node
// ID and piece count from the nodes that are not disqualified.
rows, err := cache.db.All_Node_Id_Node_PieceCount_By_Disqualified_Is_Null(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
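
The hunk above ends before the row-to-map conversion. For context only, a
hypothetical sketch of what such a conversion can look like, consistent
with the NB note that a valid partial map may be returned even when some
node IDs fail to parse; the row type and its field names below are
assumptions for illustration, not the dbx-generated code:

package main

import (
	"fmt"

	"storj.io/common/storj"
	"storj.io/common/testrand"
)

// idPieceCountRow is a stand-in for the dbx-generated row type; the field
// names here are assumptions.
type idPieceCountRow struct {
	Id         []byte
	PieceCount int64
}

// rowsToPieceCounts builds the piece-count map while skipping rows whose
// node ID does not parse, so a valid partial map is still returned
// alongside any collected parse errors.
func rowsToPieceCounts(rows []*idPieceCountRow) (map[storj.NodeID]int64, []error) {
	pieceCounts := make(map[storj.NodeID]int64, len(rows))
	var parseErrs []error
	for _, row := range rows {
		nodeID, err := storj.NodeIDFromBytes(row.Id)
		if err != nil {
			parseErrs = append(parseErrs, err)
			continue
		}
		pieceCounts[nodeID] = row.PieceCount
	}
	return pieceCounts, parseErrs
}

func main() {
	id := testrand.NodeID()
	rows := []*idPieceCountRow{
		{Id: id.Bytes(), PieceCount: 42},
		{Id: []byte("bad-id"), PieceCount: 7}, // invalid length, skipped
	}
	counts, errs := rowsToPieceCounts(rows)
	fmt.Println(len(counts), counts[id], len(errs)) // 1 42 1
}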