allowing multiple processes to share socket #40

Merged
JakeHillion merged 1 commits from dup-sockets into main 2022-05-19 21:34:22 +01:00
5 changed files with 60 additions and 32 deletions

View File

@ -18,7 +18,7 @@ pub enum Error {
#[error("bad pipe specification: a pipe must have exactly one reader and one writer: {0}")] #[error("bad pipe specification: a pipe must have exactly one reader and one writer: {0}")]
BadPipe(String), BadPipe(String),
#[error("bad socket specification: a socket must have exactly one reader and one writer: {0}")] #[error("bad socket specification: a socket must have exactly one reader and one or more writers: {0}")]
JakeHillion marked this conversation as resolved Outdated

this needs to be reflected in the specification checking

this needs to be reflected in the specification checking
BadFileSocket(String), BadFileSocket(String),
#[error("bad specification type: only .json files are supported")] #[error("bad specification type: only .json files are supported")]

View File

@ -12,7 +12,7 @@ use specification::Specification;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::os::unix::io::FromRawFd; use std::os::unix::io::{AsRawFd, FromRawFd};
use std::path::Path; use std::path::Path;
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
@ -135,10 +135,11 @@ impl PipePair {
src: e, src: e,
})?; })?;
// safe to create files given the successful return of pipe(2)
Ok(PipePair { Ok(PipePair {
name: name.to_string(), name: name.to_string(),
// SAFETY: valid new fd as pipe2(2) returned successfully
read: Some(unsafe { File::from_raw_fd(read) }), read: Some(unsafe { File::from_raw_fd(read) }),
// SAFETY: valid new fd as pipe2(2) returned successfully
write: Some(unsafe { File::from_raw_fd(write) }), write: Some(unsafe { File::from_raw_fd(write) }),
}) })
} }
@ -160,7 +161,7 @@ pub struct SocketPair {
name: String, name: String,
read: Option<File>, read: Option<File>,
write: Option<File>, write: File,
} }
impl SocketPair { impl SocketPair {
@ -176,23 +177,26 @@ impl SocketPair {
src: e, src: e,
})?; })?;
// safe to create files given the successful return of socketpair(2)
Ok(SocketPair { Ok(SocketPair {
name: name.to_string(), name: name.to_string(),
// SAFETY: valid new fd as socketpair(2) returned successfully
JakeHillion marked this conversation as resolved Outdated

add more local safety comments

add more local safety comments
read: Some(unsafe { File::from_raw_fd(read) }), read: Some(unsafe { File::from_raw_fd(read) }),
write: Some(unsafe { File::from_raw_fd(write) }), // SAFETY: valid new fd as socketpair(2) returned successfully
write: unsafe { File::from_raw_fd(write) },
}) })
} }
fn take_read(&mut self) -> Result<File> { fn take_read(&mut self) -> Result<File> {
self.read self.read
.take() .take()
.ok_or_else(|| Error::BadPipe(self.name.to_string())) .ok_or_else(|| Error::BadFileSocket(self.name.to_string()))
} }
fn take_write(&mut self) -> Result<File> { fn write(&self) -> Result<File> {
self.write let dup_fd = nix::unistd::dup(self.write.as_raw_fd())
.take() .map_err(|e| Error::Nix { msg: "dup", src: e })?;
JakeHillion marked this conversation as resolved Outdated

fix this comment

fix this comment
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
// SAFETY: valid new fd as dup(2) returned successfully
Ok(unsafe { File::from_raw_fd(dup_fd) })
} }
} }

View File

