file dropping fix
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing

This commit is contained in:
Jake Hillion 2022-03-27 11:31:40 +01:00
parent fe05cf1fef
commit 87e873b1ff
2 changed files with 69 additions and 29 deletions

View File

@ -69,26 +69,32 @@ pub fn run() -> Result<(), Error> {
let pipes = create_pipes(pipes)?; let pipes = create_pipes(pipes)?;
// spawn all processes // spawn all processes
let spawner = Spawner { Spawner {
spec: &spec, spec: &spec,
pipes: &pipes, pipes,
binary, binary,
trailing: &trailing, trailing: &trailing,
}; }
.spawn()
spawner.spawn()?;
// TODO: Fix this dirty hack to prevent files dropping (and closing)
// switch to Option<File> and use a mut ref to take, then call
// IntoRawFd if used
std::thread::sleep(std::time::Duration::from_secs(10));
Ok(())
} }
pub struct PipePair { pub struct PipePair {
read: File, read: Option<File>,
write: File, write: Option<File>,
}
impl PipePair {
fn take_read(&mut self) -> File {
self.read
.take()
.expect("read pipes should only be used once")
}
fn take_write(&mut self) -> File {
self.write
.take()
.expect("write pipes should only be used once")
}
} }
fn create_pipes(names: Vec<&str>) -> Result<HashMap<String, PipePair>, Error> { fn create_pipes(names: Vec<&str>) -> Result<HashMap<String, PipePair>, Error> {
@ -106,8 +112,8 @@ fn create_pipes(names: Vec<&str>) -> Result<HashMap<String, PipePair>, Error> {
pipes.insert( pipes.insert(
pipe.to_string(), pipe.to_string(),
PipePair { PipePair {
read: unsafe { File::from_raw_fd(read) }, read: Some(unsafe { File::from_raw_fd(read) }),
write: unsafe { File::from_raw_fd(write) }, write: Some(unsafe { File::from_raw_fd(write) }),
}, },
); );
} }

View File

@ -9,7 +9,7 @@ use std::collections::HashMap;
use std::ffi::CString; use std::ffi::CString;
use std::fs::File; use std::fs::File;
use std::io::Read; use std::io::Read;
use std::os::unix::io::{AsRawFd, FromRawFd}; use std::os::unix::io::AsRawFd;
use close_fds::CloseFdsBuilder; use close_fds::CloseFdsBuilder;
use nix::unistd::{self, Pid}; use nix::unistd::{self, Pid};
@ -18,13 +18,13 @@ const BUFFER_SIZE: usize = 1024;
pub struct Spawner<'a> { pub struct Spawner<'a> {
pub spec: &'a Specification, pub spec: &'a Specification,
pub pipes: &'a HashMap<String, PipePair>, pub pipes: HashMap<String, PipePair>,
pub binary: &'a str, pub binary: &'a str,
pub trailing: &'a Vec<&'a str>, pub trailing: &'a Vec<&'a str>,
} }
impl<'a> Spawner<'a> { impl<'a> Spawner<'a> {
pub fn spawn(&self) -> Result<(), Error> { pub fn spawn(&mut self) -> Result<(), Error> {
for (name, entrypoint) in &self.spec.entrypoints { for (name, entrypoint) in &self.spec.entrypoints {
info!("spawning entrypoint `{}`", name.as_str()); info!("spawning entrypoint `{}`", name.as_str());
@ -50,16 +50,14 @@ impl<'a> Spawner<'a> {
} }
Trigger::Pipe(s) => { Trigger::Pipe(s) => {
// TODO: Consider typing the pipes so CLONE_FILES is only when necessary // take the pipe in the initiating thread so the File isn't dropped
if clone3(CloneArgs::new(CloneFlags::CLONE_FILES)).map_err(|e| Error::Nix { let pipe = self.pipes.get_mut(s).unwrap().take_read();
if clone3(CloneArgs::new(CloneFlags::empty())).map_err(|e| Error::Nix {
msg: "clone3", msg: "clone3",
src: e, src: e,
})? == Pid::from_raw(0) })? == Pid::from_raw(0)
{ {
// Rust's ownership is out of the window now we've cloned
// Unsafely move to a new owned pipe
let pipe = unsafe { File::from_raw_fd(self.pipes[s].read.as_raw_fd()) };
let mut closer = CloseFdsBuilder::new(); let mut closer = CloseFdsBuilder::new();
let keep = [pipe.as_raw_fd()]; let keep = [pipe.as_raw_fd()];
closer.keep_fds(&keep); closer.keep_fds(&keep);
@ -103,7 +101,7 @@ impl<'a> Spawner<'a> {
})? == Pid::from_raw(0) })? == Pid::from_raw(0)
{ {
let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap(); let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap();
let args = self.prepare_args(name, &spec.args, Some(pipe_trigger)); let args = self.prepare_args_ref(name, &spec.args, Some(pipe_trigger));
unistd::execv(&CString::new(self.binary).unwrap(), &args).map_err(|e| { unistd::execv(&CString::new(self.binary).unwrap(), &args).map_err(|e| {
Error::Nix { Error::Nix {
@ -116,7 +114,7 @@ impl<'a> Spawner<'a> {
} }
fn prepare_args( fn prepare_args(
&self, &mut self,
entrypoint: &str, entrypoint: &str,
args: &[Arg], args: &[Arg],
pipe_trigger: Option<&str>, pipe_trigger: Option<&str>,
@ -126,17 +124,22 @@ impl<'a> Spawner<'a> {
match arg { match arg {
Arg::BinaryName => out.push(CString::new(self.binary).unwrap()), Arg::BinaryName => out.push(CString::new(self.binary).unwrap()),
Arg::Entrypoint => out.push(CString::new(entrypoint).unwrap()), Arg::Entrypoint => out.push(CString::new(entrypoint).unwrap()),
Arg::Pipe(p) => out.push(match p { Arg::Pipe(p) => out.push(match p {
Pipe::Rx(s) => { Pipe::Rx(s) => {
CString::new(self.pipes[s].read.as_raw_fd().to_string()).unwrap() let pipe = self.pipes.get_mut(s).unwrap().take_read();
CString::new(pipe.as_raw_fd().to_string()).unwrap()
} }
Pipe::Tx(s) => { Pipe::Tx(s) => {
CString::new(self.pipes[s].write.as_raw_fd().to_string()).unwrap() let pipe = self.pipes.get_mut(s).unwrap().take_write();
CString::new(pipe.as_raw_fd().to_string()).unwrap()
} }
}), }),
Arg::PipeTrigger => { Arg::PipeTrigger => {
out.push(CString::new(pipe_trigger.as_ref().unwrap().to_string()).unwrap()) out.push(CString::new(pipe_trigger.as_ref().unwrap().to_string()).unwrap())
} }
Arg::TcpListener { port: _port } => unimplemented!(), Arg::TcpListener { port: _port } => unimplemented!(),
Arg::Trailing => { Arg::Trailing => {
@ -144,6 +147,37 @@ impl<'a> Spawner<'a> {
} }
} }
} }
out
}
fn prepare_args_ref(
&self,
entrypoint: &str,
args: &[Arg],
pipe_trigger: Option<&str>,
) -> Vec<CString> {
let mut out = Vec::new();
for arg in args {
match arg {
Arg::BinaryName => out.push(CString::new(self.binary).unwrap()),
Arg::Entrypoint => out.push(CString::new(entrypoint).unwrap()),
Arg::Pipe(_) => panic!("can't use pipes with an immutable reference"),
Arg::PipeTrigger => {
out.push(CString::new(pipe_trigger.as_ref().unwrap().to_string()).unwrap())
}
Arg::TcpListener { port: _port } => unimplemented!(),
Arg::Trailing => {
out.extend(self.trailing.iter().map(|s| CString::new(*s).unwrap()))
}
}
}
out out
} }