Skip to content

Sergey/rewind #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
19 changes: 19 additions & 0 deletions src/ingester/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::ingester::parser::state_update::SequenceGap;
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
Expand All @@ -14,10 +15,28 @@ pub enum IngesterError {
EmptyBatchEvent,
#[error("Invalid event.")]
InvalidEvent,
#[error("Sequence gap detected: {} gaps found", .0.len())]
SequenceGapDetected(Vec<SequenceGap>),
}

impl From<sea_orm::error::DbErr> for IngesterError {
fn from(err: sea_orm::error::DbErr) -> Self {
IngesterError::DatabaseError(format!("DatabaseError: {}", err))
}
}

impl From<String> for IngesterError {
fn from(err: String) -> Self {
IngesterError::ParserError(err)
}
}

/// Converts a state-update `SequenceGapError` into
/// [`IngesterError::SequenceGapDetected`], moving the list of detected gaps
/// across without copying it.
impl From<crate::ingester::parser::state_update::SequenceGapError> for IngesterError {
    fn from(err: crate::ingester::parser::state_update::SequenceGapError) -> Self {
        use crate::ingester::parser::state_update::SequenceGapError;

        // `GapDetected` is the only variant handled here; if another variant is
        // ever added this binding stops being irrefutable and fails to compile,
        // just as a single-arm `match` would.
        let SequenceGapError::GapDetected(gaps) = err;
        Self::SequenceGapDetected(gaps)
    }
}
4 changes: 4 additions & 0 deletions src/ingester/fetchers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
))
);

Expand Down Expand Up @@ -115,6 +116,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
)));
continue;
}
Expand All @@ -132,6 +134,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
)));
continue;
}
Expand All @@ -144,6 +147,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
)));
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/ingester/fetchers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::sync::Arc;
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use solana_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::mpsc;

use super::typedefs::block_info::BlockInfo;
use super::{rewind_controller::RewindCommand, typedefs::block_info::BlockInfo};

pub mod grpc;
pub mod poller;
Expand All @@ -17,10 +18,11 @@ pub struct BlockStreamConfig {
pub geyser_url: Option<String>,
pub max_concurrent_block_fetches: usize,
pub last_indexed_slot: u64,
pub rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
}

