Skip to content

Commit

Permalink
chore: chat stream timeout (#1137)
Browse files Browse the repository at this point in the history
* chore: increase timeout

* chore: keepalive event
  • Loading branch information
appflowy authored Jan 7, 2025
1 parent e0bc8f8 commit 2bd6da2
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 6 deletions.
5 changes: 3 additions & 2 deletions libs/appflowy-ai-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,21 @@ impl AppFlowyAIClient {
rag_ids,
},
};
self.stream_question_v3(model, json).await
self.stream_question_v3(model, json, Some(30)).await
}

pub async fn stream_question_v3(
&self,
model: &AIModel,
question: ChatQuestion,
timeout_secs: Option<u64>,
) -> Result<impl Stream<Item = Result<Bytes, AIError>>, AIError> {
let url = format!("{}/v2/chat/message/stream", self.url);
let resp = self
.async_http_client(Method::POST, &url)?
.header(AI_MODEL_HEADER_KEY, model.to_str())
.json(&question)
.timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(timeout_secs.unwrap_or(30)))
.send()
.await?;
AIResponse::<()>::stream_response(resp).await
Expand Down
1 change: 1 addition & 0 deletions libs/appflowy-ai-client/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::str::FromStr;
pub const STREAM_METADATA_KEY: &str = "0";
pub const STREAM_ANSWER_KEY: &str = "1";
pub const STREAM_IMAGE_KEY: &str = "2";
pub const STREAM_KEEP_ALIVE_KEY: &str = "3";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SummarizeRowResponse {
pub text: String,
Expand Down
1 change: 1 addition & 0 deletions libs/client-api-test/src/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ pub async fn collect_answer(mut stream: QuestionStream) -> String {
answer.push_str(&value);
},
QuestionStreamValue::Metadata { .. } => {},
QuestionStreamValue::KeepAlive => {},
}
}
answer
Expand Down
7 changes: 6 additions & 1 deletion libs/client-api/src/http_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reqwest::Method;
use serde_json::Value;
use shared_entity::dto::ai_dto::{
CalculateSimilarityParams, ChatQuestionQuery, RepeatedRelatedQuestion, SimilarityResponse,
STREAM_ANSWER_KEY, STREAM_IMAGE_KEY, STREAM_METADATA_KEY,
STREAM_ANSWER_KEY, STREAM_IMAGE_KEY, STREAM_KEEP_ALIVE_KEY, STREAM_METADATA_KEY,
};
use shared_entity::dto::chat_dto::{ChatSettings, UpdateChatParams};
use shared_entity::response::{AppResponse, AppResponseError};
Expand Down Expand Up @@ -366,6 +366,7 @@ pub enum QuestionStreamValue {
Metadata {
value: serde_json::Value,
},
KeepAlive,
}
impl Stream for QuestionStream {
type Item = Result<QuestionStreamValue, AppResponseError>;
Expand Down Expand Up @@ -394,6 +395,10 @@ impl Stream for QuestionStream {
return Poll::Ready(Some(Ok(QuestionStreamValue::Answer { value: image })));
}

if value.remove(STREAM_KEEP_ALIVE_KEY).is_some() {
return Poll::Ready(Some(Ok(QuestionStreamValue::KeepAlive)));
}

error!("Invalid streaming value: {:?}", value);
Poll::Ready(None)
},
Expand Down
2 changes: 0 additions & 2 deletions libs/infra/src/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ where
// Poll for the next chunk of data from the underlying stream
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
// Append the new bytes to the buffer
this.buffer.extend_from_slice(&bytes);

// Create a StreamDeserializer to deserialize the bytes into T
Expand All @@ -112,7 +111,6 @@ where
return Poll::Pending;
},
Some(Err(err)) => {
// Return other deserialization errors wrapped in SE
return Poll::Ready(Some(Err(err.into())));
},
None => {
Expand Down
2 changes: 1 addition & 1 deletion src/api/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ async fn answer_stream_v3_handler(
trace!("[Chat] stream v3 {:?}", question);
match state
.ai_client
.stream_question_v3(&ai_model, question)
.stream_question_v3(&ai_model, question, Some(60))
.await
{
Ok(answer_stream) => {
Expand Down
1 change: 1 addition & 0 deletions tests/ai_test/chat_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ async fn collect_answer(mut stream: QuestionStream) -> String {
answer.push_str(&value);
},
QuestionStreamValue::Metadata { .. } => {},
QuestionStreamValue::KeepAlive => {},
}
}
answer
Expand Down

0 comments on commit 2bd6da2

Please sign in to comment.