From 479fae3858730d60c95a8169ba97bc62f3601386 Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Thu, 7 Apr 2022 15:24:32 +0100 Subject: [PATCH] implementing file socket args and triggers --- src/error.rs | 3 ++ src/lib.rs | 61 ++++++++++++++++++++- src/spawner.rs | 123 ++++++++++++++++++++++++++++++++++++++++--- src/specification.rs | 62 +++++++++++++++++++--- 4 files changed, 234 insertions(+), 15 deletions(-) diff --git a/src/error.rs b/src/error.rs index d0e05c9..700ecd5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,6 +18,9 @@ pub enum Error { #[error("bad pipe specification: a pipe must have exactly one reader and one writer: {0}")] BadPipe(String), + #[error("bad socket specification: a socket must have exactly one reader and one writer: {0}")] + BadFileSocket(String), + #[error("bad specification type: only .json files are supported")] BadSpecType, diff --git a/src/lib.rs b/src/lib.rs index fc622f2..57bd2b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,8 @@ use std::os::unix::io::FromRawFd; use clap::{App, AppSettings}; use nix::fcntl::OFlag; -use nix::unistd::{self}; +use nix::sys::socket; +use nix::unistd; pub fn run() -> Result<()> { // process arguments @@ -69,12 +70,17 @@ pub fn run() -> Result<()> { let (pipes, _) = spec.pipes(); let pipes = create_pipes(pipes)?; + let (sockets, _) = spec.sockets(); + let sockets = create_sockets(sockets)?; + // spawn all processes Spawner { spec: &spec, - pipes, binary, trailing: &trailing, + + pipes, + sockets, } .spawn() } @@ -89,6 +95,16 @@ fn create_pipes(names: Vec<&str>) -> Result> { Ok(pipes) } +fn create_sockets(names: Vec<&str>) -> Result> { + let mut sockets = HashMap::new(); + for socket in names { + info!("creating socket pair `{}`", socket); + sockets.insert(socket.to_string(), SocketPair::new(socket)?); + } + + Ok(sockets) +} + pub struct PipePair { name: String, @@ -123,3 +139,44 @@ impl PipePair { .ok_or_else(|| Error::BadPipe(self.name.to_string())) } } + +pub struct SocketPair { + name: String, + + read: Option, + write: Option, +} + +impl SocketPair { + fn new(name: &str) -> Result { + let (read, write) = socket::socketpair( + socket::AddressFamily::Unix, + socket::SockType::Datagram, + None, + socket::SockFlag::empty(), + ) + .map_err(|e| Error::Nix { + msg: "socketpair", + src: e, + })?; + + // safe to create files given the successful return of socketpair(2) + Ok(SocketPair { + name: name.to_string(), + read: Some(unsafe { File::from_raw_fd(read) }), + write: Some(unsafe { File::from_raw_fd(write) }), + }) + } + + fn take_read(&mut self) -> Result { + self.read + .take() + .ok_or_else(|| Error::BadPipe(self.name.to_string())) + } + + fn take_write(&mut self) -> Result { + self.write + .take() + .ok_or_else(|| Error::BadPipe(self.name.to_string())) + } +} diff --git a/src/spawner.rs b/src/spawner.rs index c52ab57..ec618f0 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -1,7 +1,7 @@ use log::{debug, error, info}; -use super::specification::{Arg, Entrypoint, Pipe, Specification, Trigger}; -use super::PipePair; +use super::specification::{Arg, Entrypoint, FileSocket, Pipe, Specification, Trigger}; +use super::{PipePair, SocketPair}; use crate::void::VoidBuilder; use crate::{Error, Result}; @@ -10,18 +10,21 @@ use std::ffi::CString; use std::fs::File; use std::io::Read; use std::net::TcpListener; -use std::os::unix::io::IntoRawFd; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; use std::path::PathBuf; +use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}; use nix::unistd; const BUFFER_SIZE: usize = 1024; pub struct Spawner<'a> { pub spec: &'a Specification, - pub pipes: HashMap, pub binary: &'a str, pub trailing: &'a Vec<&'a str>, + + pub pipes: HashMap, + pub sockets: HashMap, } enum TriggerData<'a> { @@ -30,6 +33,9 @@ enum TriggerData<'a> { /// A string sent across a pipe Pipe(&'a str), + + /// File(s) sent over a file socket + FileSocket(Vec), } impl<'a> TriggerData<'a> { @@ -37,6 +43,10 @@ impl<'a> TriggerData<'a> { match self { TriggerData::None => vec![], TriggerData::Pipe(s) => vec![CString::new(s.to_string()).unwrap()], + TriggerData::FileSocket(fs) => fs + .drain(..) + .map(|f| CString::new(f.into_raw_fd().to_string()).unwrap()) + .collect(), } } } @@ -75,7 +85,7 @@ impl<'a> Spawner<'a> { } Trigger::Pipe(s) => { - let pipe = self.pipes.get_mut(s).unwrap().take_read().unwrap(); + let pipe = self.pipes.get_mut(s).unwrap().take_read()?; let binary = PathBuf::from(self.binary).canonicalize()?; let mut builder = VoidBuilder::new(); @@ -92,6 +102,25 @@ impl<'a> Spawner<'a> { builder.spawn(closure)?; } + + Trigger::FileSocket(s) => { + let socket = self.sockets.get_mut(s).unwrap().take_read()?; + let binary = PathBuf::from(self.binary).canonicalize()?; + + let mut builder = VoidBuilder::new(); + builder.mount(binary, "/entrypoint"); + builder.keep_fd(&socket); + + let closure = || match self.file_socket_trigger(socket, entrypoint, name) { + Ok(()) => std::process::exit(exitcode::OK), + Err(e) => { + error!("error in file_socket_trigger: {}", e); + std::process::exit(1) + } + }; + + builder.spawn(closure)?; + } } } @@ -100,7 +129,6 @@ impl<'a> Spawner<'a> { fn pipe_trigger(&self, mut pipe: File, spec: &Entrypoint, name: &str) -> Result<()> { let mut buf = [0_u8; BUFFER_SIZE]; - loop { let read_bytes = pipe.read(&mut buf)?; if read_bytes == 0 { @@ -109,6 +137,9 @@ impl<'a> Spawner<'a> { debug!("triggering from pipe read"); + let mut builder = VoidBuilder::new(); + builder.mount("/entrypoint", "/entrypoint"); + let closure = || { let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap(); @@ -129,11 +160,68 @@ impl<'a> Spawner<'a> { } }; - let mut builder = VoidBuilder::new(); builder.spawn(closure)?; } } + fn file_socket_trigger(&self, socket: File, spec: &Entrypoint, name: &str) -> Result<()> { + let mut buf = Vec::new(); + loop { + let msg = recvmsg(socket.as_raw_fd(), &[], Some(&mut buf), MsgFlags::empty()).map_err( + |e| Error::Nix { + msg: "recvmsg", + src: e, + }, + )?; + + debug!("triggering from socket recvmsg"); + + for cmsg in msg.cmsgs() { + match cmsg { + ControlMessageOwned::ScmRights(fds) => { + let fds = fds + .into_iter() + .map(|fd| unsafe { File::from_raw_fd(fd) }) + .collect(); + + let mut builder = VoidBuilder::new(); + builder.mount("/entrypoint", "/entrypoint"); + for fd in &fds { + builder.keep_fd(fd); + } + + let closure = || { + let args = self + .prepare_args_ref( + name, + &spec.args, + &mut TriggerData::FileSocket(fds), + ) + .unwrap(); + + if let Err(e) = + unistd::execv(&CString::new("/entrypoint").unwrap(), &args).map_err( + |e| Error::Nix { + msg: "execv", + src: e, + }, + ) + { + error!("error: {}", e); + 1 + } else { + 0 + } + }; + + builder.spawn(closure)?; + } + _ => unimplemented!(), + } + } + } + } + fn prepare_args( &mut self, entrypoint: &str, @@ -144,8 +232,10 @@ impl<'a> Spawner<'a> { for arg in args { out.extend(self.prepare_arg(entrypoint, arg, trigger)?); } + Ok(out) } + fn prepare_args_ref( &self, entrypoint: &str, @@ -156,6 +246,7 @@ impl<'a> Spawner<'a> { for arg in args { out.extend(self.prepare_arg_ref(entrypoint, arg, trigger)?); } + Ok(out) } @@ -176,6 +267,18 @@ impl<'a> Spawner<'a> { Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()]) } }, + + Arg::FileSocket(s) => match s { + FileSocket::Rx(s) => { + let pipe = self.sockets.get_mut(s).unwrap().take_read()?; + Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()]) + } + FileSocket::Tx(s) => { + let pipe = self.sockets.get_mut(s).unwrap().take_write()?; + Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()]) + } + }, + a => self.prepare_arg_ref(entrypoint, a, trigger), } } @@ -191,6 +294,12 @@ impl<'a> Spawner<'a> { Arg::Entrypoint => Ok(vec![CString::new(entrypoint).unwrap()]), Arg::Pipe(p) => Err(Error::BadPipe(p.get_name().to_string())), + Arg::FileSocket(s) => Err(Error::BadFileSocket(s.get_name().to_string())), + + Arg::File(p) => { + let f = File::open(p)?.into_raw_fd(); + Ok(vec![CString::new(f.to_string()).unwrap()]) + } Arg::Trigger => Ok(trigger.args()), diff --git a/src/specification.rs b/src/specification.rs index eb85cb8..583bfd1 100644 --- a/src/specification.rs +++ b/src/specification.rs @@ -25,8 +25,14 @@ pub struct Entrypoint { #[derive(Serialize, Deserialize, Debug)] pub enum Trigger { + /// Start this entrypoint at application startup Startup, + + /// Trigger this entrypoint when a named pipe receives data Pipe(String), + + /// Trigger this entrypoint when a named file socket receives data + FileSocket(String), } impl Default for Trigger { @@ -36,7 +42,6 @@ impl Default for Trigger { } #[derive(Serialize, Deserialize, PartialEq, Debug)] -// #[serde(tag = "type")] pub enum Arg { /// The binary name, or argv[0], of the original program start BinaryName, @@ -44,11 +49,17 @@ pub enum Arg { /// The name of this entrypoint Entrypoint, + /// A file descriptor for a file on the filesystem in the launching namespace + File(PathBuf), + /// A chosen end of a named pipe Pipe(Pipe), + /// File socket + FileSocket(FileSocket), + /// A value specified by the trigger - /// NOTE: Only valid if the trigger is of type Pipe(...) + /// NOTE: Only valid if the trigger is of type Pipe(...) or FileSocket(...) Trigger, /// A TCP Listener @@ -79,6 +90,21 @@ impl Pipe { } } +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub enum FileSocket { + Rx(String), + Tx(String), +} + +impl FileSocket { + pub fn get_name(&self) -> &str { + match self { + FileSocket::Rx(n) => n, + FileSocket::Tx(n) => n, + } + } +} + #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] pub enum Permission { Filesystem { @@ -105,9 +131,8 @@ impl Specification { let mut write = Vec::new(); for entry in self.entrypoints.values() { - match &entry.trigger { - Trigger::Startup => {} - Trigger::Pipe(s) => read.push(s.as_str()), + if let Trigger::Pipe(s) = &entry.trigger { + read.push(s.as_str()); } for arg in &entry.args { @@ -125,6 +150,30 @@ impl Specification { (read, write) } + pub fn sockets(&self) -> (Vec<&str>, Vec<&str>) { + let mut read = Vec::new(); + let mut write = Vec::new(); + + for entry in self.entrypoints.values() { + if let Trigger::FileSocket(s) = &entry.trigger { + read.push(s.as_str()); + } + + for arg in &entry.args { + if let Arg::FileSocket(p) = arg { + match p { + FileSocket::Rx(s) => read.push(s.as_str()), + FileSocket::Tx(s) => write.push(s.as_str()), + } + } + } + } + + debug!("read sockets: {:?}", &read); + debug!("write sockets: {:?}", &write); + (read, write) + } + pub fn validate(&self) -> Result<()> { // validate pipes match let (read, write) = self.pipes(); @@ -153,11 +202,12 @@ impl Specification { return Err(Error::BadPipe(pipe.to_string())); } - // validate pipe trigger arguments make sense + // validate trigger arguments make sense for entrypoint in self.entrypoints.values() { if entrypoint.args.contains(&Arg::Trigger) { match entrypoint.trigger { Trigger::Pipe(_) => {} + Trigger::FileSocket(_) => {} _ => return Err(Error::BadTriggerArgument), } }