diff --git a/satellite/api.go b/satellite/api.go index f84022c5e..a6415cfd3 100644 --- a/satellite/api.go +++ b/satellite/api.go @@ -16,6 +16,7 @@ import ( "golang.org/x/sync/errgroup" "storj.io/common/identity" + "storj.io/common/nodetag" "storj.io/common/pb" "storj.io/common/peertls/extensions" "storj.io/common/peertls/tlsopts" @@ -325,7 +326,12 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, Type: pb.NodeType_SATELLITE, Version: *pbVersion, } - peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact) + + var authority nodetag.Authority + peerIdentity := full.PeerIdentity() + authority = append(authority, signing.SigneeFromPeerIdentity(peerIdentity)) + + peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, authority, config.Contact) peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil { return nil, errs.Combine(err, peer.Close()) diff --git a/satellite/contact/contact_test.go b/satellite/contact/contact_test.go index 4113ec7a5..64e531306 100644 --- a/satellite/contact/contact_test.go +++ b/satellite/contact/contact_test.go @@ -7,17 +7,22 @@ import ( "crypto/tls" "crypto/x509" "net" + "sort" "testing" "github.com/stretchr/testify/require" + "storj.io/common/identity/testidentity" + "storj.io/common/nodetag" "storj.io/common/pb" "storj.io/common/rpc/rpcpeer" "storj.io/common/rpc/rpcstatus" + "storj.io/common/signing" "storj.io/common/storj" "storj.io/common/testcontext" "storj.io/storj/private/testplanet" "storj.io/storj/storagenode" + "storj.io/storj/storagenode/contact" ) func TestSatelliteContactEndpoint(t *testing.T) { @@ -177,3 +182,143 @@ func TestSatellitePingMe_Failure(t *testing.T) { require.Nil(t, resp) }) } + +func TestSatelliteContactEndpoint_WithNodeTags(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, + Reconfigure: testplanet.Reconfigure{ + StorageNode: func(index int, config *storagenode.Config) { + config.Server.DisableQUIC = true + config.Contact.Tags = contact.SignedTags(pb.SignedNodeTagSets{ + Tags: []*pb.SignedNodeTagSet{}, + }) + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + nodeInfo := planet.StorageNodes[0].Contact.Service.Local() + ident := planet.StorageNodes[0].Identity + + peer := rpcpeer.Peer{ + Addr: &net.TCPAddr{ + IP: net.ParseIP(nodeInfo.Address), + Port: 5, + }, + State: tls.ConnectionState{ + PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA}, + }, + } + + unsignedTags := &pb.NodeTagSet{ + NodeId: ident.ID.Bytes(), + Tags: []*pb.Tag{ + { + Name: "soc", + Value: []byte{1}, + }, + { + Name: "foo", + Value: []byte("bar"), + }, + }, + } + + signedTags, err := nodetag.Sign(ctx, unsignedTags, signing.SignerFromFullIdentity(planet.Satellites[0].Identity)) + require.NoError(t, err) + + peerCtx := rpcpeer.NewContext(ctx, &peer) + resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{ + Address: nodeInfo.Address, + Version: &nodeInfo.Version, + Capacity: &nodeInfo.Capacity, + Operator: &nodeInfo.Operator, + DebounceLimit: 3, + Features: 0xf, + SignedTags: &pb.SignedNodeTagSets{ + Tags: []*pb.SignedNodeTagSet{ + signedTags, + }, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + tags, err := planet.Satellites[0].DB.OverlayCache().GetNodeTags(ctx, ident.ID) + require.NoError(t, err) + require.Len(t, tags, 2) + sort.Slice(tags, func(i, j int) bool { + return tags[i].Name < tags[j].Name + }) + require.Equal(t, "foo", tags[0].Name) + require.Equal(t, "bar", string(tags[0].Value)) + + require.Equal(t, "soc", tags[1].Name) + require.Equal(t, []byte{1}, tags[1].Value) + + }) +} + +func TestSatelliteContactEndpoint_WithWrongNodeTags(t *testing.T) { + testplanet.Run(t, testplanet.Config{ + SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0, + Reconfigure: testplanet.Reconfigure{ + StorageNode: func(index int, config *storagenode.Config) { + config.Server.DisableQUIC = true + config.Contact.Tags = contact.SignedTags(pb.SignedNodeTagSets{ + Tags: []*pb.SignedNodeTagSet{}, + }) + }, + }, + }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { + nodeInfo := planet.StorageNodes[0].Contact.Service.Local() + ident := planet.StorageNodes[0].Identity + + peer := rpcpeer.Peer{ + Addr: &net.TCPAddr{ + IP: net.ParseIP(nodeInfo.Address), + Port: 5, + }, + State: tls.ConnectionState{ + PeerCertificates: []*x509.Certificate{ident.Leaf, ident.CA}, + }, + } + + wrongNodeID := testidentity.MustPregeneratedIdentity(99, storj.LatestIDVersion()).ID + unsignedTags := &pb.NodeTagSet{ + NodeId: wrongNodeID.Bytes(), + Tags: []*pb.Tag{ + { + Name: "soc", + Value: []byte{1}, + }, + { + Name: "foo", + Value: []byte("bar"), + }, + }, + } + + signedTags, err := nodetag.Sign(ctx, unsignedTags, signing.SignerFromFullIdentity(planet.Satellites[0].Identity)) + require.NoError(t, err) + + peerCtx := rpcpeer.NewContext(ctx, &peer) + resp, err := planet.Satellites[0].Contact.Endpoint.CheckIn(peerCtx, &pb.CheckInRequest{ + Address: nodeInfo.Address, + Version: &nodeInfo.Version, + Capacity: &nodeInfo.Capacity, + Operator: &nodeInfo.Operator, + DebounceLimit: 3, + Features: 0xf, + SignedTags: &pb.SignedNodeTagSets{ + Tags: []*pb.SignedNodeTagSet{ + signedTags, + }, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + tags, err := planet.Satellites[0].DB.OverlayCache().GetNodeTags(ctx, ident.ID) + require.NoError(t, err) + require.Len(t, tags, 0) + }) +} diff --git a/satellite/contact/endpoint.go b/satellite/contact/endpoint.go index 33c83b140..2a87fe411 100644 --- a/satellite/contact/endpoint.go +++ b/satellite/contact/endpoint.go @@ -114,6 +114,10 @@ func (endpoint *Endpoint) CheckIn(ctx context.Context, req *pb.CheckInRequest) ( req.Operator.WalletFeatures = nil } } + err = endpoint.service.processNodeTags(ctx, nodeID, req.SignedTags) + if err != nil { + endpoint.log.Info("failed to update node tags", zap.String("node address", req.Address), zap.Stringer("Node ID", nodeID), zap.Error(err)) + } nodeInfo := overlay.NodeCheckInInfo{ NodeID: peerID.ID, diff --git a/satellite/contact/service.go b/satellite/contact/service.go index 50ba2833e..10ee33a04 100644 --- a/satellite/contact/service.go +++ b/satellite/contact/service.go @@ -12,11 +12,13 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" + "storj.io/common/nodetag" "storj.io/common/pb" "storj.io/common/rpc" "storj.io/common/rpc/quic" "storj.io/common/rpc/rpcstatus" "storj.io/common/storj" + "storj.io/storj/satellite/nodeselection/uploadselection" "storj.io/storj/satellite/overlay" ) @@ -49,19 +51,22 @@ type Service struct { timeout time.Duration idLimiter *RateLimiter allowPrivateIP bool + + nodeTagAuthority nodetag.Authority } // NewService creates a new contact service. -func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, config Config) *Service { +func NewService(log *zap.Logger, self *overlay.NodeDossier, overlay *overlay.Service, peerIDs overlay.PeerIdentities, dialer rpc.Dialer, authority nodetag.Authority, config Config) *Service { return &Service{ - log: log, - self: self, - overlay: overlay, - peerIDs: peerIDs, - dialer: dialer, - timeout: config.Timeout, - idLimiter: NewRateLimiter(config.RateLimitInterval, config.RateLimitBurst, config.RateLimitCacheSize), - allowPrivateIP: config.AllowPrivateIP, + log: log, + self: self, + overlay: overlay, + peerIDs: peerIDs, + dialer: dialer, + timeout: config.Timeout, + idLimiter: NewRateLimiter(config.RateLimitInterval, config.RateLimitBurst, config.RateLimitCacheSize), + allowPrivateIP: config.AllowPrivateIP, + nodeTagAuthority: authority, } } @@ -151,3 +156,56 @@ func (service *Service) pingNodeQUIC(ctx context.Context, nodeurl storj.NodeURL) return nil } + +func (service *Service) processNodeTags(ctx context.Context, nodeID storj.NodeID, req *pb.SignedNodeTagSets) error { + if req != nil { + tags := uploadselection.NodeTags{} + for _, t := range req.Tags { + verifiedTags, signerID, err := verifyTags(ctx, service.nodeTagAuthority, nodeID, t) + if err != nil { + service.log.Info("Failed to verify tags.", zap.Error(err), zap.Stringer("NodeID", nodeID)) + continue + } + + ts := time.Unix(verifiedTags.Timestamp, 0) + for _, vt := range verifiedTags.Tags { + tags = append(tags, uploadselection.NodeTag{ + NodeID: nodeID, + Name: vt.Name, + Value: vt.Value, + SignedAt: ts, + Signer: signerID, + }) + } + } + if len(tags) > 0 { + err := service.overlay.UpdateNodeTags(ctx, tags) + if err != nil { + return Error.Wrap(err) + } + } + } + return nil +} + +func verifyTags(ctx context.Context, authority nodetag.Authority, nodeID storj.NodeID, t *pb.SignedNodeTagSet) (*pb.NodeTagSet, storj.NodeID, error) { + signerID, err := storj.NodeIDFromBytes(t.SignerNodeId) + if err != nil { + return nil, signerID, errs.New("failed to parse signerNodeID from verifiedTags: '%x', %s", t.SignerNodeId, err.Error()) + } + + verifiedTags, err := authority.Verify(ctx, t) + if err != nil { + return nil, signerID, errs.New("received node tags with wrong/unknown signature: '%x', %s", t.Signature, err.Error()) + } + + signedNodeID, err := storj.NodeIDFromBytes(verifiedTags.NodeId) + if err != nil { + return nil, signerID, errs.New("failed to parse nodeID from verifiedTags: '%x', %s", verifiedTags.NodeId, err.Error()) + } + + if signedNodeID != nodeID { + return nil, signerID, errs.New("the tag is signed for a different node. Expected NodeID: '%s', Received NodeID: '%s'", nodeID, signedNodeID) + } + return verifiedTags, signerID, nil +} diff --git a/satellite/contact/service_test.go b/satellite/contact/service_test.go new file mode 100644 index 000000000..d9a176080 --- /dev/null +++ b/satellite/contact/service_test.go @@ -0,0 +1,149 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package contact + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/identity/testidentity" + "storj.io/common/nodetag" + "storj.io/common/pb" + "storj.io/common/signing" + "storj.io/common/storj" + "storj.io/common/testcontext" +) + +func TestVerifyTags(t *testing.T) { + ctx := testcontext.New(t) + snIdentity := testidentity.MustPregeneratedIdentity(0, storj.LatestIDVersion()) + signerIdentity := testidentity.MustPregeneratedIdentity(1, storj.LatestIDVersion()) + signer := signing.SignerFromFullIdentity(signerIdentity) + authority := nodetag.Authority{ + signing.SignerFromFullIdentity(signerIdentity), + } + t.Run("ok tags", func(t *testing.T) { + tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{ + NodeId: snIdentity.ID.Bytes(), + Tags: []*pb.Tag{ + { + Name: "foo", + Value: []byte("bar"), + }, + }, + }, signer) + require.NoError(t, err) + + verifiedTags, signerID, err := verifyTags(ctx, authority, snIdentity.ID, tags) + require.NoError(t, err) + + require.Equal(t, signerIdentity.ID, signerID) + require.Len(t, verifiedTags.Tags, 1) + require.Equal(t, "foo", verifiedTags.Tags[0].Name) + require.Equal(t, []byte("bar"), verifiedTags.Tags[0].Value) + }) + + t.Run("wrong signer ID", func(t *testing.T) { + tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{ + NodeId: snIdentity.ID.Bytes(), + Tags: []*pb.Tag{ + { + Name: "foo", + Value: []byte("bar"), + }, + }, + }, signer) + require.NoError(t, err) + tags.SignerNodeId = []byte{1, 2, 3, 4} + + _, _, err = verifyTags(ctx, authority, snIdentity.ID, tags) + + require.Error(t, err) + require.ErrorContains(t, err, "01020304") + require.ErrorContains(t, err, "failed to parse signerNodeID") + + }) + + t.Run("wrong signature", func(t *testing.T) { + tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{ + NodeId: snIdentity.ID.Bytes(), + Tags: []*pb.Tag{ + { + Name: "foo", + Value: []byte("bar"), + }, + }, + }, signer) + require.NoError(t, err) + tags.Signature = []byte{4, 3, 2, 1} + + _, _, err = verifyTags(ctx, authority, snIdentity.ID, tags) + + require.Error(t, err) + require.ErrorContains(t, err, "04030201") + require.ErrorContains(t, err, "wrong/unknown signature") + }) + + t.Run("unknown signer", func(t *testing.T) { + otherSignerIdentity := testidentity.MustPregeneratedIdentity(2, storj.LatestIDVersion()) + otherSigner := signing.SignerFromFullIdentity(otherSignerIdentity) + + tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{ + NodeId: snIdentity.ID.Bytes(), + Tags: []*pb.Tag{ + { + Name: "foo", + Value: []byte("bar"), + }, + }, + }, otherSigner) + require.NoError(t, err) + + _, _, err = verifyTags(ctx, authority, snIdentity.ID, tags) + + require.Error(t, err) + require.ErrorContains(t, err, "wrong/unknown signature") + }) + + t.Run("signed for different node", func(t *testing.T) { + otherNodeID := testidentity.MustPregeneratedIdentity(3, storj.LatestIDVersion()).ID + tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{ + NodeId: otherNodeID.Bytes(), + Tags: []*pb.Tag{ + { + Name: "foo", + Value: []byte("bar"), + }, + }, + }, signer) + require.NoError(t, err) + + _, _, err = verifyTags(ctx, authority, snIdentity.ID, tags) + + require.Error(t, err) + require.ErrorContains(t, err, snIdentity.ID.String()) + require.ErrorContains(t, err, "the tag is signed for a different node") + }) + + t.Run("wrong NodeID", func(t *testing.T) { + tags, err := nodetag.Sign(ctx, &pb.NodeTagSet{ + NodeId: []byte{4, 4, 4}, + Tags: []*pb.Tag{ + { + Name: "foo", + Value: []byte("bar"), + }, + }, + }, signer) + require.NoError(t, err) + + _, _, err = verifyTags(ctx, authority, snIdentity.ID, tags) + + require.Error(t, err) + require.ErrorContains(t, err, "040404") + require.ErrorContains(t, err, "failed to parse nodeID") + }) + +} diff --git a/storagenode/contact/service.go b/storagenode/contact/service.go index 44b3498f8..000a144d0 100644 --- a/storagenode/contact/service.go +++ b/storagenode/contact/service.go @@ -5,11 +5,15 @@ package contact import ( "context" + "encoding/base64" "math/rand" + "strings" "sync" "time" + "github.com/gogo/protobuf/proto" "github.com/spacemonkeygo/monkit/v3" + "github.com/spf13/pflag" "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -38,8 +42,56 @@ type Config struct { // Chore config values Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"30s"` + + Tags SignedTags `help:"protobuf serialized signed node tags in hex (base64) format"` } +// SignedTags represents base64 encoded signed tags. +type SignedTags pb.SignedNodeTagSets + +// Type implements pflag.Value interface. +func (u *SignedTags) Type() string { + return "signedtags" +} + +// String implements pflag.Value interface. +func (u *SignedTags) String() string { + if u == nil { + return "" + } + p := pb.SignedNodeTagSets(*u) + raw, err := proto.Marshal(&p) + if err != nil { + return err.Error() + } + return base64.StdEncoding.EncodeToString(raw) +} + +// Set implements flag.Value interface. +func (u *SignedTags) Set(s string) error { + p := pb.SignedNodeTagSets{} + for i, part := range strings.Split(s, ",") { + if s == "" { + return nil + } + if u == nil { + return nil + } + raw, err := base64.StdEncoding.DecodeString(part) + if err != nil { + return errs.New("signed tag configuration #%d is not base64 encoded: %s", i+1, s) + } + err = proto.Unmarshal(raw, &p) + if err != nil { + return errs.New("signed tag configuration #%d is not a pb.SignedNodeTagSets{}: %s", i+1, s) + } + u.Tags = append(u.Tags, p.Tags...) + } + return nil +} + +var _ pflag.Value = &SignedTags{} + // NodeInfo contains information necessary for introducing storagenode to satellite. type NodeInfo struct { ID storj.NodeID @@ -65,10 +117,12 @@ type Service struct { quicStats *QUICStats initialized sync2.Fence + + tags *pb.SignedNodeTagSets } // NewService creates a new contact service. -func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust.Pool, quicStats *QUICStats) *Service { +func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust.Pool, quicStats *QUICStats, tags *pb.SignedNodeTagSets) *Service { return &Service{ log: log, rand: rand.New(rand.NewSource(time.Now().UnixNano())), @@ -76,6 +130,7 @@ func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust. trust: trust, self: self, quicStats: quicStats, + tags: tags, } } @@ -141,6 +196,7 @@ func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID) NoiseKeyAttestation: self.NoiseKeyAttestation, DebounceLimit: int32(self.DebounceLimit), Features: features, + SignedTags: service.tags, }) service.quicStats.SetStatus(false) if err != nil { diff --git a/storagenode/contact/service_test.go b/storagenode/contact/service_test.go new file mode 100644 index 000000000..104334024 --- /dev/null +++ b/storagenode/contact/service_test.go @@ -0,0 +1,45 @@ +// Copyright (C) 2023 Storj Labs, Inc. +// See LICENSE for copying information. + +package contact + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "storj.io/common/pb" + "storj.io/common/storj" +) + +func TestSignedTags(t *testing.T) { + signer, err := storj.NodeIDFromString("12whfK1EDvHJtajBiAUeajQLYcWqxcQmdYQU5zX5cCf6bAxfgu4") + require.NoError(t, err) + + t.Run("single injection", func(t *testing.T) { + s := SignedTags{} + + // created with `tag-signer sign --node-id 1AujhDBDBhfYUhEH8cpFMymPu5yLYkQBCfFYbdSN5AVL5Bu6W9 --identity-dir satellite-api/0 soc=true`. + err := s.Set("CqIBCjUKIBaACbi52g7o8pQaX+hkw6TJNDHJ0UVqSLQtibPHoqkAEgsKA3NvYxIEdHJ1ZRjVqoqlBhog/+bJH0ducPXXHjw4eOBZLzO4LwHd+nZvQGIFy66jcwAiRzBFAiEAhrP90d2VxTHnWFDTzOv7Xd5MlvPon4lMgE9QotzLMmYCIAtXUbCrIVZEiphblFuaDDftJY0XTm/n64wZthuB8SJx") + require.NoError(t, err) + + pbTags := pb.SignedNodeTagSets(s).Tags + require.Len(t, pbTags, 1) + require.Equal(t, signer.Bytes(), pbTags[0].SignerNodeId) + + }) + + t.Run("coma separated", func(t *testing.T) { + s := SignedTags{} + + a := "CqIBCjUKIBaACbi52g7o8pQaX+hkw6TJNDHJ0UVqSLQtibPHoqkAEgsKA3NvYxIEdHJ1ZRjVqoqlBhog/+bJH0ducPXXHjw4eOBZLzO4LwHd+nZvQGIFy66jcwAiRzBFAiEAhrP90d2VxTHnWFDTzOv7Xd5MlvPon4lMgE9QotzLMmYCIAtXUbCrIVZEiphblFuaDDftJY0XTm/n64wZthuB8SJx" + b := "CqEBCjQKIBaACbi52g7o8pQaX+hkw6TJNDHJ0UVqSLQtibPHoqkAEgoKA2ZvbxIDYmFyGMesiqUGGiD/5skfR25w9dcePDh44FkvM7gvAd36dm9AYgXLrqNzACJHMEUCIDFJMkpi3z3qPxvLch7Ie7afpP7Ab8+wsayzCGo0WMaBAiEAgCoWfSUhXNeFx2FPrlAv0ed5jW/DH+7TjDdeiqwA04g=" + err := s.Set(a + "," + b) + require.NoError(t, err) + + pbTags := pb.SignedNodeTagSets(s).Tags + require.Len(t, pbTags, 2) + require.Equal(t, signer.Bytes(), pbTags[0].SignerNodeId) + + }) +} diff --git a/storagenode/peer.go b/storagenode/peer.go index 6e1147d34..a84992f7e 100644 --- a/storagenode/peer.go +++ b/storagenode/peer.go @@ -445,7 +445,9 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten } peer.Contact.PingStats = new(contact.PingStats) peer.Contact.QUICStats = contact.NewQUICStats(peer.Server.IsQUICEnabled()) - peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), peer.Dialer, self, peer.Storage2.Trust, peer.Contact.QUICStats) + + tags := pb.SignedNodeTagSets(config.Contact.Tags) + peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), peer.Dialer, self, peer.Storage2.Trust, peer.Contact.QUICStats, &tags) peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Contact.Service) peer.Services.Add(lifecycle.Item{