diff --git a/src/opt.rs b/src/opt.rs index b0fdffa4..6b1059a0 100644 --- a/src/opt.rs +++ b/src/opt.rs @@ -1,4 +1,4 @@ -use crate::{subcommand, data::RequestPayload}; +use crate::{data::RequestPayload, subcommand}; use derive_more::{Display, Error, From, IsVariant}; use lazy_static::lazy_static; use std::{ @@ -8,7 +8,7 @@ use std::{ str::FromStr, }; use structopt::StructOpt; -use strum::{EnumString, EnumVariantNames, VariantNames, IntoStaticStr}; +use strum::{EnumString, EnumVariantNames, IntoStaticStr, VariantNames}; lazy_static! { static ref USERNAME: String = whoami::username(); @@ -87,17 +87,18 @@ pub enum SessionSubcommand { Exists, /// Prints out information about the available sessions - Info { + Info { /// Represents the format that results should be returned /// /// Currently, there are two possible formats: + /// /// 1. "json": printing out JSON for external program usage - /// 3. "shell": printing out human-readable results for interactive shell usage + /// 2. "shell": printing out human-readable results for interactive shell usage #[structopt( - short, - long, + short, + long, case_insensitive = true, - default_value = "shell", + default_value = Mode::Shell.into(), possible_values = Mode::VARIANTS )] mode: Mode, @@ -105,7 +106,18 @@ pub enum SessionSubcommand { } /// Represents the communication medium used for the send command -#[derive(Copy, Clone, Debug, Display, PartialEq, Eq, IsVariant, EnumString, EnumVariantNames)] +#[derive( + Copy, + Clone, + Debug, + Display, + PartialEq, + Eq, + IsVariant, + IntoStaticStr, + EnumString, + EnumVariantNames, +)] #[strum(serialize_all = "snake_case")] pub enum Mode { /// Sends and receives data in JSON format @@ -123,20 +135,25 @@ pub struct ActionSubcommand { /// Represents the format that results should be returned /// /// Currently, there are two possible formats: + /// /// 1. "json": printing out JSON for external program usage - /// 3. "shell": printing out human-readable results for interactive shell usage + /// 2. "shell": printing out human-readable results for interactive shell usage #[structopt( - short, - long, + short, + long, case_insensitive = true, - default_value = "shell", + default_value = Mode::Shell.into(), possible_values = Mode::VARIANTS )] pub mode: Mode, /// Represents the medium for retrieving a session for use in performing the action - #[structopt(long, default_value = "file", possible_values = SessionSharing::VARIANTS)] - pub session: SessionSharing, + #[structopt( + long, + default_value = SessionInput::File.into(), + possible_values = SessionInput::VARIANTS + )] + pub session: SessionInput, /// If specified, commands to send are sent over stdin and responses are received /// over stdout (and stderr if mode is shell) @@ -149,7 +166,7 @@ pub struct ActionSubcommand { } /// Represents options for binding a server to an IP address -#[derive(Copy, Clone, Debug, Display, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, Display, PartialEq, Eq, IsVariant)] pub enum BindAddress { #[display(fmt = "ssh")] Ssh, @@ -202,9 +219,48 @@ impl FromStr for BindAddress { } /// Represents the means by which to share the session from launching on a remote machine -#[derive(Copy, Clone, Debug, Display, PartialEq, Eq, IntoStaticStr, IsVariant, EnumString, EnumVariantNames)] +#[derive( + Copy, + Clone, + Debug, + Display, + PartialEq, + Eq, + IntoStaticStr, + IsVariant, + EnumString, + EnumVariantNames, +)] #[strum(serialize_all = "snake_case")] -pub enum SessionSharing { +pub enum SessionOutput { + /// Session is in a file in the form of `DISTANT DATA ` + File, + + /// Special scenario where the session is not shared but is instead kept within the + /// launch program, causing the program itself to listen on stdin for input rather + /// than terminating + Keep, + + /// Session is stored and retrieved over anonymous pipes (stdout/stdin) + /// in form of `DISTANT DATA ` + Pipe, +} + +/// Represents the means by which to consume a session when performing an action +#[derive( + Copy, + Clone, + Debug, + Display, + PartialEq, + Eq, + IntoStaticStr, + IsVariant, + EnumString, + EnumVariantNames, +)] +#[strum(serialize_all = "snake_case")] +pub enum SessionInput { /// Session is in a environment variables /// /// * `DISTANT_HOST=` @@ -215,28 +271,33 @@ pub enum SessionSharing { /// Session is in a file in the form of `DISTANT DATA ` File, - /// Session is stored and retrieved over anonymous pipes (stdout/stdin) + /// Session is stored and retrieved over anonymous pipes (stdout/stdin) /// in form of `DISTANT DATA ` Pipe, } -impl SessionSharing { - /// Represents session configurations that can be used for output - pub fn output_variants() -> Vec<&'static str> { - vec![Self::File.into(), Self::Pipe.into()] - } -} - /// Represents subcommand to launch a remote server #[derive(Debug, StructOpt)] pub struct LaunchSubcommand { /// Represents the medium for sharing the session upon launching on a remote machine #[structopt( - long, - default_value = SessionSharing::File.into(), - possible_values = &SessionSharing::output_variants() + long, + default_value = SessionOutput::File.into(), + possible_values = SessionOutput::VARIANTS )] - pub session: SessionSharing, + pub session: SessionOutput, + + /// Represents the format that results should be returned when session is "keep", + /// causing the launcher to enter an interactive loop to handle input and output + /// itself rather than enabling other clients to connect + #[structopt( + short, + long, + case_insensitive = true, + default_value = Mode::Shell.into(), + possible_values = Mode::VARIANTS + )] + pub mode: Mode, /// Path to remote program to execute via ssh #[structopt(short, long, default_value = "distant")] @@ -284,8 +345,8 @@ pub struct LaunchSubcommand { /// Represents some range of ports #[derive(Clone, Debug, Display, PartialEq, Eq)] #[display( - fmt = "{}{}", - start, + fmt = "{}{}", + start, "end.as_ref().map(|end| format!(\"[:{}]\", end)).unwrap_or_default()" )] pub struct PortRange { @@ -383,11 +444,6 @@ pub struct ListenSubcommand { /// With -p 0, the server will let the operating system pick an available TCP port. /// /// Please note that this option does not affect the server-side port used by SSH - #[structopt( - short, - long, - value_name = "PORT[:PORT2]", - default_value = "8080:8099" - )] + #[structopt(short, long, value_name = "PORT[:PORT2]", default_value = "8080:8099")] pub port: PortRange, } diff --git a/src/subcommand/action.rs b/src/subcommand/action/inner.rs similarity index 76% rename from src/subcommand/action.rs rename to src/subcommand/action/inner.rs index 458e2005..6d95ef8b 100644 --- a/src/subcommand/action.rs +++ b/src/subcommand/action/inner.rs @@ -1,10 +1,9 @@ use crate::{ data::{Request, RequestPayload, Response, ResponsePayload}, - net::{Client, TransportError}, - opt::{ActionSubcommand, CommonOpt, Mode, SessionSharing}, - session::{Session, SessionFile}, + net::Client, + opt::Mode, }; -use derive_more::{Display, Error, From}; +use derive_more::IsVariant; use log::*; use structopt::StructOpt; use tokio::{ @@ -16,98 +15,26 @@ use tokio::{ }; use tokio_stream::StreamExt; -#[derive(Debug, Display, Error, From)] -pub enum Error { - IoError(io::Error), - TransportError(TransportError), - - #[display(fmt = "Non-interactive but no operation supplied")] - MissingOperation, -} - -pub fn run(cmd: ActionSubcommand, opt: CommonOpt) -> Result<(), Error> { - let rt = tokio::runtime::Runtime::new()?; - - rt.block_on(async { run_async(cmd, opt).await }) -} - -async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error> { - let session = match cmd.session { - SessionSharing::Environment => Session::from_environment()?, - SessionSharing::File => SessionFile::load().await?.into(), - SessionSharing::Pipe => Session::from_stdin()?, - }; - - let mut client = Client::connect(session).await?; - - if !cmd.interactive && cmd.operation.is_none() { - return Err(Error::MissingOperation); - } - - // Special conditions for continuing to process responses - let mut is_proc_req = false; - let mut proc_id = 0; - - if let Some(req) = cmd.operation.map(Request::from) { - is_proc_req = req.payload.is_proc_run(); - - trace!("Client sending request: {:?}", req); - let res = client.send(req).await?; - - // Store the spawned process id for using in sending stdin (if we spawned a proc) - proc_id = match &res.payload { - ResponsePayload::ProcStart { id } => *id, - _ => 0, - }; - - format_response(cmd.mode, res)?.print(); - } - - // If we are executing a process, we want to continue interacting via stdin and receiving - // results via stdout/stderr - // - // If we are interactive, we want to continue looping regardless - if is_proc_req || cmd.interactive { - interactive_loop(client, proc_id, cmd.mode, cmd.interactive).await?; - } - - Ok(()) +#[derive(Copy, Clone, PartialEq, Eq, IsVariant)] +pub enum LoopConfig { + Json, + Proc { id: usize }, + Shell, } -fn spawn_stdin_reader() -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(1); - - // NOTE: Using blocking I/O per tokio's advice to read from stdin line-by-line and then - // pass the results to a separate async handler to forward to the remote process - std::thread::spawn(move || { - let stdin = std::io::stdin(); - - loop { - let mut line = String::new(); - match stdin.read_line(&mut line) { - Ok(0) | Err(_) => break, - Ok(_) => { - if let Err(x) = tx.blocking_send(line) { - error!( - "Failed to pass along stdin to be sent to remote process: {}", - x - ); - } - std::thread::yield_now(); - } - } +impl From for Mode { + fn from(config: LoopConfig) -> Self { + match config { + LoopConfig::Json => Self::Json, + LoopConfig::Proc { .. } | LoopConfig::Shell => Self::Shell, } - }); - - rx + } } -async fn interactive_loop( - mut client: Client, - id: usize, - mode: Mode, - interactive: bool, -) -> Result<(), Error> { +/// Starts a new action loop that processes requests and receives responses +/// +/// id represents the id of a remote process +pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Result<()> { let mut stream = client.to_response_stream(); // Create a channel that can report when we should stop the loop based on a received request @@ -117,7 +44,7 @@ async fn interactive_loop( let mut rx = spawn_stdin_reader(); tokio::spawn(async move { while let Some(line) = rx.recv().await { - match mode { + match config { // Special exit condition for interactive mode _ if line.trim() == "exit" => { if let Err(_) = tx_stop.send(()) { @@ -127,13 +54,13 @@ async fn interactive_loop( } // For json mode, all stdin is treated as individual requests - Mode::Json => { + LoopConfig::Json => { trace!("Client sending request: {:?}", line); let result = serde_json::from_str(&line) .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)); match result { Ok(req) => match client.send(req).await { - Ok(res) => match format_response(mode, res) { + Ok(res) => match format_response(Mode::Json, res) { Ok(out) => out.print(), Err(x) => error!("Failed to format response: {}", x), }, @@ -148,7 +75,7 @@ async fn interactive_loop( } // For interactive shell mode, parse stdin as individual commands - Mode::Shell if interactive => { + LoopConfig::Shell => { if line.trim().is_empty() { continue; } @@ -163,7 +90,7 @@ async fn interactive_loop( ); match payload_result { Ok(payload) => match client.send(Request::from(payload)).await { - Ok(res) => match format_response(mode, res) { + Ok(res) => match format_response(Mode::Shell, res) { Ok(out) => out.print(), Err(x) => error!("Failed to format response: {}", x), }, @@ -178,7 +105,7 @@ async fn interactive_loop( } // For non-interactive shell mode, all stdin is treated as a proc's stdin - Mode::Shell => { + LoopConfig::Proc { id } => { trace!("Client sending stdin: {:?}", line); let req = Request::from(RequestPayload::ProcStdin { id, @@ -202,9 +129,9 @@ async fn interactive_loop( "Response stream no longer available", ) })?; - let done = res.payload.is_proc_done() && !interactive; + let done = res.payload.is_proc_done() && config.is_proc(); - format_response(mode, res)?.print(); + format_response(config.into(), res)?.print(); // If we aren't interactive but are just running a proc and // we've received the end of the proc, we should exit @@ -221,8 +148,36 @@ async fn interactive_loop( Ok(()) } +fn spawn_stdin_reader() -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(1); + + // NOTE: Using blocking I/O per tokio's advice to read from stdin line-by-line and then + // pass the results to a separate async handler to forward to the remote process + std::thread::spawn(move || { + let stdin = std::io::stdin(); + + loop { + let mut line = String::new(); + match stdin.read_line(&mut line) { + Ok(0) | Err(_) => break, + Ok(_) => { + if let Err(x) = tx.blocking_send(line) { + error!( + "Failed to pass along stdin to be sent to remote process: {}", + x + ); + } + std::thread::yield_now(); + } + } + } + }); + + rx +} + /// Represents the output content and destination -enum ResponseOut { +pub enum ResponseOut { Stdout(String), Stderr(String), None, @@ -238,7 +193,7 @@ impl ResponseOut { } } -fn format_response(mode: Mode, res: Response) -> io::Result { +pub fn format_response(mode: Mode, res: Response) -> io::Result { Ok(match mode { Mode::Json => ResponseOut::Stdout(format!( "{}\n", diff --git a/src/subcommand/action/mod.rs b/src/subcommand/action/mod.rs new file mode 100644 index 00000000..ff78ce36 --- /dev/null +++ b/src/subcommand/action/mod.rs @@ -0,0 +1,74 @@ +use crate::{ + data::{Request, ResponsePayload}, + net::{Client, TransportError}, + opt::{ActionSubcommand, CommonOpt, Mode, SessionInput}, + session::{Session, SessionFile}, +}; +use derive_more::{Display, Error, From}; +use log::*; +use tokio::io; + +pub(crate) mod inner; + +#[derive(Debug, Display, Error, From)] +pub enum Error { + IoError(io::Error), + TransportError(TransportError), + + #[display(fmt = "Non-interactive but no operation supplied")] + MissingOperation, +} + +pub fn run(cmd: ActionSubcommand, opt: CommonOpt) -> Result<(), Error> { + let rt = tokio::runtime::Runtime::new()?; + + rt.block_on(async { run_async(cmd, opt).await }) +} + +async fn run_async(cmd: ActionSubcommand, _opt: CommonOpt) -> Result<(), Error> { + let session = match cmd.session { + SessionInput::Environment => Session::from_environment()?, + SessionInput::File => SessionFile::load().await?.into(), + SessionInput::Pipe => Session::from_stdin()?, + }; + + let mut client = Client::connect(session).await?; + + if !cmd.interactive && cmd.operation.is_none() { + return Err(Error::MissingOperation); + } + + // Special conditions for continuing to process responses + let mut is_proc_req = false; + let mut proc_id = 0; + + if let Some(req) = cmd.operation.map(Request::from) { + is_proc_req = req.payload.is_proc_run(); + + trace!("Client sending request: {:?}", req); + let res = client.send(req).await?; + + // Store the spawned process id for using in sending stdin (if we spawned a proc) + proc_id = match &res.payload { + ResponsePayload::ProcStart { id } => *id, + _ => 0, + }; + + inner::format_response(cmd.mode, res)?.print(); + } + + // If we are executing a process, we want to continue interacting via stdin and receiving + // results via stdout/stderr + // + // If we are interactive, we want to continue looping regardless + if is_proc_req || cmd.interactive { + let config = match cmd.mode { + Mode::Json => inner::LoopConfig::Json, + Mode::Shell if cmd.interactive => inner::LoopConfig::Shell, + Mode::Shell => inner::LoopConfig::Proc { id: proc_id }, + }; + inner::interactive_loop(client, config).await?; + } + + Ok(()) +} diff --git a/src/subcommand/launch.rs b/src/subcommand/launch.rs index 9c85ac67..7c4c12c1 100644 --- a/src/subcommand/launch.rs +++ b/src/subcommand/launch.rs @@ -1,5 +1,6 @@ use crate::{ - opt::{CommonOpt, LaunchSubcommand, SessionSharing}, + net::Client, + opt::{CommonOpt, LaunchSubcommand, Mode, SessionOutput}, session::{Session, SessionFile}, }; use derive_more::{Display, Error, From}; @@ -66,12 +67,18 @@ async fn run_async(cmd: LaunchSubcommand, _opt: CommonOpt) -> Result<(), Error> session.host = cmd.host; // Handle sharing resulting session in different ways - // NOTE: Environment is unreachable here as we disallow it from the defined options since - // there is no way to set the shell's environment variables, only this running process match cmd.session { - SessionSharing::Environment => unreachable!(), - SessionSharing::File => SessionFile::from(session).save().await?, - SessionSharing::Pipe => println!("{}", session.to_unprotected_string()), + SessionOutput::File => SessionFile::from(session).save().await?, + SessionOutput::Keep => { + use crate::subcommand::action::inner; + let client = Client::connect(session).await?; + let config = match cmd.mode { + Mode::Json => inner::LoopConfig::Json, + Mode::Shell => inner::LoopConfig::Shell, + }; + inner::interactive_loop(client, config).await?; + } + SessionOutput::Pipe => println!("{}", session.to_unprotected_string()), } Ok(())