-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use global lock to synchronise LSP handlers #340
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ use stdext::result::ResultOrLog; | |
use stdext::*; | ||
use tokio::net::TcpListener; | ||
use tokio::runtime::Runtime; | ||
use tokio::sync::RwLock; | ||
use tower_lsp::jsonrpc::Result; | ||
use tower_lsp::lsp_types::request::GotoImplementationParams; | ||
use tower_lsp::lsp_types::request::GotoImplementationResponse; | ||
|
@@ -51,12 +52,40 @@ use crate::r_task; | |
|
||
#[macro_export] | ||
macro_rules! backend_trace { | ||
|
||
($self: expr, $($rest: expr),*) => {{ | ||
let message = format!($($rest, )*); | ||
$self.client.log_message(tower_lsp::lsp_types::MessageType::INFO, message).await | ||
}}; | ||
} | ||
|
||
// The following synchronisation macros should be used at entry in all LSP | ||
// methods. The LSP handlers are run in message order by tower-lsp but they run | ||
// concurrently. This means that a single `.await`, for instance to log a | ||
// message, may cause out of order handling. This is problematic for | ||
// state-changing methods as these should only run after all other methods have | ||
// finished or were cancelled (the latter would be preferred, see | ||
// `ContentModified` error and this thread: | ||
// https://github.com/microsoft/language-server-protocol/issues/584). | ||
// | ||
// To fix this, we now request an `RwLock` at entry in each handler. | ||
// World-changing handlers require an exclusive lock whereas world-observing | ||
// handlers require a non-exclusive shared lock. This should prevent handlers | ||
// from operating on outdated documents with stale positions or ranges. | ||
|
||
#[macro_export] | ||
macro_rules! backend_read_method { | ||
($self:expr, $($arg:tt)*) => {{ | ||
let _guard = $self.lock.read().await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not really sure I understand this i.e. when we hit the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The switch happens after putting ourselves on the lock queue, if we're not able to take the lock right away (either because we're a read lock and a write lock is ongoing, or we're a write lock and another lock of any kind is ongoing). |
||
backend_trace!($self, $($arg)*); | ||
}}; | ||
} | ||
|
||
#[macro_export] | ||
macro_rules! backend_write_method { | ||
($self:expr, $($arg:tt)*) => {{ | ||
let _guard = $self.lock.write().await; | ||
backend_trace!($self, $($arg)*); | ||
}}; | ||
Comment on lines
+76
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am 99% sure that this only holds the I think the (If you doubt this I have a convoluted way to prove this locally) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch!! // Recursive expansion of backend_write_method! macro
// ===================================================
{
let _guard = self.lock.write().await;
{
let message = {
let res =
$crate::fmt::format(builtin #format_args("initialize({:#?})", params));
res
};
self.client
.log_message(tower_lsp::lsp_types::MessageType::INFO, message)
.await
};
} |
||
} | ||
|
||
#[derive(Debug)] | ||
|
@@ -74,6 +103,7 @@ impl Default for Workspace { | |
|
||
#[derive(Clone, Debug)] | ||
pub struct Backend { | ||
pub lock: Arc<RwLock<()>>, | ||
pub client: Client, | ||
pub documents: Arc<DashMap<Url, Document>>, | ||
pub workspace: Arc<Mutex<Workspace>>, | ||
|
@@ -111,7 +141,7 @@ impl Backend { | |
#[tower_lsp::async_trait] | ||
impl LanguageServer for Backend { | ||
async fn initialize(&self, params: InitializeParams) -> Result<InitializeResult> { | ||
backend_trace!(self, "initialize({:#?})", params); | ||
backend_write_method!(self, "initialize({:#?})", params); | ||
|
||
// initialize the set of known workspaces | ||
let mut workspace = self.workspace.lock(); | ||
|
@@ -189,26 +219,26 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn initialized(&self, params: InitializedParams) { | ||
backend_trace!(self, "initialized({:?})", params); | ||
backend_read_method!(self, "initialized({:?})", params); | ||
} | ||
|
||
async fn shutdown(&self) -> Result<()> { | ||
backend_trace!(self, "shutdown()"); | ||
backend_read_method!(self, "shutdown()"); | ||
Ok(()) | ||
} | ||
|
||
async fn did_change_workspace_folders(&self, params: DidChangeWorkspaceFoldersParams) { | ||
backend_trace!(self, "did_change_workspace_folders({:?})", params); | ||
backend_write_method!(self, "did_change_workspace_folders({:?})", params); | ||
|
||
// TODO: Re-start indexer with new folders. | ||
} | ||
|
||
async fn did_change_configuration(&self, params: DidChangeConfigurationParams) { | ||
backend_trace!(self, "did_change_configuration({:?})", params); | ||
backend_write_method!(self, "did_change_configuration({:?})", params); | ||
} | ||
|
||
async fn did_change_watched_files(&self, params: DidChangeWatchedFilesParams) { | ||
backend_trace!(self, "did_change_watched_files({:?})", params); | ||
backend_write_method!(self, "did_change_watched_files({:?})", params); | ||
|
||
// TODO: Re-index the changed files. | ||
} | ||
|
@@ -217,7 +247,7 @@ impl LanguageServer for Backend { | |
&self, | ||
params: WorkspaceSymbolParams, | ||
) -> Result<Option<Vec<SymbolInformation>>> { | ||
backend_trace!(self, "symbol({:?})", params); | ||
backend_read_method!(self, "symbol({:?})", params); | ||
|
||
let response = unwrap!(symbols::symbols(self, ¶ms), Err(error) => { | ||
log::error!("{:?}", error); | ||
|
@@ -231,7 +261,7 @@ impl LanguageServer for Backend { | |
&self, | ||
params: DocumentSymbolParams, | ||
) -> Result<Option<DocumentSymbolResponse>> { | ||
backend_trace!(self, "document_symbols({})", params.text_document.uri); | ||
backend_read_method!(self, "document_symbols({})", params.text_document.uri); | ||
|
||
let response = unwrap!(symbols::document_symbols(self, ¶ms), Err(error) => { | ||
log::error!("{:?}", error); | ||
|
@@ -242,7 +272,7 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn execute_command(&self, params: ExecuteCommandParams) -> Result<Option<Value>> { | ||
backend_trace!(self, "execute_command({:?})", params); | ||
backend_read_method!(self, "execute_command({:?})", params); | ||
|
||
match self.client.apply_edit(WorkspaceEdit::default()).await { | ||
Ok(res) if res.applied => self.client.log_message(MessageType::INFO, "applied").await, | ||
|
@@ -254,7 +284,7 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn did_open(&self, params: DidOpenTextDocumentParams) { | ||
backend_trace!(self, "did_open({}", params.text_document.uri); | ||
backend_read_method!(self, "did_open({}", params.text_document.uri); | ||
|
||
let contents = params.text_document.text.as_str(); | ||
let uri = params.text_document.uri; | ||
|
@@ -267,7 +297,7 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn did_change(&self, params: DidChangeTextDocumentParams) { | ||
backend_trace!(self, "did_change({:?})", params); | ||
backend_write_method!(self, "did_change({:?})", params); | ||
|
||
// get reference to document | ||
let uri = ¶ms.text_document.uri; | ||
|
@@ -303,11 +333,11 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn did_save(&self, params: DidSaveTextDocumentParams) { | ||
backend_trace!(self, "did_save({:?}", params); | ||
backend_read_method!(self, "did_save({:?}", params); | ||
} | ||
|
||
async fn did_close(&self, params: DidCloseTextDocumentParams) { | ||
backend_trace!(self, "did_close({:?}", params); | ||
backend_read_method!(self, "did_close({:?}", params); | ||
|
||
let uri = params.text_document.uri; | ||
|
||
|
@@ -327,7 +357,7 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn completion(&self, params: CompletionParams) -> Result<Option<CompletionResponse>> { | ||
backend_trace!(self, "completion({:?})", params); | ||
backend_read_method!(self, "completion({:?})", params); | ||
|
||
// Get reference to document. | ||
let uri = ¶ms.text_document_position.text_document.uri; | ||
|
@@ -360,7 +390,7 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn completion_resolve(&self, mut item: CompletionItem) -> Result<CompletionItem> { | ||
backend_trace!(self, "completion_resolve({:?})", item); | ||
backend_read_method!(self, "completion_resolve({:?})", item); | ||
|
||
// Try resolving the completion item | ||
let result = r_task(|| unsafe { resolve_completion(&mut item) }); | ||
|
@@ -374,7 +404,7 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn hover(&self, params: HoverParams) -> Result<Option<Hover>> { | ||
backend_trace!(self, "hover({:?})", params); | ||
backend_read_method!(self, "hover({:?})", params); | ||
|
||
// get document reference | ||
let uri = ¶ms.text_document_position_params.text_document.uri; | ||
|
@@ -411,6 +441,8 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn signature_help(&self, params: SignatureHelpParams) -> Result<Option<SignatureHelp>> { | ||
backend_read_method!(self, "signature_help({params:?})"); | ||
|
||
// get document reference | ||
let uri = ¶ms.text_document_position_params.text_document.uri; | ||
let document = unwrap!(self.documents.get(uri), None => { | ||
|
@@ -444,7 +476,7 @@ impl LanguageServer for Backend { | |
&self, | ||
params: GotoDefinitionParams, | ||
) -> Result<Option<GotoDefinitionResponse>> { | ||
backend_trace!(self, "goto_definition({:?})", params); | ||
backend_read_method!(self, "goto_definition({params:?})"); | ||
|
||
// get reference to document | ||
let uri = ¶ms.text_document_position_params.text_document.uri; | ||
|
@@ -466,7 +498,7 @@ impl LanguageServer for Backend { | |
&self, | ||
params: GotoImplementationParams, | ||
) -> Result<Option<GotoImplementationResponse>> { | ||
backend_trace!(self, "goto_implementation({:?})", params); | ||
backend_read_method!(self, "goto_implementation({params:?})"); | ||
let _ = params; | ||
log::error!("Got a textDocument/implementation request, but it is not implemented"); | ||
return Ok(None); | ||
|
@@ -476,7 +508,7 @@ impl LanguageServer for Backend { | |
&self, | ||
params: SelectionRangeParams, | ||
) -> Result<Option<Vec<SelectionRange>>> { | ||
backend_trace!(self, "selection_range({:?})", params); | ||
backend_read_method!(self, "selection_range({params:?})"); | ||
|
||
// Get reference to document | ||
let uri = ¶ms.text_document.uri; | ||
|
@@ -508,7 +540,7 @@ impl LanguageServer for Backend { | |
} | ||
|
||
async fn references(&self, params: ReferenceParams) -> Result<Option<Vec<Location>>> { | ||
backend_trace!(self, "references({:?})", params); | ||
backend_read_method!(self, "references({params:?})"); | ||
|
||
let locations = match self.find_references(params) { | ||
Ok(locations) => locations, | ||
|
@@ -542,6 +574,7 @@ impl LanguageServer for Backend { | |
// https://github.com/Microsoft/vscode-languageserver-node/blob/18fad46b0e8085bb72e1b76f9ea23a379569231a/client/src/common/client.ts#L701-L752 | ||
impl Backend { | ||
async fn notification(&self, params: Option<Value>) { | ||
backend_read_method!(self, "notification({params:?})"); | ||
log::info!("Received Positron notification: {:?}", params); | ||
} | ||
} | ||
|
@@ -573,6 +606,7 @@ pub fn start_lsp(runtime: Arc<Runtime>, address: String, conn_init_tx: Sender<bo | |
// Note that DashMap uses synchronization primitives internally, so we | ||
// don't guard access to the map via a mutex. | ||
let backend = Backend { | ||
lock: Arc::new(RwLock::new(())), | ||
client, | ||
documents: Arc::new(DashMap::new()), | ||
workspace: Arc::new(Mutex::new(Workspace::default())), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason you can't use
std::sync::RwLock
has to do with this.await
.You cannot
.await
when holding the_guard
for a std::syncRwLock
- the guard can't be sent across threads i think. We also had this problem awhile back #85 (comment).Technically if you wrap this in
futures::executor::block_on()
and remove the.await
then you can switch back tostd::sync::RwLock
, but we also calllog_message().await
further down, and if you fix the{{
issue then you'll see that start to error too.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tower-lsp executor is documented to run on one thread so technically it should be possible. But it also means that using a sync lock here would end up deadlocking us (a write handler would block the executor thread until the read handlers have finished executing, which they can't since the thread is blocked). So we can only use an async lock here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I now understand things, we do have multiple threads as we create a Tokio runtime with default configuration. However the tower-lsp handlers are run on a single task, so indeed on a single thread at any moment in time (which is important for our synchronisation efforts!). We still need Send/Sync and long enough lifetimes when running futures because the Tokio scheduler uses a work-stealing strategy. The handlers are run on a single thread but which thread exactly might change over time across yield points.