
feat: fill gaps in snapshots #37


Open
wants to merge 16 commits into base: main

Conversation

@ananas-block commented Jul 25, 2025

Overview

  • Summary of changes

Testing

  • Testing performed to validate the changes

Summary by CodeRabbit

  • New Features

    • Added comprehensive sequence gap detection and rewind handling for state updates, improving data consistency and recovery.
    • Introduced a rewind controller mechanism to support dynamic block stream rewinding.
    • Integrated rewind support into block fetching, indexing, and snapshot loading workflows.
    • Expanded integration tests to validate sequence consistency, detect/fill gaps, and verify snapshot round-trips.
    • Added utilities for creating and validating snapshots from blockchain transactions.
  • Bug Fixes

    • Improved error handling by replacing panics with controlled error responses in account retrieval.
  • Tests

    • Added new integration tests for gap filling, sequence validation, and snapshot utilities.
  • Chores

    • Updated dependencies to use specific Git revisions.
    • Enhanced configuration for snapshot and block stream management, including rewind support.


coderabbitai bot commented Jul 25, 2025

Walkthrough

This update introduces a robust sequence gap detection and rewind mechanism into the ingester pipeline. It adds new modules for sequence tracking, gap detection, and rewind control, integrates these into block indexing and state update flows, and extends public APIs accordingly. Comprehensive integration tests and snapshot utilities are included to validate sequence consistency and automate gap filling in blockchain data.

Changes

File(s) / Module(s) Change Summary
.gitignore Added pattern to ignore all .txt files.
Cargo.toml Updated three dependencies to use specific git commit references instead of crates.io versions.
src/api/method/get_multiple_compressed_accounts.rs Replaced panic with controlled error return for validation failure.
src/ingester/detect_gaps.rs New module: Implements global sequence tracking, gap detection logic, and related data structures.
src/ingester/error.rs Added CustomError(String) variant to IngesterError enum.
src/ingester/fetchers/grpc.rs Modified get_grpc_stream_with_rpc_fallback to accept and propagate optional rewind receiver.
src/ingester/fetchers/mod.rs Extended BlockStreamConfig with optional rewind_receiver; updated method signatures to support rewind.
src/ingester/fetchers/poller.rs Enhanced get_block_poller_stream to handle rewind commands and restart streams accordingly.
src/ingester/indexer/mod.rs Updated index_block_stream to accept optional rewind controller and handle gap-triggered errors.
src/ingester/mod.rs Integrated sequence gap detection and rewind logic into state update and batch indexing; updated function signatures.
src/ingester/parser/state_update.rs Changed field used for tree in address queue updates from mt_pubkey to tree_pubkey.
src/ingester/parser/tree_info.rs Added new types for tree sequence tracking: TreeTypeSeq, StateV2Seq, StateV2SeqWithContext.
src/ingester/parser/tx_event_parser_v2.rs Filtered new addresses with queue_index == u64::MAX and switched to tree_pubkey for address mapping.
src/ingester/rewind_controller.rs New module: Implements RewindController, RewindCommand, and related error handling for rewind requests.
src/main.rs Integrated rewind controller into block stream config and indexing flow; updated function signatures.
src/snapshot/mod.rs Refactored to use local variable for last_indexed_slot in snapshot helper call.
src/snapshot/snapshotter/main.rs Explicitly set rewind_receiver: None in snapshotter block stream config.
tests/integration_tests/main.rs Added three new test modules for sequence consistency, gap filling, and snapshot utilities.
tests/integration_tests/snapshot_test_utils.rs New module: Provides async utilities for snapshot creation, validation, and round-trip testing.
tests/integration_tests/snapshot_tests.rs Added async integration test for snapshot creation and parsing using utility functions.
tests/integration_tests/test_v1_address_tree_gap_filler.rs New module: Comprehensive async test for detecting, fetching, and filling V1 address tree gaps in snapshots.
tests/integration_tests/test_v1_address_tree_sequence_consistency.rs New module: Implements sequence consistency validation and gap detection for StateUpdate events in snapshots.

Sequence Diagram(s)

sequenceDiagram
    participant BlockStream
    participant Indexer
    participant GapDetector
    participant RewindController
    participant Database

    Note over BlockStream,Indexer: Block ingestion pipeline with gap detection & rewind

    BlockStream->>Indexer: Stream BlockInfo batches
    Indexer->>GapDetector: Extract and analyze sequences
    GapDetector-->>Indexer: Report detected gaps (if any)
    alt Gaps detected
        Indexer->>RewindController: Request rewind to earliest gap slot
        RewindController-->>BlockStream: Send rewind command
        BlockStream->>BlockStream: Restart stream from rewind slot
        Note over Indexer: Indexer returns error to trigger rewind
    else No gaps
        Indexer->>Database: Write state updates
    end
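The rewind path in the diagram can be sketched in a few lines. This is a hedged illustration only: it borrows the RewindCommand and determine_rewind_slot_from_gaps names from the review comments below, reduces SequenceGap to a single before_slot field, and uses std::sync::mpsc where the PR reportedly uses a tokio unbounded channel.

```rust
use std::sync::mpsc;

// Hypothetical mirror of the PR's RewindCommand (names taken from the review).
#[derive(Debug)]
pub enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

// Simplified gap record; the real SequenceGap carries more context.
pub struct SequenceGap {
    pub before_slot: u64,
}

// Earliest non-zero slot among detected gaps, mirroring the
// determine_rewind_slot_from_gaps behavior described in the review.
pub fn determine_rewind_slot_from_gaps(gaps: &[SequenceGap]) -> u64 {
    gaps.iter()
        .map(|gap| gap.before_slot)
        .filter(|&slot| slot > 0) // slot 0 marks uninitialized state
        .min()
        .unwrap_or(0)
}

fn main() {
    let (sender, receiver) = mpsc::channel::<RewindCommand>();

    // Indexer side: gaps detected, request a rewind to the earliest gap slot.
    let gaps = [SequenceGap { before_slot: 0 }, SequenceGap { before_slot: 42 }];
    let to_slot = determine_rewind_slot_from_gaps(&gaps);
    sender
        .send(RewindCommand::Rewind { to_slot, reason: "sequence gap detected".into() })
        .expect("receiver alive");

    // Block stream side: drain pending commands without blocking, then
    // restart the stream from the requested slot.
    while let Ok(RewindCommand::Rewind { to_slot, reason }) = receiver.try_recv() {
        println!("rewinding stream to slot {to_slot}: {reason}");
    }
}
```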

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~90 minutes

Poem

🐇
In the warren of blocks, I hop and I seek,
Filling in gaps—no more hide and seek!
With sequences tracked and rewinds at hand,
My snapshots are flawless, my data is grand.
From tests to controllers, the code’s looking neat—
This bunny’s delighted—consistency’s sweet!
🥕✨



@coderabbitai bot left a comment


Actionable comments posted: 13

🔭 Outside diff range comments (2)
src/ingester/fetchers/grpc.rs (2)

118-125: Design concern: Rewind commands may be lost during fallback scenarios.

In all three fallback cases (gRPC timeout, out-of-order blocks, unhealthy gRPC), None is passed for the rewind_receiver parameter instead of forwarding the original receiver. This means:

  1. Timeout fallback (lines 122): Rewind commands are ignored
  2. Out-of-order fallback (lines 140): Rewind commands are ignored
  3. Unhealthy fallback (lines 153): Rewind commands are ignored

Consider one of these approaches:

  1. Forward the receiver: Pass the original rewind_receiver to maintain rewind capability during fallbacks
  2. Document the behavior: If intentional, add comments explaining why rewind is disabled during fallbacks
  3. Implement fallback-aware rewind: Create a mechanism to handle rewind commands consistently across stream types
# Option 1: Forward the receiver
- None, // No rewind receiver for timeout fallback
+ rewind_receiver, // Forward rewind receiver to maintain functionality

Also applies to: 136-143, 149-156


54-61: Ensure rewind_receiver is forwarded to all RPC fallback streams

The current implementation only passes rewind_receiver to the initial poller, then drops it by passing None in each fallback. This means any rewind commands sent while running in timeout, out-of-order, or unhealthy fallbacks will never be observed. To retain rewind support across transitions, the same receiver must be forwarded:

• File: src/ingester/fetchers/grpc.rs
– Line 122, 140, 153: replace None with the original rewind_receiver (or otherwise wire the receiver through), so that every call to get_block_poller_stream can still receive and act on rewind commands.

Suggested diff snippet:

- rpc_poll_stream = Some(Box::pin(get_block_poller_stream(
+ rpc_poll_stream = Some(Box::pin(get_block_poller_stream(
     rpc_client.clone(),
     last_indexed_slot,
     max_concurrent_block_fetches,
-    None, // No rewind receiver for timeout fallback
+    rewind_receiver, // preserve rewind receiver on fallback
 )));

Without this change, rewind requests during RPC-only mode will be silently dropped.

🧹 Nitpick comments (11)
.gitignore (1)

16-16: Consider if the pattern is appropriately specific.

The *.txt pattern will exclude all text files from version control. Ensure this doesn't inadvertently ignore legitimate documentation or configuration files that should be tracked.

If only specific text files need to be ignored, consider using more targeted patterns like test*.txt or temp*.txt.

tests/integration_tests/snapshot_tests.rs (1)

124-153: Robust integration test with room for CI/CD improvements.

This comprehensive test validates end-to-end snapshot creation and parsing from real compression transactions, which is excellent for integration testing.

Consider making the test more CI/CD friendly:

 #[tokio::test]
+#[ignore] // Remove this attribute when API_KEY is available in CI
 async fn test_compression_snapshot_creation_and_parsing() {
     // Get API key from environment
-    let api_key = std::env::var("API_KEY")
-        .expect("API_KEY environment variable must be set (export API_KEY=\"your-api-key\")");
+    let api_key = match std::env::var("API_KEY") {
+        Ok(key) => key,
+        Err(_) => {
+            println!("Skipping test: API_KEY environment variable not set");
+            return;
+        }
+    };

For production environments, consider:

  1. Using a configuration struct to manage test environment variables
  2. Adding fallback test data for environments without API access
  3. Implementing test fixtures to reduce dependency on external RPC calls
src/ingester/fetchers/mod.rs (1)

21-21: Add documentation for the new rewind_receiver field.

The integration of rewind command handling is well-structured, but the new field lacks documentation.

Add documentation to explain the purpose and usage:

 pub struct BlockStreamConfig {
     pub rpc_client: Arc<RpcClient>,
     pub geyser_url: Option<String>,
     pub max_concurrent_block_fetches: usize,
     pub last_indexed_slot: u64,
+    /// Optional receiver for rewind commands that trigger the stream to restart from an earlier slot.
+    /// Used by the gap detection mechanism to fill sequence gaps in blockchain data.
     pub rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
 }
src/ingester/fetchers/poller.rs (1)

68-86: Review rewind command processing logic.

The rewind command handling looks generally sound, but there are a few considerations:

  1. Error handling: The code only handles Ok(command) from try_recv() but ignores potential errors. Consider handling disconnection cases:
while let Ok(command) = receiver.try_recv() {
    match command {
        RewindCommand::Rewind { to_slot, reason } => {
+           if to_slot == 0 {
+               log::warn!("Ignoring rewind to slot 0: {}", reason);
+               continue;
+           }
            log::error!("Rewinding block stream to {}: {}", to_slot, reason);
            // ... rest of logic
        }
    }
}
  2. Log level: Using log::error! for rewind operations might be too severe - consider log::warn! or log::info! since rewinds are expected operational behavior, not errors.
src/ingester/parser/tree_info.rs (1)

32-44: Add documentation for sequence tracking structs.

The StateV2Seq and StateV2SeqWithContext structs serve different purposes but lack documentation explaining their intended use cases.

Consider adding documentation to clarify the purpose and usage:

+/// Raw sequence numbers for StateV2 trees
#[derive(Debug, Clone, Copy, Default)]
pub struct StateV2Seq {
+    /// Index in the input queue
    pub input_queue_index: u64,
+    /// Sequence number of the batch event
    pub batch_event_seq: u64,
+    /// Index in the output queue
    pub output_queue_index: u64,
}

+/// StateV2 sequence entries with full context including slot and signature information
#[derive(Debug, Clone, Default)]
pub struct StateV2SeqWithContext {
+    /// Complete sequence entry for input queue operations
    pub input_queue_entry: Option<SequenceEntry>,
+    /// Complete sequence entry for batch event operations
    pub batch_event_entry: Option<SequenceEntry>,
+    /// Complete sequence entry for output queue operations
    pub output_queue_entry: Option<SequenceEntry>,
}
src/ingester/rewind_controller.rs (1)

54-62: Review gap processing logic for edge cases.

The determine_rewind_slot_from_gaps function is well-designed but has some edge case considerations:

  1. Document the slot 0 filtering: The filtering of zero slots should be documented since it's a specific business rule:
fn determine_rewind_slot_from_gaps(gaps: &[SequenceGap]) -> u64 {
    gaps.iter()
        .map(|gap| gap.before_slot)
+       // Filter out zero slots which indicate uninitialized state  
        .filter(|&slot| slot > 0) // Filter out zero slots from initialization
        .min()
        .unwrap_or(0) // Fallback to slot 0 if no valid slots found
}
  2. Consider alternative fallback: The unwrap_or(0) fallback might not be ideal. Consider returning a Result or using a different fallback strategy:
- .unwrap_or(0) // Fallback to slot 0 if no valid slots found
+ .unwrap_or_else(|| {
+     tracing::warn!("No valid rewind slots found in gaps, defaulting to slot 1");
+     1
+ })
src/ingester/indexer/mod.rs (1)

75-109: Consider more specific error handling for non-gap errors.

The implementation correctly handles gap-triggered rewinds by continuing the loop. However, for other errors, the code logs and sleeps before continuing, which might not be appropriate for all error types (e.g., database connection failures, critical system errors).

Consider categorizing errors and potentially propagating critical errors instead of continuing indefinitely.

            Err(e) => {
                if e.to_string().contains("Gap detection triggered rewind") {
                    // Gap detected, rewind triggered - the slot stream should handle repositioning
                    log::info!("Gap detection triggered rewind");
                    continue;
                } else {
-                   log::error!("Unexpected error in block processing: {}", e);
-                   sleep(Duration::from_secs(1));
+                   // Consider if this is a recoverable error
+                   match e {
+                       // Add specific error patterns that are recoverable
+                       _ => {
+                           log::error!("Unexpected error in block processing: {}", e);
+                           // For now, continue with existing behavior
+                           sleep(Duration::from_secs(1));
+                       }
+                   }
                }
            }
src/ingester/mod.rs (1)

153-194: Consider making logged tree addresses configurable.

The implementation correctly handles gap-triggered rewinds. However, the hardcoded tree public keys in the logging statements could be made configurable for better maintainability.

Consider moving these addresses to a configuration:

const MONITORED_TREES: &[&str] = &[
    "amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2",
    "smt1NamzXdq4AMqS2fS2F1i5KTYPZRhoHgWx38d8WsT",
];
src/ingester/detect_gaps.rs (1)

361-568: Consider refactoring complex gap detection logic.

The detect_sequence_gaps_with_metadata function is quite complex with a large match statement (lines 415-531). While functionally correct, it could benefit from refactoring for better maintainability.

Consider extracting the sequence type matching logic into a separate function:

fn extract_sequence_from_tree_type(
    tree_type: &TreeTypeSeq,
    field_type: &StateUpdateFieldType,
) -> (u64, Option<SequenceEntry>) {
    // Extract the match logic here
}

Also, replace the println statements on lines 403-407 and 419-428 with proper logging.

tests/integration_tests/test_v1_address_tree_gap_filler.rs (2)

308-310: Fix formatting of else block.

Add proper spacing for the else block.

-                            }else {
+                            } else {

25-602: Consider test isolation for snapshot modifications.

This test modifies shared snapshot files which could cause issues with parallel test execution. Consider:

  1. Creating test-specific snapshot directories
  2. Using a test fixture pattern to ensure cleanup
  3. Centralizing configuration (paths, RPC URLs) in a test configuration module
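A minimal sketch of suggestion 1, using only the standard library; the directory layout and file names here are assumptions for illustration, not the PR's actual paths.

```rust
use std::fs;
use std::path::PathBuf;

// Build a per-test snapshot directory from the test name and process id,
// so parallel test runs never touch the same files.
fn test_snapshot_dir(test_name: &str) -> PathBuf {
    let dir = std::env::temp_dir().join(format!("{}_{}", test_name, std::process::id()));
    fs::create_dir_all(&dir).expect("failed to create test snapshot dir");
    dir
}

fn main() {
    let dir = test_snapshot_dir("v1_address_tree_gap_filler");
    let snapshot = dir.join("snapshot.bin");
    fs::write(&snapshot, b"snapshot bytes").expect("write snapshot");
    assert!(snapshot.exists());

    // Fixture-style cleanup so no state leaks between runs.
    fs::remove_dir_all(&dir).expect("cleanup");
    assert!(!dir.exists());
}
```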
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0a45ee and ec512b4.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (22)
  • .gitignore (1 hunks)
  • Cargo.toml (1 hunks)
  • src/api/method/get_multiple_compressed_accounts.rs (1 hunks)
  • src/ingester/detect_gaps.rs (1 hunks)
  • src/ingester/error.rs (1 hunks)
  • src/ingester/fetchers/grpc.rs (7 hunks)
  • src/ingester/fetchers/mod.rs (4 hunks)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/indexer/mod.rs (3 hunks)
  • src/ingester/mod.rs (4 hunks)
  • src/ingester/parser/state_update.rs (1 hunks)
  • src/ingester/parser/tree_info.rs (2 hunks)
  • src/ingester/parser/tx_event_parser_v2.rs (1 hunks)
  • src/ingester/rewind_controller.rs (1 hunks)
  • src/main.rs (5 hunks)
  • src/snapshot/mod.rs (1 hunks)
  • src/snapshot/snapshotter/main.rs (1 hunks)
  • tests/integration_tests/main.rs (1 hunks)
  • tests/integration_tests/snapshot_test_utils.rs (1 hunks)
  • tests/integration_tests/snapshot_tests.rs (2 hunks)
  • tests/integration_tests/test_v1_address_tree_gap_filler.rs (1 hunks)
  • tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (7)
tests/integration_tests/snapshot_tests.rs (1)
tests/integration_tests/snapshot_test_utils.rs (2)
  • create_test_snapshot_from_compression_transactions (17-153)
  • validate_snapshot_parsing (157-184)
src/main.rs (1)
src/ingester/rewind_controller.rs (1)
  • new (27-30)
src/ingester/parser/tx_event_parser_v2.rs (1)
src/ingester/parser/state_update.rs (1)
  • from (77-83)
src/ingester/indexer/mod.rs (1)
src/ingester/mod.rs (1)
  • index_block_batch_with_infinite_retries (153-194)
src/ingester/rewind_controller.rs (2)
src/snapshot/mod.rs (1)
  • new (249-263)
src/ingester/parser/indexer_events.rs (1)
  • tree_pubkey (34-39)
src/ingester/fetchers/poller.rs (2)
src/ingester/typedefs/block_info.rs (1)
  • parse_ui_confirmed_blocked (55-93)
src/ingester/rewind_controller.rs (1)
  • new (27-30)
src/ingester/detect_gaps.rs (5)
src/ingester/parser/state_update.rs (2)
  • new (105-107)
  • from (77-83)
tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (5)
  • merkle_event_to_type_id (13-20)
  • extract_state_update_sequences (62-118)
  • detect_gaps_from_sequences (160-163)
  • detect_all_sequence_gaps (167-248)
  • detect_sequence_gaps_with_metadata (253-286)
src/ingester/parser/indexer_events.rs (2)
  • tree_pubkey (34-39)
  • seq (40-45)
src/ingester/parser/tree_info.rs (1)
  • default (27-29)
src/ingester/parser/tx_event_parser_v2.rs (1)
  • events (31-114)
🔇 Additional comments (41)
Cargo.toml (1)

84-89: Pin Git dependencies using [patch.crates-io] to avoid duplicate crate instances

Cargo treats crates with the same name from different sources (crates-io vs. git) as entirely separate packages. By switching only your direct dependencies to a git revision, you risk pulling in two copies of each crate—leading to larger binaries, slower compile times, potential trait-implementation conflicts, and subtle runtime bugs.

Apply a global patch in your Cargo.toml:

 [dependencies]
 light-concurrent-merkle-tree = "2.1.0"
-light-batched-merkle-tree = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
-light-merkle-tree-metadata = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
-light-compressed-account = { version = "0.3.0", features = ["anchor"], git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
+[patch.crates-io]
+light-batched-merkle-tree = { git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
+light-merkle-tree-metadata = { git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
+light-compressed-account = { git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19", features = ["anchor"] }

(Optional) You can drop the redundant version = "0.3.0" entries when pinning by git, since Cargo ignores them.

To confirm you no longer have duplicates, ensure you have a default Rust toolchain and then run:

rustup default stable
cargo tree -d | grep light-

Each of your light-… crates should appear only once.

src/api/method/get_multiple_compressed_accounts.rs (1)

151-155: Excellent error handling improvement!

Replacing the panic with a proper PhotonApiError::ValidationError is a significant improvement that prevents application crashes and provides controlled error responses. This aligns well with the overall robustness enhancements in the PR.
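As a hedged sketch of that pattern: the PhotonApiError name comes from the comment above, but the validation condition and function are invented for illustration.

```rust
// Illustrative stand-in for the API's error type; only the variant name
// matches what the review mentions.
#[derive(Debug, PartialEq)]
pub enum PhotonApiError {
    ValidationError(String),
}

// Hypothetical validation step: instead of panicking and crashing the
// handler, return a structured error the caller can surface to clients.
pub fn check_account_limit(requested: usize, limit: usize) -> Result<(), PhotonApiError> {
    if requested > limit {
        // Before: panic!(...) would abort the whole request path.
        return Err(PhotonApiError::ValidationError(format!(
            "requested {requested} accounts, limit is {limit}"
        )));
    }
    Ok(())
}

fn main() {
    assert!(check_account_limit(10, 100).is_ok());
    assert!(check_account_limit(200, 100).is_err());
}
```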

src/ingester/error.rs (1)

17-18: LGTM! Clean error enum extension.

The CustomError(String) variant follows the established pattern and provides flexibility for custom error messages in the enhanced ingester functionality. The error message format is consistent with other variants.

src/snapshot/snapshotter/main.rs (1)

259-259: Good architectural decision with clear documentation.

Explicitly setting rewind_receiver: None for the snapshotter is appropriate since snapshot generation doesn't require rewind functionality. The comment clearly documents this design decision.

src/snapshot/mod.rs (1)

454-459: Clean refactoring for improved readability.

Extracting last_indexed_slot into a local variable improves code readability and maintains the same functionality. This type of refactoring often makes code easier to understand and modify.

src/ingester/parser/state_update.rs (1)

79-79: Verified removal of mt_pubkey – ready to merge

All references to the old mt_pubkey field have been removed and replaced with tree_pubkey consistently across the codebase.

  • Ran rg "mt_pubkey" --type rust: no matches found.
  • Confirmed plentiful tree_pubkey usages in parsers, controllers, tests, and persistence modules.

LGTM – approving and resolving this change.

tests/integration_tests/main.rs (1)

12-15: Excellent addition of comprehensive test coverage.

The new test modules provide thorough integration testing for the gap detection and filling functionality:

  • test_v1_address_tree_sequence_consistency for sequence validation
  • test_v1_address_tree_gap_filler for end-to-end gap filling testing
  • snapshot_test_utils for snapshot creation and validation utilities

This comprehensive test coverage will help ensure the reliability of the new gap detection and rewind mechanisms.

src/ingester/fetchers/mod.rs (2)

25-25: Proper ownership change for rewind receiver handling.

Changing the method signature to take mut self by value is correct and necessary for the take() operations on the rewind receiver. This ensures the receiver is properly moved to the appropriate stream without ownership conflicts.


34-44: Excellent integration of rewind command handling.

The use of take() to move the rewind receiver to the appropriate stream (grpc or poller) is well-implemented:

  1. Prevents double-use: Each stream gets the receiver only when needed
  2. Proper ownership: The receiver is moved, not cloned or referenced
  3. Conditional logic: Correctly handles both grpc and poller stream scenarios

The rewind mechanism integration supports the gap detection and filling functionality introduced in this PR.
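The take() ownership guarantee can be shown with a small standalone sketch; BlockStreamConfig here is a stripped-down stand-in, not the PR's actual struct.

```rust
use std::sync::mpsc::{channel, Receiver};

// Stripped-down stand-in for the config that owns the rewind receiver.
struct BlockStreamConfig {
    rewind_receiver: Option<Receiver<u64>>,
}

impl BlockStreamConfig {
    // Taking `mut self` by value consumes the config, and take() moves the
    // receiver out exactly once: the first stream to ask gets it, any later
    // take() sees None - which is why double-use is impossible.
    fn split_receiver(mut self) -> (Option<Receiver<u64>>, Option<Receiver<u64>>) {
        let grpc_rx = self.rewind_receiver.take(); // moved to the gRPC stream
        let poller_rx = self.rewind_receiver.take(); // already None here
        (grpc_rx, poller_rx)
    }
}

fn main() {
    let (_tx, rx) = channel::<u64>();
    let config = BlockStreamConfig { rewind_receiver: Some(rx) };
    let (grpc_rx, poller_rx) = config.split_receiver();
    assert!(grpc_rx.is_some());
    assert!(poller_rx.is_none());
}
```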

src/ingester/fetchers/grpc.rs (2)

19-19: LGTM: Clean integration of rewind controller imports.

The necessary imports for mpsc and RewindCommand are properly added to support the new rewind functionality.

Also applies to: 34-34


42-49: LGTM: Function signature updated to support rewind commands.

The addition of the rewind_receiver parameter integrates well with the existing function design and maintains backward compatibility through the Option wrapper.

src/ingester/fetchers/poller.rs (5)

12-12: LGTM: Clean import additions for rewind functionality.

The necessary imports for mpsc and RewindCommand are properly added to support the new rewind mechanism.

Also applies to: 18-21


43-48: LGTM: Function signature updated for rewind support.

The addition of the rewind_receiver parameter with proper mutability (mut) is well-designed for handling rewind commands during block polling.


49-67: Excellent rewind-aware stream restart implementation.

The restructuring with an outer loop and current_start_slot provides a clean mechanism to restart streams from different positions based on rewind commands. The initialization logic properly handles the edge case where last_indexed_slot is 0.


87-103: LGTM: Clean rewind state management and block processing.

The rewind_occurred flag properly controls loop exit, and the block processing logic correctly integrates with the existing caching mechanism. The metrics emission remains intact.


104-109: LGTM: Proper loop control for rewind scenarios.

The conditional break logic ensures streams are restarted only when rewind occurs, while allowing normal termination otherwise.

src/ingester/parser/tree_info.rs (2)

15-24: Well-designed enum for sequence tracking across tree types.

The TreeTypeSeq enum provides good coverage of all tree types with appropriate sequence entry associations. The design properly handles the complexity differences between tree types (e.g., AddressV2 having both input queue and batch event entries).


26-30: LGTM: Sensible default implementation.

Defaulting to StateV1 with a default SequenceEntry is a reasonable choice that maintains backward compatibility.

src/ingester/rewind_controller.rs (6)

1-3: LGTM: Clean imports and dependencies.

The imports are minimal and appropriate for the functionality. Using thiserror for error types is a good choice.


5-11: LGTM: Well-designed command enum.

The RewindCommand enum is focused and extensible. The Rewind variant with to_slot and reason provides sufficient information for debugging and logging.


13-19: LGTM: Appropriate error types.

The RewindError enum covers the expected failure modes with descriptive error messages using thiserror.


21-38: LGTM: Clean controller implementation.

The RewindController design is simple and effective. The use of unbounded channels is appropriate for this use case since rewind commands should be rare and processed quickly.


40-52: LGTM: Gap-based rewind logic with good error handling.

The request_rewind_for_gaps method provides a convenient interface for the common gap detection use case, with proper early return for empty gaps and descriptive logging.


64-126: Excellent test coverage.

The unit tests comprehensively cover:

  • Controller creation and basic functionality
  • Gap processing logic with multiple gaps
  • Edge case handling (zero slot filtering)

The test structure is clear and the assertions are appropriate.

src/main.rs (6)

172-178: LGTM: Clean function signature update for rewind support.

The addition of the optional rewind_controller parameter maintains backward compatibility and integrates well with the existing function design.


187-187: LGTM: Proper rewind controller integration.

The controller is correctly passed as a reference to the index_block_stream function, maintaining the ownership model.


283-284: LGTM: Proper rewind controller creation and separation.

The controller and receiver are correctly created and separated using the standard pattern. The comment clearly explains the purpose.


291-291: LGTM: Rewind receiver integration into block stream config.

The receiver is properly added to the BlockStreamConfig, enabling rewind command propagation through the stream layers.


300-300: LGTM: Controller passed to indexing function.

The controller is correctly passed as Some(rewind_controller) to enable rewind functionality during continuous indexing.


239-239: LGTM: Rewind disabled for snapshot mode.

Passing None for the rewind controller during snapshot loading is the correct approach, as snapshots represent historical data that shouldn't trigger rewind operations.

src/ingester/indexer/mod.rs (1)

49-56: LGTM! Clean integration of rewind controller.

The function signature change properly adds the optional rewind controller parameter to enable gap detection and rewind functionality.

tests/integration_tests/snapshot_test_utils.rs (2)

115-141: LGTM! Clean snapshot file creation logic.

The implementation correctly:

  • Creates a single snapshot file for the entire block range
  • Filters to include only compression transactions
  • Serializes blocks without version headers (as noted in the comment)

156-227: Well-structured validation and round-trip testing functions.

The validation functions are properly implemented with:

  • Clear error messages and logging
  • Proper verification of compression transactions
  • Exact comparison in round-trip testing
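The round-trip property being tested is: serialize every block, read them back, and require exact equality. The PR uses bincode; this dependency-free sketch (with a hypothetical `BlockMeta` type) shows the same property with a fixed-width encoding:

```rust
#[derive(Debug, Clone, PartialEq)]
struct BlockMeta {
    slot: u64,
    parent_slot: u64,
}

// Encode both fields as little-endian u64s, 16 bytes total.
fn encode(b: &BlockMeta) -> Vec<u8> {
    let mut out = Vec::with_capacity(16);
    out.extend_from_slice(&b.slot.to_le_bytes());
    out.extend_from_slice(&b.parent_slot.to_le_bytes());
    out
}

fn decode(bytes: &[u8]) -> Option<BlockMeta> {
    if bytes.len() != 16 {
        return None;
    }
    Some(BlockMeta {
        slot: u64::from_le_bytes(bytes[0..8].try_into().ok()?),
        parent_slot: u64::from_le_bytes(bytes[8..16].try_into().ok()?),
    })
}

fn main() {
    let original = BlockMeta { slot: 100, parent_slot: 99 };
    let restored = decode(&encode(&original)).expect("round-trip failed");
    // Round-trip testing demands exact equality, not "close enough".
    assert_eq!(original, restored);
    println!("round-trip ok");
}
```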
src/ingester/mod.rs (1)

34-88: Excellent integration of gap detection and rewind control.

The implementation properly:

  • Extracts sequences with full context (slot, signature)
  • Detects gaps across all state update types
  • Handles rewind requests with appropriate error propagation
  • Updates global sequence state after processing
tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (2)

24-119: Well-designed data structures for sequence tracking.

The implementation provides:

  • Clear categorization of state update types
  • Comprehensive sequence tracking with slot and signature context
  • Proper extraction logic for all update types

166-286: Comprehensive gap detection implementation.

The gap detection logic correctly:

  • Processes all sequence types systematically
  • Identifies gaps between consecutive sequences
  • Provides detailed metadata for gap filling
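The per-tree gap check reduces to: sort the observed sequence numbers, then any jump greater than one between neighbors is a gap. A minimal sketch (hypothetical helper, not the PR's exact function):

```rust
/// Return the inclusive ranges of missing sequence numbers.
fn find_gaps(mut seqs: Vec<u64>) -> Vec<(u64, u64)> {
    seqs.sort_unstable();
    seqs.dedup();
    seqs.windows(2)
        .filter(|w| w[1] > w[0] + 1)
        .map(|w| (w[0] + 1, w[1] - 1))
        .collect()
}

fn main() {
    // Sequences 3-4 and 7-8 were never observed.
    let gaps = find_gaps(vec![1, 2, 5, 6, 9]);
    assert_eq!(gaps, vec![(3, 4), (7, 8)]);
    println!("{:?}", gaps);
}
```

In the PR each gap additionally carries slot and signature metadata so the filler knows where to fetch; the range arithmetic is the same.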
src/ingester/detect_gaps.rs (1)

12-69: Well-structured sequence tracking foundation.

The global state and data structures provide a solid foundation for sequence tracking:

  • Thread-safe global state for cross-component coordination
  • Clear type definitions for different update categories
  • Comprehensive sequence entry tracking with metadata
tests/integration_tests/test_v1_address_tree_gap_filler.rs (4)

25-95: Well-structured test implementation!

The test function has a clear step-by-step approach with comprehensive gap filling logic, including both signature-based and slot-range fallback mechanisms. The progress logging is helpful for debugging.


202-260: Robust pagination implementation!

The signature fetching with pagination is well-implemented with proper boundary handling and progress tracking.
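Cursor-based signature pagination of the kind praised here follows a standard loop: fetch up to `limit` items before/after a cursor, advance the cursor to the last item of the page, and stop on an empty page. A schematic in-memory version (the real code pages an RPC endpoint; names here are illustrative):

```rust
/// Fetch one page of at most `limit` items that come after `before` in `all`.
fn fetch_page(all: &[u64], before: Option<u64>, limit: usize) -> Vec<u64> {
    let start = match before {
        Some(cursor) => all
            .iter()
            .position(|&x| x == cursor)
            .map(|i| i + 1)
            .unwrap_or(all.len()),
        None => 0,
    };
    all[start..].iter().take(limit).copied().collect()
}

/// Drain every page, advancing the cursor until an empty page is returned.
fn fetch_all(all: &[u64], limit: usize) -> Vec<u64> {
    let mut out = Vec::new();
    let mut cursor = None;
    loop {
        let page = fetch_page(all, cursor, limit);
        if page.is_empty() {
            break;
        }
        cursor = page.last().copied();
        out.extend(page);
    }
    out
}

fn main() {
    let signatures: Vec<u64> = (1..=10).collect();
    // limit 3 forces four fetches (3 + 3 + 3 + 1) with correct boundaries.
    assert_eq!(fetch_all(&signatures, 3), signatures);
    println!("fetched {} signatures", signatures.len());
}
```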


341-393: Effective fallback mechanism!

The validation and fallback logic properly handles remaining gaps by checking slot ranges and fetching missing blocks. The logging is appropriately limited to avoid overwhelming output.


457-518: Comprehensive sequence validation!

The function thoroughly validates both sequence ordering and duplicates with clear diagnostic output.
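The two invariants checked (strictly increasing order and no duplicates) can be verified in a single pass over the collected sequences. A minimal sketch, without the PR's detailed diagnostic output:

```rust
/// Validate that sequences are strictly increasing: no duplicates, no regressions.
fn validate_sequences(seqs: &[u64]) -> Result<(), String> {
    for w in seqs.windows(2) {
        if w[1] == w[0] {
            return Err(format!("duplicate sequence {}", w[0]));
        }
        if w[1] < w[0] {
            return Err(format!("sequence {} arrived after {}", w[1], w[0]));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_sequences(&[1, 2, 3, 4]).is_ok());
    assert!(validate_sequences(&[1, 2, 2, 3]).is_err()); // duplicate
    assert!(validate_sequences(&[1, 3, 2]).is_err()); // out of order
    println!("validation checks pass");
}
```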

Comment on lines +71 to +161
pub fn update_sequence_state(sequences: &StateUpdateSequences) {
let mut state = SEQUENCE_STATE.lock().unwrap();

// Update indexed tree sequences
for ((tree_pubkey, _tree_type_id), entries) in &sequences.indexed_tree_seqs {
if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) {
let tree_str = tree_pubkey.to_string();
// Check the actual tree type from the mapping
if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) {
match info.tree_type {
light_compressed_account::TreeType::AddressV1 => {
state.insert(tree_str, TreeTypeSeq::AddressV1(max_entry.clone()));
}
light_compressed_account::TreeType::StateV1 => {
state.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone()));
}
_ => {
// Other tree types not handled in indexed_tree_seqs
}
}
}
}
}

// Update nullification sequences
for (tree_pubkey, entries) in &sequences.nullification_seqs {
if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) {
let tree_str = tree_pubkey.to_string();
state.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone()));
}
}

// Update batch address queue indexes
for (tree_pubkey, entries) in &sequences.batch_address_queue_indexes {
if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) {
let tree_str = tree_pubkey.to_string();
println!(
"DEBUG: Updating batch_address_queue_indexes for tree: {}, sequence: {}",
tree_str, max_entry.sequence
);
let input_queue_entry = if let Some(current_seq) = state.get(&tree_str) {
if let TreeTypeSeq::AddressV2(input_queue_entry, _) = current_seq {
input_queue_entry.clone()
} else {
SequenceEntry {
sequence: 0,
slot: 0,
signature: String::new(),
}
}
} else {
SequenceEntry {
sequence: 0,
slot: 0,
signature: String::new(),
}
};
state.insert(
tree_str,
TreeTypeSeq::AddressV2(input_queue_entry, max_entry.clone()),
);
}
}

// Update out account leaf indexes for StateV2 trees
for (tree_pubkey, entries) in &sequences.out_account_leaf_indexes {
if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) {
let tree_str = tree_pubkey.to_string();
if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) {
match info.tree_type {
light_compressed_account::TreeType::StateV2 => {
let mut seq_context = if let Some(current_seq) = state.get(&tree_str) {
if let TreeTypeSeq::StateV2(seq_context) = current_seq {
seq_context.clone()
} else {
crate::ingester::parser::tree_info::StateV2SeqWithContext::default()
}
} else {
crate::ingester::parser::tree_info::StateV2SeqWithContext::default()
};
seq_context.output_queue_entry = Some(max_entry.clone());
state.insert(tree_str, TreeTypeSeq::StateV2(seq_context));
}
_ => {
state.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone()));
}
}
}
}
}
}

🛠️ Refactor suggestion

Replace println debug statements with proper logging.

The sequence state update logic is correct, but debug statements should use the logging framework.

-            println!(
-                "DEBUG: Updating batch_address_queue_indexes for tree: {}, sequence: {}",
-                tree_str, max_entry.sequence
-            );
+            tracing::debug!(
+                "Updating batch_address_queue_indexes for tree: {}, sequence: {}",
+                tree_str, max_entry.sequence
+            );
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs between lines 71 and 161, replace the println
debug statement used for batch_address_queue_indexes updates with a proper
logging call using the project's logging framework. Identify the appropriate
logger and log level (e.g., debug) and convert the println macro to a logger
method call, ensuring the message and variables remain the same for consistent
debug output.

Comment on lines +210 to +237
let tree_str = address.tree.0.to_string();
println!(
"DEBUG: Extracting batch_new_address for tree: {}, queue_index: {}",
tree_str, address.queue_index
);

// Check if this is an AddressV1 tree incorrectly in batch operations
if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) {
if info.tree_type == light_compressed_account::TreeType::AddressV1 {
println!("ERROR: AddressV1 tree {} found in batch_new_addresses - this should not happen!", tree_str);
println!(
" queue_index: {}, slot: {}, signature: {}",
address.queue_index, slot, signature
);
// Skip this invalid data
continue;
}
}

self.batch_address_queue_indexes
.entry(address.tree.0)
.or_insert_with(Vec::new)
.push(SequenceEntry {
sequence: address.queue_index,
slot,
signature: signature.to_string(),
});
}

🛠️ Refactor suggestion

Convert debug output to proper logging and review error handling.

Multiple issues to address:

  1. Replace println statements with tracing
  2. The ERROR level println for AddressV1 trees should be a proper error log
-            println!(
-                "DEBUG: Extracting batch_new_address for tree: {}, queue_index: {}",
-                tree_str, address.queue_index
-            );
+            tracing::debug!(
+                "Extracting batch_new_address for tree: {}, queue_index: {}",
+                tree_str, address.queue_index
+            );

             // Check if this is an AddressV1 tree incorrectly in batch operations
             if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) {
                 if info.tree_type == light_compressed_account::TreeType::AddressV1 {
-                    println!("ERROR: AddressV1 tree {} found in batch_new_addresses - this should not happen!", tree_str);
-                    println!(
-                        "  queue_index: {}, slot: {}, signature: {}",
-                        address.queue_index, slot, signature
-                    );
+                    tracing::error!(
+                        "AddressV1 tree {} found in batch_new_addresses - this should not happen! queue_index: {}, slot: {}, signature: {}",
+                        tree_str, address.queue_index, slot, signature
+                    );
                     // Skip this invalid data
                     continue;
                 }
             }
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs lines 210 to 237, replace all println debug and
error outputs with appropriate tracing crate macros: use tracing::debug! for
debug messages and tracing::error! for error messages. Specifically, change the
"DEBUG: Extracting batch_new_address" print to debug! and the "ERROR: AddressV1
tree" print to error!. Ensure the error log includes all relevant details
currently printed. This will standardize logging and improve error handling
visibility.

Comment on lines +285 to +288
println!(
"DEBUG: Processing indexed_tree_seqs - tree: {}, tree_type_id: {}",
tree_pubkey, tree_type_id
);

🛠️ Refactor suggestion

Replace debug println with proper logging.

-        println!(
-            "DEBUG: Processing indexed_tree_seqs - tree: {}, tree_type_id: {}",
-            tree_pubkey, tree_type_id
-        );
+        tracing::debug!(
+            "Processing indexed_tree_seqs - tree: {}, tree_type_id: {}",
+            tree_pubkey, tree_type_id
+        );
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs around lines 285 to 288, replace the debug
println statement with a proper logging call using the project's logging
framework. Remove the println macro and use the appropriate log macro (e.g.,
log::debug!) to output the message, ensuring it integrates with the
application's logging system and respects log levels.

Comment on lines 90 to 97
pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> {
let txn = db.begin().await?;
index_block_metadatas(&txn, vec![&block.metadata]).await?;
persist_state_update(&txn, derive_block_state_update(block)?).await?;
derive_block_state_update(block, None)?;
persist_state_update(&txn, derive_block_state_update(block, None)?).await?;
txn.commit().await?;
Ok(())
}

⚠️ Potential issue

Fix duplicate call to derive_block_state_update.

The function calls derive_block_state_update twice: once on line 93 (discarding the result) and again on line 94. This causes gap detection to run twice per block and may lead to incorrect sequence state updates.

 pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> {
     let txn = db.begin().await?;
     index_block_metadatas(&txn, vec![&block.metadata]).await?;
-    derive_block_state_update(block, None)?;
-    persist_state_update(&txn, derive_block_state_update(block, None)?).await?;
+    let state_update = derive_block_state_update(block, None)?;
+    persist_state_update(&txn, state_update).await?;
     txn.commit().await?;
     Ok(())
 }
🤖 Prompt for AI Agents
In src/ingester/mod.rs around lines 90 to 97, the function index_block calls
derive_block_state_update twice unnecessarily, once on line 93 without using the
result and again on line 94. To fix this, call derive_block_state_update only
once, store its result in a variable, and reuse that variable in
persist_state_update to avoid duplicate processing and potential state
inconsistencies.

@@ -1,3 +1,4 @@
use crate::ingester::detect_gaps::SequenceEntry;

💡 Verification agent

🧩 Analysis chain

Verify import dependency design.

The import of SequenceEntry from detect_gaps creates a dependency between the parser and gap detection modules.

Please verify that this doesn't create circular dependencies and consider if SequenceEntry should be moved to a more foundational module:


🏁 Script executed:

#!/bin/bash
# Check for potential circular dependencies
echo "=== Checking dependencies between parser and detect_gaps modules ==="
rg -l "use.*detect_gaps" src/ingester/parser/
rg -l "use.*parser" src/ingester/detect_gaps.rs

echo "=== Checking if SequenceEntry is used elsewhere ==="
rg -B 2 -A 2 "SequenceEntry" --type rust

Length of output: 6457


Break circular dependency – relocate SequenceEntry to a shared module

The current setup creates a parser ↔ gap-detection cycle:

  • src/ingester/parser/tree_info.rs imports SequenceEntry from detect_gaps
  • src/ingester/detect_gaps.rs imports from parser

To decouple these modules, extract SequenceEntry into a foundational module (e.g. src/ingester/types/sequence.rs or src/ingester/common/types.rs) and update both imports accordingly:

--- a/src/ingester/parser/tree_info.rs
+++ b/src/ingester/parser/tree_info.rs
-use crate::ingester::detect_gaps::SequenceEntry;
+use crate::ingester::types::SequenceEntry;
--- a/src/ingester/detect_gaps.rs
-// (existing import from parser)
+use crate::ingester::types::SequenceEntry;

• Create src/ingester/types/sequence.rs (or similar) with:

#[derive(Debug, Default, Clone)]
pub struct SequenceEntry {
    pub sequence: u64,
    pub slot: u64,
    pub signature: String,
}

• Update all references in both modules to point at the new shared location.

🤖 Prompt for AI Agents
In src/ingester/parser/tree_info.rs at line 1, the import of SequenceEntry from
detect_gaps creates a circular dependency with detect_gaps.rs importing from
parser. To fix this, move the SequenceEntry struct definition to a new shared
module, for example src/ingester/types/sequence.rs, defining it there with the
appropriate derives and fields. Then update all imports in both tree_info.rs and
detect_gaps.rs to import SequenceEntry from this new shared module instead of
from each other.

async fn analyze_existing_snapshot_for_all_gaps() -> Result<Vec<SequenceGap>> {
println!("📂 Analyzing existing snapshot for ALL gaps using comprehensive gap detection...");

let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local";

⚠️ Potential issue

Replace hardcoded path with configurable option.

The hardcoded snapshot path won't work on other machines or in CI environments. Use an environment variable or test configuration.

-    let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local";
+    let snapshot_path = std::env::var("SNAPSHOT_PATH")
+        .unwrap_or_else(|_| "./target/snapshot_local".to_string());
🤖 Prompt for AI Agents
In tests/integration_tests/test_v1_address_tree_gap_filler.rs at line 100,
replace the hardcoded snapshot path string with a configurable option by reading
the path from an environment variable or test configuration. Use a method like
std::env::var to get the snapshot path, and provide a default or error handling
if the variable is not set, ensuring the test can run on different machines and
CI environments without modification.


/// Build a HashSet of all slot numbers that already exist in the current snapshot
async fn build_existing_slot_index() -> Result<HashSet<u64>> {
let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local";

⚠️ Potential issue

Replace hardcoded path with configurable option.

Same issue as above - the hardcoded path needs to be configurable.

-    let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local";
+    let snapshot_path = std::env::var("SNAPSHOT_PATH")
+        .unwrap_or_else(|_| "./target/snapshot_local".to_string());
🤖 Prompt for AI Agents
In tests/integration_tests/test_v1_address_tree_gap_filler.rs at line 168,
replace the hardcoded snapshot_path string with a configurable option, such as
reading the path from an environment variable or a test configuration parameter,
to avoid fixed paths and improve flexibility.

async fn update_snapshot_with_missing_blocks(missing_blocks: &[BlockInfo]) -> Result<()> {
println!("💾 Updating snapshot file with missing blocks...");

let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local";

⚠️ Potential issue

Replace hardcoded path with configurable option.

Consistent with other functions, use environment variable.

-    let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local";
+    let snapshot_path = std::env::var("SNAPSHOT_PATH")
+        .unwrap_or_else(|_| "./target/snapshot_local".to_string());
🤖 Prompt for AI Agents
In tests/integration_tests/test_v1_address_tree_gap_filler.rs at line 523,
replace the hardcoded snapshot_path string with code that reads the path from an
environment variable, consistent with other functions. Use a suitable
environment variable name and provide a fallback or error handling if the
variable is not set.

// Serialize all blocks
let mut snapshot_data = Vec::new();
for block in &existing_blocks {
let block_bytes = bincode::serialize(block).unwrap();

⚠️ Potential issue

Handle serialization errors gracefully.

Using unwrap() could cause panics on serialization failures.

-        let block_bytes = bincode::serialize(block).unwrap();
+        let block_bytes = bincode::serialize(block)?;

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In tests/integration_tests/test_v1_address_tree_gap_filler.rs at line 558, the
use of unwrap() on the result of bincode::serialize(block) can cause the test to
panic if serialization fails. Replace unwrap() with proper error handling by
matching on the Result returned from serialize, and handle the error case
gracefully, such as returning a test failure or logging the error, to avoid
panics.

Comment on lines +289 to +300
#[tokio::test]
async fn test_comprehensive_state_update_validation() -> Result<()> {
println!("🔍 Testing Comprehensive StateUpdate Sequence Consistency");

// Load blocks from the created snapshot
let snapshot_path = "/Users/ananas/dev/photon/target/with_gaps";
let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_path.to_string()));

println!("📂 Loading snapshot from: {}", snapshot_path);
let block_stream = load_block_stream_from_directory_adapter(directory_adapter).await;

// Collect all blocks from the stream

⚠️ Potential issue

Fix hardcoded path for portability.

The test uses a hardcoded absolute path that won't work on other systems.

-    let snapshot_path = "/Users/ananas/dev/photon/target/with_gaps";
-    let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_path.to_string()));
+    let snapshot_path = std::env::temp_dir()
+        .join("photon_test_snapshots")
+        .join("with_gaps");
+    std::fs::create_dir_all(&snapshot_path)?;
+    let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(
+        snapshot_path.to_string_lossy().to_string(),
+    ));
🤖 Prompt for AI Agents
In tests/integration_tests/test_v1_address_tree_sequence_consistency.rs around
lines 289 to 300, replace the hardcoded absolute path
"/Users/ananas/dev/photon/target/with_gaps" with a relative path or use
environment variables or configuration to determine the snapshot path
dynamically. This change will make the test portable and runnable on different
systems without modification.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
src/ingester/detect_gaps.rs (3)

107-110: Replace debug println with proper logging.

This debug statement should use the logging framework instead of println.


211-227: Replace debug and error println statements with proper logging.

Multiple println statements should use the tracing framework for consistency.


285-288: Replace debug println with proper logging.

This debug statement should use the tracing framework.

🧹 Nitpick comments (2)
src/ingester/detect_gaps.rs (2)

12-15: Consider thread safety implications of global mutable state.

While the Mutex provides thread safety, this global state pattern could become a bottleneck under high concurrency. Consider if this state could be passed as parameters or managed through a dedicated service.
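The suggested alternative can be sketched as a small owned service that callers receive explicitly instead of reaching for a process-wide `lazy_static` Mutex. The names here (`SequenceTracker`, `record`, `last`) are illustrative, not the crate's API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Sequence state owned by a service value rather than a global static.
// Cloning the tracker clones the Arc, so tasks share one state map.
#[derive(Clone, Default)]
pub struct SequenceTracker {
    inner: Arc<Mutex<HashMap<String, u64>>>,
}

impl SequenceTracker {
    pub fn record(&self, tree: &str, seq: u64) {
        self.inner.lock().unwrap().insert(tree.to_string(), seq);
    }

    pub fn last(&self, tree: &str) -> Option<u64> {
        self.inner.lock().unwrap().get(tree).copied()
    }
}
```

A pipeline stage would then take a `SequenceTracker` parameter, which keeps the locking scope visible at each call site and lets tests construct an isolated tracker instead of sharing global state.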


415-531: Complex but comprehensive sequence type handling.

The match statement correctly handles all StateUpdateFieldType variants with appropriate TreeTypeSeq unpacking. The logic for extracting sequences from different tree state contexts is sound, though the repeated warning messages could be consolidated.

Consider extracting a helper function to reduce code duplication in the warning messages:

fn log_missing_sequence_state(queue_pubkey: Option<Pubkey>, tree_pubkey: Option<Pubkey>) {
    warn!(
        "No current sequence state found for queue {:?} and tree {:?}",
        queue_pubkey, tree_pubkey
    );
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec512b4 and 0a761f7.

📒 Files selected for processing (1)
  • src/ingester/detect_gaps.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (8)
src/ingester/detect_gaps.rs (8)

1-10: LGTM! Clean imports and dependencies.

The imports are well-organized and appropriate for the functionality. The use of lazy_static for global state management is justified for this use case.


17-24: LGTM! Clear event type mapping logic.

The function correctly maps MerkleTreeEvent variants to numeric IDs with appropriate fallback handling.


26-50: Well-structured data types for gap tracking.

The enum and struct definitions are comprehensive and well-documented. The SequenceGap struct contains all necessary metadata for gap filling operations.


52-68: Efficient data structure design for sequence tracking.

The StateUpdateSequences struct uses appropriate HashMap keys to efficiently organize sequences by tree and operation type. The composite keys (Pubkey, u64) and (Pubkey, u8) enable fine-grained tracking.
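As a rough illustration of the composite-key layout described above (the field names are assumptions, and `Pubkey` is stubbed with a string for self-containment):

```rust
use std::collections::HashMap;

type Pubkey = String; // stand-in for the real Pubkey type

// Sequences grouped per tree, further split by a numeric discriminator:
// a u64 (e.g. a tree-event id) or a u8 (e.g. an operation type).
#[derive(Default)]
struct StateUpdateSequences {
    by_tree_and_event: HashMap<(Pubkey, u64), Vec<u64>>,
    by_tree_and_op: HashMap<(Pubkey, u8), Vec<u64>>,
}

impl StateUpdateSequences {
    fn push_event_seq(&mut self, tree: Pubkey, event_id: u64, seq: u64) {
        self.by_tree_and_event
            .entry((tree, event_id))
            .or_default()
            .push(seq);
    }
}
```

Because the tuple key hashes as a unit, lookups stay O(1) per (tree, discriminator) pair without nesting maps inside maps.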


71-161: Complex but necessary sequence state management logic.

The function correctly handles different tree types (AddressV1, StateV1, StateV2) with appropriate sequence tracking. The logic for preserving existing state context while updating specific fields is sound.


163-271: Comprehensive sequence extraction implementation.

The method correctly extracts sequences from all relevant StateUpdate fields, properly handling different data structures and maintaining slot/signature context. The error handling for invalid AddressV1 trees in batch operations is appropriate.


278-358: Thorough gap detection across all sequence types.

The function systematically checks all sequence categories and properly delegates to the metadata-aware detection function. The event type mapping for batch merkle tree events is correct.


533-569: Robust gap detection algorithm with proper boundary handling.

The gap detection logic correctly:

  1. Skips gap detection for initialization cases (sequence 0 or u64::MAX)
  2. Detects gaps between known state and first new sequence
  3. Detects gaps between consecutive sequences
  4. Uses saturating_add(1) to prevent overflow

The algorithm is sound and handles edge cases appropriately.
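The four rules above can be condensed into a standalone sketch (the `Gap` type and function name are illustrative; the PR's real code also carries slot and signature metadata alongside each gap):

```rust
#[derive(Debug, PartialEq)]
struct Gap {
    start: u64, // first missing sequence
    end: u64,   // last missing sequence
}

fn detect_gaps(last_known: u64, new_sequences: &[u64]) -> Vec<Gap> {
    let mut gaps = Vec::new();
    let mut prev = last_known;
    for &seq in new_sequences {
        // Rule 1: 0 / u64::MAX mark initialization, so there is no
        // trustworthy predecessor to compare against.
        if prev != 0 && prev != u64::MAX {
            // Rule 4: saturating_add keeps the successor overflow-safe.
            let expected = prev.saturating_add(1);
            // Rules 2 and 3: the first iteration compares against the
            // known state, later ones against the previous new sequence.
            if seq > expected {
                gaps.push(Gap { start: expected, end: seq - 1 });
            }
        }
        prev = seq;
    }
    gaps
}
```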

Comment on lines +367 to +370
if field_type == StateUpdateFieldType::BatchNullifyContext {
// For batch nullify context, we don't have tree or queue pubkey, so we can't detect gaps
return Vec::new();
}

💡 Verification agent

🧩 Analysis chain

Verify the BatchNullifyContext gap detection skip logic.

The early return for BatchNullifyContext might be overly restrictive. Even without tree/queue pubkey, sequence gaps could still be detected within the batch nullify context entries themselves.


🏁 Script executed:

#!/bin/bash
# Search for how BatchNullifyContext is used elsewhere to understand if gap detection should be skipped
rg -A 10 -B 5 "BatchNullifyContext" --type rust

Length of output: 14031


Remove the BatchNullifyContext early‐return so we can detect nullifier‐queue gaps

The detect_sequence_gaps_with_metadata function currently bails out on
StateUpdateFieldType::BatchNullifyContext, returning no gaps—even though each
BatchNullifyContext event carries its own nullifier_queue_index that we map into
SequenceEntry.sequence. We can and should detect missing indexes across these entries.

• In src/ingester/detect_gaps.rs (around line 367), remove or comment out:

-    if field_type == StateUpdateFieldType::BatchNullifyContext {
-        // For batch nullify context, we don't have tree or queue pubkey, so we can't detect gaps
-        return Vec::new();
-    }

• Let the standard gap‐detection logic run for BatchNullifyContext, passing along the
supplied tree_pubkey (and a None queue_pubkey if that’s intentional).
• Add a unit test covering a missing nullifier_queue_index in a sequence of
BatchNullifyContext entries to ensure gaps are detected as expected.
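A minimal version of the suggested unit test could check a throwaway helper in place of the crate's real gap detector (both the helper and its name are hypothetical):

```rust
// Collect every index missing between consecutive, sorted
// nullifier_queue_index values.
fn missing_queue_indexes(indexes: &[u64]) -> Vec<u64> {
    indexes
        .windows(2)
        .flat_map(|pair| (pair[0] + 1)..pair[1])
        .collect()
}
```

With `nullifier_queue_index` values `[0, 1, 3, 4]`, the helper reports index 2 as missing, which is the behavior the requested test should pin down against the real detector.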

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if field_type == StateUpdateFieldType::BatchNullifyContext {
// For batch nullify context, we don't have tree or queue pubkey, so we can't detect gaps
return Vec::new();
}
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs around lines 367 to 370, remove the early return
that skips gap detection for StateUpdateFieldType::BatchNullifyContext so that
the standard gap detection logic runs for these entries. Ensure the function
continues processing with the provided tree_pubkey and a None queue_pubkey if
applicable. Additionally, add a unit test to verify that missing
nullifier_queue_index values in BatchNullifyContext sequences are correctly
detected as gaps.

Comment on lines +383 to +413
println!(
"DEBUG: Using current sequence state for tree {}: {:?}",
tree_str, current_seq
);
current_seq.clone()
} else {
warn!("No current sequence state found for tree {}", tree_str);
TreeTypeSeq::default()
}
} else if let Some(queue_pubkey) = queue_pubkey {
let queue_str = queue_pubkey.to_string();
// This could be an issue in case of batched output queue updates.
let state = SEQUENCE_STATE.lock().unwrap();
if let Some(current_seq) = state.get(&queue_str) {
current_seq.clone()
} else {
warn!("No current sequence state found for queue {}", queue_str);
TreeTypeSeq::default()
}
} else {
println!("field_type: {:?}", field_type);
println!(
"tree_pubkey: {:?}, queue_pubkey: {:?}",
tree_pubkey, queue_pubkey
);
warn!(
"No current sequence state found for queue {:?} and tree {:?}",
queue_pubkey, tree_pubkey
);
TreeTypeSeq::default()
};

🛠️ Refactor suggestion

Replace debug println statements with proper logging.

Multiple debug and warning prints should use the tracing framework for consistency with the rest of the codebase.

-            println!(
-                "DEBUG: Using current sequence state for tree {}: {:?}",
-                tree_str, current_seq
-            );
+            tracing::debug!(
+                "Using current sequence state for tree {}: {:?}",
+                tree_str, current_seq
+            );

-        println!("field_type: {:?}", field_type);
-        println!(
-            "tree_pubkey: {:?}, queue_pubkey: {:?}",
-            tree_pubkey, queue_pubkey
-        );
+        tracing::debug!("field_type: {:?}", field_type);
+        tracing::debug!(
+            "tree_pubkey: {:?}, queue_pubkey: {:?}",
+            tree_pubkey, queue_pubkey
+        );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
println!(
"DEBUG: Using current sequence state for tree {}: {:?}",
tree_str, current_seq
);
current_seq.clone()
} else {
warn!("No current sequence state found for tree {}", tree_str);
TreeTypeSeq::default()
}
} else if let Some(queue_pubkey) = queue_pubkey {
let queue_str = queue_pubkey.to_string();
// This could be an issue in case of batched output queue updates.
let state = SEQUENCE_STATE.lock().unwrap();
if let Some(current_seq) = state.get(&queue_str) {
current_seq.clone()
} else {
warn!("No current sequence state found for queue {}", queue_str);
TreeTypeSeq::default()
}
} else {
println!("field_type: {:?}", field_type);
println!(
"tree_pubkey: {:?}, queue_pubkey: {:?}",
tree_pubkey, queue_pubkey
);
warn!(
"No current sequence state found for queue {:?} and tree {:?}",
queue_pubkey, tree_pubkey
);
TreeTypeSeq::default()
};
tracing::debug!(
"Using current sequence state for tree {}: {:?}",
tree_str, current_seq
);
current_seq.clone()
} else {
warn!("No current sequence state found for tree {}", tree_str);
TreeTypeSeq::default()
}
} else if let Some(queue_pubkey) = queue_pubkey {
let queue_str = queue_pubkey.to_string();
// This could be an issue in case of batched output queue updates.
let state = SEQUENCE_STATE.lock().unwrap();
if let Some(current_seq) = state.get(&queue_str) {
current_seq.clone()
} else {
warn!("No current sequence state found for queue {}", queue_str);
TreeTypeSeq::default()
}
} else {
tracing::debug!("field_type: {:?}", field_type);
tracing::debug!(
"tree_pubkey: {:?}, queue_pubkey: {:?}",
tree_pubkey, queue_pubkey
);
warn!(
"No current sequence state found for queue {:?} and tree {:?}",
queue_pubkey, tree_pubkey
);
TreeTypeSeq::default()
};
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs around lines 383 to 413, replace all debug and
warning println! statements with appropriate tracing crate macros such as
tracing::debug! and tracing::warn! to ensure consistent logging across the
codebase. Remove println! calls and use debug! for debug-level messages and
warn! for warnings, passing the same message content and variables.

Comment on lines +418 to +434
println!(
"DEBUG: IndexedTreeUpdate with AddressV1, seq: {}",
entry.sequence
);
(entry.sequence, Some(entry))
}
_ => {
println!(
"DEBUG: IndexedTreeUpdate with unsupported tree type: {:?}",
start_seq
);
warn!(
"No current sequence state found for queue {:?} and tree {:?}",
queue_pubkey, tree_pubkey
);
(u64::MAX, None)
}

🛠️ Refactor suggestion

Replace debug println statements with proper logging.

Debug prints should use the tracing framework for consistency.

-                println!(
-                    "DEBUG: IndexedTreeUpdate with AddressV1, seq: {}",
-                    entry.sequence
-                );
+                tracing::debug!(
+                    "IndexedTreeUpdate with AddressV1, seq: {}",
+                    entry.sequence
+                );

-                println!(
-                    "DEBUG: IndexedTreeUpdate with unsupported tree type: {:?}",
-                    start_seq
-                );
+                tracing::debug!(
+                    "IndexedTreeUpdate with unsupported tree type: {:?}",
+                    start_seq
+                );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
println!(
"DEBUG: IndexedTreeUpdate with AddressV1, seq: {}",
entry.sequence
);
(entry.sequence, Some(entry))
}
_ => {
println!(
"DEBUG: IndexedTreeUpdate with unsupported tree type: {:?}",
start_seq
);
warn!(
"No current sequence state found for queue {:?} and tree {:?}",
queue_pubkey, tree_pubkey
);
(u64::MAX, None)
}
tracing::debug!(
"IndexedTreeUpdate with AddressV1, seq: {}",
entry.sequence
);
(entry.sequence, Some(entry))
}
_ => {
tracing::debug!(
"IndexedTreeUpdate with unsupported tree type: {:?}",
start_seq
);
warn!(
"No current sequence state found for queue {:?} and tree {:?}",
queue_pubkey, tree_pubkey
);
(u64::MAX, None)
}
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs between lines 418 and 434, replace the debug
println! statements with appropriate tracing crate logging macros such as
tracing::debug! to ensure consistent logging. Remove the println! calls and use
tracing::debug! with the same message strings and variables to maintain the
debug output within the tracing framework.
