Skip to content

Commit

Permalink
Fix infinite loop during synchronization of a workspace when importin…
Browse files Browse the repository at this point in the history
…g a file using the CLI command `workspace import` (#9116)

Closes #8941
  • Loading branch information
FirelightFlagboy authored Dec 9, 2024
1 parent fef5fc4 commit df7a961
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 35 deletions.
132 changes: 107 additions & 25 deletions cli/src/commands/workspace/import.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,45 @@
use std::{path::PathBuf, vec};
use std::{collections::HashSet, path::PathBuf, sync::Arc, vec};

use libparsec::{anyhow::Context, FsPath, OpenOptions};
use libparsec::{anyhow::Context, FsPath, OpenOptions, VlobID};
use libparsec_client::EventBus;
use tokio::io::AsyncReadExt;

use crate::utils::load_client;
use crate::utils::load_client_with_config;

crate::clap_parser_with_shared_opts_builder!(
#[with = config_dir, device, password_stdin, workspace]
pub struct Args {
/// Local file to copy
src: PathBuf,
pub(crate) src: PathBuf,
/// Workspace destination path
dest: FsPath,
pub(crate) dest: FsPath,
}
);

pub async fn main(args: Args) -> anyhow::Result<()> {
let Args {
src,
dest,
workspace,
workspace: wid,
password_stdin,
device,
config_dir,
} = args;

log::trace!(
"workspace_import: {src} -> {workspace}:{dst}",
"workspace_import: {src} -> {wid}:{dst}",
src = src.display(),
dst = dest
);

let client = load_client(&config_dir, device, password_stdin).await?;
let workspace = client.start_workspace(workspace).await?;
let config = libparsec::ClientConfig {
with_monitors: true,
..Default::default()
}
.into();
let client = load_client_with_config(&config_dir, device, password_stdin, config).await?;
let workspace = client.start_workspace(wid).await?;
let (files_to_sync, _watch_files_to_sync) = watch_workspace_sync_events(&client.event_bus, wid);
let fd = workspace
.open_file(
dest,
Expand All @@ -46,13 +53,103 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
)
.await?;

let (notify, _event_conn) =
notify_sync_completion(&client.event_bus, wid, files_to_sync.clone());

copy_file_to_fd(src, &workspace, fd).await?;
log::debug!("Flushing and closing file");
workspace.fd_flush(fd).await?;
workspace.fd_close(fd).await?;

// After importing the file, we may need to wait for the workspace to sync its data with the server.
// The sync procedure may sync other files that the one related to the operation (think parent & children).
// So instead of being peaky about which file to sync or to wait, we just wait for all files to be synced for the workspace.
if !files_to_sync.lock().expect("Mutex poisoned").is_empty() {
log::debug!("Waiting for sync");
notify.notified().await;
log::trace!("Sync done");
}
Ok(())
}

/// Watch for file that need to be synced with the server
#[must_use]
fn watch_workspace_sync_events(
event_bus: &EventBus,
wid: VlobID,
) -> (
Arc<std::sync::Mutex<HashSet<VlobID>>>,
libparsec_client::EventBusConnectionLifetime<
libparsec_client::EventWorkspaceOpsOutboundSyncNeeded,
>,
) {
let files_needing_sync = Arc::new(std::sync::Mutex::new(HashSet::new()));
let dup = files_needing_sync.clone();
let event_conn = event_bus.connect(
move |event: &libparsec_client::EventWorkspaceOpsOutboundSyncNeeded| {
if event.realm_id == wid {
files_needing_sync
.lock()
.expect("Mutex poisoned")
.insert(event.entry_id);
log::debug!("Outbound sync needed for file ({})", event.entry_id);
} else {
log::trace!(
"Ignore outbound sync event for another realm ({})",
event.realm_id
);
}
},
);
(dup, event_conn)
}

#[must_use]
fn notify_sync_completion(
event_bus: &EventBus,
wid: VlobID,
files_to_sync: Arc<std::sync::Mutex<HashSet<VlobID>>>,
) -> (
Arc<tokio::sync::Notify>,
libparsec_client::EventBusConnectionLifetime<
libparsec_client::EventWorkspaceOpsOutboundSyncDone,
>,
) {
let notify = Arc::new(tokio::sync::Notify::new());
let notify2 = notify.clone();

let event_conn = event_bus.connect(
move |event: &libparsec_client::EventWorkspaceOpsOutboundSyncDone| {
let libparsec_client::EventWorkspaceOpsOutboundSyncDone {
ref realm_id,
entry_id,
} = event;
let mut files_to_sync = files_to_sync.lock().expect("Mutex poisoned");
if realm_id == &wid && files_to_sync.remove(entry_id) {
log::debug!("Outbound sync done for file ({realm_id}:{entry_id})");
if files_to_sync.is_empty() {
log::trace!("All outbound sync done for realm {}", wid);
notify2.notify_one();
}
} else {
log::trace!("Outbound sync done for another file ({realm_id}:{entry_id})");
}
},
);
(notify, event_conn)
}

async fn copy_file_to_fd(
src: PathBuf,
workspace: &Arc<libparsec_client::WorkspaceOps>,
fd: libparsec::FileDescriptor,
) -> Result<(), anyhow::Error> {
let file = tokio::fs::File::open(&src)
.await
.context("Cannot open local file")?;
let mut buf_file = tokio::io::BufReader::new(file);
let mut buffer = vec![0_u8; 4096];
let mut dst_offset = 0_usize;

log::debug!("Copying file to workspace");
loop {
let bytes_read = buf_file
Expand All @@ -73,20 +170,5 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
dst_offset += bytes_written;
}
}

log::debug!("Flushing and closing file");
workspace.fd_flush(fd).await?;
workspace.fd_close(fd).await?;

loop {
let entries_to_sync = workspace.get_need_outbound_sync(20).await?;
log::debug!("Entries to outbound sync: {:?}", entries_to_sync);
if entries_to_sync.is_empty() {
break;
}
for entry in entries_to_sync {
workspace.outbound_sync(entry).await?;
}
}
Ok(())
}
38 changes: 29 additions & 9 deletions cli/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,21 @@ pub async fn load_client(
Ok(client)
}

