storage node cert cache (#1226)
* draft * still errors * double close fix * added tests * weird, goimports must not be working * renames * missed one * forgot to save:
This commit is contained in:
parent
07412698a9
commit
690e8b2061
@ -33,7 +33,6 @@ func TestQueryWithBw(t *testing.T) {
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sendGeneratedAgreements(ctx, t, planet)
|
||||
|
||||
tally := planet.Satellites[0].Accounting.Tally
|
||||
tallyEnd, bwTotals, err := tally.QueryBW(ctx)
|
||||
require.NoError(t, err)
|
||||
|
@ -9,8 +9,10 @@ import (
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/peer"
|
||||
|
||||
"storj.io/storj/internal/sync2"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/transport"
|
||||
)
|
||||
@ -86,6 +88,27 @@ func (dialer *Dialer) Ping(ctx context.Context, target pb.Node) (bool, error) {
|
||||
return err == nil, errs.Combine(err, conn.disconnect())
|
||||
}
|
||||
|
||||
// FetchPeerIdentity connects to a node and returns its peer identity
|
||||
func (dialer *Dialer) FetchPeerIdentity(ctx context.Context, target pb.Node) (pID *identity.PeerIdentity, err error) {
|
||||
if !dialer.limit.Lock() {
|
||||
return nil, context.Canceled
|
||||
}
|
||||
defer dialer.limit.Unlock()
|
||||
|
||||
conn, err := dialer.dial(ctx, target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = errs.Combine(err, conn.disconnect())
|
||||
}()
|
||||
|
||||
p := &peer.Peer{}
|
||||
pCall := grpc.Peer(p)
|
||||
_, err = conn.client.Ping(ctx, &pb.PingRequest{}, pCall)
|
||||
return identity.PeerIdentityFromPeer(p)
|
||||
}
|
||||
|
||||
// dial dials the specified node.
|
||||
func (dialer *Dialer) dial(ctx context.Context, target pb.Node) (*Conn, error) {
|
||||
grpcconn, err := dialer.transport.DialNode(ctx, &target)
|
||||
|
@ -188,6 +188,19 @@ func (k *Kademlia) WaitForBootstrap() {
|
||||
k.bootstrapFinished.Wait()
|
||||
}
|
||||
|
||||
// FetchPeerIdentity connects to a node and returns its peer identity
|
||||
func (k *Kademlia) FetchPeerIdentity(ctx context.Context, nodeID storj.NodeID) (*identity.PeerIdentity, error) {
|
||||
if !k.lookups.Start() {
|
||||
return nil, context.Canceled
|
||||
}
|
||||
defer k.lookups.Done()
|
||||
node, err := k.FindNode(ctx, nodeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return k.dialer.FetchPeerIdentity(ctx, node)
|
||||
}
|
||||
|
||||
// Ping checks that the provided node is still accessible on the network
|
||||
func (k *Kademlia) Ping(ctx context.Context, node pb.Node) (pb.Node, error) {
|
||||
if !k.lookups.Start() {
|
||||
|
26
pkg/kademlia/kademlia_planet_test.go
Normal file
26
pkg/kademlia/kademlia_planet_test.go
Normal file
@ -0,0 +1,26 @@
|
||||
// Copyright (C) 2019 Storj Labs, Inc.
|
||||
// See LICENSE for copying information.
|
||||
|
||||
package kademlia_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"storj.io/storj/internal/testcontext"
|
||||
"storj.io/storj/internal/testplanet"
|
||||
)
|
||||
|
||||
func TestFetchPeerIdentity(t *testing.T) {
|
||||
testplanet.Run(t, testplanet.Config{
|
||||
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 0,
|
||||
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
|
||||
sat := planet.Satellites[0]
|
||||
peerID, err := planet.StorageNodes[0].Kademlia.Service.FetchPeerIdentity(ctx, sat.ID())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, sat.ID(), peerID.ID)
|
||||
require.True(t, sat.Identity.Leaf.Equal(peerID.Leaf))
|
||||
require.True(t, sat.Identity.CA.Equal(peerID.CA))
|
||||
})
|
||||
}
|
@ -69,7 +69,7 @@ func NewStreamReader(s *Server, stream pb.PieceStoreRoutes_StoreServer, bandwidt
|
||||
}
|
||||
// if whitelist does not contain PBA satellite ID, reject storage request
|
||||
if len(s.whitelist) != 0 {
|
||||
if !s.approved(pba.SatelliteId) {
|
||||
if !s.isWhitelisted(pba.SatelliteId) {
|
||||
return nil, StoreError.New("Satellite ID not approved")
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,9 @@
|
||||
package psserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto"
|
||||
"crypto/ecdsa"
|
||||
"crypto/hmac"
|
||||
"crypto/sha512"
|
||||
"errors"
|
||||
@ -19,12 +21,12 @@ import (
|
||||
"github.com/mr-tron/base58/base58"
|
||||
"github.com/zeebo/errs"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"storj.io/storj/pkg/auth"
|
||||
"storj.io/storj/pkg/identity"
|
||||
"storj.io/storj/pkg/kademlia"
|
||||
"storj.io/storj/pkg/pb"
|
||||
"storj.io/storj/pkg/peertls"
|
||||
pstore "storj.io/storj/pkg/piecestore"
|
||||
"storj.io/storj/pkg/piecestore/psserver/psdb"
|
||||
"storj.io/storj/pkg/storj"
|
||||
@ -62,7 +64,7 @@ type Server struct {
|
||||
pkey crypto.PrivateKey
|
||||
totalAllocated int64 // TODO: use memory.Size
|
||||
totalBwAllocated int64 // TODO: use memory.Size
|
||||
whitelist []storj.NodeID
|
||||
whitelist map[storj.NodeID]*ecdsa.PublicKey
|
||||
verifier auth.SignedMessageVerifier
|
||||
kad *kademlia.Kademlia
|
||||
}
|
||||
@ -121,14 +123,15 @@ func NewEndpoint(log *zap.Logger, config Config, storage *pstore.Storage, db *ps
|
||||
}
|
||||
|
||||
// parse the comma separated list of approved satellite IDs into an array of storj.NodeIDs
|
||||
var whitelist []storj.NodeID
|
||||
whitelist := make(map[storj.NodeID]*ecdsa.PublicKey)
|
||||
if config.SatelliteIDRestriction {
|
||||
idStrings := strings.Split(config.WhitelistedSatelliteIDs, ",")
|
||||
for i, s := range idStrings {
|
||||
whitelist[i], err = storj.NodeIDFromString(s)
|
||||
for _, s := range idStrings {
|
||||
satID, err := storj.NodeIDFromString(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
whitelist[satID] = nil // we will set these later
|
||||
}
|
||||
}
|
||||
|
||||
@ -294,7 +297,7 @@ func (s *Server) verifySignature(ctx context.Context, rba *pb.RenterBandwidthAll
|
||||
if err != nil || pba.UplinkId != pi.ID {
|
||||
return auth.ErrBadID.New("Uplink Node ID: %s vs %s", pba.UplinkId, pi.ID)
|
||||
}
|
||||
//todo: use whitelist for uplinks?
|
||||
|
||||
//todo: use whitelist for satellites?
|
||||
switch {
|
||||
case len(pba.SerialNumber) == 0:
|
||||
@ -312,6 +315,10 @@ func (s *Server) verifySignature(ctx context.Context, rba *pb.RenterBandwidthAll
|
||||
if err := auth.VerifyMsg(rba, pba.UplinkId); err != nil {
|
||||
return pb.ErrRenter.Wrap(err)
|
||||
}
|
||||
if !s.isWhitelisted(pba.SatelliteId) {
|
||||
return pb.ErrPayer.Wrap(peertls.ErrVerifyCAWhitelist.New(""))
|
||||
}
|
||||
//todo: once the certs are removed from the PBA, use s.whitelist to check satellite signatures
|
||||
if err := auth.VerifyMsg(&pba, pba.SatelliteId); err != nil {
|
||||
return pb.ErrPayer.Wrap(err)
|
||||
}
|
||||
@ -330,14 +337,25 @@ func (s *Server) verifyPayerAllocation(pba *pb.PayerBandwidthAllocation, actionP
|
||||
return nil
|
||||
}
|
||||
|
||||
// approved returns true if a node ID exists in a list of approved node IDs
|
||||
func (s *Server) approved(id storj.NodeID) bool {
|
||||
for _, n := range s.whitelist {
|
||||
if n == id {
|
||||
return true
|
||||
}
|
||||
//isWhitelisted returns true if a node ID exists in a list of approved node IDs
|
||||
func (s *Server) isWhitelisted(id storj.NodeID) bool {
|
||||
if len(s.whitelist) == 0 {
|
||||
return true // don't whitelist if the whitelist is empty
|
||||
}
|
||||
return false
|
||||
_, found := s.whitelist[id]
|
||||
return found
|
||||
}
|
||||
|
||||
func (s *Server) getPublicKey(ctx context.Context, id storj.NodeID) (*ecdsa.PublicKey, error) {
|
||||
pID, err := s.kad.FetchPeerIdentity(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ecdsa, ok := pID.Leaf.PublicKey.(*ecdsa.PublicKey)
|
||||
if !ok {
|
||||
return nil, auth.ErrECDSA
|
||||
}
|
||||
return ecdsa, nil
|
||||
}
|
||||
|
||||
func getBeginningOfMonth() time.Time {
|
||||
|
@ -4,6 +4,7 @@
|
||||
package psserver
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@ -327,7 +328,7 @@ func TestStore(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run("", func(t *testing.T) {
|
||||
snID, upID := newTestID(ctx, t), newTestID(ctx, t)
|
||||
s, c, cleanup := NewTest(ctx, t, snID, upID, []storj.NodeID{})
|
||||
s, c, cleanup := NewTest(ctx, t, snID, upID, tt.whitelist)
|
||||
defer cleanup()
|
||||
db := s.DB.DB
|
||||
|
||||
@ -354,7 +355,7 @@ func TestStore(t *testing.T) {
|
||||
|
||||
resp, err := stream.CloseAndRecv()
|
||||
if tt.err != "" {
|
||||
require.NotNil(t, err)
|
||||
require.Error(t, err)
|
||||
require.True(t, strings.HasPrefix(err.Error(), tt.err))
|
||||
return
|
||||
}
|
||||
@ -563,6 +564,10 @@ func NewTest(ctx context.Context, t *testing.T, snID, upID *identity.FullIdentit
|
||||
verifier := func(authorization *pb.SignedMessage) error {
|
||||
return nil
|
||||
}
|
||||
whitelist := make(map[storj.NodeID]*ecdsa.PublicKey)
|
||||
for _, id := range ids {
|
||||
whitelist[id] = nil
|
||||
}
|
||||
psServer := &Server{
|
||||
log: zaptest.NewLogger(t),
|
||||
storage: storage,
|
||||
@ -570,7 +575,7 @@ func NewTest(ctx context.Context, t *testing.T, snID, upID *identity.FullIdentit
|
||||
verifier: verifier,
|
||||
totalAllocated: math.MaxInt64,
|
||||
totalBwAllocated: math.MaxInt64,
|
||||
whitelist: ids,
|
||||
whitelist: whitelist,
|
||||
}
|
||||
//init ps server grpc
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
|
Loading…
Reference in New Issue
Block a user