Skip to content

Commit

Permalink
Fix possible race confition when adding, then removing remote config …
Browse files Browse the repository at this point in the history
…services (#882)

Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi authored Feb 14, 2025
1 parent 958830a commit 7a481f8
Showing 1 changed file with 55 additions and 30 deletions.
85 changes: 55 additions & 30 deletions remote-config/src/fetch/multitarget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,42 +168,67 @@ where
'service_handling: {
'drop_service: {
if let Some(known_service) = services.get_mut(target) {
known_service.refcount = if known_service.refcount == 1 {
known_service.runtimes.remove(runtime_id);
let mut status = known_service.status.lock().unwrap();
*status = match *status {
KnownTargetStatus::Pending => KnownTargetStatus::Alive, // not really
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
Instant::now() + Duration::from_secs(3666),
),
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
unreachable!()
known_service.refcount = match known_service.refcount {
0 => {
// Handle the possible race condition where the service gets added AND
// removed while in Removing state.
let status = known_service.status.lock().unwrap();
match *status {
KnownTargetStatus::Removing(ref future) => {
let future = future.clone();
let runtime_id = runtime_id.to_string();
let this = self.clone();
let target = target.clone();
tokio::spawn(async move {
future.await;
this.remove_target(runtime_id.as_str(), &target);
});
return;
}
_ => {
// It's already in process of being removed
return;
}
}
};
// We've marked it Alive so that the Pending check in start_fetcher() will
// fail
if matches!(*status, KnownTargetStatus::Alive) {
break 'drop_service;
}
0
} else {
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
'changed_rt_id: {
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
if runtime.targets.len() == 1
&& runtime.targets.contains_key(target)
{
*known_service.fetcher.runtime_id.lock().unwrap() =
id.to_string();
break 'changed_rt_id;
1 => {
known_service.runtimes.remove(runtime_id);
let mut status = known_service.status.lock().unwrap();
*status = match *status {
KnownTargetStatus::Pending => KnownTargetStatus::Alive, /* not really */
KnownTargetStatus::Alive => KnownTargetStatus::RemoveAt(
Instant::now() + Duration::from_secs(3666),
),
KnownTargetStatus::RemoveAt(_) | KnownTargetStatus::Removing(_) => {
unreachable!()
}
};
// We've marked it Alive so that the Pending check in start_fetcher()
// will fail
if matches!(*status, KnownTargetStatus::Alive) {
break 'drop_service;
}
0
}
_ => {
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
'changed_rt_id: {
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
if runtime.targets.len() == 1
&& runtime.targets.contains_key(target)
{
*known_service.fetcher.runtime_id.lock().unwrap() =
id.to_string();
break 'changed_rt_id;
}
}
known_service.synthetic_id = true;
*known_service.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
known_service.synthetic_id = true;
*known_service.fetcher.runtime_id.lock().unwrap() =
Self::generate_synthetic_id();
}
known_service.refcount - 1
}
known_service.refcount - 1
};
break 'service_handling;
}
Expand Down

0 comments on commit 7a481f8

Please sign in to comment.