satellite/contact: Populate PeerIdentities table in satellitedb (#2998)
* Add peer identities db dependency to contact service * Update peer identities db on contact checkin
This commit is contained in:
parent
cc8a47324a
commit
82a651ac3a
@ -44,5 +44,9 @@ func TestSatelliteContactEndpoint(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
|
||||
peerID, err := planet.Satellites[0].DB.PeerIdentities().Get(ctx, nodeDossier.Id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ident.PeerIdentity(), peerID)
|
||||
})
|
||||
}
|
||||
|
@ -43,13 +43,20 @@ func (endpoint *Endpoint) Checkin(ctx context.Context, req *pb.CheckinRequest) (
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
pingNodeSuccess, pingErrorMessage, err := endpoint.pingBack(ctx, req, peerID)
|
||||
nodeID := peerID.ID
|
||||
|
||||
err = endpoint.service.peerIDs.Set(ctx, nodeID, peerID)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = endpoint.service.overlay.Put(ctx, peerID, pb.Node{
|
||||
Id: peerID,
|
||||
pingNodeSuccess, pingErrorMessage, err := endpoint.pingBack(ctx, req, nodeID)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
err = endpoint.service.overlay.Put(ctx, nodeID, pb.Node{
|
||||
Id: nodeID,
|
||||
Address: &pb.NodeAddress{
|
||||
Transport: pb.NodeTransport_TCP_TLS_GRPC,
|
||||
Address: req.GetAddress().GetAddress(),
|
||||
@ -62,13 +69,13 @@ func (endpoint *Endpoint) Checkin(ctx context.Context, req *pb.CheckinRequest) (
|
||||
// TODO(jg): We are making 2 requests to the database, one to update uptime and
|
||||
// the other to update the capacity and operator info. We should combine these into
|
||||
// one to reduce db connections. Consider adding batching and using a stored procedure.
|
||||
_, err = endpoint.service.overlay.UpdateUptime(ctx, peerID, pingNodeSuccess)
|
||||
_, err = endpoint.service.overlay.UpdateUptime(ctx, nodeID, pingNodeSuccess)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
|
||||
nodeInfo := pb.InfoResponse{Operator: req.GetOperator(), Capacity: req.GetCapacity()}
|
||||
_, err = endpoint.service.overlay.UpdateNodeInfo(ctx, peerID, &nodeInfo)
|
||||
_, err = endpoint.service.overlay.UpdateNodeInfo(ctx, nodeID, &nodeInfo)
|
||||
if err != nil {
|
||||
return nil, Error.Wrap(err)
|
||||
}
|
||||
@ -113,14 +120,14 @@ func (endpoint *Endpoint) pingBack(ctx context.Context, req *pb.CheckinRequest,
|
||||
return pingNodeSuccess, pingErrorMessage, nil
|
||||
}
|
||||
|
||||
func peerIDFromContext(ctx context.Context) (storj.NodeID, error) {
|
||||
func peerIDFromContext(ctx context.Context) (*identity.PeerIdentity, error) {
|
||||
p, ok := peer.FromContext(ctx)
|
||||
if !ok {
|
||||
return storj.NodeID{}, Error.New("unable to get grpc peer from contex")
|
||||
return nil, Error.New("unable to get grpc peer from context")
|
||||
}
|
||||
peerIdentity, err := identity.PeerIdentityFromPeer(p)
|
||||
if err != nil {
|
||||
return storj.NodeID{}, err
|
||||
return nil, err
|
||||
}
|
||||
return peerIdentity.ID, nil
|
||||
return peerIdentity, nil
|
||||
}
|
||||
|
@ -12,25 +12,29 @@ import (
|
||||
"storj.io/storj/satellite/overlay"
|
||||
)
|
||||
|
||||
// Error is the default error class for contact package
|
||||
// Error is the default error class for contact package.
|
||||
var Error = errs.Class("contact")
|
||||
|
||||
var mon = monkit.Package()
|
||||
|
||||
// Service is the contact service between storage nodes and satellites
|
||||
// Service is the contact service between storage nodes and satellites.
|
||||
// It is responsible for updating general node information like address, capacity, and uptime.
|
||||
// It is also responsible for updating peer identity information for verifying signatures from that node.
|
||||
//
|
||||
// architecture: Service
|
||||
type Service struct {
|
||||
log *zap.Logger
|
||||
overlay *overlay.Service
|
||||
peerIDs overlay.PeerIdentities
|
||||
transport transport.Client
|
||||
}
|
||||
|
||||
// NewService creates a new contact service
|
||||
func NewService(log *zap.Logger, overlay *overlay.Service, transport transport.Client) *Service {
|
||||
// NewService creates a new contact service.
|
||||
func NewService(log *zap.Logger, overlay *overlay.Service, peerIDs overlay.PeerIdentities, transport transport.Client) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
overlay: overlay,
|
||||
peerIDs: peerIDs,
|
||||
transport: transport,
|
||||
}
|
||||
}
|
||||
|
@ -367,7 +367,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
|
||||
|
||||
{ // setup contact service
|
||||
log.Debug("Setting up contact service")
|
||||
peer.Contact.Service = contact.NewService(peer.Log.Named("contact"), peer.Overlay.Service, peer.Transport)
|
||||
peer.Contact.Service = contact.NewService(peer.Log.Named("contact"), peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Transport)
|
||||
peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
|
||||
pb.RegisterNodeServer(peer.Server.GRPC(), peer.Contact.Endpoint)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user