satellite/overlay: refactor ReliabilityCache to keep more data

ReliabilityCache now uses the refactored overlay Reliable method.
This method provides more information about nodes (e.g. country code),
which lets us add two dedicated methods to classify pieces:
* OutOfPlacementPieces
* PiecesNodesLastNetsInOrder

With these new methods we fix an issue where an offline but reliable
node would not be checked for clumped pieces and out-of-placement
pieces.
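
For context, a minimal sketch of how the checker consumes the two new
methods (signatures as added in this change; here cache is the checker's
*ReliabilityCache and segment comes from the ranged loop):

    // pieces sharing a /24 subnet are retrievable but considered unhealthy
    lastNets, err := cache.PiecesNodesLastNetsInOrder(ctx, segment.CreatedAt, segment.Pieces)
    if err != nil {
        return err
    }
    clumped := repair.FindClumpedPieces(segment.Pieces, lastNets)

    // pieces stored in countries not allowed by the segment placement
    outOfPlacement, err := cache.OutOfPlacementPieces(ctx, segment.CreatedAt, segment.Pieces, segment.Placement)
    if err != nil {
        return err
    }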

https://github.com/storj/storj/issues/5998

Change-Id: I9ffbed9f07f4881c9db3bd0e5f0412f1a418dd82
Michal Niewrzal 2023-06-29 15:26:52 +02:00
parent f2cd7b0928
commit 21c1e66a85
7 changed files with 230 additions and 222 deletions

View File

@ -66,7 +66,7 @@ type DB interface {
KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error)
// KnownReliable filters a set of nodes to reliable (online and qualified) nodes.
KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []uploadselection.SelectedNode, offline []uploadselection.SelectedNode, err error)
// Reliable returns all nodes that are reliable, online and offline.
// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []uploadselection.SelectedNode, offline []uploadselection.SelectedNode, err error)
// UpdateReputation updates the DB columns for all reputation fields in ReputationStatus.
UpdateReputation(ctx context.Context, id storj.NodeID, request ReputationUpdate) error
@ -401,43 +401,6 @@ func (service *Service) IsOnline(node *NodeDossier) bool {
return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow
}
// GetNodesNetworkInOrder returns the /24 subnet for each storage node, in order. If a
// requested node is not in the database, an empty string will be returned corresponding
// to that node's last_net.
func (service *Service) GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (lastNets []string, err error) {
defer mon.Task()(&ctx)(nil)
nodes, err := service.DownloadSelectionCache.GetNodes(ctx, nodeIDs)
if err != nil {
return nil, err
}
lastNets = make([]string, len(nodeIDs))
for i, nodeID := range nodeIDs {
if selectedNode, ok := nodes[nodeID]; ok {
lastNets[i] = selectedNode.LastNet
}
}
return lastNets, nil
}
// GetNodesOutOfPlacement checks if nodes from nodeIDs list are in allowed country according to specified geo placement
// and returns list of node ids which are not.
func (service *Service) GetNodesOutOfPlacement(ctx context.Context, nodeIDs []storj.NodeID, placement storj.PlacementConstraint) (offNodes []storj.NodeID, err error) {
defer mon.Task()(&ctx)(nil)
nodes, err := service.DownloadSelectionCache.GetNodes(ctx, nodeIDs)
if err != nil {
return nil, err
}
offNodes = make([]storj.NodeID, 0, len(nodeIDs))
for _, nodeID := range nodeIDs {
if selectedNode, ok := nodes[nodeID]; ok && !placement.AllowedCountry(selectedNode.CountryCode) {
offNodes = append(offNodes, selectedNode.ID)
}
}
return offNodes, nil
}
// FindStorageNodesForGracefulExit searches the overlay network for nodes that meet the provided requirements for graceful-exit requests.
func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req FindStorageNodesRequest) (_ []*uploadselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
@ -577,11 +540,11 @@ func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDL
return service.db.KnownReliable(ctx, nodeIDs, service.config.Node.OnlineWindow, 0)
}
// Reliable returns all nodes that are reliable, online and offline.
// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline).
func (service *Service) Reliable(ctx context.Context) (online []uploadselection.SelectedNode, offline []uploadselection.SelectedNode, err error) {
defer mon.Task()(&ctx)(&err)
// TODO add as of system time
return service.db.Reliable(ctx, service.config.Node.OnlineWindow, 0)
}

View File

@ -15,11 +15,9 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
@ -1029,116 +1027,3 @@ func TestUpdateCheckInBelowMinVersionEvent(t *testing.T) {
require.True(t, ne2.CreatedAt.After(ne1.CreatedAt))
})
}
func TestService_GetNodesOutOfPlacement(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.AsOfSystemTime.Enabled = false
config.Overlay.Node.AsOfSystemTime.DefaultInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
service := planet.Satellites[0].Overlay.Service
placement := storj.EU
nodeIDs := []storj.NodeID{}
for _, node := range planet.StorageNodes {
nodeIDs = append(nodeIDs, node.ID())
err := service.TestNodeCountryCode(ctx, node.ID(), location.Poland.String())
require.NoError(t, err)
}
require.NoError(t, service.DownloadSelectionCache.Refresh(ctx))
offNodes, err := service.GetNodesOutOfPlacement(ctx, nodeIDs, placement)
require.NoError(t, err)
require.Empty(t, offNodes)
expectedNodeIDs := []storj.NodeID{}
for _, node := range planet.StorageNodes {
expectedNodeIDs = append(expectedNodeIDs, node.ID())
err := service.TestNodeCountryCode(ctx, node.ID(), location.Brazil.String())
require.NoError(t, err)
// we need to refresh cache because node country code was changed
require.NoError(t, service.DownloadSelectionCache.Refresh(ctx))
offNodes, err := service.GetNodesOutOfPlacement(ctx, nodeIDs, placement)
require.NoError(t, err)
require.ElementsMatch(t, expectedNodeIDs, offNodes)
}
})
}
func TestService_UpdateTags(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
sampleIdentity := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion())
service := planet.Satellites[0].Overlay.Service
snIdentity := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion())
// first time
err := service.UpdateNodeTags(ctx, []uploadselection.NodeTag{
{
NodeID: snIdentity.ID,
SignedAt: time.Now(),
Signer: sampleIdentity.ID,
Name: "foo",
Value: []byte("bar"),
},
})
require.NoError(t, err)
tags, err := service.GetNodeTags(ctx, snIdentity.ID)
require.NoError(t, err)
require.Len(t, tags, 1)
// second time
err = service.UpdateNodeTags(ctx, []uploadselection.NodeTag{
{
NodeID: snIdentity.ID,
SignedAt: time.Now(),
Signer: sampleIdentity.ID,
Name: "foo",
Value: []byte("barx"),
},
{
NodeID: snIdentity.ID,
SignedAt: time.Now(),
Signer: sampleIdentity.ID,
Name: "kossuth",
Value: []byte("lajos"),
},
})
require.NoError(t, err)
tags, err = service.GetNodeTags(ctx, snIdentity.ID)
require.NoError(t, err)
require.Len(t, tags, 2)
tag, err := tags.FindBySignerAndName(sampleIdentity.ID, "kossuth")
require.NoError(t, err)
require.Equal(t, "kossuth", tag.Name)
require.Equal(t, []byte("lajos"), tag.Value)
tag, err = tags.FindBySignerAndName(sampleIdentity.ID, "foo")
require.NoError(t, err)
require.Equal(t, "foo", tag.Name)
require.Equal(t, []byte("barx"), tag.Value)
_, err = tags.FindBySignerAndName(sampleIdentity.ID, "foox")
require.Error(t, err)
_, err = tags.FindBySignerAndName(testidentity.MustPregeneratedSignedIdentity(1, storj.LatestIDVersion()).ID, "foo")
require.Error(t, err)
})
}

View File

@ -34,7 +34,7 @@ var _ rangedloop.Partial = (*observerFork)(nil)
type Observer struct {
logger *zap.Logger
repairQueue queue.RepairQueue
nodestate *ReliabilityCache
nodesCache *ReliabilityCache
overlayService *overlay.Service
repairOverrides RepairOverridesMap
nodeFailureRate float64
@ -57,7 +57,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
logger: logger,
repairQueue: repairQueue,
nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, excludedCountries),
nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, excludedCountries),
overlayService: overlay,
repairOverrides: config.RepairOverrides.GetMap(),
nodeFailureRate: config.NodeFailureRate,
@ -75,7 +75,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove
// nodes yet. We expect that there will be nodes before there are segments, though.
func (observer *Observer) getNodesEstimate(ctx context.Context) (int, error) {
// this should be safe to call frequently; it is an efficient caching lookup.
totalNumNodes, err := observer.nodestate.NumNodes(ctx)
totalNumNodes, err := observer.nodesCache.NumNodes(ctx)
if err != nil {
// We could proceed here by returning the last good value, or by returning a fallback
// constant estimate, like "20000", and we'd probably be fine, but it would be better
@ -218,13 +218,13 @@ func (observer *Observer) getObserverStats(rsString string) *observerRSStats {
// RefreshReliabilityCache forces refreshing node online status cache.
func (observer *Observer) RefreshReliabilityCache(ctx context.Context) error {
return observer.nodestate.Refresh(ctx)
return observer.nodesCache.Refresh(ctx)
}
// observerFork implements the ranged loop Partial interface.
type observerFork struct {
repairQueue *queue.InsertBuffer
nodestate *ReliabilityCache
nodesCache *ReliabilityCache
overlayService *overlay.Service
rsStats map[string]*partialRSStats
repairOverrides RepairOverridesMap
@ -244,7 +244,7 @@ func newObserverFork(observer *Observer) rangedloop.Partial {
// we can only share thread-safe objects.
return &observerFork{
repairQueue: observer.createInsertBuffer(),
nodestate: observer.nodestate,
nodesCache: observer.nodesCache,
overlayService: observer.overlayService,
rsStats: make(map[string]*partialRSStats),
repairOverrides: observer.repairOverrides,
@ -334,24 +334,19 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
return Error.New("could not get estimate of total number of nodes: %w", err)
}
missingPieces, err := fork.nodestate.MissingPieces(ctx, segment.CreatedAt, segment.Pieces)
missingPieces, err := fork.nodesCache.MissingPieces(ctx, segment.CreatedAt, segment.Pieces)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return Error.New("error getting missing pieces: %w", err)
}
allNodeIDs := make([]storj.NodeID, len(pieces))
for i, p := range pieces {
allNodeIDs[i] = p.StorageNode
}
var clumpedPieces metabase.Pieces
var lastNets []string
if fork.doDeclumping {
// if multiple pieces are on the same last_net, keep only the first one. The rest are
// to be considered retrievable but unhealthy.
lastNets, err = fork.overlayService.GetNodesNetworkInOrder(ctx, allNodeIDs)
lastNets, err = fork.nodesCache.PiecesNodesLastNetsInOrder(ctx, segment.CreatedAt, pieces)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
@ -360,16 +355,16 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets)
}
numPiecesOutOfPlacement := 0
numOutOfPlacementPieces := 0
if fork.doPlacementCheck && segment.Placement != storj.EveryCountry {
outOfPlacementNodes, err := fork.overlayService.GetNodesOutOfPlacement(ctx, allNodeIDs, segment.Placement)
outOfPlacementPieces, err := fork.nodesCache.OutOfPlacementPieces(ctx, segment.CreatedAt, segment.Pieces, segment.Placement)
if err != nil {
fork.totalStats.remoteSegmentsFailedToCheck++
stats.iterationAggregates.remoteSegmentsFailedToCheck++
return errs.Combine(Error.New("error determining nodes placement"), err)
}
numPiecesOutOfPlacement = len(outOfPlacementNodes)
numOutOfPlacementPieces = len(outOfPlacementPieces)
}
numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces)
@ -380,8 +375,8 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy))
mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked
stats.segmentStats.segmentClumpedCount.Observe(int64(len(clumpedPieces)))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(numPiecesOutOfPlacement)) //mon:locked
stats.segmentStats.segmentOffPlacementCount.Observe(int64(numPiecesOutOfPlacement))
mon.IntVal("checker_segment_off_placement_count").Observe(int64(numOutOfPlacementPieces)) //mon:locked
stats.segmentStats.segmentOffPlacementCount.Observe(int64(numOutOfPlacementPieces))
segmentAge := time.Since(segment.CreatedAt)
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked
@ -397,7 +392,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme
// except for the case when the repair and success thresholds are the same (a case usually seen during testing).
// A separate case is when we find pieces that are outside the segment placement; in such a case we put the segment
// into the queue right away.
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || numPiecesOutOfPlacement > 0 {
if (numHealthy <= repairThreshold && numHealthy < successThreshold) || numOutOfPlacementPieces > 0 {
mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked
stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth)
fork.totalStats.remoteSegmentsNeedingRepair++
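// For illustration, the queueing condition above reduces to a small
// predicate (a sketch; the helper name is hypothetical, not part of this
// change):
//
//	func needsRepair(numHealthy, repairThreshold, successThreshold, numOutOfPlacementPieces int) bool {
//		return (numHealthy <= repairThreshold && numHealthy < successThreshold) ||
//			numOutOfPlacementPieces > 0
//	}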

View File

@ -16,6 +16,7 @@ import (
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
@ -590,6 +591,8 @@ func TestObserver_PlacementCheck(t *testing.T) {
Satellite: testplanet.ReconfigureRS(1, 2, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.Pause()
repairQueue := planet.Satellites[0].DB.RepairQueue()
require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket"))
@ -610,49 +613,63 @@ func TestObserver_PlacementCheck(t *testing.T) {
type testCase struct {
piecesOutOfPlacement int
// how many of the out-of-placement pieces should also be offline
piecesOutOfPlacementOffline int
}
for _, tc := range []testCase{
for i, tc := range []testCase{
// all pieces/nodes are out of placement
{piecesOutOfPlacement: 4},
// few pieces/nodes are out of placement
// few pieces/nodes are out of placement
{piecesOutOfPlacement: 2},
// all pieces/nodes are out of placement and 1 of them is offline
{piecesOutOfPlacement: 4, piecesOutOfPlacementOffline: 1},
// few pieces/nodes are out of placement and 1 of them is offline
{piecesOutOfPlacement: 2, piecesOutOfPlacementOffline: 1},
// single piece/node is out of placement and it is offline
{piecesOutOfPlacement: 1, piecesOutOfPlacementOffline: 1},
} {
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
t.Run("#"+strconv.Itoa(i), func(t *testing.T) {
for _, node := range planet.StorageNodes {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL"))
}
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, 4)
segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx)
require.NoError(t, err)
require.Len(t, segments, 1)
require.Len(t, segments[0].Pieces, 4)
for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
}
for index, piece := range segments[0].Pieces {
if index < tc.piecesOutOfPlacement {
require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US"))
}
// confirm that some pieces are out of placement
ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err)
require.False(t, ok)
// make node offline if needed
require.NoError(t, updateNodeStatus(ctx, planet.Satellites[0], planet.FindNode(piece.StorageNode), index < tc.piecesOutOfPlacementOffline))
}
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
// confirm that some pieces are out of placement
ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement)
require.NoError(t, err)
require.False(t, ok)
_, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx)
require.NoError(t, err)
require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx))
injuredSegment, err := repairQueue.Select(ctx)
require.NoError(t, err)
err = repairQueue.Delete(ctx, injuredSegment)
require.NoError(t, err)
planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.TriggerWait()
require.Equal(t, segments[0].StreamID, injuredSegment.StreamID)
injuredSegment, err := repairQueue.Select(ctx)
require.NoError(t, err)
err = repairQueue.Delete(ctx, injuredSegment)
require.NoError(t, err)
count, err := repairQueue.Count(ctx)
require.Zero(t, err)
require.Zero(t, count)
require.Equal(t, segments[0].StreamID, injuredSegment.StreamID)
count, err := repairQueue.Count(ctx)
require.Zero(t, err)
require.Zero(t, count)
})
}
})
}
@ -669,3 +686,22 @@ func allPiecesInPlacement(ctx context.Context, overlay *overlay.Service, pieces
}
return true, nil
}
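// updateNodeStatus submits a fresh check-in for the given node; when offline
// is true the check-in timestamp is backdated by 4 hours, putting the node
// outside the online window so it counts as offline but still reliable.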
func updateNodeStatus(ctx context.Context, satellite *testplanet.Satellite, node *testplanet.StorageNode, offline bool) error {
timestamp := time.Now()
if offline {
timestamp = time.Now().Add(-4 * time.Hour)
}
return satellite.DB.OverlayCache().UpdateCheckIn(ctx, overlay.NodeCheckInInfo{
NodeID: node.ID(),
Address: &pb.NodeAddress{Address: node.Addr()},
IsUp: true,
Version: &pb.NodeVersion{
Version: "v0.0.0",
CommitHash: "",
Timestamp: time.Time{},
Release: false,
},
}, timestamp, satellite.Config.Overlay.Node)
}

View File

@ -20,17 +20,24 @@ import (
//
// architecture: Service
type ReliabilityCache struct {
overlay *overlay.Service
staleness time.Duration
overlay *overlay.Service
staleness time.Duration
// defines the countries whose nodes should be treated as unreliable
excludedCountryCodes map[location.CountryCode]struct{}
mu sync.Mutex
state atomic.Value // contains immutable *reliabilityState
}
type reliableNode struct {
LastNet string
CountryCode location.CountryCode
}
// reliabilityState is an immutable snapshot of reliable nodes, split into
// online-only and all (online + offline) sets.
type reliabilityState struct {
reliable map[storj.NodeID]struct{}
created time.Time
reliableOnline map[storj.NodeID]reliableNode
reliableAll map[storj.NodeID]reliableNode
created time.Time
}
// NewReliabilityCache creates a new reliability checking cache.
@ -38,7 +45,7 @@ func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration, excl
excludedCountryCodes := make(map[location.CountryCode]struct{})
for _, countryCode := range excludedCountries {
if cc := location.ToCountryCode(countryCode); cc != location.None {
excludedCountryCodes[location.ToCountryCode(countryCode)] = struct{}{}
excludedCountryCodes[cc] = struct{}{}
}
}
@ -67,24 +74,75 @@ func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err
if err != nil {
return 0, err
}
return len(state.reliable), nil
return len(state.reliableOnline), nil
}
// MissingPieces returns the pieces whose nodes are unreliable, within the given staleness period.
func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces metabase.Pieces) (_ []metabase.Piece, err error) {
func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces metabase.Pieces) (_ metabase.Pieces, err error) {
state, err := cache.loadFast(ctx, created)
if err != nil {
return nil, err
}
var unreliable []metabase.Piece
var unreliable metabase.Pieces
for _, p := range pieces {
if _, ok := state.reliable[p.StorageNode]; !ok {
node, ok := state.reliableOnline[p.StorageNode]
if !ok {
unreliable = append(unreliable, p)
} else if _, excluded := cache.excludedCountryCodes[node.CountryCode]; excluded {
unreliable = append(unreliable, p)
}
}
return unreliable, nil
}
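// Note: a piece counts as missing both when its node is absent from the
// reliable online set and when the node's country code is listed in
// excludedCountryCodes.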
// OutOfPlacementPieces checks which pieces are out of the segment placement. A piece's placement is determined by the location of the node storing it.
func (cache *ReliabilityCache) OutOfPlacementPieces(ctx context.Context, created time.Time, pieces metabase.Pieces, placement storj.PlacementConstraint) (_ metabase.Pieces, err error) {
defer mon.Task()(&ctx)(nil)
if len(pieces) == 0 || placement == storj.EveryCountry {
return metabase.Pieces{}, nil
}
state, err := cache.loadFast(ctx, created)
if err != nil {
return nil, err
}
var outOfPlacementPieces metabase.Pieces
for _, p := range pieces {
if node, ok := state.reliableAll[p.StorageNode]; ok && !placement.AllowedCountry(node.CountryCode) {
outOfPlacementPieces = append(outOfPlacementPieces, p)
}
}
return outOfPlacementPieces, nil
}
// PiecesNodesLastNetsInOrder returns the /24 subnet for each piece's storage node, in order. If a
// requested node is not in the database or is unreliable, an empty string is returned in
// the position corresponding to that node's last_net.
func (cache *ReliabilityCache) PiecesNodesLastNetsInOrder(ctx context.Context, created time.Time, pieces metabase.Pieces) (lastNets []string, err error) {
defer mon.Task()(&ctx)(nil)
if len(pieces) == 0 {
return []string{}, nil
}
state, err := cache.loadFast(ctx, created)
if err != nil {
return nil, err
}
lastNets = make([]string, len(pieces))
for i, piece := range pieces {
if node, ok := state.reliableAll[piece.StorageNode]; ok {
lastNets[i] = node.LastNet
}
}
return lastNets, nil
}
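// The returned slice is index-aligned with pieces: lastNets[i] belongs to
// pieces[i], and unknown or unreliable nodes contribute "" at their
// position. This lets the checker pass the result directly to
// repair.FindClumpedPieces(segment.Pieces, lastNets) without extra lookups.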
func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) {
// This code is designed to be very fast in the case where a refresh is not needed: just an
// atomic load from rarely written to bit of shared memory. The general strategy is to first
@ -124,18 +182,30 @@ func (cache *ReliabilityCache) Refresh(ctx context.Context) (err error) {
func (cache *ReliabilityCache) refreshLocked(ctx context.Context) (_ *reliabilityState, err error) {
defer mon.Task()(&ctx)(&err)
online, _, err := cache.overlay.Reliable(ctx)
online, offline, err := cache.overlay.Reliable(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
state := &reliabilityState{
created: time.Now(),
reliable: make(map[storj.NodeID]struct{}, len(online)),
created: time.Now(),
reliableOnline: make(map[storj.NodeID]reliableNode, len(online)),
reliableAll: make(map[storj.NodeID]reliableNode, len(online)+len(offline)),
}
for _, node := range online {
if _, ok := cache.excludedCountryCodes[node.CountryCode]; !ok {
state.reliable[node.ID] = struct{}{}
state.reliableOnline[node.ID] = reliableNode{
LastNet: node.LastNet,
CountryCode: node.CountryCode,
}
state.reliableAll[node.ID] = reliableNode{
LastNet: node.LastNet,
CountryCode: node.CountryCode,
}
}
for _, node := range offline {
state.reliableAll[node.ID] = reliableNode{
LastNet: node.LastNet,
CountryCode: node.CountryCode,
}
}
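// Net effect: a node from the online list appears in both reliableOnline
// and reliableAll, while a node from the offline list appears only in
// reliableAll. MissingPieces consults reliableOnline, whereas
// OutOfPlacementPieces and PiecesNodesLastNetsInOrder consult reliableAll,
// so offline-but-reliable nodes are now included in clumping and placement
// checks.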

View File

@ -1,7 +1,7 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package checker
package checker_test
import (
"context"
@ -12,12 +12,17 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/storj"
"storj.io/common/storj/location"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeevents"
"storj.io/storj/satellite/nodeselection/uploadselection"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
)
func TestReliabilityCache_Concurrent(t *testing.T) {
@ -35,7 +40,7 @@ func TestReliabilityCache_Concurrent(t *testing.T) {
ctx.Go(func() error { return overlayCache.Run(cacheCtx) })
defer ctx.Check(overlayCache.Close)
cache := NewReliabilityCache(overlayCache, time.Millisecond, []string{})
cache := checker.NewReliabilityCache(overlayCache, time.Millisecond, []string{})
var group errgroup.Group
for i := 0; i < 10; i++ {
group.Go(func() error {
@ -63,3 +68,57 @@ func (fakeOverlayDB) Reliable(context.Context, time.Duration, time.Duration) ([]
{ID: testrand.NodeID()},
}, nil, nil
}
func TestReliabilityCache_OutOfPlacementPieces(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Overlay.Node.AsOfSystemTime.Enabled = false
config.Overlay.Node.AsOfSystemTime.DefaultInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
overlay := planet.Satellites[0].Overlay.Service
config := planet.Satellites[0].Config.Checker
cache := checker.NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, []string{})
nodesPlacement := func(location location.CountryCode, nodes ...*testplanet.StorageNode) {
for _, node := range nodes {
err := overlay.TestNodeCountryCode(ctx, node.ID(), location.String())
require.NoError(t, err)
}
require.NoError(t, cache.Refresh(ctx))
}
allPieces := metabase.Pieces{
metabase.Piece{Number: 0, StorageNode: planet.StorageNodes[0].ID()},
metabase.Piece{Number: 1, StorageNode: planet.StorageNodes[1].ID()},
metabase.Piece{Number: 2, StorageNode: planet.StorageNodes[2].ID()},
metabase.Piece{Number: 3, StorageNode: planet.StorageNodes[3].ID()},
}
pieces, err := cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), metabase.Pieces{}, storj.EU)
require.NoError(t, err)
require.Empty(t, pieces)
nodesPlacement(location.Poland, planet.StorageNodes...)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.EU)
require.NoError(t, err)
require.Empty(t, pieces)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.US)
require.NoError(t, err)
require.ElementsMatch(t, allPieces, pieces)
nodesPlacement(location.UnitedStates, planet.StorageNodes[:2]...)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.EU)
require.NoError(t, err)
require.ElementsMatch(t, allPieces[:2], pieces)
pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.US)
require.NoError(t, err)
require.ElementsMatch(t, allPieces[2:], pieces)
})
}

View File

@ -3284,10 +3284,10 @@ func TestRepairClumpedPieces(t *testing.T) {
Capacity: &local.Capacity,
Version: &local.Version,
}
err = satellite.DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})
require.NoError(t, err)
err = satellite.RangedLoop.Overlay.Service.DownloadSelectionCache.Refresh(ctx)
require.NoError(t, err)
require.NoError(t, satellite.DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{}))
require.NoError(t, satellite.RangedLoop.Repair.Observer.RefreshReliabilityCache(ctx))
// running repair checker again should put the segment into the repair queue
_, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)