Skip to content

feat: rewind on inconsistent sequence numbers #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/ingester/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use thiserror::Error;
use crate::ingester::parser::state_update::SequenceGap;

#[derive(Error, Debug, PartialEq, Eq)]
pub enum IngesterError {
Expand All @@ -14,10 +15,28 @@ pub enum IngesterError {
EmptyBatchEvent,
#[error("Invalid event.")]
InvalidEvent,
#[error("Sequence gap detected: {} gaps found", .0.len())]
SequenceGapDetected(Vec<SequenceGap>),
}

impl From<sea_orm::error::DbErr> for IngesterError {
fn from(err: sea_orm::error::DbErr) -> Self {
IngesterError::DatabaseError(format!("DatabaseError: {}", err))
}
}

impl From<String> for IngesterError {
fn from(err: String) -> Self {
IngesterError::ParserError(err)
}
}

/// Converts a parser-level sequence gap error into
/// [`IngesterError::SequenceGapDetected`], carrying the detected gaps across unchanged.
impl From<crate::ingester::parser::state_update::SequenceGapError> for IngesterError {
    fn from(gap_error: crate::ingester::parser::state_update::SequenceGapError) -> Self {
        // `GapDetected` is the only variant, so the pattern is irrefutable; adding a
        // variant later will fail to compile here, just as a single-arm `match` would.
        let crate::ingester::parser::state_update::SequenceGapError::GapDetected(gaps) = gap_error;
        Self::SequenceGapDetected(gaps)
    }
}
4 changes: 4 additions & 0 deletions src/ingester/fetchers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
))
);

Expand Down Expand Up @@ -115,6 +116,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
)));
continue;
}
Expand All @@ -132,6 +134,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
)));
continue;
}
Expand All @@ -144,6 +147,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for grpc fallback
)));
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/ingester/fetchers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::sync::Arc;
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use solana_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::mpsc;

use super::typedefs::block_info::BlockInfo;
use super::{rewind_controller::RewindCommand, typedefs::block_info::BlockInfo};

pub mod grpc;
pub mod poller;
Expand All @@ -17,10 +18,11 @@ pub struct BlockStreamConfig {
pub geyser_url: Option<String>,
pub max_concurrent_block_fetches: usize,
pub last_indexed_slot: u64,
pub rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
}

