Skip to content

Commit

Permalink
Generate request routing table with a proc macro
Browse files Browse the repository at this point in the history
This commit adds a new subcrate named `tower-lsp-macros` which exposes
an `#[rpc]` procedural macro. This macro annotates each of the
`LanguageServer` trait methods and creates a new `crate::generated_impl`
module from it, containing two things:

1. An opaque `ServerRequest` struct which uses `serde` derive magic to
   deserialize valid LSP server requests from JSON, based on the trait
   methods' type signatures. This `ServerRequest` struct is ready to be
   re-exported in the `jsonrpc` module for consumption.
2. A private `handle_request()` function which takes an incoming
   `ServerRequest` and triggers the corresponding handler method on the
   user's `LanguageServer` implementation. It also contains basic checks
   for initialization and shutdown previously stored in `Delegate`.

`<LspService as Service>::call()` has been refactored to reference
`generated_impl::handle_request()` internally, and `LspService` has been
redefined as `fn(jsonrpc::Incoming) -> Option<jsonrpc::Outgoing>`. Also,
`Arc<dyn LanguageServer>` is now considered valid as a side effect of
this work.

The existing `delegate` module has been removed, and some tests had to
be tweaked to get everything to pass. One small hiccup was found with
`service::tests::exit_notification` test, where this notification would
no longer parse:

```json
{"jsonrpc":"2.0","method":"initialized"}
```

It seems that `jsonrpc-core` parses the JSON more permissively, despite
the LSP spec technically defining the `initialized` notification as:

```json
{"jsonrpc":"2.0","method":"initialized","params":{}}
```

AFAIK, this increased strictness shouldn't pose a problem, as `coc.nvim`
and several other popular language client seem to send the correct form
of the notification, but it's notable nonetheless as a potential caveat.
  • Loading branch information
ebkalderon committed Aug 9, 2020
1 parent 1be5882 commit 07c0e6c
Show file tree
Hide file tree
Showing 9 changed files with 649 additions and 508 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ documentation = "https://docs.rs/tower-lsp/"
readme = "README.md"
categories = ["asynchronous"]
keywords = ["language-server", "lsp", "tower"]
exclude = ["./tower-lsp-macros"]

[dependencies]
async-trait = "0.1"
auto_impl = "0.4"
bytes = "0.5"
dashmap = "3.5.1"
futures = { version = "0.3", features = ["compat"] }
futures = { version = "0.3", default-features = false, features = ["std", "async-await"] }
jsonrpc-core = "14.0"
jsonrpc-derive = "14.0"
log = "0.4"
Expand All @@ -27,9 +28,14 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "0.2", features = ["rt-core"] }
tokio-util = { version = "0.3", features = ["codec"] }
tower-lsp-macros = { version = "0.1", path = "./tower-lsp-macros" }
tower-service = "0.3"

[dev-dependencies]
env_logger = "0.7"
tokio = { version = "0.2", features = ["io-std", "io-util", "macros", "net", "test-util"] }
tower-test = "0.3"

[workspace]
members = [".", "./tower-lsp-macros"]
default-members = ["."]
140 changes: 54 additions & 86 deletions src/delegate/client.rs → src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,45 @@ use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

use dashmap::DashMap;
use futures::channel::mpsc::{Receiver, Sender};
use futures::channel::oneshot;
use futures::channel::mpsc::Sender;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use jsonrpc_core::types::{ErrorCode, Id, Output, Version};
use jsonrpc_core::{Error, Result};
use log::{error, trace};
use lsp_types::notification::{Notification, *};
use lsp_types::request::{Request, *};
use lsp_types::*;
use serde::Serialize;
use serde_json::Value;

use super::not_initialized_error;

/// Maps all pending client request IDs to their future responses.
type RequestMap = DashMap<u64, oneshot::Sender<Output>>;
use super::jsonrpc::{self, ClientRequests, Error, ErrorCode, Id, Outgoing, Result, Version};

#[derive(Debug)]
struct ClientInner {
sender: Sender<String>,
sender: Sender<Outgoing>,
initialized: Arc<AtomicBool>,
request_id: AtomicU64,
pending_requests: Arc<RequestMap>,
pending_requests: Arc<ClientRequests>,
}

