spawner-sigint-handlers #52

Merged
JakeHillion merged 2 commits from spawner-sigint-handlers into main 2022-05-25 04:57:41 +01:00

View File

@ -18,7 +18,9 @@ use std::path::{Path, PathBuf};
use nix::sys::signal::{kill, Signal}; use nix::sys::signal::{kill, Signal};
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags}; use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags};
use nix::unistd::{self, Pid}; use nix::sys::wait::{waitid, Id, WaitPidFlag, WaitStatus};
use nix::unistd::{self, fork, ForkResult, Pid};
use nix::Error as NixError;
const BUFFER_SIZE: usize = 1024; const BUFFER_SIZE: usize = 1024;
const MAX_FILE_DESCRIPTORS: usize = 16; const MAX_FILE_DESCRIPTORS: usize = 16;
@ -149,9 +151,21 @@ impl<'a> Spawner<'a> {
} }
fn pipe_trigger(&self, mut pipe: File, spec: &Entrypoint, name: &str) -> Result<()> { fn pipe_trigger(&self, mut pipe: File, spec: &Entrypoint, name: &str) -> Result<()> {
// put the work in a forked process that can handle signals
Self::fork_for_trigger()?;
let mut buf = [0_u8; BUFFER_SIZE]; let mut buf = [0_u8; BUFFER_SIZE];
loop { loop {
let read_bytes = pipe.read(&mut buf)?; let read_bytes = match pipe.read(&mut buf) {
Ok(n) => Ok(n),
Err(e) => {
if e.kind() == std::io::ErrorKind::Interrupted {
return Ok(());
}
Err(e)
}
}?;
if read_bytes == 0 { if read_bytes == 0 {
return Ok(()); return Ok(());
} }
@ -196,19 +210,30 @@ impl<'a> Spawner<'a> {
} }
fn file_socket_trigger(&self, socket: File, spec: &Entrypoint, name: &str) -> Result<()> { fn file_socket_trigger(&self, socket: File, spec: &Entrypoint, name: &str) -> Result<()> {
// put the work in a forked process that can handle signals
Self::fork_for_trigger()?;
let mut cmsg_buf = nix::cmsg_space!([RawFd; MAX_FILE_DESCRIPTORS]); let mut cmsg_buf = nix::cmsg_space!([RawFd; MAX_FILE_DESCRIPTORS]);
loop { loop {
let msg = recvmsg::<()>( let msg = match recvmsg::<()>(
socket.as_raw_fd(), socket.as_raw_fd(),
&mut [], &mut [],
Some(&mut cmsg_buf), Some(&mut cmsg_buf),
MsgFlags::empty(), MsgFlags::empty(),
) ) {
.map_err(|e| Error::Nix { Ok(m) => Ok(m),
Err(e) => {
if e == NixError::EINTR {
return Ok(());
}
Err(Error::Nix {
msg: "recvmsg", msg: "recvmsg",
src: e, src: e,
})?; })
}
}?;
debug!("triggering from socket recvmsg"); debug!("triggering from socket recvmsg");
@ -263,6 +288,35 @@ impl<'a> Spawner<'a> {
} }
} }
fn fork_for_trigger() -> Result<()> {
// SAFETY: only unsafe in a multi-threaded program
if let ForkResult::Parent { child: _pid } = unsafe { fork() }.map_err(|e| Error::Nix {
msg: "fork",
src: e,
})? {
let status = waitid(Id::All, WaitPidFlag::WEXITED).map_err(|e| Error::Nix {
msg: "waitpid",
src: e,
})?;
match status {
WaitStatus::Exited(_pid, code) => {
std::process::exit(code);
}
WaitStatus::Signaled(pid, sig, _coredump) => {
debug!(
"trigger: forked child {} was terminated with signal {}",
pid, sig
);
std::process::exit(-1);
}
_ => unreachable!(),
}
}
Ok(())
}
fn stop_self(name: &str) -> Result<()> { fn stop_self(name: &str) -> Result<()> {
info!("stopping process `{}`", name); info!("stopping process `{}`", name);