impl BlockStreamConfig {
pub fn load_block_stream(&self) -> impl Stream<Item = Vec<BlockInfo>> {
pub fn load_block_stream(self) -> impl Stream<Item = Vec<BlockInfo>> {
let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| {
let auth_header = std::env::var("GRPC_X_TOKEN").unwrap();
get_grpc_stream_with_rpc_fallback(
Expand All @@ -37,6 +39,7 @@ impl BlockStreamConfig {
self.rpc_client.clone(),
self.last_indexed_slot,
self.max_concurrent_block_fetches,
self.rewind_receiver,
))
} else {
None
Expand Down
75 changes: 66 additions & 9 deletions src/ingester/fetchers/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,43 @@ use futures::{pin_mut, Stream, StreamExt};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError,
};
use tokio::sync::mpsc;

use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};

use crate::{
ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
ingester::{
rewind_controller::RewindCommand,
typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
},
metric,
monitor::{start_latest_slot_updater, LATEST_SLOT},
};

const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009];

fn get_slot_stream(rpc_client: Arc<RpcClient>, start_slot: u64) -> impl Stream<Item = u64> {
fn get_slot_stream(
rpc_client: Arc<RpcClient>,
start_slot: u64,
mut rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
) -> impl Stream<Item = u64> {
stream! {
start_latest_slot_updater(rpc_client.clone()).await;
let mut next_slot_to_fetch = start_slot;
loop {
// Check for rewind commands
if let Some(ref mut receiver) = rewind_receiver {
while let Ok(command) = receiver.try_recv() {
match command {
RewindCommand::Rewind { to_slot, reason } => {
log::error!("Rewinding slot stream to {}: {}", to_slot, reason);
next_slot_to_fetch = to_slot;
}
}
}
}

if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
continue;
Expand All @@ -40,13 +60,14 @@ pub fn get_block_poller_stream(
rpc_client: Arc<RpcClient>,
mut last_indexed_slot: u64,
max_concurrent_block_fetches: usize,
rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
) -> impl Stream<Item = Vec<BlockInfo>> {
stream! {
let start_slot = match last_indexed_slot {
0 => 0,
last_indexed_slot => last_indexed_slot + 1
};
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot);
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot, rewind_receiver);
pin_mut!(slot_stream);
let block_stream = slot_stream
.map(|slot| {
Expand Down Expand Up @@ -82,17 +103,53 @@ fn pop_cached_blocks_to_index(
Some(&slot) => slot,
None => break,
};

// Case 1: This block is older than what we've already indexed - discard it
if min_slot <= last_indexed_slot {
block_cache.remove(&min_slot);
continue;
}

let block: &BlockInfo = block_cache.get(&min_slot).unwrap();

// Case 2: Check if this block can be connected to our last indexed slot
if block.metadata.parent_slot == last_indexed_slot {
last_indexed_slot = block.metadata.slot;
blocks.push(block.clone());
block_cache.remove(&min_slot);
} else if min_slot < last_indexed_slot {
block_cache.remove(&min_slot);
} else {
// Direct succession - always allowed
// Log if there were skipped slots
if min_slot > last_indexed_slot + 1 {
// Direct parent match but there's a gap in slot numbers
log::info!(
"Block at slot {} directly follows {} (slots {}-{} were skipped)",
min_slot,
last_indexed_slot,
last_indexed_slot + 1,
min_slot - 1
);
}
// Process only ONE block at a time to ensure strict ordering
break;
}

// Case 3: This block's parent is in the future - we're missing intermediate blocks
if block.metadata.parent_slot > last_indexed_slot {
// Wait for the intermediate blocks to arrive or be marked as skipped
break;
}

// Case 4: This block's parent is before our last indexed slot
// This indicates a fork or invalid block - discard it
if block.metadata.parent_slot < last_indexed_slot {
log::warn!(
"Discarding block at slot {} with parent {} (last indexed: {})",
min_slot,
block.metadata.parent_slot,
last_indexed_slot
);
block_cache.remove(&min_slot);
continue;
}
}

(blocks, last_indexed_slot)
}

Expand Down
12 changes: 7 additions & 5 deletions src/ingester/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub async fn index_block_stream(
rpc_client: Arc<RpcClient>,
last_indexed_slot_at_start: u64,
end_slot: Option<u64>,
rewind_controller: Option<&crate::ingester::rewind_controller::RewindController>,
) {
pin_mut!(block_stream);
let current_slot =
Expand All @@ -65,16 +66,17 @@ pub async fn index_block_stream(
"Backfilling historical blocks. Current number of blocks to backfill: {}",
number_of_blocks_to_backfill
);
let mut last_indexed_slot = last_indexed_slot_at_start;
// let mut last_indexed_slot = last_indexed_slot_at_start;

let mut finished_backfill_slot = None;

while let Some(blocks) = block_stream.next().await {
let first_slot_in_block = blocks.first().unwrap().metadata.slot;
let last_slot_in_block = blocks.last().unwrap().metadata.slot;
index_block_batch_with_infinite_retries(db.as_ref(), blocks).await;
index_block_batch_with_infinite_retries(db.as_ref(), blocks, rewind_controller).await;

for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) {
let blocks_indexed = slot - last_indexed_slot_at_start;
for slot in first_slot_in_block..(last_slot_in_block + 1) {
let blocks_indexed = slot.saturating_sub(last_indexed_slot_at_start);
if blocks_indexed < number_of_blocks_to_backfill {
if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 {
info!(
Expand All @@ -92,7 +94,7 @@ pub async fn index_block_stream(
info!("Indexed slot {}", slot);
}
}
last_indexed_slot = slot;
// last_indexed_slot = slot;
}
}
}
71 changes: 63 additions & 8 deletions src/ingester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use sea_orm::QueryTrait;
use sea_orm::Set;
use sea_orm::TransactionTrait;

use self::parser::state_update::StateUpdate;
use self::parser::state_update::{SequenceGapError, StateUpdate};
use self::persist::persist_state_update;
use self::persist::MAX_SQL_INSERTS;
use self::rewind_controller::{determine_rewind_slot, RewindController};
use self::typedefs::block_info::BlockInfo;
use self::typedefs::block_info::BlockMetadata;
use crate::dao::generated::blocks;
Expand All @@ -27,20 +28,43 @@ pub mod fetchers;
pub mod indexer;
pub mod parser;
pub mod persist;
pub mod rewind_controller;
pub mod typedefs;

fn derive_block_state_update(block: &BlockInfo) -> Result<StateUpdate, IngesterError> {
fn derive_block_state_update(
block: &BlockInfo,
rewind_controller: Option<&RewindController>,
) -> Result<StateUpdate, IngesterError> {
let mut state_updates: Vec<StateUpdate> = Vec::new();
for transaction in &block.transactions {
state_updates.push(parse_transaction(transaction, block.metadata.slot)?);
}
Ok(StateUpdate::merge_updates(state_updates))

match StateUpdate::merge_updates_with_slot(state_updates, Some(block.metadata.slot)) {
Ok(merged_update) => Ok(merged_update),
Err(SequenceGapError::GapDetected(gaps)) => {
if let Some(controller) = rewind_controller {
let rewind_slot = determine_rewind_slot(&gaps);
let reason = format!(
"Sequence gaps detected in block {}: {} gaps found",
block.metadata.slot,
gaps.len()
);
controller.request_rewind(rewind_slot, reason)?;
}
Err(IngesterError::SequenceGapDetected(gaps))
}
}
}

pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> {
pub async fn index_block(
db: &DatabaseConnection,
block: &BlockInfo,
rewind_controller: Option<&RewindController>,
) -> Result<(), IngesterError> {
let txn = db.begin().await?;
index_block_metadatas(&txn, vec![&block.metadata]).await?;
persist_state_update(&txn, derive_block_state_update(block)?).await?;
persist_state_update(&txn, derive_block_state_update(block, rewind_controller)?).await?;
txn.commit().await?;
Ok(())
}
Expand Down Expand Up @@ -81,16 +105,41 @@ async fn index_block_metadatas(
pub async fn index_block_batch(
db: &DatabaseConnection,
block_batch: &Vec<BlockInfo>,
rewind_controller: Option<&RewindController>,
) -> Result<(), IngesterError> {
let blocks_len = block_batch.len();
let tx = db.begin().await?;
let block_metadatas: Vec<&BlockMetadata> = block_batch.iter().map(|b| &b.metadata).collect();
index_block_metadatas(&tx, block_metadatas).await?;
let mut state_updates = Vec::new();
for block in block_batch {
state_updates.push(derive_block_state_update(block)?);
state_updates.push(derive_block_state_update(block, rewind_controller)?);
}
persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;

if block_batch.is_empty() {
return Ok(()); // Or return an appropriate error
}

let merged_state_update = match StateUpdate::merge_updates_with_slot(
state_updates,
Some(block_batch.last().unwrap().metadata.slot),
) {
Ok(merged) => merged,
Err(SequenceGapError::GapDetected(gaps)) => {
if let Some(controller) = rewind_controller {
let rewind_slot = determine_rewind_slot(&gaps);
let reason = format!(
"Sequence gaps detected in batch ending at slot {}: {} gaps found",
block_batch.last().unwrap().metadata.slot,
gaps.len()
);
controller.request_rewind(rewind_slot, reason)?;
}
return Err(IngesterError::SequenceGapDetected(gaps));
}
};

persist::persist_state_update(&tx, merged_state_update).await?;
metric! {
statsd_count!("blocks_indexed", blocks_len as i64);
}
Expand All @@ -101,10 +150,16 @@ pub async fn index_block_batch(
pub async fn index_block_batch_with_infinite_retries(
db: &DatabaseConnection,
block_batch: Vec<BlockInfo>,
rewind_controller: Option<&RewindController>,
) {
loop {
match index_block_batch(db, &block_batch).await {
match index_block_batch(db, &block_batch, rewind_controller).await {
Ok(()) => return,
Err(IngesterError::SequenceGapDetected(_)) => {
// For sequence gaps, we don't retry - we let the rewind mechanism handle it
log::error!("Sequence gap detected in batch, stopping processing to allow rewind");
return;
}
Err(e) => {
let start_block = block_batch.first().unwrap().metadata.slot;
let end_block = block_batch.last().unwrap().metadata.slot;
Expand Down
2 changes: 1 addition & 1 deletion src/ingester/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result<StateUpdate,
}
}

let mut state_update = StateUpdate::merge_updates(state_updates.clone());
let mut state_update = StateUpdate::merge_updates(state_updates.clone())?;
if !is_voting_transaction(tx) || is_compression_transaction {
state_update.transactions.insert(Transaction {
signature: tx.signature,
Expand Down
Loading
Loading