satellite,storagenode: propagate node tags with NodeCheckin

Change-Id: Ib1a602a8cf81204efa001b5d338914ea4218c39b
This commit is contained in:
Márton Elek 2023-07-03 11:42:48 +02:00 committed by Storj Robot
parent a740f96f75
commit 6a3802de4f
8 changed files with 477 additions and 12 deletions

View File

@ -16,6 +16,7 @@ import (
"golang.org/x/sync/errgroup"
"storj.io/common/identity"
"storj.io/common/nodetag"
"storj.io/common/pb"
"storj.io/common/peertls/extensions"
"storj.io/common/peertls/tlsopts"
@ -325,7 +326,12 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
Type: pb.NodeType_SATELLITE,
Version: *pbVersion,
}
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact)
var authority nodetag.Authority
peerIdentity := full.PeerIdentity()
authority = append(authority, signing.SigneeFromPeerIdentity(peerIdentity))
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, authority, config.Contact)
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {
return nil, errs.Combine(err, peer.Close())

View File

@ -7,17 +7,22 @@ import (
"crypto/tls"
"crypto/x509"
"net"
"sort"
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/identity/testidentity"
"storj.io/common/nodetag"
"storj.io/common/pb"
"storj.io/common/rpc/rpcpeer"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/contact"
)
func TestSatelliteContactEndpoint(t *testing.T) {
@ -177,3 +182,143 @@ func TestSatellitePingMe_Failure(t *testing.T) {
require.Nil(t, resp)
})
}
func TestSatelliteContactEndpoint_WithNodeTags(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Server.DisableQUIC = true
config.Contact.Tags = contact.SignedTags(pb.SignedNodeTagSets{
Tags: []*pb.SignedNodeTagSet{},
})
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeInfo := planet.StorageNodes[0].Contact.Service.Local()
ident := planet.StorageNodes[0].Identity
peer := rpcpeer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP(nodeInfo.Address),
Port: 5,
},
State: tls.ConnectionState{
PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA},
},
}
unsignedTags := &pb.NodeTagSet{
NodeId: ident.ID.Bytes(),
Tags: []*pb.Tag{
{
Name: "soc",
Value: []byte{1},
},
{
Name: "foo",
Value: []byte("bar"),
},
},
}
signedTags, err := nodetag.Sign(ctx, unsignedTags, signing.SignerFromFullIdentity(planet.Satellites[0].Identity))
require.NoError(t, err)
peerCtx := rpcpeer.NewContext(ctx, &peer)
resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
Address: nodeInfo.Address,
Version: &nodeInfo.Version,
Capacity: &nodeInfo.Capacity,
Operator: &nodeInfo.Operator,
DebounceLimit: 3,
Features: 0xf,
SignedTags: &pb.SignedNodeTagSets{
Tags: []*pb.SignedNodeTagSet{
signedTags,
},
},
})
require.NoError(t, err)
require.NotNil(t, resp)
tags, err := planet.Satellites[0].DB.OverlayCache().GetNodeTags(ctx, ident.ID)
require.NoError(t, err)
require.Len(t, tags, 2)
sort.Slice(tags, func(i, j int) bool {
return tags[i].Name < tags[j].Name
})
require.Equal(t, "foo", tags[0].Name)
require.Equal(t, "bar", string(tags[0].Value))
require.Equal(t, "soc", tags[1].Name)
require.Equal(t, []byte{1}, tags[1].Value)
})
}
func TestSatelliteContactEndpoint_WithWrongNodeTags(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
Reconfigure: testplanet.Reconfigure{
StorageNode: func(index int, config *storagenode.Config) {
config.Server.DisableQUIC = true
config.Contact.Tags = contact.SignedTags(pb.SignedNodeTagSets{
Tags: []*pb.SignedNodeTagSet{},
})
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
nodeInfo := planet.StorageNodes[0].Contact.Service.Local()
ident := planet.StorageNodes[0].Identity
peer := rpcpeer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP(nodeInfo.Address),
Port: 5,
},
State: tls.ConnectionState{
PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA},
},
}
wrongNodeID := testidentity.MustPregeneratedIdentity(99, storj.LatestIDVersion()).ID
unsignedTags := &pb.NodeTagSet{
NodeId: wrongNodeID.Bytes(),
Tags: []*pb.Tag{
{
Name: "soc",
Value: []byte{1},
},
{
Name: "foo",
Value: []byte("bar"),
},
},
}
signedTags, err := nodetag.Sign(ctx, unsignedTags, signing.SignerFromFullIdentity(planet.Satellites[0].Identity))
require.NoError(t, err)
peerCtx := rpcpeer.NewContext(ctx, &peer)
resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{
Address: nodeInfo.Address,
Version: &nodeInfo.Version,
Capacity: &nodeInfo.Capacity,
Operator: &nodeInfo.Operator,
DebounceLimit: 3,
Features: 0xf,
SignedTags: &pb.SignedNodeTagSets{
Tags: []*pb.SignedNodeTagSet{
signedTags,
},
},
})
require.NoError(t, err)
require.NotNil(t, resp)
tags, err := planet.Satellites[0].DB.OverlayCache().GetNodeTags(ctx, ident.ID)
require.NoError(t, err)
require.Len(t, tags, 0)
})
}

