diff --git a/src/ingester/error.rs b/src/ingester/error.rs index 12b87ef1..ed6482b7 100644 --- a/src/ingester/error.rs +++ b/src/ingester/error.rs @@ -1,3 +1,4 @@ +use crate::ingester::parser::state_update::SequenceGap; use thiserror::Error; #[derive(Error, Debug, PartialEq, Eq)] @@ -14,6 +15,8 @@ pub enum IngesterError { EmptyBatchEvent, #[error("Invalid event.")] InvalidEvent, + #[error("Sequence gap detected: {} gaps found", .0.len())] + SequenceGapDetected(Vec), } impl From for IngesterError { @@ -21,3 +24,19 @@ impl From for IngesterError { IngesterError::DatabaseError(format!("DatabaseError: {}", err)) } } + +impl From for IngesterError { + fn from(err: String) -> Self { + IngesterError::ParserError(err) + } +} + +impl From for IngesterError { + fn from(err: crate::ingester::parser::state_update::SequenceGapError) -> Self { + match err { + crate::ingester::parser::state_update::SequenceGapError::GapDetected(gaps) => { + IngesterError::SequenceGapDetected(gaps) + } + } + } +} diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs index 5a4a6764..04658130 100644 --- a/src/ingester/fetchers/grpc.rs +++ b/src/ingester/fetchers/grpc.rs @@ -53,6 +53,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback )) ); @@ -115,6 +116,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback ))); continue; } @@ -132,6 +134,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback ))); continue; } @@ -144,6 +147,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback ))); } } diff --git a/src/ingester/fetchers/mod.rs b/src/ingester/fetchers/mod.rs index cc3235da..e2e768c6 100644 --- a/src/ingester/fetchers/mod.rs +++ b/src/ingester/fetchers/mod.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use async_stream::stream; use futures::{pin_mut, Stream, StreamExt}; use solana_client::nonblocking::rpc_client::RpcClient; +use tokio::sync::mpsc; -use super::typedefs::block_info::BlockInfo; +use super::{rewind_controller::RewindCommand, typedefs::block_info::BlockInfo}; pub mod grpc; pub mod poller; @@ -17,10 +18,11 @@ pub struct BlockStreamConfig { pub geyser_url: Option, pub max_concurrent_block_fetches: usize, pub last_indexed_slot: u64, + pub rewind_receiver: Option>, } impl BlockStreamConfig { - pub fn load_block_stream(&self) -> impl Stream> { + pub fn load_block_stream(self) -> impl Stream> { let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| { let auth_header = std::env::var("GRPC_X_TOKEN").unwrap(); get_grpc_stream_with_rpc_fallback( @@ -37,6 +39,7 @@ impl BlockStreamConfig { self.rpc_client.clone(), self.last_indexed_slot, self.max_concurrent_block_fetches, + self.rewind_receiver, )) } else { None diff --git a/src/ingester/fetchers/poller.rs b/src/ingester/fetchers/poller.rs index 729d20e5..b85a2cd1 100644 --- a/src/ingester/fetchers/poller.rs +++ b/src/ingester/fetchers/poller.rs @@ -9,23 +9,43 @@ use futures::{pin_mut, Stream, StreamExt}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError, }; +use tokio::sync::mpsc; use solana_sdk::commitment_config::CommitmentConfig; use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; use crate::{ - ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}, + ingester::{ + rewind_controller::RewindCommand, + typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}, + }, metric, monitor::{start_latest_slot_updater, LATEST_SLOT}, }; const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009]; -fn get_slot_stream(rpc_client: Arc, start_slot: u64) -> impl Stream { +fn get_slot_stream( + rpc_client: Arc, + start_slot: u64, + mut rewind_receiver: Option>, +) -> impl Stream { stream! { start_latest_slot_updater(rpc_client.clone()).await; let mut next_slot_to_fetch = start_slot; loop { + // Check for rewind commands + if let Some(ref mut receiver) = rewind_receiver { + while let Ok(command) = receiver.try_recv() { + match command { + RewindCommand::Rewind { to_slot, reason } => { + log::error!("Rewinding slot stream to {}: {}", to_slot, reason); + next_slot_to_fetch = to_slot; + } + } + } + } + if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) { tokio::time::sleep(std::time::Duration::from_millis(10)).await; continue; @@ -40,13 +60,14 @@ pub fn get_block_poller_stream( rpc_client: Arc, mut last_indexed_slot: u64, max_concurrent_block_fetches: usize, + rewind_receiver: Option>, ) -> impl Stream> { stream! { let start_slot = match last_indexed_slot { 0 => 0, last_indexed_slot => last_indexed_slot + 1 }; - let slot_stream = get_slot_stream(rpc_client.clone(), start_slot); + let slot_stream = get_slot_stream(rpc_client.clone(), start_slot, rewind_receiver); pin_mut!(slot_stream); let block_stream = slot_stream .map(|slot| { @@ -82,17 +103,53 @@ fn pop_cached_blocks_to_index( Some(&slot) => slot, None => break, }; + + // Case 1: This block is older than what we've already indexed - discard it + if min_slot <= last_indexed_slot { + block_cache.remove(&min_slot); + continue; + } + let block: &BlockInfo = block_cache.get(&min_slot).unwrap(); + + // Case 2: Check if this block can be connected to our last indexed slot if block.metadata.parent_slot == last_indexed_slot { - last_indexed_slot = block.metadata.slot; - blocks.push(block.clone()); - block_cache.remove(&min_slot); - } else if min_slot < last_indexed_slot { - block_cache.remove(&min_slot); - } else { + // Direct succession - always allowed + // Log if there were skipped slots + if min_slot > last_indexed_slot + 1 { + // Direct parent match but there's a gap in slot numbers + log::info!( + "Block at slot {} directly follows {} (slots {}-{} were skipped)", + min_slot, + last_indexed_slot, + last_indexed_slot + 1, + min_slot - 1 + ); + } + // Process only ONE block at a time to ensure strict ordering + break; + } + + // Case 3: This block's parent is in the future - we're missing intermediate blocks + if block.metadata.parent_slot > last_indexed_slot { + // Wait for the intermediate blocks to arrive or be marked as skipped break; } + + // Case 4: This block's parent is before our last indexed slot + // This indicates a fork or invalid block - discard it + if block.metadata.parent_slot < last_indexed_slot { + log::warn!( + "Discarding block at slot {} with parent {} (last indexed: {})", + min_slot, + block.metadata.parent_slot, + last_indexed_slot + ); + block_cache.remove(&min_slot); + continue; + } } + (blocks, last_indexed_slot) } diff --git a/src/ingester/indexer/mod.rs b/src/ingester/indexer/mod.rs index fa696d56..25e2c8ba 100644 --- a/src/ingester/indexer/mod.rs +++ b/src/ingester/indexer/mod.rs @@ -52,6 +52,7 @@ pub async fn index_block_stream( rpc_client: Arc, last_indexed_slot_at_start: u64, end_slot: Option, + rewind_controller: Option<&crate::ingester::rewind_controller::RewindController>, ) { pin_mut!(block_stream); let current_slot = @@ -65,16 +66,17 @@ pub async fn index_block_stream( "Backfilling historical blocks. Current number of blocks to backfill: {}", number_of_blocks_to_backfill ); - let mut last_indexed_slot = last_indexed_slot_at_start; + // let mut last_indexed_slot = last_indexed_slot_at_start; let mut finished_backfill_slot = None; while let Some(blocks) = block_stream.next().await { + let first_slot_in_block = blocks.first().unwrap().metadata.slot; let last_slot_in_block = blocks.last().unwrap().metadata.slot; - index_block_batch_with_infinite_retries(db.as_ref(), blocks).await; + index_block_batch_with_infinite_retries(db.as_ref(), blocks, rewind_controller).await; - for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) { - let blocks_indexed = slot - last_indexed_slot_at_start; + for slot in first_slot_in_block..(last_slot_in_block + 1) { + let blocks_indexed = slot.saturating_sub(last_indexed_slot_at_start); if blocks_indexed < number_of_blocks_to_backfill { if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 { info!( @@ -92,7 +94,7 @@ pub async fn index_block_stream( info!("Indexed slot {}", slot); } } - last_indexed_slot = slot; + // last_indexed_slot = slot; } } } diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index f2934d47..42821f40 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -15,9 +15,10 @@ use sea_orm::QueryTrait; use sea_orm::Set; use sea_orm::TransactionTrait; -use self::parser::state_update::StateUpdate; +use self::parser::state_update::{SequenceGapError, StateUpdate}; use self::persist::persist_state_update; use self::persist::MAX_SQL_INSERTS; +use self::rewind_controller::{determine_rewind_slot, RewindController}; use self::typedefs::block_info::BlockInfo; use self::typedefs::block_info::BlockMetadata; use crate::dao::generated::blocks; @@ -27,20 +28,43 @@ pub mod fetchers; pub mod indexer; pub mod parser; pub mod persist; +pub mod rewind_controller; pub mod typedefs; -fn derive_block_state_update(block: &BlockInfo) -> Result { +fn derive_block_state_update( + block: &BlockInfo, + rewind_controller: Option<&RewindController>, +) -> Result { let mut state_updates: Vec = Vec::new(); for transaction in &block.transactions { state_updates.push(parse_transaction(transaction, block.metadata.slot)?); } - Ok(StateUpdate::merge_updates(state_updates)) + + match StateUpdate::merge_updates_with_slot(state_updates, Some(block.metadata.slot)) { + Ok(merged_update) => Ok(merged_update), + Err(SequenceGapError::GapDetected(gaps)) => { + if let Some(controller) = rewind_controller { + let rewind_slot = determine_rewind_slot(&gaps); + let reason = format!( + "Sequence gaps detected in block {}: {} gaps found", + block.metadata.slot, + gaps.len() + ); + controller.request_rewind(rewind_slot, reason)?; + } + Err(IngesterError::SequenceGapDetected(gaps)) + } + } } -pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> { +pub async fn index_block( + db: &DatabaseConnection, + block: &BlockInfo, + rewind_controller: Option<&RewindController>, +) -> Result<(), IngesterError> { let txn = db.begin().await?; index_block_metadatas(&txn, vec![&block.metadata]).await?; - persist_state_update(&txn, derive_block_state_update(block)?).await?; + persist_state_update(&txn, derive_block_state_update(block, rewind_controller)?).await?; txn.commit().await?; Ok(()) } @@ -81,6 +105,7 @@ async fn index_block_metadatas( pub async fn index_block_batch( db: &DatabaseConnection, block_batch: &Vec, + rewind_controller: Option<&RewindController>, ) -> Result<(), IngesterError> { let blocks_len = block_batch.len(); let tx = db.begin().await?; @@ -88,9 +113,33 @@ pub async fn index_block_batch( index_block_metadatas(&tx, block_metadatas).await?; let mut state_updates = Vec::new(); for block in block_batch { - state_updates.push(derive_block_state_update(block)?); + state_updates.push(derive_block_state_update(block, rewind_controller)?); } - persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?; + + if block_batch.is_empty() { + return Ok(()); // Or return an appropriate error + } + + let merged_state_update = match StateUpdate::merge_updates_with_slot( + state_updates, + Some(block_batch.last().unwrap().metadata.slot), + ) { + Ok(merged) => merged, + Err(SequenceGapError::GapDetected(gaps)) => { + if let Some(controller) = rewind_controller { + let rewind_slot = determine_rewind_slot(&gaps); + let reason = format!( + "Sequence gaps detected in batch ending at slot {}: {} gaps found", + block_batch.last().unwrap().metadata.slot, + gaps.len() + ); + controller.request_rewind(rewind_slot, reason)?; + } + return Err(IngesterError::SequenceGapDetected(gaps)); + } + }; + + persist::persist_state_update(&tx, merged_state_update).await?; metric! { statsd_count!("blocks_indexed", blocks_len as i64); } @@ -101,10 +150,16 @@ pub async fn index_block_batch( pub async fn index_block_batch_with_infinite_retries( db: &DatabaseConnection, block_batch: Vec, + rewind_controller: Option<&RewindController>, ) { loop { - match index_block_batch(db, &block_batch).await { + match index_block_batch(db, &block_batch, rewind_controller).await { Ok(()) => return, + Err(IngesterError::SequenceGapDetected(_)) => { + // For sequence gaps, we don't retry - we let the rewind mechanism handle it + log::error!("Sequence gap detected in batch, stopping processing to allow rewind"); + return; + } Err(e) => { let start_block = block_batch.first().unwrap().metadata.slot; let end_block = block_batch.last().unwrap().metadata.slot; diff --git a/src/ingester/parser/mod.rs b/src/ingester/parser/mod.rs index aedff22f..c76c7267 100644 --- a/src/ingester/parser/mod.rs +++ b/src/ingester/parser/mod.rs @@ -127,7 +127,7 @@ pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result), +} + #[derive(BorshDeserialize, BorshSerialize, Debug, Clone, PartialEq, Eq)] pub struct PathNode { pub node: [u8; 32], @@ -82,23 +98,23 @@ impl From for AddressQueueUpdate { } } } - +// TODO: add rewind for all sequence numbers not just address tree v1 #[derive(Default, Debug, Clone, PartialEq, Eq)] /// Representation of state update of the compression system that is optimal for simple persistence. pub struct StateUpdate { // v1 and v2 tree accounts - pub in_accounts: HashSet, + pub in_accounts: HashSet, // covered by leaf nullifications // v1 and v2 tree accounts - pub out_accounts: Vec, + pub out_accounts: Vec, // has leaf index, got v2 Merkle trees we need to pub account_transactions: HashSet, pub transactions: HashSet, - pub leaf_nullifications: HashSet, - pub indexed_merkle_tree_updates: HashMap<(Pubkey, u64), IndexedTreeLeafUpdate>, + pub leaf_nullifications: HashSet, // Has sequence number + pub indexed_merkle_tree_updates: HashMap<(Pubkey, u64), IndexedTreeLeafUpdate>, // Has sequence number // v2 state and address Merkle tree updates - pub batch_merkle_tree_events: BatchMerkleTreeEvents, + pub batch_merkle_tree_events: BatchMerkleTreeEvents, // Has sequence number // v2 input accounts that are inserted into the input queue - pub batch_nullify_context: Vec, - pub batch_new_addresses: Vec, + pub batch_nullify_context: Vec, // Has queue index we need to track this as well the same as sequence number + pub batch_new_addresses: Vec, // Has queue index we need to track this as well the same as sequence number } impl StateUpdate { @@ -106,21 +122,49 @@ impl StateUpdate { StateUpdate::default() } - pub fn merge_updates(updates: Vec) -> StateUpdate { + pub fn merge_updates(updates: Vec) -> Result { + Self::merge_updates_with_slot(updates, None) + } + + pub fn merge_updates_with_slot( + updates: Vec, + slot: Option, + ) -> Result { + #[cfg(test)] + if slot.is_some() { + use crate::ingester::parser::tree_info::TreeInfo; + TreeInfo::reset_all_sequences(); + } + let mut merged = StateUpdate::default(); + let mut detected_gaps = Vec::new(); + + let mut all_output_leaf_indices: HashMap> = HashMap::new(); + let mut all_address_queue_indices: HashMap> = HashMap::new(); for update in updates { merged.in_accounts.extend(update.in_accounts); + + for account in &update.out_accounts { + let tree_pubkey = account.account.tree.0; + all_output_leaf_indices + .entry(tree_pubkey) + .or_insert_with(Vec::new) + .push(account.account.leaf_index.0); + } merged.out_accounts.extend(update.out_accounts); merged .account_transactions .extend(update.account_transactions); merged.transactions.extend(update.transactions); + merged .leaf_nullifications .extend(update.leaf_nullifications); for (key, value) in update.indexed_merkle_tree_updates { + let (_, _leaf_index) = key; + // Insert only if the seq is higher. if let Some(existing) = merged.indexed_merkle_tree_updates.get_mut(&key) { if value.seq > existing.seq { @@ -139,14 +183,127 @@ impl StateUpdate { } } + for address_update in &update.batch_new_addresses { + let tree_pubkey = address_update.tree.0; + all_address_queue_indices + .entry(tree_pubkey) + .or_insert_with(Vec::new) + .push(address_update.queue_index); + } merged .batch_new_addresses .extend(update.batch_new_addresses); + + // Note: BatchNullifyContext gap detection requires tree association + // For now, skip since we don't have a reliable way to determine tree from account_hash merged .batch_nullify_context .extend(update.batch_nullify_context); } - merged + if let Some(slot) = slot { + #[derive(Debug)] + enum SequenceSource { + IndexedMerkleTree(u64), + LeafNullification(u64), + OutputAccount(u64), + BatchEvent(u64), + AddressQueue(u64), + } + + let mut all_sequences_by_tree: std::collections::HashMap> = + std::collections::HashMap::new(); + + for (&key, value) in &merged.indexed_merkle_tree_updates { + let (tree, _leaf_index) = key; + all_sequences_by_tree + .entry(tree) + .or_insert_with(Vec::new) + .push(SequenceSource::IndexedMerkleTree(value.seq)); + } + + for nullification in &merged.leaf_nullifications { + all_sequences_by_tree + .entry(nullification.tree) + .or_insert_with(Vec::new) + .push(SequenceSource::LeafNullification(nullification.seq)); + } + + for account in &merged.out_accounts { + let tree = account.account.tree.0; + all_sequences_by_tree + .entry(tree) + .or_insert_with(Vec::new) + .push(SequenceSource::OutputAccount(account.account.leaf_index.0)); + } + + for (tree_bytes, events) in &merged.batch_merkle_tree_events { + let tree = Pubkey::from(*tree_bytes); + for (seq, _) in events { + all_sequences_by_tree + .entry(tree) + .or_insert_with(Vec::new) + .push(SequenceSource::BatchEvent(*seq)); + } + } + + for address_update in &merged.batch_new_addresses { + let tree = address_update.tree.0; + all_sequences_by_tree + .entry(tree) + .or_insert_with(Vec::new) + .push(SequenceSource::AddressQueue(address_update.queue_index)); + } + + for (tree, mut sequences) in all_sequences_by_tree { + if sequences.is_empty() { + continue; + } + + sequences.sort_by_key(|s| match s { + SequenceSource::IndexedMerkleTree(seq) => *seq, + SequenceSource::LeafNullification(seq) => *seq, + SequenceSource::OutputAccount(seq) => *seq, + SequenceSource::BatchEvent(seq) => *seq, + SequenceSource::AddressQueue(seq) => *seq, + }); + + for sequence in sequences { + let (seq, source_name) = match sequence { + SequenceSource::IndexedMerkleTree(seq) => (seq, "indexed merkle tree"), + SequenceSource::LeafNullification(seq) => (seq, "leaf nullification"), + SequenceSource::OutputAccount(seq) => (seq, "output account"), + SequenceSource::BatchEvent(seq) => (seq, "batch event"), + SequenceSource::AddressQueue(seq) => (seq, "address queue"), + }; + + if let Some((expected_seq, actual_seq)) = + TreeInfo::check_sequence_gap(&tree, seq) + { + error!( + "{} sequence gap detected for tree {}: expected {}, got {}", + source_name, tree, expected_seq, actual_seq + ); + detected_gaps.push(SequenceGap { + tree, + expected_seq, + actual_seq, + }); + } + + // Update highest sequence for this tree + if let Err(e) = TreeInfo::update_highest_seq(&tree, seq, slot) { + error!("Failed to update highest sequence for tree {}: {}", tree, e); + } + } + } + + // TODO: Add batch nullify context queue index gap detection + } + if !detected_gaps.is_empty() { + return Err(SequenceGapError::GapDetected(detected_gaps)); + } + + Ok(merged) } } diff --git a/src/ingester/parser/tree_info.rs b/src/ingester/parser/tree_info.rs index 3c073a7d..a3d6f135 100644 --- a/src/ingester/parser/tree_info.rs +++ b/src/ingester/parser/tree_info.rs @@ -2,6 +2,7 @@ use lazy_static::lazy_static; use light_compressed_account::TreeType; use solana_pubkey::{pubkey, Pubkey}; use std::collections::HashMap; +use std::sync::{atomic::AtomicU64, Arc}; #[derive(Debug, Clone)] pub struct TreeInfo { @@ -9,6 +10,8 @@ pub struct TreeInfo { pub queue: Pubkey, pub height: u32, pub tree_type: TreeType, + pub highest_seq: std::sync::Arc, + pub last_slot: std::sync::Arc, } impl TreeInfo { @@ -31,6 +34,88 @@ impl TreeInfo { let pubkey = Pubkey::from(*tree_bytes); Self::get_tree_type(&pubkey) } + + pub fn get_highest_seq(pubkey: &Pubkey) -> Option { + let tree_pubkey_str = pubkey.to_string(); + Self::get(&tree_pubkey_str) + .map(|info| info.highest_seq.load(std::sync::atomic::Ordering::Acquire)) + } + + pub fn update_highest_seq(pubkey: &Pubkey, new_seq: u64, slot: u64) -> Result { + let tree_pubkey_str = pubkey.to_string(); + + if let Some(tree_info) = Self::get(&tree_pubkey_str) { + let current = tree_info + .highest_seq + .load(std::sync::atomic::Ordering::Acquire); + log::info!( + "Updating highest sequence for tree {} from {} to {}", + tree_pubkey_str, + current, + new_seq + ); + if new_seq > current { + tree_info + .highest_seq + .store(new_seq, std::sync::atomic::Ordering::Release); + tree_info + .last_slot + .store(slot, std::sync::atomic::Ordering::Release); + Ok(new_seq) + } else { + Ok(current) + } + } else { + Err(format!("Tree {} not found in mapping", pubkey)) + } + } + + pub fn get_last_slot_for_seq(pubkey: &Pubkey, target_seq: u64) -> Option { + let tree_pubkey_str = pubkey.to_string(); + if let Some(tree_info) = Self::get(&tree_pubkey_str) { + let current_seq = tree_info + .highest_seq + .load(std::sync::atomic::Ordering::Acquire); + if target_seq <= current_seq { + // Return the slot where this sequence was last processed + Some( + tree_info + .last_slot + .load(std::sync::atomic::Ordering::Acquire), + ) + } else { + None + } + } else { + None + } + } + pub fn check_sequence_gap(pubkey: &Pubkey, incoming_seq: u64) -> Option<(u64, u64)> { + if let Some(current_highest) = Self::get_highest_seq(pubkey) { + // We init with 0, we cannot crash on 0 + if current_highest == 0 { + return None; + } + + let expected_seq = current_highest + 1; + if incoming_seq != expected_seq { + return Some((expected_seq, incoming_seq)); + } + } + None + } + + /// Reset all atomic counters for tests - only used in test environments + pub fn reset_all_sequences() { + for tree_info in QUEUE_TREE_MAPPING.values() { + tree_info + .highest_seq + .store(0, std::sync::atomic::Ordering::Release); + tree_info + .last_slot + .store(0, std::sync::atomic::Ordering::Release); + } + } } // TODO: add a table which stores tree metadata: tree_pubkey | queue_pubkey | type | ... @@ -198,6 +283,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::StateV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); @@ -208,6 +295,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::StateV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } @@ -220,6 +309,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::AddressV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); @@ -230,6 +321,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::AddressV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } @@ -274,6 +367,8 @@ lazy_static! { queue: *queue, height: 32, tree_type: TreeType::StateV2, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); @@ -284,6 +379,8 @@ lazy_static! { queue: *queue, height: 32, tree_type: TreeType::StateV2, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } @@ -296,6 +393,8 @@ lazy_static! { queue: *tree_queue, height: 40, tree_type: TreeType::AddressV2, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } diff --git a/src/ingester/parser/tx_event_parser.rs b/src/ingester/parser/tx_event_parser.rs index b146f859..7efb7c0a 100644 --- a/src/ingester/parser/tx_event_parser.rs +++ b/src/ingester/parser/tx_event_parser.rs @@ -68,7 +68,7 @@ pub fn create_state_update_v1( Some(info) => info.clone(), None => { if super::SKIP_UNKNOWN_TREES { - log::warn!("Skipping unknown tree: {}", tree.to_string()); + log::error!("Skipping unknown tree: {}", tree.to_string()); continue; } else { return Err(IngesterError::ParserError(format!( diff --git a/src/ingester/parser/tx_event_parser_v2.rs b/src/ingester/parser/tx_event_parser_v2.rs index 389dfd74..ce0b0734 100644 --- a/src/ingester/parser/tx_event_parser_v2.rs +++ b/src/ingester/parser/tx_event_parser_v2.rs @@ -148,6 +148,6 @@ pub fn create_state_update_v2( state_updates.push(state_update_event); } - let merged = StateUpdate::merge_updates(state_updates); + let merged = StateUpdate::merge_updates(state_updates)?; Ok(merged) } diff --git a/src/ingester/rewind_controller.rs b/src/ingester/rewind_controller.rs new file mode 100644 index 00000000..05043f0f --- /dev/null +++ b/src/ingester/rewind_controller.rs @@ -0,0 +1,64 @@ +use tokio::sync::mpsc; +use tracing::error; + +#[derive(Debug, Clone)] +pub enum RewindCommand { + Rewind { to_slot: u64, reason: String }, +} + +#[derive(Clone)] +pub struct RewindController { + sender: mpsc::UnboundedSender, +} + +impl RewindController { + pub fn new() -> (Self, mpsc::UnboundedReceiver) { + let (sender, receiver) = mpsc::unbounded_channel(); + (Self { sender }, receiver) + } + + pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> { + let command = RewindCommand::Rewind { + to_slot, + reason: reason.clone(), + }; + + error!("Requesting rewind to slot {}: {}", to_slot, reason); + + self.sender.send(command).map_err(|e| { + error!("Failed to send rewind command: {}", e); + format!("Failed to send rewind command: {}", e) + }) + } +} + +pub fn determine_rewind_slot(gaps: &[crate::ingester::parser::state_update::SequenceGap]) -> u64 { + use crate::ingester::parser::tree_info::TreeInfo; + + // Find the earliest slot where we need to rewind to get the missing sequence + let mut earliest_slot = u64::MAX; + + for gap in gaps { + // Try to find the exact slot for the last known good sequence + let target_seq = gap.expected_seq.saturating_sub(1); + if let Some(slot) = TreeInfo::get_last_slot_for_seq(&gap.tree, target_seq) { + // Rewind to just before this slot to ensure we reprocess + earliest_slot = earliest_slot.min(slot.saturating_sub(1)); + } else { + // Fallback: conservative approach if we can't find the exact slot + // This handles the case where this is the first sequence for this tree + earliest_slot = earliest_slot.min(gap.expected_seq.saturating_sub(10)); + } + } + + // Ensure we don't rewind to slot 0 unless explicitly needed + if earliest_slot == u64::MAX { + // No valid slots found, use conservative fallback + gaps.iter() + .map(|gap| gap.expected_seq.saturating_sub(10)) + .min() + .unwrap_or(0) + } else { + earliest_slot + } +} diff --git a/src/main.rs b/src/main.rs index 185013ac..fefba199 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use photon_indexer::ingester::fetchers::BlockStreamConfig; use photon_indexer::ingester::indexer::{ fetch_last_indexed_slot_with_infinite_retry, index_block_stream, }; +use photon_indexer::ingester::rewind_controller::RewindController; use photon_indexer::migration::{ sea_orm::{DatabaseBackend, DatabaseConnection, SqlxPostgresConnector, SqlxSqliteConnector}, Migrator, MigratorTrait, @@ -174,6 +175,7 @@ fn continously_index_new_blocks( db: Arc, rpc_client: Arc, last_indexed_slot: u64, + rewind_controller: RewindController, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let block_stream = block_stream_config.load_block_stream(); @@ -183,6 +185,7 @@ fn continously_index_new_blocks( rpc_client.clone(), last_indexed_slot, None, + Some(&rewind_controller), ) .await; }) @@ -234,6 +237,7 @@ async fn main() { rpc_client.clone(), last_indexed_slot, Some(last_slot), + None, // No rewind controller for snapshot processing ) .await; } @@ -277,11 +281,15 @@ async fn main() { .unwrap(), }; + // Create rewind controller for gap detection + let (rewind_controller, rewind_receiver) = RewindController::new(); + let block_stream_config = BlockStreamConfig { rpc_client: rpc_client.clone(), max_concurrent_block_fetches, last_indexed_slot, geyser_url: args.grpc_url, + rewind_receiver: Some(rewind_receiver), }; ( @@ -290,6 +298,7 @@ async fn main() { db_conn.clone(), rpc_client.clone(), last_indexed_slot, + rewind_controller, )), Some(continously_monitor_photon( db_conn.clone(), diff --git a/src/migration/migrations/custom/custom20250211_000002_solayer2.rs b/src/migration/migrations/custom/custom20250211_000002_solayer2.rs deleted file mode 100644 index f4840c9c..00000000 --- a/src/migration/migrations/custom/custom20250211_000002_solayer2.rs +++ /dev/null @@ -1,68 +0,0 @@ -use sea_orm_migration::prelude::*; -use sea_orm_migration::sea_orm::{ConnectionTrait, DatabaseBackend, Statement}; -use solana_program::pubkey::Pubkey; -use std::str::FromStr; - -use crate::migration::model::table::Accounts; - -#[derive(DeriveMigrationName)] -pub struct Migration; - -async fn execute_sql<'a>(manager: &SchemaManager<'_>, sql: &str) -> Result<(), DbErr> { - manager - .get_connection() - .execute(Statement::from_string( - manager.get_database_backend(), - sql.to_string(), - )) - .await?; - Ok(()) -} - -#[async_trait::async_trait] -impl MigrationTrait for Migration { - async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let solayer_accounts = vec![ - "ARDPkhymCbfdan375FCgPnBJQvUfHeb7nHVdBfwWSxrp", - "2sYfW81EENCMe415CPhE2XzBA5iQf4TXRs31W1KP63YT", - ]; - // Encode the accounts as hex strings - let encoded_accounts = solayer_accounts - .iter() - .map(|account| { - let pubkey = Pubkey::from_str(account).unwrap(); - format!("\\x{}", hex::encode(pubkey.to_bytes())) - }) - .collect::>() - .join("', '"); - - if manager.get_database_backend() == DatabaseBackend::Postgres { - // Create index concurrently for Postgres - execute_sql( - manager, - &format!( - "CREATE INDEX CONCURRENTLY IF NOT EXISTS solayer_account_index2 ON accounts (spent, owner, substring(data, 1, 32)) \ - WHERE owner IN ('{}');", - encoded_accounts - ), - ) - .await?; - } - Ok(()) - } - - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - if manager.get_database_backend() == DatabaseBackend::Postgres { - manager - .drop_index( - Index::drop() - .name("solayer_account_index2") - .table(Accounts::Table) - .to_owned(), - ) - .await?; - } - - Ok(()) - } -} diff --git a/src/migration/migrations/custom/custom20252201_000001_init.rs b/src/migration/migrations/custom/custom20252201_000001_init.rs deleted file mode 100644 index 1ccb603f..00000000 --- a/src/migration/migrations/custom/custom20252201_000001_init.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::str::FromStr; - -use sea_orm_migration::prelude::*; -use sea_orm_migration::sea_orm::{ConnectionTrait, DatabaseBackend, Statement}; -use solana_sdk::pubkey::Pubkey; - -use crate::migration::model::table::Accounts; - -#[derive(DeriveMigrationName)] -pub struct Migration; - -async fn execute_sql<'a>(manager: &SchemaManager<'_>, sql: &str) -> Result<(), DbErr> { - manager - .get_connection() - .execute(Statement::from_string( - manager.get_database_backend(), - sql.to_string(), - )) - .await?; - Ok(()) -} - -#[async_trait::async_trait] -impl MigrationTrait for Migration { - async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - let solayer_accounts = vec![ - "S1ay5sk6FVkvsNFZShMw2YK3nfgJZ8tpBBGuHWDZ266", - "2sYfW81EENCMe415CPhE2XzBA5iQf4TXRs31W1KP63YT", - ]; - // Encode the accounts as hex strings - let encoded_accounts = solayer_accounts - .iter() - .map(|account| { - let pubkey = Pubkey::from_str(account).unwrap(); - format!("\\x{}", hex::encode(pubkey.to_bytes())) - }) - .collect::>() - .join("', '"); - - if manager.get_database_backend() == DatabaseBackend::Postgres { - // Create index concurrently for Postgres - execute_sql( - manager, - &format!( - "CREATE INDEX CONCURRENTLY IF NOT EXISTS solayer_account_index ON accounts (spent, owner, substring(data, 1, 32)) \ - WHERE owner IN ('{}');", - encoded_accounts - ), - ) - .await?; - } - Ok(()) - } - - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - if manager.get_database_backend() == DatabaseBackend::Postgres { - manager - .drop_index( - Index::drop() - .name("solayer_account_index") - .table(Accounts::Table) - .to_owned(), - ) - .await?; - } - - Ok(()) - } -} diff --git a/src/migration/migrations/custom/mod.rs b/src/migration/migrations/custom/mod.rs deleted file mode 100644 index 47fddc25..00000000 --- a/src/migration/migrations/custom/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -use sea_orm_migration::MigrationTrait; - -pub mod custom20250211_000002_solayer2; -pub mod custom20252201_000001_init; - -pub fn get_custom_migrations() -> Vec> { - vec![ - Box::new(custom20252201_000001_init::Migration), - Box::new(custom20250211_000002_solayer2::Migration), - ] -} diff --git a/src/migration/migrations/mod.rs b/src/migration/migrations/mod.rs index 471f5364..be70b93c 100644 --- a/src/migration/migrations/mod.rs +++ b/src/migration/migrations/mod.rs @@ -1,2 +1,2 @@ -pub mod custom; +// pub mod custom; pub mod standard; diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 8db11731..9eb0814d 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -1,4 +1,4 @@ -use migrations::{custom::get_custom_migrations, standard::get_standard_migrations}; +use migrations::standard::get_standard_migrations; pub use sea_orm_migration::prelude::*; @@ -21,7 +21,7 @@ impl MigratorTrait for MigractorWithCustomMigrations { fn migrations() -> Vec> { get_standard_migrations() .into_iter() - .chain(get_custom_migrations()) + // .chain(get_custom_migrations()) .collect() } } diff --git a/src/snapshot/mod.rs b/src/snapshot/mod.rs index ee669507..38045615 100644 --- a/src/snapshot/mod.rs +++ b/src/snapshot/mod.rs @@ -451,11 +451,12 @@ pub async fn update_snapshot( incremental_snapshot_interval_slots: u64, ) { // Convert stream to iterator + let last_indexed_slot = block_stream_config.last_indexed_slot; let block_stream = block_stream_config.load_block_stream(); update_snapshot_helper( directory_adapter, block_stream, - block_stream_config.last_indexed_slot, + last_indexed_slot, incremental_snapshot_interval_slots, full_snapshot_interval_slots, ) diff --git a/src/snapshot/snapshotter/main.rs b/src/snapshot/snapshotter/main.rs index 9e6c9735..826b0dec 100644 --- a/src/snapshot/snapshotter/main.rs +++ b/src/snapshot/snapshotter/main.rs @@ -256,6 +256,7 @@ async fn main() { max_concurrent_block_fetches: args.max_concurrent_block_fetches.unwrap_or(20), last_indexed_slot, geyser_url: args.grpc_url.clone(), + rewind_receiver: None, // No rewind receiver for snapshotter }, args.incremental_snapshot_interval_slots, args.snapshot_interval_slots, diff --git a/tests/integration_tests/e2e_tests.rs b/tests/integration_tests/e2e_tests.rs index cfb1bc89..b7219c68 100644 --- a/tests/integration_tests/e2e_tests.rs +++ b/tests/integration_tests/e2e_tests.rs @@ -78,6 +78,7 @@ fn all_indexing_methodologies( }, ..Default::default() }, + None, ).await .unwrap(); @@ -497,7 +498,7 @@ async fn test_index_block_metadata( let slot = 254170887; let block = cached_fetch_block(&setup.name, setup.client.clone(), slot).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); let filter = blocks::Column::Slot.eq(block.metadata.slot); let block_model = blocks::Entity::find() @@ -523,12 +524,12 @@ async fn test_index_block_metadata( assert_eq!(block_model.block_time, 1710441678); // Verify that we don't get an error if we try to index the same block again - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); assert_eq!(setup.api.get_indexer_slot().await.unwrap().0, slot); // Verify that get_indexer_slot() gets updated a new block is indexed. let block = cached_fetch_block(&setup.name, setup.client.clone(), slot + 1).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); assert_eq!(setup.api.get_indexer_slot().await.unwrap().0, slot + 1); } @@ -551,7 +552,7 @@ async fn test_get_latest_non_voting_signatures( let slot = 270893658; let block = cached_fetch_block(&setup.name, setup.client.clone(), slot).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); let all_nonvoting_transactions = setup .api .get_latest_non_voting_signatures(GetLatestSignaturesRequest { @@ -586,7 +587,7 @@ async fn test_get_latest_non_voting_signatures_with_failures( let slot = 279620356; let block = cached_fetch_block(&setup.name, setup.client.clone(), slot).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); let all_nonvoting_transactions = setup .api .get_latest_non_voting_signatures(GetLatestSignaturesRequest { diff --git a/tests/integration_tests/gap_detection_tests.rs b/tests/integration_tests/gap_detection_tests.rs new file mode 100644 index 00000000..419a9dfc --- /dev/null +++ b/tests/integration_tests/gap_detection_tests.rs @@ -0,0 +1,308 @@ +use light_compressed_account::TreeType; +use photon_indexer::common::typedefs::account::AccountWithContext; +use photon_indexer::common::typedefs::account::{Account, AccountContext}; +use photon_indexer::common::typedefs::{ + hash::Hash, serializable_pubkey::SerializablePubkey, unsigned_integer::UnsignedInteger, +}; +use photon_indexer::ingester::parser::indexer_events::RawIndexedElement; +use photon_indexer::ingester::parser::state_update::{ + IndexedTreeLeafUpdate, LeafNullification, SequenceGapError, StateUpdate, +}; +use solana_pubkey::pubkey; + +fn create_indexed_tree_update( + tree: solana_pubkey::Pubkey, + leaf_index: u64, + seq: u64, +) -> IndexedTreeLeafUpdate { + IndexedTreeLeafUpdate { + tree, + tree_type: TreeType::AddressV1, + hash: [0u8; 32], + leaf: RawIndexedElement { + value: [0u8; 32], + next_index: 0, + next_value: [0u8; 32], + index: leaf_index as usize, + }, + seq, + } +} + +fn create_leaf_nullification(tree: solana_pubkey::Pubkey, seq: u64) -> LeafNullification { + LeafNullification { + tree, + leaf_index: 0, + seq, + signature: solana_sdk::signature::Signature::default(), + } +} + +fn create_account_with_context(tree: solana_pubkey::Pubkey, leaf_index: u32) -> AccountWithContext { + AccountWithContext { + account: Account { + hash: Hash::new(&[0u8; 32]).unwrap(), + data: None, + owner: SerializablePubkey::try_from([0u8; 32]).unwrap(), + lamports: UnsignedInteger(0), + address: None, + tree: SerializablePubkey::from(tree), + leaf_index: UnsignedInteger(leaf_index as u64), + seq: None, + slot_created: UnsignedInteger(0), + }, + context: AccountContext { + queue: SerializablePubkey::try_from([0u8; 32]).unwrap(), + in_output_queue: false, + spent: false, + nullified_in_tree: false, + nullifier_queue_index: None, + nullifier: None, + tx_hash: None, + tree_type: 0, + }, + } +} + +#[test] +fn test_no_gap_sequential_sequences() { + let tree = pubkey!("BPF9L8vwCHcqW3xrLHgVrwCzxxH6VbSk1KDHhE1ZBFP9"); + let mut state_update = StateUpdate::new(); + + state_update + .indexed_merkle_tree_updates + .insert((tree, 0), create_indexed_tree_update(tree, 0, 1)); + state_update + .indexed_merkle_tree_updates + .insert((tree, 1), create_indexed_tree_update(tree, 1, 2)); + state_update + .indexed_merkle_tree_updates + .insert((tree, 2), create_indexed_tree_update(tree, 2, 3)); + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_ok()); +} + +#[test] +fn test_gap_detected_indexed_merkle_tree() { + let tree = pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"); + let mut state_update = StateUpdate::new(); + + state_update + .indexed_merkle_tree_updates + .insert((tree, 0), create_indexed_tree_update(tree, 0, 1)); + state_update.indexed_merkle_tree_updates.insert( + (tree, 1), + create_indexed_tree_update(tree, 1, 3), // Gap here + ); + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_err()); + + if let Err(SequenceGapError::GapDetected(gaps)) = result { + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0].tree, tree); + assert_eq!(gaps[0].expected_seq, 2); + assert_eq!(gaps[0].actual_seq, 3); + } +} + +#[test] +fn test_gap_detected_leaf_nullifications() { + let tree = pubkey!("smt1NamzXdq4AMqS2fS2F1i5KTYPZRhoHgWx38d8WsT"); + let mut state_update = StateUpdate::new(); + + state_update + .leaf_nullifications + .insert(create_leaf_nullification(tree, 1)); + state_update + .leaf_nullifications + .insert(create_leaf_nullification(tree, 2)); + state_update + .leaf_nullifications + .insert(create_leaf_nullification(tree, 4)); // Gap here (missing 3) + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_err()); + + if let Err(SequenceGapError::GapDetected(gaps)) = result { + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0].tree, tree); + assert_eq!(gaps[0].expected_seq, 3); + assert_eq!(gaps[0].actual_seq, 4); + } +} + +#[test] +fn test_gap_detected_output_accounts() { + // Use an actual StateV1 tree for testing output accounts + let tree = pubkey!("smt2rJAFdyJJupwMKAqTNAJwvjhmiZ4JYGZmbVRw1Ho"); + let mut state_update = StateUpdate::new(); + + state_update + .out_accounts + .push(create_account_with_context(tree, 1)); + state_update + .out_accounts + .push(create_account_with_context(tree, 2)); + state_update + .out_accounts + .push(create_account_with_context(tree, 4)); // Gap here (missing 3) + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_err()); + + if let Err(SequenceGapError::GapDetected(gaps)) = result { + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0].tree, tree); + assert_eq!(gaps[0].expected_seq, 3); + assert_eq!(gaps[0].actual_seq, 4); + } +} + +#[test] +fn test_multiple_gaps_detected() { + let tree1 = pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"); + let tree2 = pubkey!("smt3AFtReRGVcrP11D6bSLEaKdUmrGfaTNowMVccJeu"); + let mut state_update = StateUpdate::new(); + + state_update + .indexed_merkle_tree_updates + .insert((tree1, 0), create_indexed_tree_update(tree1, 0, 1)); + state_update + .indexed_merkle_tree_updates + .insert((tree1, 1), create_indexed_tree_update(tree1, 1, 2)); + state_update.indexed_merkle_tree_updates.insert( + (tree1, 2), + create_indexed_tree_update(tree1, 2, 4), // Gap in tree1 (missing 3) + ); + state_update + .leaf_nullifications + .insert(create_leaf_nullification(tree2, 1)); + state_update + .leaf_nullifications + .insert(create_leaf_nullification(tree2, 2)); + state_update + .leaf_nullifications + .insert(create_leaf_nullification(tree2, 4)); // Gap in tree2 (missing 3) + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_err()); + + if let Err(SequenceGapError::GapDetected(gaps)) = result { + assert_eq!(gaps.len(), 2); + } +} + +#[test] +fn test_deduplication_no_false_gaps() { + let tree = pubkey!("BPF9L8vwCHcqW3xrLHgVrwCzxxH6VbSk1KDHhE1YBFPA"); + let mut state_update1 = StateUpdate::new(); + let mut state_update2 = StateUpdate::new(); + + state_update1 + .indexed_merkle_tree_updates + .insert((tree, 0), create_indexed_tree_update(tree, 0, 1)); + state_update1 + .indexed_merkle_tree_updates + .insert((tree, 1), create_indexed_tree_update(tree, 1, 2)); + + state_update2.indexed_merkle_tree_updates.insert( + (tree, 0), + create_indexed_tree_update(tree, 0, 3), // Overwrites seq 1 + ); + + let result = + StateUpdate::merge_updates_with_slot(vec![state_update1, state_update2], Some(100)); + assert!(result.is_ok()); +} + +#[test] +fn test_empty_state_update() { + let result = StateUpdate::merge_updates_with_slot(vec![StateUpdate::new()], Some(100)); + assert!(result.is_ok()); +} + +#[test] +fn test_single_sequence_no_gap() { + let tree = pubkey!("BPF9L8vwCHcqW3xrLHgVrwCzxxH6VbSk1KDHhE1ZBFP8"); + let mut state_update = StateUpdate::new(); + + state_update + .indexed_merkle_tree_updates + .insert((tree, 0), create_indexed_tree_update(tree, 0, 5)); + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_ok()); +} + +#[test] +fn test_out_of_order_sequences() { + let tree = pubkey!("BPF9L8vwCHcqW3xrLHgVrwCzxxH6VbSk1KDHhE1YBFPB"); + let mut state_update = StateUpdate::new(); + + state_update + .indexed_merkle_tree_updates + .insert((tree, 2), create_indexed_tree_update(tree, 2, 3)); + state_update + .indexed_merkle_tree_updates + .insert((tree, 0), create_indexed_tree_update(tree, 0, 1)); + state_update + .indexed_merkle_tree_updates + .insert((tree, 1), create_indexed_tree_update(tree, 1, 2)); + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_ok()); +} + +#[test] +fn test_gap_detected_batch_new_addresses() { + let tree = pubkey!("amt2kaJA14v3urZbZvnc5v2np8jqvc4Z8zDep5wbtzx"); + let mut state_update = StateUpdate::new(); + + use photon_indexer::common::typedefs::serializable_pubkey::SerializablePubkey; + use photon_indexer::ingester::parser::state_update::AddressQueueUpdate; + + state_update.batch_new_addresses.push(AddressQueueUpdate { + tree: SerializablePubkey::from(tree), + address: [1u8; 32], + queue_index: 1, + }); + state_update.batch_new_addresses.push(AddressQueueUpdate { + tree: SerializablePubkey::from(tree), + address: [2u8; 32], + queue_index: 2, + }); + state_update.batch_new_addresses.push(AddressQueueUpdate { + tree: SerializablePubkey::from(tree), + address: [3u8; 32], + queue_index: 4, // Gap here (missing 3) + }); + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_err()); + + if let Err(SequenceGapError::GapDetected(gaps)) = result { + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0].tree, tree); + assert_eq!(gaps[0].expected_seq, 3); + assert_eq!(gaps[0].actual_seq, 4); + } +} + +#[test] +fn test_starting_from_snapshot_no_gap() { + let tree = pubkey!("BPF9L8vwCHcqW3xrLHgVrwCzxxH6VbSk1KDHhE1ZBFP3"); + let mut state_update = StateUpdate::new(); + + state_update + .indexed_merkle_tree_updates + .insert((tree, 0), create_indexed_tree_update(tree, 0, 4256)); + + state_update + .indexed_merkle_tree_updates + .insert((tree, 1), create_indexed_tree_update(tree, 1, 4257)); + + let result = StateUpdate::merge_updates_with_slot(vec![state_update], Some(100)); + assert!(result.is_ok()); +} diff --git a/tests/integration_tests/main.rs b/tests/integration_tests/main.rs index a0f68b11..c9c8ccd1 100644 --- a/tests/integration_tests/main.rs +++ b/tests/integration_tests/main.rs @@ -5,6 +5,7 @@ mod batched_address_tree_tests; mod batched_state_tree_tests; mod e2e_tests; +mod gap_detection_tests; mod mock_tests; mod open_api_tests; mod prod_tests; diff --git a/tests/integration_tests/mock_tests.rs b/tests/integration_tests/mock_tests.rs index 7f3347be..aefe5c35 100644 --- a/tests/integration_tests/mock_tests.rs +++ b/tests/integration_tests/mock_tests.rs @@ -80,6 +80,7 @@ async fn test_persist_state_update_basic( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -170,6 +171,7 @@ async fn test_multiple_accounts( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -453,6 +455,7 @@ async fn test_persist_token_data( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1072,6 +1075,7 @@ async fn test_get_multiple_new_address_proofs( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1114,6 +1118,7 @@ async fn test_get_multiple_new_address_proofs_interop( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1195,6 +1200,7 @@ async fn load_test(#[values(DatabaseBackend::Postgres)] db_backend: DatabaseBack }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1261,6 +1267,7 @@ async fn test_persisted_state_trees_bug_with_latter_smaller_seq_values( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1347,6 +1354,7 @@ async fn test_gpa_filters( }, ..Default::default() }, + None, ) .await .unwrap(); diff --git a/tests/integration_tests/utils.rs b/tests/integration_tests/utils.rs index e264519e..27ff00f0 100644 --- a/tests/integration_tests/utils.rs +++ b/tests/integration_tests/utils.rs @@ -465,7 +465,7 @@ pub async fn index_multiple_transactions( let tx_state_update = parse_transaction(&transaction_info, 0).unwrap(); state_updates.push(tx_state_update); } - let state_update = StateUpdate::merge_updates(state_updates); + let state_update = StateUpdate::merge_updates(state_updates).unwrap(); persist_state_update_using_connection(db_conn.as_ref(), state_update) .await .unwrap(); @@ -555,6 +555,7 @@ pub async fn index( }, ..Default::default() }, + None, ) .await .unwrap();