allowing multiple processes to share socket #40
@ -18,7 +18,7 @@ pub enum Error {
|
|||||||
#[error("bad pipe specification: a pipe must have exactly one reader and one writer: {0}")]
|
#[error("bad pipe specification: a pipe must have exactly one reader and one writer: {0}")]
|
||||||
BadPipe(String),
|
BadPipe(String),
|
||||||
|
|
||||||
#[error("bad socket specification: a socket must have exactly one reader and one writer: {0}")]
|
#[error("bad socket specification: a socket must have exactly one reader and one or more writers: {0}")]
|
||||||
JakeHillion marked this conversation as resolved
Outdated
|
|||||||
BadFileSocket(String),
|
BadFileSocket(String),
|
||||||
|
|
||||||
#[error("bad specification type: only .json files are supported")]
|
#[error("bad specification type: only .json files are supported")]
|
||||||
|
24
src/lib.rs
24
src/lib.rs
@ -12,7 +12,7 @@ use specification::Specification;
|
|||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::os::unix::io::FromRawFd;
|
use std::os::unix::io::{AsRawFd, FromRawFd};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
use nix::fcntl::OFlag;
|
use nix::fcntl::OFlag;
|
||||||
@ -135,10 +135,11 @@ impl PipePair {
|
|||||||
src: e,
|
src: e,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// safe to create files given the successful return of pipe(2)
|
|
||||||
Ok(PipePair {
|
Ok(PipePair {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
|
// SAFETY: valid new fd as pipe2(2) returned successfully
|
||||||
read: Some(unsafe { File::from_raw_fd(read) }),
|
read: Some(unsafe { File::from_raw_fd(read) }),
|
||||||
|
// SAFETY: valid new fd as pipe2(2) returned successfully
|
||||||
write: Some(unsafe { File::from_raw_fd(write) }),
|
write: Some(unsafe { File::from_raw_fd(write) }),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -160,7 +161,7 @@ pub struct SocketPair {
|
|||||||
name: String,
|
name: String,
|
||||||
|
|
||||||
read: Option<File>,
|
read: Option<File>,
|
||||||
write: Option<File>,
|
write: File,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SocketPair {
|
impl SocketPair {
|
||||||
@ -176,23 +177,26 @@ impl SocketPair {
|
|||||||
src: e,
|
src: e,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// safe to create files given the successful return of socketpair(2)
|
|
||||||
Ok(SocketPair {
|
Ok(SocketPair {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
|
// SAFETY: valid new fd as socketpair(2) returned successfully
|
||||||
JakeHillion marked this conversation as resolved
Outdated
JakeHillion
commented
add more local safety comments add more local safety comments
|
|||||||
read: Some(unsafe { File::from_raw_fd(read) }),
|
read: Some(unsafe { File::from_raw_fd(read) }),
|
||||||
write: Some(unsafe { File::from_raw_fd(write) }),
|
// SAFETY: valid new fd as socketpair(2) returned successfully
|
||||||
|
write: unsafe { File::from_raw_fd(write) },
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn take_read(&mut self) -> Result<File> {
|
fn take_read(&mut self) -> Result<File> {
|
||||||
self.read
|
self.read
|
||||||
.take()
|
.take()
|
||||||
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
|
.ok_or_else(|| Error::BadFileSocket(self.name.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn take_write(&mut self) -> Result<File> {
|
fn write(&self) -> Result<File> {
|
||||||
self.write
|
let dup_fd = nix::unistd::dup(self.write.as_raw_fd())
|
||||||
.take()
|
.map_err(|e| Error::Nix { msg: "dup", src: e })?;
|
||||||
JakeHillion marked this conversation as resolved
Outdated
JakeHillion
commented
fix this comment fix this comment
|
|||||||
.ok_or_else(|| Error::BadPipe(self.name.to_string()))
|
|
||||||
|
// SAFETY: valid new fd as dup(2) returned successfully
|
||||||
|
Ok(unsafe { File::from_raw_fd(dup_fd) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,11 +37,15 @@ impl PreparedArgs {
|
|||||||
* for things like network sockets. update the builder
|
* for things like network sockets. update the builder
|
||||||
* with newly passed fds.
|
* with newly passed fds.
|
||||||
*/
|
*/
|
||||||
pub fn prepare_ambient(builder: &mut VoidBuilder, args: &[Arg]) -> Result<Self> {
|
pub fn prepare_ambient(
|
||||||
|
spawner: &Spawner,
|
||||||
|
builder: &mut VoidBuilder,
|
||||||
|
args: &[Arg],
|
||||||
|
) -> Result<Self> {
|
||||||
let mut v = Vec::with_capacity(args.len());
|
let mut v = Vec::with_capacity(args.len());
|
||||||
|
|
||||||
for arg in args {
|
for arg in args {
|
||||||
v.push(PreparedArg::prepare_ambient(builder, arg)?);
|
v.push(PreparedArg::prepare_ambient(spawner, builder, arg)?);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(PreparedArgs(v))
|
Ok(PreparedArgs(v))
|
||||||
@ -113,24 +117,28 @@ impl PreparedArg {
|
|||||||
PreparedArg::Pipe(pipe)
|
PreparedArg::Pipe(pipe)
|
||||||
}
|
}
|
||||||
|
|
||||||
Arg::FileSocket(s) => {
|
Arg::FileSocket(FileSocket::Rx(s)) => {
|
||||||
let socket = match s {
|
let socket = spawner.sockets.get_mut(s).unwrap().take_read()?;
|
||||||
FileSocket::Rx(s) => spawner.sockets.get_mut(s).unwrap().take_read(),
|
|
||||||
FileSocket::Tx(s) => spawner.sockets.get_mut(s).unwrap().take_write(),
|
|
||||||
}?;
|
|
||||||
|
|
||||||
builder.keep_fd(&socket);
|
builder.keep_fd(&socket);
|
||||||
PreparedArg::FileSocket(socket)
|
PreparedArg::FileSocket(socket)
|
||||||
}
|
}
|
||||||
|
|
||||||
arg => Self::prepare_ambient(builder, arg)?,
|
arg => Self::prepare_ambient(spawner, builder, arg)?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn prepare_ambient(builder: &mut VoidBuilder, arg: &Arg) -> Result<Self> {
|
fn prepare_ambient(spawner: &Spawner, builder: &mut VoidBuilder, arg: &Arg) -> Result<Self> {
|
||||||
Ok(match arg {
|
Ok(match arg {
|
||||||
Arg::Pipe(p) => return Err(Error::BadPipe(p.get_name().to_string())),
|
Arg::Pipe(p) => return Err(Error::BadPipe(p.get_name().to_string())),
|
||||||
Arg::FileSocket(s) => return Err(Error::BadFileSocket(s.get_name().to_string())),
|
Arg::FileSocket(FileSocket::Rx(s)) => return Err(Error::BadFileSocket(s.to_string())),
|
||||||
|
|
||||||
|
Arg::FileSocket(FileSocket::Tx(s)) => {
|
||||||
|
let socket = spawner.sockets.get(s).unwrap().write()?;
|
||||||
|
|
||||||
|
builder.keep_fd(&socket);
|
||||||
|
PreparedArg::FileSocket(socket)
|
||||||
|
}
|
||||||
|
|
||||||
Arg::File(path) => {
|
Arg::File(path) => {
|
||||||
let fd = File::open(path)?;
|
let fd = File::open(path)?;
|
||||||
|
@ -169,7 +169,7 @@ impl<'a> Spawner<'a> {
|
|||||||
|
|
||||||
self.prepare_env(&mut builder, &spec.environment);
|
self.prepare_env(&mut builder, &spec.environment);
|
||||||
|
|
||||||
let args = PreparedArgs::prepare_ambient(&mut builder, &spec.args)?;
|
let args = PreparedArgs::prepare_ambient(self, &mut builder, &spec.args)?;
|
||||||
|
|
||||||
let closure =
|
let closure =
|
||||||
|| {
|
|| {
|
||||||
@ -234,7 +234,7 @@ impl<'a> Spawner<'a> {
|
|||||||
|
|
||||||
self.prepare_env(&mut builder, &spec.environment);
|
self.prepare_env(&mut builder, &spec.environment);
|
||||||
|
|
||||||
let args = PreparedArgs::prepare_ambient(&mut builder, &spec.args)?;
|
let args = PreparedArgs::prepare_ambient(self, &mut builder, &spec.args)?;
|
||||||
|
|
||||||
let closure = || {
|
let closure = || {
|
||||||
if self.debug {
|
if self.debug {
|
||||||
|
@ -99,15 +99,6 @@ pub enum FileSocket {
|
|||||||
Tx(String),
|
Tx(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileSocket {
|
|
||||||
pub fn get_name(&self) -> &str {
|
|
||||||
match self {
|
|
||||||
FileSocket::Rx(n) => n,
|
|
||||||
FileSocket::Tx(n) => n,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
|
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
|
||||||
pub enum Environment {
|
pub enum Environment {
|
||||||
Filesystem {
|
Filesystem {
|
||||||
@ -206,6 +197,31 @@ impl Specification {
|
|||||||
return Err(Error::BadPipe(pipe.to_string()));
|
return Err(Error::BadPipe(pipe.to_string()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validate sockets match
|
||||||
|
let (read, write) = self.sockets();
|
||||||
|
let mut read_set = HashSet::with_capacity(read.len());
|
||||||
|
|
||||||
|
for socket in read {
|
||||||
|
if !read_set.insert(socket) {
|
||||||
|
return Err(Error::BadFileSocket(socket.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut write_set = HashSet::with_capacity(write.len());
|
||||||
|
for socket in write {
|
||||||
|
write_set.insert(socket);
|
||||||
|
}
|
||||||
|
|
||||||
|
for socket in &read_set {
|
||||||
|
if !write_set.contains(socket) {
|
||||||
|
return Err(Error::BadFileSocket(socket.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(socket) = (&write_set - &read_set).into_iter().next() {
|
||||||
|
return Err(Error::BadFileSocket(socket.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
// validate trigger arguments make sense
|
// validate trigger arguments make sense
|
||||||
for entrypoint in self.entrypoints.values() {
|
for entrypoint in self.entrypoints.values() {
|
||||||
if entrypoint.args.contains(&Arg::Trigger) {
|
if entrypoint.args.contains(&Arg::Trigger) {
|
||||||
|
Loading…
Reference in New Issue
Block a user
this needs to be reflected in the specification checking