Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(structured_doc): presync pull state before indexing #3502

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ use crate::{indexer::TantivyDocBuilder, Indexer};
/// StructuredDocState tracks the state of the document source.
/// It helps determine whether the document should be updated or deleted.
pub struct StructuredDocState {
// id is the unique identifier of the document.
// It is used to track the document in the indexer.
pub id: String,

// updated_at is the time when the document was last updated.
// when the updated_at is earlier than the document's index time,
// the update will be skipped.
pub updated_at: DateTime<Utc>,

// deleted indicates whether the document should be removed from the indexer.
// For instance, a closed pull request will be marked as deleted,
// prompting the indexer to remove it from the index.
pub deleted: bool,
pub id: String,
}

pub struct StructuredDocIndexer {
Expand Down
2 changes: 1 addition & 1 deletion ee/tabby-schema/src/schema/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,5 @@ pub trait IntegrationService: Send + Sync {
) -> Result<Vec<Integration>>;

async fn get_integration(&self, id: ID) -> Result<Integration>;
async fn update_integration_sync_status(&self, id: ID, error: Option<String>) -> Result<()>;
async fn update_integration_sync_status(&self, id: &ID, error: Option<String>) -> Result<()>;
}
2 changes: 1 addition & 1 deletion ee/tabby-schema/src/schema/repository/third_party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub trait ThirdPartyRepositoryService: Send + Sync + RepositoryProvider {
last: Option<usize>,
) -> Result<Vec<ProvidedRepository>>;

async fn get_provided_repository(&self, id: ID) -> Result<ProvidedRepository>;
async fn get_provided_repository(&self, id: &ID) -> Result<ProvidedRepository>;

async fn update_repository_active(&self, id: ID, active: bool) -> Result<()>;
async fn upsert_repository(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use issues::{list_github_issues, list_gitlab_issues};
use juniper::ID;
use pulls::list_github_pulls;
use pulls::{get_github_pull_doc, list_github_pull_states};
use serde::{Deserialize, Serialize};
use tabby_common::config::CodeRepository;
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState};
Expand All @@ -14,6 +14,7 @@ use tabby_schema::{
integration::{Integration, IntegrationKind, IntegrationService},
job::JobService,
repository::{ProvidedRepository, ThirdPartyRepositoryService},
CoreError,
};
use tracing::debug;

