Skip to content

Commit b73f9b5

Browse files
committed
poc
1 parent f0a45ee commit b73f9b5

File tree

14 files changed

+320
-27
lines changed

14 files changed

+320
-27
lines changed

src/ingester/error.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use thiserror::Error;
2+
use crate::ingester::parser::state_update::SequenceGap;
23

34
#[derive(Error, Debug, PartialEq, Eq)]
45
pub enum IngesterError {
@@ -14,10 +15,28 @@ pub enum IngesterError {
1415
EmptyBatchEvent,
1516
#[error("Invalid event.")]
1617
InvalidEvent,
18+
#[error("Sequence gap detected: {} gaps found", .0.len())]
19+
SequenceGapDetected(Vec<SequenceGap>),
1720
}
1821

1922
impl From<sea_orm::error::DbErr> for IngesterError {
2023
fn from(err: sea_orm::error::DbErr) -> Self {
2124
IngesterError::DatabaseError(format!("DatabaseError: {}", err))
2225
}
2326
}
27+
28+
impl From<String> for IngesterError {
29+
fn from(err: String) -> Self {
30+
IngesterError::ParserError(err)
31+
}
32+
}
33+
34+
impl From<crate::ingester::parser::state_update::SequenceGapError> for IngesterError {
35+
fn from(err: crate::ingester::parser::state_update::SequenceGapError) -> Self {
36+
match err {
37+
crate::ingester::parser::state_update::SequenceGapError::GapDetected(gaps) => {
38+
IngesterError::SequenceGapDetected(gaps)
39+
}
40+
}
41+
}
42+
}

src/ingester/fetchers/grpc.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
5353
rpc_client.clone(),
5454
last_indexed_slot,
5555
max_concurrent_block_fetches,
56+
None, // No rewind receiver for grpc fallback
5657
))
5758
);
5859

@@ -115,6 +116,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
115116
rpc_client.clone(),
116117
last_indexed_slot,
117118
max_concurrent_block_fetches,
119+
None, // No rewind receiver for grpc fallback
118120
)));
119121
continue;
120122
}
@@ -132,6 +134,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
132134
rpc_client.clone(),
133135
last_indexed_slot,
134136
max_concurrent_block_fetches,
137+
None, // No rewind receiver for grpc fallback
135138
)));
136139
continue;
137140
}
@@ -144,6 +147,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
144147
rpc_client.clone(),
145148
last_indexed_slot,
146149
max_concurrent_block_fetches,
150+
None, // No rewind receiver for grpc fallback
147151
)));
148152
}
149153
}

src/ingester/fetchers/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use std::sync::Arc;
33
use async_stream::stream;
44
use futures::{pin_mut, Stream, StreamExt};
55
use solana_client::nonblocking::rpc_client::RpcClient;
6+
use tokio::sync::mpsc;
67

7-
use super::typedefs::block_info::BlockInfo;
8+
use super::{rewind_controller::RewindCommand, typedefs::block_info::BlockInfo};
89

910
pub mod grpc;
1011
pub mod poller;
@@ -17,10 +18,11 @@ pub struct BlockStreamConfig {
1718
pub geyser_url: Option<String>,
1819
pub max_concurrent_block_fetches: usize,
1920
pub last_indexed_slot: u64,
21+
pub rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
2022
}
2123

