feat: rewind on inconsistent sequence numbers #35

Open
wants to merge 9 commits into
base: main
from

Conversation

ananas-block
@ananas-block ananas-block commented Jul 24, 2025

Overview

  • Summary of changes

Testing

  • Testing performed to validate the changes

Summary by CodeRabbit

  • New Features

    • Introduced a rewind control mechanism, enabling dynamic rewinding to earlier slots in response to detected sequence gaps during block processing.
    • Added detection and explicit handling of sequence gaps, with integration of a rewind controller to automatically address gaps and prevent data inconsistencies.
    • Enhanced tracking of sequence numbers and slots for improved reliability and error detection in block ingestion.
  • Improvements

    • Integrated error propagation for state update merging, ensuring early error detection and robust handling of sequence issues.
    • Updated configuration and indexing flows to support optional rewind control, improving system resilience and maintainability.
    • Enhanced logging for unknown tree processing to better highlight issues during state update creation.
    • Refined block fetching streams to support rewind commands, allowing dynamic adjustment of data ingestion flow.
  • Bug Fixes

    • Improved error reporting and handling for sequence gaps, reducing the risk of missed or out-of-order data during ingestion.

@ananas-block ananas-block force-pushed the feat-rewind-controller branch from b73f9b5 to fe6fa31 Compare July 24, 2025 03:16
if earliest_slot == u64::MAX {
    // No valid slots found, use conservative fallback
    gaps.iter()
        .map(|gap| gap.expected_seq.saturating_sub(10))
Author

make this slot not sequence number

coderabbitai bot commented Jul 24, 2025

Walkthrough

This update introduces a rewind mechanism to the ingester pipeline, enabling detection and handling of sequence gaps during state updates. New types and methods are added for gap detection, atomic tracking of sequence numbers and slots, and issuing rewind commands. Functions throughout the ingester and main entrypoint are updated to propagate and respond to rewind control, integrating the mechanism into the block indexing flow.

Changes

| File(s) | Change Summary |
| --- | --- |
| src/ingester/error.rs | Added SequenceGapDetected(Vec<SequenceGap>) variant to IngesterError. Added From<String> and From<SequenceGapError> impls for IngesterError. |
| src/ingester/fetchers/grpc.rs, src/snapshot/snapshotter/main.rs | Added explicit None argument for rewind_receiver in calls to get_block_poller_stream for RPC fallback streams. No logic or control flow changes. |
| src/ingester/fetchers/mod.rs | Added optional rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>> field to BlockStreamConfig. Changed load_block_stream to take ownership and pass rewind_receiver to get_block_poller_stream. |
| src/ingester/fetchers/poller.rs | Updated get_slot_stream and get_block_poller_stream to accept optional rewind_receiver. Implemented non-blocking polling of rewind commands to reset slot fetching on rewind requests. |
| src/ingester/indexer/mod.rs | Modified index_block_stream to accept and propagate optional rewind_controller reference to block batch indexing functions. |
| src/ingester/mod.rs | Added sequence gap detection in derive_block_state_update and batch indexing functions using merge_updates_with_slot. On gap detection, request rewind via RewindController and return IngesterError::SequenceGapDetected. Modified retry logic in index_block_batch_with_infinite_retries to stop on sequence gap errors. Updated function signatures to accept optional rewind_controller. |
| src/ingester/parser/mod.rs, src/ingester/parser/tx_event_parser_v2.rs | Added error propagation (?) for calls to StateUpdate::merge_updates to handle sequence gap errors. |
| src/ingester/parser/state_update.rs | Changed merge_updates to return Result<StateUpdate, SequenceGapError>. Added merge_updates_with_slot supporting optional slot and sequence gap detection. Introduced SequenceGap and SequenceGapError types. Added logic to detect sequence gaps via TreeInfo::check_sequence_gap and update highest sequence numbers atomically. |
| src/ingester/parser/tree_info.rs | Added atomic fields highest_seq and last_slot to TreeInfo. Added methods to get/update highest sequence, get last slot for sequence, and check for sequence gaps. Updated static QUEUE_TREE_MAPPING initialization to include new fields. |
| src/ingester/rewind_controller.rs | New module defining RewindCommand enum and RewindController struct with async channel for rewind requests. Provides method to request rewind with slot and reason, logging warnings/errors. Includes function to determine rewind slot from sequence gaps conservatively. |
| src/main.rs | Integrated RewindController into continuous block indexing flow. Created controller and receiver, passed receiver to BlockStreamConfig, and passed controller reference to indexing functions. Snapshot processing remains without rewind controller. Updated function signatures accordingly. |
| src/snapshot/mod.rs | Extracted last_indexed_slot into a local variable before passing to update_snapshot_helper. No logic change. |
| tests/integration_tests/e2e_tests.rs | Added explicit None argument for optional rewind controller parameter in calls to index_block in multiple test functions and helpers. |
| tests/integration_tests/mock_tests.rs | Added explicit None argument for optional rewind controller parameter in calls to index_block in multiple async test functions. |
| tests/integration_tests/utils.rs | Added .unwrap() on StateUpdate::merge_updates result in index_multiple_transactions. Added explicit None argument for rewind controller in calls to index_block. |
| src/ingester/parser/tx_event_parser.rs | Changed log level from warning to error when skipping unknown trees in create_state_update_v1. |
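
The src/ingester/mod.rs change describes a retry loop that keeps retrying ordinary failures but stops on a sequence gap. A minimal synchronous sketch of that control flow follows. The names `index_with_retries` and `Transient` are hypothetical, the gap variant carries only a count here, and the real function is async and presumably waits between attempts.

```rust
/// Simplified stand-in for the PR's error type (assumption: the real
/// `SequenceGapDetected` variant carries a `Vec<SequenceGap>`).
#[derive(Debug)]
pub enum IngesterError {
    SequenceGapDetected(usize),
    Transient(String),
}

/// Calls `attempt` until it succeeds and returns the number of attempts made.
/// A sequence gap is returned to the caller immediately: retrying the same
/// batch cannot close the gap, only a rewind can.
pub fn index_with_retries<F>(mut attempt: F) -> Result<u32, IngesterError>
where
    F: FnMut() -> Result<(), IngesterError>,
{
    let mut attempts = 0u32;
    loop {
        attempts += 1;
        match attempt() {
            Ok(()) => return Ok(attempts),
            // Stop retrying so the rewind can take effect.
            Err(err @ IngesterError::SequenceGapDetected(_)) => return Err(err),
            // Any other failure is assumed to be transient and is retried.
            Err(IngesterError::Transient(msg)) => {
                eprintln!("retrying after transient error: {msg}");
            }
        }
    }
}
```

Returning the gap error instead of looping matters because the batch that exposed the gap will fail the same way on every retry until the fetcher has rewound.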

Sequence Diagram(s)

sequenceDiagram
    participant Main
    participant BlockStreamConfig
    participant BlockPoller
    participant Indexer
    participant StateUpdate
    participant RewindController

    Main->>BlockStreamConfig: Create with rewind_receiver
    Main->>Indexer: index_block_stream(..., rewind_controller)
    Indexer->>BlockPoller: get_block_poller_stream(..., rewind_receiver)
    BlockPoller->>BlockPoller: Poll for blocks, listen for rewind commands
    BlockPoller->>Indexer: Yield block batch
    Indexer->>StateUpdate: merge_updates_with_slot
    StateUpdate-->>Indexer: SequenceGapError? (if gap detected)
    Indexer->>RewindController: request_rewind(to_slot, reason)
    RewindController-->>BlockPoller: Send RewindCommand via channel
    BlockPoller->>BlockPoller: On RewindCommand, reset slot
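
The controller/receiver split in the diagram can be sketched as follows. This is an illustration only: it uses `std::sync::mpsc` so that it runs without dependencies, whereas the PR uses a Tokio unbounded channel, and the `Rewind { to_slot, reason }` variant shape is an assumption based on the `request_rewind(to_slot, reason)` call shown above.

```rust
use std::sync::mpsc::{self, Receiver, SendError, Sender};

/// Command sent from the indexer to the block fetcher (assumed shape).
#[derive(Debug, Clone, PartialEq)]
pub enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

/// Sending half, held by the indexing side.
#[derive(Debug, Clone)]
pub struct RewindController {
    sender: Sender<RewindCommand>,
}

impl RewindController {
    /// Creates the controller together with the receiver that is handed
    /// to the block stream configuration.
    pub fn new() -> (Self, Receiver<RewindCommand>) {
        let (sender, receiver) = mpsc::channel();
        (Self { sender }, receiver)
    }

    /// Asks the fetcher to restart from `to_slot`.
    /// Fails only if the receiving side has been dropped.
    pub fn request_rewind(
        &self,
        to_slot: u64,
        reason: String,
    ) -> Result<(), SendError<RewindCommand>> {
        self.sender.send(RewindCommand::Rewind { to_slot, reason })
    }
}
```

Keeping the sender and receiver in separate owners mirrors the flow above: main creates both, the receiver moves into the fetcher, and the indexer only ever sees the controller.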

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Poem

In fields of code where data flows,
A rabbit hops where sequence grows.
If gaps appear and streams rewind,
New logic helps the path to find.
With atomics, channels, and careful cheer,
The ingester leaps without fear!
🐇⏪✨

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 04cf92a and 47050ad.

📒 Files selected for processing (1)
  • src/ingester/parser/state_update.rs (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/ingester/parser/state_update.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🔭 Outside diff range comments (1)
src/ingester/parser/state_update.rs (1)

129-189: Add sequence gap detection for leaf nullifications and batch Merkle‐tree events

Both leaf_nullifications and batch_merkle_tree_events carry sequence numbers but currently only indexed_merkle_tree_updates are checked for gaps in merge_updates_with_slot. To ensure comprehensive gap detection:

  • Before merged.leaf_nullifications.extend(update.leaf_nullifications), iterate over each LeafNullification in update.leaf_nullifications, group or sort by tree and seq, then call:
    if let Some((expected, actual)) = TreeInfo::check_sequence_gap(&tree, leaf.seq) {
        // log and collect SequenceGap { tree, expected_seq: expected, actual_seq: actual }
    }
  • In the for (key, events) in update.batch_merkle_tree_events loop, for each (seq, _) in events, invoke:
    if let Some((expected, actual)) = TreeInfo::check_sequence_gap(&key, seq) {
        // log and collect SequenceGap
    }
  • Optionally factor this into a helper to avoid duplication.

Implementing these checks will catch missing or out‐of‐order entries across all sequence-tracked collections.
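
The helper suggested in the last bullet could look roughly like the sketch below. It assumes that sequence numbers for a tree increase by exactly one per update, and it takes the highest seen value as a plain argument instead of reading it from `TreeInfo`. `collect_gaps` and the `SequenceGap` layout shown here are hypothetical.

```rust
/// Hypothetical stand-in for the PR's `SequenceGap`.
#[derive(Debug, Clone, PartialEq)]
pub struct SequenceGap {
    pub tree: String,
    pub expected_seq: u64,
    pub actual_seq: u64,
}

/// Walks `seqs` in ascending order starting from `highest_seen` and records
/// a gap whenever a number skips past the next expected value.
pub fn collect_gaps(tree: &str, highest_seen: u64, seqs: &[u64]) -> Vec<SequenceGap> {
    let mut sorted = seqs.to_vec();
    sorted.sort_unstable();

    let mut highest = highest_seen;
    let mut gaps = Vec::new();
    for seq in sorted {
        if seq <= highest {
            // Already indexed, or a duplicate within this batch.
            continue;
        }
        // Cannot overflow: `seq > highest` implies `highest < u64::MAX`.
        let expected = highest + 1;
        if seq != expected {
            gaps.push(SequenceGap {
                tree: tree.to_string(),
                expected_seq: expected,
                actual_seq: seq,
            });
        }
        highest = seq;
    }
    gaps
}
```

The same function could then be called once per collection (indexed Merkle tree updates, leaf nullifications, batch Merkle tree events) with that collection's sequence numbers.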

♻️ Duplicate comments (1)
src/ingester/rewind_controller.rs (1)

48-51: Fix: Don't use sequence number as slot in fallback calculation.

The fallback logic uses gap.expected_seq.saturating_sub(10) as a slot value, but sequence numbers and slots are different concepts. This could lead to incorrect rewind targets.

Consider these alternatives:

  1. Skip this gap if no slot mapping is found (don't include in earliest_slot calculation)
  2. Use a conservative fixed slot offset from the current slot
  3. Return an error indicating that rewind slot cannot be determined
         } else {
             // Fallback: conservative approach if we can't find the exact slot
             // This handles the case where this is the first sequence for this tree
-            earliest_slot = earliest_slot.min(gap.expected_seq.saturating_sub(10));
+            // Skip gaps where we can't determine the slot
+            // The tree might be new or have no prior sequences
         }

Also applies to: 58-58
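
Alternatives 1 and 3 can be combined: skip gaps without a known slot, and report "no rewind target" when nothing is left. The sketch below assumes a hypothetical `last_known_slot` field on the gap record; in the PR the slot is looked up through `TreeInfo` instead.

```rust
/// Hypothetical, simplified gap record; the PR's `SequenceGap` may differ.
#[derive(Debug, Clone)]
pub struct SequenceGap {
    pub expected_seq: u64,
    pub actual_seq: u64,
    /// Slot in which the last in-order sequence number was seen, if known.
    pub last_known_slot: Option<u64>,
}

/// Returns the earliest known slot across all gaps, or `None` when no gap
/// can be mapped to a slot. Sequence numbers are never used as slots.
pub fn determine_rewind_slot(gaps: &[SequenceGap]) -> Option<u64> {
    gaps.iter().filter_map(|gap| gap.last_known_slot).min()
}
```

Returning `Option<u64>` instead of a `u64::MAX` sentinel forces the caller to decide explicitly what to do when no slot mapping exists.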

🧹 Nitpick comments (1)
src/ingester/parser/state_update.rs (1)

101-101: Address the TODO: Extend sequence gap detection to all tree types.

The TODO comment indicates that rewind functionality should be extended beyond address tree v1. This is important for consistency across all tree types.

Would you like me to help implement sequence gap detection for other tree types or create an issue to track this work?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f0a45ee and 27ce82f.

📒 Files selected for processing (14)
  • src/ingester/error.rs (2 hunks)
  • src/ingester/fetchers/grpc.rs (4 hunks)
  • src/ingester/fetchers/mod.rs (3 hunks)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/indexer/mod.rs (2 hunks)
  • src/ingester/mod.rs (4 hunks)
  • src/ingester/parser/mod.rs (1 hunks)
  • src/ingester/parser/state_update.rs (5 hunks)
  • src/ingester/parser/tree_info.rs (9 hunks)
  • src/ingester/parser/tx_event_parser_v2.rs (1 hunks)
  • src/ingester/rewind_controller.rs (1 hunks)
  • src/main.rs (6 hunks)
  • src/snapshot/mod.rs (1 hunks)
  • src/snapshot/snapshotter/main.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
src/ingester/parser/mod.rs (1)
src/ingester/parser/state_update.rs (1)
  • merge_updates (125-127)
src/ingester/parser/tx_event_parser_v2.rs (1)
src/ingester/parser/state_update.rs (1)
  • merge_updates (125-127)
src/ingester/indexer/mod.rs (1)
src/ingester/mod.rs (1)
  • index_block_batch_with_infinite_retries (146-172)
src/ingester/fetchers/poller.rs (2)
src/ingester/typedefs/block_info.rs (1)
  • parse_ui_confirmed_blocked (55-93)
src/monitor/mod.rs (1)
  • start_latest_slot_updater (92-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (31)
src/snapshot/snapshotter/main.rs (1)

259-259: LGTM! Clear architectural distinction.

The explicit None assignment with the accompanying comment clearly indicates that the snapshotter intentionally opts out of the rewind mechanism, which is architecturally sound since snapshot processing doesn't require rewind control.

src/snapshot/mod.rs (1)

454-459: LGTM! Good refactoring for clarity.

Extracting last_indexed_slot into a local variable improves readability and avoids repeated field access. This is a clean refactoring that maintains the same functionality while making the code more readable.

src/ingester/parser/mod.rs (1)

130-130: LGTM! Essential error propagation for rewind mechanism.

The addition of the ? operator properly propagates sequence gap errors from StateUpdate::merge_updates, which is crucial for the rewind control system to detect and handle sequence inconsistencies.

src/ingester/fetchers/grpc.rs (4)

56-56: LGTM! Consistent rewind receiver handling.

The explicit None argument with clear comment properly indicates that the initial RPC poll stream doesn't participate in rewind control, which is architecturally correct for the gRPC fallback mechanism.


119-119: LGTM! Consistent with timeout handling.

The explicit None argument maintains consistency across all gRPC fallback scenarios, correctly indicating that timeout-triggered RPC streams don't use rewind control.


137-137: LGTM! Proper handling for out-of-order scenarios.

The explicit None argument correctly indicates that RPC streams created due to out-of-order gRPC blocks don't participate in rewind control, maintaining architectural consistency.


150-150: LGTM! Complete coverage of fallback scenarios.

The explicit None argument ensures that RPC streams created when gRPC becomes unhealthy don't use rewind control, completing the consistent handling across all gRPC fallback scenarios.

src/ingester/parser/tx_event_parser_v2.rs (1)

151-151: LGTM! Consistent error propagation in v2 parser.

The addition of the ? operator properly propagates sequence gap errors from StateUpdate::merge_updates, ensuring that the v2 transaction event parser participates correctly in the rewind control system.

src/ingester/indexer/mod.rs (2)

55-55: Parameter addition looks good.

The addition of the optional rewind_controller parameter properly integrates the rewind mechanism into the block stream indexing function.


75-75: Proper parameter propagation.

The rewind controller is correctly passed through to the batch processing function, maintaining consistency with the rewind mechanism integration.

src/ingester/fetchers/mod.rs (4)

6-8: Import additions are appropriate.

The new imports support the rewind mechanism integration with proper module boundaries.


21-21: Good integration of rewind receiver into configuration.

The optional rewind_receiver field properly extends the configuration structure to support the rewind mechanism.


42-42: Rewind integration with poller stream only.

The rewind receiver is passed only to the poller stream but not to the gRPC stream. This means rewind functionality is limited to RPC polling mode.

Is this intentional? Should gRPC streams also support rewind functionality for consistency?


25-25: No impact from changing &self to self on load_block_stream
All existing call sites pass a BlockStreamConfig by value and never reuse it after invoking load_block_stream, so consuming ownership here does not break any usage patterns.

• src/main.rs – continously_index_new_blocks(block_stream_config, …) moves the config and immediately calls load_block_stream once.
• src/snapshot/mod.rs – config is passed in and only used to call load_block_stream a single time.

No changes required.

src/ingester/error.rs (4)

2-2: Appropriate import addition.

The SequenceGap import is necessary for the new error variant and maintains proper module boundaries.


18-19: Well-designed error variant.

The SequenceGapDetected variant includes both the gap data and a helpful formatted message showing the count of gaps detected.


28-32: Convenient error conversion.

The From<String> implementation provides a clean way to convert string errors into ParserError variants.


34-42: Proper error mapping from parser to ingester.

The conversion from SequenceGapError to IngesterError correctly maps the parser's gap detection to the ingester's error handling, maintaining the error hierarchy.
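
For readers unfamiliar with how these `From` impls interact with the `?` operator added in the parser files, here is a reduced sketch. The type shapes are simplified assumptions, and `merge_updates` and `derive_state_update` are hypothetical stand-ins, not the PR's actual functions.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct SequenceGap {
    pub expected_seq: u64,
    pub actual_seq: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum SequenceGapError {
    GapDetected(Vec<SequenceGap>),
}

#[derive(Debug, PartialEq)]
pub enum IngesterError {
    SequenceGapDetected(Vec<SequenceGap>),
    ParserError(String),
}

impl From<SequenceGapError> for IngesterError {
    fn from(err: SequenceGapError) -> Self {
        match err {
            SequenceGapError::GapDetected(gaps) => IngesterError::SequenceGapDetected(gaps),
        }
    }
}

impl From<String> for IngesterError {
    fn from(message: String) -> Self {
        IngesterError::ParserError(message)
    }
}

/// Stand-in for a parser-level merge that may report a gap.
fn merge_updates(has_gap: bool) -> Result<(), SequenceGapError> {
    if has_gap {
        let gap = SequenceGap { expected_seq: 5, actual_seq: 7 };
        return Err(SequenceGapError::GapDetected(vec![gap]));
    }
    Ok(())
}

/// `?` converts `SequenceGapError` into `IngesterError` through the `From` impl.
pub fn derive_state_update(has_gap: bool) -> Result<(), IngesterError> {
    merge_updates(has_gap)?;
    Ok(())
}
```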

src/main.rs (7)

20-20: Appropriate import for rewind functionality.

The RewindController import is necessary for the main function integration.


178-178: Function signature updated correctly.

The addition of the rewind_controller parameter properly integrates rewind capability into the continuous indexing function.


188-188: Proper rewind controller propagation.

The rewind controller is correctly passed to the indexing stream function, enabling gap detection and rewind functionality.


240-240: Appropriate exclusion of rewind for snapshots.

The explicit None for rewind controller during snapshot processing is correct since snapshots are historical data that shouldn't trigger rewinds.


284-285: Clean rewind controller setup.

The creation of the RewindController and separation of the receiver follows proper channel patterns for the rewind mechanism.


292-292: Configuration properly updated.

The rewind receiver is correctly added to the BlockStreamConfig to enable rewind functionality in the block fetching streams.


301-301: Complete integration of rewind controller.

The rewind controller is properly passed to the continuous indexing function, completing the integration chain from creation to usage.

src/ingester/fetchers/poller.rs (6)

12-12: Necessary import for rewind channels.

The tokio mpsc import is required for the rewind receiver functionality.


18-21: Import updates support rewind integration.

The updated imports properly include the RewindCommand type needed for the rewind functionality.


28-32: Function signature correctly updated.

The get_slot_stream function signature is properly updated to accept the optional rewind receiver parameter.


37-47: Well-implemented rewind command processing.

The rewind handling logic is well-designed:

  • Uses non-blocking try_recv() to avoid blocking the stream
  • Processes all pending commands in a loop
  • Logs rewind actions with reason for debugging
  • Correctly resets next_slot_to_fetch to the rewind target
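
The drain loop described in these points can be sketched as below. It uses `std::sync::mpsc` in place of the Tokio channel used in the PR, and `apply_pending_rewinds` plus the `Rewind { to_slot, reason }` variant shape are hypothetical names for illustration.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Assumed command shape; the PR's enum may differ.
#[derive(Debug)]
pub enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

/// Drains every queued command without blocking and returns the slot that
/// should be fetched next. With no receiver, the slot is returned unchanged.
pub fn apply_pending_rewinds(
    receiver: Option<&Receiver<RewindCommand>>,
    mut next_slot_to_fetch: u64,
) -> u64 {
    if let Some(receiver) = receiver {
        loop {
            match receiver.try_recv() {
                Ok(RewindCommand::Rewind { to_slot, reason }) => {
                    eprintln!("rewinding slot stream to {to_slot}: {reason}");
                    next_slot_to_fetch = to_slot;
                }
                // Nothing queued, or the controller was dropped: keep going
                // from whatever slot we currently have.
                Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
            }
        }
    }
    next_slot_to_fetch
}
```

In this sketch the last queued command wins. If several rewinds can be queued between polls, taking the minimum of the requested slots would be a more conservative choice.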

63-63: Function signature updated consistently.

The get_block_poller_stream signature is properly updated to accept and thread the rewind receiver parameter.


70-70: Proper parameter propagation.

The rewind receiver is correctly passed from the block poller stream to the slot stream, completing the rewind integration.

Comment on lines +25 to +29
#[derive(Debug, Clone)]
pub enum SequenceGapError {
    GapDetected(Vec<SequenceGap>),
}

🛠️ Refactor suggestion

Consider implementing standard error traits for SequenceGapError.

The SequenceGapError enum would benefit from implementing std::fmt::Display and std::error::Error traits for better integration with Rust's error handling ecosystem.

-#[derive(Debug, Clone)]
-pub enum SequenceGapError {
-    GapDetected(Vec<SequenceGap>),
-}
+#[derive(Debug, Clone)]
+pub enum SequenceGapError {
+    GapDetected(Vec<SequenceGap>),
+}
+
+impl std::fmt::Display for SequenceGapError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            SequenceGapError::GapDetected(gaps) => {
+                write!(f, "Sequence gaps detected: {} gaps found", gaps.len())
+            }
+        }
+    }
+}
+
+impl std::error::Error for SequenceGapError {}
🤖 Prompt for AI Agents
In src/ingester/parser/state_update.rs around lines 25 to 29, the
SequenceGapError enum derives only Debug and Clone and lacks implementations
of the std::fmt::Display and std::error::Error traits. Implement Display to
provide a user-friendly message describing the detected gaps, and implement
Error with an empty impl block, since the trait's default methods suffice.
This will improve error handling integration and usability.

Comment on lines 44 to 58
pub fn update_highest_seq(pubkey: &Pubkey, new_seq: u64, slot: u64) -> Result<u64, String> {
    let tree_pubkey_str = pubkey.to_string();
    if let Some(tree_info) = Self::get(&tree_pubkey_str) {
        let current = tree_info.highest_seq.load(std::sync::atomic::Ordering::Acquire);
        if new_seq > current {
            tree_info.highest_seq.store(new_seq, std::sync::atomic::Ordering::Release);
            tree_info.last_slot.store(slot, std::sync::atomic::Ordering::Release);
            Ok(new_seq)
        } else {
            Ok(current)
        }
    } else {
        Err(format!("Tree {} not found in mapping", pubkey))
    }
}

⚠️ Potential issue

Fix race condition in update_highest_seq using atomic compare-exchange.

The current implementation has a race condition where multiple threads could read the same current value and both proceed to update. Use compare_exchange for atomic read-modify-write.

 pub fn update_highest_seq(pubkey: &Pubkey, new_seq: u64, slot: u64) -> Result<u64, String> {
     let tree_pubkey_str = pubkey.to_string();
     if let Some(tree_info) = Self::get(&tree_pubkey_str) {
-        let current = tree_info.highest_seq.load(std::sync::atomic::Ordering::Acquire);
-        if new_seq > current {
-            tree_info.highest_seq.store(new_seq, std::sync::atomic::Ordering::Release);
-            tree_info.last_slot.store(slot, std::sync::atomic::Ordering::Release);
-            Ok(new_seq)
-        } else {
-            Ok(current)
+        loop {
+            let current = tree_info.highest_seq.load(std::sync::atomic::Ordering::Acquire);
+            if new_seq <= current {
+                return Ok(current);
+            }
+            match tree_info.highest_seq.compare_exchange(
+                current,
+                new_seq,
+                std::sync::atomic::Ordering::Release,
+                std::sync::atomic::Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    tree_info.last_slot.store(slot, std::sync::atomic::Ordering::Release);
+                    return Ok(new_seq);
+                }
+                Err(_) => continue, // Retry with new current value
+            }
         }
     } else {
         Err(format!("Tree {} not found in mapping", pubkey))
     }
 }
🤖 Prompt for AI Agents
In src/ingester/parser/tree_info.rs lines 44 to 58, the update_highest_seq
function has a race condition: the load and the store of highest_seq are each
atomic, but the load-compare-store sequence as a whole is not. To fix this,
replace the current load and store logic with a compare_exchange loop that
updates highest_seq only if the current value is less than new_seq, retrying
if the value changes concurrently. This makes the update a single atomic
read-modify-write and prevents the race.
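
As a possible alternative to the hand-written compare_exchange loop, `AtomicU64::fetch_max` performs the "store only if larger" step in one call. The sketch below uses a hypothetical `SeqTracker` struct in place of `TreeInfo`. Note that in either variant `highest_seq` and `last_slot` are still two separate atomics, so the pair is not updated as one unit.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the two atomic fields added to `TreeInfo`.
pub struct SeqTracker {
    highest_seq: AtomicU64,
    last_slot: AtomicU64,
}

impl SeqTracker {
    pub fn new() -> Self {
        Self {
            highest_seq: AtomicU64::new(0),
            last_slot: AtomicU64::new(0),
        }
    }

    /// Raises `highest_seq` to `new_seq` if it is larger and returns the
    /// resulting highest value.
    pub fn update_highest_seq(&self, new_seq: u64, slot: u64) -> u64 {
        // `fetch_max` is a single atomic read-modify-write that returns the
        // previous value, so two racing writers cannot both "win" with
        // stale reads.
        let previous = self.highest_seq.fetch_max(new_seq, Ordering::AcqRel);
        if new_seq > previous {
            // Separate store: a reader may briefly observe the new sequence
            // number together with the old slot.
            self.last_slot.store(slot, Ordering::Release);
            new_seq
        } else {
            previous
        }
    }

    pub fn last_slot(&self) -> u64 {
        self.last_slot.load(Ordering::Acquire)
    }
}
```

If readers need a consistent (sequence, slot) pair, guarding both fields with a single lock is one option; whether that is needed depends on how `last_slot` is consumed.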

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
tests/integration_tests/utils.rs (1)

468-468: Consider adding a descriptive error message for better test debugging.

The change correctly handles the new Result return type from merge_updates. However, using .unwrap() in tests without context can make debugging sequence gap errors difficult.

Consider using .expect() with a descriptive message:

-    let state_update = StateUpdate::merge_updates(state_updates).unwrap();
+    let state_update = StateUpdate::merge_updates(state_updates)
+        .expect("Failed to merge state updates due to sequence gap in test");
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 27ce82f and 0ae8920.

📒 Files selected for processing (3)
  • tests/integration_tests/e2e_tests.rs (5 hunks)
  • tests/integration_tests/mock_tests.rs (8 hunks)
  • tests/integration_tests/utils.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • tests/integration_tests/mock_tests.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/integration_tests/utils.rs (1)
src/ingester/parser/state_update.rs (1)
  • merge_updates (125-127)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (2)
tests/integration_tests/utils.rs (1)

558-558: LGTM! Correctly adapts to the updated index_block signature.

The addition of the None argument properly handles the new optional RewindController parameter. This is appropriate for test scenarios where rewind functionality is not being exercised.

tests/integration_tests/e2e_tests.rs (1)

81-81: LGTM! All index_block calls correctly updated for the new signature.

All calls to index_block have been properly updated to include the new optional RewindController parameter with None values. This is appropriate for these test scenarios where rewind functionality is not being exercised, maintaining the existing test behavior while accommodating the new API.

Also applies to: 501-501, 527-527, 532-532, 555-555, 590-590

ananas-block and others added 4 commits July 24, 2025 06:15

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
src/ingester/parser/state_update.rs (1)

25-28: Implement standard error traits for better error handling integration.

The SequenceGapError enum would benefit from implementing std::fmt::Display and std::error::Error traits as mentioned in the previous review.
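
A minimal sketch of what implementing those traits could look like. The PR's actual variants are not shown in this review, so the `GapDetected` variant below is hypothetical, and `SequenceGap` is a simplified stand-in that uses a `String` where the real struct holds a `Pubkey`.

```rust
use std::error::Error;
use std::fmt;

/// Simplified stand-in for the PR's `SequenceGap` (the real struct keys on a `Pubkey`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SequenceGap {
    pub tree: String,
    pub expected_seq: u64,
    pub actual_seq: u64,
}

/// Hypothetical variant set; the real enum's variants are an assumption here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SequenceGapError {
    GapDetected(Vec<SequenceGap>),
}

impl fmt::Display for SequenceGapError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SequenceGapError::GapDetected(gaps) => {
                // Lead with the count, then list each gap on the same line.
                write!(f, "{} sequence gap(s) detected", gaps.len())?;
                for gap in gaps {
                    write!(
                        f,
                        "; tree {}: expected {}, got {}",
                        gap.tree, gap.expected_seq, gap.actual_seq
                    )?;
                }
                Ok(())
            }
        }
    }
}

// The default `source()` (returning `None`) is enough: there is no wrapped cause.
impl Error for SequenceGapError {}
```

With both traits in place, the error can be boxed as `Box<dyn Error>` and propagated with `?` through functions that return generic error types.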

src/ingester/parser/tree_info.rs (1)

44-71: Fix race condition in atomic sequence updates.

The current implementation has a race condition where multiple threads could read the same current value and both proceed to update. The previous review comment correctly identified this issue and provided a solution using compare_exchange.
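
As a rough illustration of the compare-and-swap approach, the sketch below raises an atomic counter only when the new value is larger, retrying if another thread won the race. The function name `update_highest_seq` is hypothetical and the PR's actual update code is not reproduced here.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Raise `highest` to `new_seq` only if `new_seq` is strictly larger.
/// Returns `true` when this call performed the update.
pub fn update_highest_seq(highest: &AtomicU64, new_seq: u64) -> bool {
    let mut current = highest.load(Ordering::Acquire);
    while new_seq > current {
        match highest.compare_exchange_weak(
            current,
            new_seq,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            // We installed `new_seq`; no other thread changed the value in between.
            Ok(_) => return true,
            // Another thread changed the value (or the weak CAS failed spuriously);
            // retry against the value that was actually observed.
            Err(observed) => current = observed,
        }
    }
    false
}
```

For a plain monotonic maximum, `AtomicU64::fetch_max` achieves the same effect in one call. Note that a CAS on a single atomic does not make a paired update (for example, sequence number plus slot) atomic as a unit; that would need a lock or a packed representation.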

🧹 Nitpick comments (1)
src/ingester/parser/tx_event_parser.rs (1)

71-71: Consider the appropriateness of the error log level for unknown trees.

Changing from log::warn! to log::error! increases the severity when skipping unknown trees. While this aligns with the new rewind mechanism's stricter error handling, consider whether "error" is the appropriate level for a condition that the system gracefully handles by skipping (when SKIP_UNKNOWN_TREES is enabled).

If this is truly an error condition that operators need to address urgently, the current change is appropriate. However, if it's an expected condition during normal operation, consider using log::warn! or log::info! instead.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 297aa06 and 270e930.

📒 Files selected for processing (5)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/parser/state_update.rs (5 hunks)
  • src/ingester/parser/tree_info.rs (9 hunks)
  • src/ingester/parser/tx_event_parser.rs (1 hunks)
  • src/ingester/rewind_controller.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/ingester/rewind_controller.rs
  • src/ingester/fetchers/poller.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (7)
src/ingester/parser/state_update.rs (3)

18-23: LGTM! Well-structured sequence gap representation.

The SequenceGap struct provides clear context about detected gaps with the tree, expected sequence, and actual sequence. The PartialEq and Eq implementations will be useful for testing and comparison.


125-127: LGTM! Good API design with backwards compatibility.

The original merge_updates method now delegates to merge_updates_with_slot with None, maintaining backwards compatibility while enabling the new slot-aware functionality.


196-203: LGTM! Proper atomic sequence number updates with error handling.

The code correctly updates the highest sequence numbers for each tree when a slot is provided, with appropriate error logging for failed updates. The atomic operations ensure thread safety.

src/ingester/parser/tree_info.rs (4)

13-14: LGTM! Proper atomic field design for concurrent access.

The atomic fields highest_seq and last_slot wrapped in Arc enable safe concurrent access to sequence tracking data across threads.


38-42: LGTM! Simple and correct atomic read operation.

The get_highest_seq method correctly uses Acquire ordering for reading the atomic value, ensuring proper synchronization.


73-92: Verify the logic for determining rewind slots.

The get_last_slot_for_seq method returns the slot for any target sequence that is less than or equal to the current highest sequence. This assumes that all sequences up to the highest were processed in the same slot, which may not be accurate for determining precise rewind points.

Consider whether this method should track per-sequence slot information or if the current approach is sufficient for the rewind mechanism's needs.
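
If per-sequence tracking turns out to be needed, one possible shape is an ordered map from sequence number to slot. The sketch below is an assumption for illustration only: `SeqSlotHistory` and its methods are hypothetical names, not part of the PR.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-tree record mapping each processed sequence number to its slot.
#[derive(Debug, Default)]
pub struct SeqSlotHistory {
    slots_by_seq: BTreeMap<u64, u64>,
}

impl SeqSlotHistory {
    /// Remember that `seq` was processed in `slot`.
    pub fn record(&mut self, seq: u64, slot: u64) {
        self.slots_by_seq.insert(seq, slot);
    }

    /// Slot of the latest recorded sequence at or below `target_seq`, if any.
    pub fn slot_for_seq(&self, target_seq: u64) -> Option<u64> {
        self.slots_by_seq
            .range(..=target_seq)
            .next_back()
            .map(|(_, slot)| *slot)
    }
}
```

The map grows with every recorded sequence, so a real implementation would likely prune entries older than the deepest rewind it intends to support.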


274-275: LGTM! Consistent atomic field initialization across all tree types.

All TreeInfo instances are properly initialized with atomic sequence tracking fields set to zero, ensuring consistent behavior across different tree types.

Also applies to: 286-287, 300-301, 312-313, 358-359, 370-371, 384-385

Comment on lines 150 to 163
// Check for sequence gaps before merging
if let Some((expected_seq, actual_seq)) =
    TreeInfo::check_sequence_gap(&tree, value.seq)
{
    error!(
        "Sequence gap detected for tree {}: expected {}, got {}",
        tree, expected_seq, actual_seq
    );
    detected_gaps.push(SequenceGap {
        tree,
        expected_seq,
        actual_seq,
    });
}

💡 Verification agent

🧩 Analysis chain

Verify sequence gap detection logic for all tree types.

The sequence gap detection currently only works for address v1 trees (as noted in the comment on lines 99-100 of tree_info.rs, which hardcodes + 2). However, this code checks for gaps on every tree in indexed_merkle_tree_updates without filtering by tree type.


🏁 Script executed:

#!/bin/bash
# Description: Check if sequence gap detection should be limited to specific tree types
# Expected: Find usage patterns and tree type filtering logic

# Search for tree type filtering in gap detection
rg -A 10 -B 5 "check_sequence_gap"

# Search for tree type usage in indexed_merkle_tree_updates
ast-grep --pattern 'indexed_merkle_tree_updates'

Length of output: 2662


🏁 Script executed:

#!/bin/bash
# Display relevant portion of tree_info.rs around check_sequence_gap
sed -n '1,200p' src/ingester/parser/tree_info.rs

Length of output: 7675


Filter sequence-gap checks by tree type
The current check_sequence_gap logic (+ 2) only applies to Address V1 trees but is invoked for every tree in indexed_merkle_tree_updates. We should either restrict this check to Address V1 or extend the algorithm per TreeType.

Locations to update:

  • src/ingester/parser/state_update.rs (around lines 150–163)
  • Optionally, src/ingester/parser/tree_info.rs if you’d rather generalize check_sequence_gap

Suggested change in state_update.rs (wrap gap detection in a tree-type guard):

 use light_compressed_account::TreeType;
 
             let (tree, _leaf_index) = key;
 
-            // Check for sequence gaps before merging
-            if let Some((expected_seq, actual_seq)) =
-                TreeInfo::check_sequence_gap(&tree, value.seq)
-            {
-                error!(…);
-                detected_gaps.push(…);
-            }
+            // Only run gap detection for Address V1 trees
+            if TreeInfo::get_tree_type(&tree) == TreeType::AddressV1 {
+                if let Some((expected_seq, actual_seq)) =
+                    TreeInfo::check_sequence_gap(&tree, value.seq)
+                {
+                    error!(
+                        "Sequence gap detected for tree {}: expected {}, got {}",
+                        tree, expected_seq, actual_seq
+                    );
+                    detected_gaps.push(SequenceGap {
+                        tree,
+                        expected_seq,
+                        actual_seq,
+                    });
+                }
+            }

If you prefer a single-entry point, modify TreeInfo::check_sequence_gap to:

  • Obtain tree_type via Self::get_tree_type(pubkey)
  • Match on it (AddressV1 => current_highest + 2, other types as needed)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    // Check for sequence gaps before merging
-    if let Some((expected_seq, actual_seq)) =
-        TreeInfo::check_sequence_gap(&tree, value.seq)
-    {
-        error!(
-            "Sequence gap detected for tree {}: expected {}, got {}",
-            tree, expected_seq, actual_seq
-        );
-        detected_gaps.push(SequenceGap {
-            tree,
-            expected_seq,
-            actual_seq,
-        });
-    }
+    use light_compressed_account::TreeType;
+    let (tree, _leaf_index) = key;
+    // Only run gap detection for Address V1 trees
+    if TreeInfo::get_tree_type(&tree) == TreeType::AddressV1 {
+        if let Some((expected_seq, actual_seq)) =
+            TreeInfo::check_sequence_gap(&tree, value.seq)
+        {
+            error!(
+                "Sequence gap detected for tree {}: expected {}, got {}",
+                tree, expected_seq, actual_seq
+            );
+            detected_gaps.push(SequenceGap {
+                tree,
+                expected_seq,
+                actual_seq,
+            });
+        }
+    }
🤖 Prompt for AI Agents
In src/ingester/parser/state_update.rs around lines 150 to 163, the sequence gap
check is currently applied to all trees but the logic only fits Address V1
trees. To fix this, wrap the gap detection code in a condition that first checks
if the tree type is Address V1 before calling check_sequence_gap. Alternatively,
update TreeInfo::check_sequence_gap in src/ingester/parser/tree_info.rs to get
the tree type using Self::get_tree_type(pubkey) and match on it, applying the +2
logic only for Address V1 and handling other tree types appropriately.

Comment on lines 93 to 106
pub fn check_sequence_gap(pubkey: &Pubkey, incoming_seq: u64) -> Option<(u64, u64)> {
    if let Some(current_highest) = Self::get_highest_seq(pubkey) {
        // We init with 0, we cannot crash on 0
        if current_highest == 0 {
            return None;
        }
        // Only works for address v1 trees because + 2.
        let expected_seq = current_highest + 2;
        if incoming_seq != expected_seq {
            return Some((expected_seq, incoming_seq));
        }
    }
    None
}

🛠️ Refactor suggestion

Document the address v1 tree limitation and hardcoded increment.

The sequence gap detection has important limitations:

  1. Only works for address v1 trees (see the comment on lines 99-100)
  2. Uses a hardcoded + 2 increment (line 100)
  3. Skips validation when current_highest == 0

Consider making this more generic:

     pub fn check_sequence_gap(pubkey: &Pubkey, incoming_seq: u64) -> Option<(u64, u64)> {
+        let tree_type = Self::get_tree_type(pubkey);
+        
+        // Only check gaps for trees that have sequence numbers
+        if !matches!(tree_type, TreeType::AddressV1 | TreeType::StateV1) {
+            return None;
+        }
+        
         if let Some(current_highest) = Self::get_highest_seq(pubkey) {
-            // We init with 0, we cannot crash on 0
             if current_highest == 0 {
                 return None;
             }
-            // Only works for address v1 trees because + 2.
-            let expected_seq = current_highest + 2;
+            
+            let expected_seq = match tree_type {
+                TreeType::AddressV1 => current_highest + 2,
+                TreeType::StateV1 => current_highest + 1,
+                _ => return None,
+            };
+            
             if incoming_seq != expected_seq {
                 return Some((expected_seq, incoming_seq));
             }
         }
         None
     }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/ingester/parser/tree_info.rs lines 93 to 106, the check_sequence_gap
function has a hardcoded +2 increment and only supports address v1 trees, with
no validation when current_highest is 0. Update the comments to clearly document
these limitations and the reason for skipping validation at 0. Consider
refactoring the function to accept parameters or configuration to handle
different tree versions and sequence increments more generically instead of
hardcoding +2.
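
One way to read the "more generic" suggestion is to separate the per-tree increment from the gap check itself. The sketch below is only an illustration under stated assumptions: `TreeKind` is a hypothetical stand-in for the project's `TreeType`, the +2 step for address v1 comes from the PR's comment, and the +1 step for state v1 is taken from the review's suggestion and is unverified.

```rust
/// Hypothetical stand-in for the tree kinds discussed in the review.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TreeKind {
    AddressV1,
    StateV1,
    Other,
}

/// Assumed sequence increment per update; `None` means gap checks are skipped.
pub fn expected_step(kind: TreeKind) -> Option<u64> {
    match kind {
        TreeKind::AddressV1 => Some(2), // from the PR's "+ 2" comment
        TreeKind::StateV1 => Some(1),   // review's suggestion, unverified
        TreeKind::Other => None,
    }
}

/// Returns `Some((expected, incoming))` when `incoming_seq` breaks the expected step.
pub fn check_sequence_gap(
    kind: TreeKind,
    current_highest: u64,
    incoming_seq: u64,
) -> Option<(u64, u64)> {
    let step = expected_step(kind)?;
    // A stored value of 0 means "nothing tracked yet", so there is nothing to compare.
    if current_highest == 0 {
        return None;
    }
    // On overflow there is no meaningful expectation; report no gap in that case.
    let expected = current_highest.checked_add(step)?;
    (incoming_seq != expected).then_some((expected, incoming_seq))
}
```

Keeping the step in its own function means a new tree type only needs one extra match arm, and the skip-at-zero behaviour is documented where it happens.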