pub async fn load_client_with_config(
config_dir: &Path,
device: Option<String>,
password_stdin: bool,
config: libparsec_client::ClientConfig,
) -> anyhow::Result<Arc<StartedClient>> {
let device = load_and_unlock_device(config_dir, device, password_stdin).await?;
let client = start_client_with_config(device, config).await?;

Ok(client)
}

pub struct StartedClient {
client: Arc<Client>,
pub client: Arc<Client>,
pub event_bus: EventBus,
_device_in_use_guard: InUseDeviceLockGuard,
}

Expand All @@ -278,14 +291,19 @@ impl Deref for StartedClient {
}

pub async fn start_client(device: Arc<LocalDevice>) -> anyhow::Result<Arc<StartedClient>> {
let config: Arc<libparsec::internal::ClientConfig> = Arc::new(
ClientConfig {
with_monitors: false,
..Default::default()
}
.into(),
);
let config: libparsec_client::ClientConfig = ClientConfig {
with_monitors: false,
..Default::default()
}
.into();

start_client_with_config(device, config).await
}

pub async fn start_client_with_config(
device: Arc<LocalDevice>,
config: libparsec_client::ClientConfig,
) -> anyhow::Result<Arc<StartedClient>> {
let device_in_use_guard = match try_lock_device_for_use(&config.config_dir, device.device_id) {
Ok(device_in_use_guard) => device_in_use_guard,
Err(TryLockDeviceForUseError::AlreadyInUse) => {
Expand All @@ -299,10 +317,12 @@ pub async fn start_client(device: Arc<LocalDevice>) -> anyhow::Result<Arc<Starte
Err(TryLockDeviceForUseError::Internal(err)) => return Err(err),
};

let client = Client::start(config, EventBus::default(), device).await?;
let event_bus = EventBus::default();
let client = Client::start(Arc::new(config), event_bus.clone(), device).await?;

Ok(Arc::new(StartedClient {
client,
event_bus,
_device_in_use_guard: device_in_use_guard,
}))
}
Expand Down
72 changes: 71 additions & 1 deletion cli/tests/integration/workspace/import.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use libparsec::{tmp_path, TmpPath};
use std::sync::Arc;

use libparsec::{tmp_path, EntryName, LocalDevice, TmpPath, VlobID};

use crate::{
integration_tests::bootstrap_cli_test,
Expand Down Expand Up @@ -73,3 +75,71 @@ async fn workspace_import_file(tmp_path: TmpPath) {
assert_eq!(name.as_ref(), "test.txt");
assert!(matches!(stat, libparsec::EntryStat::File { size, .. } if size == &13));
}

#[rstest::rstest]
#[tokio::test]
async fn issue_8941_import_file_where_inbound_and_outbound_sync_are_required(tmp_path: TmpPath) {
let (
_,
TestOrganization {
alice, other_alice, ..
},
_,
) = bootstrap_cli_test(&tmp_path).await.unwrap();

// Create a workspace C with device A
let wid = create_workspace(alice.clone()).await;

// Refresh the workspace list for device B
refresh_workspace_list(other_alice).await;

// Create a file to import
let test_file = tmp_path.join("test.txt");
tokio::fs::write(&test_file, "Hello, World!").await.unwrap();

// Try to import a file with device B to workspace C
crate::assert_cmd_success!(
with_password = DEFAULT_DEVICE_PASSWORD,
"workspace",
"import",
"--device",
&alice.device_id.hex(),
"--workspace",
&wid.hex(),
&test_file.to_string_lossy(),
"/test.txt"
)
.stdout(predicates::str::is_empty());

// Try to import the file again
crate::assert_cmd_success!(
with_password = DEFAULT_DEVICE_PASSWORD,
"workspace",
"import",
"--device",
&alice.device_id.hex(),
"--workspace",
&wid.hex(),
&test_file.to_string_lossy(),
"/test.txt"
)
.stdout(predicates::str::is_empty());
}

async fn create_workspace(device: Arc<LocalDevice>) -> VlobID {
let client = start_client(device).await.unwrap();
let workspace_name = "new-workspace".parse::<EntryName>().unwrap();
let wid = client
.create_workspace(workspace_name.clone())
.await
.unwrap();
client.ensure_workspaces_bootstrapped().await.unwrap();
client.stop().await;
wid
}

async fn refresh_workspace_list(device: Arc<LocalDevice>) {
let client = start_client(device).await.unwrap();
client.list_workspaces().await;
client.stop().await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ impl EntryStat {
EntryStat::Folder { id, .. } => *id,
}
}

pub fn parent(&self) -> VlobID {
match self {
EntryStat::File { parent, .. } => *parent,
EntryStat::Folder { parent, .. } => *parent,
}
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
1 change: 1 addition & 0 deletions newsfragments/8941.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix infinite loop during synchronization of a workspace when importing a file using the CLI command ``workspace import``

0 comments on commit df7a961

Please sign in to comment.