pipe triggers

This commit is contained in:
Jake Hillion 2022-03-01 11:29:13 +00:00
parent 2972f9603b
commit 17ec5b819c
10 changed files with 338 additions and 73 deletions

1
.clang-format Normal file
View File

@ -0,0 +1 @@
BreakBeforeBraces: Attach

11
Cargo.lock generated
View File

@ -66,6 +66,7 @@ name = "clone-shim"
version = "0.1.0"
dependencies = [
"clap",
"close_fds",
"env_logger",
"exitcode",
"ipnetwork",
@ -77,6 +78,16 @@ dependencies = [
"thiserror",
]
[[package]]
name = "close_fds"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bc416f33de9d59e79e57560f450d21ff8393adcf1cdfc3e6d8fb93d5f88a2ed"
dependencies = [
"cfg-if",
"libc",
]
[[package]]
name = "env_logger"
version = "0.9.0"

View File

@ -18,3 +18,5 @@ ipnetwork = "0.18"
libc = "0.2.117"
nix = "0.23.1"
close_fds = "0.3.2"

View File

@ -10,3 +10,12 @@ To run this example:
cargo build --example basic
cargo run -- -s examples/basic/spec.json target/debug/examples/basic
### examples/pipes
The pipes example shows some of the power of the shim by using pipes. The process "pipe_sender" sends two messages down a pipe that it's given by the shim. These two messages each spawn a completely isolated process, "pipe_receiver", that receives that message.
To run this example:
cargo build --example pipes
cargo run -- -s examples/pipes/spec.json target/debug/examples/pipes

42
examples/pipes/main.rs Normal file
View File

@ -0,0 +1,42 @@
use std::fs::File;
use std::io::Write;
fn main() {
use std::os::unix::io::FromRawFd;
let mut args = std::env::args();
let _bin = args.next();
match args.next() {
Some(s) => match s.as_str() {
"pipe_sender" => {
let fd: i32 = args.next().unwrap().parse().unwrap();
pipe_sender(unsafe { File::from_raw_fd(fd) })
}
"pipe_receiver" => {
let pipe_data = args.next().unwrap();
pipe_receiver(pipe_data.as_str())
}
_ => unimplemented!(),
},
None => unimplemented!(),
}
}
fn pipe_sender(mut tx_pipe: File) {
println!("hello from pipe_sender!");
let data = b"some data";
let bytes_written = tx_pipe.write(&data[..]).unwrap();
assert!(bytes_written == data.len());
let data = b"some more data";
let bytes_written = tx_pipe.write(&data[..]).unwrap();
assert!(bytes_written == data.len());
}
fn pipe_receiver(rx_data: &str) {
println!("hello from pid: {}", std::process::id());
println!("received data: {}", rx_data);
}

25
examples/pipes/spec.json Normal file
View File

@ -0,0 +1,25 @@
{
"entrypoints": {
"pipe_sender": {
"args": [
"BinaryName",
"Entrypoint",
{
"Pipe": {
"Tx": "messages"
}
}
]
},
"pipe_receiver": {
"trigger": {
"Pipe": "messages"
},
"args": [
"BinaryName",
"Entrypoint",
"PipeTrigger"
]
}
}
}

View File

@ -16,6 +16,9 @@ pub enum Error {
#[error("bad specification type: only .json files are supported")]
BadSpecType,
#[error("bad pipe trigger: this entrypoint is not triggered by a pipe")]
BadPipeTrigger,
#[error("too many pipes: a pipe must have one reader and one writer: {0}")]
TooManyPipes(String),

View File

@ -2,19 +2,20 @@ use log::{debug, error, info};
mod clone;
mod error;
mod spawner;
mod specification;
use clone::{clone3, CloneArgs, CloneFlags};
use error::Error;
use specification::{Arg, Pipe, Specification, Trigger};
use spawner::Spawner;
use specification::Specification;
use std::collections::HashMap;
use std::ffi::CString;
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::os::unix::io::FromRawFd;
use clap::{App, AppSettings};
use nix::unistd::{self, Pid};
use nix::fcntl::OFlag;
use nix::unistd::{self};
fn main() {
std::process::exit(match run() {
@ -78,65 +79,51 @@ fn run() -> Result<(), Error> {
// create all the pipes
let (pipes, _) = spec.pipes();
let mut read_pipes = HashMap::new();
let mut write_pipes = HashMap::new();
let pipes = create_pipes(pipes)?;
for pipe in pipes {
// spawn all processes
let spawner = Spawner {
spec: &spec,
pipes: &pipes,
binary,
trailing: &trailing,
};
spawner.spawn()?;
// TODO: Fix this dirty hack to prevent files dropping (and closing)
// switch to Option<File> and use a mut ref to take, then call
// IntoRawFd if used
std::thread::sleep(std::time::Duration::from_secs(10));
Ok(())
}
pub struct PipePair {
read: File,
write: File,
}
fn create_pipes(names: Vec<&str>) -> Result<HashMap<String, PipePair>, Error> {
let mut pipes = HashMap::new();
for pipe in names {
info!("creating pipe pair `{}`", pipe);
let (read, write) = unistd::pipe().map_err(|e| Error::Nix {
msg: "pipe",
let (read, write) = unistd::pipe2(OFlag::O_DIRECT).map_err(|e| Error::Nix {
msg: "pipe2",
src: e,
})?;
// safe to create files given the successful return of pipe(2)
read_pipes.insert(pipe.to_string(), unsafe { File::from_raw_fd(read) });
write_pipes.insert(pipe.to_string(), unsafe { File::from_raw_fd(write) });
pipes.insert(
pipe.to_string(),
PipePair {
read: unsafe { File::from_raw_fd(read) },
write: unsafe { File::from_raw_fd(write) },
},
);
}
// spawn all processes
for (name, entry) in &spec.entrypoints {
info!("spawning entrypoint `{}`", name.as_str());
match &entry.trigger {
Trigger::Startup => {
if clone3(CloneArgs::new(CloneFlags::empty())).map_err(|e| Error::Nix {
msg: "clone3",
src: e,
})? == Pid::from_raw(0)
{
let mut args = Vec::new();
for arg in &entry.args {
match arg {
Arg::BinaryName => args.push(CString::new(binary).unwrap()),
Arg::Entrypoint => args.push(CString::new(name.as_str()).unwrap()),
Arg::Pipe(p) => args.push(match p {
Pipe::Rx(s) => {
CString::new(read_pipes[s].as_raw_fd().to_string()).unwrap()
}
Pipe::Tx(s) => {
CString::new(write_pipes[s].as_raw_fd().to_string()).unwrap()
}
}),
Arg::Trailing => {
args.extend(trailing.iter().map(|s| CString::new(*s).unwrap()))
}
}
}
unistd::execv(&CString::new(binary).unwrap(), &args).map_err(|e| {
Error::Nix {
msg: "execv",
src: e,
}
})?;
}
}
Trigger::Pipe(_s) => {
todo!()
}
}
}
Ok(())
Ok(pipes)
}

170
src/spawner.rs Normal file
View File

@ -0,0 +1,170 @@
use log::{debug, error, info};
use super::specification::{Arg, Entrypoint, Permission, Pipe, Specification, Trigger};
use super::PipePair;
use crate::clone::{clone3, CloneArgs, CloneFlags};
use crate::Error;
use std::collections::HashMap;
use std::ffi::CString;
use std::fs::File;
use std::io::Read;
use std::os::unix::io::{AsRawFd, FromRawFd};
use close_fds::CloseFdsBuilder;
use nix::unistd::{self, Pid};
const BUFFER_SIZE: usize = 1024;
pub struct Spawner<'a> {
pub spec: &'a Specification,
pub pipes: &'a HashMap<String, PipePair>,
pub binary: &'a str,
pub trailing: &'a Vec<&'a str>,
}
impl<'a> Spawner<'a> {
pub fn spawn(&self) -> Result<(), Error> {
for (name, entrypoint) in &self.spec.entrypoints {
info!("spawning entrypoint `{}`", name.as_str());
match &entrypoint.trigger {
Trigger::Startup => {
if clone3(CloneArgs::new(Self::clone_flags(
&mut entrypoint.permissions.iter(),
)))
.map_err(|e| Error::Nix {
msg: "clone3",
src: e,
})? == Pid::from_raw(0)
{
let args = self.prepare_args(name, &entrypoint.args, None);
unistd::execv(&CString::new(self.binary).unwrap(), &args).map_err(|e| {
Error::Nix {
msg: "execv",
src: e,
}
})?;
}
}
Trigger::Pipe(s) => {
// TODO: Consider typing the pipes so CLONE_FILES is only when necessary
if clone3(CloneArgs::new(CloneFlags::CLONE_FILES)).map_err(|e| Error::Nix {
msg: "clone3",
src: e,
})? == Pid::from_raw(0)
{
// Rust's ownership is out of the window now we've cloned
// Unsafely move to a new owned pipe
let pipe = unsafe { File::from_raw_fd(self.pipes[s].read.as_raw_fd()) };
let mut closer = CloseFdsBuilder::new();
let keep = [pipe.as_raw_fd()];
closer.keep_fds(&keep);
unsafe {
closer.closefrom(3);
}
match self.pipe_trigger(pipe, entrypoint, name) {
Ok(()) => std::process::exit(exitcode::OK),
Err(e) => {
error!("error in pipe_trigger: {}", e);
std::process::exit(1)
}
}
}
}
}
}
Ok(())
}
fn pipe_trigger(&self, mut pipe: File, spec: &Entrypoint, name: &str) -> Result<(), Error> {
let mut buf = [0_u8; BUFFER_SIZE];
loop {
let read_bytes = pipe.read(&mut buf)?;
if read_bytes == 0 {
return Ok(());
}
debug!("triggering from pipe read");
if clone3(CloneArgs::new(Self::clone_flags(
&mut spec.permissions.iter(),
)))
.map_err(|e| Error::Nix {
msg: "clone3",
src: e,
})? == Pid::from_raw(0)
{
let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap();
let args = self.prepare_args(name, &spec.args, Some(pipe_trigger));
unistd::execv(&CString::new(self.binary).unwrap(), &args).map_err(|e| {
Error::Nix {
msg: "execv",
src: e,
}
})?;
}
}
}
fn prepare_args(
&self,
entrypoint: &str,
args: &[Arg],
pipe_trigger: Option<&str>,
) -> Vec<CString> {
let mut out = Vec::new();
for arg in args {
match arg {
Arg::BinaryName => out.push(CString::new(self.binary).unwrap()),
Arg::Entrypoint => out.push(CString::new(entrypoint).unwrap()),
Arg::Pipe(p) => out.push(match p {
Pipe::Rx(s) => {
CString::new(self.pipes[s].read.as_raw_fd().to_string()).unwrap()
}
Pipe::Tx(s) => {
CString::new(self.pipes[s].write.as_raw_fd().to_string()).unwrap()
}
}),
Arg::PipeTrigger => {
out.push(CString::new(pipe_trigger.as_ref().unwrap().to_string()).unwrap())
}
Arg::TcpListener { port: _port } => unimplemented!(),
Arg::Trailing => {
out.extend(self.trailing.iter().map(|s| CString::new(*s).unwrap()))
}
}
}
out
}
fn clone_flags(perms: &mut dyn Iterator<Item = &Permission>) -> CloneFlags {
let mut flags = CloneFlags::empty();
flags |= CloneFlags::CLONE_NEWCGROUP; // new cgroup namespace
flags |= CloneFlags::CLONE_NEWIPC; // new IPC namespace
flags |= CloneFlags::CLONE_NEWNET; // new empty network namespace
flags |= CloneFlags::CLONE_NEWNS; // new separate mount namespace
flags |= CloneFlags::CLONE_NEWPID; // new PID namespace
flags |= CloneFlags::CLONE_NEWUSER; // new user namespace
flags |= CloneFlags::CLONE_NEWUTS; // new UTS namespace
for perm in perms {
match perm {
Permission::PropagateFiles => flags |= CloneFlags::CLONE_FILES,
_ => unimplemented!(),
}
}
flags
}
}

View File

@ -1,3 +1,5 @@
use log::debug;
use crate::Error;
use std::collections::{HashMap, HashSet};
@ -20,11 +22,10 @@ pub struct Entrypoint {
pub args: Vec<Arg>,
#[serde(default)]
pub permissions: HashSet<Permissions>,
pub permissions: HashSet<Permission>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum Trigger {
Startup,
Pipe(String),
@ -36,7 +37,7 @@ impl Default for Trigger {
}
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
// #[serde(tag = "type")]
pub enum Arg {
/// The binary name, or argv[0], of the original program start
@ -48,6 +49,13 @@ pub enum Arg {
/// A chosen end of a named pipe
Pipe(Pipe),
/// The value of a pipe trigger
/// NOTE: Only valid if the trigger is of type Pipe(...)
PipeTrigger,
/// A TCP Listener
TcpListener { port: u16 },
/// The rest of argv[1..], 0 or more arguments
Trailing,
}
@ -58,16 +66,14 @@ impl Arg {
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub enum Pipe {
Rx(String),
Tx(String),
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
#[serde(tag = "type")]
pub enum Permissions {
pub enum Permission {
Filesystem {
host_path: PathBuf,
final_path: PathBuf,
@ -75,10 +81,10 @@ pub enum Permissions {
Network {
network: Network,
},
PropagateFiles,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
#[serde(tag = "type")]
pub enum Network {
InternetV4,
InternetV6,
@ -98,18 +104,17 @@ impl Specification {
}
for arg in &entry.args {
match arg {
Arg::BinaryName => {}
Arg::Entrypoint => {}
Arg::Pipe(p) => match p {
if let Arg::Pipe(p) = arg {
match p {
Pipe::Rx(s) => read.push(s.as_str()),
Pipe::Tx(s) => write.push(s.as_str()),
},
Arg::Trailing => {}
}
}
}
}
debug!("read pipes: {:?}", &read);
debug!("write pipes: {:?}", &write);
(read, write)
}
@ -119,14 +124,14 @@ impl Specification {
let mut read_set = HashSet::with_capacity(read.len());
for pipe in read {
if read_set.insert(pipe) {
if !read_set.insert(pipe) {
return Err(Error::TooManyPipes(pipe.to_string()));
}
}
let mut write_set = HashSet::with_capacity(write.len());
for pipe in write {
if write_set.insert(pipe) {
if !write_set.insert(pipe) {
return Err(Error::TooManyPipes(pipe.to_string()));
}
}
@ -141,6 +146,16 @@ impl Specification {
return Err(Error::WriteOnlyPipe(pipe.to_string()));
}
// validate pipe trigger arguments make sense
for entrypoint in self.entrypoints.values() {
if entrypoint.args.contains(&Arg::PipeTrigger) {
match entrypoint.trigger {
Trigger::Pipe(_) => {}
_ => return Err(Error::BadPipeTrigger),
}
}
}
Ok(())
}
}