Expand Down Expand Up @@ -89,7 +90,7 @@ impl SchedulerGithubGitlabJob {
integration_service: Arc<dyn IntegrationService>,
) -> tabby_schema::Result<()> {
let repository = repository_service
.get_provided_repository(self.repository_id)
.get_provided_repository(&self.repository_id)
.await?;
let integration = integration_service
.get_integration(repository.integration_id.clone())
Expand All @@ -115,44 +116,111 @@ impl SchedulerGithubGitlabJob {
"Indexing documents for repository {}",
repository.display_name
);
let index = StructuredDocIndexer::new(embedding);
let issue_stream = match fetch_all_issues(&integration, &repository).await {

self.sync_issues(
&integration,
integration_service.clone(),
&repository,
embedding.clone(),
)
.await?;

self.sync_pulls(&integration, integration_service, &repository, embedding)
.await?;

Ok(())
}

async fn sync_pulls(
&self,
integration: &Integration,
integration_service: Arc<dyn IntegrationService>,
repository: &ProvidedRepository,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
let mut pull_state_stream = match fetch_all_pull_states(&integration, &repository).await {
Ok(s) => s,
Err(e) => {
integration_service
.update_integration_sync_status(integration.id, Some(e.to_string()))
.update_integration_sync_status(&integration.id, Some(e.to_string()))
.await?;
logkit::error!("Failed to fetch issues: {}", e);
return Err(e);
logkit::error!("Failed to fetch pulls: {}", e);
return Ok(());
}
};

let pull_stream = match fetch_all_pulls(&integration, &repository).await {
let mut count = 0;
let mut num_updated = 0;

let index = StructuredDocIndexer::new(embedding);
while let Some((id, state)) = pull_state_stream.next().await {
count += 1;
if count % 100 == 0 {
logkit::info!(
"{} pull docs seen, {} pull docs updated",
count,
num_updated
);
}

if !index.presync(state).await {
continue;
}
let pull = get_github_pull_doc(
&repository.source_id(),
id,
integration.api_base(),
&repository.display_name,
&integration.access_token,
)
.await?;

index.sync(pull).await;
num_updated += 1;
}
logkit::info!(
"{} pull docs seen, {} pull docs updated",
count,
num_updated
);
index.commit();

Ok(())
}

async fn sync_issues(
&self,
integration: &Integration,
integration_service: Arc<dyn IntegrationService>,
repository: &ProvidedRepository,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
let issue_stream = match fetch_all_issues(&integration, &repository).await {
Ok(s) => s,
Err(e) => {
integration_service
.update_integration_sync_status(integration.id, Some(e.to_string()))
.update_integration_sync_status(&integration.id, Some(e.to_string()))
.await?;
logkit::error!("Failed to fetch pulls: {}", e);
logkit::error!("Failed to fetch issues: {}", e);
return Err(e);
}
};

let index = StructuredDocIndexer::new(embedding);
stream! {
let mut count = 0;
let mut num_updated = 0;
for await (state, doc) in issue_stream.chain(pull_stream) {
for await (state, doc) in issue_stream {
if index.presync(state).await && index.sync(doc).await {
num_updated += 1
}

count += 1;
if count % 100 == 0 {
logkit::info!("{} docs seen, {} docs updated", count, num_updated);
logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated);
};
}

logkit::info!("{} docs seen, {} docs updated", count, num_updated);
logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated);
index.commit();
}
.count()
Expand Down Expand Up @@ -205,18 +273,20 @@ async fn fetch_all_issues(

Ok(s)
}
async fn fetch_all_pulls(
async fn fetch_all_pull_states(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
let s: BoxStream<(StructuredDocState, StructuredDoc)> = list_github_pulls(
&repository.source_id(),
integration.api_base(),
&repository.display_name,
&integration.access_token,
)
.await?
.boxed();

Ok(s)
) -> tabby_schema::Result<BoxStream<'static, (u64, StructuredDocState)>> {
match &integration.kind {
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pull_states(
integration.api_base(),
&repository.display_name,
&integration.access_token,
)
.await?
.boxed()),
IntegrationKind::Gitlab | IntegrationKind::GitlabSelfHosted => Err(CoreError::Other(
anyhow::anyhow!("Gitlab does not support pull requests yet"),
)),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@ use tabby_index::public::{

use super::error::octocrab_error_message;

pub async fn list_github_pulls(
source_id: &str,
// TODO(kweizh): we can only get the StructuredDoc id after constructing the StructuredDoc,
// but we need to pass the id to the StructuredDocState,
// so we need to refactor the id() method in StructuredDoc.
/// Returns a stable identifier for a pull request: its HTML URL when present,
/// falling back to the API URL.
///
/// Borrows `html_url` instead of cloning the whole `Url` before stringifying.
fn pull_id(pull: &octocrab::models::pulls::PullRequest) -> String {
    pull.html_url
        .as_ref()
        .map(|url| url.to_string())
        .unwrap_or_else(|| pull.url.clone())
}

pub async fn list_github_pull_states(
api_base: &str,
full_name: &str,
access_token: &str,
) -> Result<impl Stream<Item = (StructuredDocState, StructuredDoc)>> {
) -> Result<impl Stream<Item = (u64, StructuredDocState)>> {
let octocrab = Octocrab::builder()
.personal_token(access_token.to_string())
.base_uri(api_base)?
Expand All @@ -25,7 +34,6 @@ pub async fn list_github_pulls(

let owner = owner.to_owned();
let repo = repo.to_owned();
let source_id = source_id.to_owned();
let s = stream! {
let mut page = 1u32;
loop {
Expand All @@ -46,64 +54,25 @@ pub async fn list_github_pulls(
let pages = response.number_of_pages().unwrap_or_default();

for pull in response.items {
let url = pull.html_url.map(|url| url.to_string()).unwrap_or_else(|| pull.url);
let title = pull.title.clone().unwrap_or_default();
let body = pull.body.clone().unwrap_or_default();
let doc = StructuredDoc {
source_id: source_id.to_string(),
fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
link: url.clone(),
title,
body,
merged: pull.merged_at.is_some(),
diff: String::new(),
}),
};
let id = pull_id(&pull);

// skip closed but not merged pulls
if let Some(state) = pull.state {
if state == IssueState::Closed && pull.merged_at.is_none() {
yield (StructuredDocState{
id: doc.id().to_string(),
yield (pull.number, StructuredDocState{
id: id,
updated_at: pull.updated_at.unwrap(),
deleted: true,
}, doc);
});
continue;
}
}

// Fetch the diff only if the number of changed lines is fewer than 100,000,
// assuming 80 characters per line,
// and the size of the diff is less than 8MB.
let diff = if pull.additions.unwrap_or_default() + pull.deletions.unwrap_or_default() < 100*1024 {
match octocrab.pulls(&owner, &repo).get_diff(pull.number).await {
Ok(x) => x,
Err(e) => {
logkit::error!("Failed to fetch pull request diff for {}: {}",
url, octocrab_error_message(e));
continue
}
}
} else {
String::new()
};

let doc = StructuredDoc {
source_id: source_id.to_string(),
fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
link: url,
title: pull.title.unwrap_or_default(),
body: pull.body.unwrap_or_default(),
diff,
merged: pull.merged_at.is_some(),
})};


yield (StructuredDocState{
id: doc.id().to_string(),
updated_at: pull.updated_at.unwrap(),
yield (pull.number, StructuredDocState{
id: id,
updated_at: pull.updated_at.unwrap_or_else(|| chrono::Utc::now()),
deleted: false,
}, doc);
});
}

page += 1;
Expand All @@ -115,3 +84,68 @@ pub async fn list_github_pulls(

Ok(s)
}

/// Fetches a single GitHub pull request by number and converts it into a
/// `StructuredDoc` suitable for indexing.
///
/// # Arguments
/// * `source_id` - identifier of the document source the doc belongs to.
/// * `id` - the pull request number within the repository.
/// * `api_base` - base URI of the GitHub API (supports self-hosted instances).
/// * `full_name` - repository in `owner/repo` form.
/// * `access_token` - personal access token used for authentication.
///
/// # Errors
/// Returns an error when `full_name` is malformed, or when the pull request
/// or its diff cannot be fetched.
pub async fn get_github_pull_doc(
    source_id: &str,
    id: u64,
    api_base: &str,
    full_name: &str,
    access_token: &str,
) -> Result<StructuredDoc> {
    let octocrab = Octocrab::builder()
        .personal_token(access_token.to_string())
        .base_uri(api_base)?
        .build()?;

    let (owner, repo) = full_name
        .split_once('/')
        .ok_or_else(|| anyhow!("Invalid repository name"))?;

    let pull = octocrab.pulls(owner, repo).get(id).await.map_err(|e| {
        anyhow!(
            "Failed to fetch pull requests: {}",
            octocrab_error_message(e)
        )
    })?;

    let url = pull
        .html_url
        .map(|url| url.to_string())
        .unwrap_or_else(|| pull.url.clone());
    let title = pull.title.unwrap_or_default();
    let body = pull.body.unwrap_or_default();

    // Fetch the diff only if the number of changed lines is fewer than 100,000,
    // assuming 80 characters per line,
    // and the size of the diff is less than 8MB.
    let diff =
        if pull.additions.unwrap_or_default() + pull.deletions.unwrap_or_default() < 100 * 1024 {
            octocrab
                .pulls(owner, repo)
                .get_diff(pull.number)
                .await
                .map_err(|e| {
                    anyhow!(
                        "Failed to fetch pull request diff: {}",
                        octocrab_error_message(e)
                    )
                })?
        } else {
            String::new()
        };

    Ok(StructuredDoc {
        source_id: source_id.to_string(),
        fields: StructuredDocFields::Pull(StructuredDocPullDocumentFields {
            link: url,
            title,
            body,
            merged: pull.merged_at.is_some(),
            diff,
        }),
    })
}
Loading
Loading