diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index 61cf0f4696601..b4401e11f41f4 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -144,7 +144,7 @@ impl QueryResponse { (JsonBlock::empty(), None) } else { match state.state { - ExecuteStateKind::Running => match r.data { + ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data { None => (JsonBlock::empty(), Some(make_state_uri(&id))), Some(d) => { let uri = match d.next_page_no { @@ -179,9 +179,14 @@ impl QueryResponse { }; let rows = data.data.len(); + let state_kind = match state.state { + ExecuteStateKind::Starting => ExecuteStateKind::Running, + _ => state.state, + }; + Json(QueryResponse { data: data.into(), - state: state.state, + state: state_kind, schema: state.schema.clone(), session_id: Some(session_id), node_id: r.node_id, @@ -368,22 +373,30 @@ pub(crate) async fn query_handler( match query { Ok(query) => { query.update_expire_time(true).await; - let resp = query - .get_response_page(0) - .await - .map_err(|err| err.display_with_sql(&sql)) - .map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?; - if matches!(resp.state.state,ExecuteStateKind::Failed) { + // tmp workaround to tolerant old clients + let max_wait_time = std::cmp::max(10, req.pagination.wait_time_secs); + let start = std::time::Instant::now(); + let resp = loop { + let resp = query + .get_response_page(0) + .await + .map_err(|err| err.display_with_sql(&sql)) + .map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?; + if matches!(resp.state.state, ExecuteStateKind::Starting) && start.elapsed().as_secs() < max_wait_time as u64 { + continue; + } + break resp + }; + if matches!(resp.state.state, ExecuteStateKind::Failed) { ctx.set_fail(); } let (rows, next_page) = match &resp.data { None => (0, None), Some(p) => (p.page.data.num_rows(), p.next_page_no), }; - info!( - "http query initial response to http query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'", - &query.id, &resp.state, rows, next_page, mask_connection_info(&sql) - ); + info!( "http query initial response to http query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'", + &query.id, &resp.state, rows, next_page, mask_connection_info(&sql) + ); query.update_expire_time(false).await; Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response()) } diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 5bf8435b18456..e3c0292abeca1 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -54,6 +54,7 @@ use crate::sessions::TableContext; #[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)] pub enum ExecuteStateKind { + Starting, Running, Failed, Succeeded, @@ -93,7 +94,8 @@ pub enum ExecuteState { impl ExecuteState { pub(crate) fn extract(&self) -> (ExecuteStateKind, Option) { match self { - Starting(_) | Running(_) => (ExecuteStateKind::Running, None), + Starting(_) => (ExecuteStateKind::Starting, None), + Running(_) => (ExecuteStateKind::Running, None), Stopped(v) => match &v.reason { Ok(_) => (ExecuteStateKind::Succeeded, None), Err(e) => (ExecuteStateKind::Failed, Some(e.clone())),