2522ff09b6
Up to now, we have been implementing the DistinctIP preference with code in two places: 1. On check-in, the last_net is determined by taking the /24 or /64 (in ResolveIPAndNetwork()) and we store it with the node record. 2. On node selection, a preference parameter defines whether to return results that are distinct on last_net. It can be observed that we have never yet had the need to switch from DistinctIP to !DistinctIP, or from !DistinctIP to DistinctIP, on the same satellite, and we will probably never need to do so in an automated way. It can also be observed that this arrangement makes tests more complicated, because we often have to arrange for test nodes to have IP addresses in different /24 networks (a particular pain on macOS). Those two considerations, plus some pending work on the repair framework that will make repair take last_net into consideration, motivate this change. With this change, in the #2 place, we will _always_ return results that are distinct on last_net. We implement the DistinctIP preference, then, by making the #1 place (ResolveIPAndNetwork()) more flexible. When DistinctIP is enabled, last_net will be calculated as it was before. But when DistinctIP is _off_, last_net can be the same as address (IP and port). That will effectively implement !DistinctIP because every record will have a distinct last_net already. As a side effect, this flexibility will allow us to change the rules about last_net construction arbitrarily. We can do tests where last_net is set to the source IP, or to a /30 prefix, or a /16 prefix, etc., and be able to exercise the production logic without requiring a virtual network bridge. This change should be safe to make without any migration code, because all known production satellite deployments use DistinctIP, and the associated last_net values will not change for them. They will only change for satellites with !DistinctIP, which are mostly test deployments that can be recreated trivially. For those satellites which are both permanent and !DistinctIP, node selection will suddenly start acting as though DistinctIP is enabled, until the operator runs a single SQL update "UPDATE nodes SET last_net = last_ip_port". That can be done either before or after deploying software with this change. I also assert that this will not hurt performance for production deployments. It's true that adding the distinct requirement to node selection makes things a little slower, but the distinct requirement is already present for all production deployments, and they will see no change. Refs: https://github.com/storj/storj/issues/5391 Change-Id: I0e7e92498c3da768df5b4d5fb213dcd2d4862924
148 lines
5.0 KiB
Go
148 lines
5.0 KiB
Go
// Copyright (C) 2019 Storj Labs, Incache.
|
|
// See LICENSE for copying information.
|
|
|
|
package overlay
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"storj.io/common/pb"
|
|
"storj.io/common/sync2"
|
|
"storj.io/storj/satellite/nodeselection/uploadselection"
|
|
)
|
|
|
|
// UploadSelectionDB implements the database for upload selection cache.
|
|
//
|
|
// architecture: Database
|
|
type UploadSelectionDB interface {
|
|
// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes
|
|
SelectAllStorageNodesUpload(ctx context.Context, selectionCfg NodeSelectionConfig) (reputable, new []*SelectedNode, err error)
|
|
}
|
|
|
|
// UploadSelectionCacheConfig is a configuration for upload selection cache.
|
|
type UploadSelectionCacheConfig struct {
|
|
Disabled bool `help:"disable node cache" default:"false"`
|
|
Staleness time.Duration `help:"how stale the node selection cache can be" releaseDefault:"3m" devDefault:"5m" testDefault:"3m"`
|
|
}
|
|
|
|
// UploadSelectionCache keeps a list of all the storage nodes that are qualified to store data
|
|
// We organize the nodes by if they are reputable or a new node on the network.
|
|
// The cache will sync with the nodes table in the database and get refreshed once the staleness time has past.
|
|
type UploadSelectionCache struct {
|
|
log *zap.Logger
|
|
db UploadSelectionDB
|
|
selectionConfig NodeSelectionConfig
|
|
|
|
cache sync2.ReadCache
|
|
}
|
|
|
|
// NewUploadSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to store data.
|
|
func NewUploadSelectionCache(log *zap.Logger, db UploadSelectionDB, staleness time.Duration, config NodeSelectionConfig) (*UploadSelectionCache, error) {
|
|
cache := &UploadSelectionCache{
|
|
log: log,
|
|
db: db,
|
|
selectionConfig: config,
|
|
}
|
|
return cache, cache.cache.Init(staleness/2, staleness, cache.read)
|
|
}
|
|
|
|
// Run runs the background task for cache.
|
|
func (cache *UploadSelectionCache) Run(ctx context.Context) (err error) {
|
|
return cache.cache.Run(ctx)
|
|
}
|
|
|
|
// Refresh populates the cache with all of the reputableNodes and newNode nodes
|
|
// This method is useful for tests.
|
|
func (cache *UploadSelectionCache) Refresh(ctx context.Context) (err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
_, err = cache.cache.RefreshAndGet(ctx, time.Now())
|
|
return err
|
|
}
|
|
|
|
// refresh calls out to the database and refreshes the cache with the most up-to-date
|
|
// data from the nodes table, then sets time that the last refresh occurred so we know when
|
|
// to refresh again in the future.
|
|
func (cache *UploadSelectionCache) read(ctx context.Context) (_ interface{}, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
reputableNodes, newNodes, err := cache.db.SelectAllStorageNodesUpload(ctx, cache.selectionConfig)
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
|
|
state := uploadselection.NewState(convSelectedNodesToNodes(reputableNodes), convSelectedNodesToNodes(newNodes))
|
|
|
|
mon.IntVal("refresh_cache_size_reputable").Observe(int64(len(reputableNodes)))
|
|
mon.IntVal("refresh_cache_size_new").Observe(int64(len(newNodes)))
|
|
|
|
return state, nil
|
|
}
|
|
|
|
// GetNodes selects nodes from the cache that will be used to upload a file.
|
|
// Every node selected will be from a distinct network.
|
|
// If the cache hasn't been refreshed recently it will do so first.
|
|
func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
|
|
defer mon.Task()(&ctx)(&err)
|
|
|
|
stateAny, err := cache.cache.Get(ctx, time.Now())
|
|
if err != nil {
|
|
return nil, Error.Wrap(err)
|
|
}
|
|
state := stateAny.(*uploadselection.State)
|
|
|
|
selected, err := state.Select(ctx, uploadselection.Request{
|
|
Count: req.RequestedCount,
|
|
NewFraction: cache.selectionConfig.NewNodeFraction,
|
|
ExcludedIDs: req.ExcludedIDs,
|
|
Placement: req.Placement,
|
|
ExcludedCountryCodes: cache.selectionConfig.UploadExcludedCountryCodes,
|
|
})
|
|
if uploadselection.ErrNotEnoughNodes.Has(err) {
|
|
err = ErrNotEnoughNodes.Wrap(err)
|
|
}
|
|
|
|
return convNodesToSelectedNodes(selected), err
|
|
}
|
|
|
|
// Size returns how many reputable nodes and new nodes are in the cache.
|
|
func (cache *UploadSelectionCache) Size(ctx context.Context) (reputableNodeCount int, newNodeCount int, _ error) {
|
|
stateAny, err := cache.cache.Get(ctx, time.Now())
|
|
if err != nil {
|
|
return 0, 0, Error.Wrap(err)
|
|
}
|
|
state := stateAny.(*uploadselection.State)
|
|
stats := state.Stats()
|
|
return stats.Reputable, stats.New, nil
|
|
}
|
|
|
|
func convNodesToSelectedNodes(nodes []*uploadselection.Node) (xs []*SelectedNode) {
|
|
for _, n := range nodes {
|
|
xs = append(xs, &SelectedNode{
|
|
ID: n.ID,
|
|
Address: pb.NodeFromNodeURL(n.NodeURL).Address,
|
|
LastNet: n.LastNet,
|
|
LastIPPort: n.LastIPPort,
|
|
CountryCode: n.CountryCode,
|
|
})
|
|
}
|
|
return xs
|
|
}
|
|
|
|
func convSelectedNodesToNodes(nodes []*SelectedNode) (xs []*uploadselection.Node) {
|
|
for _, n := range nodes {
|
|
xs = append(xs, &uploadselection.Node{
|
|
NodeURL: (&pb.Node{
|
|
Id: n.ID,
|
|
Address: n.Address,
|
|
}).NodeURL(),
|
|
LastNet: n.LastNet,
|
|
LastIPPort: n.LastIPPort,
|
|
CountryCode: n.CountryCode,
|
|
})
|
|
}
|
|
return xs
|
|
}
|