From 17ec5b819ccb9592b8f0c870d21e91e851a7499f Mon Sep 17 00:00:00 2001 From: Jake Hillion Date: Tue, 1 Mar 2022 11:29:13 +0000 Subject: [PATCH] pipe triggers --- .clang-format | 1 + Cargo.lock | 11 +++ Cargo.toml | 2 + README.md | 9 +++ examples/pipes/main.rs | 42 ++++++++++ examples/pipes/spec.json | 25 ++++++ src/error.rs | 3 + src/main.rs | 101 ++++++++++------------- src/spawner.rs | 170 +++++++++++++++++++++++++++++++++++++++ src/specification.rs | 47 +++++++---- 10 files changed, 338 insertions(+), 73 deletions(-) create mode 100644 .clang-format create mode 100644 examples/pipes/main.rs create mode 100644 examples/pipes/spec.json create mode 100644 src/spawner.rs diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..3148815 --- /dev/null +++ b/.clang-format @@ -0,0 +1 @@ +BreakBeforeBraces: Attach \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b14f71e..886d2aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,7 @@ name = "clone-shim" version = "0.1.0" dependencies = [ "clap", + "close_fds", "env_logger", "exitcode", "ipnetwork", @@ -77,6 +78,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "close_fds" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bc416f33de9d59e79e57560f450d21ff8393adcf1cdfc3e6d8fb93d5f88a2ed" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "env_logger" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 4ad6f52..773b5e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,5 @@ ipnetwork = "0.18" libc = "0.2.117" nix = "0.23.1" + +close_fds = "0.3.2" diff --git a/README.md b/README.md index 4507ff0..f8c5cf2 100644 --- a/README.md +++ b/README.md @@ -10,3 +10,12 @@ To run this example: cargo build --example basic cargo run -- -s examples/basic/spec.json target/debug/examples/basic + +### examples/pipes + +The pipes example shows some of the power of the shim by using pipes. The process "pipe_sender" sends two messages down a pipe that it's given by the shim. These two messages each spawn a completely isolated process, "pipe_receiver", that receives that message. + +To run this example: + + cargo build --example pipes + cargo run -- -s examples/pipes/spec.json target/debug/examples/pipes diff --git a/examples/pipes/main.rs b/examples/pipes/main.rs new file mode 100644 index 0000000..fa24c6a --- /dev/null +++ b/examples/pipes/main.rs @@ -0,0 +1,42 @@ +use std::fs::File; +use std::io::Write; + +fn main() { + use std::os::unix::io::FromRawFd; + + let mut args = std::env::args(); + + let _bin = args.next(); + + match args.next() { + Some(s) => match s.as_str() { + "pipe_sender" => { + let fd: i32 = args.next().unwrap().parse().unwrap(); + pipe_sender(unsafe { File::from_raw_fd(fd) }) + } + "pipe_receiver" => { + let pipe_data = args.next().unwrap(); + pipe_receiver(pipe_data.as_str()) + } + _ => unimplemented!(), + }, + None => unimplemented!(), + } +} + +fn pipe_sender(mut tx_pipe: File) { + println!("hello from pipe_sender!"); + + let data = b"some data"; + let bytes_written = tx_pipe.write(&data[..]).unwrap(); + assert!(bytes_written == data.len()); + + let data = b"some more data"; + let bytes_written = tx_pipe.write(&data[..]).unwrap(); + assert!(bytes_written == data.len()); +} + +fn pipe_receiver(rx_data: &str) { + println!("hello from pid: {}", std::process::id()); + println!("received data: {}", rx_data); +} diff --git a/examples/pipes/spec.json b/examples/pipes/spec.json new file mode 100644 index 0000000..3032669 --- /dev/null +++ b/examples/pipes/spec.json @@ -0,0 +1,25 @@ +{ + "entrypoints": { + "pipe_sender": { + "args": [ + "BinaryName", + "Entrypoint", + { + "Pipe": { + "Tx": "messages" + } + } + ] + }, + "pipe_receiver": { + "trigger": { + "Pipe": "messages" + }, + "args": [ + "BinaryName", + "Entrypoint", + "PipeTrigger" + ] + } + } +} \ No newline at end of file diff --git a/src/error.rs b/src/error.rs index 8d3c5dd..6d1ff0f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,6 +16,9 @@ pub enum Error { #[error("bad specification type: only .json files are supported")] BadSpecType, + #[error("bad pipe trigger: this entrypoint is not triggered by a pipe")] + BadPipeTrigger, + #[error("too many pipes: a pipe must have one reader and one writer: {0}")] TooManyPipes(String), diff --git a/src/main.rs b/src/main.rs index 4d4727c..d7c51bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,19 +2,20 @@ use log::{debug, error, info}; mod clone; mod error; +mod spawner; mod specification; -use clone::{clone3, CloneArgs, CloneFlags}; use error::Error; -use specification::{Arg, Pipe, Specification, Trigger}; +use spawner::Spawner; +use specification::Specification; use std::collections::HashMap; -use std::ffi::CString; use std::fs::File; -use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::io::FromRawFd; use clap::{App, AppSettings}; -use nix::unistd::{self, Pid}; +use nix::fcntl::OFlag; +use nix::unistd::{self}; fn main() { std::process::exit(match run() { @@ -78,65 +79,51 @@ fn run() -> Result<(), Error> { // create all the pipes let (pipes, _) = spec.pipes(); - let mut read_pipes = HashMap::new(); - let mut write_pipes = HashMap::new(); + let pipes = create_pipes(pipes)?; - for pipe in pipes { + // spawn all processes + let spawner = Spawner { + spec: &spec, + pipes: &pipes, + binary, + trailing: &trailing, + }; + + spawner.spawn()?; + + // TODO: Fix this dirty hack to prevent files dropping (and closing) + // switch to Option and use a mut ref to take, then call + // IntoRawFd if used + std::thread::sleep(std::time::Duration::from_secs(10)); + + Ok(()) +} + +pub struct PipePair { + read: File, + write: File, +} + +fn create_pipes(names: Vec<&str>) -> Result, Error> { + let mut pipes = HashMap::new(); + + for pipe in names { info!("creating pipe pair `{}`", pipe); - let (read, write) = unistd::pipe().map_err(|e| Error::Nix { - msg: "pipe", + let (read, write) = unistd::pipe2(OFlag::O_DIRECT).map_err(|e| Error::Nix { + msg: "pipe2", src: e, })?; // safe to create files given the successful return of pipe(2) - read_pipes.insert(pipe.to_string(), unsafe { File::from_raw_fd(read) }); - write_pipes.insert(pipe.to_string(), unsafe { File::from_raw_fd(write) }); + pipes.insert( + pipe.to_string(), + PipePair { + read: unsafe { File::from_raw_fd(read) }, + write: unsafe { File::from_raw_fd(write) }, + }, + ); } - // spawn all processes - for (name, entry) in &spec.entrypoints { - info!("spawning entrypoint `{}`", name.as_str()); - - match &entry.trigger { - Trigger::Startup => { - if clone3(CloneArgs::new(CloneFlags::empty())).map_err(|e| Error::Nix { - msg: "clone3", - src: e, - })? == Pid::from_raw(0) - { - let mut args = Vec::new(); - for arg in &entry.args { - match arg { - Arg::BinaryName => args.push(CString::new(binary).unwrap()), - Arg::Entrypoint => args.push(CString::new(name.as_str()).unwrap()), - Arg::Pipe(p) => args.push(match p { - Pipe::Rx(s) => { - CString::new(read_pipes[s].as_raw_fd().to_string()).unwrap() - } - Pipe::Tx(s) => { - CString::new(write_pipes[s].as_raw_fd().to_string()).unwrap() - } - }), - Arg::Trailing => { - args.extend(trailing.iter().map(|s| CString::new(*s).unwrap())) - } - } - } - - unistd::execv(&CString::new(binary).unwrap(), &args).map_err(|e| { - Error::Nix { - msg: "execv", - src: e, - } - })?; - } - } - Trigger::Pipe(_s) => { - todo!() - } - } - } - - Ok(()) + Ok(pipes) } diff --git a/src/spawner.rs b/src/spawner.rs new file mode 100644 index 0000000..456c8bf --- /dev/null +++ b/src/spawner.rs @@ -0,0 +1,170 @@ +use log::{debug, error, info}; + +use super::specification::{Arg, Entrypoint, Permission, Pipe, Specification, Trigger}; +use super::PipePair; +use crate::clone::{clone3, CloneArgs, CloneFlags}; +use crate::Error; + +use std::collections::HashMap; +use std::ffi::CString; +use std::fs::File; +use std::io::Read; +use std::os::unix::io::{AsRawFd, FromRawFd}; + +use close_fds::CloseFdsBuilder; +use nix::unistd::{self, Pid}; + +const BUFFER_SIZE: usize = 1024; + +pub struct Spawner<'a> { + pub spec: &'a Specification, + pub pipes: &'a HashMap, + pub binary: &'a str, + pub trailing: &'a Vec<&'a str>, +} + +impl<'a> Spawner<'a> { + pub fn spawn(&self) -> Result<(), Error> { + for (name, entrypoint) in &self.spec.entrypoints { + info!("spawning entrypoint `{}`", name.as_str()); + + match &entrypoint.trigger { + Trigger::Startup => { + if clone3(CloneArgs::new(Self::clone_flags( + &mut entrypoint.permissions.iter(), + ))) + .map_err(|e| Error::Nix { + msg: "clone3", + src: e, + })? == Pid::from_raw(0) + { + let args = self.prepare_args(name, &entrypoint.args, None); + + unistd::execv(&CString::new(self.binary).unwrap(), &args).map_err(|e| { + Error::Nix { + msg: "execv", + src: e, + } + })?; + } + } + + Trigger::Pipe(s) => { + // TODO: Consider typing the pipes so CLONE_FILES is only when necessary + if clone3(CloneArgs::new(CloneFlags::CLONE_FILES)).map_err(|e| Error::Nix { + msg: "clone3", + src: e, + })? == Pid::from_raw(0) + { + // Rust's ownership is out of the window now we've cloned + // Unsafely move to a new owned pipe + let pipe = unsafe { File::from_raw_fd(self.pipes[s].read.as_raw_fd()) }; + + let mut closer = CloseFdsBuilder::new(); + let keep = [pipe.as_raw_fd()]; + closer.keep_fds(&keep); + unsafe { + closer.closefrom(3); + } + + match self.pipe_trigger(pipe, entrypoint, name) { + Ok(()) => std::process::exit(exitcode::OK), + Err(e) => { + error!("error in pipe_trigger: {}", e); + std::process::exit(1) + } + } + } + } + } + } + + Ok(()) + } + + fn pipe_trigger(&self, mut pipe: File, spec: &Entrypoint, name: &str) -> Result<(), Error> { + let mut buf = [0_u8; BUFFER_SIZE]; + + loop { + let read_bytes = pipe.read(&mut buf)?; + + if read_bytes == 0 { + return Ok(()); + } + + debug!("triggering from pipe read"); + + if clone3(CloneArgs::new(Self::clone_flags( + &mut spec.permissions.iter(), + ))) + .map_err(|e| Error::Nix { + msg: "clone3", + src: e, + })? == Pid::from_raw(0) + { + let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap(); + let args = self.prepare_args(name, &spec.args, Some(pipe_trigger)); + + unistd::execv(&CString::new(self.binary).unwrap(), &args).map_err(|e| { + Error::Nix { + msg: "execv", + src: e, + } + })?; + } + } + } + + fn prepare_args( + &self, + entrypoint: &str, + args: &[Arg], + pipe_trigger: Option<&str>, + ) -> Vec { + let mut out = Vec::new(); + for arg in args { + match arg { + Arg::BinaryName => out.push(CString::new(self.binary).unwrap()), + Arg::Entrypoint => out.push(CString::new(entrypoint).unwrap()), + Arg::Pipe(p) => out.push(match p { + Pipe::Rx(s) => { + CString::new(self.pipes[s].read.as_raw_fd().to_string()).unwrap() + } + Pipe::Tx(s) => { + CString::new(self.pipes[s].write.as_raw_fd().to_string()).unwrap() + } + }), + Arg::PipeTrigger => { + out.push(CString::new(pipe_trigger.as_ref().unwrap().to_string()).unwrap()) + } + Arg::TcpListener { port: _port } => unimplemented!(), + + Arg::Trailing => { + out.extend(self.trailing.iter().map(|s| CString::new(*s).unwrap())) + } + } + } + out + } + + fn clone_flags(perms: &mut dyn Iterator) -> CloneFlags { + let mut flags = CloneFlags::empty(); + + flags |= CloneFlags::CLONE_NEWCGROUP; // new cgroup namespace + flags |= CloneFlags::CLONE_NEWIPC; // new IPC namespace + flags |= CloneFlags::CLONE_NEWNET; // new empty network namespace + flags |= CloneFlags::CLONE_NEWNS; // new separate mount namespace + flags |= CloneFlags::CLONE_NEWPID; // new PID namespace + flags |= CloneFlags::CLONE_NEWUSER; // new user namespace + flags |= CloneFlags::CLONE_NEWUTS; // new UTS namespace + + for perm in perms { + match perm { + Permission::PropagateFiles => flags |= CloneFlags::CLONE_FILES, + _ => unimplemented!(), + } + } + + flags + } +} diff --git a/src/specification.rs b/src/specification.rs index 7bf5276..4e0a589 100644 --- a/src/specification.rs +++ b/src/specification.rs @@ -1,3 +1,5 @@ +use log::debug; + use crate::Error; use std::collections::{HashMap, HashSet}; @@ -20,11 +22,10 @@ pub struct Entrypoint { pub args: Vec, #[serde(default)] - pub permissions: HashSet, + pub permissions: HashSet, } #[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "type")] pub enum Trigger { Startup, Pipe(String), @@ -36,7 +37,7 @@ impl Default for Trigger { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Debug)] // #[serde(tag = "type")] pub enum Arg { /// The binary name, or argv[0], of the original program start @@ -48,6 +49,13 @@ pub enum Arg { /// A chosen end of a named pipe Pipe(Pipe), + /// The value of a pipe trigger + /// NOTE: Only valid if the trigger is of type Pipe(...) + PipeTrigger, + + /// A TCP Listener + TcpListener { port: u16 }, + /// The rest of argv[1..], 0 or more arguments Trailing, } @@ -58,16 +66,14 @@ impl Arg { } } -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "type")] +#[derive(Serialize, Deserialize, PartialEq, Debug)] pub enum Pipe { Rx(String), Tx(String), } #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] -#[serde(tag = "type")] -pub enum Permissions { +pub enum Permission { Filesystem { host_path: PathBuf, final_path: PathBuf, @@ -75,10 +81,10 @@ pub enum Permissions { Network { network: Network, }, + PropagateFiles, } #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] -#[serde(tag = "type")] pub enum Network { InternetV4, InternetV6, @@ -98,18 +104,17 @@ impl Specification { } for arg in &entry.args { - match arg { - Arg::BinaryName => {} - Arg::Entrypoint => {} - Arg::Pipe(p) => match p { + if let Arg::Pipe(p) = arg { + match p { Pipe::Rx(s) => read.push(s.as_str()), Pipe::Tx(s) => write.push(s.as_str()), - }, - Arg::Trailing => {} + } } } } + debug!("read pipes: {:?}", &read); + debug!("write pipes: {:?}", &write); (read, write) } @@ -119,14 +124,14 @@ impl Specification { let mut read_set = HashSet::with_capacity(read.len()); for pipe in read { - if read_set.insert(pipe) { + if !read_set.insert(pipe) { return Err(Error::TooManyPipes(pipe.to_string())); } } let mut write_set = HashSet::with_capacity(write.len()); for pipe in write { - if write_set.insert(pipe) { + if !write_set.insert(pipe) { return Err(Error::TooManyPipes(pipe.to_string())); } } @@ -141,6 +146,16 @@ impl Specification { return Err(Error::WriteOnlyPipe(pipe.to_string())); } + // validate pipe trigger arguments make sense + for entrypoint in self.entrypoints.values() { + if entrypoint.args.contains(&Arg::PipeTrigger) { + match entrypoint.trigger { + Trigger::Pipe(_) => {} + _ => return Err(Error::BadPipeTrigger), + } + } + } + Ok(()) } }