From 5b16d240ea5e48e65ed159c4b5a141b43f055b9c Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Tue, 3 May 2022 14:34:26 +0100 Subject: [PATCH] rpcs --- src/spawner/args.rs | 55 +++++++- src/spawner/mod.rs | 2 + src/spawner/rpc.rs | 300 +++++++++++++++++++++++++++++++++++++++++++ src/specification.rs | 35 +++++ 4 files changed, 390 insertions(+), 2 deletions(-) create mode 100644 src/spawner/rpc.rs diff --git a/src/spawner/args.rs b/src/spawner/args.rs index be8b918..f924d81 100644 --- a/src/spawner/args.rs +++ b/src/spawner/args.rs @@ -1,4 +1,6 @@ -use super::{Spawner, TriggerData}; +use log::info; + +use super::{RpcHandler, Spawner, TriggerData}; use crate::specification::{Arg, FileSocket, Pipe}; use crate::void::VoidBuilder; use crate::{Error, Result}; @@ -7,7 +9,10 @@ use std::ffi::CString; use std::fs::File; use std::net::TcpListener; use std::os::unix::ffi::OsStrExt; -use std::os::unix::io::IntoRawFd; +use std::os::unix::io::{FromRawFd, IntoRawFd}; + +use nix::sys::socket; +use nix::unistd::{fork, ForkResult}; pub struct PreparedArgs(Vec); @@ -89,6 +94,9 @@ enum PreparedArg { /// A TCP Listener TcpListener { socket: TcpListener }, + /// RPC + Rpc { socket: File }, + /// The rest of argv[1..], 0 or more arguments Trailing, } @@ -154,6 +162,45 @@ impl PreparedArg { PreparedArg::TcpListener { socket } } + Arg::Rpc(specs) => { + let (ambient, void) = socket::socketpair( + socket::AddressFamily::Unix, + socket::SockType::Datagram, + None, + socket::SockFlag::empty(), + ) + .map_err(|e| Error::Nix { + msg: "socketpair", + src: e, + })?; + + // SAFETY: valid new fd as socketpair(2) returned successfully + let ambient = unsafe { File::from_raw_fd(ambient) }; + // SAFETY: valid new fd as socketpair(2) returned successfully + let void = unsafe { File::from_raw_fd(void) }; + + // spawn this child with ambient authority + // necessary as no void ever has outgoing network capability + // SAFETY: this program is single-threaded so no safety issue + let child = unsafe { fork() }.map_err(|e| Error::Nix { + msg: "fork", + src: e, + })?; + + match child { + ForkResult::Child => { + let handler = RpcHandler::new(specs); + handler.handle(ambient).unwrap(); + } + ForkResult::Parent { child } => { + info!("spawned rpc handler with pid {}", child); + } + }; + + // SAFETY: safe as socketpair returned successfully + PreparedArg::Rpc { socket: void } + } + Arg::BinaryName => PreparedArg::BinaryName, Arg::Entrypoint => PreparedArg::Entrypoint, Arg::Trigger => PreparedArg::Trigger, @@ -191,6 +238,10 @@ impl PreparedArg { Ok(vec![CString::new(socket.into_raw_fd().to_string()).unwrap()]) } + PreparedArg::Rpc { socket } => { + Ok(vec![CString::new(socket.into_raw_fd().to_string()).unwrap()]) + } + PreparedArg::Trailing => Ok(spawner .binary_args .iter() diff --git a/src/spawner/mod.rs b/src/spawner/mod.rs index 9a7302c..ec732e9 100644 --- a/src/spawner/mod.rs +++ b/src/spawner/mod.rs @@ -1,8 +1,10 @@ use log::{debug, error, info}; mod args; +mod rpc; use args::PreparedArgs; +use rpc::RpcHandler; use crate::specification::{Arg, Entrypoint, Environment, Specification, Trigger}; use crate::void::VoidBuilder; diff --git a/src/spawner/rpc.rs b/src/spawner/rpc.rs new file mode 100644 index 0000000..7d613b5 --- /dev/null +++ b/src/spawner/rpc.rs @@ -0,0 +1,300 @@ +use log::{debug, error}; + +use crate::specification::{AddressFamily as SpecAddressFamily, RpcSpecification}; +use crate::Error; + +use std::ffi::CStr; +use std::fs::File; +use std::net::{TcpStream, UdpSocket}; +use std::os::raw::c_char; +use std::os::unix::io::AsRawFd; + +use nix::sys::socket::AddressFamily; +use nix::sys::socket::{recv, send, sendmsg, ControlMessage, MsgFlags}; + +const MAX_MSG_LENGTH: usize = 4096; + +pub struct RpcHandler<'a> { + permitted_rpcs: &'a [RpcSpecification], +} + +impl<'a> RpcHandler<'a> { + pub(super) fn new(permitted_rpcs: &'a [RpcSpecification]) -> Self { + Self { permitted_rpcs } + } + + pub(super) fn handle(&self, socket: File) -> Result<(), Error> { + let mut buf = vec![0; MAX_MSG_LENGTH]; + + loop { + let read_bytes = + recv(socket.as_raw_fd(), &mut buf, MsgFlags::empty()).map_err(|e| Error::Nix { + msg: "recvmsg", + src: e, + })?; + + debug!("handling rpc"); + + if read_bytes < 4 { + error!("received rpc too short"); + continue; + } + + // SAFETY: safe as the enum repr is non_exhaustive so any value is valid and the buffer is long enough + let kind = unsafe { *(buf.as_ptr() as *const RpcKind) }; + + let fds = Vec::new(); + if kind.num_fds() > 0 { + // get any fds to go alongside the message + // nothing which requires this currently exists + unimplemented!() + } + + let resp = handle_rpc(self.permitted_rpcs, kind, &buf[4..], &fds); + + let (msg, fds) = RpcResultSend::new(resp); + + // sendmsg first so its there when listening for the send + if !fds.is_empty() { + let fds: Box<[i32]> = fds.iter().map(|f| f.as_raw_fd()).collect(); + + sendmsg::<()>( + socket.as_raw_fd(), + &[], + &[ControlMessage::ScmRights(&fds)], + MsgFlags::empty(), + None, + ) + .map_err(|e| Error::Nix { + msg: "sendmsg", + src: e, + })?; + } + + // SAFETY: safe as msg is of fixed size + let msg = unsafe { + std::slice::from_raw_parts( + &msg as *const RpcResultSend as *const u8, + std::mem::size_of_val(&msg), + ) + }; + + send(socket.as_raw_fd(), msg, MsgFlags::empty()).map_err(|e| Error::Nix { + msg: "send", + src: e, + })?; + } + } +} + +#[repr(u32)] +#[non_exhaustive] +#[allow(dead_code)] +#[derive(Clone, Copy)] +pub enum RpcKind { + OpenTcpSocket, + OpenUdpSocket, +} + +impl RpcKind { + fn num_fds(&self) -> usize { + match self { + RpcKind::OpenTcpSocket => 0, + RpcKind::OpenUdpSocket => 0, + } + } +} + +pub struct OpenSocket { + pub family: AddressFamily, + pub port: u16, + pub host: [c_char], +} + +pub enum RpcResult { + OpenTcpSocket { socket: TcpStream }, + OpenUdpSocket { socket: UdpSocket }, + + Error { error: RpcError }, +} + +pub enum RpcResultSend { + OpenTcpSocket, + OpenUdpSocket, + + Error { error: RpcError }, +} + +impl RpcResultSend { + fn new(from: RpcResult) -> (Self, Vec>) { + match from { + RpcResult::OpenTcpSocket { socket } => (Self::OpenTcpSocket, vec![Box::new(socket)]), + RpcResult::OpenUdpSocket { socket } => (Self::OpenUdpSocket, vec![Box::new(socket)]), + RpcResult::Error { error } => (Self::Error { error }, vec![]), + } + } +} + +#[repr(C)] +pub enum RpcError { + BadlyFormedRequest, + OperationNotPermitted, + Io { errno: i32 }, +} + +fn handle_rpc( + permitted_rpcs: &[RpcSpecification], + kind: RpcKind, + data: &[u8], + _fds: &[File], +) -> RpcResult { + fn inner( + permitted_rpcs: &[RpcSpecification], + kind: RpcKind, + data: &[u8], + ) -> Result { + match kind { + RpcKind::OpenTcpSocket => { + let data = unsafe { &*(data as *const [u8] as *const OpenSocket) }; + if !validate_open_tcp_socket(permitted_rpcs, data)? { + Ok(RpcResult::Error { + error: RpcError::OperationNotPermitted, + }) + } else { + handle_open_tcp_socket(data) + } + } + RpcKind::OpenUdpSocket => { + let data = unsafe { &*(data as *const [u8] as *const OpenSocket) }; + if !validate_open_udp_socket(permitted_rpcs, data)? { + Ok(RpcResult::Error { + error: RpcError::OperationNotPermitted, + }) + } else { + handle_open_udp_socket(data) + } + } + } + } + + match inner(permitted_rpcs, kind, data) { + Ok(o) => o, + Err(e) => RpcResult::Error { error: e }, + } +} + +fn validate_open_tcp_socket( + permitted_rpcs: &[RpcSpecification], + req: &OpenSocket, +) -> Result { + for each in permitted_rpcs { + if let RpcSpecification::OpenTcpSocket { family, port, host } = each { + let mut allowed = true; + + allowed &= match family { + None => true, + Some(fam) => match req.family { + AddressFamily::Inet => *fam == SpecAddressFamily::Inet, + AddressFamily::Inet6 => *fam == SpecAddressFamily::Inet6, + _ => false, + }, + }; + + allowed &= match port { + None => true, + Some(p) => req.port == *p, + }; + + allowed &= match host { + None => true, + Some(h) => { + CStr::from_bytes_with_nul(as_u8_slice(&req.host)) + .map_err(|_| RpcError::BadlyFormedRequest)? + .to_string_lossy() + .as_ref() + == h + } + }; + + if allowed { + return Ok(true); + } + } + } + + Ok(false) +} + +fn handle_open_tcp_socket(req: &OpenSocket) -> Result { + let host = CStr::from_bytes_with_nul(as_u8_slice(&req.host)) + .map_err(|_| RpcError::BadlyFormedRequest)?; + let host = host.to_str().map_err(|_| RpcError::BadlyFormedRequest)?; + + let socket = TcpStream::connect(host).map_err(|e| RpcError::Io { + errno: e.raw_os_error().unwrap(), + })?; + + Ok(RpcResult::OpenTcpSocket { socket }) +} + +fn validate_open_udp_socket( + permitted_rpcs: &[RpcSpecification], + req: &OpenSocket, +) -> Result { + for each in permitted_rpcs { + if let RpcSpecification::OpenUdpSocket { family, port, host } = each { + let mut allowed = true; + + allowed &= match family { + None => true, + Some(fam) => match req.family { + AddressFamily::Inet => *fam == SpecAddressFamily::Inet, + AddressFamily::Inet6 => *fam == SpecAddressFamily::Inet6, + _ => false, + }, + }; + + allowed &= match port { + None => true, + Some(p) => req.port == *p, + }; + + allowed &= match host { + None => true, + Some(h) => { + CStr::from_bytes_with_nul(as_u8_slice(&req.host)) + .map_err(|_| RpcError::BadlyFormedRequest)? + .to_string_lossy() + .as_ref() + == h + } + }; + + if allowed { + return Ok(true); + } + } + } + + Ok(false) +} + +fn handle_open_udp_socket(req: &OpenSocket) -> Result { + let host = CStr::from_bytes_with_nul(as_u8_slice(&req.host)) + .map_err(|_| RpcError::BadlyFormedRequest)?; + let host = host.to_str().map_err(|_| RpcError::BadlyFormedRequest)?; + + let socket = UdpSocket::bind("0.0.0.0:0").map_err(|e| RpcError::Io { + errno: e.raw_os_error().unwrap(), + })?; + + socket.connect(host).map_err(|e| RpcError::Io { + errno: e.raw_os_error().unwrap(), + })?; + + Ok(RpcResult::OpenUdpSocket { socket }) +} + +fn as_u8_slice(s: &[c_char]) -> &[u8] { + unsafe { std::slice::from_raw_parts(s.as_ptr() as *const u8, s.len()) } +} diff --git a/src/specification.rs b/src/specification.rs index 4a0ab19..a50343b 100644 --- a/src/specification.rs +++ b/src/specification.rs @@ -68,6 +68,9 @@ pub enum Arg { /// A TCP Listener TcpListener { addr: SocketAddr }, + /// An RPC socket that accepts specified commands + Rpc(Vec), + /// The rest of argv[1..], 0 or more arguments Trailing, } @@ -78,6 +81,38 @@ impl Arg { } } +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub enum RpcSpecification { + /// Open a TCP socket + /// + /// None for each value means that any value is allowed in the call. + /// A specified value restricts to exactly that. + OpenTcpSocket { + family: Option, + port: Option, + host: Option, + }, + + /// Open a UDP socket + /// + /// None for each value means that any value is allowed in the call. + /// A specified value restricts to exactly that. + OpenUdpSocket { + family: Option, + port: Option, + host: Option, + }, +} + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub enum AddressFamily { + /// IPv4 address + Inet, + + /// IPv6 address + Inet6, +} + #[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] pub enum Pipe { Rx(String), -- 2.46.0