Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use global lock to synchronise LSP handlers #340

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 55 additions & 21 deletions crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use stdext::result::ResultOrLog;
use stdext::*;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio::sync::RwLock;
use tower_lsp::jsonrpc::Result;
use tower_lsp::lsp_types::request::GotoImplementationParams;
use tower_lsp::lsp_types::request::GotoImplementationResponse;
Expand Down Expand Up @@ -51,12 +52,40 @@ use crate::r_task;

#[macro_export]
macro_rules! backend_trace {

($self: expr, $($rest: expr),*) => {{
let message = format!($($rest, )*);
$self.client.log_message(tower_lsp::lsp_types::MessageType::INFO, message).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason you can't use std::sync::RwLock has to do with this .await.

You cannot .await when holding the _guard for a std::sync RwLock - the guard can't be sent across threads i think. We also had this problem awhile back #85 (comment).

Technically if you wrap this in futures::executor::block_on() and remove the .await then you can switch back to std::sync::RwLock, but we also call log_message().await further down, and if you fix the {{ issue then you'll see that start to error too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tower-lsp executor is documented to run on one thread so technically it should be possible. But it also means that using a sync lock here would end up deadlocking us (a write handler would block the executor thread until the read handlers have finished executing, which they can't since the thread is blocked). So we can only use an async lock here.

Copy link
Contributor Author

@lionel- lionel- May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I now understand things, we do have multiple threads as we create a Tokio runtime with default configuration. However the tower-lsp handlers are run on a single task, so indeed on a single thread at any moment in time (which is important for our synchronisation efforts!). We still need Send/Sync and long enough lifetimes when running futures because the Tokio scheduler uses a work-stealing strategy. The handlers are run on a single thread but which thread exactly might change over time across yield points.

}};
}

// The following synchronisation macros should be used at entry in all LSP
// methods. The LSP handlers are run in message order by tower-lsp but they run
// concurrently. This means that a single `.await`, for instance to log a
// message, may cause out of order handling. This is problematic for
// state-changing methods as these should only run after all other methods have
// finished or were cancelled (the latter would be preferred, see
// `ContentModified` error and this thread:
// https://github.com/microsoft/language-server-protocol/issues/584).
//
// To fix this, we now request an `RwLock` at entry in each handler.
// World-changing handlers require an exclusive lock whereas world-observing
// handlers require a non-exclusive shared lock. This should prevent handlers
// from operating on outdated documents with stale positions or ranges.

#[macro_export]
macro_rules! backend_read_method {
($self:expr, $($arg:tt)*) => {{
let _guard = $self.lock.read().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not really sure I understand this .await. I see that you aren't allowed to use a std::sync::RwLock here, but isn't this just propagating the issue?

i.e. when we hit the await we can switch away before we get the read() lock to another LSP method caller

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The switch happens after putting ourselves on the lock queue, if we're not able to take the lock right away (either because we're a read lock and a write lock is ongoing, or we're a write lock and another lock of any kind is ongoing).

backend_trace!($self, $($arg)*);
}};
}

#[macro_export]
macro_rules! backend_write_method {
($self:expr, $($arg:tt)*) => {{
let _guard = $self.lock.write().await;
backend_trace!($self, $($arg)*);
}};
Comment on lines +76 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am 99% sure that this only holds the _guard during the backend_trace!() call, then immediately drops it.

I think the {{ would need to be { for the _guard to be applied for the scope of the caller

(If you doubt this I have a convoluted way to prove this locally)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!!

// Recursive expansion of backend_write_method! macro
// ===================================================

{
    let _guard = self.lock.write().await;
    {
        let message = {
            let res =
                $crate::fmt::format(builtin #format_args("initialize({:#?})", params));
            res
        };
        self.client
            .log_message(tower_lsp::lsp_types::MessageType::INFO, message)
            .await
    };
}

}

#[derive(Debug)]
Expand All @@ -74,6 +103,7 @@ impl Default for Workspace {

#[derive(Clone, Debug)]
pub struct Backend {
pub lock: Arc<RwLock<()>>,
pub client: Client,
pub documents: Arc<DashMap<Url, Document>>,
pub workspace: Arc<Mutex<Workspace>>,
Expand Down Expand Up @@ -111,7 +141,7 @@ impl Backend {
#[tower_lsp::async_trait]
impl LanguageServer for Backend {
async fn initialize(&self, params: InitializeParams) -> Result<InitializeResult> {
backend_trace!(self, "initialize({:#?})", params);
backend_write_method!(self, "initialize({:#?})", params);

// initialize the set of known workspaces
let mut workspace = self.workspace.lock();
Expand Down Expand Up @@ -189,26 +219,26 @@ impl LanguageServer for Backend {
}

async fn initialized(&self, params: InitializedParams) {
backend_trace!(self, "initialized({:?})", params);
backend_read_method!(self, "initialized({:?})", params);
}

async fn shutdown(&self) -> Result<()> {
backend_trace!(self, "shutdown()");
backend_read_method!(self, "shutdown()");
Ok(())
}

async fn did_change_workspace_folders(&self, params: DidChangeWorkspaceFoldersParams) {
backend_trace!(self, "did_change_workspace_folders({:?})", params);
backend_write_method!(self, "did_change_workspace_folders({:?})", params);

// TODO: Re-start indexer with new folders.
}

async fn did_change_configuration(&self, params: DidChangeConfigurationParams) {
backend_trace!(self, "did_change_configuration({:?})", params);
backend_write_method!(self, "did_change_configuration({:?})", params);
}

async fn did_change_watched_files(&self, params: DidChangeWatchedFilesParams) {
backend_trace!(self, "did_change_watched_files({:?})", params);
backend_write_method!(self, "did_change_watched_files({:?})", params);

// TODO: Re-index the changed files.
}
Expand All @@ -217,7 +247,7 @@ impl LanguageServer for Backend {
&self,
params: WorkspaceSymbolParams,
) -> Result<Option<Vec<SymbolInformation>>> {
backend_trace!(self, "symbol({:?})", params);
backend_read_method!(self, "symbol({:?})", params);

let response = unwrap!(symbols::symbols(self, &params), Err(error) => {
log::error!("{:?}", error);
Expand All @@ -231,7 +261,7 @@ impl LanguageServer for Backend {
&self,
params: DocumentSymbolParams,
) -> Result<Option<DocumentSymbolResponse>> {
backend_trace!(self, "document_symbols({})", params.text_document.uri);
backend_read_method!(self, "document_symbols({})", params.text_document.uri);

let response = unwrap!(symbols::document_symbols(self, &params), Err(error) => {
log::error!("{:?}", error);
Expand All @@ -242,7 +272,7 @@ impl LanguageServer for Backend {
}

async fn execute_command(&self, params: ExecuteCommandParams) -> Result<Option<Value>> {
backend_trace!(self, "execute_command({:?})", params);
backend_read_method!(self, "execute_command({:?})", params);

match self.client.apply_edit(WorkspaceEdit::default()).await {
Ok(res) if res.applied => self.client.log_message(MessageType::INFO, "applied").await,
Expand All @@ -254,7 +284,7 @@ impl LanguageServer for Backend {
}

async fn did_open(&self, params: DidOpenTextDocumentParams) {
backend_trace!(self, "did_open({}", params.text_document.uri);
backend_read_method!(self, "did_open({}", params.text_document.uri);

let contents = params.text_document.text.as_str();
let uri = params.text_document.uri;
Expand All @@ -267,7 +297,7 @@ impl LanguageServer for Backend {
}

async fn did_change(&self, params: DidChangeTextDocumentParams) {
backend_trace!(self, "did_change({:?})", params);
backend_write_method!(self, "did_change({:?})", params);

// get reference to document
let uri = &params.text_document.uri;
Expand Down Expand Up @@ -303,11 +333,11 @@ impl LanguageServer for Backend {
}

async fn did_save(&self, params: DidSaveTextDocumentParams) {
backend_trace!(self, "did_save({:?}", params);
backend_read_method!(self, "did_save({:?}", params);
}

async fn did_close(&self, params: DidCloseTextDocumentParams) {
backend_trace!(self, "did_close({:?}", params);
backend_read_method!(self, "did_close({:?}", params);

let uri = params.text_document.uri;

Expand All @@ -327,7 +357,7 @@ impl LanguageServer for Backend {
}

async fn completion(&self, params: CompletionParams) -> Result<Option<CompletionResponse>> {
backend_trace!(self, "completion({:?})", params);
backend_read_method!(self, "completion({:?})", params);

// Get reference to document.
let uri = &params.text_document_position.text_document.uri;
Expand Down Expand Up @@ -360,7 +390,7 @@ impl LanguageServer for Backend {
}

async fn completion_resolve(&self, mut item: CompletionItem) -> Result<CompletionItem> {
backend_trace!(self, "completion_resolve({:?})", item);
backend_read_method!(self, "completion_resolve({:?})", item);

// Try resolving the completion item
let result = r_task(|| unsafe { resolve_completion(&mut item) });
Expand All @@ -374,7 +404,7 @@ impl LanguageServer for Backend {
}

async fn hover(&self, params: HoverParams) -> Result<Option<Hover>> {
backend_trace!(self, "hover({:?})", params);
backend_read_method!(self, "hover({:?})", params);

// get document reference
let uri = &params.text_document_position_params.text_document.uri;
Expand Down Expand Up @@ -411,6 +441,8 @@ impl LanguageServer for Backend {
}

async fn signature_help(&self, params: SignatureHelpParams) -> Result<Option<SignatureHelp>> {
backend_read_method!(self, "signature_help({params:?})");

// get document reference
let uri = &params.text_document_position_params.text_document.uri;
let document = unwrap!(self.documents.get(uri), None => {
Expand Down Expand Up @@ -444,7 +476,7 @@ impl LanguageServer for Backend {
&self,
params: GotoDefinitionParams,
) -> Result<Option<GotoDefinitionResponse>> {
backend_trace!(self, "goto_definition({:?})", params);
backend_read_method!(self, "goto_definition({params:?})");

// get reference to document
let uri = &params.text_document_position_params.text_document.uri;
Expand All @@ -466,7 +498,7 @@ impl LanguageServer for Backend {
&self,
params: GotoImplementationParams,
) -> Result<Option<GotoImplementationResponse>> {
backend_trace!(self, "goto_implementation({:?})", params);
backend_read_method!(self, "goto_implementation({params:?})");
let _ = params;
log::error!("Got a textDocument/implementation request, but it is not implemented");
return Ok(None);
Expand All @@ -476,7 +508,7 @@ impl LanguageServer for Backend {
&self,
params: SelectionRangeParams,
) -> Result<Option<Vec<SelectionRange>>> {
backend_trace!(self, "selection_range({:?})", params);
backend_read_method!(self, "selection_range({params:?})");

// Get reference to document
let uri = &params.text_document.uri;
Expand Down Expand Up @@ -508,7 +540,7 @@ impl LanguageServer for Backend {
}

async fn references(&self, params: ReferenceParams) -> Result<Option<Vec<Location>>> {
backend_trace!(self, "references({:?})", params);
backend_read_method!(self, "references({params:?})");

let locations = match self.find_references(params) {
Ok(locations) => locations,
Expand Down Expand Up @@ -542,6 +574,7 @@ impl LanguageServer for Backend {
// https://github.com/Microsoft/vscode-languageserver-node/blob/18fad46b0e8085bb72e1b76f9ea23a379569231a/client/src/common/client.ts#L701-L752
impl Backend {
async fn notification(&self, params: Option<Value>) {
backend_read_method!(self, "notification({params:?})");
log::info!("Received Positron notification: {:?}", params);
}
}
Expand Down Expand Up @@ -573,6 +606,7 @@ pub fn start_lsp(runtime: Arc<Runtime>, address: String, conn_init_tx: Sender<bo
// Note that DashMap uses synchronization primitives internally, so we
// don't guard access to the map via a mutex.
let backend = Backend {
lock: Arc::new(RwLock::new(())),
client,
documents: Arc::new(DashMap::new()),
workspace: Arc::new(Mutex::new(Workspace::default())),
Expand Down
3 changes: 2 additions & 1 deletion crates/ark/src/lsp/help_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde::Serialize;
use tower_lsp::lsp_types::Position;
use tower_lsp::lsp_types::VersionedTextDocumentIdentifier;

use crate::backend_read_method;
use crate::backend_trace;
use crate::lsp::backend::Backend;
use crate::lsp::encoding::convert_position_to_point;
Expand Down Expand Up @@ -40,7 +41,7 @@ impl Backend {
&self,
params: HelpTopicParams,
) -> tower_lsp::jsonrpc::Result<Option<HelpTopicResponse>> {
backend_trace!(self, "help_topic({:?})", params);
backend_read_method!(self, "help_topic({:?})", params);

let uri = &params.text_document.uri;
let Some(document) = self.documents.get(uri) else {
Expand Down
3 changes: 2 additions & 1 deletion crates/ark/src/lsp/statement_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tower_lsp::lsp_types::VersionedTextDocumentIdentifier;
use tree_sitter::Node;
use tree_sitter::Point;

use crate::backend_read_method;
use crate::backend_trace;
use crate::lsp::backend::Backend;
use crate::lsp::encoding::convert_point_to_position;
Expand Down Expand Up @@ -58,7 +59,7 @@ impl Backend {
&self,
params: StatementRangeParams,
) -> tower_lsp::jsonrpc::Result<Option<StatementRangeResponse>> {
backend_trace!(self, "statement_range({:?})", params);
backend_read_method!(self, "statement_range({:?})", params);

let uri = &params.text_document.uri;
let Some(document) = self.documents.get(uri) else {
Expand Down
Loading