Skip to content

Commit 11d16da

Browse files
refactor: clear cache and sequence state on gap detection rewinds
refactor: add a dedicated error variant for gap detection
1 parent 0bdcfa3 commit 11d16da

File tree

4 files changed

+10
-6
lines changed

4 files changed

+10
-6
lines changed

src/ingester/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ pub enum IngesterError {
1616
InvalidEvent,
1717
#[error("Custom error: {0}")]
1818
CustomError(String),
19+
#[error("Gap detected, triggering rewind")]
20+
GapDetectedRewind,
1921
}
2022

2123
impl From<sea_orm::error::DbErr> for IngesterError {

src/ingester/fetchers/poller.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,13 @@ pub fn get_block_poller_stream(
7272
log::error!("Rewinding block stream to {}: {}", to_slot, reason);
7373
// Clear cached blocks
7474
block_cache.clear();
75+
// Clear sequence state to re-learn from rewound point
76+
crate::ingester::gap::clear_sequence_state();
7577
// Reset positions
7678
last_indexed_slot = to_slot - 1;
7779
current_start_slot = to_slot;
7880
rewind_occurred = true;
79-
log::info!("Cleared cache, restarting from slot {}", current_start_slot);
81+
log::info!("Cleared cache and sequence state, restarting from slot {}", current_start_slot);
8082
break;
8183
}
8284
}

src/ingester/indexer/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,11 @@ pub async fn index_block_stream(
112112
}
113113
}
114114
Err(e) => {
115-
if e.to_string().contains("Gap detection triggered rewind") {
115+
if matches!(e, crate::ingester::error::IngesterError::GapDetectedRewind) {
116116
// Gap detected, rewind triggered - the slot stream should handle repositioning
117117
log::info!("Gap detection triggered rewind");
118+
// Clear sequence state to re-learn from the rewound point
119+
crate::ingester::gap::clear_sequence_state();
118120
continue;
119121
} else {
120122
log::error!("Unexpected error in block processing: {}", e);

src/ingester/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,7 @@ fn derive_block_state_update(
7272
));
7373
}
7474
// Return early after requesting rewind - don't continue processing
75-
return Err(IngesterError::CustomError(
76-
"Gap detection triggered rewind".to_string(),
77-
));
75+
return Err(IngesterError::GapDetectedRewind);
7876
}
7977
}
8078

@@ -203,7 +201,7 @@ pub async fn index_block_batch_with_infinite_retries(
203201
Ok(()) => return Ok(()),
204202
Err(e) => {
205203
// Check if this is a gap-triggered rewind error
206-
if e.to_string().contains("Gap detection triggered rewind") {
204+
if matches!(e, IngesterError::GapDetectedRewind) {
207205
// Don't retry, propagate the rewind error up
208206
return Err(e);
209207
}

0 commit comments

Comments
 (0)