diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs
index eb64aefb0f1fc..2a8e4fd9ab3cb 100644
--- a/src/common/exception/src/exception_code.rs
+++ b/src/common/exception/src/exception_code.rs
@@ -100,6 +100,7 @@ build_exceptions! {
     TooManyUserConnections(1041),
     AbortedSession(1042),
     AbortedQuery(1043),
+    ClosedQuery(1044),
     CannotListenerPort(1045),
     BadBytes(1046),
     InitPrometheusFailure(1047),
diff --git a/src/query/service/src/interpreters/common/query_log.rs b/src/query/service/src/interpreters/common/query_log.rs
index e799d622a86bc..4011f7d24a874 100644
--- a/src/query/service/src/interpreters/common/query_log.rs
+++ b/src/query/service/src/interpreters/common/query_log.rs
@@ -44,6 +44,13 @@ fn error_fields(log_type: LogType, err: Option<ErrorCode>) -> (LogType, i32, Str
                     e.to_string(),
                     e.backtrace_str(),
                 )
+            } else if e.code() == ErrorCode::CLOSED_QUERY {
+                (
+                    LogType::Closed,
+                    e.code().into(),
+                    e.to_string(),
+                    e.backtrace_str(),
+                )
             } else {
                 (
                     LogType::Error,
diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs
index ae28c08110fa9..e0edbb9af99af 100644
--- a/src/query/service/src/servers/http/v1/http_query_handlers.rs
+++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -232,28 +232,19 @@ async fn query_final_handler(
             query_id, query_id
         );
         let http_query_manager = HttpQueryManager::instance();
-        match http_query_manager
-            .remove_query(&query_id, RemoveReason::Finished)
-            .await
-        {
-            Ok(query) => {
+        match http_query_manager.remove_query(&query_id, RemoveReason::Finished) {
+            Some(query) => {
                 let mut response = query.get_response_state_only().await;
+                if query.check_removed().is_none() && !response.state.state.is_stopped() {
+                    query.kill(ErrorCode::ClosedQuery("closed by client")).await;
+                    response = query.get_response_state_only().await;
+                }
                 // it is safe to set these 2 fields to None, because client now check for null/None first.
                 response.session = None;
                 response.state.affect = None;
-                if response.state.state == ExecuteStateKind::Running {
-                    return Err(PoemError::from_string(
-                        format!("query {} is still running, can not final it", query_id),
-                        StatusCode::BAD_REQUEST,
-                    ));
-                }
                 Ok(QueryResponse::from_internal(query_id, response, true))
             }
-            Err(reason) => Err(query_id_not_found_or_removed(
-                &query_id,
-                &ctx.node_id,
-                reason,
-            )),
+            None => Err(query_id_not_found(&query_id, &ctx.node_id)),
         }
     }
     .in_span(root)
@@ -275,20 +266,16 @@ async fn query_cancel_handler(
             query_id, query_id
        );
         let http_query_manager = HttpQueryManager::instance();
-        match http_query_manager.try_get_query(&query_id).await {
-            Ok(query) => {
-                query.kill("http query cancel by handler").await;
-                http_query_manager
-                    .remove_query(&query_id, RemoveReason::Canceled)
-                    .await
-                    .ok();
+        match http_query_manager.remove_query(&query_id, RemoveReason::Canceled) {
+            Some(query) => {
+                if query.check_removed().is_none() {
+                    query
+                        .kill(ErrorCode::AbortedQuery("canceled by client"))
+                        .await;
+                }
                 Ok(StatusCode::OK)
             }
-            Err(reason) => Err(query_id_not_found_or_removed(
-                &query_id,
-                &ctx.node_id,
-                reason,
-            )),
+            None => Err(query_id_not_found(&query_id, &ctx.node_id)),
         }
     }
     .in_span(root)
@@ -304,16 +291,16 @@ async fn query_state_handler(
     async {
         let http_query_manager = HttpQueryManager::instance();
-        match http_query_manager.try_get_query(&query_id).await {
-            Ok(query) => {
-                let response = query.get_response_state_only().await;
-                Ok(QueryResponse::from_internal(query_id, response, false))
+        match http_query_manager.get_query(&query_id) {
+            Some(query) => {
+                if let Some(reason) = query.check_removed() {
+                    Err(query_id_removed(&query_id, reason))
+                } else {
+                    let response = query.get_response_state_only().await;
+                    Ok(QueryResponse::from_internal(query_id, response, false))
+                }
             }
-            Err(reason) => Err(query_id_not_found_or_removed(
-                &query_id,
-                &ctx.node_id,
-                reason,
-            )),
+            None => Err(query_id_not_found(&query_id, &ctx.node_id)),
         }
     }
     .in_span(root)
@@ -330,20 +317,20 @@ async fn query_page_handler(
     async {
         let http_query_manager = HttpQueryManager::instance();
-        match http_query_manager.try_get_query(&query_id).await {
-            Ok(query) => {
-                query.update_expire_time(true).await;
-                let resp = query.get_response_page(page_no).await.map_err(|err| {
-                    poem::Error::from_string(err.message(), StatusCode::NOT_FOUND)
-                })?;
-                query.update_expire_time(false).await;
-                Ok(QueryResponse::from_internal(query_id, resp, false))
+        match http_query_manager.get_query(&query_id) {
+            Some(query) => {
+                if let Some(reason) = query.check_removed() {
+                    Err(query_id_removed(&query_id, reason))
+                } else {
+                    query.update_expire_time(true).await;
+                    let resp = query.get_response_page(page_no).await.map_err(|err| {
+                        poem::Error::from_string(err.message(), StatusCode::NOT_FOUND)
+                    })?;
+                    query.update_expire_time(false).await;
+                    Ok(QueryResponse::from_internal(query_id, resp, false))
+                }
             }
-            Err(reason) => Err(query_id_not_found_or_removed(
-                &query_id,
-                &ctx.node_id,
-                reason,
-            )),
+            None => Err(query_id_not_found(&query_id, &ctx.node_id)),
         }
     }
     .in_span(root)
@@ -372,19 +359,11 @@ pub(crate) async fn query_handler(
         Ok(query) => {
             query.update_expire_time(true).await;
             // tmp workaround to tolerant old clients
-            let max_wait_time = std::cmp::max(1, req.pagination.wait_time_secs);
-            let start = std::time::Instant::now();
-            let resp = loop {
-                let resp = query
-                    .get_response_page(0)
-                    .await
-                    .map_err(|err| err.display_with_sql(&sql))
-                    .map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
-                if matches!(resp.state.state, ExecuteStateKind::Starting)
-                    && start.elapsed().as_secs() < max_wait_time as u64
-                {
-                    continue;
-                }
-                break resp
-            };
+            let resp = query
+                .get_response_page(0)
+                .await
+                .map_err(|err| err.display_with_sql(&sql))
+                .map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
             if matches!(resp.state.state, ExecuteStateKind::Failed) {
                 ctx.set_fail();
             }
@@ -432,17 +411,15 @@ pub fn query_route() -> Route {
     route
 }
 
-fn query_id_not_found_or_removed(
-    query_id: &str,
-    node_id: &str,
-    reason: Option<RemoveReason>,
-) -> PoemError {
-    let error = match reason {
-        Some(reason) => reason.to_string(),
-        None => "not found".to_string(),
-    };
+fn query_id_removed(query_id: &str, remove_reason: RemoveReason) -> PoemError {
+    PoemError::from_string(
+        format!("query id {query_id} {}", remove_reason),
+        StatusCode::BAD_REQUEST,
+    )
+}
+fn query_id_not_found(query_id: &str, node_id: &str) -> PoemError {
     PoemError::from_string(
-        format!("query id {query_id} {error} on {node_id}"),
+        format!("query id {query_id} not found on {node_id}"),
         StatusCode::NOT_FOUND,
     )
 }
diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs
index 4a5e52f5cc63a..2b8b0f2b86e30 100644
--- a/src/query/service/src/servers/http/v1/query/execute_state.rs
+++ b/src/query/service/src/servers/http/v1/query/execute_state.rs
@@ -41,6 +41,7 @@ use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterFactory;
 use crate::interpreters::InterpreterQueryLog;
 use crate::servers::http::v1::http_query_handlers::QueryResponseField;
+use crate::servers::http::v1::query::http_query::ResponseState;
 use crate::servers::http::v1::query::sized_spsc::SizedChannelSender;
 use crate::sessions::AcquireQueueGuard;
 use crate::sessions::QueriesQueueManager;
@@ -58,6 +59,12 @@ pub enum ExecuteStateKind {
     Succeeded,
 }
 
+impl ExecuteStateKind {
+    pub fn is_stopped(self) -> bool {
+        matches!(self, Self::Succeeded | Self::Failed)
+    }
+}
+
 impl std::fmt::Display for ExecuteStateKind {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{:?}", self)
@@ -156,6 +163,18 @@ impl ExecutorSessionState {
 }
 
 impl Executor {
+    pub fn get_response_state(&self) -> ResponseState {
+        let (exe_state, err) = self.state.extract();
+        ResponseState {
+            running_time_ms: self.get_query_duration_ms(),
+            progresses: self.get_progress(),
+            state: exe_state,
+            error: err,
+            warnings: self.get_warnings(),
+            affect: self.get_affect(),
+            schema: self.get_schema(),
+        }
+    }
     pub fn get_schema(&self) -> Vec<QueryResponseField> {
         match &self.state {
             Starting(_) => Default::default(),
@@ -220,7 +239,7 @@ impl Executor {
         }
     }
     #[async_backtrace::framed]
-    pub async fn stop(this: &Arc<RwLock<Executor>>, reason: Result<()>, kill: bool) {
+    pub async fn stop(this: &Arc<RwLock<Executor>>, reason: Result<()>) {
         {
             let guard = this.read().await;
             if let Stopped(s) = &guard.state {
@@ -249,8 +268,10 @@ impl Executor {
                     )
                     .unwrap_or_else(|e| error!("fail to write query_log {:?}", e));
                 }
-                if reason.is_err() {
-                    s.ctx.get_current_session().txn_mgr().lock().set_fail();
+                if let Err(e) = &reason {
+                    if e.code() != ErrorCode::CLOSED_QUERY {
+                        s.ctx.get_current_session().txn_mgr().lock().set_fail();
+                    }
                 }
                 guard.state = Stopped(Box::new(ExecuteStopped {
                     stats: Default::default(),
@@ -263,18 +284,11 @@ impl Executor {
                 }))
             }
             Running(r) => {
-                // release session
-                if kill {
-                    if let Err(error) = &reason {
-                        r.session.force_kill_query(error.clone());
-                    } else {
-                        r.session.force_kill_query(ErrorCode::AbortedQuery(
-                            "Aborted query, because the server is shutting down or the query was killed",
-                        ));
+                if let Err(e) = &reason {
+                    if e.code() != ErrorCode::CLOSED_QUERY {
+                        r.session.txn_mgr().lock().set_fail();
                     }
-                }
-                if reason.is_err() {
-                    r.session.txn_mgr().lock().set_fail();
+                    r.session.force_kill_query(e.clone());
                 }
 
                 guard.state = Stopped(Box::new(ExecuteStopped {
@@ -353,11 +367,11 @@ impl ExecuteState {
             );
             match CatchUnwindFuture::create(res).await {
                 Ok(Err(err)) => {
-                    Executor::stop(&executor_clone, Err(err.clone()), false).await;
+                    Executor::stop(&executor_clone, Err(err.clone())).await;
                     block_sender_closer.close();
                 }
                 Err(e) => {
-                    Executor::stop(&executor_clone, Err(e), false).await;
+                    Executor::stop(&executor_clone, Err(e)).await;
                     block_sender_closer.close();
                 }
                 _ => {}
@@ -389,7 +403,7 @@ async fn execute(
         None => {
             let block = DataBlock::empty_with_schema(schema);
             block_sender.send(block, 0).await;
-            Executor::stop(&executor, Ok(()), false).await;
+            Executor::stop(&executor, Ok(())).await;
             block_sender.close();
         }
         Some(Err(err)) => {
@@ -399,7 +413,7 @@
                 databend_common_expression::Value::Scalar(Scalar::String(err.to_string())),
             );
             block_sender.send(DataBlock::new(vec![data], 1), 1).await;
-            Executor::stop(&executor, Err(err), false).await;
+            Executor::stop(&executor, Err(err)).await;
             block_sender.close();
         }
         Some(Ok(block)) => {
@@ -424,7 +438,7 @@
                 }
             };
         }
-        Executor::stop(&executor, Ok(()), false).await;
+        Executor::stop(&executor, Ok(())).await;
         block_sender.close();
     }
 }
diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs
index b17b916f6bdaa..95df239728bdd 100644
--- a/src/query/service/src/servers/http/v1/query/http_query.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query.rs
@@ -235,10 +235,11 @@ pub struct HttpQueryResponseInternal {
     pub node_id: String,
 }
 
+#[derive(Debug, Clone, Copy)]
 pub enum ExpireState {
     Working,
     ExpireAt(Instant),
-    Removed,
+    Removed(RemoveReason),
 }
 
 pub enum ExpireResult {
@@ -254,7 +255,7 @@ pub struct HttpQuery {
     request: HttpQueryRequest,
     state: Arc<RwLock<Executor>>,
     page_manager: Arc<TokioMutex<PageManager>>,
-    expire_state: Arc<TokioMutex<ExpireState>>,
+    expire_state: Arc<parking_lot::Mutex<ExpireState>>,
     /// The timeout for the query result polling. In the normal case, the client driver
     /// should fetch the paginated result in a timely manner, and the interval should not
     /// exceed this result_timeout_secs.
@@ -330,15 +331,13 @@ impl HttpQuery {
             })?;
             let mut n = 1;
             while let ExpiringState::InUse(query_id) = session.expire_state() {
-                if let Some(last_query) = &http_query_manager.get_query(&query_id).await {
+                if let Some(last_query) = &http_query_manager.get_query(&query_id) {
                     if last_query.get_state().await.state == ExecuteStateKind::Running {
                         return Err(ErrorCode::BadArguments(
                             "last query on the session not finished",
                         ));
                     }
-                    let _ = http_query_manager
-                        .remove_query(&query_id, RemoveReason::Canceled)
-                        .await;
+                    let _ = http_query_manager.remove_query(&query_id, RemoveReason::Canceled);
                 }
                 // wait for Arc to drop and detach itself from session
                 // should not take too long
@@ -517,7 +516,7 @@
             state,
             page_manager: data,
             result_timeout_secs,
-            expire_state: Arc::new(TokioMutex::new(ExpireState::Working)),
+            expire_state: Arc::new(parking_lot::Mutex::new(ExpireState::Working)),
             is_txn_mgr_saved: AtomicBool::new(false),
         };
 
@@ -557,16 +556,7 @@ impl HttpQuery {
     #[async_backtrace::framed]
     async fn get_state(&self) -> ResponseState {
         let state = self.state.read().await;
-        let (exe_state, err) = state.state.extract();
-        ResponseState {
-            running_time_ms: state.get_query_duration_ms(),
-            progresses: state.get_progress(),
-            state: exe_state,
-            error: err,
-            warnings: state.get_warnings(),
-            affect: state.get_affect(),
-            schema: state.get_schema(),
-        }
+        state.get_response_state()
     }
 
     #[async_backtrace::framed]
@@ -638,18 +628,16 @@ impl HttpQuery {
     }
 
     #[async_backtrace::framed]
-    pub async fn kill(&self, reason: &str) {
+    pub async fn kill(&self, reason: ErrorCode) {
         // the query will be removed from the query manager before the session is dropped.
         self.detach().await;
 
-        Executor::stop(&self.state, Err(ErrorCode::AbortedQuery(reason)), true).await;
+        Executor::stop(&self.state, Err(reason)).await;
     }
 
     #[async_backtrace::framed]
     async fn detach(&self) {
-        info!("{}: http query detached", &self.id);
-
-        let data = self.page_manager.lock().await;
+        let mut data = self.page_manager.lock().await;
         data.detach().await
     }
 
@@ -662,20 +650,33 @@ impl HttpQuery {
             Duration::new(0, 0)
         };
         let deadline = Instant::now() + duration;
-        let mut t = self.expire_state.lock().await;
+        let mut t = self.expire_state.lock();
         *t = ExpireState::ExpireAt(deadline);
     }
 
-    #[async_backtrace::framed]
-    pub async fn mark_removed(&self) {
-        let mut t = self.expire_state.lock().await;
-        *t = ExpireState::Removed;
+    pub fn mark_removed(&self, remove_reason: RemoveReason) -> bool {
+        let mut t = self.expire_state.lock();
+        if !matches!(*t, ExpireState::Removed(_)) {
+            *t = ExpireState::Removed(remove_reason);
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn check_removed(&self) -> Option<RemoveReason> {
+        let t = self.expire_state.lock();
+        if let ExpireState::Removed(r) = *t {
+            Some(r)
+        } else {
+            None
+        }
     }
 
     // return Duration to sleep
     #[async_backtrace::framed]
     pub async fn check_expire(&self) -> ExpireResult {
-        let expire_state = self.expire_state.lock().await;
+        let expire_state = self.expire_state.lock();
         match *expire_state {
             ExpireState::ExpireAt(expire_at) => {
                 let now = Instant::now();
@@ -685,7 +686,7 @@ impl HttpQuery {
                     ExpireResult::Sleep(expire_at - now)
                 }
             }
-            ExpireState::Removed => ExpireResult::Removed,
+            ExpireState::Removed(_) => ExpireResult::Removed,
             ExpireState::Working => {
                 ExpireResult::Sleep(Duration::from_secs(self.result_timeout_secs))
             }
diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs
index bc0bf355f7dc5..ed9e696ef4f1a 100644
--- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs
+++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs
@@ -12,22 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::collections::VecDeque;
 use std::fmt::Display;
 use std::fmt::Formatter;
-use std::hash::Hash;
 use std::sync::Arc;
 use std::time::Duration;
 
 use chrono::SecondsFormat;
-use databend_common_base::base::tokio::sync::RwLock;
+use dashmap::DashMap;
 use databend_common_base::base::tokio::time::sleep;
 use databend_common_base::base::GlobalInstance;
 use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_config::InnerConfig;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_storages_common_txn::TxnManagerRef;
 use log::warn;
@@ -42,7 +41,7 @@
 use crate::servers::http::v1::query::http_query::ServerInfo;
 use crate::servers::http::v1::query::HttpQueryRequest;
 use crate::sessions::Session;
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Copy)]
 pub(crate) enum RemoveReason {
     Timeout,
     Canceled,
@@ -55,47 +54,34 @@ impl Display for RemoveReason {
     }
 }
 
-pub(crate) struct SizeLimitedIndexMap<K, V> {
-    insert_order: VecDeque<K>,
-    reasons: HashMap<K, V>,
-    cap: usize,
+pub struct LimitedQueue<T> {
+    deque: VecDeque<T>,
+    max_size: usize,
 }
 
-impl<K: Hash + Eq + Clone, V> SizeLimitedIndexMap<K, V> {
-    fn new(cap: usize) -> Self {
-        Self {
-            insert_order: VecDeque::new(),
-            reasons: HashMap::new(),
-            cap,
+impl<T> LimitedQueue<T> {
+    fn new(max_size: usize) -> Self {
+        LimitedQueue {
+            deque: VecDeque::new(),
+            max_size,
         }
     }
 
-    fn insert(&mut self, key: K, value: V) {
-        if self.get(&key).is_some() {
-            return;
-        }
-        if self.insert_order.len() + 1 >= self.cap {
-            let key = self.insert_order.pop_front().unwrap();
-            self.reasons.remove(&key);
+    fn push(&mut self, item: T) -> Option<T> {
+        self.deque.push_back(item);
+        if self.deque.len() > self.max_size {
+            self.deque.pop_front()
+        } else {
+            None
         }
-        self.insert_order.push_back(key.clone());
-        self.reasons.insert(key, value);
-    }
-
-    fn get<Q: ?Sized>(&self, key: &Q) -> Option<&V>
-    where
-        K: Borrow<Q>,
-        Q: Hash + Eq,
-    {
-        self.reasons.get(key)
     }
 }
 
 pub struct HttpQueryManager {
     pub(crate) server_info: ServerInfo,
     #[allow(clippy::type_complexity)]
-    pub(crate) queries: Arc<RwLock<HashMap<String, Arc<HttpQuery>>>>,
-    pub(crate) removed_queries: Arc<RwLock<SizeLimitedIndexMap<String, RemoveReason>>>,
+    pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
+    pub(crate) removed_queries: Arc<parking_lot::Mutex<LimitedQueue<String>>>,
     #[allow(clippy::type_complexity)]
     pub(crate) txn_managers: Arc<Mutex<HashMap<String, (TxnManagerRef, Sender<()>)>>>,
     pub(crate) sessions: Mutex<ExpiringMap<String, Arc<Session>>>,
@@ -109,9 +95,9 @@ impl HttpQueryManager {
                 id: cfg.query.node_id.clone(),
                 start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false),
             },
-            queries: Arc::new(RwLock::new(HashMap::new())),
+            queries: Arc::new(DashMap::new()),
             sessions: Mutex::new(ExpiringMap::default()),
-            removed_queries: Arc::new(RwLock::new(SizeLimitedIndexMap::new(1000))),
+            removed_queries: Arc::new(parking_lot::Mutex::new(LimitedQueue::new(1000))),
             txn_managers: Arc::new(Mutex::new(HashMap::new())),
         }));
 
@@ -133,37 +119,13 @@ impl HttpQueryManager {
         Ok(query)
     }
 
-    #[async_backtrace::framed]
-    pub(crate) async fn get_query(self: &Arc<Self>, query_id: &str) -> Option<Arc<HttpQuery>> {
-        let queries = self.queries.read().await;
-        queries.get(query_id).map(|q| q.to_owned())
-    }
-
-    #[async_backtrace::framed]
-    pub(crate) async fn try_get_query(
-        self: &Arc<Self>,
-        query_id: &str,
-    ) -> std::result::Result<Arc<HttpQuery>, Option<RemoveReason>> {
-        if let Some(q) = self.get_query(query_id).await {
Ok(q) - } else { - Err(self.try_get_query_tombstone(query_id).await) - } - } - - #[async_backtrace::framed] - pub(crate) async fn try_get_query_tombstone( - self: &Arc, - query_id: &str, - ) -> Option { - let queries = self.removed_queries.read().await; - queries.get(query_id).cloned() + pub(crate) fn get_query(self: &Arc, query_id: &str) -> Option> { + self.queries.get(query_id).map(|q| q.to_owned()) } #[async_backtrace::framed] async fn add_query(self: &Arc, query_id: &str, query: Arc) { - let mut queries = self.queries.write().await; - queries.insert(query_id.to_string(), query.clone()); + self.queries.insert(query_id.to_string(), query.clone()); let self_clone = self.clone(); let query_id_clone = query_id.to_string(); @@ -188,18 +150,17 @@ impl HttpQueryManager { "http query {} timeout after {} s", &query_id_clone, query_result_timeout_secs ); - match self_clone - .remove_query(&query_id_clone, RemoveReason::Timeout) - .await - { - Ok(_) => { + match self_clone.remove_query(&query_id_clone, RemoveReason::Timeout) { + Some(_) => { warn!("{msg}"); if let Some(query) = http_query_weak.upgrade() { - query.kill(&msg).await; + if query.check_removed().is_none() { + query.kill(ErrorCode::AbortedQuery(&msg)).await; + } } } - Err(_) => { - warn!("{msg}, but already removed"); + None => { + warn!("{msg}, but already evict, too many queries?"); } }; break; @@ -215,28 +176,27 @@ impl HttpQueryManager { }); } - // not remove it until timeout or cancelled by user, even if query execution is aborted #[async_backtrace::framed] - pub(crate) async fn remove_query( + pub(crate) fn remove_query( self: &Arc, query_id: &str, reason: RemoveReason, - ) -> std::result::Result, Option> { - { - let mut removed = self.removed_queries.write().await; - if let Some(r) = removed.get(query_id) { - return Err(Some(r.clone())); + ) -> Option> { + // deref at once to avoid holding DashMap shard guard for too long. 
+        let query = self.queries.get(query_id).map(|q| q.clone());
+        query.map(|q| {
+            let not_removed_yet = q.mark_removed(reason);
+            let to_evict = not_removed_yet
+                .then(|| {
+                    let mut queue = self.removed_queries.lock();
+                    queue.push(q.id.to_string())
+                })
+                .flatten();
+            if let Some(qid) = to_evict {
+                self.queries.remove(&qid);
             }
-            removed.insert(query_id.to_string(), reason);
-        }
-        let mut queries = self.queries.write().await;
-        let q = queries.remove(query_id);
-        if let Some(q) = &q {
-            q.mark_removed().await;
-            Ok(q.clone())
-        } else {
-            Err(None)
-        }
+            q.clone()
+        })
     }
 
     #[async_backtrace::framed]
diff --git a/src/query/service/src/servers/http/v1/query/page_manager.rs b/src/query/service/src/servers/http/v1/query/page_manager.rs
index 9f39d79f8ed87..5762016fe3549 100644
--- a/src/query/service/src/servers/http/v1/query/page_manager.rs
+++ b/src/query/service/src/servers/http/v1/query/page_manager.rs
@@ -204,7 +204,9 @@ impl PageManager {
     }
 
     #[async_backtrace::framed]
-    pub async fn detach(&self) {
+    pub async fn detach(&mut self) {
         self.block_receiver.close();
+        self.last_page = None;
+        self.row_buffer.clear()
     }
 }
diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs
index 2567a69b112c9..a28235f2debeb 100644
--- a/src/query/service/tests/it/servers/http/http_query_handlers.rs
+++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs
@@ -270,7 +270,7 @@ async fn test_simple_sql() -> Result<()> {
     let sql = "select * from system.tables limit 10";
     let ep = create_endpoint().await?;
     let (status, result) =
-        post_sql_to_endpoint_new_session(&ep, sql, 1, HeaderMap::default()).await?;
+        post_sql_to_endpoint_new_session(&ep, sql, 5, HeaderMap::default()).await?;
     assert_eq!(status, StatusCode::OK, "{:?}", result);
     assert!(result.error.is_none(), "{:?}", result.error);
 
@@ -320,7 +320,7 @@ async fn test_simple_sql() -> Result<()> {
     assert!(result.next_uri.is_none(), "{:?}", result);
 
     let response = get_uri(&ep, &page_0_uri).await;
-    assert_eq!(response.status(), StatusCode::NOT_FOUND, "{:?}", result);
+    assert_eq!(response.status(), StatusCode::BAD_REQUEST, "{:?}", result);
 
     let sql = "show databases";
     let (status, result) = post_sql(sql, 1).await?;
@@ -639,26 +639,36 @@ async fn test_http_session() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+#[ignore]
 async fn test_result_timeout() -> Result<()> {
     let config = ConfigBuilder::create().build();
     let _fixture = TestFixture::setup_with_config(&config).await?;
 
-    let json = serde_json::json!({ "sql": "SELECT 1", "pagination": {"wait_time_secs": 1}, "session": { "settings": {"http_handler_result_timeout_secs": "1"}}});
+    let json = serde_json::json!({ "sql": "SELECT 1", "pagination": {"wait_time_secs": 5}, "session": { "settings": {"http_handler_result_timeout_secs": "1"}}});
     let mut req = TestHttpQueryRequest::new(json);
     let (status, result, _) = req.fetch_begin().await?;
 
     assert_eq!(status, StatusCode::OK, "{:?}", result);
     let query_id = result.id.clone();
-    assert!(!query_id.is_empty());
-
-    sleep(std::time::Duration::from_secs(2)).await;
-
-    let (status, result, body) = req.fetch_next().await?;
-    assert_eq!(status, StatusCode::NOT_FOUND, "{:?}", result);
-    let msg = format!("query id {} timeout on {}", query_id, config.query.node_id);
-    let msg = json!({ "error": { "code": "404", "message": msg }}).to_string();
+    assert_eq!(result.data.len(), 1);
+
+    sleep(std::time::Duration::from_secs(5)).await;
+
+    // fail to get page 0 again (e.g. retry) due to timeout
+    // this is flaky
+    let (status, result, body) = req
+        .do_request(Method::GET, &format!("/v1/query/{query_id}/page/0",))
+        .await?;
+    assert_eq!(status, StatusCode::BAD_REQUEST, "{:?}", body);
+    let msg = format!("query id {} timeout", query_id);
+    let msg = json!({ "error": { "code": "400", "message": msg }}).to_string();
     assert_eq!(body, msg, "{:?}", result);
 
+    // but /final return ok
+    let (status, result, _) = req.fetch_next().await?;
+    assert_eq!(status, StatusCode::OK, "{:?}", result);
+
     Ok(())
 }
diff --git a/src/query/storages/system/src/query_log_table.rs b/src/query/storages/system/src/query_log_table.rs
index 8d7f6f886cd0f..ab722fa799cab 100644
--- a/src/query/storages/system/src/query_log_table.rs
+++ b/src/query/storages/system/src/query_log_table.rs
@@ -36,7 +36,15 @@ pub enum LogType {
     Start = 1,
     Finish = 2,
     Error = 3,
+
+    /// canceled by another thread:
+    /// 1. `kill <query_id>` statement
+    /// 2. client_driver/ctx.cancel() -> /kill
     Aborted = 4,
+
+    /// close early because client does not need more data:
+    /// 1. explicit or implicit result_set.close() -> /final
+    Closed = 5,
 }
 
 impl LogType {
@@ -46,6 +54,7 @@ impl LogType {
             LogType::Finish => "Finish".to_string(),
             LogType::Error => "Error".to_string(),
             LogType::Aborted => "Aborted".to_string(),
+            LogType::Closed => "Closed".to_string(),
         }
     }
 }
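For context, a minimal, self-contained sketch (not part of the patch) of the eviction contract that the reworked `remove_query` relies on: `LimitedQueue::push` appends an entry and, once the queue holds more than `max_size` entries, hands back the oldest one so the caller can drop that query id from the `queries` map. The `main` driver and the string items are hypothetical, for illustration only.

```rust
use std::collections::VecDeque;

// Standalone copy of the patch's LimitedQueue, reproduced here for illustration.
struct LimitedQueue<T> {
    deque: VecDeque<T>,
    max_size: usize,
}

impl<T> LimitedQueue<T> {
    fn new(max_size: usize) -> Self {
        LimitedQueue {
            deque: VecDeque::new(),
            max_size,
        }
    }

    // Push an item; when the queue exceeds `max_size`, pop and return the
    // oldest item so the caller can evict it from its own map.
    fn push(&mut self, item: T) -> Option<T> {
        self.deque.push_back(item);
        if self.deque.len() > self.max_size {
            self.deque.pop_front()
        } else {
            None
        }
    }
}

fn main() {
    // Hypothetical query ids; the real code stores one entry per removed query.
    let mut removed = LimitedQueue::new(2);
    assert_eq!(removed.push("q1"), None);
    assert_eq!(removed.push("q2"), None);
    // The third push overflows the capacity of 2 and hands back "q1",
    // which remove_query would then delete from the map of live queries.
    assert_eq!(removed.push("q3"), Some("q1"));
    println!("eviction contract holds");
}
```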