Skip to content

Commit

Permalink
jsonrpc: rewriting the jsonrpc handling
Browse files Browse the repository at this point in the history
Signed-off-by: Vincenzo Palazzo <[email protected]>
  • Loading branch information
vincenzopalazzo committed May 16, 2024
1 parent 552032d commit 2aa2c58
Showing 1 changed file with 135 additions and 124 deletions.
259 changes: 135 additions & 124 deletions lampo-jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque};
use std::io::{self, ErrorKind};
use std::io::{Read, Write};
use std::os::fd::{AsRawFd, FromRawFd};

Check warning on line 7 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused import: `FromRawFd`

Check warning on line 7 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused import: `FromRawFd`
use std::os::unix::net::UnixListener;
use std::os::unix::net::{SocketAddr, UnixStream};

Check warning on line 9 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused import: `SocketAddr`

Check warning on line 9 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused import: `SocketAddr`
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

// FIXME: use mio for a better platform support.
use popol::{Sources, Timeout};
use popol::{Event, Sources, Timeout};
use serde_json::Value;

pub mod command;
Expand All @@ -24,13 +25,14 @@ use crate::json_rpc2::{Request, Response};

#[derive(Debug, Clone, PartialEq)]
pub enum RPCEvent {
Listening,
Connect(String),
Accept,
Connect,
}

pub struct JSONRPCv2<T: Send + Sync + 'static> {
socket_path: String,
sources: Sources<RPCEvent>,
open_streams: HashMap<i32, UnixStream>,
socket: UnixListener,
handler: Arc<Handler<T>>,
// FIXME: should be not the name but the fd int as key?
Expand Down Expand Up @@ -102,6 +104,7 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
socket: listnet,
handler: Arc::new(Handler::new(ctx)),
socket_path: path.to_owned(),
open_streams: HashMap::new(),
conn: HashMap::new(),
conn_queue: Mutex::new(Cell::new(HashMap::new())),
})
Expand All @@ -118,17 +121,10 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
Ok(())
}

