diff --git a/src/lib.rs b/src/lib.rs index 9e19aa9..31501d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,26 +69,32 @@ pub fn run() -> Result<(), Error> { let pipes = create_pipes(pipes)?; // spawn all processes - let spawner = Spawner { + Spawner { spec: &spec, - pipes: &pipes, + pipes, binary, trailing: &trailing, - }; - - spawner.spawn()?; - - // TODO: Fix this dirty hack to prevent files dropping (and closing) - // switch to Option and use a mut ref to take, then call - // IntoRawFd if used - std::thread::sleep(std::time::Duration::from_secs(10)); - - Ok(()) + } + .spawn() } pub struct PipePair { - read: File, - write: File, + read: Option, + write: Option, +} + +impl PipePair { + fn take_read(&mut self) -> File { + self.read + .take() + .expect("read pipes should only be used once") + } + + fn take_write(&mut self) -> File { + self.write + .take() + .expect("write pipes should only be used once") + } } fn create_pipes(names: Vec<&str>) -> Result, Error> { @@ -106,8 +112,8 @@ fn create_pipes(names: Vec<&str>) -> Result, Error> { pipes.insert( pipe.to_string(), PipePair { - read: unsafe { File::from_raw_fd(read) }, - write: unsafe { File::from_raw_fd(write) }, + read: Some(unsafe { File::from_raw_fd(read) }), + write: Some(unsafe { File::from_raw_fd(write) }), }, ); } diff --git a/src/spawner.rs b/src/spawner.rs index 456c8bf..aa2a24a 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::ffi::CString; use std::fs::File; use std::io::Read; -use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::io::AsRawFd; use close_fds::CloseFdsBuilder; use nix::unistd::{self, Pid}; @@ -18,13 +18,13 @@ const BUFFER_SIZE: usize = 1024; pub struct Spawner<'a> { pub spec: &'a Specification, - pub pipes: &'a HashMap, + pub pipes: HashMap, pub binary: &'a str, pub trailing: &'a Vec<&'a str>, } impl<'a> Spawner<'a> { - pub fn spawn(&self) -> Result<(), Error> { + pub fn spawn(&mut self) -> Result<(), Error> { for (name, entrypoint) in &self.spec.entrypoints { info!("spawning entrypoint `{}`", name.as_str()); @@ -50,16 +50,14 @@ impl<'a> Spawner<'a> { } Trigger::Pipe(s) => { - // TODO: Consider typing the pipes so CLONE_FILES is only when necessary - if clone3(CloneArgs::new(CloneFlags::CLONE_FILES)).map_err(|e| Error::Nix { + // take the pipe in the initiating thread so the File isn't dropped + let pipe = self.pipes.get_mut(s).unwrap().take_read(); + + if clone3(CloneArgs::new(CloneFlags::empty())).map_err(|e| Error::Nix { msg: "clone3", src: e, })? == Pid::from_raw(0) { - // Rust's ownership is out of the window now we've cloned - // Unsafely move to a new owned pipe - let pipe = unsafe { File::from_raw_fd(self.pipes[s].read.as_raw_fd()) }; - let mut closer = CloseFdsBuilder::new(); let keep = [pipe.as_raw_fd()]; closer.keep_fds(&keep); @@ -103,7 +101,7 @@ impl<'a> Spawner<'a> { })? == Pid::from_raw(0) { let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap(); - let args = self.prepare_args(name, &spec.args, Some(pipe_trigger)); + let args = self.prepare_args_ref(name, &spec.args, Some(pipe_trigger)); unistd::execv(&CString::new(self.binary).unwrap(), &args).map_err(|e| { Error::Nix { @@ -116,7 +114,7 @@ impl<'a> Spawner<'a> { } fn prepare_args( - &self, + &mut self, entrypoint: &str, args: &[Arg], pipe_trigger: Option<&str>, @@ -126,17 +124,22 @@ impl<'a> Spawner<'a> { match arg { Arg::BinaryName => out.push(CString::new(self.binary).unwrap()), Arg::Entrypoint => out.push(CString::new(entrypoint).unwrap()), + Arg::Pipe(p) => out.push(match p { Pipe::Rx(s) => { - CString::new(self.pipes[s].read.as_raw_fd().to_string()).unwrap() + let pipe = self.pipes.get_mut(s).unwrap().take_read(); + CString::new(pipe.as_raw_fd().to_string()).unwrap() } Pipe::Tx(s) => { - CString::new(self.pipes[s].write.as_raw_fd().to_string()).unwrap() + let pipe = self.pipes.get_mut(s).unwrap().take_write(); + CString::new(pipe.as_raw_fd().to_string()).unwrap() } }), + Arg::PipeTrigger => { out.push(CString::new(pipe_trigger.as_ref().unwrap().to_string()).unwrap()) } + Arg::TcpListener { port: _port } => unimplemented!(), Arg::Trailing => { @@ -144,6 +147,37 @@ impl<'a> Spawner<'a> { } } } + + out + } + + fn prepare_args_ref( + &self, + entrypoint: &str, + args: &[Arg], + pipe_trigger: Option<&str>, + ) -> Vec { + let mut out = Vec::new(); + + for arg in args { + match arg { + Arg::BinaryName => out.push(CString::new(self.binary).unwrap()), + Arg::Entrypoint => out.push(CString::new(entrypoint).unwrap()), + + Arg::Pipe(_) => panic!("can't use pipes with an immutable reference"), + + Arg::PipeTrigger => { + out.push(CString::new(pipe_trigger.as_ref().unwrap().to_string()).unwrap()) + } + + Arg::TcpListener { port: _port } => unimplemented!(), + + Arg::Trailing => { + out.extend(self.trailing.iter().map(|s| CString::new(*s).unwrap())) + } + } + } + out }