diff --git a/src/ingester/error.rs b/src/ingester/error.rs index 12b87ef1..23fc6f05 100644 --- a/src/ingester/error.rs +++ b/src/ingester/error.rs @@ -1,4 +1,5 @@ use thiserror::Error; +use crate::ingester::parser::state_update::SequenceGap; #[derive(Error, Debug, PartialEq, Eq)] pub enum IngesterError { @@ -14,6 +15,8 @@ pub enum IngesterError { EmptyBatchEvent, #[error("Invalid event.")] InvalidEvent, + #[error("Sequence gap detected: {} gaps found", .0.len())] + SequenceGapDetected(Vec), } impl From for IngesterError { @@ -21,3 +24,19 @@ impl From for IngesterError { IngesterError::DatabaseError(format!("DatabaseError: {}", err)) } } + +impl From for IngesterError { + fn from(err: String) -> Self { + IngesterError::ParserError(err) + } +} + +impl From for IngesterError { + fn from(err: crate::ingester::parser::state_update::SequenceGapError) -> Self { + match err { + crate::ingester::parser::state_update::SequenceGapError::GapDetected(gaps) => { + IngesterError::SequenceGapDetected(gaps) + } + } + } +} diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs index 5a4a6764..04658130 100644 --- a/src/ingester/fetchers/grpc.rs +++ b/src/ingester/fetchers/grpc.rs @@ -53,6 +53,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback )) ); @@ -115,6 +116,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback ))); continue; } @@ -132,6 +134,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback ))); continue; } @@ -144,6 +147,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for grpc fallback ))); } } diff --git a/src/ingester/fetchers/mod.rs b/src/ingester/fetchers/mod.rs index cc3235da..e2e768c6 100644 --- a/src/ingester/fetchers/mod.rs +++ b/src/ingester/fetchers/mod.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use async_stream::stream; use futures::{pin_mut, Stream, StreamExt}; use solana_client::nonblocking::rpc_client::RpcClient; +use tokio::sync::mpsc; -use super::typedefs::block_info::BlockInfo; +use super::{rewind_controller::RewindCommand, typedefs::block_info::BlockInfo}; pub mod grpc; pub mod poller; @@ -17,10 +18,11 @@ pub struct BlockStreamConfig { pub geyser_url: Option, pub max_concurrent_block_fetches: usize, pub last_indexed_slot: u64, + pub rewind_receiver: Option>, } impl BlockStreamConfig { - pub fn load_block_stream(&self) -> impl Stream> { + pub fn load_block_stream(self) -> impl Stream> { let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| { let auth_header = std::env::var("GRPC_X_TOKEN").unwrap(); get_grpc_stream_with_rpc_fallback( @@ -37,6 +39,7 @@ impl BlockStreamConfig { self.rpc_client.clone(), self.last_indexed_slot, self.max_concurrent_block_fetches, + self.rewind_receiver, )) } else { None diff --git a/src/ingester/fetchers/poller.rs b/src/ingester/fetchers/poller.rs index 729d20e5..b826e492 100644 --- a/src/ingester/fetchers/poller.rs +++ b/src/ingester/fetchers/poller.rs @@ -9,23 +9,43 @@ use futures::{pin_mut, Stream, StreamExt}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError, }; +use tokio::sync::mpsc; use solana_sdk::commitment_config::CommitmentConfig; use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; use crate::{ - ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}, + ingester::{ + rewind_controller::RewindCommand, + typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}, + }, metric, monitor::{start_latest_slot_updater, LATEST_SLOT}, }; const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009]; -fn get_slot_stream(rpc_client: Arc, start_slot: u64) -> impl Stream { +fn get_slot_stream( + rpc_client: Arc, + start_slot: u64, + mut rewind_receiver: Option>, +) -> impl Stream { stream! { start_latest_slot_updater(rpc_client.clone()).await; let mut next_slot_to_fetch = start_slot; loop { + // Check for rewind commands + if let Some(ref mut receiver) = rewind_receiver { + while let Ok(command) = receiver.try_recv() { + match command { + RewindCommand::Rewind { to_slot, reason } => { + log::error!("Rewinding slot stream to {}: {}", to_slot, reason); + next_slot_to_fetch = to_slot; + } + } + } + } + if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) { tokio::time::sleep(std::time::Duration::from_millis(10)).await; continue; @@ -40,13 +60,14 @@ pub fn get_block_poller_stream( rpc_client: Arc, mut last_indexed_slot: u64, max_concurrent_block_fetches: usize, + rewind_receiver: Option>, ) -> impl Stream> { stream! { let start_slot = match last_indexed_slot { 0 => 0, last_indexed_slot => last_indexed_slot + 1 }; - let slot_stream = get_slot_stream(rpc_client.clone(), start_slot); + let slot_stream = get_slot_stream(rpc_client.clone(), start_slot, rewind_receiver); pin_mut!(slot_stream); let block_stream = slot_stream .map(|slot| { diff --git a/src/ingester/indexer/mod.rs b/src/ingester/indexer/mod.rs index fa696d56..25e2c8ba 100644 --- a/src/ingester/indexer/mod.rs +++ b/src/ingester/indexer/mod.rs @@ -52,6 +52,7 @@ pub async fn index_block_stream( rpc_client: Arc, last_indexed_slot_at_start: u64, end_slot: Option, + rewind_controller: Option<&crate::ingester::rewind_controller::RewindController>, ) { pin_mut!(block_stream); let current_slot = @@ -65,16 +66,17 @@ pub async fn index_block_stream( "Backfilling historical blocks. Current number of blocks to backfill: {}", number_of_blocks_to_backfill ); - let mut last_indexed_slot = last_indexed_slot_at_start; + // let mut last_indexed_slot = last_indexed_slot_at_start; let mut finished_backfill_slot = None; while let Some(blocks) = block_stream.next().await { + let first_slot_in_block = blocks.first().unwrap().metadata.slot; let last_slot_in_block = blocks.last().unwrap().metadata.slot; - index_block_batch_with_infinite_retries(db.as_ref(), blocks).await; + index_block_batch_with_infinite_retries(db.as_ref(), blocks, rewind_controller).await; - for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) { - let blocks_indexed = slot - last_indexed_slot_at_start; + for slot in first_slot_in_block..(last_slot_in_block + 1) { + let blocks_indexed = slot.saturating_sub(last_indexed_slot_at_start); if blocks_indexed < number_of_blocks_to_backfill { if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 { info!( @@ -92,7 +94,7 @@ pub async fn index_block_stream( info!("Indexed slot {}", slot); } } - last_indexed_slot = slot; + // last_indexed_slot = slot; } } } diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index f2934d47..025a8c56 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -15,7 +15,8 @@ use sea_orm::QueryTrait; use sea_orm::Set; use sea_orm::TransactionTrait; -use self::parser::state_update::StateUpdate; +use self::parser::state_update::{StateUpdate, SequenceGapError}; +use self::rewind_controller::{RewindController, determine_rewind_slot}; use self::persist::persist_state_update; use self::persist::MAX_SQL_INSERTS; use self::typedefs::block_info::BlockInfo; @@ -27,20 +28,43 @@ pub mod fetchers; pub mod indexer; pub mod parser; pub mod persist; +pub mod rewind_controller; pub mod typedefs; -fn derive_block_state_update(block: &BlockInfo) -> Result { +fn derive_block_state_update( + block: &BlockInfo, + rewind_controller: Option<&RewindController>, +) -> Result { let mut state_updates: Vec = Vec::new(); for transaction in &block.transactions { state_updates.push(parse_transaction(transaction, block.metadata.slot)?); } - Ok(StateUpdate::merge_updates(state_updates)) + + match StateUpdate::merge_updates_with_slot(state_updates, Some(block.metadata.slot)) { + Ok(merged_update) => Ok(merged_update), + Err(SequenceGapError::GapDetected(gaps)) => { + if let Some(controller) = rewind_controller { + let rewind_slot = determine_rewind_slot(&gaps); + let reason = format!( + "Sequence gaps detected in block {}: {} gaps found", + block.metadata.slot, + gaps.len() + ); + controller.request_rewind(rewind_slot, reason)?; + } + Err(IngesterError::SequenceGapDetected(gaps)) + } + } } -pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> { +pub async fn index_block( + db: &DatabaseConnection, + block: &BlockInfo, + rewind_controller: Option<&RewindController>, +) -> Result<(), IngesterError> { let txn = db.begin().await?; index_block_metadatas(&txn, vec![&block.metadata]).await?; - persist_state_update(&txn, derive_block_state_update(block)?).await?; + persist_state_update(&txn, derive_block_state_update(block, rewind_controller)?).await?; txn.commit().await?; Ok(()) } @@ -81,6 +105,7 @@ async fn index_block_metadatas( pub async fn index_block_batch( db: &DatabaseConnection, block_batch: &Vec, + rewind_controller: Option<&RewindController>, ) -> Result<(), IngesterError> { let blocks_len = block_batch.len(); let tx = db.begin().await?; @@ -88,9 +113,33 @@ pub async fn index_block_batch( index_block_metadatas(&tx, block_metadatas).await?; let mut state_updates = Vec::new(); for block in block_batch { - state_updates.push(derive_block_state_update(block)?); + state_updates.push(derive_block_state_update(block, rewind_controller)?); } - persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?; + + if block_batch.is_empty() { + return Ok(()); // Or return an appropriate error + } + + let merged_state_update = match StateUpdate::merge_updates_with_slot( + state_updates, + Some(block_batch.last().unwrap().metadata.slot) + ) { + Ok(merged) => merged, + Err(SequenceGapError::GapDetected(gaps)) => { + if let Some(controller) = rewind_controller { + let rewind_slot = determine_rewind_slot(&gaps); + let reason = format!( + "Sequence gaps detected in batch ending at slot {}: {} gaps found", + block_batch.last().unwrap().metadata.slot, + gaps.len() + ); + controller.request_rewind(rewind_slot, reason)?; + } + return Err(IngesterError::SequenceGapDetected(gaps)); + } + }; + + persist::persist_state_update(&tx, merged_state_update).await?; metric! { statsd_count!("blocks_indexed", blocks_len as i64); } @@ -101,10 +150,16 @@ pub async fn index_block_batch( pub async fn index_block_batch_with_infinite_retries( db: &DatabaseConnection, block_batch: Vec, + rewind_controller: Option<&RewindController>, ) { loop { - match index_block_batch(db, &block_batch).await { + match index_block_batch(db, &block_batch, rewind_controller).await { Ok(()) => return, + Err(IngesterError::SequenceGapDetected(_)) => { + // For sequence gaps, we don't retry - we let the rewind mechanism handle it + log::error!("Sequence gap detected in batch, stopping processing to allow rewind"); + return; + } Err(e) => { let start_block = block_batch.first().unwrap().metadata.slot; let end_block = block_batch.last().unwrap().metadata.slot; diff --git a/src/ingester/parser/mod.rs b/src/ingester/parser/mod.rs index aedff22f..c76c7267 100644 --- a/src/ingester/parser/mod.rs +++ b/src/ingester/parser/mod.rs @@ -127,7 +127,7 @@ pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result), +} + #[derive(BorshDeserialize, BorshSerialize, Debug, Clone, PartialEq, Eq)] pub struct PathNode { pub node: [u8; 32], @@ -82,23 +98,23 @@ impl From for AddressQueueUpdate { } } } - +// TODO: add rewind for all sequence numbers not just address tree v1 #[derive(Default, Debug, Clone, PartialEq, Eq)] /// Representation of state update of the compression system that is optimal for simple persistence. pub struct StateUpdate { // v1 and v2 tree accounts - pub in_accounts: HashSet, + pub in_accounts: HashSet, // covered by leaf nullifications // v1 and v2 tree accounts - pub out_accounts: Vec, + pub out_accounts: Vec, // has leaf index, got v2 Merkle trees we need to pub account_transactions: HashSet, pub transactions: HashSet, - pub leaf_nullifications: HashSet, - pub indexed_merkle_tree_updates: HashMap<(Pubkey, u64), IndexedTreeLeafUpdate>, + pub leaf_nullifications: HashSet, // Has sequence number + pub indexed_merkle_tree_updates: HashMap<(Pubkey, u64), IndexedTreeLeafUpdate>, // Has sequence number // v2 state and address Merkle tree updates - pub batch_merkle_tree_events: BatchMerkleTreeEvents, + pub batch_merkle_tree_events: BatchMerkleTreeEvents, // Has sequence number // v2 input accounts that are inserted into the input queue - pub batch_nullify_context: Vec, - pub batch_new_addresses: Vec, + pub batch_nullify_context: Vec, // Has queue index we need to track this as well the same as sequence number + pub batch_new_addresses: Vec, // Has queue index we need to track this as well the same as sequence number } impl StateUpdate { @@ -106,8 +122,16 @@ impl StateUpdate { StateUpdate::default() } - pub fn merge_updates(updates: Vec) -> StateUpdate { + pub fn merge_updates(updates: Vec) -> Result { + Self::merge_updates_with_slot(updates, None) + } + + pub fn merge_updates_with_slot( + updates: Vec, + slot: Option, + ) -> Result { let mut merged = StateUpdate::default(); + let mut detected_gaps = Vec::new(); for update in updates { merged.in_accounts.extend(update.in_accounts); @@ -121,6 +145,8 @@ impl StateUpdate { .extend(update.leaf_nullifications); for (key, value) in update.indexed_merkle_tree_updates { + let (_, _leaf_index) = key; + // Insert only if the seq is higher. if let Some(existing) = merged.indexed_merkle_tree_updates.get_mut(&key) { if value.seq > existing.seq { @@ -147,6 +173,39 @@ impl StateUpdate { .extend(update.batch_nullify_context); } - merged + let mut sorted_indexed_merkle_tree_updates: Vec<_> = + merged.indexed_merkle_tree_updates.iter().collect(); + sorted_indexed_merkle_tree_updates.sort_by_key(|(_, u)| u.seq); + if let Some(slot) = slot { + for (&key, value) in sorted_indexed_merkle_tree_updates { + let (tree, _leaf_index) = key; + + // Check for sequence gaps before merging + if let Some((expected_seq, actual_seq)) = + TreeInfo::check_sequence_gap(&tree, value.seq) + { + error!( + "Sequence gap detected for tree {}: expected {}, got {}", + tree, expected_seq, actual_seq + ); + detected_gaps.push(SequenceGap { + tree, + expected_seq, + actual_seq, + }); + } + + // Update highest sequence numbers for each tree + if let Err(e) = TreeInfo::update_highest_seq(&tree, value.seq, slot) { + error!("Failed to update highest sequence for tree {}: {}", tree, e); + } + } + } + // If gaps were detected, return error + if !detected_gaps.is_empty() { + return Err(SequenceGapError::GapDetected(detected_gaps)); + } + + Ok(merged) } } diff --git a/src/ingester/parser/tree_info.rs b/src/ingester/parser/tree_info.rs index 3c073a7d..11357fbc 100644 --- a/src/ingester/parser/tree_info.rs +++ b/src/ingester/parser/tree_info.rs @@ -2,6 +2,7 @@ use lazy_static::lazy_static; use light_compressed_account::TreeType; use solana_pubkey::{pubkey, Pubkey}; use std::collections::HashMap; +use std::sync::{atomic::AtomicU64, Arc}; #[derive(Debug, Clone)] pub struct TreeInfo { @@ -9,6 +10,8 @@ pub struct TreeInfo { pub queue: Pubkey, pub height: u32, pub tree_type: TreeType, + pub highest_seq: std::sync::Arc, + pub last_slot: std::sync::Arc, } impl TreeInfo { @@ -31,6 +34,76 @@ impl TreeInfo { let pubkey = Pubkey::from(*tree_bytes); Self::get_tree_type(&pubkey) } + + pub fn get_highest_seq(pubkey: &Pubkey) -> Option { + let tree_pubkey_str = pubkey.to_string(); + Self::get(&tree_pubkey_str) + .map(|info| info.highest_seq.load(std::sync::atomic::Ordering::Acquire)) + } + + pub fn update_highest_seq(pubkey: &Pubkey, new_seq: u64, slot: u64) -> Result { + let tree_pubkey_str = pubkey.to_string(); + + if let Some(tree_info) = Self::get(&tree_pubkey_str) { + let current = tree_info + .highest_seq + .load(std::sync::atomic::Ordering::Acquire); + log::info!( + "Updating highest sequence for tree {} from {} to {}", + tree_pubkey_str, + current, + new_seq + ); + if new_seq > current { + tree_info + .highest_seq + .store(new_seq, std::sync::atomic::Ordering::Release); + tree_info + .last_slot + .store(slot, std::sync::atomic::Ordering::Release); + Ok(new_seq) + } else { + Ok(current) + } + } else { + Err(format!("Tree {} not found in mapping", pubkey)) + } + } + + pub fn get_last_slot_for_seq(pubkey: &Pubkey, target_seq: u64) -> Option { + let tree_pubkey_str = pubkey.to_string(); + if let Some(tree_info) = Self::get(&tree_pubkey_str) { + let current_seq = tree_info + .highest_seq + .load(std::sync::atomic::Ordering::Acquire); + if target_seq <= current_seq { + // Return the slot where this sequence was last processed + Some( + tree_info + .last_slot + .load(std::sync::atomic::Ordering::Acquire), + ) + } else { + None + } + } else { + None + } + } + pub fn check_sequence_gap(pubkey: &Pubkey, incoming_seq: u64) -> Option<(u64, u64)> { + if let Some(current_highest) = Self::get_highest_seq(pubkey) { + // We init with 0, we cannot crash on 0 + if current_highest == 0 { + return None; + } + + let expected_seq = current_highest + 1; + if incoming_seq != expected_seq { + return Some((expected_seq, incoming_seq)); + } + } + None + } } // TODO: add a table which stores tree metadata: tree_pubkey | queue_pubkey | type | ... @@ -198,6 +271,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::StateV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); @@ -208,6 +283,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::StateV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } @@ -220,6 +297,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::AddressV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); @@ -230,6 +309,8 @@ lazy_static! { queue: *legacy_queue, height: 26, tree_type: TreeType::AddressV1, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } @@ -274,6 +355,8 @@ lazy_static! { queue: *queue, height: 32, tree_type: TreeType::StateV2, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); @@ -284,6 +367,8 @@ lazy_static! { queue: *queue, height: 32, tree_type: TreeType::StateV2, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } @@ -296,6 +381,8 @@ lazy_static! { queue: *tree_queue, height: 40, tree_type: TreeType::AddressV2, + highest_seq: Arc::new(AtomicU64::new(0)), + last_slot: Arc::new(AtomicU64::new(0)), }, ); } diff --git a/src/ingester/parser/tx_event_parser.rs b/src/ingester/parser/tx_event_parser.rs index b146f859..7efb7c0a 100644 --- a/src/ingester/parser/tx_event_parser.rs +++ b/src/ingester/parser/tx_event_parser.rs @@ -68,7 +68,7 @@ pub fn create_state_update_v1( Some(info) => info.clone(), None => { if super::SKIP_UNKNOWN_TREES { - log::warn!("Skipping unknown tree: {}", tree.to_string()); + log::error!("Skipping unknown tree: {}", tree.to_string()); continue; } else { return Err(IngesterError::ParserError(format!( diff --git a/src/ingester/parser/tx_event_parser_v2.rs b/src/ingester/parser/tx_event_parser_v2.rs index 389dfd74..ce0b0734 100644 --- a/src/ingester/parser/tx_event_parser_v2.rs +++ b/src/ingester/parser/tx_event_parser_v2.rs @@ -148,6 +148,6 @@ pub fn create_state_update_v2( state_updates.push(state_update_event); } - let merged = StateUpdate::merge_updates(state_updates); + let merged = StateUpdate::merge_updates(state_updates)?; Ok(merged) } diff --git a/src/ingester/rewind_controller.rs b/src/ingester/rewind_controller.rs new file mode 100644 index 00000000..05043f0f --- /dev/null +++ b/src/ingester/rewind_controller.rs @@ -0,0 +1,64 @@ +use tokio::sync::mpsc; +use tracing::error; + +#[derive(Debug, Clone)] +pub enum RewindCommand { + Rewind { to_slot: u64, reason: String }, +} + +#[derive(Clone)] +pub struct RewindController { + sender: mpsc::UnboundedSender, +} + +impl RewindController { + pub fn new() -> (Self, mpsc::UnboundedReceiver) { + let (sender, receiver) = mpsc::unbounded_channel(); + (Self { sender }, receiver) + } + + pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> { + let command = RewindCommand::Rewind { + to_slot, + reason: reason.clone(), + }; + + error!("Requesting rewind to slot {}: {}", to_slot, reason); + + self.sender.send(command).map_err(|e| { + error!("Failed to send rewind command: {}", e); + format!("Failed to send rewind command: {}", e) + }) + } +} + +pub fn determine_rewind_slot(gaps: &[crate::ingester::parser::state_update::SequenceGap]) -> u64 { + use crate::ingester::parser::tree_info::TreeInfo; + + // Find the earliest slot where we need to rewind to get the missing sequence + let mut earliest_slot = u64::MAX; + + for gap in gaps { + // Try to find the exact slot for the last known good sequence + let target_seq = gap.expected_seq.saturating_sub(1); + if let Some(slot) = TreeInfo::get_last_slot_for_seq(&gap.tree, target_seq) { + // Rewind to just before this slot to ensure we reprocess + earliest_slot = earliest_slot.min(slot.saturating_sub(1)); + } else { + // Fallback: conservative approach if we can't find the exact slot + // This handles the case where this is the first sequence for this tree + earliest_slot = earliest_slot.min(gap.expected_seq.saturating_sub(10)); + } + } + + // Ensure we don't rewind to slot 0 unless explicitly needed + if earliest_slot == u64::MAX { + // No valid slots found, use conservative fallback + gaps.iter() + .map(|gap| gap.expected_seq.saturating_sub(10)) + .min() + .unwrap_or(0) + } else { + earliest_slot + } +} diff --git a/src/main.rs b/src/main.rs index 185013ac..fefba199 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use photon_indexer::ingester::fetchers::BlockStreamConfig; use photon_indexer::ingester::indexer::{ fetch_last_indexed_slot_with_infinite_retry, index_block_stream, }; +use photon_indexer::ingester::rewind_controller::RewindController; use photon_indexer::migration::{ sea_orm::{DatabaseBackend, DatabaseConnection, SqlxPostgresConnector, SqlxSqliteConnector}, Migrator, MigratorTrait, @@ -174,6 +175,7 @@ fn continously_index_new_blocks( db: Arc, rpc_client: Arc, last_indexed_slot: u64, + rewind_controller: RewindController, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let block_stream = block_stream_config.load_block_stream(); @@ -183,6 +185,7 @@ fn continously_index_new_blocks( rpc_client.clone(), last_indexed_slot, None, + Some(&rewind_controller), ) .await; }) @@ -234,6 +237,7 @@ async fn main() { rpc_client.clone(), last_indexed_slot, Some(last_slot), + None, // No rewind controller for snapshot processing ) .await; } @@ -277,11 +281,15 @@ async fn main() { .unwrap(), }; + // Create rewind controller for gap detection + let (rewind_controller, rewind_receiver) = RewindController::new(); + let block_stream_config = BlockStreamConfig { rpc_client: rpc_client.clone(), max_concurrent_block_fetches, last_indexed_slot, geyser_url: args.grpc_url, + rewind_receiver: Some(rewind_receiver), }; ( @@ -290,6 +298,7 @@ async fn main() { db_conn.clone(), rpc_client.clone(), last_indexed_slot, + rewind_controller, )), Some(continously_monitor_photon( db_conn.clone(), diff --git a/src/snapshot/mod.rs b/src/snapshot/mod.rs index ee669507..38045615 100644 --- a/src/snapshot/mod.rs +++ b/src/snapshot/mod.rs @@ -451,11 +451,12 @@ pub async fn update_snapshot( incremental_snapshot_interval_slots: u64, ) { // Convert stream to iterator + let last_indexed_slot = block_stream_config.last_indexed_slot; let block_stream = block_stream_config.load_block_stream(); update_snapshot_helper( directory_adapter, block_stream, - block_stream_config.last_indexed_slot, + last_indexed_slot, incremental_snapshot_interval_slots, full_snapshot_interval_slots, ) diff --git a/src/snapshot/snapshotter/main.rs b/src/snapshot/snapshotter/main.rs index 9e6c9735..826b0dec 100644 --- a/src/snapshot/snapshotter/main.rs +++ b/src/snapshot/snapshotter/main.rs @@ -256,6 +256,7 @@ async fn main() { max_concurrent_block_fetches: args.max_concurrent_block_fetches.unwrap_or(20), last_indexed_slot, geyser_url: args.grpc_url.clone(), + rewind_receiver: None, // No rewind receiver for snapshotter }, args.incremental_snapshot_interval_slots, args.snapshot_interval_slots, diff --git a/tests/integration_tests/e2e_tests.rs b/tests/integration_tests/e2e_tests.rs index cfb1bc89..b7219c68 100644 --- a/tests/integration_tests/e2e_tests.rs +++ b/tests/integration_tests/e2e_tests.rs @@ -78,6 +78,7 @@ fn all_indexing_methodologies( }, ..Default::default() }, + None, ).await .unwrap(); @@ -497,7 +498,7 @@ async fn test_index_block_metadata( let slot = 254170887; let block = cached_fetch_block(&setup.name, setup.client.clone(), slot).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); let filter = blocks::Column::Slot.eq(block.metadata.slot); let block_model = blocks::Entity::find() @@ -523,12 +524,12 @@ async fn test_index_block_metadata( assert_eq!(block_model.block_time, 1710441678); // Verify that we don't get an error if we try to index the same block again - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); assert_eq!(setup.api.get_indexer_slot().await.unwrap().0, slot); // Verify that get_indexer_slot() gets updated a new block is indexed. let block = cached_fetch_block(&setup.name, setup.client.clone(), slot + 1).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); assert_eq!(setup.api.get_indexer_slot().await.unwrap().0, slot + 1); } @@ -551,7 +552,7 @@ async fn test_get_latest_non_voting_signatures( let slot = 270893658; let block = cached_fetch_block(&setup.name, setup.client.clone(), slot).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); let all_nonvoting_transactions = setup .api .get_latest_non_voting_signatures(GetLatestSignaturesRequest { @@ -586,7 +587,7 @@ async fn test_get_latest_non_voting_signatures_with_failures( let slot = 279620356; let block = cached_fetch_block(&setup.name, setup.client.clone(), slot).await; - index_block(&setup.db_conn, &block).await.unwrap(); + index_block(&setup.db_conn, &block, None).await.unwrap(); let all_nonvoting_transactions = setup .api .get_latest_non_voting_signatures(GetLatestSignaturesRequest { diff --git a/tests/integration_tests/mock_tests.rs b/tests/integration_tests/mock_tests.rs index 7f3347be..aefe5c35 100644 --- a/tests/integration_tests/mock_tests.rs +++ b/tests/integration_tests/mock_tests.rs @@ -80,6 +80,7 @@ async fn test_persist_state_update_basic( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -170,6 +171,7 @@ async fn test_multiple_accounts( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -453,6 +455,7 @@ async fn test_persist_token_data( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1072,6 +1075,7 @@ async fn test_get_multiple_new_address_proofs( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1114,6 +1118,7 @@ async fn test_get_multiple_new_address_proofs_interop( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1195,6 +1200,7 @@ async fn load_test(#[values(DatabaseBackend::Postgres)] db_backend: DatabaseBack }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1261,6 +1267,7 @@ async fn test_persisted_state_trees_bug_with_latter_smaller_seq_values( }, ..Default::default() }, + None, ) .await .unwrap(); @@ -1347,6 +1354,7 @@ async fn test_gpa_filters( }, ..Default::default() }, + None, ) .await .unwrap(); diff --git a/tests/integration_tests/utils.rs b/tests/integration_tests/utils.rs index e264519e..27ff00f0 100644 --- a/tests/integration_tests/utils.rs +++ b/tests/integration_tests/utils.rs @@ -465,7 +465,7 @@ pub async fn index_multiple_transactions( let tx_state_update = parse_transaction(&transaction_info, 0).unwrap(); state_updates.push(tx_state_update); } - let state_update = StateUpdate::merge_updates(state_updates); + let state_update = StateUpdate::merge_updates(state_updates).unwrap(); persist_state_update_using_connection(db_conn.as_ref(), state_update) .await .unwrap(); @@ -555,6 +555,7 @@ pub async fn index( }, ..Default::default() }, + None, ) .await .unwrap();