Skip to content

Commit fe6fa31

Browse files
committed
stash return stateupdate
1 parent 27ce82f commit fe6fa31

File tree

5 files changed

+25
-16
lines changed

5 files changed

+25
-16
lines changed

src/ingester/error.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ pub enum IngesterError {
1515
EmptyBatchEvent,
1616
#[error("Invalid event.")]
1717
InvalidEvent,
18-
#[error("Sequence gap detected: {} gaps found", .0.len())]
19-
SequenceGapDetected(Vec<SequenceGap>),
18+
#[error("Sequence gap detected: {} gaps found", .gaps.len())]
19+
SequenceGapDetected {
20+
gaps: Vec<SequenceGap>,
21+
parsed_state: crate::ingester::parser::state_update::StateUpdate,
22+
},
2023
}
2124

2225
impl From<sea_orm::error::DbErr> for IngesterError {
@@ -34,8 +37,8 @@ impl From<String> for IngesterError {
3437
impl From<crate::ingester::parser::state_update::SequenceGapError> for IngesterError {
3538
fn from(err: crate::ingester::parser::state_update::SequenceGapError) -> Self {
3639
match err {
37-
crate::ingester::parser::state_update::SequenceGapError::GapDetected(gaps) => {
38-
IngesterError::SequenceGapDetected(gaps)
40+
crate::ingester::parser::state_update::SequenceGapError::GapDetected { gaps, parsed_state } => {
41+
IngesterError::SequenceGapDetected { gaps, parsed_state }
3942
}
4043
}
4144
}

src/ingester/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ fn derive_block_state_update(
4242

4343
match StateUpdate::merge_updates_with_slot(state_updates, Some(block.metadata.slot)) {
4444
Ok(merged_update) => Ok(merged_update),
45-
Err(SequenceGapError::GapDetected(gaps)) => {
45+
Err(SequenceGapError::GapDetected { gaps, parsed_state }) => {
4646
if let Some(controller) = rewind_controller {
4747
let rewind_slot = determine_rewind_slot(&gaps);
4848
let reason = format!(
@@ -52,7 +52,7 @@ fn derive_block_state_update(
5252
);
5353
controller.request_rewind(rewind_slot, reason)?;
5454
}
55-
Err(IngesterError::SequenceGapDetected(gaps))
55+
Err(IngesterError::SequenceGapDetected { gaps, parsed_state })
5656
}
5757
}
5858
}
@@ -121,7 +121,7 @@ pub async fn index_block_batch(
121121
Some(block_batch.last().unwrap().metadata.slot)
122122
) {
123123
Ok(merged) => merged,
124-
Err(SequenceGapError::GapDetected(gaps)) => {
124+
Err(SequenceGapError::GapDetected { gaps, parsed_state }) => {
125125
if let Some(controller) = rewind_controller {
126126
let rewind_slot = determine_rewind_slot(&gaps);
127127
let reason = format!(
@@ -131,7 +131,7 @@ pub async fn index_block_batch(
131131
);
132132
controller.request_rewind(rewind_slot, reason)?;
133133
}
134-
return Err(IngesterError::SequenceGapDetected(gaps));
134+
return Err(IngesterError::SequenceGapDetected { gaps, parsed_state });
135135
}
136136
};
137137

@@ -151,9 +151,9 @@ pub async fn index_block_batch_with_infinite_retries(
151151
loop {
152152
match index_block_batch(db, &block_batch, rewind_controller).await {
153153
Ok(()) => return,
154-
Err(IngesterError::SequenceGapDetected(_)) => {
154+
Err(IngesterError::SequenceGapDetected { gaps, parsed_state: _ }) => {
155155
// For sequence gaps, we don't retry - we let the rewind mechanism handle it
156-
log::error!("Sequence gap detected in batch, stopping processing to allow rewind");
156+
log::error!("Sequence gap detected in batch with {} gaps, stopping processing to allow rewind", gaps.len());
157157
return;
158158
}
159159
Err(e) => {

src/ingester/parser/state_update.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ pub struct SequenceGap {
2424

2525
#[derive(Debug, Clone)]
2626
pub enum SequenceGapError {
27-
GapDetected(Vec<SequenceGap>),
27+
GapDetected {
28+
gaps: Vec<SequenceGap>,
29+
parsed_state: StateUpdate,
30+
},
2831
}
2932

3033
#[derive(BorshDeserialize, BorshSerialize, Debug, Clone, PartialEq, Eq)]
@@ -188,12 +191,15 @@ impl StateUpdate {
188191
.extend(update.batch_nullify_context);
189192
}
190193

191-
// If gaps were detected, return error
194+
// If gaps were detected, return error WITH the parsed state
192195
if !detected_gaps.is_empty() {
193-
return Err(SequenceGapError::GapDetected(detected_gaps));
196+
return Err(SequenceGapError::GapDetected {
197+
gaps: detected_gaps,
198+
parsed_state: merged,
199+
});
194200
}
195201

196-
// Update highest sequence numbers for each tree
202+
// Update highest sequence numbers for each tree (only if no gaps)
197203
if let Some(slot) = slot {
198204
for ((tree, _leaf_index), value) in &merged.indexed_merkle_tree_updates {
199205
if let Err(e) = TreeInfo::update_highest_seq(tree, value.seq, slot) {

src/ingester/parser/tx_event_parser_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub fn create_state_update_v2(
147147

148148
state_updates.push(state_update_event);
149149
}
150-
150+
// TODO: return the correct gap error.
151151
let merged = StateUpdate::merge_updates(state_updates)?;
152152
Ok(merged)
153153
}

tests/integration_tests/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ pub async fn index_multiple_transactions(
465465
let tx_state_update = parse_transaction(&transaction_info, 0).unwrap();
466466
state_updates.push(tx_state_update);
467467
}
468-
let state_update = StateUpdate::merge_updates(state_updates);
468+
let state_update = StateUpdate::merge_updates(state_updates).expect("Test data should not have sequence gaps");
469469
persist_state_update_using_connection(db_conn.as_ref(), state_update)
470470
.await
471471
.unwrap();

0 commit comments

Comments
 (0)