@ -37,11 +37,15 @@ impl PreparedArgs {
* for things like network sockets. update the builder * for things like network sockets. update the builder
* with newly passed fds. * with newly passed fds.
*/ */
pub fn prepare_ambient(builder: &mut VoidBuilder, args: &[Arg]) -> Result<Self> { pub fn prepare_ambient(
spawner: &Spawner,
builder: &mut VoidBuilder,
args: &[Arg],
) -> Result<Self> {
let mut v = Vec::with_capacity(args.len()); let mut v = Vec::with_capacity(args.len());
for arg in args { for arg in args {
v.push(PreparedArg::prepare_ambient(builder, arg)?); v.push(PreparedArg::prepare_ambient(spawner, builder, arg)?);
} }
Ok(PreparedArgs(v)) Ok(PreparedArgs(v))
@ -113,24 +117,28 @@ impl PreparedArg {
PreparedArg::Pipe(pipe) PreparedArg::Pipe(pipe)
} }
Arg::FileSocket(s) => { Arg::FileSocket(FileSocket::Rx(s)) => {
let socket = match s { let socket = spawner.sockets.get_mut(s).unwrap().take_read()?;
FileSocket::Rx(s) => spawner.sockets.get_mut(s).unwrap().take_read(),
FileSocket::Tx(s) => spawner.sockets.get_mut(s).unwrap().take_write(),
}?;
builder.keep_fd(&socket); builder.keep_fd(&socket);
PreparedArg::FileSocket(socket) PreparedArg::FileSocket(socket)
} }
arg => Self::prepare_ambient(builder, arg)?, arg => Self::prepare_ambient(spawner, builder, arg)?,
}) })
} }
fn prepare_ambient(builder: &mut VoidBuilder, arg: &Arg) -> Result<Self> { fn prepare_ambient(spawner: &Spawner, builder: &mut VoidBuilder, arg: &Arg) -> Result<Self> {
Ok(match arg { Ok(match arg {
Arg::Pipe(p) => return Err(Error::BadPipe(p.get_name().to_string())), Arg::Pipe(p) => return Err(Error::BadPipe(p.get_name().to_string())),
Arg::FileSocket(s) => return Err(Error::BadFileSocket(s.get_name().to_string())), Arg::FileSocket(FileSocket::Rx(s)) => return Err(Error::BadFileSocket(s.to_string())),
Arg::FileSocket(FileSocket::Tx(s)) => {
let socket = spawner.sockets.get(s).unwrap().write()?;
builder.keep_fd(&socket);
PreparedArg::FileSocket(socket)
}
Arg::File(path) => { Arg::File(path) => {
let fd = File::open(path)?; let fd = File::open(path)?;

View File

@ -169,7 +169,7 @@ impl<'a> Spawner<'a> {
self.prepare_env(&mut builder, &spec.environment); self.prepare_env(&mut builder, &spec.environment);
let args = PreparedArgs::prepare_ambient(&mut builder, &spec.args)?; let args = PreparedArgs::prepare_ambient(self, &mut builder, &spec.args)?;
let closure = let closure =
|| { || {
@ -234,7 +234,7 @@ impl<'a> Spawner<'a> {
self.prepare_env(&mut builder, &spec.environment); self.prepare_env(&mut builder, &spec.environment);
let args = PreparedArgs::prepare_ambient(&mut builder, &spec.args)?; let args = PreparedArgs::prepare_ambient(self, &mut builder, &spec.args)?;
let closure = || { let closure = || {
if self.debug { if self.debug {

View File

@ -99,15 +99,6 @@ pub enum FileSocket {
Tx(String), Tx(String),
} }
impl FileSocket {
pub fn get_name(&self) -> &str {
match self {
FileSocket::Rx(n) => n,
FileSocket::Tx(n) => n,
}
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub enum Environment { pub enum Environment {
Filesystem { Filesystem {
@ -206,6 +197,31 @@ impl Specification {
return Err(Error::BadPipe(pipe.to_string())); return Err(Error::BadPipe(pipe.to_string()));
} }
// validate sockets match
let (read, write) = self.sockets();
let mut read_set = HashSet::with_capacity(read.len());
for socket in read {
if !read_set.insert(socket) {
return Err(Error::BadFileSocket(socket.to_string()));
}
}
let mut write_set = HashSet::with_capacity(write.len());
for socket in write {
write_set.insert(socket);
}
for socket in &read_set {
if !write_set.contains(socket) {
return Err(Error::BadFileSocket(socket.to_string()));
}
}
if let Some(socket) = (&write_set - &read_set).into_iter().next() {
return Err(Error::BadFileSocket(socket.to_string()));
}
// validate trigger arguments make sense // validate trigger arguments make sense
for entrypoint in self.entrypoints.values() { for entrypoint in self.entrypoints.values() {
if entrypoint.args.contains(&Arg::Trigger) { if entrypoint.args.contains(&Arg::Trigger) {