View File

@ -114,6 +114,10 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) (
req.Operator.WalletFeatures = nil
}
}
err = endpoint.service.processNodeTags(ctx, nodeID, req.SignedTags)
if err != nil {
endpoint.log.Info("failed to update node tags", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err))
}
nodeInfo := overlay.NodeCheckInInfo{
NodeID: peerID.ID,

View File

@ -12,11 +12,13 @@ import (
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/nodetag"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/rpc/quic"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/storj/satellite/nodeselection/uploadselection"
"storj.io/storj/satellite/overlay"
)
@ -49,19 +51,22 @@ type Service struct {
timeout time.Duration
idLimiter *RateLimiter
allowPrivateIP bool
nodeTagAuthority nodetag.Authority
}
// NewService creates a new contact service.
func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, config Config) *Service {
func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, authority nodetag.Authority, config Config) *Service {
return &Service{
log: log,
self: self,
overlay: overlay,
peerIDs: peerIDs,
dialer: dialer,
timeout: config.Timeout,
idLimiter: NewRateLimiter(config.RateLimitInterval, config.RateLimitBurst, config.RateLimitCacheSize),
allowPrivateIP: config.AllowPrivateIP,
log: log,
self: self,
overlay: overlay,
peerIDs: peerIDs,
dialer: dialer,
timeout: config.Timeout,
idLimiter: NewRateLimiter(config.RateLimitInterval, config.RateLimitBurst, config.RateLimitCacheSize),
allowPrivateIP: config.AllowPrivateIP,
nodeTagAuthority: authority,
}
}
@ -151,3 +156,56 @@ func (service *Service) pingNodeQUIC(ctx context.Context, nodeurl storj.NodeURL)
return nil
}
func (service *Service) processNodeTags(ctx context.Context, nodeID storj.NodeID, req *pb.SignedNodeTagSets) error {
if req != nil {
tags := uploadselection.NodeTags{}
for _, t := range req.Tags {
verifiedTags, signerID, err := verifyTags(ctx, service.nodeTagAuthority, nodeID, t)
if err != nil {
service.log.Info("Failed to verify tags.", zap.Error(err), zap.Stringer("NodeID", nodeID))
continue
}
ts := time.Unix(verifiedTags.Timestamp, 0)
for _, vt := range verifiedTags.Tags {
tags = append(tags, uploadselection.NodeTag{
NodeID: nodeID,
Name: vt.Name,
Value: vt.Value,
SignedAt: ts,
Signer: signerID,
})
}
}
if len(tags) > 0 {
err := service.overlay.UpdateNodeTags(ctx, tags)
if err != nil {
return Error.Wrap(err)
}
}
}
return nil
}
func verifyTags(ctx context.Context, authority nodetag.Authority, nodeID storj.NodeID, t *pb.SignedNodeTagSet) (*pb.NodeTagSet, storj.NodeID, error) {
signerID, err := storj.NodeIDFromBytes(t.SignerNodeId)
if err != nil {
return nil, signerID, errs.New("failed to parse signerNodeID from verifiedTags: '%x', %s", t.SignerNodeId, err.Error())
}
verifiedTags, err := authority.Verify(ctx, t)
if err != nil {
return nil, signerID, errs.New("received node tags with wrong/unknown signature: '%x', %s", t.Signature, err.Error())
}
signedNodeID, err := storj.NodeIDFromBytes(verifiedTags.NodeId)
if err != nil {
return nil, signerID, errs.New("failed to parse nodeID from verifiedTags: '%x', %s", verifiedTags.NodeId, err.Error())
}
if signedNodeID != nodeID {
return nil, signerID, errs.New("the tag is signed for a different node. Expected NodeID: '%s', Received NodeID: '%s'", nodeID, signedNodeID)
}
return verifiedTags, signerID, nil
}

View File

