satellite/{orders,overlay}: use cache for downloads
Use DownloadSelectionCache to avoid querying database for every download. This change only addresses downloads from users. The download selection cache is not currently used for audit and repair. Change-Id: I96a49e121dac0b4204f97592a63131edabd73fb5
This commit is contained in:
parent
48b0a65fbd
commit
51d4e5c275
@ -142,7 +142,7 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
|
||||
nodeIDs[i] = piece.StorageNode
|
||||
}
|
||||
|
||||
nodes, err := service.overlay.GetOnlineNodesForGetDelete(ctx, nodeIDs)
|
||||
nodes, err := service.overlay.CachedGetOnlineNodesForGet(ctx, nodeIDs)
|
||||
if err != nil {
|
||||
service.log.Debug("error getting nodes from overlay", zap.Error(err))
|
||||
return nil, storj.PiecePrivateKey{}, Error.Wrap(err)
|
||||
|
@ -88,6 +88,19 @@ func (cache *DownloadSelectionCache) GetNodeIPs(ctx context.Context, nodes []sto
|
||||
return state.IPs(nodes), nil
|
||||
}
|
||||
|
||||
// GetNodes gets nodes by ID from the cache, and refreshes the cache if it is stale.
|
||||
func (cache *DownloadSelectionCache) GetNodes(ctx context.Context, nodes []storj.NodeID) (_ map[storj.NodeID]*SelectedNode, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
stateAny, err := cache.cache.Get(ctx, time.Now())
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
state := stateAny.(*DownloadSelectionCacheState)
|
||||
|
||||
return state.Nodes(nodes), nil
|
||||
}
|
||||
|
||||
// Size returns how many nodes are in the cache.
|
||||
func (cache *DownloadSelectionCache) Size(ctx context.Context) (int, error) {
|
||||
stateAny, err := cache.cache.Get(ctx, time.Now())
|
||||
@ -100,32 +113,43 @@ func (cache *DownloadSelectionCache) Size(ctx context.Context) (int, error) {
|
||||
|
||||
// DownloadSelectionCacheState contains state of download selection cache.
|
||||
type DownloadSelectionCacheState struct {
|
||||
// ipPortByID returns IP based on storj.NodeID
|
||||
ipPortByID map[storj.NodeID]string
|
||||
// byID returns IP based on storj.NodeID
|
||||
byID map[storj.NodeID]*SelectedNode // TODO: optimize, avoid pointery structures for performance
|
||||
}
|
||||
|
||||
// NewDownloadSelectionCacheState creates a new state from the nodes.
|
||||
func NewDownloadSelectionCacheState(nodes []*SelectedNode) *DownloadSelectionCacheState {
|
||||
ipPortByID := map[storj.NodeID]string{}
|
||||
byID := map[storj.NodeID]*SelectedNode{}
|
||||
for _, n := range nodes {
|
||||
ipPortByID[n.ID] = n.LastIPPort
|
||||
byID[n.ID] = n
|
||||
}
|
||||
return &DownloadSelectionCacheState{
|
||||
ipPortByID: ipPortByID,
|
||||
byID: byID,
|
||||
}
|
||||
}
|
||||
|
||||
// Size returns how many nodes are in the state.
|
||||
func (state *DownloadSelectionCacheState) Size() int {
|
||||
return len(state.ipPortByID)
|
||||
return len(state.byID)
|
||||
}
|
||||
|
||||
// IPs returns node ip:port for nodes that are in state.
|
||||
func (state *DownloadSelectionCacheState) IPs(nodes []storj.NodeID) map[storj.NodeID]string {
|
||||
xs := make(map[storj.NodeID]string, len(nodes))
|
||||
for _, nodeID := range nodes {
|
||||
if ip, exists := state.ipPortByID[nodeID]; exists {
|
||||
xs[nodeID] = ip
|
||||
if n, exists := state.byID[nodeID]; exists {
|
||||
xs[nodeID] = n.LastIPPort
|
||||
}
|
||||
}
|
||||
return xs
|
||||
}
|
||||
|
||||
// Nodes returns node ip:port for nodes that are in state.
|
||||
func (state *DownloadSelectionCacheState) Nodes(nodes []storj.NodeID) map[storj.NodeID]*SelectedNode {
|
||||
xs := make(map[storj.NodeID]*SelectedNode, len(nodes))
|
||||
for _, nodeID := range nodes {
|
||||
if n, exists := state.byID[nodeID]; exists {
|
||||
xs[nodeID] = n.Clone() // TODO: optimize the clones
|
||||
}
|
||||
}
|
||||
return xs
|
||||
|
@ -103,3 +103,56 @@ func TestDownloadSelectionCacheState_IPs(t *testing.T) {
|
||||
require.Len(t, ips, 1)
|
||||
require.Equal(t, node.LastIPPort, ips[node.ID])
|
||||
}
|
||||
|
||||
func TestDownloadSelectionCache_GetNodes(t *testing.T) {
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
// add some reputable nodes to the database
|
||||
const nodeCount = 2
|
||||
ids := addNodesToNodesTable(ctx, t, db.OverlayCache(), nodeCount, nodeCount)
|
||||
|
||||
// create new cache and select nodes
|
||||
cache, err := overlay.NewDownloadSelectionCache(zap.NewNop(),
|
||||
db.OverlayCache(),
|
||||
overlay.DownloadSelectionCacheConfig{
|
||||
Staleness: time.Hour,
|
||||
OnlineWindow: time.Hour,
|
||||
AsOfSystemTime: overlay.AsOfSystemTimeConfig{Enabled: true, DefaultInterval: time.Minute},
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
cacheCtx, cacheCancel := context.WithCancel(ctx)
|
||||
defer cacheCancel()
|
||||
ctx.Go(func() error { return cache.Run(cacheCtx) })
|
||||
|
||||
// get nodes, expect to see all nodes
|
||||
nodes, err := cache.GetNodes(ctx, ids)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, nodes, nodeCount)
|
||||
|
||||
// disqualify one node
|
||||
err = db.OverlayCache().DisqualifyNode(ctx, ids[0], time.Now(), overlay.DisqualificationReasonAuditFailure)
|
||||
require.NoError(t, err)
|
||||
// suspend the other node
|
||||
err = db.OverlayCache().TestSuspendNodeUnknownAudit(ctx, ids[1], time.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
// cache should still contain disqualified node since it has not refreshed
|
||||
nodes, err = cache.GetNodes(ctx, ids)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, nodes, nodeCount)
|
||||
|
||||
// update cache staleness so it refreshes on the next call to GetNodes
|
||||
err = cache.Refresh(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// cache should not contain disqualified node after refresh
|
||||
// it should still contain the suspended node, since a suspended node can still be used for download
|
||||
nodes, err = cache.GetNodes(ctx, ids)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, nodes, nodeCount-1)
|
||||
for _, n := range nodes {
|
||||
require.NotEqual(t, ids[0], n.ID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -368,6 +368,12 @@ func (service *Service) GetOnlineNodesForGetDelete(ctx context.Context, nodeIDs
|
||||
return service.db.GetOnlineNodesForGetDelete(ctx, nodeIDs, service.config.Node.OnlineWindow, service.config.Node.AsOfSystemTime)
|
||||
}
|
||||
|
||||
// CachedGetOnlineNodesForGet returns a map of nodes from the download selection cache from the suppliedIDs.
|
||||
func (service *Service) CachedGetOnlineNodesForGet(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*SelectedNode, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
return service.DownloadSelectionCache.GetNodes(ctx, nodeIDs)
|
||||
}
|
||||
|
||||
// GetOnlineNodesForAuditRepair returns a map of nodes for the supplied nodeIDs.
|
||||
func (service *Service) GetOnlineNodesForAuditRepair(ctx context.Context, nodeIDs []storj.NodeID) (_ map[storj.NodeID]*NodeReputation, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
Loading…
Reference in New Issue
Block a user