2224
impl BlockStreamConfig {
23-
pub fn load_block_stream(&self) -> impl Stream<Item = Vec<BlockInfo>> {
25+
pub fn load_block_stream(self) -> impl Stream<Item = Vec<BlockInfo>> {
2426
let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| {
2527
let auth_header = std::env::var("GRPC_X_TOKEN").unwrap();
2628
get_grpc_stream_with_rpc_fallback(
@@ -37,6 +39,7 @@ impl BlockStreamConfig {
3739
self.rpc_client.clone(),
3840
self.last_indexed_slot,
3941
self.max_concurrent_block_fetches,
42+
self.rewind_receiver,
4043
))
4144
} else {
4245
None

src/ingester/fetchers/poller.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,43 @@ use futures::{pin_mut, Stream, StreamExt};
99
use solana_client::{
1010
nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError,
1111
};
12+
use tokio::sync::mpsc;
1213

1314
use solana_sdk::commitment_config::CommitmentConfig;
1415
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
1516

1617
use crate::{
17-
ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
18+
ingester::{
19+
rewind_controller::RewindCommand,
20+
typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
21+
},
1822
metric,
1923
monitor::{start_latest_slot_updater, LATEST_SLOT},
2024
};
2125

2226
const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009];
2327

24-
fn get_slot_stream(rpc_client: Arc<RpcClient>, start_slot: u64) -> impl Stream<Item = u64> {
28+
fn get_slot_stream(
29+
rpc_client: Arc<RpcClient>,
30+
start_slot: u64,
31+
mut rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
32+
) -> impl Stream<Item = u64> {
2533
stream! {
2634
start_latest_slot_updater(rpc_client.clone()).await;
2735
let mut next_slot_to_fetch = start_slot;
2836
loop {
37+
// Check for rewind commands
38+
if let Some(ref mut receiver) = rewind_receiver {
39+
while let Ok(command) = receiver.try_recv() {
40+
match command {
41+
RewindCommand::Rewind { to_slot, reason } => {
42+
log::warn!("Rewinding slot stream to {}: {}", to_slot, reason);
43+
next_slot_to_fetch = to_slot;
44+
}
45+
}
46+
}
47+
}
48+
2949
if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) {
3050
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3151
continue;
@@ -40,13 +60,14 @@ pub fn get_block_poller_stream(
4060
rpc_client: Arc<RpcClient>,
4161
mut last_indexed_slot: u64,
4262
max_concurrent_block_fetches: usize,
63+
rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
4364
) -> impl Stream<Item = Vec<BlockInfo>> {
4465
stream! {
4566
let start_slot = match last_indexed_slot {
4667
0 => 0,
4768
last_indexed_slot => last_indexed_slot + 1
4869
};
49-
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot);
70+
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot, rewind_receiver);
5071
pin_mut!(slot_stream);
5172
let block_stream = slot_stream
5273
.map(|slot| {

src/ingester/indexer/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub async fn index_block_stream(
5252
rpc_client: Arc<RpcClient>,
5353
last_indexed_slot_at_start: u64,
5454
end_slot: Option<u64>,
55+
rewind_controller: Option<&crate::ingester::rewind_controller::RewindController>,
5556
) {
5657
pin_mut!(block_stream);
5758
let current_slot =
@@ -71,7 +72,7 @@ pub async fn index_block_stream(
7172

7273
while let Some(blocks) = block_stream.next().await {
7374
let last_slot_in_block = blocks.last().unwrap().metadata.slot;
74-
index_block_batch_with_infinite_retries(db.as_ref(), blocks).await;
75+
index_block_batch_with_infinite_retries(db.as_ref(), blocks, rewind_controller).await;
7576

7677
for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) {
7778
let blocks_indexed = slot - last_indexed_slot_at_start;

src/ingester/mod.rs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ use sea_orm::QueryTrait;
1515
use sea_orm::Set;
1616
use sea_orm::TransactionTrait;
1717

18-
use self::parser::state_update::StateUpdate;
18+
use self::parser::state_update::{StateUpdate, SequenceGapError};
19+
use self::rewind_controller::{RewindController, determine_rewind_slot};
1920
use self::persist::persist_state_update;
2021
use self::persist::MAX_SQL_INSERTS;
2122
use self::typedefs::block_info::BlockInfo;
@@ -27,20 +28,43 @@ pub mod fetchers;
2728
pub mod indexer;
2829
pub mod parser;
2930
pub mod persist;
31+
pub mod rewind_controller;
3032
pub mod typedefs;
3133

32-
fn derive_block_state_update(block: &BlockInfo) -> Result<StateUpdate, IngesterError> {
34+
fn derive_block_state_update(
35+
block: &BlockInfo,
36+
rewind_controller: Option<&RewindController>,
37+
) -> Result<StateUpdate, IngesterError> {
3338
let mut state_updates: Vec<StateUpdate> = Vec::new();
3439
for transaction in &block.transactions {
3540
state_updates.push(parse_transaction(transaction, block.metadata.slot)?);
3641
}
37-
Ok(StateUpdate::merge_updates(state_updates))
42+
43+
match StateUpdate::merge_updates_with_slot(state_updates, Some(block.metadata.slot)) {
44+
Ok(merged_update) => Ok(merged_update),
45+
Err(SequenceGapError::GapDetected(gaps)) => {
46+
if let Some(controller) = rewind_controller {
47+
let rewind_slot = determine_rewind_slot(&gaps);
48+
let reason = format!(
49+
"Sequence gaps detected in block {}: {} gaps found",
50+
block.metadata.slot,
51+
gaps.len()
52+
);
53+
controller.request_rewind(rewind_slot, reason)?;
54+
}
55+
Err(IngesterError::SequenceGapDetected(gaps))
56+
}
57+
}
3858
}
3959

40-
pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> {
60+
pub async fn index_block(
61+
db: &DatabaseConnection,
62+
block: &BlockInfo,
63+
rewind_controller: Option<&RewindController>,
64+
) -> Result<(), IngesterError> {
4165
let txn = db.begin().await?;
4266
index_block_metadatas(&txn, vec![&block.metadata]).await?;
43-
persist_state_update(&txn, derive_block_state_update(block)?).await?;
67+
persist_state_update(&txn, derive_block_state_update(block, rewind_controller)?).await?;
4468
txn.commit().await?;
4569
Ok(())
4670
}
@@ -81,16 +105,37 @@ async fn index_block_metadatas(
81105
pub async fn index_block_batch(
82106
db: &DatabaseConnection,
83107
block_batch: &Vec<BlockInfo>,
108+
rewind_controller: Option<&RewindController>,
84109
) -> Result<(), IngesterError> {
85110
let blocks_len = block_batch.len();
86111
let tx = db.begin().await?;
87112
let block_metadatas: Vec<&BlockMetadata> = block_batch.iter().map(|b| &b.metadata).collect();
88113
index_block_metadatas(&tx, block_metadatas).await?;
89114
let mut state_updates = Vec::new();
90115
for block in block_batch {
91-
state_updates.push(derive_block_state_update(block)?);
116+
state_updates.push(derive_block_state_update(block, rewind_controller)?);
92117
}
93-
persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;
118+
119+
let merged_state_update = match StateUpdate::merge_updates_with_slot(
120+
state_updates,
121+
Some(block_batch.last().unwrap().metadata.slot)
122+
) {
123+
Ok(merged) => merged,
124+
Err(SequenceGapError::GapDetected(gaps)) => {
125+
if let Some(controller) = rewind_controller {
126+
let rewind_slot = determine_rewind_slot(&gaps);
127+
let reason = format!(
128+
"Sequence gaps detected in batch ending at slot {}: {} gaps found",
129+
block_batch.last().unwrap().metadata.slot,
130+
gaps.len()
131+
);
132+
controller.request_rewind(rewind_slot, reason)?;
133+
}
134+
return Err(IngesterError::SequenceGapDetected(gaps));
135+
}
136+
};
137+
138+
persist::persist_state_update(&tx, merged_state_update).await?;
94139
metric! {
95140
statsd_count!("blocks_indexed", blocks_len as i64);
96141
}
@@ -101,10 +146,16 @@ pub async fn index_block_batch(
101146
pub async fn index_block_batch_with_infinite_retries(
102147
db: &DatabaseConnection,
103148
block_batch: Vec<BlockInfo>,
149+
rewind_controller: Option<&RewindController>,
104150
) {
105151
loop {
106-
match index_block_batch(db, &block_batch).await {
152+
match index_block_batch(db, &block_batch, rewind_controller).await {
107153
Ok(()) => return,
154+
Err(IngesterError::SequenceGapDetected(_)) => {
155+
// For sequence gaps, we don't retry - we let the rewind mechanism handle it
156+
log::error!("Sequence gap detected in batch, stopping processing to allow rewind");
157+
return;
158+
}
108159
Err(e) => {
109160
let start_block = block_batch.first().unwrap().metadata.slot;
110161
let end_block = block_batch.last().unwrap().metadata.slot;

src/ingester/parser/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result<StateUpdate,
127127
}
128128
}
129129

130-
let mut state_update = StateUpdate::merge_updates(state_updates.clone());
130+
let mut state_update = StateUpdate::merge_updates(state_updates.clone())?;
131131
if !is_voting_transaction(tx) || is_compression_transaction {
132132
state_update.transactions.insert(Transaction {
133133
signature: tx.signature,

0 commit comments

Comments
 (0)