@ -0,0 +1,149 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/identity/testidentity"
"storj.io/common/nodetag"
"storj.io/common/pb"
"storj.io/common/signing"
"storj.io/common/storj"
"storj.io/common/testcontext"
)
func TestVerifyTags(t *testing.T) {
ctx := testcontext.New(t)
snIdentity := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion())
signerIdentity := testidentity.MustPregeneratedIdentity(1, storj.LatestIDVersion())
signer := signing.SignerFromFullIdentity(signerIdentity)
authority := nodetag.Authority{
signing.SignerFromFullIdentity(signerIdentity),
}
t.Run("ok tags", func(t *testing.T) {
tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{
NodeId: snIdentity.ID.Bytes(),
Tags: []*pb.Tag{
{
Name: "foo",
Value: []byte("bar"),
},
},
}, signer)
require.NoError(t, err)
verifiedTags, signerID, err := verifyTags(ctx, authority, snIdentity.ID, tags)
require.NoError(t, err)
require.Equal(t, signerIdentity.ID, signerID)
require.Len(t, verifiedTags.Tags, 1)
require.Equal(t, "foo", verifiedTags.Tags[0].Name)
require.Equal(t, []byte("bar"), verifiedTags.Tags[0].Value)
})
t.Run("wrong signer ID", func(t *testing.T) {
tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{
NodeId: snIdentity.ID.Bytes(),
Tags: []*pb.Tag{
{
Name: "foo",
Value: []byte("bar"),
},
},
}, signer)
require.NoError(t, err)
tags.SignerNodeId = []byte{1, 2, 3, 4}
_, _, err = verifyTags(ctx, authority, snIdentity.ID, tags)
require.Error(t, err)
require.ErrorContains(t, err, "01020304")
require.ErrorContains(t, err, "failed to parse signerNodeID")
})
t.Run("wrong signature", func(t *testing.T) {
tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{
NodeId: snIdentity.ID.Bytes(),
Tags: []*pb.Tag{
{
Name: "foo",
Value: []byte("bar"),
},
},
}, signer)
require.NoError(t, err)
tags.Signature = []byte{4, 3, 2, 1}
_, _, err = verifyTags(ctx, authority, snIdentity.ID, tags)
require.Error(t, err)
require.ErrorContains(t, err, "04030201")
require.ErrorContains(t, err, "wrong/unknown signature")
})
t.Run("unknown signer", func(t *testing.T) {
otherSignerIdentity := testidentity.MustPregeneratedIdentity(2, storj.LatestIDVersion())
otherSigner := signing.SignerFromFullIdentity(otherSignerIdentity)
tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{
NodeId: snIdentity.ID.Bytes(),
Tags: []*pb.Tag{
{
Name: "foo",
Value: []byte("bar"),
},
},
}, otherSigner)
require.NoError(t, err)
_, _, err = verifyTags(ctx, authority, snIdentity.ID, tags)
require.Error(t, err)
require.ErrorContains(t, err, "wrong/unknown signature")
})
t.Run("signed for different node", func(t *testing.T) {
otherNodeID := testidentity.MustPregeneratedIdentity(3, storj.LatestIDVersion()).ID
tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{
NodeId: otherNodeID.Bytes(),
Tags: []*pb.Tag{
{
Name: "foo",
Value: []byte("bar"),
},
},
}, signer)
require.NoError(t, err)
_, _, err = verifyTags(ctx, authority, snIdentity.ID, tags)
require.Error(t, err)
require.ErrorContains(t, err, snIdentity.ID.String())
require.ErrorContains(t, err, "the tag is signed for a different node")
})
t.Run("wrong NodeID", func(t *testing.T) {
tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{
NodeId: []byte{4, 4, 4},
Tags: []*pb.Tag{
{
Name: "foo",
Value: []byte("bar"),
},
},
}, signer)
require.NoError(t, err)
_, _, err = verifyTags(ctx, authority, snIdentity.ID, tags)
require.Error(t, err)
require.ErrorContains(t, err, "040404")
require.ErrorContains(t, err, "failed to parse nodeID")
})
}

View File

