satellite/repair/checker: optimize processing, part 1

Optimize the checker by reusing slices across processed segments instead of allocating new ones for each segment.

Benchmark result:
name                                       old time/op    new time/op    delta
RemoteSegment/healthy_segment-8    33.2µs ± 1%    31.4µs ± 6%   -5.49%  (p=0.032 n=4+5)

name                                       old alloc/op   new alloc/op   delta
RemoteSegment/healthy_segment-8    15.9kB ± 0%    10.2kB ± 0%  -35.92%  (p=0.008 n=5+5)

name                                       old allocs/op  new allocs/op  delta
RemoteSegment/healthy_segment-8       280 ± 0%       250 ± 0%  -10.71%  (p=0.008 n=5+5)

Change-Id: I60462169285462dee6cd16d4f4ce1f30fb6cdfdf
Michal Niewrzal 2023-10-06 11:14:35 +02:00 committed by Storj Robot
parent 3019471514
commit de4559d862
6 changed files with 52 additions and 23 deletions
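For illustration, here is a minimal standalone sketch of the slice-reuse pattern this commit applies in observerFork.process (hypothetical types and names, not code from this diff): the buffers live on a long-lived struct, are resliced when their capacity is sufficient, and are reallocated only when a batch is larger than any seen before.

package main

import "fmt"

// batchProcessor keeps reusable buffers so that repeated calls to process
// do not allocate a fresh slice per batch.
type batchProcessor struct {
	ids []int
}

// process fills p.ids from items, reusing the backing array when possible.
// The contents of p.ids are overwritten on every call, so callers must not
// retain the slice between calls.
func (p *batchProcessor) process(items []int) {
	if cap(p.ids) < len(items) {
		p.ids = make([]int, len(items))
	} else {
		p.ids = p.ids[:len(items)]
	}
	for i, item := range items {
		p.ids[i] = item
	}
	fmt.Println(p.ids)
}

func main() {
	p := &batchProcessor{}
	p.process([]int{1, 2, 3}) // allocates
	p.process([]int{4, 5})    // reuses the existing backing array
}

This trades a small amount of bookkeeping for fewer allocations per segment, which is what the alloc/op and allocs/op improvements above reflect.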


@@ -20,6 +20,7 @@ import (
"storj.io/common/storj/location"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair"
"storj.io/storj/satellite/repair/queue"
@@ -244,6 +245,10 @@ type observerFork struct {
lastStreamID uuid.UUID
totalStats aggregateStats
// reuse those slices to optimize memory usage
nodeIDs []storj.NodeID
nodes []nodeselection.SelectedNode
// define from which countries nodes should be marked as offline
excludedCountryCodes map[location.CountryCode]struct{}
doDeclumping bool
@@ -301,7 +306,8 @@ func (fork *observerFork) loadRedundancy(redundancy storj.RedundancyScheme) (int
return int(redundancy.RequiredShares), repair, int(redundancy.OptimalShares), int(redundancy.TotalShares)
}
// Process repair implementation of partial's Process.
// Process is called repeatedly with batches of segments. It is not called
// concurrently on the same instance. The method is not concurrent-safe on its own.
func (fork *observerFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
for _, segment := range segments {
if err := fork.process(ctx, &segment); err != nil {
@@ -350,17 +356,26 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
return Error.New("could not get estimate of total number of nodes: %w", err)
}
nodeIDs := make([]storj.NodeID, len(pieces))
for i, piece := range pieces {
nodeIDs[i] = piece.StorageNode
// reuse fork.nodeIDs and fork.nodes slices if large enough
if cap(fork.nodeIDs) < len(pieces) {
fork.nodeIDs = make([]storj.NodeID, len(pieces))
fork.nodes = make([]nodeselection.SelectedNode, len(pieces))
} else {
fork.nodeIDs = fork.nodeIDs[:len(pieces)]
fork.nodes = fork.nodes[:len(pieces)]
}
selectedNodes, err := fork.nodesCache.GetNodes(ctx, segment.CreatedAt, nodeIDs)
for i, piece := range pieces {
fork.nodeIDs[i] = piece.StorageNode
}
selectedNodes, err := fork.nodesCache.GetNodes(ctx, segment.CreatedAt, fork.nodeIDs, fork.nodes)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return Error.New("error getting node information for pieces: %w", err)
}
piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck, fork.doDeclumping, fork.placementRules(segment.Placement))
piecesCheck := repair.ClassifySegmentPieces(segment.Pieces, selectedNodes, fork.excludedCountryCodes, fork.doPlacementCheck,
fork.doDeclumping, fork.placementRules(segment.Placement), fork.nodeIDs)
numHealthy := len(piecesCheck.Healthy)
mon.IntVal("checker_segment_total_count").Observe(int64(len(pieces))) //mon:locked


@@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay"
@@ -65,16 +67,22 @@ func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err
// the requested node IDs, and returns them in order. If a node is not in the reliability
// cache (that is, it is unknown or disqualified), an empty SelectedNode will be returned
// for the index corresponding to that node ID.
func (cache *ReliabilityCache) GetNodes(ctx context.Context, validUpTo time.Time, nodeIDs []storj.NodeID) ([]nodeselection.SelectedNode, error) {
// The selectedNodes slice will be filled with the result nodes and returned. Its length
// must be equal to the length of the nodeIDs slice.
func (cache *ReliabilityCache) GetNodes(ctx context.Context, validUpTo time.Time, nodeIDs []storj.NodeID, selectedNodes []nodeselection.SelectedNode) ([]nodeselection.SelectedNode, error) {
state, err := cache.loadFast(ctx, validUpTo)
if err != nil {
return nil, err
}
nodes := make([]nodeselection.SelectedNode, len(nodeIDs))
for i, nodeID := range nodeIDs {
nodes[i] = state.nodeByID[nodeID]
if len(nodeIDs) != len(selectedNodes) {
return nil, errs.New("nodeIDs length must be equal to selectedNodes: want %d have %d", len(nodeIDs), len(selectedNodes))
}
return nodes, nil
for i, nodeID := range nodeIDs {
selectedNodes[i] = state.nodeByID[nodeID]
}
return selectedNodes, nil
}
func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) {
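A brief usage sketch of the new GetNodes contract (a hypothetical helper, assuming the checker package path; not part of this diff): the caller supplies the output slice, and it must have the same length as nodeIDs, which is what allows the checker to reuse its buffers across segments.

package example

import (
	"context"
	"time"

	"storj.io/common/storj"
	"storj.io/storj/satellite/nodeselection"
	"storj.io/storj/satellite/repair/checker"
)

// getNodesForIDs shows the new calling convention: the result slice is
// allocated (or reused) by the caller and must match len(nodeIDs); entries
// for unknown or disqualified nodes come back as zero-value SelectedNodes.
func getNodesForIDs(ctx context.Context, cache *checker.ReliabilityCache, nodeIDs []storj.NodeID) ([]nodeselection.SelectedNode, error) {
	buf := make([]nodeselection.SelectedNode, len(nodeIDs))
	return cache.GetNodes(ctx, time.Now(), nodeIDs, buf)
}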


@@ -42,7 +42,7 @@ func TestReliabilityCache_Concurrent(t *testing.T) {
group.Go(func() error {
for i := 0; i < 10000; i++ {
nodeIDs := []storj.NodeID{testrand.NodeID()}
_, err := cache.GetNodes(ctx, time.Now(), nodeIDs)
_, err := cache.GetNodes(ctx, time.Now(), nodeIDs, make([]nodeselection.SelectedNode, len(nodeIDs)))
if err != nil {
return err
}


@@ -60,11 +60,9 @@ type PiecesCheckResult struct {
// ClassifySegmentPieces classifies the pieces of a segment into the categories
// represented by a PiecesCheckResult. Pieces may be put into multiple
// categories.
func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.SelectedNode, excludedCountryCodes map[location.CountryCode]struct{}, doPlacementCheck, doDeclumping bool, filter nodeselection.NodeFilter) (result PiecesCheckResult) {
result.ExcludeNodeIDs = make([]storj.NodeID, len(pieces))
for i, p := range pieces {
result.ExcludeNodeIDs[i] = p.StorageNode
}
func ClassifySegmentPieces(pieces metabase.Pieces, nodes []nodeselection.SelectedNode, excludedCountryCodes map[location.CountryCode]struct{},
doPlacementCheck, doDeclumping bool, filter nodeselection.NodeFilter, excludeNodeIDs []storj.NodeID) (result PiecesCheckResult) {
result.ExcludeNodeIDs = excludeNodeIDs
// check excluded countries and remove online nodes from missing pieces
result.Missing = make(map[uint16]struct{})


@@ -43,7 +43,7 @@ func TestClassifySegmentPieces(t *testing.T) {
require.NoError(t, err)
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4)
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, parsed.CreateFilters(0))
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, parsed.CreateFilters(0), piecesToNodeIDs(pieces))
require.Equal(t, 0, len(result.Missing))
require.Equal(t, 0, len(result.Clumped))
@@ -69,7 +69,7 @@ func TestClassifySegmentPieces(t *testing.T) {
require.NoError(t, err)
pieces := createPieces(selectedNodes, 1, 2, 3, 4, 7, 8)
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10))
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))
require.Equal(t, 0, len(result.Missing))
require.Equal(t, 0, len(result.Clumped))
@@ -92,7 +92,7 @@ func TestClassifySegmentPieces(t *testing.T) {
require.NoError(t, err)
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10))
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, false, c.CreateFilters(10), piecesToNodeIDs(pieces))
// offline nodes
require.Equal(t, 5, len(result.Missing))
@@ -115,7 +115,7 @@ func TestClassifySegmentPieces(t *testing.T) {
// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(0))
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(0), piecesToNodeIDs(pieces))
// offline nodes
require.Equal(t, 2, len(result.Missing))
@@ -142,7 +142,7 @@ func TestClassifySegmentPieces(t *testing.T) {
// first 5: online, 2 in each subnet --> healthy: one from (0,1) (2,3) (4), offline: (5,6) but 5 is in the same subnet as 6
pieces := createPieces(selectedNodes, 0, 1, 2, 3, 4, 5, 6)
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(10))
result := ClassifySegmentPieces(pieces, getNodes(selectedNodes, pieces), map[location.CountryCode]struct{}{}, true, true, c.CreateFilters(10), piecesToNodeIDs(pieces))
// offline nodes
require.Equal(t, 2, len(result.Missing))
@@ -178,3 +178,11 @@ func createPieces(selectedNodes []nodeselection.SelectedNode, indexes ...int) (r
}
return
}
func piecesToNodeIDs(pieces metabase.Pieces) []storj.NodeID {
ids := make([]storj.NodeID, len(pieces))
for i, piece := range pieces {
ids[i] = piece.StorageNode
}
return ids
}


@@ -222,7 +222,7 @@ func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment *queue
return false, overlayQueryError.New("GetNodes returned an invalid result")
}
pieces := segment.Pieces
piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placementRules(segment.Placement))
piecesCheck := repair.ClassifySegmentPieces(pieces, selectedNodes, repairer.excludedCountryCodes, repairer.doPlacementCheck, repairer.doDeclumping, repairer.placementRules(segment.Placement), allNodeIDs)
// irreparable segment
if len(piecesCheck.Retrievable) < int(segment.Redundancy.RequiredShares) {