Skip to content

Commit

Permalink
fix: portforward connections
Browse files Browse the repository at this point in the history
In some situations it was observed that connections lingered in
a CLOSE_WAIT state or even ESTABLISHED state.
In others, an existing connection would be reused and would fail at the time of use.

When the reader half is closed, signal it to the forwarder_loop task.
When a websocket close message is received, signal it to the forwarder_loop task
so it may shutdown all write half's.
Close the non-taken streams when joining so we may terminate.

Signed-off-by: Tiago Castro <[email protected]>
  • Loading branch information
tiagolobocastro committed Aug 4, 2022
1 parent 79f1d7a commit 70c8b8c
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion kube-client/src/api/portforward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub enum Error {

#[error("failed to complete the background task: {0}")]
Spawn(#[source] tokio::task::JoinError),

/// Failed to shutdown a pod writer channel.
#[error("failed to shutdown write to Pod channel: {0}")]
Shutdown(#[source] std::io::Error),
}

type ErrorReceiver = oneshot::Receiver<String>;
Expand All @@ -69,6 +73,8 @@ type ErrorSender = oneshot::Sender<String>;
enum Message {
FromPod(u8, Bytes),
ToPod(u8, Bytes),
FromPodClose,
ToPodClose(u8),
}

/// Manages port-forwarded streams.
Expand Down Expand Up @@ -138,7 +144,8 @@ impl Portforwarder {
}

/// Waits for port forwarding task to complete.
pub async fn join(self) -> Result<(), Error> {
pub async fn join(mut self) -> Result<(), Error> {
self.ports.clear();
self.task.await.unwrap_or_else(|e| Err(Error::Spawn(e)))
}
}
Expand Down Expand Up @@ -192,6 +199,10 @@ async fn to_pod_loop(
.map_err(Error::ForwardToPod)?;
}
}
sender
.send(Message::ToPodClose(ch))
.await
.map_err(Error::ForwardToPod)?;
Ok(())
}

Expand All @@ -217,6 +228,10 @@ where
.await
.map_err(Error::ForwardFromPod)?;
}
message if message.is_close() => {
// the receiver may be closed already, so ignore the error
sender.send(Message::FromPodClose).await.ok();
}
// REVIEW should we error on unexpected websocket message?
_ => {}
}
Expand All @@ -240,6 +255,7 @@ where
{
// Keep track if the channel has received the initialization frame.
let mut initialized = vec![false; 2 * ports.len()];
let mut closed_ports = 0;
while let Some(msg) = receiver.next().await {
match msg {
Message::FromPod(ch, mut bytes) => {
Expand Down Expand Up @@ -293,6 +309,31 @@ where
.await
.map_err(Error::SendWebSocketMessage)?;
}
Message::ToPodClose(ch) => {
println!("close: {}", ch);
let ch = ch as usize;
if ch >= initialized.len() {
return Err(Error::InvalidChannel(ch));
}

let port_index = ch / 2;
writers[port_index].shutdown().await.map_err(Error::Shutdown)?;

closed_ports += 1;
}
Message::FromPodClose => {
for writer in &mut writers {
writer.shutdown().await.map_err(Error::Shutdown)?;
}
}
}

if closed_ports == ports.len() {
ws_sink
.send(ws::Message::Close(None))
.await
.map_err(Error::SendWebSocketMessage)?;
return Ok(());
}
}
Ok(())
Expand Down

0 comments on commit 70c8b8c

Please sign in to comment.