@ -5,11 +5,15 @@ package contact
import (
"context"
"encoding/base64"
"math/rand"
"strings"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/spacemonkeygo/monkit/v3"
"github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -38,8 +42,56 @@ type Config struct {
// Chore config values
Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"30s"`
Tags SignedTags `help:"protobuf serialized signed node tags in hex (base64) format"`
}
// SignedTags represents base64 encoded signed tags.
type SignedTags pb.SignedNodeTagSets
// Type implements pflag.Value interface.
func (u *SignedTags) Type() string {
return "signedtags"
}
// String implements pflag.Value interface.
func (u *SignedTags) String() string {
if u == nil {
return ""
}
p := pb.SignedNodeTagSets(*u)
raw, err := proto.Marshal(&p)
if err != nil {
return err.Error()
}
return base64.StdEncoding.EncodeToString(raw)
}
// Set implements flag.Value interface.
func (u *SignedTags) Set(s string) error {
p := pb.SignedNodeTagSets{}
for i, part := range strings.Split(s, ",") {
if s == "" {
return nil
}
if u == nil {
return nil
}
raw, err := base64.StdEncoding.DecodeString(part)
if err != nil {
return errs.New("signed tag configuration #%d is not base64 encoded: %s", i+1, s)
}
err = proto.Unmarshal(raw, &p)
if err != nil {
return errs.New("signed tag configuration #%d is not a pb.SignedNodeTagSets{}: %s", i+1, s)
}
u.Tags = append(u.Tags, p.Tags...)
}
return nil
}
var _ pflag.Value = &SignedTags{}
// NodeInfo contains information necessary for introducing storagenode to satellite.
type NodeInfo struct {
ID storj.NodeID
@ -65,10 +117,12 @@ type Service struct {
quicStats *QUICStats
initialized sync2.Fence
tags *pb.SignedNodeTagSets
}
// NewService creates a new contact service.
func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust.Pool, quicStats *QUICStats) *Service {
func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust.Pool, quicStats *QUICStats, tags *pb.SignedNodeTagSets) *Service {
return &Service{
log: log,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
@ -76,6 +130,7 @@ func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust.
trust: trust,
self: self,
quicStats: quicStats,
tags: tags,
}
}
@ -141,6 +196,7 @@ func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID)
NoiseKeyAttestation: self.NoiseKeyAttestation,
DebounceLimit: int32(self.DebounceLimit),
Features: features,
SignedTags: service.tags,
})
service.quicStats.SetStatus(false)
if err != nil {

View File

@ -0,0 +1,45 @@
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"testing"
"github.com/stretchr/testify/require"
"storj.io/common/pb"
"storj.io/common/storj"
)
func TestSignedTags(t *testing.T) {
signer, err := storj.NodeIDFromString("12whfK1EDvHJtajBiAUeajQLYcWqxcQmdYQU5zX5cCf6bAxfgu4")
require.NoError(t, err)
t.Run("single injection", func(t *testing.T) {
s := SignedTags{}
// created with `tag-signer sign --node-id 1AujhDBDBhfYUhEH8cpFMymPu5yLYkQBCfFYbdSN5AVL5Bu6W9 --identity-dir satellite-api/0 soc=true`.
err := s.Set("CqIBCjUKIBaACbi52g7o8pQaX+hkw6TJNDHJ0UVqSLQtibPHoqkAEgsKA3NvYxIEdHJ1ZRjVqoqlBhog/+bJH0ducPXXHjw4eOBZLzO4LwHd+nZvQGIFy66jcwAiRzBFAiEAhrP90d2VxTHnWFDTzOv7Xd5MlvPon4lMgE9QotzLMmYCIAtXUbCrIVZEiphblFuaDDftJY0XTm/n64wZthuB8SJx")
require.NoError(t, err)
pbTags := pb.SignedNodeTagSets(s).Tags
require.Len(t, pbTags, 1)
require.Equal(t, signer.Bytes(), pbTags[0].SignerNodeId)
})
t.Run("coma separated", func(t *testing.T) {
s := SignedTags{}
a := "CqIBCjUKIBaACbi52g7o8pQaX+hkw6TJNDHJ0UVqSLQtibPHoqkAEgsKA3NvYxIEdHJ1ZRjVqoqlBhog/+bJH0ducPXXHjw4eOBZLzO4LwHd+nZvQGIFy66jcwAiRzBFAiEAhrP90d2VxTHnWFDTzOv7Xd5MlvPon4lMgE9QotzLMmYCIAtXUbCrIVZEiphblFuaDDftJY0XTm/n64wZthuB8SJx"
b := "CqEBCjQKIBaACbi52g7o8pQaX+hkw6TJNDHJ0UVqSLQtibPHoqkAEgoKA2ZvbxIDYmFyGMesiqUGGiD/5skfR25w9dcePDh44FkvM7gvAd36dm9AYgXLrqNzACJHMEUCIDFJMkpi3z3qPxvLch7Ie7afpP7Ab8+wsayzCGo0WMaBAiEAgCoWfSUhXNeFx2FPrlAv0ed5jW/DH+7TjDdeiqwA04g="
err := s.Set(a + "," + b)
require.NoError(t, err)
pbTags := pb.SignedNodeTagSets(s).Tags
require.Len(t, pbTags, 2)
require.Equal(t, signer.Bytes(), pbTags[0].SignerNodeId)
})
}

View File

@ -445,7 +445,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
}
peer.Contact.PingStats = new(contact.PingStats)
peer.Contact.QUICStats = contact.NewQUICStats(peer.Server.IsQUICEnabled())
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), peer.Dialer, self, peer.Storage2.Trust, peer.Contact.QUICStats)
tags := pb.SignedNodeTagSets(config.Contact.Tags)
peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), peer.Dialer, self, peer.Storage2.Trust, peer.Contact.QUICStats, &tags)
peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Contact.Service)
peer.Services.Add(lifecycle.Item{