implementing file socket args and triggers #16

Merged
JakeHillion merged 1 commits from file-sockets into main 2022-04-13 23:20:32 +01:00
4 changed files with 234 additions and 15 deletions

View File

@ -18,6 +18,9 @@ pub enum Error {
#[error("bad pipe specification: a pipe must have exactly one reader and one writer: {0}")]
BadPipe(String),
#[error("bad socket specification: a socket must have exactly one reader and one writer: {0}")]
BadFileSocket(String),
#[error("bad specification type: only .json files are supported")]
BadSpecType,

View File

@ -16,7 +16,8 @@ use std::os::unix::io::FromRawFd;
use clap::{App, AppSettings};
use nix::fcntl::OFlag;
use nix::unistd::{self};
use nix::sys::socket;
use nix::unistd;
pub fn run() -> Result<()> {
// process arguments
@ -69,12 +70,17 @@ pub fn run() -> Result<()> {
let (pipes, _) = spec.pipes();
let pipes = create_pipes(pipes)?;
let (sockets, _) = spec.sockets();
let sockets = create_sockets(sockets)?;
// spawn all processes
Spawner {
spec: &spec,
pipes,
binary,
trailing: &trailing,
pipes,
sockets,
}
.spawn()
}
@ -89,6 +95,16 @@ fn create_pipes(names: Vec<&str>) -> Result<HashMap<String, PipePair>> {
Ok(pipes)
}
fn create_sockets(names: Vec<&str>) -> Result<HashMap<String, SocketPair>> {
let mut sockets = HashMap::new();
for socket in names {
info!("creating socket pair `{}`", socket);
sockets.insert(socket.to_string(), SocketPair::new(socket)?);
}
Ok(sockets)
}
pub struct PipePair {
name: String,
@ -123,3 +139,44 @@ impl PipePair {
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
}
}
pub struct SocketPair {
name: String,
read: Option<File>,
write: Option<File>,
}
impl SocketPair {
fn new(name: &str) -> Result<SocketPair> {
let (read, write) = socket::socketpair(
socket::AddressFamily::Unix,
socket::SockType::Datagram,
None,
socket::SockFlag::empty(),
)
.map_err(|e| Error::Nix {
msg: "socketpair",
src: e,
})?;
// safe to create files given the successful return of socketpair(2)
Ok(SocketPair {
name: name.to_string(),
read: Some(unsafe { File::from_raw_fd(read) }),
write: Some(unsafe { File::from_raw_fd(write) }),
})
}
fn take_read(&mut self) -> Result<File> {
self.read
.take()
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
}
fn take_write(&mut self) -> Result<File> {
self.write
.take()
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
}
}

View File

@ -1,7 +1,7 @@
use log::{debug, error, info};
use super::specification::{Arg, Entrypoint, Pipe, Specification, Trigger};
use super::PipePair;
use super::specification::{Arg, Entrypoint, FileSocket, Pipe, Specification, Trigger};
use super::{PipePair, SocketPair};
use crate::void::VoidBuilder;
use crate::{Error, Result};
@ -10,18 +10,21 @@ use std::ffi::CString;
use std::fs::File;
use std::io::Read;
use std::net::TcpListener;
use std::os::unix::io::IntoRawFd;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::path::PathBuf;
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags};
use nix::unistd;
const BUFFER_SIZE: usize = 1024;
pub struct Spawner<'a> {
pub spec: &'a Specification,
pub pipes: HashMap<String, PipePair>,
pub binary: &'a str,
pub trailing: &'a Vec<&'a str>,
pub pipes: HashMap<String, PipePair>,
pub sockets: HashMap<String, SocketPair>,
}
enum TriggerData<'a> {
@ -30,6 +33,9 @@ enum TriggerData<'a> {
/// A string sent across a pipe
Pipe(&'a str),
/// File(s) sent over a file socket
FileSocket(Vec<File>),
}
impl<'a> TriggerData<'a> {
@ -37,6 +43,10 @@ impl<'a> TriggerData<'a> {
match self {
TriggerData::None => vec![],
TriggerData::Pipe(s) => vec![CString::new(s.to_string()).unwrap()],
TriggerData::FileSocket(fs) => fs
.drain(..)
.map(|f| CString::new(f.into_raw_fd().to_string()).unwrap())
.collect(),
}
}
}
@ -75,7 +85,7 @@ impl<'a> Spawner<'a> {
}
Trigger::Pipe(s) => {
let pipe = self.pipes.get_mut(s).unwrap().take_read().unwrap();
let pipe = self.pipes.get_mut(s).unwrap().take_read()?;
let binary = PathBuf::from(self.binary).canonicalize()?;
let mut builder = VoidBuilder::new();
@ -92,6 +102,25 @@ impl<'a> Spawner<'a> {
builder.spawn(closure)?;
}
Trigger::FileSocket(s) => {
let socket = self.sockets.get_mut(s).unwrap().take_read()?;
let binary = PathBuf::from(self.binary).canonicalize()?;
let mut builder = VoidBuilder::new();
builder.mount(binary, "/entrypoint");
builder.keep_fd(&socket);
let closure = || match self.file_socket_trigger(socket, entrypoint, name) {
Ok(()) => std::process::exit(exitcode::OK),
Err(e) => {
error!("error in file_socket_trigger: {}", e);
std::process::exit(1)
}
};
builder.spawn(closure)?;
}
}
}
@ -100,7 +129,6 @@ impl<'a> Spawner<'a> {
fn pipe_trigger(&self, mut pipe: File, spec: &Entrypoint, name: &str) -> Result<()> {
let mut buf = [0_u8; BUFFER_SIZE];
loop {
let read_bytes = pipe.read(&mut buf)?;
if read_bytes == 0 {
@ -109,6 +137,9 @@ impl<'a> Spawner<'a> {
debug!("triggering from pipe read");
let mut builder = VoidBuilder::new();
builder.mount("/entrypoint", "/entrypoint");
let closure =
|| {
let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap();
@ -129,11 +160,68 @@ impl<'a> Spawner<'a> {
}
};
let mut builder = VoidBuilder::new();
builder.spawn(closure)?;
}
}
fn file_socket_trigger(&self, socket: File, spec: &Entrypoint, name: &str) -> Result<()> {
let mut buf = Vec::new();
loop {
let msg = recvmsg(socket.as_raw_fd(), &[], Some(&mut buf), MsgFlags::empty()).map_err(
|e| Error::Nix {
msg: "recvmsg",
src: e,
},
)?;
debug!("triggering from socket recvmsg");
for cmsg in msg.cmsgs() {
match cmsg {
ControlMessageOwned::ScmRights(fds) => {
let fds = fds
.into_iter()
.map(|fd| unsafe { File::from_raw_fd(fd) })
.collect();
let mut builder = VoidBuilder::new();
builder.mount("/entrypoint", "/entrypoint");
for fd in &fds {
builder.keep_fd(fd);
}
let closure = || {
let args = self
.prepare_args_ref(
name,
&spec.args,
&mut TriggerData::FileSocket(fds),
)
.unwrap();
if let Err(e) =
unistd::execv(&CString::new("/entrypoint").unwrap(), &args).map_err(
|e| Error::Nix {
msg: "execv",
src: e,
},
)
{
error!("error: {}", e);
1
} else {
0
}
};
builder.spawn(closure)?;
}
_ => unimplemented!(),
}
}
}
}
fn prepare_args(
&mut self,
entrypoint: &str,
@ -144,8 +232,10 @@ impl<'a> Spawner<'a> {
for arg in args {
out.extend(self.prepare_arg(entrypoint, arg, trigger)?);
}
Ok(out)
}
fn prepare_args_ref(
&self,
entrypoint: &str,
@ -156,6 +246,7 @@ impl<'a> Spawner<'a> {
for arg in args {
out.extend(self.prepare_arg_ref(entrypoint, arg, trigger)?);
}
Ok(out)
}
@ -176,6 +267,18 @@ impl<'a> Spawner<'a> {
Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()])
}
},
Arg::FileSocket(s) => match s {
FileSocket::Rx(s) => {
let pipe = self.sockets.get_mut(s).unwrap().take_read()?;
Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()])
}
FileSocket::Tx(s) => {
let pipe = self.sockets.get_mut(s).unwrap().take_write()?;
Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()])
}
},
a => self.prepare_arg_ref(entrypoint, a, trigger),
}
}
@ -191,6 +294,12 @@ impl<'a> Spawner<'a> {
Arg::Entrypoint => Ok(vec![CString::new(entrypoint).unwrap()]),
Arg::Pipe(p) => Err(Error::BadPipe(p.get_name().to_string())),
Arg::FileSocket(s) => Err(Error::BadFileSocket(s.get_name().to_string())),
Arg::File(p) => {
let f = File::open(p)?.into_raw_fd();
Ok(vec![CString::new(f.to_string()).unwrap()])
}
Arg::Trigger => Ok(trigger.args()),

View File

@ -25,8 +25,14 @@ pub struct Entrypoint {
#[derive(Serialize, Deserialize, Debug)]
pub enum Trigger {
/// Start this entrypoint at application startup
Startup,
/// Trigger this entrypoint when a named pipe receives data
Pipe(String),
/// Trigger this entrypoint when a named file socket receives data
FileSocket(String),
}
impl Default for Trigger {
@ -36,7 +42,6 @@ impl Default for Trigger {
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
// #[serde(tag = "type")]
pub enum Arg {
/// The binary name, or argv[0], of the original program start
BinaryName,
@ -44,11 +49,17 @@ pub enum Arg {
/// The name of this entrypoint
Entrypoint,
/// A file descriptor for a file on the filesystem in the launching namespace
File(PathBuf),
/// A chosen end of a named pipe
Pipe(Pipe),
/// File socket
FileSocket(FileSocket),
/// A value specified by the trigger
/// NOTE: Only valid if the trigger is of type Pipe(...)
/// NOTE: Only valid if the trigger is of type Pipe(...) or FileSocket(...)
Trigger,
/// A TCP Listener
@ -79,6 +90,21 @@ impl Pipe {
}
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub enum FileSocket {
Rx(String),
Tx(String),
}
impl FileSocket {
pub fn get_name(&self) -> &str {
match self {
FileSocket::Rx(n) => n,
FileSocket::Tx(n) => n,
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub enum Permission {
Filesystem {
@ -105,9 +131,8 @@ impl Specification {
let mut write = Vec::new();
for entry in self.entrypoints.values() {
match &entry.trigger {
Trigger::Startup => {}
Trigger::Pipe(s) => read.push(s.as_str()),
if let Trigger::Pipe(s) = &entry.trigger {
read.push(s.as_str());
}
for arg in &entry.args {
@ -125,6 +150,30 @@ impl Specification {
(read, write)
}
pub fn sockets(&self) -> (Vec<&str>, Vec<&str>) {
let mut read = Vec::new();
let mut write = Vec::new();
for entry in self.entrypoints.values() {
if let Trigger::FileSocket(s) = &entry.trigger {
read.push(s.as_str());
}
for arg in &entry.args {
if let Arg::FileSocket(p) = arg {
match p {
FileSocket::Rx(s) => read.push(s.as_str()),
FileSocket::Tx(s) => write.push(s.as_str()),
}
}
}
}
debug!("read sockets: {:?}", &read);
debug!("write sockets: {:?}", &write);
(read, write)
}
pub fn validate(&self) -> Result<()> {
// validate pipes match
let (read, write) = self.pipes();
@ -153,11 +202,12 @@ impl Specification {
return Err(Error::BadPipe(pipe.to_string()));
}
// validate pipe trigger arguments make sense
// validate trigger arguments make sense
for entrypoint in self.entrypoints.values() {
if entrypoint.args.contains(&Arg::Trigger) {
match entrypoint.trigger {
Trigger::Pipe(_) => {}
Trigger::FileSocket(_) => {}
_ => return Err(Error::BadTriggerArgument),
}
}