pub fn add_connection(&mut self, key: &SocketAddr, stream: UnixStream) {
let path = if let Some(path) = key.as_pathname() {
path.to_str().unwrap()
} else {
"unnamed"
};
pub fn add_connection(&mut self, stream: UnixStream) {
let res = stream.set_nonblocking(true);
debug_assert!(res.is_ok());
let event = RPCEvent::Connect(path.to_string());
self.sources.register(event, &stream, popol::interest::ALL);
self.conn.insert(path.to_owned(), stream);
log::trace!("register a new connection listener");
}

pub fn send_resp(&self, key: String, resp: Response<Value>) {
Expand Down Expand Up @@ -170,136 +166,151 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
self.handler.ctx()
}

fn write(&self, event: Event<RPCEvent>, towrite: &mut Vec<Response<Value>>) -> io::Result<()> {

Check warning on line 169 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused variable: `event`

Check warning on line 169 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused variable: `towrite`

Check warning on line 169 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

method `write` is never used

Check warning on line 169 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused variable: `event`

Check warning on line 169 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

unused variable: `towrite`

Check warning on line 169 in lampo-jsonrpc/src/lib.rs

View workflow job for this annotation

GitHub Actions / Build (stable)

method `write` is never used
unimplemented!()
}

fn read(&mut self, event: &mut Event<RPCEvent>) -> io::Result<()> {
log::trace!("read from connection");
let stream = self.open_streams.get_mut(&event.as_raw_fd()).unwrap();
log::trace!("start reading");
let mut buff = vec![0; 1024]; // FIXME: make this variable
// Nb. Since `poll`, which this reactor is based on, is *level-triggered*,
// we will be notified again if there is still data to be read on the socket.
// Hence, there is no use in putting this socket read in a loop, as the second
// invocation would likely block.
let resp = match stream.read(&mut buff) {
Ok(count) => {
if count > 0 {
buff.truncate(count);
log::info!(target: "jsonrpc", "buffer read {}", String::from_utf8(buff.to_vec()).unwrap());
let requ: Request<Value> = serde_json::from_slice(&buff)
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{err}")))?;
log::trace!(target: "jsonrpc", "request {:?}", requ);
let Some(resp) = self.handler.run_callback(&requ) else {
log::error!(target: "jsonrpc", "`{}` not found!", requ.method);
return Ok(());
};
// FIXME; the id in the JSON RPC can be null!
let response = match resp {
Ok(result) => Response {
id: requ.id.clone().unwrap(),
jsonrpc: requ.jsonrpc.to_owned(),
result: Some(result),
error: None,
},
Err(err) => Response {
result: None,
error: Some(err.into()),
id: requ.id.unwrap().clone(),
jsonrpc: requ.jsonrpc.clone(),
},
};
response
} else {
log::info!("connection close");
self.open_streams.remove(&event.as_raw_fd());
event.source.unset(popol::interest::READ);
return Ok(());
}
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
log::trace!("reading is blocking");
// This shouldn't normally happen, since this function is only called
// when there's data on the socket. We leave it here in case external
// conditions change.
return Ok(());
}
Err(err) => {
log::error!("{:?}", err);
self.sources.unregister(&event.key);
return Err(err);
}
};

log::trace!(target: "jsonrpc", "send response: `{:?}`", resp);
let buff = serde_json::to_string(&resp).unwrap();
if let Err(err) = stream.write_all(buff.as_bytes()) {
if err.kind() != ErrorKind::WouldBlock {
return Err(err);
}
log::info!("writing is blocking");
return Ok(());
}
event.source.set(popol::interest::WRITE);
self.open_streams.remove(&event.as_raw_fd());
Ok(())
}

pub fn listen(mut self) -> io::Result<()> {
self.socket.set_nonblocking(true)?;
self.sources
.register(RPCEvent::Listening, &self.socket, popol::interest::READ);

.register(RPCEvent::Accept, &self.socket, popol::interest::READ);
log::info!(target: "jsonrpc", "starting server on {}", self.socket_path);
let mut events = vec![];
while !self.handler.stop.get() {
// Blocking while we are waiting new events!
self.sources.poll(&mut events, Timeout::Never)?;

for mut event in events.drain(..) {
match &event.key {
RPCEvent::Listening => {
let conn = self.socket.accept();
let Ok((stream, addr)) = conn else {
if let Err(err) = &conn {
if err.kind() == ErrorKind::WouldBlock {
break;
}
RPCEvent::Accept => loop {
let accept = self.socket.accept();
if let Err(err) = accept {
if err.kind() == ErrorKind::WouldBlock {
log::trace!("accepting the connection is blocking");
break;
}
log::error!(target: "jsonrpc", "fail to accept the connection: {:?}", conn);
continue;
};
log::trace!(target: "jsonrpc", "new connection to unix rpc socket");
self.add_connection(&addr, stream);
return Err(err);
}
log::info!("Accepting connection: `{:?}`", accept);
let stream = accept?.0;
self.sources.register(
RPCEvent::Connect,
&stream,
popol::interest::READ | popol::interest::WRITE,
);
self.open_streams.insert(stream.as_raw_fd(), stream);
break;
},
RPCEvent::Connect if event.is_readable() => {
self.read(&mut event)?;
}
RPCEvent::Connect(addr) => {
if event.is_hangup() {
break;
RPCEvent::Connect if event.is_writable() => {
let stream = self.open_streams.get_mut(&event.as_raw_fd()).unwrap();
match stream.flush() {
// In this case, we've written all the data, we
// are no longer interested in writing to this
// socket.
Ok(()) => {
log::trace!("reading ended");
event.source.unset(popol::interest::WRITE);
}
// In this case, the write couldn't complete. Set
// our interest to `WRITE` to be notified when the
// socket is ready to write again.
Err(err)
if [io::ErrorKind::WouldBlock, io::ErrorKind::WriteZero]
.contains(&err.kind()) =>
{
log::info!("reading return an error: {:?}", err);
event.source.set(popol::interest::READ);
break;
}
Err(err) => {
return Err(err);
}
}
if event.is_error() {
log::error!(target: "jsonrpc", "an error occurs");
}
RPCEvent::Connect => {
if event.is_hangup() || event.is_error() {
log::error!(target: "jsonrpc", "an error occurs: {:?}", event);
continue;
}

if event.is_invalid() {
log::info!(target: "jsonrpc", "event invalid, unregister event from the tracking one");
log::warn!(target: "jsonrpc", "event invalid: {:?}", event);
self.sources.unregister(&event.key);
break;
}

if event.is_readable() {
let Some(mut stream) = self.conn.get(addr) else {
log::error!(target: "jsonrpc", "connection not found `{addr}`");
continue;
};
let mut buff = String::new();
if let Err(err) = stream.read_to_string(&mut buff) {
if err.kind() != ErrorKind::WouldBlock {
return Err(err);
}
event.source.set(popol::interest::WRITE);
continue;
}
if buff.is_empty() {
log::warn!(target: "jsonrpc", "buffer is empty");
break;
}
let buff = buff.trim();
log::info!(target: "jsonrpc", "buffer read {buff}");
let requ: Request<Value> =
serde_json::from_str(&buff).map_err(|err| {
io::Error::new(io::ErrorKind::Other, format!("{err}"))
})?;
log::trace!(target: "jsonrpc", "request {:?}", requ);
let Some(resp) = self.handler.run_callback(&requ) else {
log::error!(target: "jsonrpc", "`{}` not found!", requ.method);
break;
};
// FIXME; the id in the JSON RPC can be null!
let response = match resp {
Ok(result) => Response {
id: requ.id.clone().unwrap(),
jsonrpc: requ.jsonrpc.to_owned(),
result: Some(result),
error: None,
},
Err(err) => Response {
result: None,
error: Some(err.into()),
id: requ.id.unwrap().clone(),
jsonrpc: requ.jsonrpc.clone(),
},
};
log::trace!(target: "jsonrpc", "send response: `{:?}`", response);
self.send_resp(addr.to_string(), response);
}

if event.is_writable() {
let stream = self.conn.get(addr);
if stream.is_none() {
log::error!(target: "jsonrpc", "connection not found `{addr}`");
continue;
};

let mut stream = stream.unwrap();
let Some(resp) = self.pop_resp(addr.to_string()) else {
break;
};
let buff = serde_json::to_string(&resp).unwrap();
if let Err(err) = stream.write_all(buff.as_bytes()) {
if err.kind() != ErrorKind::WouldBlock {
return Err(err);
}
event.source.set(popol::interest::WRITE);
continue;
}
match stream.flush() {
// In this case, we've written all the data, we
// are no longer interested in writing to this
// socket.
Ok(()) => {
event.source.unset(popol::interest::WRITE);
}
// In this case, the write couldn't complete. Set
// our interest to `WRITE` to be notified when the
// socket is ready to write again.
Err(err)
if [io::ErrorKind::WouldBlock, io::ErrorKind::WriteZero]
.contains(&err.kind()) =>
{
event.source.set(popol::interest::WRITE);
continue;
}
Err(err) => {
log::error!(target: "jsonrpc", "{}: Write error: {}", addr, err.to_string());
}
}
stream.shutdown(std::net::Shutdown::Both)?;
continue;
}
}
}
Expand Down

0 comments on commit 2aa2c58

Please sign in to comment.