satellite/overlay: rename NodeSelectionCache to UploadSelectionCache

It wasn't obvious that NodeSelectionCache was only used for uploads.

Change-Id: Ifeeaa6fdb50a4b7916245b48d8634d70ac54459c

parent a97b5c8552
commit 19e3dc4ec0
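For downstream code the change is mechanical: the exported SelectionCache field on overlay.Service becomes UploadSelectionCache, and the CacheConfig type becomes UploadSelectionCacheConfig, while the method signatures stay the same. A minimal sketch of an updated call site, assuming only the types and fields visible in this diff (the request values are illustrative):

package example

import (
	"context"

	"storj.io/storj/satellite/overlay"
)

// selectForUpload shows the renamed field in use; GetNodes itself is unchanged.
func selectForUpload(ctx context.Context, service *overlay.Service) ([]*overlay.SelectedNode, error) {
	// before this commit: service.SelectionCache.GetNodes(ctx, req)
	return service.UploadSelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
		RequestedCount: 5, // illustrative value
	})
}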
@@ -431,7 +431,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
 				SuspensionGracePeriod: time.Hour,
 				SuspensionDQEnabled:   true,
 			},
-			NodeSelectionCache: overlay.CacheConfig{
+			NodeSelectionCache: overlay.UploadSelectionCacheConfig{
 				Staleness: 3 * time.Minute,
 			},
 			UpdateStatsBatchSize: 100,
@@ -442,7 +442,7 @@ func BenchmarkNodeSelection(b *testing.B) {
 
 	service, err := overlay.NewService(zap.NewNop(), overlaydb, overlay.Config{
 		Node: nodeSelectionConfig,
-		NodeSelectionCache: overlay.CacheConfig{
+		NodeSelectionCache: overlay.UploadSelectionCacheConfig{
 			Staleness: time.Hour,
 		},
 	})
@@ -496,9 +496,9 @@ func BenchmarkNodeSelection(b *testing.B) {
 		}
 	})
 
-	b.Run("NodeSelectionCacheGetNodes", func(b *testing.B) {
+	b.Run("UploadSelectionCacheGetNodes", func(b *testing.B) {
 		for i := 0; i < b.N; i++ {
-			selected, err := service.SelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
+			selected, err := service.UploadSelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
 				RequestedCount: SelectCount,
 				ExcludedIDs:    nil,
 				MinimumVersion: "v1.0.0",
@@ -508,9 +508,9 @@ func BenchmarkNodeSelection(b *testing.B) {
 		}
 	})
 
-	b.Run("NodeSelectionCacheGetNodesExclusion", func(b *testing.B) {
+	b.Run("UploadSelectionCacheGetNodesExclusion", func(b *testing.B) {
 		for i := 0; i < b.N; i++ {
-			selected, err := service.SelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
+			selected, err := service.UploadSelectionCache.GetNodes(ctx, overlay.FindStorageNodesRequest{
 				RequestedCount: SelectCount,
 				ExcludedIDs:    excludedIDs,
 				MinimumVersion: "v1.0.0",
@@ -21,7 +21,7 @@ var (
 // Config is a configuration for overlay service.
 type Config struct {
 	Node                 NodeSelectionConfig
-	NodeSelectionCache   CacheConfig
+	NodeSelectionCache   UploadSelectionCacheConfig
 	UpdateStatsBatchSize int `help:"number of update requests to process per transaction" default:"100"`
 	AuditHistory         AuditHistoryConfig
 }
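Note that only the type is renamed here: the Config field is still called NodeSelectionCache, which likely keeps any configuration keys derived from the field name stable. A minimal sketch of wiring the renamed type, using the same values the testplanet hunk above uses:

package example

import (
	"time"

	"storj.io/storj/satellite/overlay"
)

// newOverlayConfig sketches the renamed config type in use. The field name
// NodeSelectionCache is unchanged; only its type became UploadSelectionCacheConfig.
func newOverlayConfig() overlay.Config {
	return overlay.Config{
		NodeSelectionCache: overlay.UploadSelectionCacheConfig{
			Staleness: 3 * time.Minute,
		},
		UpdateStatsBatchSize: 100,
	}
}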
@@ -78,7 +78,7 @@ func TestMinimumDiskSpace(t *testing.T) {
 		n1, err := saOverlay.Service.FindStorageNodesForUpload(ctx, req)
 		require.Error(t, err)
 		require.True(t, overlay.ErrNotEnoughNodes.Has(err))
-		n2, err := saOverlay.Service.SelectionCache.GetNodes(ctx, req)
+		n2, err := saOverlay.Service.UploadSelectionCache.GetNodes(ctx, req)
 		require.Error(t, err)
 		require.True(t, overlay.ErrNotEnoughNodes.Has(err))
 		require.Equal(t, len(n2), len(n1))
@@ -104,7 +104,7 @@ func TestMinimumDiskSpace(t *testing.T) {
 		n2, err = saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
 		require.NoError(t, err)
 		require.Equal(t, len(n1), len(n2))
-		n3, err = saOverlay.Service.SelectionCache.GetNodes(ctx, req)
+		n3, err = saOverlay.Service.UploadSelectionCache.GetNodes(ctx, req)
 		require.NoError(t, err)
 		require.Equal(t, len(n1), len(n3))
 	})
@@ -218,7 +218,7 @@ func TestEnsureMinimumRequested(t *testing.T) {
 		require.Len(t, nodes, requestedCount)
 		require.Equal(t, 0, countReputable(nodes))
 
-		n2, err := service.SelectionCache.GetNodes(ctx, req)
+		n2, err := service.UploadSelectionCache.GetNodes(ctx, req)
 		require.NoError(t, err)
 		require.Equal(t, requestedCount, len(n2))
 	})
@@ -521,7 +521,7 @@ func TestFindStorageNodesDistinctNetworks(t *testing.T) {
 		require.NotEqual(t, nodes[0].LastIPPort, nodes[1].LastIPPort)
 		require.NotEqual(t, nodes[0].LastIPPort, excludedNodeAddr)
 		require.NotEqual(t, nodes[1].LastIPPort, excludedNodeAddr)
-		n2, err := satellite.Overlay.Service.SelectionCache.GetNodes(ctx, req)
+		n2, err := satellite.Overlay.Service.UploadSelectionCache.GetNodes(ctx, req)
 		require.NoError(t, err)
 		require.Len(t, n2, 2)
 		require.NotEqual(t, n2[0].LastIPPort, n2[1].LastIPPort)
@@ -543,7 +543,7 @@ func TestFindStorageNodesDistinctNetworks(t *testing.T) {
 		n1, err := satellite.Overlay.Service.FindStorageNodesWithPreferences(ctx, req, &satellite.Config.Overlay.Node)
 		require.Error(t, err)
 		require.Equal(t, len(n), len(n1))
-		n2, err = satellite.Overlay.Service.SelectionCache.GetNodes(ctx, req)
+		n2, err = satellite.Overlay.Service.UploadSelectionCache.GetNodes(ctx, req)
 		require.Error(t, err)
 		require.Equal(t, len(n1), len(n2))
 	})
@@ -593,7 +593,7 @@ func TestSelectNewStorageNodesExcludedIPs(t *testing.T) {
 		require.NotEqual(t, nodes[0].LastIPPort, nodes[1].LastIPPort)
 		require.NotEqual(t, nodes[0].LastIPPort, excludedNodeAddr)
 		require.NotEqual(t, nodes[1].LastIPPort, excludedNodeAddr)
-		n2, err := satellite.Overlay.Service.SelectionCache.GetNodes(ctx, req)
+		n2, err := satellite.Overlay.Service.UploadSelectionCache.GetNodes(ctx, req)
 		require.NoError(t, err)
 		require.Len(t, n2, 2)
 		require.NotEqual(t, n2[0].LastIPPort, n2[1].LastIPPort)
@@ -749,7 +749,7 @@ func TestCacheSelectionVsDBSelection(t *testing.T) {
 		req := overlay.FindStorageNodesRequest{RequestedCount: 5}
 		n1, err := saOverlay.Service.FindStorageNodesForUpload(ctx, req)
 		require.NoError(t, err)
-		n2, err := saOverlay.Service.SelectionCache.GetNodes(ctx, req)
+		n2, err := saOverlay.Service.UploadSelectionCache.GetNodes(ctx, req)
 		require.NoError(t, err)
 		require.Equal(t, len(n2), len(n1))
 		n3, err := saOverlay.Service.FindStorageNodesWithPreferences(ctx, req, &nodeConfig)
@@ -262,10 +262,10 @@ func (node *SelectedNode) Clone() *SelectedNode {
 //
 // architecture: Service
 type Service struct {
-	log            *zap.Logger
-	db             DB
-	config         Config
-	SelectionCache *NodeSelectionCache
+	log                  *zap.Logger
+	db                   DB
+	config               Config
+	UploadSelectionCache *UploadSelectionCache
 }
 
 // NewService returns a new Service.
@@ -278,7 +278,7 @@ func NewService(log *zap.Logger, db DB, config Config) (*Service, error) {
 		log:    log,
 		db:     db,
 		config: config,
-		SelectionCache: NewNodeSelectionCache(log, db,
+		UploadSelectionCache: NewUploadSelectionCache(log, db,
 			config.NodeSelectionCache.Staleness, config.Node,
 		),
 	}, nil
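As the constructor hunk shows, NewService now builds an UploadSelectionCache from Config.NodeSelectionCache.Staleness and Config.Node. A sketch of constructing the service after the rename, mirroring the benchmark above (the config values are illustrative):

package example

import (
	"time"

	"go.uber.org/zap"

	"storj.io/storj/satellite/overlay"
)

// newOverlayService sketches service construction; the returned Service
// exposes the cache via its exported UploadSelectionCache field.
func newOverlayService(db overlay.DB) (*overlay.Service, error) {
	return overlay.NewService(zap.NewNop(), db, overlay.Config{
		NodeSelectionCache: overlay.UploadSelectionCacheConfig{Staleness: time.Hour},
	})
}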
@@ -313,7 +313,7 @@ func (service *Service) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs
 // GetNodeIPs returns a map of node ip:port for the supplied nodeIDs.
 func (service *Service) GetNodeIPs(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]string, err error) {
 	defer mon.Task()(&ctx)(&err)
-	return service.SelectionCache.GetNodeIPs(ctx, nodeIDs)
+	return service.UploadSelectionCache.GetNodeIPs(ctx, nodeIDs)
 }
 
 // IsOnline checks if a node is 'online' based on the collected statistics.
@@ -346,7 +346,7 @@ func (service *Service) FindStorageNodesForUpload(ctx context.Context, req FindS
 		return service.FindStorageNodesWithPreferences(ctx, req, &service.config.Node)
 	}
 
-	selectedNodes, err := service.SelectionCache.GetNodes(ctx, req)
+	selectedNodes, err := service.UploadSelectionCache.GetNodes(ctx, req)
 	if err != nil {
 		service.log.Warn("error selecting from node selection cache", zap.String("err", err.Error()))
 	}
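The FindStorageNodesForUpload hunk also documents the cache's error handling on the upload path: a cache failure is logged as a warning rather than returned immediately. A sketch of that warn-and-continue pattern, using only calls visible in the diff (how the possibly-nil result is handled afterwards is not shown in this commit):

package example

import (
	"context"

	"go.uber.org/zap"

	"storj.io/storj/satellite/overlay"
)

// cacheFirst tries the upload selection cache and only warns on error, so a
// failing cache does not by itself abort node selection.
func cacheFirst(ctx context.Context, log *zap.Logger, service *overlay.Service, req overlay.FindStorageNodesRequest) []*overlay.SelectedNode {
	selectedNodes, err := service.UploadSelectionCache.GetNodes(ctx, req)
	if err != nil {
		log.Warn("error selecting from node selection cache", zap.String("err", err.Error()))
	}
	return selectedNodes
}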
@@ -544,7 +544,7 @@ func (service *Service) TestVetNode(ctx context.Context, nodeID storj.NodeID) (v
 		service.log.Warn("error vetting node", zap.Stringer("node ID", nodeID))
 		return nil, err
 	}
-	err = service.SelectionCache.Refresh(ctx)
+	err = service.UploadSelectionCache.Refresh(ctx)
 	service.log.Warn("nodecache refresh err", zap.Error(err))
 	return vettedTime, err
 }
@@ -556,7 +556,7 @@ func (service *Service) TestUnvetNode(ctx context.Context, nodeID storj.NodeID)
 		service.log.Warn("error unvetting node", zap.Stringer("node ID", nodeID), zap.Error(err))
 		return err
 	}
-	err = service.SelectionCache.Refresh(ctx)
+	err = service.UploadSelectionCache.Refresh(ctx)
 	service.log.Warn("nodecache refresh err", zap.Error(err))
 	return err
 }
@@ -274,7 +274,7 @@ func TestRandomizedSelectionCache(t *testing.T) {
 	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
 		satellite := planet.Satellites[0]
 		overlaydb := satellite.Overlay.DB
-		nodeSelectionCache := satellite.Overlay.Service.SelectionCache
+		uploadSelectionCache := satellite.Overlay.Service.UploadSelectionCache
 		allIDs := make(storj.NodeIDList, totalNodes)
 		nodeCounts := make(map[storj.NodeID]int)
 		expectedNewCount := int(float64(totalNodes) * satellite.Config.Overlay.Node.NewNodeFraction)
@@ -324,9 +324,9 @@ func TestRandomizedSelectionCache(t *testing.T) {
 			nodeCounts[newID] = 0
 		}
 
-		err := nodeSelectionCache.Refresh(ctx)
+		err := uploadSelectionCache.Refresh(ctx)
 		require.NoError(t, err)
-		reputable, new := nodeSelectionCache.Size()
+		reputable, new := uploadSelectionCache.Size()
 		require.Equal(t, totalNodes-expectedNewCount, reputable)
 		require.Equal(t, expectedNewCount, new)
 
@@ -338,7 +338,7 @@ func TestRandomizedSelectionCache(t *testing.T) {
 			RequestedCount: numNodesToSelect,
 		}
 
-		nodes, err = nodeSelectionCache.GetNodes(ctx, req)
+		nodes, err = uploadSelectionCache.GetNodes(ctx, req)
 		require.NoError(t, err)
 		require.Len(t, nodes, numNodesToSelect)
 
@@ -15,26 +15,26 @@ import (
 	"storj.io/storj/satellite/nodeselection"
 )
 
-// CacheDB implements the database for overlay node selection cache.
+// UploadSelectionDB implements the database for upload selection cache.
 //
 // architecture: Database
-type CacheDB interface {
+type UploadSelectionDB interface {
 	// SelectAllStorageNodesUpload returns all nodes that qualify to store data, organized as reputable nodes and new nodes
 	SelectAllStorageNodesUpload(ctx context.Context, selectionCfg NodeSelectionConfig) (reputable, new []*SelectedNode, err error)
 }
 
-// CacheConfig is a configuration for overlay node selection cache.
-type CacheConfig struct {
+// UploadSelectionCacheConfig is a configuration for upload selection cache.
+type UploadSelectionCacheConfig struct {
 	Disabled  bool          `help:"disable node cache" default:"false"`
 	Staleness time.Duration `help:"how stale the node selection cache can be" releaseDefault:"3m" devDefault:"5m"`
 }
 
-// NodeSelectionCache keeps a list of all the storage nodes that are qualified to store data
+// UploadSelectionCache keeps a list of all the storage nodes that are qualified to store data
 // We organize the nodes by if they are reputable or a new node on the network.
 // The cache will sync with the nodes table in the database and get refreshed once the staleness time has past.
-type NodeSelectionCache struct {
+type UploadSelectionCache struct {
 	log             *zap.Logger
-	db              CacheDB
+	db              UploadSelectionDB
 	selectionConfig NodeSelectionConfig
 	staleness       time.Duration
 
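Since UploadSelectionDB is a single-method interface, a canned implementation is enough to drive the cache in tests, which is what the mockdb in the test hunks below does. A minimal sketch under that assumption (staticDB is a hypothetical name, not from this commit):

package example

import (
	"context"

	"storj.io/storj/satellite/overlay"
)

// staticDB is a minimal UploadSelectionDB that serves fixed node lists,
// in the spirit of the tests' mockdb.
type staticDB struct {
	reputable, new []*overlay.SelectedNode
}

// SelectAllStorageNodesUpload returns the canned lists, satisfying the
// interface shown in the hunk above.
func (db *staticDB) SelectAllStorageNodesUpload(ctx context.Context, cfg overlay.NodeSelectionConfig) (reputable, new []*overlay.SelectedNode, err error) {
	return db.reputable, db.new, nil
}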
@@ -43,9 +43,9 @@ type NodeSelectionCache struct {
 	state *nodeselection.State
 }
 
-// NewNodeSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to store data.
-func NewNodeSelectionCache(log *zap.Logger, db CacheDB, staleness time.Duration, config NodeSelectionConfig) *NodeSelectionCache {
-	return &NodeSelectionCache{
+// NewUploadSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to store data.
+func NewUploadSelectionCache(log *zap.Logger, db UploadSelectionDB, staleness time.Duration, config NodeSelectionConfig) *UploadSelectionCache {
+	return &UploadSelectionCache{
 		log:       log,
 		db:        db,
 		staleness: staleness,
@@ -55,7 +55,7 @@ func NewNodeSelectionCache(log *zap.Logger, db CacheDB, staleness time.Duration,
 
 // Refresh populates the cache with all of the reputableNodes and newNode nodes
 // This method is useful for tests.
-func (cache *NodeSelectionCache) Refresh(ctx context.Context) (err error) {
+func (cache *UploadSelectionCache) Refresh(ctx context.Context) (err error) {
 	defer mon.Task()(&ctx)(&err)
 	_, err = cache.refresh(ctx)
 	return err
@@ -64,7 +64,7 @@ func (cache *NodeSelectionCache) Refresh(ctx context.Context) (err error) {
 // refresh calls out to the database and refreshes the cache with the most up-to-date
 // data from the nodes table, then sets time that the last refresh occurred so we know when
 // to refresh again in the future.
-func (cache *NodeSelectionCache) refresh(ctx context.Context) (state *nodeselection.State, err error) {
+func (cache *UploadSelectionCache) refresh(ctx context.Context) (state *nodeselection.State, err error) {
 	defer mon.Task()(&ctx)(&err)
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
@@ -89,7 +89,7 @@ func (cache *NodeSelectionCache) refresh(ctx context.Context) (state *nodeselect
 // GetNodes selects nodes from the cache that will be used to upload a file.
 // Every node selected will be from a distinct network.
 // If the cache hasn't been refreshed recently it will do so first.
-func (cache *NodeSelectionCache) GetNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
+func (cache *UploadSelectionCache) GetNodes(ctx context.Context, req FindStorageNodesRequest) (_ []*SelectedNode, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	cache.mu.RLock()
@@ -119,7 +119,7 @@ func (cache *NodeSelectionCache) GetNodes(ctx context.Context, req FindStorageNo
 }
 
 // GetNodeIPs gets the last node ip:port from the cache, refreshing when needed.
-func (cache *NodeSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]string, err error) {
+func (cache *UploadSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]string, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	cache.mu.RLock()
@@ -139,7 +139,7 @@ func (cache *NodeSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.N
 }
 
 // Size returns how many reputable nodes and new nodes are in the cache.
-func (cache *NodeSelectionCache) Size() (reputableNodeCount int, newNodeCount int) {
+func (cache *UploadSelectionCache) Size() (reputableNodeCount int, newNodeCount int) {
 	cache.mu.RLock()
 	state := cache.state
 	cache.mu.RUnlock()
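Taken together, these hunks show the cache's concurrency shape: readers (GetNodes, GetNodeIPs, Size) take the read lock, and refresh takes the full lock while re-syncing from the nodes table. A schematic sketch of the staleness handshake implied by those doc comments, not the literal implementation; the lastRefresh field name is an assumption:

// freshState is a hypothetical helper illustrating the pattern: return the
// cached state if it is present and recent, otherwise fall into refresh,
// which re-reads from the DB under cache.mu.Lock().
func (cache *UploadSelectionCache) freshState(ctx context.Context) (*nodeselection.State, error) {
	cache.mu.RLock()
	state, lastRefresh := cache.state, cache.lastRefresh // lastRefresh: assumed field
	cache.mu.RUnlock()

	if state == nil || time.Since(lastRefresh) > cache.staleness {
		return cache.refresh(ctx)
	}
	return state, nil
}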
@@ -48,7 +48,7 @@ const (
 
 func TestRefresh(t *testing.T) {
 	satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
-		cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+		cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 			db.OverlayCache(),
 			lowStaleness,
 			nodeSelectionConfig,
@@ -147,7 +147,7 @@ func TestRefreshConcurrent(t *testing.T) {
 	// concurrent cache.Refresh with high staleness, where high staleness means the
 	// cache should only be refreshed the first time we call cache.Refresh
 	mockDB := mockdb{}
-	cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+	cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 		&mockDB,
 		highStaleness,
 		nodeSelectionConfig,
@@ -168,7 +168,7 @@ func TestRefreshConcurrent(t *testing.T) {
 	// concurrent cache.Refresh with low staleness, where low staleness
 	// means that the cache will refresh *every time* cache.Refresh is called
 	mockDB = mockdb{}
-	cache = overlay.NewNodeSelectionCache(zap.NewNop(),
+	cache = overlay.NewUploadSelectionCache(zap.NewNop(),
 		&mockDB,
 		lowStaleness,
 		nodeSelectionConfig,
@@ -194,7 +194,7 @@ func TestGetNodes(t *testing.T) {
 		DistinctIP:       true,
 		MinimumDiskSpace: 100 * memory.MiB,
 	}
-	cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+	cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 		db.OverlayCache(),
 		lowStaleness,
 		nodeSelectionConfig,
@@ -249,7 +249,7 @@ func TestGetNodesConcurrent(t *testing.T) {
 		reputable: reputableNodes,
 		new:       newNodes,
 	}
-	cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+	cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 		&mockDB,
 		highStaleness,
 		nodeSelectionConfig,
@@ -289,7 +289,7 @@ func TestGetNodesConcurrent(t *testing.T) {
 		reputable: reputableNodes,
 		new:       newNodes,
 	}
-	cache = overlay.NewNodeSelectionCache(zap.NewNop(),
+	cache = overlay.NewUploadSelectionCache(zap.NewNop(),
 		&mockDB,
 		lowStaleness,
 		nodeSelectionConfig,
@@ -376,7 +376,7 @@ func TestGetNodesDistinct(t *testing.T) {
 		config := nodeSelectionConfig
 		config.NewNodeFraction = 0.5
 		config.DistinctIP = true
-		cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+		cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 			&mockDB,
 			highStaleness,
 			config,
@@ -404,7 +404,7 @@ func TestGetNodesDistinct(t *testing.T) {
 		config := nodeSelectionConfig
 		config.NewNodeFraction = 0.5
 		config.DistinctIP = false
-		cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+		cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 			&mockDB,
 			highStaleness,
 			config,
@@ -422,7 +422,7 @@ func TestGetNodesError(t *testing.T) {
 	defer ctx.Cleanup()
 
 	mockDB := mockdb{}
-	cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+	cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 		&mockDB,
 		highStaleness,
 		nodeSelectionConfig,
@@ -450,7 +450,7 @@ func TestNewNodeFraction(t *testing.T) {
 		DistinctIP:       true,
 		MinimumDiskSpace: 10 * memory.MiB,
 	}
-	cache := overlay.NewNodeSelectionCache(zap.NewNop(),
+	cache := overlay.NewUploadSelectionCache(zap.NewNop(),
 		db.OverlayCache(),
 		lowStaleness,
 		nodeSelectionConfig,