Sergey/rewind #36

Open
wants to merge 13 commits into
base: main
Conversation

ananas-block
@ananas-block ananas-block commented Jul 25, 2025

Overview

  • Summary of changes

Testing

  • Testing performed to validate the changes

Summary by CodeRabbit

  • New Features

    • Introduced automatic detection and handling of sequence gaps during block ingestion, with the ability to rewind to the correct slot and resume indexing.
    • Added a rewind controller to coordinate and trigger rewinds when gaps are detected.
    • Enhanced state update merging with robust sequence gap detection and reporting.
    • Enabled dynamic rewinding of block streams based on rewind commands.
  • Bug Fixes

    • Improved error handling for unknown trees and sequence gaps, ensuring more reliable indexing.
    • Refined block cache processing to handle block ordering and forks more precisely.
  • Tests

    • Added comprehensive integration tests for gap detection and rewind functionality.
    • Updated existing tests to support new function signatures and error handling.
  • Chores

    • Removed deprecated custom database migrations and related modules.
    • Refactored configuration and initialization to support rewind features.

coderabbitai bot commented Jul 25, 2025

Walkthrough

This update introduces a rewind control system to the ingester, enabling detection and handling of sequence gaps in state updates. It adds new error types, modifies state update merging to detect gaps, and integrates a channel-based rewind mechanism. The block indexing and streaming workflows are updated to propagate and respond to rewind requests. Related integration tests and migration modules are adjusted or removed accordingly.

Changes

| File(s) | Change Summary |
|---|---|
| src/ingester/error.rs | Added SequenceGapDetected to IngesterError, new From impls for String and SequenceGapError. |
| src/ingester/fetchers/grpc.rs | Explicitly passes None for rewind receiver in gRPC stream fallback logic. |
| src/ingester/fetchers/mod.rs | Adds optional rewind_receiver to BlockStreamConfig, updates load_block_stream to own self and pass receiver. |
| src/ingester/fetchers/poller.rs | Updates slot/block poller streams to accept and process optional rewind receiver and commands; refines block cache ordering and fork handling. |
| src/ingester/indexer/mod.rs | Adds optional rewind_controller to index_block_stream, updates slot iteration and progress logic. |
| src/ingester/mod.rs | Integrates sequence gap detection and rewind logic throughout block indexing functions; updates signatures to accept optional RewindController. |
| src/ingester/parser/mod.rs, src/ingester/parser/tx_event_parser_v2.rs | Adds error propagation for state update merging. |
| src/ingester/parser/state_update.rs | Refactors merge_updates to return Result with gap detection; adds merge_updates_with_slot, SequenceGap, and SequenceGapError. |
| src/ingester/parser/tree_info.rs | Adds atomic sequence/slot tracking to TreeInfo; new methods for sequence gap detection and updates. |
| src/ingester/parser/tx_event_parser.rs | Changes unknown tree log from warning to error. |
| src/ingester/rewind_controller.rs | New module: defines RewindController, RewindCommand, and logic to determine rewind slot from sequence gaps. |
| src/main.rs | Integrates RewindController into block streaming/indexing; updates function signatures and initialization. |
| src/snapshot/mod.rs, src/snapshot/snapshotter/main.rs | Adds/sets rewind_receiver to None in snapshot block stream config and calls. |
| src/migration/migrations/custom/custom20250211_000002_solayer2.rs, src/migration/migrations/custom/custom20252201_000001_init.rs | Removes two custom migration files implementing partial indexes. |
| src/migration/migrations/custom/mod.rs | Removes module exposing custom migrations and their aggregation function. |
| src/migration/migrations/mod.rs | Comments out public exposure of custom migrations module. |
| src/migration/mod.rs | Removes import and usage of custom migrations in migrator trait impl. |
| tests/integration_tests/e2e_tests.rs, tests/integration_tests/mock_tests.rs, tests/integration_tests/utils.rs | Updates all index_block calls to include explicit None for new parameter. |
| tests/integration_tests/gap_detection_tests.rs | New integration tests for sequence gap detection in state updates. |
| tests/integration_tests/main.rs | Registers new gap detection test module. |

Sequence Diagram(s)

sequenceDiagram
    participant BlockPoller
    participant RewindController
    participant Indexer
    participant StateUpdate
    participant TreeInfo

    Indexer->>StateUpdate: merge_updates_with_slot(updates, slot)
    StateUpdate->>TreeInfo: check_sequence_gap(pubkey, seq)
    TreeInfo-->>StateUpdate: gap info (if any)
    StateUpdate-->>Indexer: Result (Ok or SequenceGapError)
    alt SequenceGapError
        Indexer->>RewindController: request_rewind(to_slot, reason)
        RewindController-->>BlockPoller: RewindCommand::Rewind
        BlockPoller->>BlockPoller: Resets slot to rewind point
    end
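The flow in the diagram can be sketched in a few lines. This is a minimal illustration, not the PR's implementation: it uses std::sync::mpsc so that it compiles on its own, whereas the review comments further down suggest the PR uses an async mpsc channel. The helper next_slot_after_rewinds is a hypothetical name; RewindController, RewindCommand, and request_rewind follow the names used in the PR, but their bodies here are assumptions.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Command sent from the indexer to the block poller.
#[derive(Debug, PartialEq)]
enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

/// Indexer-side handle that requests rewinds over a channel.
struct RewindController {
    sender: Sender<RewindCommand>,
}

impl RewindController {
    /// Creates the controller together with the receiver handed to the poller.
    fn new() -> (Self, Receiver<RewindCommand>) {
        let (sender, receiver) = channel();
        (Self { sender }, receiver)
    }

    /// Sends a rewind request; fails if the poller side has gone away.
    fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> {
        self.sender
            .send(RewindCommand::Rewind { to_slot, reason })
            .map_err(|e| format!("Failed to send rewind command: {}", e))
    }
}

/// Poller-side step (hypothetical helper): applies every pending rewind
/// command and returns the slot that should be fetched next.
fn next_slot_after_rewinds(receiver: &Receiver<RewindCommand>, mut next_slot: u64) -> u64 {
    while let Ok(RewindCommand::Rewind { to_slot, .. }) = receiver.try_recv() {
        next_slot = to_slot;
    }
    next_slot
}
```

In this sketch the poller keeps its current position when no command is pending and jumps back to to_slot once the indexer has reported a gap.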

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

A bunny with code in its paws,
Now leaps through the sequence with cause.
When a gap in the garden appears,
It rewinds with twitching ears.
No carrot is left behind—
With atomic hops, all gaps we find!
Onward, the ingester hops, gap-free and refined. 🥕

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (11)
src/migration/mod.rs (2)

17-27: Consider refactoring the redundant migrator struct.

Both Migrator and MigractorWithCustomMigrations now serve identical purposes since custom migrations have been removed. The name MigractorWithCustomMigrations is misleading as it no longer includes custom migrations.

Consider either:

  1. Removing MigractorWithCustomMigrations entirely if it's no longer needed
  2. Renaming it to reflect its current purpose
  3. Adding a comment explaining why both structs exist
-pub struct MigractorWithCustomMigrations;
+// TODO: Remove this struct if no longer needed, or rename to reflect actual purpose
+pub struct MigractorWithCustomMigrations;

24-24: Consider removing commented code in production.

The commented code should be removed if the custom migrations are permanently disabled, or documented with a TODO/FIXME comment explaining the temporary nature if this is intended to be re-enabled later.

-            // .chain(get_custom_migrations())
+            // TODO: Custom migrations temporarily disabled - remove this comment if permanent
src/ingester/error.rs (1)

28-32: Generic String to ParserError conversion may hide error context.

The From<String> implementation converts any string error into a ParserError. This is quite broad and could lead to non-parser errors being miscategorized. Consider using a more specific conversion or documenting when this conversion is appropriate.
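One possible shape for a narrower conversion is sketched below. It assumes a hypothetical ParserMessage newtype and a hypothetical parse_leaf_index helper, neither of which appears in the PR; only the IngesterError::ParserError variant name is taken from the comment above.

```rust
#[derive(Debug, PartialEq)]
enum IngesterError {
    ParserError(String),
}

/// Hypothetical newtype marking a message that really originated in the parser.
#[derive(Debug)]
struct ParserMessage(String);

impl From<ParserMessage> for IngesterError {
    fn from(msg: ParserMessage) -> Self {
        IngesterError::ParserError(msg.0)
    }
}

/// Hypothetical parser helper that tags its own errors.
fn parse_leaf_index(raw: &str) -> Result<u64, ParserMessage> {
    raw.parse::<u64>()
        .map_err(|e| ParserMessage(format!("invalid leaf index {:?}: {}", raw, e)))
}

/// `?` converts only ParserMessage here; a bare String error from
/// unrelated code would no longer be silently turned into a ParserError.
fn ingest(raw: &str) -> Result<u64, IngesterError> {
    Ok(parse_leaf_index(raw)?)
}
```

The trade-off is a little more ceremony at parser call sites in exchange for the compiler rejecting accidental String-to-ParserError conversions elsewhere.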

src/ingester/indexer/mod.rs (1)

69-69: Remove commented out code.

The last_indexed_slot variable tracking has been commented out but not removed. If this tracking is no longer needed due to the rewind mechanism, please remove these lines entirely rather than leaving them as comments.

-    // let mut last_indexed_slot = last_indexed_slot_at_start;
-            // last_indexed_slot = slot;

Also applies to: 97-97

src/ingester/fetchers/poller.rs (1)

38-47: Consider handling channel closure gracefully

The current implementation uses try_recv(), which returns Err both when the channel is empty and when it is closed. While this won't cause issues in the current loop structure, it's more idiomatic to handle the Disconnected case explicitly to distinguish between an empty channel and a closed one.

 if let Some(ref mut receiver) = rewind_receiver {
-    while let Ok(command) = receiver.try_recv() {
+    loop {
+        match receiver.try_recv() {
+            Ok(command) => {
                 match command {
                     RewindCommand::Rewind { to_slot, reason } => {
                         log::error!("Rewinding slot stream to {}: {}", to_slot, reason);
                         next_slot_to_fetch = to_slot;
                     }
                 }
+            }
+            Err(mpsc::error::TryRecvError::Empty) => break,
+            Err(mpsc::error::TryRecvError::Disconnected) => {
+                rewind_receiver = None;
+                break;
+            }
+        }
     }
 }
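The same pattern can be tried in isolation. The sketch below swaps in std::sync::mpsc (whose TryRecvError also has Empty and Disconnected variants) so it runs without an async runtime; drain_rewinds is a hypothetical helper name, and eprintln! stands in for the logging macro.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

/// Drains all pending rewind commands and returns the slot to fetch next.
/// When the sending side is gone, the receiver is dropped so later calls
/// skip the channel entirely.
fn drain_rewinds(
    rewind_receiver: &mut Option<Receiver<RewindCommand>>,
    mut next_slot_to_fetch: u64,
) -> u64 {
    let mut disconnected = false;
    if let Some(receiver) = rewind_receiver.as_ref() {
        loop {
            match receiver.try_recv() {
                Ok(RewindCommand::Rewind { to_slot, reason }) => {
                    eprintln!("Rewinding slot stream to {}: {}", to_slot, reason);
                    next_slot_to_fetch = to_slot;
                }
                // Nothing pending right now; the channel is still usable.
                Err(TryRecvError::Empty) => break,
                // All senders dropped; no further commands can arrive.
                Err(TryRecvError::Disconnected) => {
                    disconnected = true;
                    break;
                }
            }
        }
    }
    if disconnected {
        *rewind_receiver = None;
    }
    next_slot_to_fetch
}
```

Messages already queued before the sender was dropped are still delivered; only after the queue is empty does try_recv report Disconnected.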
src/ingester/parser/state_update.rs (2)

205-212: Extract SequenceSource enum to module level

The SequenceSource enum is useful for debugging and could be reused elsewhere. Consider moving it to the module level.

Move the enum definition outside the function and make it public if needed for debugging or testing purposes.


301-301: Address TODO: Implement batch nullify context gap detection

The TODO comment indicates missing gap detection for batch nullify context. This could lead to undetected gaps in that component.

Would you like me to help implement gap detection for batch nullify context? This would require associating queue indices with their respective trees.

src/ingester/parser/tree_info.rs (2)

286-301: Optimize Arc usage for shared atomic counters

Currently, separate Arc<AtomicU64> instances are created for both the tree and queue entries of the same logical tree. Since they represent the same tree, they should share the same atomic counters.

 for (legacy_tree, legacy_queue) in legacy_state_trees.iter() {
+    let highest_seq = Arc::new(AtomicU64::new(0));
+    let last_slot = Arc::new(AtomicU64::new(0));
+    
     m.insert(
         legacy_queue.to_string(),
         TreeInfo {
             tree: *legacy_tree,
             queue: *legacy_queue,
             height: 26,
             tree_type: TreeType::StateV1,
-            highest_seq: Arc::new(AtomicU64::new(0)),
-            last_slot: Arc::new(AtomicU64::new(0)),
+            highest_seq: Arc::clone(&highest_seq),
+            last_slot: Arc::clone(&last_slot),
         },
     );

     m.insert(
         legacy_tree.to_string(),
         TreeInfo {
             tree: *legacy_tree,
             queue: *legacy_queue,
             height: 26,
             tree_type: TreeType::StateV1,
-            highest_seq: Arc::new(AtomicU64::new(0)),
-            last_slot: Arc::new(AtomicU64::new(0)),
+            highest_seq: Arc::clone(&highest_seq),
+            last_slot: Arc::clone(&last_slot),
         },
     );
 }
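The effect of sharing the counters can be checked with a reduced model. The TreeInfo below keeps only the two atomic fields, and build_mapping with string keys is a hypothetical stand-in for the real mapping construction.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Reduced TreeInfo: only the shared counters from the suggestion above.
#[derive(Clone)]
struct TreeInfo {
    highest_seq: Arc<AtomicU64>,
    last_slot: Arc<AtomicU64>,
}

/// Registers each (tree, queue) pair under both keys, sharing one set of counters.
fn build_mapping(pairs: &[(&str, &str)]) -> HashMap<String, TreeInfo> {
    let mut m = HashMap::new();
    for (tree, queue) in pairs {
        let info = TreeInfo {
            highest_seq: Arc::new(AtomicU64::new(0)),
            last_slot: Arc::new(AtomicU64::new(0)),
        };
        // Cloning TreeInfo clones the Arcs, not the counters behind them.
        m.insert(queue.to_string(), info.clone());
        m.insert(tree.to_string(), info);
    }
    m
}
```

With this layout an update recorded through the queue key is immediately visible through the tree key, which is the behaviour the suggestion is after.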

95-98: Clarify initialization behavior in comment

The comment "We init with 0, we cannot crash on 0" is unclear. It should explain that sequence 0 is treated as uninitialized state.

-// We init with 0, we cannot crash on 0
+// Sequence 0 indicates uninitialized state - skip gap detection
 if current_highest == 0 {
     return None;
 }
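For illustration, a gap check with this "0 means uninitialized" rule might look as follows. The PR's actual check_sequence_gap body is not shown in this review, so the gap criterion (incoming sequence greater than highest + 1) and the record_sequence helper are assumptions.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns the expected sequence number if `incoming_seq` skips ahead,
/// or None when there is no gap (or nothing to compare against yet).
fn check_sequence_gap(highest_seq: &AtomicU64, incoming_seq: u64) -> Option<u64> {
    let current_highest = highest_seq.load(Ordering::Acquire);
    // Sequence 0 indicates uninitialized state - skip gap detection.
    if current_highest == 0 {
        return None;
    }
    let expected = current_highest.saturating_add(1);
    if incoming_seq > expected {
        Some(expected)
    } else {
        None
    }
}

/// Records a processed sequence number; the stored maximum never decreases.
fn record_sequence(highest_seq: &AtomicU64, incoming_seq: u64) {
    highest_seq.fetch_max(incoming_seq, Ordering::AcqRel);
}
```

Because the first observed sequence is accepted unconditionally, a process that starts from a snapshot at a high sequence number does not report a spurious gap.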
src/ingester/mod.rs (2)

119-121: Consider logging when processing empty batch.

While returning Ok(()) for empty batches is valid, consider adding a warning log to help with debugging:

 if block_batch.is_empty() {
+    log::warn!("Attempted to index empty block batch");
     return Ok(());
 }

123-140: Consider extracting the last block slot to avoid repeated unwrap calls.

To improve readability and avoid repeated unwrap() calls:

+let last_slot = block_batch.last().unwrap().metadata.slot;
 let merged_state_update = match StateUpdate::merge_updates_with_slot(
     state_updates,
-    Some(block_batch.last().unwrap().metadata.slot),
+    Some(last_slot),
 ) {
     Ok(merged) => merged,
     Err(SequenceGapError::GapDetected(gaps)) => {
         if let Some(controller) = rewind_controller {
             let rewind_slot = determine_rewind_slot(&gaps);
             let reason = format!(
                 "Sequence gaps detected in batch ending at slot {}: {} gaps found",
-                block_batch.last().unwrap().metadata.slot,
+                last_slot,
                 gaps.len()
             );
             controller.request_rewind(rewind_slot, reason)?;
         }
         return Err(IngesterError::SequenceGapDetected(gaps));
     }
 };
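A variant that avoids unwrap() altogether is to fold the empty-batch check and the slot extraction into one step. The types below are reduced stand-ins for the block types, and last_slot_of and describe_batch are hypothetical helpers used only to show the pattern.

```rust
struct BlockMetadata {
    slot: u64,
}

struct BlockInfo {
    metadata: BlockMetadata,
}

/// Slot of the last block in the batch, or None for an empty batch.
fn last_slot_of(block_batch: &[BlockInfo]) -> Option<u64> {
    block_batch.last().map(|block| block.metadata.slot)
}

/// Handles the empty case and the slot lookup in a single match,
/// so no unwrap() is needed afterwards.
fn describe_batch(block_batch: &[BlockInfo]) -> String {
    match last_slot_of(block_batch) {
        None => "empty batch".to_string(),
        Some(last_slot) => format!("batch ending at slot {}", last_slot),
    }
}
```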
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0a45ee and aa1d0f3.

📒 Files selected for processing (25)
  • src/ingester/error.rs (2 hunks)
  • src/ingester/fetchers/grpc.rs (4 hunks)
  • src/ingester/fetchers/mod.rs (3 hunks)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/indexer/mod.rs (3 hunks)
  • src/ingester/mod.rs (4 hunks)
  • src/ingester/parser/mod.rs (1 hunks)
  • src/ingester/parser/state_update.rs (4 hunks)
  • src/ingester/parser/tree_info.rs (9 hunks)
  • src/ingester/parser/tx_event_parser.rs (1 hunks)
  • src/ingester/parser/tx_event_parser_v2.rs (1 hunks)
  • src/ingester/rewind_controller.rs (1 hunks)
  • src/main.rs (6 hunks)
  • src/migration/migrations/custom/custom20250211_000002_solayer2.rs (0 hunks)
  • src/migration/migrations/custom/custom20252201_000001_init.rs (0 hunks)
  • src/migration/migrations/custom/mod.rs (0 hunks)
  • src/migration/migrations/mod.rs (1 hunks)
  • src/migration/mod.rs (2 hunks)
  • src/snapshot/mod.rs (1 hunks)
  • src/snapshot/snapshotter/main.rs (1 hunks)
  • tests/integration_tests/e2e_tests.rs (5 hunks)
  • tests/integration_tests/gap_detection_tests.rs (1 hunks)
  • tests/integration_tests/main.rs (1 hunks)
  • tests/integration_tests/mock_tests.rs (8 hunks)
  • tests/integration_tests/utils.rs (2 hunks)
💤 Files with no reviewable changes (3)
  • src/migration/migrations/custom/custom20250211_000002_solayer2.rs
  • src/migration/migrations/custom/mod.rs
  • src/migration/migrations/custom/custom20252201_000001_init.rs
🧰 Additional context used
🧬 Code Graph Analysis (6)
tests/integration_tests/utils.rs (1)
src/ingester/parser/state_update.rs (1)
  • merge_updates (125-127)
src/ingester/parser/mod.rs (1)
src/ingester/parser/state_update.rs (1)
  • merge_updates (125-127)
src/migration/mod.rs (1)
src/migration/migrations/standard/mod.rs (1)
  • get_standard_migrations (13-25)
src/ingester/parser/tx_event_parser_v2.rs (1)
src/ingester/parser/state_update.rs (1)
  • merge_updates (125-127)
src/ingester/fetchers/poller.rs (2)
src/ingester/typedefs/block_info.rs (1)
  • parse_ui_confirmed_blocked (55-93)
src/monitor/mod.rs (1)
  • start_latest_slot_updater (92-104)
src/ingester/indexer/mod.rs (1)
src/ingester/mod.rs (1)
  • index_block_batch_with_infinite_retries (150-176)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (27)
src/migration/migrations/mod.rs (1)

1-1: LGTM! Clean removal of custom migrations module.

The commenting out of the custom module declaration is consistent with the broader effort to remove custom migrations from the system.

src/migration/mod.rs (1)

1-1: LGTM! Import correctly updated for standard migrations only.

The import statement has been properly updated to only include get_standard_migrations, which is consistent with the removal of custom migrations.

tests/integration_tests/main.rs (1)

8-8: LGTM! Good addition of gap detection tests.

The new gap_detection_tests module registration appropriately extends the integration test suite to cover the newly introduced sequence gap detection functionality.

src/ingester/parser/tx_event_parser.rs (1)

71-71: LGTM! Appropriate log level elevation.

Changing from log::warn! to log::error! provides better visibility for unknown tree conditions, which aligns with the enhanced error handling discipline introduced across the parser components.

src/snapshot/mod.rs (1)

454-459: LGTM! Clean variable extraction for improved readability.

Extracting last_indexed_slot into a local variable improves code clarity without changing functionality. This refactor makes the update_snapshot_helper call more readable.

src/snapshot/snapshotter/main.rs (1)

259-259: LGTM! Appropriate exclusion of rewind functionality for snapshotter.

Setting rewind_receiver: None correctly excludes the snapshotter from rewind control logic, which is appropriate since snapshots operate independently of the main indexing flow's rewind mechanisms. The comment clearly documents this design decision.

src/ingester/parser/mod.rs (1)

130-130: LGTM! Essential error propagation for sequence gap detection.

Adding the ? operator correctly propagates errors from StateUpdate::merge_updates, which now returns a Result that can indicate sequence gaps. This error propagation is crucial for the rewind control system to detect and respond to sequence gaps during state update merging.

src/ingester/fetchers/grpc.rs (1)

56-56: LGTM! Consistent integration of rewind receiver parameter.

The explicit None arguments for the rewind receiver parameter are correctly added across all get_block_poller_stream calls. The comments clearly indicate that the gRPC fallback stream doesn't support rewind commands, which aligns with the design decision to only enable rewind control on the primary poller stream.

Also applies to: 119-119, 137-137, 150-150

src/ingester/parser/tx_event_parser_v2.rs (1)

151-151: LGTM! Proper error propagation for sequence gap detection.

The addition of the ? operator correctly propagates errors from StateUpdate::merge_updates, which now returns a Result that can indicate sequence gaps via SequenceGapError. This aligns with the enhanced error handling for sequence gap detection throughout the ingester pipeline.

tests/integration_tests/utils.rs (2)

468-468: LGTM! Proper handling of Result return type.

The .unwrap() call correctly handles the new Result return type from StateUpdate::merge_updates. In the test context, immediate failure on error is appropriate behavior.


558-558: LGTM! Correct integration of optional RewindController parameter.

The explicit None argument properly accommodates the updated index_block function signature that now accepts an optional RewindController parameter. This maintains existing test behavior without rewind control.

tests/integration_tests/mock_tests.rs (1)

83-83: LGTM! Consistent integration across all test functions.

All index_block calls have been systematically updated with the explicit None argument for the new optional RewindController parameter. The changes are consistent across all test functions and maintain existing test behavior without introducing rewind control logic.

Also applies to: 174-174, 458-458, 1078-1078, 1121-1121, 1203-1203, 1270-1270, 1357-1357

tests/integration_tests/e2e_tests.rs (1)

81-81: Test updates correctly handle the new RewindController parameter.

All index_block calls have been properly updated to include None as the third parameter, which is appropriate for these integration tests that don't need rewind functionality.

Also applies to: 501-501, 527-527, 532-532, 555-555, 590-590

src/ingester/fetchers/mod.rs (2)

25-25: Breaking change: load_block_stream now consumes self.

The method signature changed from &self to self, making BlockStreamConfig a single-use struct. This prevents reusing the configuration for multiple streams. Consider documenting this breaking change or evaluating if borrowing would still work with the rewind receiver.
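Whether borrowing could still work depends on how the receiver is handed over. As a rough sketch under simplified assumptions (a reduced BlockStreamConfig, a u64 payload, std::sync::mpsc, and a hypothetical load_block_stream_borrowed method), the two options compare like this:

```rust
use std::sync::mpsc::Receiver;

/// Reduced config: the receiver is not Clone, so it has to be moved out.
struct BlockStreamConfig {
    last_indexed_slot: u64,
    rewind_receiver: Option<Receiver<u64>>,
}

impl BlockStreamConfig {
    /// Consuming variant: the config cannot be reused afterwards.
    fn load_block_stream(self) -> (u64, Option<Receiver<u64>>) {
        (self.last_indexed_slot, self.rewind_receiver)
    }

    /// Borrowing variant (hypothetical): hands the receiver out once via
    /// Option::take; later calls get None for the receiver.
    fn load_block_stream_borrowed(&mut self) -> (u64, Option<Receiver<u64>>) {
        (self.last_indexed_slot, self.rewind_receiver.take())
    }
}
```

The borrowing variant keeps the config reusable, but only the first stream created from it can react to rewind commands, which may or may not be the desired behaviour.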


37-46: Rewind support is only available in polling mode, not gRPC mode.

The rewind receiver is only passed to the poller stream when gRPC is not configured. This means sequence gap detection and rewind functionality won't work when using gRPC streaming. Is this intentional? Consider documenting this limitation or implementing rewind support for gRPC mode as well.

src/ingester/error.rs (1)

18-19: Well-structured error variant for sequence gaps.

The SequenceGapDetected variant provides clear error messaging with the number of gaps found, making debugging easier.

src/main.rs (2)

284-285: Proper integration of rewind controller in the indexing pipeline.

The rewind controller is correctly instantiated and its receiver is passed to the block stream configuration, enabling gap detection and rewind functionality for live indexing.

Also applies to: 292-292, 301-301


240-240: Correct exclusion of rewind for snapshot processing.

Snapshot processing appropriately passes None for the rewind controller, as rewinding doesn't make sense when processing historical snapshots.

src/ingester/indexer/mod.rs (2)

78-79: Verify the loop iteration change is correct.

The loop now iterates from first_slot_in_block to last_slot_in_block + 1, and blocks_indexed is calculated using slot.saturating_sub(last_indexed_slot_at_start). This change appears to support the rewind mechanism, but please verify this correctly tracks progress when blocks are rewound and re-processed.


76-76: Rewind controller properly integrated into batch processing.

The rewind controller is correctly passed through to index_block_batch_with_infinite_retries, enabling sequence gap detection at the batch processing level.

src/ingester/fetchers/poller.rs (1)

70-70: LGTM! Clean integration of rewind support

The rewind_receiver is properly threaded through from get_block_poller_stream to get_slot_stream, maintaining backward compatibility with the Option wrapper.

tests/integration_tests/gap_detection_tests.rs (2)

136-161: Test correctly validates gap detection for output accounts

The test properly validates that gaps in output account leaf indices are detected. However, as noted in the helper function comment, this uses leaf indices rather than sequence numbers for gap detection.


294-308: Excellent edge case coverage for snapshot scenarios

The test validates that the gap detection works correctly even when starting from high sequence numbers (e.g., after loading from a snapshot), which is crucial for production scenarios.

src/ingester/mod.rs (4)

18-21: Imports are well-organized and follow Rust conventions.

The new imports for sequence gap detection and rewind control are properly structured.

Also applies to: 31-31


34-58: Well-implemented sequence gap detection and rewind handling.

The function properly integrates the rewind control mechanism with clear error handling and descriptive rewind reasons.


60-70: Correct propagation of rewind controller.

The function signature and implementation are properly updated to support rewind control.


150-176: Excellent handling of sequence gaps in retry logic.

The function correctly distinguishes between sequence gaps (which require rewind) and other errors (which can be retried). The log message clearly communicates the intention.
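The distinction can be sketched as below. This is not the PR's function: the retry count is bounded here so the example terminates, the error enum is reduced to two variants (DatabaseError is a stand-in for any retryable error), and index_with_retries is a hypothetical name.

```rust
#[derive(Debug, PartialEq)]
enum IngesterError {
    /// Number of gaps found; requires a rewind, so retrying the same batch is pointless.
    SequenceGapDetected(usize),
    /// Stand-in for any transient failure that is worth retrying.
    DatabaseError(String),
}

/// Retries `attempt` up to `max_attempts` times, but returns immediately
/// on a sequence gap so the caller can let the rewind take effect.
fn index_with_retries<F>(mut attempt: F, max_attempts: usize) -> Result<(), IngesterError>
where
    F: FnMut() -> Result<(), IngesterError>,
{
    let mut last_err = None;
    for _ in 0..max_attempts {
        match attempt() {
            Ok(()) => return Ok(()),
            Err(IngesterError::SequenceGapDetected(n)) => {
                return Err(IngesterError::SequenceGapDetected(n));
            }
            Err(other) => last_err = Some(other),
        }
    }
    Err(last_err.unwrap_or_else(|| IngesterError::DatabaseError("no attempts made".to_string())))
}
```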

Comment on lines +133 to +137
#[cfg(test)]
if slot.is_some() {
    use crate::ingester::parser::tree_info::TreeInfo;
    TreeInfo::reset_all_sequences();
}
⚠️ Potential issue

Remove test-specific code from production logic

The conditional TreeInfo::reset_all_sequences() call for tests violates separation of concerns. Test setup should be handled in the test code, not in production logic.

-#[cfg(test)]
-if slot.is_some() {
-    use crate::ingester::parser::tree_info::TreeInfo;
-    TreeInfo::reset_all_sequences();
-}

Move this reset logic to the test setup in gap_detection_tests.rs.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[cfg(test)]
if slot.is_some() {
    use crate::ingester::parser::tree_info::TreeInfo;
    TreeInfo::reset_all_sequences();
}
🤖 Prompt for AI Agents
In src/ingester/parser/state_update.rs lines 133 to 137, remove the
test-specific conditional block that calls TreeInfo::reset_all_sequences()
within the production code. Instead, relocate this reset logic to the test setup
section of gap_detection_tests.rs to maintain separation of concerns and keep
test setup isolated from production logic.

Comment on lines +108 to 119
    /// Reset all atomic counters for tests - only used in test environments
    pub fn reset_all_sequences() {
        for tree_info in QUEUE_TREE_MAPPING.values() {
            tree_info
                .highest_seq
                .store(0, std::sync::atomic::Ordering::Release);
            tree_info
                .last_slot
                .store(0, std::sync::atomic::Ordering::Release);
        }
    }
}
⚠️ Potential issue

Test utility used in production code

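The reset_all_sequences() function is documented as test-only but is called from production code in state_update.rs. This should be properly marked with #[cfg(test)] to prevent accidental usage in production. Keep in mind that #[cfg(test)] items are compiled only for the crate's own unit tests, so integration tests under tests/ (such as gap_detection_tests.rs) would not see the function; exposing it to them would need a different gate, for example a Cargo feature.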

 /// Reset all atomic counters for tests - only used in test environments
+#[cfg(test)]
 pub fn reset_all_sequences() {
     for tree_info in QUEUE_TREE_MAPPING.values() {
         tree_info
             .highest_seq
             .store(0, std::sync::atomic::Ordering::Release);
         tree_info
             .last_slot
             .store(0, std::sync::atomic::Ordering::Release);
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    /// Reset all atomic counters for tests - only used in test environments
    pub fn reset_all_sequences() {
        for tree_info in QUEUE_TREE_MAPPING.values() {
            tree_info
                .highest_seq
                .store(0, std::sync::atomic::Ordering::Release);
            tree_info
                .last_slot
                .store(0, std::sync::atomic::Ordering::Release);
        }
    }
}
    /// Reset all atomic counters for tests - only used in test environments
    #[cfg(test)]
    pub fn reset_all_sequences() {
        for tree_info in QUEUE_TREE_MAPPING.values() {
            tree_info
                .highest_seq
                .store(0, std::sync::atomic::Ordering::Release);
            tree_info
                .last_slot
                .store(0, std::sync::atomic::Ordering::Release);
        }
    }
}
🤖 Prompt for AI Agents
In src/ingester/parser/tree_info.rs around lines 108 to 119, the
reset_all_sequences() function is intended only for testing but is currently
included in production code and called from state_update.rs. To fix this, add
the #[cfg(test)] attribute above the reset_all_sequences() function to ensure it
is compiled and available only in test builds, preventing accidental use in
production.

Comment on lines +20 to +32
pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> {
    let command = RewindCommand::Rewind {
        to_slot,
        reason: reason.clone(),
    };

    error!("Requesting rewind to slot {}: {}", to_slot, reason);

    self.sender.send(command).map_err(|e| {
        error!("Failed to send rewind command: {}", e);
        format!("Failed to send rewind command: {}", e)
    })
}
🛠️ Refactor suggestion

Improve logging level and avoid unnecessary clone

Two issues in the request_rewind method:

  1. Line 26 logs a rewind request as an error, but this seems to be normal operation flow
  2. The reason string is cloned unnecessarily on line 23
 pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> {
+    tracing::info!("Requesting rewind to slot {}: {}", to_slot, reason);
+
     let command = RewindCommand::Rewind {
         to_slot,
-        reason: reason.clone(),
+        reason,
     };

-    error!("Requesting rewind to slot {}: {}", to_slot, reason);
-
     self.sender.send(command).map_err(|e| {
         error!("Failed to send rewind command: {}", e);
         format!("Failed to send rewind command: {}", e)
     })
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/ingester/rewind_controller.rs between lines 20 and 32, change the log
level on line 26 from error to info or debug to reflect normal operation, and
remove the unnecessary clone of the reason string on line 23 by logging before
the RewindCommand is constructed and then moving reason into it.
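
A minimal sketch of the clone-free version, for illustration only. It assumes `RewindCommand` is an enum with a single `Rewind` variant (as the snippet above suggests) and swaps in `std::sync::mpsc` and `eprintln!` so it compiles without external crates; the PR itself uses `tokio::sync::mpsc` and logging macros. `RewindController::new` is a hypothetical constructor.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Command shape inferred from the snippet above (an assumption).
#[derive(Debug, PartialEq)]
pub enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

/// Hypothetical stand-in for the PR's controller, backed by a std channel.
pub struct RewindController {
    sender: Sender<RewindCommand>,
}

impl RewindController {
    /// Hypothetical constructor returning the controller and the receiving end.
    pub fn new() -> (Self, Receiver<RewindCommand>) {
        let (sender, receiver) = channel();
        (Self { sender }, receiver)
    }

    pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> {
        // Log while `reason` is still owned by this function.
        eprintln!("Requesting rewind to slot {}: {}", to_slot, reason);

        // Move `reason` into the command; no clone is needed.
        let command = RewindCommand::Rewind { to_slot, reason };

        self.sender
            .send(command)
            .map_err(|e| format!("Failed to send rewind command: {}", e))
    }
}
```

Logging before the command is built avoids reading a field back out of the enum, which would need a `match` or `if let`.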

Comment on lines +35 to +64
pub fn determine_rewind_slot(gaps: &[crate::ingester::parser::state_update::SequenceGap]) -> u64 {
    use crate::ingester::parser::tree_info::TreeInfo;

    // Find the earliest slot where we need to rewind to get the missing sequence
    let mut earliest_slot = u64::MAX;

    for gap in gaps {
        // Try to find the exact slot for the last known good sequence
        let target_seq = gap.expected_seq.saturating_sub(1);
        if let Some(slot) = TreeInfo::get_last_slot_for_seq(&gap.tree, target_seq) {
            // Rewind to just before this slot to ensure we reprocess
            earliest_slot = earliest_slot.min(slot.saturating_sub(1));
        } else {
            // Fallback: conservative approach if we can't find the exact slot
            // This handles the case where this is the first sequence for this tree
            earliest_slot = earliest_slot.min(gap.expected_seq.saturating_sub(10));
        }
    }

    // Ensure we don't rewind to slot 0 unless explicitly needed
    if earliest_slot == u64::MAX {
        // No valid slots found, use conservative fallback
        gaps.iter()
            .map(|gap| gap.expected_seq.saturating_sub(10))
            .min()
            .unwrap_or(0)
    } else {
        earliest_slot
    }
}

🛠️ Refactor suggestion

Add documentation and consider edge cases in rewind slot calculation

The determine_rewind_slot function needs documentation to explain its logic. Additionally, there are some edge cases to consider:

  1. When get_last_slot_for_seq returns a slot, subtracting 1 might not always be correct
  2. The fallback value of subtracting 10 from expected_seq is arbitrary and could be configurable
+/// Determines the optimal slot to rewind to based on detected sequence gaps.
+/// 
+/// For each gap, it attempts to find the slot where the last known good sequence
+/// was processed and rewinds to that slot. If no exact slot is found,
+/// it uses a conservative fallback.
+///
+/// Returns the earliest slot among all gaps that requires rewinding.
 pub fn determine_rewind_slot(gaps: &[crate::ingester::parser::state_update::SequenceGap]) -> u64 {
+    const FALLBACK_SLOT_OFFSET: u64 = 10;
     use crate::ingester::parser::tree_info::TreeInfo;

     // Find the earliest slot where we need to rewind to get the missing sequence
     let mut earliest_slot = u64::MAX;

     for gap in gaps {
         // Try to find the exact slot for the last known good sequence
         let target_seq = gap.expected_seq.saturating_sub(1);
         if let Some(slot) = TreeInfo::get_last_slot_for_seq(&gap.tree, target_seq) {
-            // Rewind to just before this slot to ensure we reprocess
-            earliest_slot = earliest_slot.min(slot.saturating_sub(1));
+            // Rewind to this slot to ensure we reprocess from the last known good state
+            earliest_slot = earliest_slot.min(slot);
         } else {
             // Fallback: conservative approach if we can't find the exact slot
             // This handles the case where this is the first sequence for this tree
-            earliest_slot = earliest_slot.min(gap.expected_seq.saturating_sub(10));
+            earliest_slot = earliest_slot.min(gap.expected_seq.saturating_sub(FALLBACK_SLOT_OFFSET));
         }
     }

     // Ensure we don't rewind to slot 0 unless explicitly needed
     if earliest_slot == u64::MAX {
         // No valid slots found, use conservative fallback
         gaps.iter()
-            .map(|gap| gap.expected_seq.saturating_sub(10))
+            .map(|gap| gap.expected_seq.saturating_sub(FALLBACK_SLOT_OFFSET))
             .min()
             .unwrap_or(0)
     } else {
         earliest_slot
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Determines the optimal slot to rewind to based on detected sequence gaps.
///
/// For each gap, it attempts to find the slot where the last known good sequence
/// was processed and rewinds to that slot. If no exact slot is found,
/// it uses a conservative fallback.
///
/// Returns the earliest slot among all gaps that requires rewinding.
pub fn determine_rewind_slot(gaps: &[crate::ingester::parser::state_update::SequenceGap]) -> u64 {
    const FALLBACK_SLOT_OFFSET: u64 = 10;
    use crate::ingester::parser::tree_info::TreeInfo;
    // Find the earliest slot where we need to rewind to get the missing sequence
    let mut earliest_slot = u64::MAX;
    for gap in gaps {
        // Try to find the exact slot for the last known good sequence
        let target_seq = gap.expected_seq.saturating_sub(1);
        if let Some(slot) = TreeInfo::get_last_slot_for_seq(&gap.tree, target_seq) {
            // Rewind to this slot to ensure we reprocess from the last known good state
            earliest_slot = earliest_slot.min(slot);
        } else {
            // Fallback: conservative approach if we can't find the exact slot
            // This handles the case where this is the first sequence for this tree
            earliest_slot = earliest_slot.min(gap.expected_seq.saturating_sub(FALLBACK_SLOT_OFFSET));
        }
    }
    // Ensure we don't rewind to slot 0 unless explicitly needed
    if earliest_slot == u64::MAX {
        // No valid slots found, use conservative fallback
        gaps.iter()
            .map(|gap| gap.expected_seq.saturating_sub(FALLBACK_SLOT_OFFSET))
            .min()
            .unwrap_or(0)
    } else {
        earliest_slot
    }
}
🤖 Prompt for AI Agents
In src/ingester/rewind_controller.rs around lines 35 to 64, add clear
documentation comments to the determine_rewind_slot function explaining its
purpose and logic. Review the logic where 1 is subtracted from the slot returned
by get_last_slot_for_seq to ensure it correctly identifies the rewind point,
adjusting if necessary. Replace the hardcoded fallback subtraction of 10 from
expected_seq with a configurable parameter or constant to allow flexibility.
Also, handle edge cases where subtracting might underflow or produce invalid
slots.
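
One way to make the edge cases explicit is sketched below, under stated assumptions. The `SequenceGap` struct here is a simplified stand-in, the `last_slot_for_seq` closure stands in for `TreeInfo::get_last_slot_for_seq`, and returning `Option<u64>` instead of defaulting to slot 0 is a design choice of this sketch, not something the PR does.

```rust
/// Simplified stand-in for the PR's `SequenceGap` (which also carries `actual_seq`).
pub struct SequenceGap {
    pub tree: [u8; 32],
    pub expected_seq: u64,
}

/// Named fallback distance, mirroring the hard-coded 10 in the PR.
pub const FALLBACK_SLOT_OFFSET: u64 = 10;

/// Returns the earliest rewind slot across all gaps, or `None` when `gaps` is empty.
/// `last_slot_for_seq` is a hypothetical lookup injected by the caller.
pub fn determine_rewind_slot<F>(gaps: &[SequenceGap], last_slot_for_seq: F) -> Option<u64>
where
    F: Fn(&[u8; 32], u64) -> Option<u64>,
{
    gaps.iter()
        .map(|gap| {
            // Last sequence known to be good for this tree.
            let target_seq = gap.expected_seq.saturating_sub(1);
            match last_slot_for_seq(&gap.tree, target_seq) {
                // Reprocess from the slot that carried the last good sequence.
                Some(slot) => slot,
                // Mirrors the PR's fallback, which derives a value from the
                // sequence number; saturating_sub prevents underflow.
                None => gap.expected_seq.saturating_sub(FALLBACK_SLOT_OFFSET),
            }
        })
        .min()
}
```

With an `Option` return, the caller decides what an empty gap list means instead of silently rewinding to slot 0.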

Comment on lines +41 to +65
fn create_account_with_context(tree: solana_pubkey::Pubkey, leaf_index: u32) -> AccountWithContext {
    AccountWithContext {
        account: Account {
            hash: Hash::new(&[0u8; 32]).unwrap(),
            data: None,
            owner: SerializablePubkey::try_from([0u8; 32]).unwrap(),
            lamports: UnsignedInteger(0),
            address: None,
            tree: SerializablePubkey::from(tree),
            leaf_index: UnsignedInteger(leaf_index as u64),
            seq: None,
            slot_created: UnsignedInteger(0),
        },
        context: AccountContext {
            queue: SerializablePubkey::try_from([0u8; 32]).unwrap(),
            in_output_queue: false,
            spent: false,
            nullified_in_tree: false,
            nullifier_queue_index: None,
            nullifier: None,
            tx_hash: None,
            tree_type: 0,
        },
    }
}

🛠️ Refactor suggestion

Account sequence field is not set in test helper

The create_account_with_context helper sets seq: None (line 51), while the output-account gap detection test (test_gap_detected_output_accounts) exercises gaps in leaf indices. As written, it is easy to misread the test as checking sequence-number gaps when it is actually checking leaf_index gaps.

Consider either:

  1. Setting the seq field to match the expected test behavior, or
  2. Adding a comment clarifying that output accounts use leaf_index for gap detection instead of seq
🤖 Prompt for AI Agents
In tests/integration_tests/gap_detection_tests.rs around lines 41 to 65, the
create_account_with_context function sets the seq field to None, but the gap
detection tests rely on leaf_index for detecting gaps in output accounts. To fix
this, either set the seq field to a meaningful value that aligns with the test's
expectations or add a clear comment explaining that output accounts use
leaf_index for gap detection instead of seq to avoid confusion.

Comment on lines +13 to +14
pub highest_seq: std::sync::Arc<std::sync::atomic::AtomicU64>,
pub last_slot: std::sync::Arc<std::sync::atomic::AtomicU64>,
Author

@ananas-block ananas-block Jul 25, 2025

we need separate sequence numbers depending on tree type.
For example a batched state tree needs 3:

  1. Output queue (leaf index)
  2. input queue index
  3. batched update seq

batched address tree needs 2:

  1. queue index
  2. batched update seq

v1 trees just need one each because there is no separate queue.
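
A rough sketch of how that breakdown could be encoded, purely for illustration. `TreeKind`, `SequenceKind`, and `required_sequences` are hypothetical names that do not appear in the PR; only the counts per tree type come from the comment above.

```rust
/// Hypothetical tree categories, following the comment above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TreeKind {
    StateV1,
    AddressV1,
    BatchedState,
    BatchedAddress,
}

/// Hypothetical names for the independent counters a tree may need.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SequenceKind {
    /// The single sequence number of a v1 tree (no separate queue).
    TreeUpdate,
    /// Output queue position, tracked by leaf index.
    OutputQueueLeafIndex,
    /// Input queue index of a batched state tree.
    InputQueueIndex,
    /// Queue index of a batched address tree.
    AddressQueueIndex,
    /// Sequence number of batched tree updates.
    BatchedUpdate,
}

/// Lists the counters that must be tracked separately for each tree kind.
pub fn required_sequences(kind: TreeKind) -> &'static [SequenceKind] {
    match kind {
        TreeKind::StateV1 | TreeKind::AddressV1 => &[SequenceKind::TreeUpdate],
        TreeKind::BatchedState => &[
            SequenceKind::OutputQueueLeafIndex,
            SequenceKind::InputQueueIndex,
            SequenceKind::BatchedUpdate,
        ],
        TreeKind::BatchedAddress => &[
            SequenceKind::AddressQueueIndex,
            SequenceKind::BatchedUpdate,
        ],
    }
}
```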

Comment on lines +272 to +297
let (seq, source_name) = match sequence {
    SequenceSource::IndexedMerkleTree(seq) => (seq, "indexed merkle tree"),
    SequenceSource::LeafNullification(seq) => (seq, "leaf nullification"),
    SequenceSource::OutputAccount(seq) => (seq, "output account"),
    SequenceSource::BatchEvent(seq) => (seq, "batch event"),
    SequenceSource::AddressQueue(seq) => (seq, "address queue"),
};

if let Some((expected_seq, actual_seq)) =
    TreeInfo::check_sequence_gap(&tree, seq)
{
    error!(
        "{} sequence gap detected for tree {}: expected {}, got {}",
        source_name, tree, expected_seq, actual_seq
    );
    detected_gaps.push(SequenceGap {
        tree,
        expected_seq,
        actual_seq,
    });
}

// Update highest sequence for this tree
if let Err(e) = TreeInfo::update_highest_seq(&tree, seq, slot) {
    error!("Failed to update highest sequence for tree {}: {}", tree, e);
}
Author

we need to differentiate between the same sequence numbers here.
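
One possible way to keep equal numbers from different sources apart is to key the tracked maximum on both the tree and the source kind. The sketch below is an assumption-laden illustration: `SequenceSourceKind` and `SequenceTracker` are hypothetical, the tree is a plain 32-byte array, and the first value seen for a key is treated as the baseline rather than as a gap.

```rust
use std::collections::HashMap;

/// Payload-free mirror of the `SequenceSource` variants quoted above (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SequenceSourceKind {
    IndexedMerkleTree,
    LeafNullification,
    OutputAccount,
    BatchEvent,
    AddressQueue,
}

#[derive(Debug, PartialEq)]
pub struct SequenceGap {
    pub tree: [u8; 32],
    pub kind: SequenceSourceKind,
    pub expected_seq: u64,
    pub actual_seq: u64,
}

/// Tracks the highest sequence per (tree, source kind) pair.
#[derive(Default)]
pub struct SequenceTracker {
    highest: HashMap<([u8; 32], SequenceSourceKind), u64>,
}

impl SequenceTracker {
    /// Records `seq` and reports a gap if it skips past the next expected
    /// value for the same tree and the same source kind.
    pub fn observe(
        &mut self,
        tree: [u8; 32],
        kind: SequenceSourceKind,
        seq: u64,
    ) -> Option<SequenceGap> {
        let key = (tree, kind);
        let gap = match self.highest.get(&key) {
            Some(&highest) if seq > highest.saturating_add(1) => Some(SequenceGap {
                tree,
                kind,
                expected_seq: highest.saturating_add(1),
                actual_seq: seq,
            }),
            _ => None,
        };
        // Keep the maximum seen so far for this key only.
        let entry = self.highest.entry(key).or_insert(seq);
        if seq > *entry {
            *entry = seq;
        }
        gap
    }
}
```

With this keying, an output account at sequence 5 and a batch event at sequence 5 on the same tree advance independent counters.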

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/ingester/fetchers/poller.rs (2)

28-47: Well-implemented rewind handling with minor logging suggestion.

The rewind logic correctly uses try_recv() for non-blocking operation and properly updates the slot stream position. The implementation handles all pending rewind commands in a loop, which is good for responsiveness.

Consider using log::info! instead of log::error! for rewind operations, as rewinding might be a normal recovery mechanism rather than an error condition.

-                            log::error!("Rewinding slot stream to {}: {}", to_slot, reason);
+                            log::info!("Rewinding slot stream to {}: {}", to_slot, reason);
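
The non-blocking drain described above can be sketched as follows. This is not the PR's code: it swaps in `std::sync::mpsc` for `tokio::sync::mpsc` so it compiles without external crates, `apply_pending_rewinds` is a hypothetical helper, and letting the last queued command win is an assumption.

```rust
use std::sync::mpsc::Receiver;

/// Command shape inferred from the review snippets (an assumption).
pub enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

/// Drains every pending rewind command without blocking and returns the
/// slot the stream should continue from.
pub fn apply_pending_rewinds(receiver: &Receiver<RewindCommand>, mut next_slot: u64) -> u64 {
    // `try_recv` returns Err as soon as the queue is empty or closed,
    // so this loop never waits for a sender.
    while let Ok(RewindCommand::Rewind { to_slot, reason }) = receiver.try_recv() {
        // Rewinds are a recovery step, so this is informational output.
        eprintln!("Rewinding slot stream to {}: {}", to_slot, reason);
        next_slot = to_slot;
    }
    next_slot
}
```

If several rewinds can queue up between polls, taking the minimum `to_slot` instead of the last one would be the more conservative choice.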

106-153: Excellent refactoring with robust block ordering logic.

The case-based approach significantly improves block processing reliability:

  • Case 1: Properly discards outdated blocks
  • Case 2: Handles direct succession with informative gap logging
  • Case 3: Correctly waits for missing intermediate blocks
  • Case 4: Appropriately discards fork/invalid blocks with warnings

The strict one-block-at-a-time processing (line 130) ensures proper ordering, which is likely essential for the rewind system's correctness.

Consider monitoring the performance impact of processing only one block per call. If throughput becomes a concern, you might need to batch consecutive valid blocks while maintaining the strict ordering guarantees.
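
The four cases could be expressed as a pure classification function, which also makes the ordering rules easy to unit test. The conditions below are assumptions reconstructed from the case descriptions, not the actual checks in poller.rs; `BlockAction` and `classify_block` are hypothetical names.

```rust
/// Hypothetical outcome for a cached block, one variant per case above.
#[derive(Debug, PartialEq)]
pub enum BlockAction {
    /// Case 1: the block is at or before the last indexed slot.
    DiscardOutdated,
    /// Case 2: the block's parent is the last indexed slot.
    Emit,
    /// Case 3: an intermediate block has not arrived yet.
    WaitForParent,
    /// Case 4: the parent is older than the last indexed slot (fork or invalid).
    DiscardFork,
}

/// Classifies a block relative to the last indexed slot (assumed conditions).
pub fn classify_block(slot: u64, parent_slot: u64, last_indexed_slot: u64) -> BlockAction {
    if slot <= last_indexed_slot {
        BlockAction::DiscardOutdated
    } else if parent_slot == last_indexed_slot {
        // Slot numbers may skip (e.g. 10 -> 12) while the chain is still contiguous.
        BlockAction::Emit
    } else if parent_slot > last_indexed_slot {
        BlockAction::WaitForParent
    } else {
        BlockAction::DiscardFork
    }
}
```

Batching consecutive blocks, as suggested above, would amount to calling such a function repeatedly while it keeps returning `Emit`.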

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aa1d0f3 and 11e33a5.

📒 Files selected for processing (1)
  • src/ingester/fetchers/poller.rs (3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/ingester/fetchers/poller.rs (2)
src/ingester/typedefs/block_info.rs (1)
  • parse_ui_confirmed_blocked (55-93)
src/monitor/mod.rs (1)
  • start_latest_slot_updater (92-104)
🔇 Additional comments (2)
src/ingester/fetchers/poller.rs (2)

12-12: LGTM: Import changes support rewind functionality.

The tokio::sync::mpsc and RewindCommand imports are necessary for the new rewind capability and are properly organized.

Also applies to: 18-21


63-63: LGTM: Clean parameter threading for rewind functionality.

The rewind_receiver parameter is properly added to the function signature and correctly passed through to get_slot_stream.

Also applies to: 70-70
