Skip to content

Commit

Permalink
support rollback for field indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
baichuan3 committed Feb 9, 2025
1 parent 474ea92 commit 246cfc4
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 161 deletions.
43 changes: 41 additions & 2 deletions crates/rooch-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ use raw_store::traits::DBStore;
use raw_store::{rocks::RocksDB, StoreInstance};
use rooch_config::store_config::StoreConfig;
use rooch_indexer::store::traits::IndexerStoreTrait;
use rooch_indexer::{indexer_reader::IndexerReader, IndexerStore};
use rooch_indexer::{indexer_reader::IndexerReader, list_field_indexer_keys, IndexerStore};
use rooch_store::meta_store::{MetaStore, SEQUENCER_INFO_KEY};
use rooch_store::state_store::StateStore;
use rooch_store::transaction_store::TransactionStore;
use rooch_store::{
RoochStore, META_SEQUENCER_INFO_COLUMN_FAMILY_NAME, STATE_CHANGE_SET_COLUMN_FAMILY_NAME,
TRANSACTION_COLUMN_FAMILY_NAME, TX_SEQUENCE_INFO_MAPPING_COLUMN_FAMILY_NAME,
};
use rooch_types::indexer::field::{
collect_revert_field_change_ids, handle_revert_field_change, IndexerFieldChanges,
};
use rooch_types::indexer::state::{
collect_revert_object_change_ids, handle_revert_object_change, IndexerObjectStateChangeSet,
IndexerObjectStatesIndexGenerator,
Expand Down Expand Up @@ -331,7 +334,8 @@ impl RoochDB {
let mut state_index_generator = IndexerObjectStatesIndexGenerator::default();
let mut indexer_object_state_change_set = IndexerObjectStateChangeSet::default();

for (_filed_key, object_change) in state_change_set_ext.state_change_set.changes {
for (_filed_key, object_change) in state_change_set_ext.state_change_set.changes.clone()
{
handle_revert_object_change(
&mut state_index_generator,
tx_order,
Expand All @@ -343,6 +347,41 @@ impl RoochDB {
self.indexer_store
.apply_object_states(indexer_object_state_change_set)
.map_err(|e| anyhow!(format!("Revert indexer states error: {:?}", e)))?;

//4. revert indexer field
let field_indexer_ids = list_field_indexer_keys(&resolver)?;

let mut field_object_ids = vec![];
for (_filed_key, object_change) in state_change_set_ext.state_change_set.changes.clone()
{
collect_revert_field_change_ids(
&field_indexer_ids,
object_change,
&mut field_object_ids,
)?;
}

let field_object_mapping = resolver
.get_states(AccessPath::objects(field_object_ids))?
.into_iter()
.flatten()
.map(|v| (v.metadata.id.clone(), v))
.collect::<HashMap<_, _>>();

let mut field_changes = IndexerFieldChanges::default();
for (field_key, object_change) in state_change_set_ext.state_change_set.changes.clone()
{
handle_revert_field_change(
field_key,
object_change,
&mut field_changes,
&field_indexer_ids,
&field_object_mapping,
)?;
}
self.indexer_store
.apply_fields(field_changes)
.map_err(|e| anyhow!(format!("Revert indexer field error: {:?}", e)))?;
};
Ok(())
}
Expand Down
46 changes: 8 additions & 38 deletions crates/rooch-indexer/src/actor/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,23 @@ use crate::actor::messages::{
IndexerTransactionMessage, UpdateIndexerMessage,
};
use crate::store::traits::IndexerStoreTrait;
use crate::IndexerStore;
use crate::{list_field_indexer_keys, IndexerStore};
use anyhow::Result;
use async_trait::async_trait;
use coerce::actor::{context::ActorContext, message::Handler, Actor, LocalActorRef};
use moveos_store::MoveOSStore;
use moveos_types::move_types::type_tag_match;
use moveos_types::moveos_std::object::{is_dynamic_field_type, ObjectID, ObjectMeta, RawField};
use moveos_types::state::MoveType;
use moveos_types::state_resolver::{RootObjectResolver, StateResolver};
use moveos_types::moveos_std::object::{ObjectID, ObjectMeta};
use moveos_types::state_resolver::RootObjectResolver;
use moveos_types::transaction::MoveAction;
use rooch_event::actor::EventActor;
use rooch_types::framework::indexer::IndexerModule;
use rooch_types::indexer::event::IndexerEvent;
use rooch_types::indexer::field::{
handle_field_change, parse_dynamic_field_type_tags, IndexerFieldChanges,
};
use rooch_types::indexer::field::{handle_field_change, IndexerFieldChanges};
use rooch_types::indexer::state::{
handle_object_change, handle_revert_object_change, IndexerObjectStateChangeSet,
IndexerObjectStatesIndexGenerator, ObjectStateType,
};
use rooch_types::indexer::transaction::IndexerTransaction;

pub const MAX_LIST_FIELD_SIZE: usize = 200;
pub struct IndexerActor {
root: ObjectMeta,
indexer_store: IndexerStore,
Expand All @@ -53,33 +47,9 @@ impl IndexerActor {
}

// TODO use EventBus to trigger field indexer update
pub fn list_field_indexer_keys(&self) -> Result<Vec<ObjectID>> {
pub fn get_all_field_indexer_keys(&self) -> Result<Vec<ObjectID>> {
let resolver = RootObjectResolver::new(self.root.clone(), &self.moveos_store);

let field_indexer_object_id = IndexerModule::field_indexer_object_id();
let states = resolver.list_fields(&field_indexer_object_id, None, MAX_LIST_FIELD_SIZE)?;

let data = states
.into_iter()
.filter_map(|state| {
let object_type = state.1.metadata.object_type;
if !is_dynamic_field_type(&object_type) {
return None;
}

parse_dynamic_field_type_tags(&object_type).and_then(|(name_type, value_type)| {
if !type_tag_match(&ObjectID::type_tag(), &name_type) {
return None;
}

RawField::parse_unchecked_field(state.1.value.as_slice(), name_type, value_type)
.ok()
.and_then(|raw_field| bcs::from_bytes::<ObjectID>(&raw_field.name).ok())
})
})
.collect();

Ok(data)
list_field_indexer_keys(&resolver)
}
}

Expand Down Expand Up @@ -139,14 +109,14 @@ impl Handler<UpdateIndexerMessage> for IndexerActor {
.apply_object_states(indexer_object_state_change_set)?;

//4. update indexer field
let field_indexer_ids = self.list_field_indexer_keys()?;
let field_indexer_ids = self.get_all_field_indexer_keys()?;
let mut field_changes = IndexerFieldChanges::default();
for (field_key, object_change) in state_change_set.changes {
let _ = handle_field_change(
field_key,
object_change,
&mut field_changes,
field_indexer_ids.clone(),
&field_indexer_ids,
)?;
}
self.indexer_store.apply_fields(field_changes)?;
Expand Down
40 changes: 39 additions & 1 deletion crates/rooch-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@ use diesel::sqlite::SqliteConnection;
use diesel::ConnectionError::BadConnection;
use diesel::RunQueryDsl;
use errors::IndexerError;
use moveos_store::MoveOSStore;
use moveos_types::move_types::type_tag_match;
use moveos_types::moveos_std::object::{
is_dynamic_field_type, parse_dynamic_field_type_tags, ObjectID, RawField,
};
use moveos_types::state::MoveType;
use moveos_types::state_resolver::{RootObjectResolver, StateResolver};
use once_cell::sync::Lazy;
use prometheus::Registry;
use rooch_types::framework::indexer::IndexerModule;
use rooch_types::indexer::event::IndexerEvent;
use rooch_types::indexer::field::{IndexerField, IndexerFieldChanges};
use rooch_types::indexer::state::{
Expand Down Expand Up @@ -41,7 +49,7 @@ pub mod utils;

/// Type alias to improve readability.
pub type IndexerResult<T> = Result<T, IndexerError>;

pub const MAX_LIST_FIELD_SIZE: usize = 200;
pub const DEFAULT_BUSY_TIMEOUT: u64 = 5000; // millsecond
pub type IndexerTableName = &'static str;
pub const INDEXER_EVENTS_TABLE_NAME: IndexerTableName = "events";
Expand Down Expand Up @@ -427,3 +435,33 @@ pub fn get_sqlite_pool_connection(
))
})
}

pub fn list_field_indexer_keys(
resolver: &RootObjectResolver<MoveOSStore>,
) -> Result<Vec<ObjectID>> {
// let resolver = RootObjectResolver::new(self.root.clone(), &self.moveos_store);
let field_indexer_object_id = IndexerModule::field_indexer_object_id();
let states = resolver.list_fields(&field_indexer_object_id, None, MAX_LIST_FIELD_SIZE)?;

let data = states
.into_iter()
.filter_map(|state| {
let object_type = state.1.metadata.object_type;
if !is_dynamic_field_type(&object_type) {
return None;
}

parse_dynamic_field_type_tags(&object_type).and_then(|(name_type, value_type)| {
if !type_tag_match(&ObjectID::type_tag(), &name_type) {
return None;
}

RawField::parse_unchecked_field(state.1.value.as_slice(), name_type, value_type)
.ok()
.and_then(|raw_field| bcs::from_bytes::<ObjectID>(&raw_field.name).ok())
})
})
.collect();

Ok(data)
}
Loading

0 comments on commit 246cfc4

Please sign in to comment.