/// Handle for communicating with the language client.
#[derive(Clone, Debug)]
pub struct Client(Arc<ClientInner>);
pub struct Client {
inner: Arc<ClientInner>,
}

impl Client {
pub(super) fn new(
sender: Sender<String>,
mut receiver: Receiver<Output>,
sender: Sender<Outgoing>,
pending_requests: Arc<ClientRequests>,
initialized: Arc<AtomicBool>,
) -> Self {
let pending_requests = Arc::new(RequestMap::default());

let pending = pending_requests.clone();
tokio::spawn(async move {
while let Some(response) = receiver.next().await {
if let Id::Num(ref id) = response.id() {
match pending.remove(id) {
Some((_, tx)) => tx.send(response).expect("receiver already dropped"),
None => error!("received response from client with no matching request"),
}
} else {
error!("received response from client with non-numeric ID");
}
}
});

let inner = Arc::new(ClientInner {
sender,
initialized,
request_id: AtomicU64::new(0),
pending_requests,
});

Client(inner)
Client {
inner: Arc::new(ClientInner {
sender,
initialized,
request_id: AtomicU64::new(0),
pending_requests,
}),
}
}

/// Notifies the client to log a particular message.
Expand Down Expand Up @@ -132,7 +110,6 @@ impl Client {
if !value.is_null() && !value.is_array() && !value.is_object() {
value = Value::Array(vec![value]);
}

self.send_notification::<TelemetryEvent>(value);
}
}
Expand Down Expand Up @@ -275,34 +252,31 @@ impl Client {
where
R: Request,
{
let id = self.0.request_id.fetch_add(1, Ordering::Relaxed);
let message = make_request::<R>(id, params);
let id = self.inner.request_id.fetch_add(1, Ordering::Relaxed);
let message = Outgoing::Request(make_request::<R>(id, params));

if self.0.sender.clone().send(message).await.is_err() {
if self.inner.sender.clone().send(message).await.is_err() {
error!("failed to send request");
return Err(Error::internal_error());
}

let (tx, rx) = oneshot::channel();
self.0.pending_requests.insert(id, tx);
let response = rx.await.expect("sender already dropped");

match response {
Output::Success(s) => serde_json::from_value(s.result).map_err(|e| Error {
let response = self.inner.pending_requests.wait(Id::Number(id)).await;
let (_, result) = response.into_parts();
result.and_then(|v| {
serde_json::from_value(v).map_err(|e| Error {
code: ErrorCode::ParseError,
message: e.to_string(),
data: None,
}),
Output::Failure(f) => Err(f.error),
}
})
})
}

fn send_notification<N>(&self, params: N::Params)
where
N: Notification,
{
let mut sender = self.0.sender.clone();
let message = make_notification::<N>(params);
let mut sender = self.inner.sender.clone();
let message = Outgoing::Request(make_notification::<N>(params));
tokio::spawn(async move {
if sender.send(message).await.is_err() {
error!("failed to send notification")
Expand All @@ -314,21 +288,21 @@ impl Client {
where
R: Request,
{
if self.0.initialized.load(Ordering::SeqCst) {
if self.inner.initialized.load(Ordering::SeqCst) {
self.send_request::<R>(params).await
} else {
let id = self.0.request_id.load(Ordering::SeqCst) + 1;
let id = self.inner.request_id.fetch_add(1, Ordering::Relaxed);
let msg = make_request::<R>(id, params);
trace!("server not initialized, supressing message: {}", msg);
Err(not_initialized_error())
Err(jsonrpc::not_initialized_error())
}
}

fn send_notification_initialized<N>(&self, params: N::Params)
where
N: Notification,
{
if self.0.initialized.load(Ordering::SeqCst) {
if self.inner.initialized.load(Ordering::SeqCst) {
self.send_notification::<N>(params);
} else {
let msg = make_notification::<N>(params);
Expand All @@ -338,50 +312,44 @@ impl Client {
}

/// Constructs a JSON-RPC request from its corresponding LSP type.
fn make_request<R>(id: u64, params: R::Params) -> String
fn make_request<R>(id: u64, params: R::Params) -> Value
where
R: Request,
{
// Since these types come from the `lsp-types` crate and validity is enforced via the
// `Request` trait, the `unwrap()` call below should never fail.
serde_json::to_string(&serde_json::json!({
"jsonrpc": Version::V2,
"id": Id::Num(id),
serde_json::json!({
"jsonrpc": Version,
"id": Id::Number(id),
"method": R::METHOD,
"params": params,
}))
.unwrap()
})
}

/// Constructs a JSON-RPC notification from its corresponding LSP type.
fn make_notification<N>(params: N::Params) -> String
fn make_notification<N>(params: N::Params) -> Value
where
N: Notification,
{
// Since these types come from the `lsp-types` crate and validity is enforced via the
// `Notification` trait, the `unwrap()` call below should never fail.
serde_json::to_string(&serde_json::json!({
"jsonrpc": Version::V2,
serde_json::json!({
"jsonrpc": Version,
"method": N::METHOD,
"params": params,
}))
.unwrap()
})
}

#[cfg(test)]
mod tests {
use futures::channel::mpsc;
use futures::StreamExt;
use serde_json::json;

use super::*;

async fn assert_client_messages<F: FnOnce(Client)>(f: F, expected: String) {
async fn assert_client_messages<F: FnOnce(Client)>(f: F, expected: Outgoing) {
let (request_tx, request_rx) = mpsc::channel(1);
let (response_tx, response_rx) = mpsc::channel(1);
let pending = Arc::new(ClientRequests::new());

let client = Client::new(request_tx, response_rx, Arc::new(AtomicBool::new(true)));
let client = Client::new(request_tx, pending, Arc::new(AtomicBool::new(true)));
f(client);
drop(response_tx);

let messages: Vec<_> = request_rx.collect().await;
assert_eq!(messages, vec![expected]);
Expand All @@ -390,42 +358,42 @@ mod tests {
#[tokio::test]
async fn log_message() {
let (typ, message) = (MessageType::Log, "foo bar".to_owned());
let expected = make_notification::<LogMessage>(LogMessageParams {
let expected = Outgoing::Request(make_notification::<LogMessage>(LogMessageParams {
typ,
message: message.clone(),
});
}));

assert_client_messages(|p| p.log_message(typ, message), expected).await;
}

#[tokio::test]
async fn show_message() {
let (typ, message) = (MessageType::Log, "foo bar".to_owned());
let expected = make_notification::<ShowMessage>(ShowMessageParams {
let expected = Outgoing::Request(make_notification::<ShowMessage>(ShowMessageParams {
typ,
message: message.clone(),
});
}));

assert_client_messages(|p| p.show_message(typ, message), expected).await;
}

#[tokio::test]
async fn telemetry_event() {
let null = json!(null);
let expected = make_notification::<TelemetryEvent>(null.clone());
let expected = Outgoing::Request(make_notification::<TelemetryEvent>(null.clone()));
assert_client_messages(|p| p.telemetry_event(null), expected).await;

let array = json!([1, 2, 3]);
let expected = make_notification::<TelemetryEvent>(array.clone());
let expected = Outgoing::Request(make_notification::<TelemetryEvent>(array.clone()));
assert_client_messages(|p| p.telemetry_event(array), expected).await;

let object = json!({});
let expected = make_notification::<TelemetryEvent>(object.clone());
let expected = Outgoing::Request(make_notification::<TelemetryEvent>(object.clone()));
assert_client_messages(|p| p.telemetry_event(object), expected).await;

let anything_else = json!("hello");
let wrapped = Value::Array(vec![anything_else.clone()]);
let expected = make_notification::<TelemetryEvent>(wrapped);
let expected = Outgoing::Request(make_notification::<TelemetryEvent>(wrapped));
assert_client_messages(|p| p.telemetry_event(anything_else), expected).await;
}

Expand All @@ -435,7 +403,7 @@ mod tests {
let diagnostics = vec![Diagnostic::new_simple(Default::default(), "example".into())];

let params = PublishDiagnosticsParams::new(uri.clone(), diagnostics.clone(), None);
let expected = make_notification::<PublishDiagnostics>(params);
let expected = Outgoing::Request(make_notification::<PublishDiagnostics>(params));

assert_client_messages(|p| p.publish_diagnostics(uri, diagnostics, None), expected).await;
}
Expand Down
Loading

0 comments on commit 07c0e6c

Please sign in to comment.