implementing file socket args and triggers #16
@ -18,6 +18,9 @@ pub enum Error {
|
||||
#[error("bad pipe specification: a pipe must have exactly one reader and one writer: {0}")]
|
||||
BadPipe(String),
|
||||
|
||||
#[error("bad socket specification: a socket must have exactly one reader and one writer: {0}")]
|
||||
BadFileSocket(String),
|
||||
|
||||
#[error("bad specification type: only .json files are supported")]
|
||||
BadSpecType,
|
||||
|
||||
|
61
src/lib.rs
61
src/lib.rs
@ -16,7 +16,8 @@ use std::os::unix::io::FromRawFd;
|
||||
|
||||
use clap::{App, AppSettings};
|
||||
use nix::fcntl::OFlag;
|
||||
use nix::unistd::{self};
|
||||
use nix::sys::socket;
|
||||
use nix::unistd;
|
||||
|
||||
pub fn run() -> Result<()> {
|
||||
// process arguments
|
||||
@ -69,12 +70,17 @@ pub fn run() -> Result<()> {
|
||||
let (pipes, _) = spec.pipes();
|
||||
let pipes = create_pipes(pipes)?;
|
||||
|
||||
let (sockets, _) = spec.sockets();
|
||||
let sockets = create_sockets(sockets)?;
|
||||
|
||||
// spawn all processes
|
||||
Spawner {
|
||||
spec: &spec,
|
||||
pipes,
|
||||
binary,
|
||||
trailing: &trailing,
|
||||
|
||||
pipes,
|
||||
sockets,
|
||||
}
|
||||
.spawn()
|
||||
}
|
||||
@ -89,6 +95,16 @@ fn create_pipes(names: Vec<&str>) -> Result<HashMap<String, PipePair>> {
|
||||
Ok(pipes)
|
||||
}
|
||||
|
||||
fn create_sockets(names: Vec<&str>) -> Result<HashMap<String, SocketPair>> {
|
||||
let mut sockets = HashMap::new();
|
||||
for socket in names {
|
||||
info!("creating socket pair `{}`", socket);
|
||||
sockets.insert(socket.to_string(), SocketPair::new(socket)?);
|
||||
}
|
||||
|
||||
Ok(sockets)
|
||||
}
|
||||
|
||||
pub struct PipePair {
|
||||
name: String,
|
||||
|
||||
@ -123,3 +139,44 @@ impl PipePair {
|
||||
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SocketPair {
|
||||
name: String,
|
||||
|
||||
read: Option<File>,
|
||||
write: Option<File>,
|
||||
}
|
||||
|
||||
impl SocketPair {
|
||||
fn new(name: &str) -> Result<SocketPair> {
|
||||
let (read, write) = socket::socketpair(
|
||||
socket::AddressFamily::Unix,
|
||||
socket::SockType::Datagram,
|
||||
None,
|
||||
socket::SockFlag::empty(),
|
||||
)
|
||||
.map_err(|e| Error::Nix {
|
||||
msg: "socketpair",
|
||||
src: e,
|
||||
})?;
|
||||
|
||||
// safe to create files given the successful return of socketpair(2)
|
||||
Ok(SocketPair {
|
||||
name: name.to_string(),
|
||||
read: Some(unsafe { File::from_raw_fd(read) }),
|
||||
write: Some(unsafe { File::from_raw_fd(write) }),
|
||||
})
|
||||
}
|
||||
|
||||
fn take_read(&mut self) -> Result<File> {
|
||||
self.read
|
||||
.take()
|
||||
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
|
||||
}
|
||||
|
||||
fn take_write(&mut self) -> Result<File> {
|
||||
self.write
|
||||
.take()
|
||||
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
|
||||
}
|
||||
}
|
||||
|
123
src/spawner.rs
123
src/spawner.rs
@ -1,7 +1,7 @@
|
||||
use log::{debug, error, info};
|
||||
|
||||
use super::specification::{Arg, Entrypoint, Pipe, Specification, Trigger};
|
||||
use super::PipePair;
|
||||
use super::specification::{Arg, Entrypoint, FileSocket, Pipe, Specification, Trigger};
|
||||
use super::{PipePair, SocketPair};
|
||||
use crate::void::VoidBuilder;
|
||||
use crate::{Error, Result};
|
||||
|
||||
@ -10,18 +10,21 @@ use std::ffi::CString;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::net::TcpListener;
|
||||
use std::os::unix::io::IntoRawFd;
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags};
|
||||
use nix::unistd;
|
||||
|
||||
const BUFFER_SIZE: usize = 1024;
|
||||
|
||||
pub struct Spawner<'a> {
|
||||
pub spec: &'a Specification,
|
||||
pub pipes: HashMap<String, PipePair>,
|
||||
pub binary: &'a str,
|
||||
pub trailing: &'a Vec<&'a str>,
|
||||
|
||||
pub pipes: HashMap<String, PipePair>,
|
||||
pub sockets: HashMap<String, SocketPair>,
|
||||
}
|
||||
|
||||
enum TriggerData<'a> {
|
||||
@ -30,6 +33,9 @@ enum TriggerData<'a> {
|
||||
|
||||
/// A string sent across a pipe
|
||||
Pipe(&'a str),
|
||||
|
||||
/// File(s) sent over a file socket
|
||||
FileSocket(Vec<File>),
|
||||
}
|
||||
|
||||
impl<'a> TriggerData<'a> {
|
||||
@ -37,6 +43,10 @@ impl<'a> TriggerData<'a> {
|
||||
match self {
|
||||
TriggerData::None => vec![],
|
||||
TriggerData::Pipe(s) => vec![CString::new(s.to_string()).unwrap()],
|
||||
TriggerData::FileSocket(fs) => fs
|
||||
.drain(..)
|
||||
.map(|f| CString::new(f.into_raw_fd().to_string()).unwrap())
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -75,7 +85,7 @@ impl<'a> Spawner<'a> {
|
||||
}
|
||||
|
||||
Trigger::Pipe(s) => {
|
||||
let pipe = self.pipes.get_mut(s).unwrap().take_read().unwrap();
|
||||
let pipe = self.pipes.get_mut(s).unwrap().take_read()?;
|
||||
let binary = PathBuf::from(self.binary).canonicalize()?;
|
||||
|
||||
let mut builder = VoidBuilder::new();
|
||||
@ -92,6 +102,25 @@ impl<'a> Spawner<'a> {
|
||||
|
||||
builder.spawn(closure)?;
|
||||
}
|
||||
|
||||
Trigger::FileSocket(s) => {
|
||||
let socket = self.sockets.get_mut(s).unwrap().take_read()?;
|
||||
let binary = PathBuf::from(self.binary).canonicalize()?;
|
||||
|
||||
let mut builder = VoidBuilder::new();
|
||||
builder.mount(binary, "/entrypoint");
|
||||
builder.keep_fd(&socket);
|
||||
|
||||
let closure = || match self.file_socket_trigger(socket, entrypoint, name) {
|
||||
Ok(()) => std::process::exit(exitcode::OK),
|
||||
Err(e) => {
|
||||
error!("error in file_socket_trigger: {}", e);
|
||||
std::process::exit(1)
|
||||
}
|
||||
};
|
||||
|
||||
builder.spawn(closure)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,7 +129,6 @@ impl<'a> Spawner<'a> {
|
||||
|
||||
fn pipe_trigger(&self, mut pipe: File, spec: &Entrypoint, name: &str) -> Result<()> {
|
||||
let mut buf = [0_u8; BUFFER_SIZE];
|
||||
|
||||
loop {
|
||||
let read_bytes = pipe.read(&mut buf)?;
|
||||
if read_bytes == 0 {
|
||||
@ -109,6 +137,9 @@ impl<'a> Spawner<'a> {
|
||||
|
||||
debug!("triggering from pipe read");
|
||||
|
||||
let mut builder = VoidBuilder::new();
|
||||
builder.mount("/entrypoint", "/entrypoint");
|
||||
|
||||
let closure =
|
||||
|| {
|
||||
let pipe_trigger = std::str::from_utf8(&buf[0..read_bytes]).unwrap();
|
||||
@ -129,11 +160,68 @@ impl<'a> Spawner<'a> {
|
||||
}
|
||||
};
|
||||
|
||||
let mut builder = VoidBuilder::new();
|
||||
builder.spawn(closure)?;
|
||||
}
|
||||
}
|
||||
|
||||
fn file_socket_trigger(&self, socket: File, spec: &Entrypoint, name: &str) -> Result<()> {
|
||||
let mut buf = Vec::new();
|
||||
loop {
|
||||
let msg = recvmsg(socket.as_raw_fd(), &[], Some(&mut buf), MsgFlags::empty()).map_err(
|
||||
|e| Error::Nix {
|
||||
msg: "recvmsg",
|
||||
src: e,
|
||||
},
|
||||
)?;
|
||||
|
||||
debug!("triggering from socket recvmsg");
|
||||
|
||||
for cmsg in msg.cmsgs() {
|
||||
match cmsg {
|
||||
ControlMessageOwned::ScmRights(fds) => {
|
||||
let fds = fds
|
||||
.into_iter()
|
||||
.map(|fd| unsafe { File::from_raw_fd(fd) })
|
||||
.collect();
|
||||
|
||||
let mut builder = VoidBuilder::new();
|
||||
builder.mount("/entrypoint", "/entrypoint");
|
||||
for fd in &fds {
|
||||
builder.keep_fd(fd);
|
||||
}
|
||||
|
||||
let closure = || {
|
||||
let args = self
|
||||
.prepare_args_ref(
|
||||
name,
|
||||
&spec.args,
|
||||
&mut TriggerData::FileSocket(fds),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
if let Err(e) =
|
||||
unistd::execv(&CString::new("/entrypoint").unwrap(), &args).map_err(
|
||||
|e| Error::Nix {
|
||||
msg: "execv",
|
||||
src: e,
|
||||
},
|
||||
)
|
||||
{
|
||||
error!("error: {}", e);
|
||||
1
|
||||
} else {
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
builder.spawn(closure)?;
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_args(
|
||||
&mut self,
|
||||
entrypoint: &str,
|
||||
@ -144,8 +232,10 @@ impl<'a> Spawner<'a> {
|
||||
for arg in args {
|
||||
out.extend(self.prepare_arg(entrypoint, arg, trigger)?);
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
fn prepare_args_ref(
|
||||
&self,
|
||||
entrypoint: &str,
|
||||
@ -156,6 +246,7 @@ impl<'a> Spawner<'a> {
|
||||
for arg in args {
|
||||
out.extend(self.prepare_arg_ref(entrypoint, arg, trigger)?);
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
@ -176,6 +267,18 @@ impl<'a> Spawner<'a> {
|
||||
Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()])
|
||||
}
|
||||
},
|
||||
|
||||
Arg::FileSocket(s) => match s {
|
||||
FileSocket::Rx(s) => {
|
||||
let pipe = self.sockets.get_mut(s).unwrap().take_read()?;
|
||||
Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()])
|
||||
}
|
||||
FileSocket::Tx(s) => {
|
||||
let pipe = self.sockets.get_mut(s).unwrap().take_write()?;
|
||||
Ok(vec![CString::new(pipe.into_raw_fd().to_string()).unwrap()])
|
||||
}
|
||||
},
|
||||
|
||||
a => self.prepare_arg_ref(entrypoint, a, trigger),
|
||||
}
|
||||
}
|
||||
@ -191,6 +294,12 @@ impl<'a> Spawner<'a> {
|
||||
Arg::Entrypoint => Ok(vec![CString::new(entrypoint).unwrap()]),
|
||||
|
||||
Arg::Pipe(p) => Err(Error::BadPipe(p.get_name().to_string())),
|
||||
Arg::FileSocket(s) => Err(Error::BadFileSocket(s.get_name().to_string())),
|
||||
|
||||
Arg::File(p) => {
|
||||
let f = File::open(p)?.into_raw_fd();
|
||||
Ok(vec![CString::new(f.to_string()).unwrap()])
|
||||
}
|
||||
|
||||
Arg::Trigger => Ok(trigger.args()),
|
||||
|
||||
|
@ -25,8 +25,14 @@ pub struct Entrypoint {
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum Trigger {
|
||||
/// Start this entrypoint at application startup
|
||||
Startup,
|
||||
|
||||
/// Trigger this entrypoint when a named pipe receives data
|
||||
Pipe(String),
|
||||
|
||||
/// Trigger this entrypoint when a named file socket receives data
|
||||
FileSocket(String),
|
||||
}
|
||||
|
||||
impl Default for Trigger {
|
||||
@ -36,7 +42,6 @@ impl Default for Trigger {
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Debug)]
|
||||
// #[serde(tag = "type")]
|
||||
pub enum Arg {
|
||||
/// The binary name, or argv[0], of the original program start
|
||||
BinaryName,
|
||||
@ -44,11 +49,17 @@ pub enum Arg {
|
||||
/// The name of this entrypoint
|
||||
Entrypoint,
|
||||
|
||||
/// A file descriptor for a file on the filesystem in the launching namespace
|
||||
File(PathBuf),
|
||||
|
||||
/// A chosen end of a named pipe
|
||||
Pipe(Pipe),
|
||||
|
||||
/// File socket
|
||||
FileSocket(FileSocket),
|
||||
|
||||
/// A value specified by the trigger
|
||||
/// NOTE: Only valid if the trigger is of type Pipe(...)
|
||||
/// NOTE: Only valid if the trigger is of type Pipe(...) or FileSocket(...)
|
||||
Trigger,
|
||||
|
||||
/// A TCP Listener
|
||||
@ -79,6 +90,21 @@ impl Pipe {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Debug)]
|
||||
pub enum FileSocket {
|
||||
Rx(String),
|
||||
Tx(String),
|
||||
}
|
||||
|
||||
impl FileSocket {
|
||||
pub fn get_name(&self) -> &str {
|
||||
match self {
|
||||
FileSocket::Rx(n) => n,
|
||||
FileSocket::Tx(n) => n,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
|
||||
pub enum Permission {
|
||||
Filesystem {
|
||||
@ -105,9 +131,8 @@ impl Specification {
|
||||
let mut write = Vec::new();
|
||||
|
||||
for entry in self.entrypoints.values() {
|
||||
match &entry.trigger {
|
||||
Trigger::Startup => {}
|
||||
Trigger::Pipe(s) => read.push(s.as_str()),
|
||||
if let Trigger::Pipe(s) = &entry.trigger {
|
||||
read.push(s.as_str());
|
||||
}
|
||||
|
||||
for arg in &entry.args {
|
||||
@ -125,6 +150,30 @@ impl Specification {
|
||||
(read, write)
|
||||
}
|
||||
|
||||
pub fn sockets(&self) -> (Vec<&str>, Vec<&str>) {
|
||||
let mut read = Vec::new();
|
||||
let mut write = Vec::new();
|
||||
|
||||
for entry in self.entrypoints.values() {
|
||||
if let Trigger::FileSocket(s) = &entry.trigger {
|
||||
read.push(s.as_str());
|
||||
}
|
||||
|
||||
for arg in &entry.args {
|
||||
if let Arg::FileSocket(p) = arg {
|
||||
match p {
|
||||
FileSocket::Rx(s) => read.push(s.as_str()),
|
||||
FileSocket::Tx(s) => write.push(s.as_str()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("read sockets: {:?}", &read);
|
||||
debug!("write sockets: {:?}", &write);
|
||||
(read, write)
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<()> {
|
||||
// validate pipes match
|
||||
let (read, write) = self.pipes();
|
||||
@ -153,11 +202,12 @@ impl Specification {
|
||||
return Err(Error::BadPipe(pipe.to_string()));
|
||||
}
|
||||
|
||||
// validate pipe trigger arguments make sense
|
||||
// validate trigger arguments make sense
|
||||
for entrypoint in self.entrypoints.values() {
|
||||
if entrypoint.args.contains(&Arg::Trigger) {
|
||||
match entrypoint.trigger {
|
||||
Trigger::Pipe(_) => {}
|
||||
Trigger::FileSocket(_) => {}
|
||||
_ => return Err(Error::BadTriggerArgument),
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user