diff --git a/satellite/overlay/service.go b/satellite/overlay/service.go index 6aacacb33..b27716ac2 100644 --- a/satellite/overlay/service.go +++ b/satellite/overlay/service.go @@ -66,7 +66,7 @@ type DB interface { KnownReliableInExcludedCountries(context.Context, *NodeCriteria, storj.NodeIDList) (storj.NodeIDList, error) // KnownReliable filters a set of nodes to reliable (online and qualified) nodes. KnownReliable(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (online []uploadselection.SelectedNode, offline []uploadselection.SelectedNode, err error) - // Reliable returns all nodes that are reliable, online and offline. + // Reliable returns all nodes that are reliable (separated by whether they are currently online or offline). Reliable(ctx context.Context, onlineWindow, asOfSystemInterval time.Duration) (online []uploadselection.SelectedNode, offline []uploadselection.SelectedNode, err error) // UpdateReputation updates the DB columns for all reputation fields in ReputationStatus. UpdateReputation(ctx context.Context, id storj.NodeID, request ReputationUpdate) error @@ -401,43 +401,6 @@ func (service *Service) IsOnline(node *NodeDossier) bool { return time.Since(node.Reputation.LastContactSuccess) < service.config.Node.OnlineWindow } -// GetNodesNetworkInOrder returns the /24 subnet for each storage node, in order. If a -// requested node is not in the database, an empty string will be returned corresponding -// to that node's last_net. 
-func (service *Service) GetNodesNetworkInOrder(ctx context.Context, nodeIDs []storj.NodeID) (lastNets []string, err error) { - defer mon.Task()(&ctx)(nil) - - nodes, err := service.DownloadSelectionCache.GetNodes(ctx, nodeIDs) - if err != nil { - return nil, err - } - lastNets = make([]string, len(nodeIDs)) - for i, nodeID := range nodeIDs { - if selectedNode, ok := nodes[nodeID]; ok { - lastNets[i] = selectedNode.LastNet - } - } - return lastNets, nil -} - -// GetNodesOutOfPlacement checks if nodes from nodeIDs list are in allowed country according to specified geo placement -// and returns list of node ids which are not. -func (service *Service) GetNodesOutOfPlacement(ctx context.Context, nodeIDs []storj.NodeID, placement storj.PlacementConstraint) (offNodes []storj.NodeID, err error) { - defer mon.Task()(&ctx)(nil) - - nodes, err := service.DownloadSelectionCache.GetNodes(ctx, nodeIDs) - if err != nil { - return nil, err - } - offNodes = make([]storj.NodeID, 0, len(nodeIDs)) - for _, nodeID := range nodeIDs { - if selectedNode, ok := nodes[nodeID]; ok && !placement.AllowedCountry(selectedNode.CountryCode) { - offNodes = append(offNodes, selectedNode.ID) - } - } - return offNodes, nil -} - // FindStorageNodesForGracefulExit searches the overlay network for nodes that meet the provided requirements for graceful-exit requests. func (service *Service) FindStorageNodesForGracefulExit(ctx context.Context, req FindStorageNodesRequest) (_ []*uploadselection.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) @@ -577,11 +540,11 @@ func (service *Service) KnownReliable(ctx context.Context, nodeIDs storj.NodeIDL return service.db.KnownReliable(ctx, nodeIDs, service.config.Node.OnlineWindow, 0) } -// Reliable returns all nodes that are reliable, online and offline. +// Reliable returns all nodes that are reliable (separated by whether they are currently online or offline). 
func (service *Service) Reliable(ctx context.Context) (online []uploadselection.SelectedNode, offline []uploadselection.SelectedNode, err error) { defer mon.Task()(&ctx)(&err) - // TODO add as of system time + // TODO add as of system time. return service.db.Reliable(ctx, service.config.Node.OnlineWindow, 0) } diff --git a/satellite/overlay/service_test.go b/satellite/overlay/service_test.go index 6f82e831c..9b36a1cd4 100644 --- a/satellite/overlay/service_test.go +++ b/satellite/overlay/service_test.go @@ -15,11 +15,9 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest" - "storj.io/common/identity/testidentity" "storj.io/common/memory" "storj.io/common/pb" "storj.io/common/storj" - "storj.io/common/storj/location" "storj.io/common/testcontext" "storj.io/common/testrand" "storj.io/storj/private/testplanet" @@ -1029,116 +1027,3 @@ func TestUpdateCheckInBelowMinVersionEvent(t *testing.T) { require.True(t, ne2.CreatedAt.After(ne1.CreatedAt)) }) } - -func TestService_GetNodesOutOfPlacement(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Overlay.Node.AsOfSystemTime.Enabled = false - config.Overlay.Node.AsOfSystemTime.DefaultInterval = 0 - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - service := planet.Satellites[0].Overlay.Service - - placement := storj.EU - - nodeIDs := []storj.NodeID{} - for _, node := range planet.StorageNodes { - nodeIDs = append(nodeIDs, node.ID()) - - err := service.TestNodeCountryCode(ctx, node.ID(), location.Poland.String()) - require.NoError(t, err) - } - - require.NoError(t, service.DownloadSelectionCache.Refresh(ctx)) - - offNodes, err := service.GetNodesOutOfPlacement(ctx, nodeIDs, placement) - require.NoError(t, err) - require.Empty(t, offNodes) - - expectedNodeIDs := []storj.NodeID{} - for _, node := range 
planet.StorageNodes { - expectedNodeIDs = append(expectedNodeIDs, node.ID()) - err := service.TestNodeCountryCode(ctx, node.ID(), location.Brazil.String()) - require.NoError(t, err) - - // we need to refresh cache because node country code was changed - require.NoError(t, service.DownloadSelectionCache.Refresh(ctx)) - - offNodes, err := service.GetNodesOutOfPlacement(ctx, nodeIDs, placement) - require.NoError(t, err) - require.ElementsMatch(t, expectedNodeIDs, offNodes) - } - }) -} - -func TestService_UpdateTags(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 0, - Reconfigure: testplanet.Reconfigure{}, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - sampleIdentity := testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()) - - service := planet.Satellites[0].Overlay.Service - snIdentity := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion()) - - // first time - err := service.UpdateNodeTags(ctx, []uploadselection.NodeTag{ - { - NodeID: snIdentity.ID, - SignedAt: time.Now(), - Signer: sampleIdentity.ID, - Name: "foo", - Value: []byte("bar"), - }, - }) - require.NoError(t, err) - - tags, err := service.GetNodeTags(ctx, snIdentity.ID) - require.NoError(t, err) - - require.Len(t, tags, 1) - - // second time - err = service.UpdateNodeTags(ctx, []uploadselection.NodeTag{ - { - NodeID: snIdentity.ID, - SignedAt: time.Now(), - Signer: sampleIdentity.ID, - Name: "foo", - Value: []byte("barx"), - }, - { - NodeID: snIdentity.ID, - SignedAt: time.Now(), - Signer: sampleIdentity.ID, - Name: "kossuth", - Value: []byte("lajos"), - }, - }) - require.NoError(t, err) - - tags, err = service.GetNodeTags(ctx, snIdentity.ID) - require.NoError(t, err) - require.Len(t, tags, 2) - - tag, err := tags.FindBySignerAndName(sampleIdentity.ID, "kossuth") - require.NoError(t, err) - require.Equal(t, "kossuth", tag.Name) - require.Equal(t, []byte("lajos"), tag.Value) - - tag, 
err = tags.FindBySignerAndName(sampleIdentity.ID, "foo") - require.NoError(t, err) - require.Equal(t, "foo", tag.Name) - require.Equal(t, []byte("barx"), tag.Value) - - _, err = tags.FindBySignerAndName(sampleIdentity.ID, "foox") - require.Error(t, err) - - _, err = tags.FindBySignerAndName(testidentity.MustPregeneratedSignedIdentity(1, storj.LatestIDVersion()).ID, "foo") - require.Error(t, err) - - }) -} diff --git a/satellite/repair/checker/observer.go b/satellite/repair/checker/observer.go index 8e007147b..62c659e1c 100644 --- a/satellite/repair/checker/observer.go +++ b/satellite/repair/checker/observer.go @@ -34,7 +34,7 @@ var _ rangedloop.Partial = (*observerFork)(nil) type Observer struct { logger *zap.Logger repairQueue queue.RepairQueue - nodestate *ReliabilityCache + nodesCache *ReliabilityCache overlayService *overlay.Service repairOverrides RepairOverridesMap nodeFailureRate float64 @@ -57,7 +57,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove logger: logger, repairQueue: repairQueue, - nodestate: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, excludedCountries), + nodesCache: NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, excludedCountries), overlayService: overlay, repairOverrides: config.RepairOverrides.GetMap(), nodeFailureRate: config.NodeFailureRate, @@ -75,7 +75,7 @@ func NewObserver(logger *zap.Logger, repairQueue queue.RepairQueue, overlay *ove // nodes yet. We expect that there will be nodes before there are segments, though. func (observer *Observer) getNodesEstimate(ctx context.Context) (int, error) { // this should be safe to call frequently; it is an efficient caching lookup. 
- totalNumNodes, err := observer.nodestate.NumNodes(ctx) + totalNumNodes, err := observer.nodesCache.NumNodes(ctx) if err != nil { // We could proceed here by returning the last good value, or by returning a fallback // constant estimate, like "20000", and we'd probably be fine, but it would be better @@ -218,13 +218,13 @@ func (observer *Observer) getObserverStats(rsString string) *observerRSStats { // RefreshReliabilityCache forces refreshing node online status cache. func (observer *Observer) RefreshReliabilityCache(ctx context.Context) error { - return observer.nodestate.Refresh(ctx) + return observer.nodesCache.Refresh(ctx) } // observerFork implements the ranged loop Partial interface. type observerFork struct { repairQueue *queue.InsertBuffer - nodestate *ReliabilityCache + nodesCache *ReliabilityCache overlayService *overlay.Service rsStats map[string]*partialRSStats repairOverrides RepairOverridesMap @@ -244,7 +244,7 @@ func newObserverFork(observer *Observer) rangedloop.Partial { // we can only share thread-safe objects. 
return &observerFork{ repairQueue: observer.createInsertBuffer(), - nodestate: observer.nodestate, + nodesCache: observer.nodesCache, overlayService: observer.overlayService, rsStats: make(map[string]*partialRSStats), repairOverrides: observer.repairOverrides, @@ -334,24 +334,19 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme return Error.New("could not get estimate of total number of nodes: %w", err) } - missingPieces, err := fork.nodestate.MissingPieces(ctx, segment.CreatedAt, segment.Pieces) + missingPieces, err := fork.nodesCache.MissingPieces(ctx, segment.CreatedAt, segment.Pieces) if err != nil { fork.totalStats.remoteSegmentsFailedToCheck++ stats.iterationAggregates.remoteSegmentsFailedToCheck++ return Error.New("error getting missing pieces: %w", err) } - allNodeIDs := make([]storj.NodeID, len(pieces)) - for i, p := range pieces { - allNodeIDs[i] = p.StorageNode - } - var clumpedPieces metabase.Pieces var lastNets []string if fork.doDeclumping { // if multiple pieces are on the same last_net, keep only the first one. The rest are // to be considered retrievable but unhealthy. 
- lastNets, err = fork.overlayService.GetNodesNetworkInOrder(ctx, allNodeIDs) + lastNets, err = fork.nodesCache.PiecesNodesLastNetsInOrder(ctx, segment.CreatedAt, pieces) if err != nil { fork.totalStats.remoteSegmentsFailedToCheck++ stats.iterationAggregates.remoteSegmentsFailedToCheck++ @@ -360,16 +355,16 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme clumpedPieces = repair.FindClumpedPieces(segment.Pieces, lastNets) } - numPiecesOutOfPlacement := 0 + numOutOfPlacementPieces := 0 if fork.doPlacementCheck && segment.Placement != storj.EveryCountry { - outOfPlacementNodes, err := fork.overlayService.GetNodesOutOfPlacement(ctx, allNodeIDs, segment.Placement) + outOfPlacementPieces, err := fork.nodesCache.OutOfPlacementPieces(ctx, segment.CreatedAt, segment.Pieces, segment.Placement) if err != nil { fork.totalStats.remoteSegmentsFailedToCheck++ stats.iterationAggregates.remoteSegmentsFailedToCheck++ return errs.Combine(Error.New("error determining nodes placement"), err) } - numPiecesOutOfPlacement = len(outOfPlacementNodes) + numOutOfPlacementPieces = len(outOfPlacementPieces) } numHealthy := len(pieces) - len(missingPieces) - len(clumpedPieces) @@ -380,8 +375,8 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme stats.segmentStats.segmentHealthyCount.Observe(int64(numHealthy)) mon.IntVal("checker_segment_clumped_count").Observe(int64(len(clumpedPieces))) //mon:locked stats.segmentStats.segmentClumpedCount.Observe(int64(len(clumpedPieces))) - mon.IntVal("checker_segment_off_placement_count").Observe(int64(numPiecesOutOfPlacement)) //mon:locked - stats.segmentStats.segmentOffPlacementCount.Observe(int64(numPiecesOutOfPlacement)) + mon.IntVal("checker_segment_off_placement_count").Observe(int64(numOutOfPlacementPieces)) //mon:locked + stats.segmentStats.segmentOffPlacementCount.Observe(int64(numOutOfPlacementPieces)) segmentAge := time.Since(segment.CreatedAt) 
mon.IntVal("checker_segment_age").Observe(int64(segmentAge.Seconds())) //mon:locked @@ -397,7 +392,7 @@ func (fork *observerFork) process(ctx context.Context, segment *rangedloop.Segme // except for the case when the repair and success thresholds are the same (a case usually seen during testing). // separate case is when we find pieces which are outside segment placement. in such case we are putting segment // into queue right away. - if (numHealthy <= repairThreshold && numHealthy < successThreshold) || numPiecesOutOfPlacement > 0 { + if (numHealthy <= repairThreshold && numHealthy < successThreshold) || numOutOfPlacementPieces > 0 { mon.FloatVal("checker_injured_segment_health").Observe(segmentHealth) //mon:locked stats.segmentStats.injuredSegmentHealth.Observe(segmentHealth) fork.totalStats.remoteSegmentsNeedingRepair++ diff --git a/satellite/repair/checker/observer_test.go b/satellite/repair/checker/observer_test.go index c164cfd83..6d80b01a1 100644 --- a/satellite/repair/checker/observer_test.go +++ b/satellite/repair/checker/observer_test.go @@ -16,6 +16,7 @@ import ( "go.uber.org/zap" "storj.io/common/memory" + "storj.io/common/pb" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/common/testrand" @@ -590,6 +591,8 @@ func TestObserver_PlacementCheck(t *testing.T) { Satellite: testplanet.ReconfigureRS(1, 2, 4, 4), }, }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.Pause() + repairQueue := planet.Satellites[0].DB.RepairQueue() require.NoError(t, planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "testbucket")) @@ -610,49 +613,63 @@ func TestObserver_PlacementCheck(t *testing.T) { type testCase struct { piecesOutOfPlacement int + // how many from out of placement pieces should be also offline + piecesOutOfPlacementOffline int } - for _, tc := range []testCase{ + for i, tc := range []testCase{ // all pieces/nodes are out of placement {piecesOutOfPlacement: 
4}, - // few pieces/nodes are out of placement + // few pieces/nodes are out of placement {piecesOutOfPlacement: 2}, + // all pieces/nodes are out of placement and one of them is offline + {piecesOutOfPlacement: 4, piecesOutOfPlacementOffline: 1}, + // few pieces/nodes are out of placement and one of them is offline + {piecesOutOfPlacement: 2, piecesOutOfPlacementOffline: 1}, + // single piece/node is out of placement and it is offline + {piecesOutOfPlacement: 1, piecesOutOfPlacementOffline: 1}, } { - for _, node := range planet.StorageNodes { - require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL")) - } + t.Run("#"+strconv.Itoa(i), func(t *testing.T) { + for _, node := range planet.StorageNodes { + require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, node.ID(), "PL")) + } - require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) + require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) - segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - require.Len(t, segments[0].Pieces, 4) + segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) + require.NoError(t, err) + require.Len(t, segments, 1) + require.Len(t, segments[0].Pieces, 4) - for _, piece := range segments[0].Pieces[:tc.piecesOutOfPlacement] { - require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US")) - } + for index, piece := range segments[0].Pieces { + if index < tc.piecesOutOfPlacement { + require.NoError(t, planet.Satellites[0].Overlay.Service.TestNodeCountryCode(ctx, piece.StorageNode, "US")) + } - // confirm that some pieces are out of placement - ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) - require.NoError(t, err) - require.False(t, ok) + 
// make node offline if needed + require.NoError(t, updateNodeStatus(ctx, planet.Satellites[0], planet.FindNode(piece.StorageNode), index < tc.piecesOutOfPlacementOffline)) + } - require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) + // confirm that some pieces are out of placement + ok, err := allPiecesInPlacement(ctx, planet.Satellites[0].Overlay.Service, segments[0].Pieces, segments[0].Placement) + require.NoError(t, err) + require.False(t, ok) - _, err = planet.Satellites[0].RangedLoop.RangedLoop.Service.RunOnce(ctx) - require.NoError(t, err) + require.NoError(t, planet.Satellites[0].Repairer.Overlay.DownloadSelectionCache.Refresh(ctx)) - injuredSegment, err := repairQueue.Select(ctx) - require.NoError(t, err) - err = repairQueue.Delete(ctx, injuredSegment) - require.NoError(t, err) + planet.Satellites[0].RangedLoop.RangedLoop.Service.Loop.TriggerWait() - require.Equal(t, segments[0].StreamID, injuredSegment.StreamID) + injuredSegment, err := repairQueue.Select(ctx) + require.NoError(t, err) + err = repairQueue.Delete(ctx, injuredSegment) + require.NoError(t, err) - count, err := repairQueue.Count(ctx) - require.Zero(t, err) - require.Zero(t, count) + require.Equal(t, segments[0].StreamID, injuredSegment.StreamID) + + count, err := repairQueue.Count(ctx) + require.Zero(t, err) + require.Zero(t, count) + }) } }) } @@ -669,3 +686,22 @@ func allPiecesInPlacement(ctx context.Context, overlay *overlay.Service, pieces } return true, nil } + +func updateNodeStatus(ctx context.Context, satellite *testplanet.Satellite, node *testplanet.StorageNode, offline bool) error { + timestamp := time.Now() + if offline { + timestamp = time.Now().Add(-4 * time.Hour) + } + + return satellite.DB.OverlayCache().UpdateCheckIn(ctx, overlay.NodeCheckInInfo{ + NodeID: node.ID(), + Address: &pb.NodeAddress{Address: node.Addr()}, + IsUp: true, + Version: &pb.NodeVersion{ + Version: "v0.0.0", + CommitHash: "", + Timestamp: time.Time{}, + Release: 
false, + }, + }, timestamp, satellite.Config.Overlay.Node) +} diff --git a/satellite/repair/checker/online.go b/satellite/repair/checker/online.go index 589314b5f..14691bec3 100644 --- a/satellite/repair/checker/online.go +++ b/satellite/repair/checker/online.go @@ -20,17 +20,24 @@ import ( // // architecture: Service type ReliabilityCache struct { - overlay *overlay.Service - staleness time.Duration + overlay *overlay.Service + staleness time.Duration + // define from which countries nodes should be marked as offline excludedCountryCodes map[location.CountryCode]struct{} mu sync.Mutex state atomic.Value // contains immutable *reliabilityState } +type reliableNode struct { + LastNet string + CountryCode location.CountryCode +} + // reliabilityState. type reliabilityState struct { - reliable map[storj.NodeID]struct{} - created time.Time + reliableOnline map[storj.NodeID]reliableNode + reliableAll map[storj.NodeID]reliableNode + created time.Time } // NewReliabilityCache creates a new reliability checking cache. @@ -38,7 +45,7 @@ func NewReliabilityCache(overlay *overlay.Service, staleness time.Duration, excl excludedCountryCodes := make(map[location.CountryCode]struct{}) for _, countryCode := range excludedCountries { if cc := location.ToCountryCode(countryCode); cc != location.None { - excludedCountryCodes[location.ToCountryCode(countryCode)] = struct{}{} + excludedCountryCodes[cc] = struct{}{} } } @@ -67,24 +74,75 @@ func (cache *ReliabilityCache) NumNodes(ctx context.Context) (numNodes int, err if err != nil { return 0, err } - return len(state.reliable), nil + + return len(state.reliableOnline), nil } // MissingPieces returns piece indices that are unreliable with the given staleness period. 
-func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces metabase.Pieces) (_ []metabase.Piece, err error) { +func (cache *ReliabilityCache) MissingPieces(ctx context.Context, created time.Time, pieces metabase.Pieces) (_ metabase.Pieces, err error) { state, err := cache.loadFast(ctx, created) if err != nil { return nil, err } - var unreliable []metabase.Piece + var unreliable metabase.Pieces for _, p := range pieces { - if _, ok := state.reliable[p.StorageNode]; !ok { + node, ok := state.reliableOnline[p.StorageNode] + if !ok { + unreliable = append(unreliable, p) + } else if _, excluded := cache.excludedCountryCodes[node.CountryCode]; excluded { unreliable = append(unreliable, p) } } return unreliable, nil } +// OutOfPlacementPieces checks which pieces are out of segment placement. Piece placement is defined by node location which is storing it. +func (cache *ReliabilityCache) OutOfPlacementPieces(ctx context.Context, created time.Time, pieces metabase.Pieces, placement storj.PlacementConstraint) (_ metabase.Pieces, err error) { + defer mon.Task()(&ctx)(nil) + + if len(pieces) == 0 || placement == storj.EveryCountry { + return metabase.Pieces{}, nil + } + + state, err := cache.loadFast(ctx, created) + if err != nil { + return nil, err + } + var outOfPlacementPieces metabase.Pieces + + for _, p := range pieces { + if node, ok := state.reliableAll[p.StorageNode]; ok && !placement.AllowedCountry(node.CountryCode) { + outOfPlacementPieces = append(outOfPlacementPieces, p) + } + } + + return outOfPlacementPieces, nil +} + +// PiecesNodesLastNetsInOrder returns the /24 subnet for each piece storage node, in order. If a +// requested node is not in the database or it's unreliable, an empty string will be returned corresponding +// to that node's last_net. 
+func (cache *ReliabilityCache) PiecesNodesLastNetsInOrder(ctx context.Context, created time.Time, pieces metabase.Pieces) (lastNets []string, err error) { + defer mon.Task()(&ctx)(nil) + + if len(pieces) == 0 { + return []string{}, nil + } + + state, err := cache.loadFast(ctx, created) + if err != nil { + return nil, err + } + + lastNets = make([]string, len(pieces)) + for i, piece := range pieces { + if node, ok := state.reliableAll[piece.StorageNode]; ok { + lastNets[i] = node.LastNet + } + } + return lastNets, nil +} + func (cache *ReliabilityCache) loadFast(ctx context.Context, validUpTo time.Time) (_ *reliabilityState, err error) { // This code is designed to be very fast in the case where a refresh is not needed: just an // atomic load from rarely written to bit of shared memory. The general strategy is to first @@ -124,18 +182,30 @@ func (cache *ReliabilityCache) Refresh(ctx context.Context) (err error) { func (cache *ReliabilityCache) refreshLocked(ctx context.Context) (_ *reliabilityState, err error) { defer mon.Task()(&ctx)(&err) - online, _, err := cache.overlay.Reliable(ctx) + online, offline, err := cache.overlay.Reliable(ctx) if err != nil { return nil, Error.Wrap(err) } state := &reliabilityState{ - created: time.Now(), - reliable: make(map[storj.NodeID]struct{}, len(online)), + created: time.Now(), + reliableOnline: make(map[storj.NodeID]reliableNode, len(online)), + reliableAll: make(map[storj.NodeID]reliableNode, len(online)+len(offline)), } for _, node := range online { - if _, ok := cache.excludedCountryCodes[node.CountryCode]; !ok { - state.reliable[node.ID] = struct{}{} + state.reliableOnline[node.ID] = reliableNode{ + LastNet: node.LastNet, + CountryCode: node.CountryCode, + } + state.reliableAll[node.ID] = reliableNode{ + LastNet: node.LastNet, + CountryCode: node.CountryCode, + } + } + for _, node := range offline { + state.reliableAll[node.ID] = reliableNode{ + LastNet: node.LastNet, + CountryCode: node.CountryCode, } } diff --git 
a/satellite/repair/checker/online_test.go b/satellite/repair/checker/online_test.go index c3625dc27..978fe5f5b 100644 --- a/satellite/repair/checker/online_test.go +++ b/satellite/repair/checker/online_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2019 Storj Labs, Inc. // See LICENSE for copying information. -package checker +package checker_test import ( "context" @@ -12,12 +12,17 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" + "storj.io/common/storj" + "storj.io/common/storj/location" "storj.io/common/testcontext" "storj.io/common/testrand" + "storj.io/storj/private/testplanet" + "storj.io/storj/satellite" "storj.io/storj/satellite/metabase" "storj.io/storj/satellite/nodeevents" "storj.io/storj/satellite/nodeselection/uploadselection" "storj.io/storj/satellite/overlay" + "storj.io/storj/satellite/repair/checker" ) func TestReliabilityCache_Concurrent(t *testing.T) { @@ -35,7 +40,7 @@ func TestReliabilityCache_Concurrent(t *testing.T) { ctx.Go(func() error { return overlayCache.Run(cacheCtx) }) defer ctx.Check(overlayCache.Close) - cache := NewReliabilityCache(overlayCache, time.Millisecond, []string{}) + cache := checker.NewReliabilityCache(overlayCache, time.Millisecond, []string{}) var group errgroup.Group for i := 0; i < 10; i++ { group.Go(func() error { @@ -63,3 +68,57 @@ func (fakeOverlayDB) Reliable(context.Context, time.Duration, time.Duration) ([] {ID: testrand.NodeID()}, }, nil, nil } + +func TestReliabilityCache_OutOfPlacementPieces(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, + Reconfigure: testplanet.Reconfigure{ + Satellite: func(log *zap.Logger, index int, config *satellite.Config) { + config.Overlay.Node.AsOfSystemTime.Enabled = false + config.Overlay.Node.AsOfSystemTime.DefaultInterval = 0 + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + overlay := planet.Satellites[0].Overlay.Service + config := planet.Satellites[0].Config.Checker 
+ + cache := checker.NewReliabilityCache(overlay, config.ReliabilityCacheStaleness, []string{}) + + nodesPlacement := func(location location.CountryCode, nodes ...*testplanet.StorageNode) { + for _, node := range nodes { + err := overlay.TestNodeCountryCode(ctx, node.ID(), location.String()) + require.NoError(t, err) + } + require.NoError(t, cache.Refresh(ctx)) + } + + allPieces := metabase.Pieces{ + metabase.Piece{Number: 0, StorageNode: planet.StorageNodes[0].ID()}, + metabase.Piece{Number: 1, StorageNode: planet.StorageNodes[1].ID()}, + metabase.Piece{Number: 2, StorageNode: planet.StorageNodes[2].ID()}, + metabase.Piece{Number: 3, StorageNode: planet.StorageNodes[3].ID()}, + } + + pieces, err := cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), metabase.Pieces{}, storj.EU) + require.NoError(t, err) + require.Empty(t, pieces) + + nodesPlacement(location.Poland, planet.StorageNodes...) + pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.EU) + require.NoError(t, err) + require.Empty(t, pieces) + + pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.US) + require.NoError(t, err) + require.ElementsMatch(t, allPieces, pieces) + + nodesPlacement(location.UnitedStates, planet.StorageNodes[:2]...) 
+ pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.EU) + require.NoError(t, err) + require.ElementsMatch(t, allPieces[:2], pieces) + + pieces, err = cache.OutOfPlacementPieces(ctx, time.Now().Add(-time.Hour), allPieces, storj.US) + require.NoError(t, err) + require.ElementsMatch(t, allPieces[2:], pieces) + }) +} diff --git a/satellite/repair/repair_test.go b/satellite/repair/repair_test.go index 4dba54e67..26f236e97 100644 --- a/satellite/repair/repair_test.go +++ b/satellite/repair/repair_test.go @@ -3284,10 +3284,10 @@ func TestRepairClumpedPieces(t *testing.T) { Capacity: &local.Capacity, Version: &local.Version, } - err = satellite.DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{}) - require.NoError(t, err) - err = satellite.RangedLoop.Overlay.Service.DownloadSelectionCache.Refresh(ctx) - require.NoError(t, err) + + require.NoError(t, satellite.DB.OverlayCache().UpdateCheckIn(ctx, checkInInfo, time.Now().UTC(), overlay.NodeSelectionConfig{})) + + require.NoError(t, satellite.RangedLoop.Repair.Observer.RefreshReliabilityCache(ctx)) // running repair checker again should put the segment into the repair queue _, err = satellite.RangedLoop.RangedLoop.Service.RunOnce(ctx)