impl BlockStreamConfig {
pub fn load_block_stream(&self) -> impl Stream<Item = Vec<BlockInfo>> {
pub fn load_block_stream(self) -> impl Stream<Item = Vec<BlockInfo>> {
let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| {
let auth_header = std::env::var("GRPC_X_TOKEN").unwrap();
get_grpc_stream_with_rpc_fallback(
Expand All @@ -37,6 +39,7 @@ impl BlockStreamConfig {
self.rpc_client.clone(),
self.last_indexed_slot,
self.max_concurrent_block_fetches,
self.rewind_receiver,
))
} else {
None
Expand Down
27 changes: 24 additions & 3 deletions src/ingester/fetchers/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,43 @@ use futures::{pin_mut, Stream, StreamExt};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError,
};
use tokio::sync::mpsc;

use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};

use crate::{
ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
ingester::{
rewind_controller::RewindCommand,
typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
},
metric,
monitor::{start_latest_slot_updater, LATEST_SLOT},
};

const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009];

fn get_slot_stream(rpc_client: Arc<RpcClient>, start_slot: u64) -> impl Stream<Item = u64> {
fn get_slot_stream(
rpc_client: Arc<RpcClient>,
start_slot: u64,
mut rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
) -> impl Stream<Item = u64> {
stream! {
start_latest_slot_updater(rpc_client.clone()).await;
let mut next_slot_to_fetch = start_slot;
loop {
// Check for rewind commands
if let Some(ref mut receiver) = rewind_receiver {
while let Ok(command) = receiver.try_recv() {
match command {
RewindCommand::Rewind { to_slot, reason } => {
log::error!("Rewinding slot stream to {}: {}", to_slot, reason);
next_slot_to_fetch = to_slot;
}
}
}
}

if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
continue;
Expand All @@ -40,13 +60,14 @@ pub fn get_block_poller_stream(
rpc_client: Arc<RpcClient>,
mut last_indexed_slot: u64,
max_concurrent_block_fetches: usize,
rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
) -> impl Stream<Item = Vec<BlockInfo>> {
stream! {
let start_slot = match last_indexed_slot {
0 => 0,
last_indexed_slot => last_indexed_slot + 1
};
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot);
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot, rewind_receiver);
pin_mut!(slot_stream);
let block_stream = slot_stream
.map(|slot| {
Expand Down
12 changes: 7 additions & 5 deletions src/ingester/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub async fn index_block_stream(
rpc_client: Arc<RpcClient>,
last_indexed_slot_at_start: u64,
end_slot: Option<u64>,
rewind_controller: Option<&crate::ingester::rewind_controller::RewindController>,
) {
pin_mut!(block_stream);
let current_slot =
Expand All @@ -65,16 +66,17 @@ pub async fn index_block_stream(
"Backfilling historical blocks. Current number of blocks to backfill: {}",
number_of_blocks_to_backfill
);
let mut last_indexed_slot = last_indexed_slot_at_start;
// let mut last_indexed_slot = last_indexed_slot_at_start;

let mut finished_backfill_slot = None;

while let Some(blocks) = block_stream.next().await {
let first_slot_in_block = blocks.first().unwrap().metadata.slot;
let last_slot_in_block = blocks.last().unwrap().metadata.slot;
index_block_batch_with_infinite_retries(db.as_ref(), blocks).await;
index_block_batch_with_infinite_retries(db.as_ref(), blocks, rewind_controller).await;

for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) {
let blocks_indexed = slot - last_indexed_slot_at_start;
for slot in first_slot_in_block..(last_slot_in_block + 1) {
let blocks_indexed = slot.saturating_sub(last_indexed_slot_at_start);
if blocks_indexed < number_of_blocks_to_backfill {
if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 {
info!(
Expand All @@ -92,7 +94,7 @@ pub async fn index_block_stream(
info!("Indexed slot {}", slot);
}
}
last_indexed_slot = slot;
// last_indexed_slot = slot;
}
}
}
71 changes: 63 additions & 8 deletions src/ingester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use sea_orm::QueryTrait;
use sea_orm::Set;
use sea_orm::TransactionTrait;

use self::parser::state_update::StateUpdate;
use self::parser::state_update::{StateUpdate, SequenceGapError};
use self::rewind_controller::{RewindController, determine_rewind_slot};
use self::persist::persist_state_update;
use self::persist::MAX_SQL_INSERTS;
use self::typedefs::block_info::BlockInfo;
Expand All @@ -27,20 +28,43 @@ pub mod fetchers;
pub mod indexer;
pub mod parser;
pub mod persist;
pub mod rewind_controller;
pub mod typedefs;

/// Builds the merged `StateUpdate` for a single block.
///
/// Every transaction in `block` is parsed with `parse_transaction` (tagged with the
/// block's slot) and the per-transaction updates are merged with
/// `StateUpdate::merge_updates_with_slot`.
///
/// # Errors
/// * Any error returned by `parse_transaction` is propagated.
/// * If the merge reports `SequenceGapError::GapDetected`, the function returns
///   `IngesterError::SequenceGapDetected` with the gaps. When a `rewind_controller`
///   is supplied, a rewind to the slot chosen by `determine_rewind_slot` is requested first.
///
/// NOTE(review): this span is rendered from a diff; the one-line signature and the
/// `Ok(StateUpdate::merge_updates(..))` line appear to be the removed (pre-change) lines.
fn derive_block_state_update(block: &BlockInfo) -> Result<StateUpdate, IngesterError> {
fn derive_block_state_update(
block: &BlockInfo,
rewind_controller: Option<&RewindController>,
) -> Result<StateUpdate, IngesterError> {
let mut state_updates: Vec<StateUpdate> = Vec::new();
for transaction in &block.transactions {
state_updates.push(parse_transaction(transaction, block.metadata.slot)?);
}
Ok(StateUpdate::merge_updates(state_updates))

match StateUpdate::merge_updates_with_slot(state_updates, Some(block.metadata.slot)) {
Ok(merged_update) => Ok(merged_update),
Err(SequenceGapError::GapDetected(gaps)) => {
// Only request a rewind when a controller was provided; without one the gap
// is still reported to the caller via the returned error.
if let Some(controller) = rewind_controller {
let rewind_slot = determine_rewind_slot(&gaps);
let reason = format!(
"Sequence gaps detected in block {}: {} gaps found",
block.metadata.slot,
gaps.len()
);
// `?` returns early if the rewind request itself fails, so in that case the
// caller sees the request error rather than `SequenceGapDetected`.
controller.request_rewind(rewind_slot, reason)?;
}
Err(IngesterError::SequenceGapDetected(gaps))
}
}
}

/// Indexes a single block inside one database transaction.
///
/// Opens a transaction on `db`, writes the block's metadata via `index_block_metadatas`,
/// persists the state update derived from the block, and commits.
/// `rewind_controller` is forwarded to `derive_block_state_update`.
///
/// # Errors
/// Any failure from beginning the transaction, writing metadata, deriving or persisting
/// the state update, or committing is propagated with `?`; on those paths `commit` is
/// not reached.
///
/// NOTE(review): this span is rendered from a diff; the one-line signature and the
/// `derive_block_state_update(block)` call appear to be the removed (pre-change) lines.
pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> {
pub async fn index_block(
db: &DatabaseConnection,
block: &BlockInfo,
rewind_controller: Option<&RewindController>,
) -> Result<(), IngesterError> {
let txn = db.begin().await?;
index_block_metadatas(&txn, vec![&block.metadata]).await?;
persist_state_update(&txn, derive_block_state_update(block)?).await?;
persist_state_update(&txn, derive_block_state_update(block, rewind_controller)?).await?;
txn.commit().await?;
Ok(())
}
Expand Down Expand Up @@ -81,16 +105,41 @@ async fn index_block_metadatas(
pub async fn index_block_batch(
db: &DatabaseConnection,
block_batch: &Vec<BlockInfo>,
rewind_controller: Option<&RewindController>,
) -> Result<(), IngesterError> {
let blocks_len = block_batch.len();
let tx = db.begin().await?;
let block_metadatas: Vec<&BlockMetadata> = block_batch.iter().map(|b| &b.metadata).collect();
index_block_metadatas(&tx, block_metadatas).await?;
let mut state_updates = Vec::new();
for block in block_batch {
state_updates.push(derive_block_state_update(block)?);
state_updates.push(derive_block_state_update(block, rewind_controller)?);
}
persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;

if block_batch.is_empty() {
return Ok(()); // Or return an appropriate error
}

let merged_state_update = match StateUpdate::merge_updates_with_slot(
state_updates,
Some(block_batch.last().unwrap().metadata.slot)
) {
Ok(merged) => merged,
Err(SequenceGapError::GapDetected(gaps)) => {
if let Some(controller) = rewind_controller {
let rewind_slot = determine_rewind_slot(&gaps);
let reason = format!(
"Sequence gaps detected in batch ending at slot {}: {} gaps found",
block_batch.last().unwrap().metadata.slot,
gaps.len()
);
controller.request_rewind(rewind_slot, reason)?;
}
return Err(IngesterError::SequenceGapDetected(gaps));
}
};

persist::persist_state_update(&tx, merged_state_update).await?;
metric! {
statsd_count!("blocks_indexed", blocks_len as i64);
}
Expand All @@ -101,10 +150,16 @@ pub async fn index_block_batch(
pub async fn index_block_batch_with_infinite_retries(
db: &DatabaseConnection,
block_batch: Vec<BlockInfo>,
rewind_controller: Option<&RewindController>,
) {
loop {
match index_block_batch(db, &block_batch).await {
match index_block_batch(db, &block_batch, rewind_controller).await {
Ok(()) => return,
Err(IngesterError::SequenceGapDetected(_)) => {
// For sequence gaps, we don't retry - we let the rewind mechanism handle it
log::error!("Sequence gap detected in batch, stopping processing to allow rewind");
return;
}
Err(e) => {
let start_block = block_batch.first().unwrap().metadata.slot;
let end_block = block_batch.last().unwrap().metadata.slot;
Expand Down
2 changes: 1 addition & 1 deletion src/ingester/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result<StateUpdate,
}
}

let mut state_update = StateUpdate::merge_updates(state_updates.clone());
let mut state_update = StateUpdate::merge_updates(state_updates.clone())?;
if !is_voting_transaction(tx) || is_compression_transaction {
state_update.transactions.insert(Transaction {
signature: tx.signature,
Expand Down
Loading
Loading