From 00871d0e288e27affa1a442b1ee44cb695326a8c Mon Sep 17 00:00:00 2001 From: Meng Zhang Date: Sun, 1 Dec 2024 01:25:11 +0800 Subject: [PATCH 1/7] refactor(structured_doc): support pre sync chore(structured_doc): presync pull state before indexing Signed-off-by: Wei Zhang chore: skip indexing gitlab pulls (#3515) Signed-off-by: Wei Zhang # Conflicts: # ee/tabby-webserver/src/service/background_job/third_party_integration.rs chore: use a todo id fn to create pull id for state Signed-off-by: Wei Zhang chore: extract sync issue function Signed-off-by: Wei Zhang Update ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs Co-authored-by: Wei Zhang --- crates/tabby-index/src/indexer_tests.rs | 45 ++--- .../tabby-index/src/structured_doc/public.rs | 38 ++-- ee/tabby-schema/src/schema/integration.rs | 2 +- .../src/schema/repository/third_party.rs | 2 +- .../background_job/third_party_integration.rs | 124 +++++++++---- .../third_party_integration/issues.rs | 2 + .../third_party_integration/pulls.rs | 166 +++++++++++------- .../src/service/background_job/web_crawler.rs | 20 ++- ee/tabby-webserver/src/service/integration.rs | 10 +- .../src/service/repository/mod.rs | 2 +- .../src/service/repository/third_party.rs | 10 +- 11 files changed, 259 insertions(+), 162 deletions(-) diff --git a/crates/tabby-index/src/indexer_tests.rs b/crates/tabby-index/src/indexer_tests.rs index bb63ad066c77..fda3d6768bb3 100644 --- a/crates/tabby-index/src/indexer_tests.rs +++ b/crates/tabby-index/src/indexer_tests.rs @@ -72,14 +72,13 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { let updated = indexer - .sync( - StructuredDocState { - updated_at, - deleted: false, - }, - doc, - ) - .await; + .presync(StructuredDocState { + id: doc.id().to_string(), + updated_at, + deleted: false, + }) + .await + && indexer.sync(doc).await; println!("{}", updated); updated }); @@ -119,14 +118,13 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { let updated = indexer - .sync( - StructuredDocState { - updated_at, - deleted: false, - }, - doc, - ) - .await; + .presync(StructuredDocState { + id: doc.id().to_string(), + updated_at, + deleted: false, + }) + .await + && indexer.sync(doc).await; println!("{}", updated); updated }); @@ -163,18 +161,9 @@ mod structured_doc_tests { }), }; - let updated_at = chrono::Utc::now(); - let res = tokio::runtime::Runtime::new().unwrap().block_on(async { - indexer - .sync( - StructuredDocState { - updated_at, - deleted: false, - }, - doc, - ) - .await - }); + let res = tokio::runtime::Runtime::new() + .unwrap() + .block_on(async { indexer.sync(doc).await }); assert!(res); indexer.commit(); diff --git a/crates/tabby-index/src/structured_doc/public.rs b/crates/tabby-index/src/structured_doc/public.rs index f0db2a8e3130..54b36c08ba7c 100644 --- a/crates/tabby-index/src/structured_doc/public.rs +++ b/crates/tabby-index/src/structured_doc/public.rs @@ -17,10 +17,15 @@ use crate::{indexer::TantivyDocBuilder, Indexer}; /// StructuredDocState tracks the state of the document source. /// It helps determine whether the document should be updated or deleted. pub struct StructuredDocState { + // id is the unique identifier of the document. + // It is used to track the document in the indexer. + pub id: String, + // updated_at is the time when the document was last updated. // when the updated_at is earlier than the document's index time, // the update will be skipped. pub updated_at: DateTime, + // deleted indicates whether the document should be removed from the indexer. // For instance, a closed pull request will be marked as deleted, // prompting the indexer to remove it from the index. @@ -39,21 +44,34 @@ impl StructuredDocIndexer { Self { indexer, builder } } + // Runs pre-sync checks to determine if the document needs to be updated. + // Returns false if `sync` is not required to be called. + pub async fn presync(&self, state: StructuredDocState) -> bool { + if state.deleted { + self.indexer.delete(&state.id); + return false; + } + + if self.indexer.is_indexed_after(&state.id, state.updated_at) + && !self.indexer.has_failed_chunks(&state.id) + { + return false; + }; + + true + } + // The sync process updates the document in the indexer incrementally. // It first determines whether the document requires an update. // // If an update is needed, it checks the deletion state of the document. // If the document is marked as deleted, it will be removed. // Next, the document is rebuilt, the original is deleted, and the newly indexed document is added. - pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool { - if !self.require_updates(state.updated_at, &document) { + pub async fn sync(&self, document: StructuredDoc) -> bool { + if !self.require_updates(&document) { return false; } - if state.deleted { - return self.delete(document.id()).await; - } - stream! { let (id, s) = self.builder.build(document).await; self.indexer.delete(&id); @@ -80,7 +98,7 @@ impl StructuredDocIndexer { self.indexer.commit(); } - fn require_updates(&self, updated_at: DateTime, document: &StructuredDoc) -> bool { + fn require_updates(&self, document: &StructuredDoc) -> bool { if document.should_skip() { return false; } @@ -89,12 +107,6 @@ impl StructuredDocIndexer { return true; } - if self.indexer.is_indexed_after(document.id(), updated_at) - && !self.indexer.has_failed_chunks(document.id()) - { - return false; - }; - true } diff --git a/ee/tabby-schema/src/schema/integration.rs b/ee/tabby-schema/src/schema/integration.rs index 777aecd6774c..56ffbf825957 100644 --- a/ee/tabby-schema/src/schema/integration.rs +++ b/ee/tabby-schema/src/schema/integration.rs @@ -124,5 +124,5 @@ pub trait IntegrationService: Send + Sync { ) -> Result>; async fn get_integration(&self, id: ID) -> Result; - async fn update_integration_sync_status(&self, id: ID, error: Option) -> Result<()>; + async fn update_integration_sync_status(&self, id: &ID, error: Option) -> Result<()>; } diff --git a/ee/tabby-schema/src/schema/repository/third_party.rs b/ee/tabby-schema/src/schema/repository/third_party.rs index bac167496940..81ad9043796d 100644 --- a/ee/tabby-schema/src/schema/repository/third_party.rs +++ b/ee/tabby-schema/src/schema/repository/third_party.rs @@ -105,7 +105,7 @@ pub trait ThirdPartyRepositoryService: Send + Sync + RepositoryProvider { last: Option, ) -> Result>; - async fn get_provided_repository(&self, id: ID) -> Result; + async fn get_provided_repository(&self, id: &ID) -> Result; async fn update_repository_active(&self, id: ID, active: bool) -> Result<()>; async fn upsert_repository( diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index 96273027dde1..4b26b34524ca 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use issues::{list_github_issues, list_gitlab_issues}; use juniper::ID; -use pulls::list_github_pulls; +use pulls::{get_github_pull_doc, list_github_pull_states}; use serde::{Deserialize, Serialize}; use tabby_common::config::CodeRepository; use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState}; @@ -90,7 +90,7 @@ impl SchedulerGithubGitlabJob { integration_service: Arc, ) -> tabby_schema::Result<()> { let repository = repository_service - .get_provided_repository(self.repository_id) + .get_provided_repository(&self.repository_id) .await?; let integration = integration_service .get_integration(repository.integration_id.clone()) @@ -116,50 +116,111 @@ impl SchedulerGithubGitlabJob { "Indexing documents for repository {}", repository.display_name ); - let index = StructuredDocIndexer::new(embedding); - let issue_stream = match fetch_all_issues(&integration, &repository).await { + + self.sync_issues( + &integration, + integration_service.clone(), + &repository, + embedding.clone(), + ) + .await?; + + self.sync_pulls(&integration, integration_service, &repository, embedding) + .await?; + + Ok(()) + } + + async fn sync_pulls( + &self, + integration: &Integration, + integration_service: Arc, + repository: &ProvidedRepository, + embedding: Arc, + ) -> tabby_schema::Result<()> { + let mut pull_state_stream = match fetch_all_pull_states(&integration, &repository).await { Ok(s) => s, Err(e) => { integration_service - .update_integration_sync_status(integration.id, Some(e.to_string())) + .update_integration_sync_status(&integration.id, Some(e.to_string())) .await?; - logkit::error!("Failed to fetch issues: {}", e); - return Err(e); + logkit::error!("Failed to fetch pulls: {}", e); + return Ok(()); } }; - let pull_stream = match fetch_all_pulls(&integration, &repository).await { - Ok(s) => Some(s), + let mut count = 0; + let mut num_updated = 0; + + let index = StructuredDocIndexer::new(embedding); + while let Some((id, state)) = pull_state_stream.next().await { + count += 1; + if count % 100 == 0 { + logkit::info!( + "{} pull docs seen, {} pull docs updated", + count, + num_updated + ); + } + + if !index.presync(state).await { + continue; + } + let pull = get_github_pull_doc( + &repository.source_id(), + id, + integration.api_base(), + &repository.display_name, + &integration.access_token, + ) + .await?; + + index.sync(pull).await; + num_updated += 1; + } + logkit::info!( + "{} pull docs seen, {} pull docs updated", + count, + num_updated + ); + index.commit(); + + Ok(()) + } + + async fn sync_issues( + &self, + integration: &Integration, + integration_service: Arc, + repository: &ProvidedRepository, + embedding: Arc, + ) -> tabby_schema::Result<()> { + let issue_stream = match fetch_all_issues(&integration, &repository).await { + Ok(s) => s, Err(e) => { integration_service - .update_integration_sync_status(integration.id, Some(e.to_string())) + .update_integration_sync_status(&integration.id, Some(e.to_string())) .await?; - logkit::warn!("Failed to fetch pulls: {}", e); - None + logkit::error!("Failed to fetch issues: {}", e); + return Err(e); } }; + let index = StructuredDocIndexer::new(embedding); stream! { let mut count = 0; let mut num_updated = 0; - let combined_stream = if let Some(pull_stream) = pull_stream { - issue_stream.chain(pull_stream).boxed() - } else { - issue_stream.boxed() - }; - - for await (state, doc) in combined_stream { - if index.sync(state, doc).await { - num_updated += 1 + for await (state, doc) in issue_stream { + if index.presync(state).await && index.sync(doc).await { + num_updated += 1 + } + count += 1; + if count % 100 == 0 { + logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated); + }; } - count += 1; - if count % 100 == 0 { - logkit::info!("{} docs seen, {} docs updated", count, num_updated); - }; - } - - logkit::info!("{} docs seen, {} docs updated", count, num_updated); + logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated); index.commit(); } .count() @@ -212,13 +273,12 @@ async fn fetch_all_issues( Ok(s) } -async fn fetch_all_pulls( +async fn fetch_all_pull_states( integration: &Integration, repository: &ProvidedRepository, -) -> tabby_schema::Result> { +) -> tabby_schema::Result> { match &integration.kind { - IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pulls( - &repository.source_id(), + IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pull_states( integration.api_base(), &repository.display_name, &integration.access_token, diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs index 3d9bf56b6cbf..c2d549dd878c 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs @@ -67,6 +67,7 @@ pub async fn list_github_issues( }) }; yield (StructuredDocState { + id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, }, doc); @@ -133,6 +134,7 @@ pub async fn list_gitlab_issues( closed: issue.state == "closed", })}; yield (StructuredDocState { + id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, }, doc); diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 37a66a62fe90..984220835178 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -9,12 +9,21 @@ use tracing::debug; use super::error::octocrab_error_message; -pub async fn list_github_pulls( - source_id: &str, +// FIXME(kweizh): we can only get StructuredDoc id after constructing the StructuredDoc +// but we need to pass the id to the StructuredDocState +// so we need to refactor the id() method in StructuredDoc +fn pull_id(pull: &octocrab::models::pulls::PullRequest) -> String { + pull.html_url + .clone() + .map(|url| url.to_string()) + .unwrap_or_else(|| pull.url.clone()) +} + +pub async fn list_github_pull_states( api_base: &str, full_name: &str, access_token: &str, -) -> Result> { +) -> Result> { let octocrab = Octocrab::builder() .personal_token(access_token.to_string()) .base_uri(api_base)? @@ -26,7 +35,6 @@ pub async fn list_github_pulls( let owner = owner.to_owned(); let repo = repo.to_owned(); - let source_id = source_id.to_owned(); let s = stream! { let mut page = 1u32; loop { @@ -47,80 +55,25 @@ pub async fn list_github_pulls( let pages = response.number_of_pages().unwrap_or_default(); for pull in response.items { - let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url); - let title = pull.title.clone().unwrap_or_default(); - let body = pull.body.clone().unwrap_or_default(); - - let author = pull.user.as_ref().map(|user| user.login.clone()); - let email = if let Some(author) = author { - match octocrab.users(&author).profile().await { - Ok(profile) => { - profile.email - } - Err(e) => { - debug!("Failed to fetch user profile for {}: {}", author, octocrab_error_message(e)); - None - } - } - } else { - None - }; - - let doc = StructuredDoc { - source_id: source_id.to_string(), - fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields { - link: url.clone(), - title, - author_email: email.clone(), - body, - merged: pull.merged_at.is_some(), - diff: String::new(), - }), - }; + let id = pull_id(&pull); // skip closed but not merged pulls if let Some(state) = pull.state { if state == IssueState::Closed && pull.merged_at.is_none() { - yield (StructuredDocState{ + yield (pull.number, StructuredDocState{ + id: id, updated_at: pull.updated_at.unwrap(), deleted: true, - }, doc); + }); continue; } } - // Fetch the diff only if the number of changed lines is fewer than 100,000, - // assuming 80 characters per line, - // and the size of the diff is less than 8MB. - let diff = if pull.additions.unwrap_or_default() + pull.deletions.unwrap_or_default() < 100*1024 { - match octocrab.pulls(&owner, &repo).get_diff(pull.number).await { - Ok(x) => x, - Err(e) => { - logkit::error!("Failed to fetch pull request diff for {}: {}", - url, octocrab_error_message(e)); - continue - } - } - } else { - String::new() - }; - - let doc = StructuredDoc { - source_id: source_id.to_string(), - fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields { - link: url, - title: pull.title.unwrap_or_default(), - author_email: email, - body: pull.body.unwrap_or_default(), - diff, - merged: pull.merged_at.is_some(), - })}; - - - yield (StructuredDocState{ - updated_at: pull.updated_at.unwrap(), + yield (pull.number, StructuredDocState{ + id: id, + updated_at: pull.updated_at.unwrap_or_else(|| chrono::Utc::now()), deleted: false, - }, doc); + }); } page += 1; @@ -132,3 +85,82 @@ pub async fn list_github_pulls( Ok(s) } + +pub async fn get_github_pull_doc( + source_id: &str, + id: u64, + api_base: &str, + full_name: &str, + access_token: &str, +) -> Result { + let octocrab = Octocrab::builder() + .personal_token(access_token.to_string()) + .base_uri(api_base)? + .build()?; + + let (owner, repo) = full_name + .split_once('/') + .ok_or_else(|| anyhow!("Invalid repository name"))?; + + let owner = owner.to_owned(); + let repo = repo.to_owned(); + let source_id = source_id.to_owned(); + + let pull = octocrab.pulls(&owner, &repo).get(id).await.map_err(|e| { + anyhow!( + "Failed to fetch pull requests: {}", + octocrab_error_message(e) + ) + })?; + + let url = pull + .html_url + .map(|url| url.to_string()) + .unwrap_or_else(|| pull.url); + let title = pull.title.clone().unwrap_or_default(); + let body = pull.body.clone().unwrap_or_default(); + + let author = pull.user.as_ref().map(|user| user.login.clone()); + let email = if let Some(author) = author { + match octocrab.users(&author).profile().await { + Ok(profile) => profile.email, + Err(e) => { + debug!("Failed to fetch user profile for {}: {}", author, e); + None + } + } + } else { + None + }; + + // Fetch the diff only if the number of changed lines is fewer than 100,000, + // assuming 80 characters per line, + // and the size of the diff is less than 8MB. + let diff = + if pull.additions.unwrap_or_default() + pull.deletions.unwrap_or_default() < 100 * 1024 { + octocrab + .pulls(&owner, &repo) + .get_diff(pull.number) + .await + .map_err(|e| { + anyhow!( + "Failed to fetch pull request diff: {}", + octocrab_error_message(e) + ) + })? + } else { + String::new() + }; + + Ok(StructuredDoc { + source_id: source_id.to_string(), + fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields { + link: url.clone(), + title, + author_email: email.clone(), + body, + merged: pull.merged_at.is_some(), + diff, + }), + }) +} diff --git a/ee/tabby-webserver/src/service/background_job/web_crawler.rs b/ee/tabby-webserver/src/service/background_job/web_crawler.rs index da307f61e028..a0fdab9b11f8 100644 --- a/ee/tabby-webserver/src/service/background_job/web_crawler.rs +++ b/ee/tabby-webserver/src/service/background_job/web_crawler.rs @@ -54,15 +54,17 @@ impl WebCrawlerJob { }; num_docs += 1; - indexer - .sync( - StructuredDocState { - updated_at: Utc::now(), - deleted: false, - }, - source_doc, - ) - .await; + + if indexer + .presync(StructuredDocState { + id: source_doc.id().to_string(), + updated_at: Utc::now(), + deleted: false, + }) + .await + { + indexer.sync(source_doc).await; + } } logkit::info!("Crawled {} documents from '{}'", num_docs, self.url); indexer.commit(); diff --git a/ee/tabby-webserver/src/service/integration.rs b/ee/tabby-webserver/src/service/integration.rs index 4dae5dca1dfc..fc74853d3a5b 100644 --- a/ee/tabby-webserver/src/service/integration.rs +++ b/ee/tabby-webserver/src/service/integration.rs @@ -133,7 +133,7 @@ impl IntegrationService for IntegrationServiceImpl { Ok(self.db.get_integration(id.as_rowid()?).await?.try_into()?) } - async fn update_integration_sync_status(&self, id: ID, error: Option) -> Result<()> { + async fn update_integration_sync_status(&self, id: &ID, error: Option) -> Result<()> { self.db .update_integration_error(id.as_rowid()?, error) .await?; @@ -191,7 +191,7 @@ mod tests { // Test updating error status for gitlab provider tokio::time::sleep(Duration::from_secs(1)).await; integration - .update_integration_sync_status(id.clone(), Some("error".into())) + .update_integration_sync_status(&id, Some("error".into())) .await .unwrap(); @@ -200,7 +200,7 @@ mod tests { // Test successful status (no error) integration - .update_integration_sync_status(id.clone(), None) + .update_integration_sync_status(&id, None) .await .unwrap(); @@ -246,7 +246,7 @@ mod tests { // Test integration status is failed after updating sync status with an error integration - .update_integration_sync_status(id.clone(), Some("error".into())) + .update_integration_sync_status(&id, Some("error".into())) .await .unwrap(); @@ -288,7 +288,7 @@ mod tests { // Test integration status is ready after a successful sync and an update which changes no fields integration - .update_integration_sync_status(id.clone(), None) + .update_integration_sync_status(&id, None) .await .unwrap(); integration diff --git a/ee/tabby-webserver/src/service/repository/mod.rs b/ee/tabby-webserver/src/service/repository/mod.rs index f34480d0b1a1..725e87bedcc2 100644 --- a/ee/tabby-webserver/src/service/repository/mod.rs +++ b/ee/tabby-webserver/src/service/repository/mod.rs @@ -114,7 +114,7 @@ impl RepositoryService for RepositoryServiceImpl { | RepositoryKind::GithubSelfHosted | RepositoryKind::GitlabSelfHosted => self .third_party() - .get_provided_repository(id.clone()) + .get_provided_repository(id) .await .map(|repo| to_repository(*kind, repo)), }; diff --git a/ee/tabby-webserver/src/service/repository/third_party.rs b/ee/tabby-webserver/src/service/repository/third_party.rs index c07a01166f89..634c8f87cfe6 100644 --- a/ee/tabby-webserver/src/service/repository/third_party.rs +++ b/ee/tabby-webserver/src/service/repository/third_party.rs @@ -70,7 +70,7 @@ impl RepositoryProvider for ThirdPartyRepositoryServiceImpl { } async fn get_repository(&self, id: &ID) -> Result { - let repo = self.get_provided_repository(id.clone()).await?; + let repo = self.get_provided_repository(id).await?; let provider = self .integration .get_integration(repo.integration_id.clone()) @@ -119,10 +119,10 @@ impl ThirdPartyRepositoryService for ThirdPartyRepositoryServiceImpl { Ok(converted_repositories) } - async fn get_provided_repository(&self, id: ID) -> Result { + async fn get_provided_repository(&self, id: &ID) -> Result { let repo = self.db.get_provided_repository(id.as_rowid()?).await?; - let event = BackgroundJobEvent::SchedulerGithubGitlabRepository(id); + let event = BackgroundJobEvent::SchedulerGithubGitlabRepository(id.clone()); let last_job_run = self.job.get_job_info(event.to_command()).await?; Ok(to_provided_repository(repo, last_job_run)) @@ -169,7 +169,7 @@ impl ThirdPartyRepositoryService for ThirdPartyRepositoryServiceImpl { Ok(repos) => repos, Err(e) => { self.integration - .update_integration_sync_status(provider.id.clone(), Some(e.to_string())) + .update_integration_sync_status(&provider.id, Some(e.to_string())) .await?; error!( "Failed to fetch repositories from integration: {}", @@ -270,7 +270,7 @@ async fn refresh_repositories_for_provider( } integration - .update_integration_sync_status(provider.id.clone(), None) + .update_integration_sync_status(&provider.id, None) .await?; let num_removed = repository .delete_outdated_repositories(provider.id, start) From 066cad551e0802f64208b2ec60fb3b035d642b05 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 10 Dec 2024 08:28:34 +0000 Subject: [PATCH 2/7] [autofix.ci] apply automated fixes --- .../src/service/background_job/third_party_integration.rs | 4 ++-- .../service/background_job/third_party_integration/pulls.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index 4b26b34524ca..a219261df448 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -138,7 +138,7 @@ impl SchedulerGithubGitlabJob { repository: &ProvidedRepository, embedding: Arc, ) -> tabby_schema::Result<()> { - let mut pull_state_stream = match fetch_all_pull_states(&integration, &repository).await { + let mut pull_state_stream = match fetch_all_pull_states(integration, repository).await { Ok(s) => s, Err(e) => { integration_service @@ -195,7 +195,7 @@ impl SchedulerGithubGitlabJob { repository: &ProvidedRepository, embedding: Arc, ) -> tabby_schema::Result<()> { - let issue_stream = match fetch_all_issues(&integration, &repository).await { + let issue_stream = match fetch_all_issues(integration, repository).await { Ok(s) => s, Err(e) => { integration_service diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 984220835178..1be07f5135da 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -61,7 +61,7 @@ pub async fn list_github_pull_states( if let Some(state) = pull.state { if state == IssueState::Closed && pull.merged_at.is_none() { yield (pull.number, StructuredDocState{ - id: id, + id, updated_at: pull.updated_at.unwrap(), deleted: true, }); @@ -70,8 +70,8 @@ pub async fn list_github_pull_states( } yield (pull.number, StructuredDocState{ - id: id, - updated_at: pull.updated_at.unwrap_or_else(|| chrono::Utc::now()), + id, + updated_at: pull.updated_at.unwrap_or_else(chrono::Utc::now), deleted: false, }); } From fbc7d66d47543a6ddd188301a0fd079c52157007 Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Wed, 11 Dec 2024 19:47:22 +0800 Subject: [PATCH 3/7] chore: save raw doc in structured_doc_state to reduce unnecessary requests Signed-off-by: Wei Zhang --- .../tabby-index/src/structured_doc/public.rs | 4 +- .../background_job/third_party_integration.rs | 12 +++-- .../third_party_integration/issues.rs | 2 + .../third_party_integration/pulls.rs | 49 ++++++++----------- .../src/service/background_job/web_crawler.rs | 3 +- 5 files changed, 36 insertions(+), 34 deletions(-) diff --git a/crates/tabby-index/src/structured_doc/public.rs b/crates/tabby-index/src/structured_doc/public.rs index 54b36c08ba7c..a4d7dfbf3057 100644 --- a/crates/tabby-index/src/structured_doc/public.rs +++ b/crates/tabby-index/src/structured_doc/public.rs @@ -30,6 +30,8 @@ pub struct StructuredDocState { // For instance, a closed pull request will be marked as deleted, // prompting the indexer to remove it from the index. pub deleted: bool, + + pub raw: Option, } pub struct StructuredDocIndexer { @@ -46,7 +48,7 @@ impl StructuredDocIndexer { // Runs pre-sync checks to determine if the document needs to be updated. // Returns false if `sync` is not required to be called. - pub async fn presync(&self, state: StructuredDocState) -> bool { + pub async fn presync(&self, state: &StructuredDocState) -> bool { if state.deleted { self.indexer.delete(&state.id); return false; diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index a219261df448..36eda3285ec1 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -163,12 +163,18 @@ impl SchedulerGithubGitlabJob { ); } - if !index.presync(state).await { + if !index.presync(&state).await { continue; } + + if state.raw.as_ref().is_none() { + logkit::warn!("Pull {} has no raw data", id); + continue; + } + let pull = get_github_pull_doc( &repository.source_id(), - id, + state.raw.unwrap(), integration.api_base(), &repository.display_name, &integration.access_token, @@ -211,7 +217,7 @@ impl SchedulerGithubGitlabJob { let mut count = 0; let mut num_updated = 0; for await (state, doc) in issue_stream { - if index.presync(state).await && index.sync(doc).await { + if index.presync(&state).await && index.sync(doc).await { num_updated += 1 } count += 1; diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs index c2d549dd878c..3e6f7e030444 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs @@ -70,6 +70,7 @@ pub async fn list_github_issues( id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, + raw: None, }, doc); } @@ -137,6 +138,7 @@ pub async fn list_gitlab_issues( id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, + raw: None, }, doc); } }; diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 1be07f5135da..7ea18b0f3add 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use async_stream::stream; use futures::Stream; use octocrab::{models::IssueState, Octocrab}; +use serde_json::json; use tabby_index::public::{ StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState, }; @@ -58,12 +59,13 @@ pub async fn list_github_pull_states( let id = pull_id(&pull); // skip closed but not merged pulls - if let Some(state) = pull.state { - if state == IssueState::Closed && pull.merged_at.is_none() { + if let Some(state) = &pull.state { + if *state == IssueState::Closed && pull.merged_at.is_none() { yield (pull.number, StructuredDocState{ id, updated_at: pull.updated_at.unwrap(), deleted: true, + raw: None, }); continue; } @@ -73,6 +75,7 @@ pub async fn list_github_pull_states( id, updated_at: pull.updated_at.unwrap_or_else(chrono::Utc::now), deleted: false, + raw: Some(json!(pull)), }); } @@ -88,38 +91,19 @@ pub async fn list_github_pull_states( pub async fn get_github_pull_doc( source_id: &str, - id: u64, + raw: serde_json::Value, api_base: &str, full_name: &str, access_token: &str, ) -> Result { + let pull: octocrab::models::pulls::PullRequest = + serde_json::from_value(raw).map_err(|e| anyhow!("Failed to parse pull request: {}", e))?; + let octocrab = Octocrab::builder() .personal_token(access_token.to_string()) .base_uri(api_base)? .build()?; - let (owner, repo) = full_name - .split_once('/') - .ok_or_else(|| anyhow!("Invalid repository name"))?; - - let owner = owner.to_owned(); - let repo = repo.to_owned(); - let source_id = source_id.to_owned(); - - let pull = octocrab.pulls(&owner, &repo).get(id).await.map_err(|e| { - anyhow!( - "Failed to fetch pull requests: {}", - octocrab_error_message(e) - ) - })?; - - let url = pull - .html_url - .map(|url| url.to_string()) - .unwrap_or_else(|| pull.url); - let title = pull.title.clone().unwrap_or_default(); - let body = pull.body.clone().unwrap_or_default(); - let author = pull.user.as_ref().map(|user| user.login.clone()); let email = if let Some(author) = author { match octocrab.users(&author).profile().await { @@ -138,8 +122,12 @@ pub async fn get_github_pull_doc( // and the size of the diff is less than 8MB. let diff = if pull.additions.unwrap_or_default() + pull.deletions.unwrap_or_default() < 100 * 1024 { + let (owner, repo) = full_name + .split_once('/') + .ok_or_else(|| anyhow!("Invalid repository name"))?; + octocrab - .pulls(&owner, &repo) + .pulls(owner, repo) .get_diff(pull.number) .await .map_err(|e| { @@ -155,10 +143,13 @@ pub async fn get_github_pull_doc( Ok(StructuredDoc { source_id: source_id.to_string(), fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields { - link: url.clone(), - title, + link: pull + .html_url + .map(|url| url.to_string()) + .unwrap_or_else(|| pull.url), + title: pull.title.clone().unwrap_or_default(), author_email: email.clone(), - body, + body: pull.body.clone().unwrap_or_default(), merged: pull.merged_at.is_some(), diff, }), diff --git a/ee/tabby-webserver/src/service/background_job/web_crawler.rs b/ee/tabby-webserver/src/service/background_job/web_crawler.rs index a0fdab9b11f8..2e95eb01249d 100644 --- a/ee/tabby-webserver/src/service/background_job/web_crawler.rs +++ b/ee/tabby-webserver/src/service/background_job/web_crawler.rs @@ -56,10 +56,11 @@ impl WebCrawlerJob { num_docs += 1; if indexer - .presync(StructuredDocState { + .presync(&StructuredDocState { id: source_doc.id().to_string(), updated_at: Utc::now(), deleted: false, + raw: None, }) .await { From 7c6771b40285c241a6d0f34d3c05346c06ae3e08 Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Thu, 12 Dec 2024 01:49:49 +0800 Subject: [PATCH 4/7] chore: return pull when pre sync Signed-off-by: Wei Zhang --- crates/tabby-index/src/indexer_tests.rs | 4 +- .../tabby-index/src/structured_doc/public.rs | 2 - .../background_job/third_party_integration.rs | 41 +++++++++++-------- .../third_party_integration/issues.rs | 2 - .../third_party_integration/pulls.rs | 24 +++++------ .../src/service/background_job/web_crawler.rs | 1 - 6 files changed, 39 insertions(+), 35 deletions(-) diff --git a/crates/tabby-index/src/indexer_tests.rs b/crates/tabby-index/src/indexer_tests.rs index fda3d6768bb3..365deb539aa1 100644 --- a/crates/tabby-index/src/indexer_tests.rs +++ b/crates/tabby-index/src/indexer_tests.rs @@ -72,7 +72,7 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { let updated = indexer - .presync(StructuredDocState { + .presync(&StructuredDocState { id: doc.id().to_string(), updated_at, deleted: false, @@ -118,7 +118,7 @@ mod structured_doc_tests { let updated_at = chrono::Utc::now(); let res = tokio::runtime::Runtime::new().unwrap().block_on(async { let updated = indexer - .presync(StructuredDocState { + .presync(&StructuredDocState { id: doc.id().to_string(), updated_at, deleted: false, diff --git a/crates/tabby-index/src/structured_doc/public.rs b/crates/tabby-index/src/structured_doc/public.rs index a4d7dfbf3057..df083010d966 100644 --- a/crates/tabby-index/src/structured_doc/public.rs +++ b/crates/tabby-index/src/structured_doc/public.rs @@ -30,8 +30,6 @@ pub struct StructuredDocState { // For instance, a closed pull request will be marked as deleted, // prompting the indexer to remove it from the index. pub deleted: bool, - - pub raw: Option, } pub struct StructuredDocIndexer { diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs index 36eda3285ec1..9a4d76033b0d 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use anyhow::Result; use async_stream::stream; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; @@ -153,7 +154,7 @@ impl SchedulerGithubGitlabJob { let mut num_updated = 0; let index = StructuredDocIndexer::new(embedding); - while let Some((id, state)) = pull_state_stream.next().await { + while let Some((pull, state)) = pull_state_stream.next().await { count += 1; if count % 100 == 0 { logkit::info!( @@ -167,21 +168,9 @@ impl SchedulerGithubGitlabJob { continue; } - if state.raw.as_ref().is_none() { - logkit::warn!("Pull {} has no raw data", id); - continue; - } + let pull_doc = fetch_pull_structured_doc(integration, repository, pull).await?; - let pull = get_github_pull_doc( - &repository.source_id(), - state.raw.unwrap(), - integration.api_base(), - &repository.display_name, - &integration.access_token, - ) - .await?; - - index.sync(pull).await; + index.sync(pull_doc).await; num_updated += 1; } logkit::info!( @@ -279,10 +268,11 @@ async fn fetch_all_issues( Ok(s) } + async fn fetch_all_pull_states( integration: &Integration, repository: &ProvidedRepository, -) -> tabby_schema::Result> { +) -> tabby_schema::Result> { match &integration.kind { IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pull_states( integration.api_base(), @@ -296,3 +286,22 @@ async fn fetch_all_pull_states( )), } } + +async fn fetch_pull_structured_doc( + integration: &Integration, + repository: &ProvidedRepository, + pull: pulls::Pull, +) -> Result { + match pull { + pulls::Pull::GitHub(pull) => { + get_github_pull_doc( + &repository.source_id(), + pull, + integration.api_base(), + &repository.display_name, + &integration.access_token, + ) + .await + } + } +} diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs index 3e6f7e030444..c2d549dd878c 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/issues.rs @@ -70,7 +70,6 @@ pub async fn list_github_issues( id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, - raw: None, }, doc); } @@ -138,7 +137,6 @@ pub async fn list_gitlab_issues( id: doc.id().to_string(), updated_at: issue.updated_at, deleted: false, - raw: None, }, doc); } }; diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 7ea18b0f3add..93cdb05f7302 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -1,8 +1,8 @@ use anyhow::{anyhow, Result}; use async_stream::stream; use futures::Stream; +use octocrab::models::pulls::PullRequest; use octocrab::{models::IssueState, Octocrab}; -use serde_json::json; use tabby_index::public::{ StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState, }; @@ -20,11 +20,15 @@ fn pull_id(pull: &octocrab::models::pulls::PullRequest) -> String { .unwrap_or_else(|| pull.url.clone()) } +pub enum Pull { + GitHub(PullRequest), +} + pub async fn list_github_pull_states( api_base: &str, full_name: &str, access_token: &str, -) -> Result> { +) -> Result> { let octocrab = Octocrab::builder() .personal_token(access_token.to_string()) .base_uri(api_base)? @@ -57,25 +61,24 @@ pub async fn list_github_pull_states( for pull in response.items { let id = pull_id(&pull); + let updated_at = pull.updated_at.unwrap_or_else(chrono::Utc::now); // skip closed but not merged pulls if let Some(state) = &pull.state { if *state == IssueState::Closed && pull.merged_at.is_none() { - yield (pull.number, StructuredDocState{ + yield (Pull::GitHub(pull), StructuredDocState{ id, - updated_at: pull.updated_at.unwrap(), + updated_at, deleted: true, - raw: None, }); continue; } } - yield (pull.number, StructuredDocState{ + yield (Pull::GitHub(pull), StructuredDocState{ id, - updated_at: pull.updated_at.unwrap_or_else(chrono::Utc::now), + updated_at, deleted: false, - raw: Some(json!(pull)), }); } @@ -91,14 +94,11 @@ pub async fn list_github_pull_states( pub async fn get_github_pull_doc( source_id: &str, - raw: serde_json::Value, + pull: PullRequest, api_base: &str, full_name: &str, access_token: &str, ) -> Result { - let pull: octocrab::models::pulls::PullRequest = - serde_json::from_value(raw).map_err(|e| anyhow!("Failed to parse pull request: {}", e))?; - let octocrab = Octocrab::builder() .personal_token(access_token.to_string()) .base_uri(api_base)? diff --git a/ee/tabby-webserver/src/service/background_job/web_crawler.rs b/ee/tabby-webserver/src/service/background_job/web_crawler.rs index 2e95eb01249d..d2d1414c4e2c 100644 --- a/ee/tabby-webserver/src/service/background_job/web_crawler.rs +++ b/ee/tabby-webserver/src/service/background_job/web_crawler.rs @@ -60,7 +60,6 @@ impl WebCrawlerJob { id: source_doc.id().to_string(), updated_at: Utc::now(), deleted: false, - raw: None, }) .await { From 80d4869d73273f8cc6e1a231fe342a7f418edc9f Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Thu, 12 Dec 2024 02:16:07 +0800 Subject: [PATCH 5/7] chore: reduce unnecessary backtrace Signed-off-by: Wei Zhang --- .../service/background_job/third_party_integration/pulls.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 93cdb05f7302..9f41279d09b5 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -109,7 +109,11 @@ pub async fn get_github_pull_doc( match octocrab.users(&author).profile().await { Ok(profile) => profile.email, Err(e) => { - debug!("Failed to fetch user profile for {}: {}", author, e); + debug!( + "Failed to fetch user profile for {}: {}", + author, + octocrab_error_message(e) + ); None } } From f8baf32621dd8f43bba530629eaaf6bd02fbe82b Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 11 Dec 2024 18:23:55 +0000 Subject: [PATCH 6/7] [autofix.ci] apply automated fixes --- .../service/background_job/third_party_integration/pulls.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs index 9f41279d09b5..cf89dfd4f1ef 100644 --- a/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs +++ b/ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs @@ -1,8 +1,10 @@ use anyhow::{anyhow, Result}; use async_stream::stream; use futures::Stream; -use octocrab::models::pulls::PullRequest; -use octocrab::{models::IssueState, Octocrab}; +use octocrab::{ + models::{pulls::PullRequest, IssueState}, + Octocrab, +}; use tabby_index::public::{ StructuredDoc, StructuredDocFields, StructuredDocPullDocumentFields, StructuredDocState, }; From b129fd51cd70f86673265e29f8cf8c8ff3ef8ef9 Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Thu, 12 Dec 2024 09:58:08 +0800 Subject: [PATCH 7/7] chore: add changie log Signed-off-by: Wei Zhang --- .../unreleased/Fixed and Improvements-20241212-095738.yaml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changes/unreleased/Fixed and Improvements-20241212-095738.yaml diff --git a/.changes/unreleased/Fixed and Improvements-20241212-095738.yaml b/.changes/unreleased/Fixed and Improvements-20241212-095738.yaml new file mode 100644 index 000000000000..6576a756eb95 --- /dev/null +++ b/.changes/unreleased/Fixed and Improvements-20241212-095738.yaml @@ -0,0 +1,3 @@ +kind: Fixed and Improvements +body: Refactors the pull request indexing process to enhance the speed of incremental indexing for pull docs. +time: 2024-12-12T09:57:38.860665+08:00