satellite/satellitedb/overlaycache: fill node tags with join for limited number of nodes
The easiest way to get node information WITH node tags is executing two queries: 1. select all nodes 2. select all tags And we can pair them with a loop, using the in-memory data structures. But this approach does work only, if we select all nodes, which is true when we use cache (upload, download, repair checker). But repair process selects only the required nodes, where this approach is suboptimal. (full table scan for all tags, even if we need only tags for a few dozens nodes). Possible solutions: 1. We can introduce a cache for repair (similar to upload cache) 2. Or we can select both node and tag information with one query (join). This patch implements the second approach. Note: repair itself is quite slow (10-20 seconds per segements to repair). With 15 seconds execution time and 3 minutes cache staleness, we would use the cache only 12 times per worker. Probably we don't need cache for now. https://github.com/storj/storj/issues/6198 Change-Id: I0364d94306e9815a1c280b71e843b8f504e3d870
This commit is contained in:
parent
82b108de69
commit
ad87d1de74
@ -49,7 +49,7 @@ func (cache *overlaycache) SelectAllStorageNodesUpload(ctx context.Context, sele
|
||||
return reputable, new, err
|
||||
}
|
||||
|
||||
err = cache.addNodeTags(ctx, append(reputable, new...))
|
||||
err = cache.addNodeTagsFromFullScan(ctx, append(reputable, new...))
|
||||
if err != nil {
|
||||
return reputable, new, err
|
||||
}
|
||||
@ -145,7 +145,7 @@ func (cache *overlaycache) SelectAllStorageNodesDownload(ctx context.Context, on
|
||||
return nodes, err
|
||||
}
|
||||
|
||||
err = cache.addNodeTags(ctx, nodes)
|
||||
err = cache.addNodeTagsFromFullScan(ctx, nodes)
|
||||
if err != nil {
|
||||
return nodes, err
|
||||
}
|
||||
@ -402,8 +402,6 @@ func (cache *overlaycache) UpdateLastOfflineEmail(ctx context.Context, nodeIDs s
|
||||
func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs storj.NodeIDList, onlineWindow, asOfSystemInterval time.Duration) (records []nodeselection.SelectedNode, err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
|
||||
var nodes []*nodeselection.SelectedNode
|
||||
|
||||
if len(nodeIDs) == 0 {
|
||||
return nil, Error.New("no ids provided")
|
||||
}
|
||||
@ -414,20 +412,32 @@ func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs storj.NodeIDLis
|
||||
(n.offline_suspended IS NOT NULL OR n.unknown_audit_suspended IS NOT NULL) AS suspended,
|
||||
n.disqualified IS NOT NULL AS disqualified,
|
||||
n.exit_initiated_at IS NOT NULL AS exiting,
|
||||
n.exit_finished_at IS NOT NULL AS exited
|
||||
n.exit_finished_at IS NOT NULL AS exited,
|
||||
node_tags.name, node_tags.value, node_tags.signed_at, node_tags.signer
|
||||
FROM unnest($1::bytea[]) WITH ORDINALITY AS input(node_id, ordinal)
|
||||
LEFT OUTER JOIN nodes n ON input.node_id = n.id
|
||||
LEFT JOIN node_tags on node_tags.node_id = n.id
|
||||
`+cache.db.impl.AsOfSystemInterval(asOfSystemInterval)+`
|
||||
ORDER BY input.ordinal
|
||||
`, pgutil.NodeIDArray(nodeIDs), time.Now().Add(-onlineWindow),
|
||||
))(func(rows tagsql.Rows) error {
|
||||
for rows.Next() {
|
||||
node, err := scanSelectedNode(rows)
|
||||
node, tag, err := scanSelectedNodeWithTag(rows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nodes = append(nodes, &node)
|
||||
// just a joined new tag to the previous entry
|
||||
if len(records) > 0 && !tag.NodeID.IsZero() && records[len(records)-1].ID == tag.NodeID {
|
||||
records[len(records)-1].Tags = append(records[len(records)-1].Tags, tag)
|
||||
continue
|
||||
}
|
||||
|
||||
if tag.Name != "" {
|
||||
node.Tags = append(node.Tags, tag)
|
||||
}
|
||||
|
||||
records = append(records, node)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@ -435,15 +445,6 @@ func (cache *overlaycache) GetNodes(ctx context.Context, nodeIDs storj.NodeIDLis
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = cache.addNodeTags(ctx, nodes)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
records = make([]nodeselection.SelectedNode, len(nodes))
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
records[i] = *nodes[i]
|
||||
}
|
||||
|
||||
return records, Error.Wrap(err)
|
||||
}
|
||||
|
||||
@ -480,7 +481,7 @@ func (cache *overlaycache) GetParticipatingNodes(ctx context.Context, onlineWind
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = cache.addNodeTags(ctx, nodes)
|
||||
err = cache.addNodeTagsFromFullScan(ctx, nodes)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -551,6 +552,91 @@ func scanSelectedNode(rows tagsql.Rows) (nodeselection.SelectedNode, error) {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func scanSelectedNodeWithTag(rows tagsql.Rows) (nodeselection.SelectedNode, nodeselection.NodeTag, error) {
|
||||
var node nodeselection.SelectedNode
|
||||
node.Address = &pb.NodeAddress{}
|
||||
var nodeID nullNodeID
|
||||
var address, lastNet, lastIPPort, countryCode sql.NullString
|
||||
var online, suspended, disqualified, exiting, exited sql.NullBool
|
||||
|
||||
var tag nodeselection.NodeTag
|
||||
var name []byte
|
||||
signedAt := &time.Time{}
|
||||
signer := nullNodeID{}
|
||||
|
||||
err := rows.Scan(&nodeID, &address, &lastNet, &lastIPPort, &countryCode,
|
||||
&online, &suspended, &disqualified, &exiting, &exited, &name, &tag.Value, &signedAt, &signer)
|
||||
if err != nil {
|
||||
return nodeselection.SelectedNode{}, nodeselection.NodeTag{}, err
|
||||
}
|
||||
|
||||
// If node ID was null, no record was found for the specified ID. For our purposes
|
||||
// here, we will treat that as equivalent to a node being DQ'd or exited.
|
||||
if !nodeID.Valid {
|
||||
// return an empty record
|
||||
return nodeselection.SelectedNode{}, nodeselection.NodeTag{}, nil
|
||||
}
|
||||
// nodeID was valid, so from here on we assume all the other non-null fields are valid, per database constraints
|
||||
if disqualified.Bool || exited.Bool {
|
||||
return nodeselection.SelectedNode{}, nodeselection.NodeTag{}, nil
|
||||
}
|
||||
node.ID = nodeID.NodeID
|
||||
node.Address.Address = address.String
|
||||
node.LastNet = lastNet.String
|
||||
if lastIPPort.Valid {
|
||||
node.LastIPPort = lastIPPort.String
|
||||
}
|
||||
if countryCode.Valid {
|
||||
node.CountryCode = location.ToCountryCode(countryCode.String)
|
||||
}
|
||||
node.Online = online.Bool
|
||||
node.Suspended = suspended.Bool
|
||||
node.Exiting = exiting.Bool
|
||||
|
||||
if len(name) > 0 {
|
||||
tag.Name = string(name)
|
||||
tag.SignedAt = *signedAt
|
||||
if signer.Valid {
|
||||
tag.Signer = signer.NodeID
|
||||
}
|
||||
tag.NodeID = node.ID
|
||||
}
|
||||
|
||||
return node, tag, nil
|
||||
}
|
||||
|
||||
func (cache *overlaycache) addNodeTagsFromFullScan(ctx context.Context, nodes []*nodeselection.SelectedNode) error {
|
||||
rows, err := cache.db.All_NodeTags(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
tagsByNode := map[storj.NodeID]nodeselection.NodeTags{}
|
||||
for _, row := range rows {
|
||||
nodeID, err := storj.NodeIDFromBytes(row.NodeId)
|
||||
if err != nil {
|
||||
return Error.New("Invalid nodeID in the database: %x", row.NodeId)
|
||||
}
|
||||
signerID, err := storj.NodeIDFromBytes(row.Signer)
|
||||
if err != nil {
|
||||
return Error.New("Invalid nodeID in the database: %x", row.NodeId)
|
||||
}
|
||||
tagsByNode[nodeID] = append(tagsByNode[nodeID], nodeselection.NodeTag{
|
||||
NodeID: nodeID,
|
||||
Name: row.Name,
|
||||
Value: row.Value,
|
||||
SignedAt: row.SignedAt,
|
||||
Signer: signerID,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
node.Tags = tagsByNode[node.ID]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateReputation updates the DB columns for any of the reputation fields in ReputationUpdate.
|
||||
func (cache *overlaycache) UpdateReputation(ctx context.Context, id storj.NodeID, request overlay.ReputationUpdate) (err error) {
|
||||
defer mon.Task()(&ctx)(&err)
|
||||
@ -1621,35 +1707,3 @@ func (cache *overlaycache) GetNodeTags(ctx context.Context, id storj.NodeID) (no
|
||||
}
|
||||
return tags, err
|
||||
}
|
||||
|
||||
func (cache *overlaycache) addNodeTags(ctx context.Context, nodes []*nodeselection.SelectedNode) error {
|
||||
rows, err := cache.db.All_NodeTags(ctx)
|
||||
if err != nil {
|
||||
return Error.Wrap(err)
|
||||
}
|
||||
|
||||
tagsByNode := map[storj.NodeID]nodeselection.NodeTags{}
|
||||
for _, row := range rows {
|
||||
nodeID, err := storj.NodeIDFromBytes(row.NodeId)
|
||||
if err != nil {
|
||||
return Error.New("Invalid nodeID in the database: %x", row.NodeId)
|
||||
}
|
||||
signerID, err := storj.NodeIDFromBytes(row.Signer)
|
||||
if err != nil {
|
||||
return Error.New("Invalid nodeID in the database: %x", row.NodeId)
|
||||
}
|
||||
tagsByNode[nodeID] = append(tagsByNode[nodeID], nodeselection.NodeTag{
|
||||
NodeID: nodeID,
|
||||
Name: row.Name,
|
||||
Value: row.Value,
|
||||
SignedAt: row.SignedAt,
|
||||
Signer: signerID,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
node.Tags = tagsByNode[node.ID]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ package satellitedb_test
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
@ -710,3 +711,98 @@ func addNode(ctx context.Context, t *testing.T, cache overlay.DB, address, lastI
|
||||
|
||||
return disp
|
||||
}
|
||||
|
||||
func TestOverlayCache_KnownReliableTagHandling(t *testing.T) {
|
||||
signer := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion())
|
||||
|
||||
satellitedbtest.Run(t, func(ctx *testcontext.Context, t *testing.T, db satellite.DB) {
|
||||
|
||||
cache := db.OverlayCache()
|
||||
|
||||
// GIVEN
|
||||
|
||||
var ids []storj.NodeID
|
||||
for i := 0; i < 10; i++ {
|
||||
address := fmt.Sprintf("127.0.0.%d", i)
|
||||
checkInInfo := overlay.NodeCheckInInfo{
|
||||
IsUp: true,
|
||||
NodeID: testidentity.MustPregeneratedIdentity(i+1, storj.LatestIDVersion()).ID,
|
||||
Address: &pb.NodeAddress{Address: address},
|
||||
LastIPPort: address + ":1234",
|
||||
LastNet: "127.0.0.0",
|
||||
CountryCode: location.Romania,
|
||||
Version: &pb.NodeVersion{Version: "v0.0.0"},
|
||||
}
|
||||
|
||||
ids = append(ids, checkInInfo.NodeID)
|
||||
|
||||
checkin := time.Now().UTC()
|
||||
if i%2 == 0 {
|
||||
checkin = checkin.Add(-50 * time.Hour)
|
||||
}
|
||||
err := cache.UpdateCheckIn(ctx, checkInInfo, checkin, overlay.NodeSelectionConfig{})
|
||||
require.NoError(t, err)
|
||||
|
||||
tags := nodeselection.NodeTags{}
|
||||
|
||||
if i%2 == 0 {
|
||||
tags = append(tags, nodeselection.NodeTag{
|
||||
SignedAt: time.Now(),
|
||||
Signer: signer.ID,
|
||||
NodeID: checkInInfo.NodeID,
|
||||
Name: "index",
|
||||
Value: []byte{byte(i)},
|
||||
})
|
||||
}
|
||||
if i%4 == 0 {
|
||||
tags = append(tags, nodeselection.NodeTag{
|
||||
SignedAt: time.Now(),
|
||||
Signer: signer.ID,
|
||||
NodeID: checkInInfo.NodeID,
|
||||
Name: "selected",
|
||||
Value: []byte("true"),
|
||||
})
|
||||
}
|
||||
|
||||
if len(tags) > 0 {
|
||||
require.NoError(t, err)
|
||||
err = cache.UpdateNodeTags(ctx, tags)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// WHEN
|
||||
nodes, err := cache.GetNodes(ctx, ids, 1*time.Hour, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
// THEN
|
||||
require.Len(t, nodes, 10)
|
||||
|
||||
checkTag := func(tags nodeselection.NodeTags, name string, value []byte) {
|
||||
tag1, err := tags.FindBySignerAndName(signer.ID, name)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, name, tag1.Name)
|
||||
require.Equal(t, value, tag1.Value)
|
||||
require.Equal(t, signer.ID, tag1.Signer)
|
||||
require.True(t, time.Since(tag1.SignedAt) < 1*time.Hour)
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
ipParts := strings.Split(node.Address.Address, ".")
|
||||
ix, err := strconv.Atoi(ipParts[3])
|
||||
require.NoError(t, err)
|
||||
|
||||
if ix%4 == 0 {
|
||||
require.Len(t, node.Tags, 2)
|
||||
checkTag(node.Tags, "selected", []byte("true"))
|
||||
checkTag(node.Tags, "index", []byte{byte(ix)})
|
||||
} else if ix%2 == 0 {
|
||||
checkTag(node.Tags, "index", []byte{byte(ix)})
|
||||
require.Len(t, node.Tags, 1)
|
||||
} else {
|
||||
require.Len(t, node.Tags, 0)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user