storj/satellite/overlay/downloadselection.go
Egon Elbre 54e01d37f9 satellite/overlay: add DownloadSelectionCache
Change-Id: Ic0779280172325f8d03f55a2e9673722f72bdd44
2021-01-29 16:47:06 +02:00

149 lines
4.4 KiB
Go

// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"storj.io/common/storj"
)
// DownloadSelectionDB is the database interface that DownloadSelectionCache
// uses to load the nodes it caches.
//
// architecture: Database
type DownloadSelectionDB interface {
// SelectAllStorageNodesDownload returns nodes that are ready for downloading.
// The cache passes its configured OnlineWindow and AsOfSystemTime here.
SelectAllStorageNodesDownload(ctx context.Context, onlineWindow time.Duration, asOf AsOfSystemTimeConfig) ([]*SelectedNode, error)
}
// DownloadSelectionCacheConfig contains configuration for the selection cache.
type DownloadSelectionCacheConfig struct {
// Staleness is how long a loaded cache state is reused; once it is older
// than this, the next lookup queries the database again.
Staleness time.Duration
// OnlineWindow is passed to DownloadSelectionDB.SelectAllStorageNodesDownload
// on every refresh. Presumably it bounds how recently a node must have been
// seen to count as online — confirm against the DB implementation.
OnlineWindow time.Duration
// AsOfSystemTime is passed through to the database query on every refresh.
AsOfSystemTime AsOfSystemTimeConfig
}
// DownloadSelectionCache keeps a list of all the storage nodes that are qualified to download data from.
// The cache will sync with the nodes table in the database and get refreshed once the staleness time has passed.
type DownloadSelectionCache struct {
log *zap.Logger
db DownloadSelectionDB
config DownloadSelectionCacheConfig
// mu guards lastRefresh and state. A refresh holds the write lock for the
// whole database query; readers take the read lock only to snapshot fields.
mu sync.RWMutex
// lastRefresh is when state was last successfully loaded from db.
lastRefresh time.Time
// state is the most recently loaded node data; nil until the first
// successful refresh.
state *DownloadSelectionCacheState
}
// NewDownloadSelectionCache creates a new cache that keeps a list of all the storage nodes that are qualified to download data from.
//
// The returned cache starts out empty. It loads nodes from db the first time
// GetNodeIPs or Refresh is called.
func NewDownloadSelectionCache(log *zap.Logger, db DownloadSelectionDB, config DownloadSelectionCacheConfig) *DownloadSelectionCache {
	cache := new(DownloadSelectionCache)
	cache.log = log
	cache.db = db
	cache.config = config
	return cache
}
// Refresh makes sure the cache holds node data loaded from the database.
//
// It queries the database only when the cache is still empty or its data is
// older than the configured Staleness. Otherwise it returns right away
// without touching the database. This method is useful for tests.
func (cache *DownloadSelectionCache) Refresh(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	if _, err = cache.refresh(ctx); err != nil {
		return err
	}
	return nil
}
// refresh calls out to the database and refreshes the cache with the most up-to-date
// data from the nodes table. It then records when the refresh happened so we know
// when to refresh again in the future.
//
// The write lock is held for the whole call, including the database query.
// Callers that find the cache stale at the same time are therefore serialized.
// After one of them succeeds, the rest see fresh data in the re-check below
// and return it without querying again.
//
// On a database error the cache is left unchanged. The previous state, which
// may be nil, is returned together with the error.
func (cache *DownloadSelectionCache) refresh(ctx context.Context) (state *DownloadSelectionCacheState, err error) {
	defer mon.Task()(&ctx)(&err)

	cache.mu.Lock()
	defer cache.mu.Unlock()

	// Another caller may have refreshed while we were waiting for the lock.
	if cache.state != nil && time.Since(cache.lastRefresh) <= cache.config.Staleness {
		return cache.state, nil
	}

	onlineNodes, err := cache.db.SelectAllStorageNodesDownload(ctx, cache.config.OnlineWindow, cache.config.AsOfSystemTime)
	if err != nil {
		return cache.state, err
	}

	// Store time.Now() as-is. Calling UTC() would strip the monotonic clock
	// reading, and the time.Since staleness checks would then depend on the
	// wall clock. The wall clock can jump (e.g. NTP corrections), which could
	// keep a stale cache alive or expire it early.
	cache.lastRefresh = time.Now()
	cache.state = NewDownloadSelectionCacheState(onlineNodes)

	mon.IntVal("refresh_cache_size_online").Observe(int64(len(onlineNodes)))
	return cache.state, nil
}
// GetNodeIPs gets the last node ip:port from the cache, refreshing when needed.
//
// If the cache is empty or older than the configured Staleness, it is
// refreshed first. If that refresh fails, the error is returned with no IPs.
// Requested IDs that the cache does not know are left out of the result.
func (cache *DownloadSelectionCache) GetNodeIPs(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]string, err error) {
	defer mon.Task()(&ctx)(&err)

	// Only snapshot the fields under the read lock; the lookup happens after.
	cache.mu.RLock()
	current, refreshedAt := cache.state, cache.lastRefresh
	cache.mu.RUnlock()

	fresh := current != nil && time.Since(refreshedAt) <= cache.config.Staleness
	if !fresh {
		if current, err = cache.refresh(ctx); err != nil {
			return nil, err
		}
	}

	return current.IPs(nodes), nil
}
// Size returns how many nodes are in the cache.
//
// It never triggers a refresh. It reports 0 until the cache has been
// populated at least once.
func (cache *DownloadSelectionCache) Size() int {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	if cache.state == nil {
		return 0
	}
	return cache.state.Size()
}
// DownloadSelectionCacheState contains state of download selection cache.
// It is built by NewDownloadSelectionCacheState. No method in this file
// modifies it afterwards.
type DownloadSelectionCacheState struct {
// ipPortByID maps each node ID to that node's last known ip:port
// (SelectedNode.LastIPPort at the time the state was built).
ipPortByID map[storj.NodeID]string
}
// NewDownloadSelectionCacheState creates a new state from the nodes.
//
// Each node's LastIPPort is indexed by its ID. If the same ID appears more
// than once, the last occurrence wins. A nil or empty nodes slice yields an
// empty state. nodes must not contain nil entries; a nil entry would panic
// when its ID is read.
func NewDownloadSelectionCacheState(nodes []*SelectedNode) *DownloadSelectionCacheState {
	// The map holds at most len(nodes) entries, so pre-size it to avoid
	// repeated growth while filling it.
	ipPortByID := make(map[storj.NodeID]string, len(nodes))
	for _, node := range nodes {
		ipPortByID[node.ID] = node.LastIPPort
	}
	return &DownloadSelectionCacheState{
		ipPortByID: ipPortByID,
	}
}
// Size returns how many nodes are in the state, i.e. the number of
// distinct node IDs it holds an ip:port for.
func (state *DownloadSelectionCacheState) Size() int {
	count := len(state.ipPortByID)
	return count
}
// IPs returns node ip:port for nodes that are in state.
//
// Each requested ID known to the state is mapped to its ip:port. Unknown IDs
// are left out. The returned map is freshly allocated and owned by the caller.
func (state *DownloadSelectionCacheState) IPs(nodes []storj.NodeID) map[storj.NodeID]string {
	found := make(map[storj.NodeID]string, len(nodes))
	for i := range nodes {
		id := nodes[i]
		ipPort, known := state.ipPortByID[id]
		if !known {
			continue
		}
		found[id] = ipPort
	}
	return found
}