
feat: gap detection & rewind #38


Open · wants to merge 43 commits into base: main
Conversation

@sergeytimoshin commented Jul 31, 2025

  1. Gap Detection & Rewind: A comprehensive system has been implemented to detect sequence gaps across all state update types (V1/V2 trees, nullifications, batch events, etc.). When a gap is detected, the indexer now automatically triggers a "rewind" command to the block fetcher, which repositions the stream to re-fetch the missing blocks, ensuring no data is missed.
  2. Tree Filtering: The indexer can now be started with a --tree argument. When provided, it will filter and process only the blocks and transactions relevant to the specified Merkle tree. This speeds up indexing when backfilling a specific tree.
  3. Snapshot Analysis Tool: A new photon-analyze-snapshot tool has been added that reports statistics about snapshot files.

Key changes:

  • Gap Detection & Rewind Controller (src/ingester/detect_gaps.rs, src/ingester/rewind_controller.rs):

    • Introduced a global state to track the latest sequence numbers for all trees and operation types.
    • Implemented logic to extract sequence numbers from every StateUpdate and compare them against the last known sequence to detect gaps.
    • Created a RewindController that sends a command to the block fetcher to rewind to a specific slot when a gap is found (a minimal sketch of this follows the list).
  • Block Fetcher Integration (src/ingester/fetchers/):

    • The gRPC and Poller block fetchers now listen for rewind commands.
    • Upon receiving a command, the fetcher clears its internal cache and restarts the block stream from the requested slot.
  • Tree Filtering (src/ingester/, src/main.rs):

    • Added a --tree CLI argument to the main binary.
    • The ingester now performs a quick check to see if a block contains any instructions interacting with the filtered tree.
    • If a block is irrelevant to the filtered tree, it is skipped entirely, avoiding unnecessary parsing and database writes.
  • Unified Sequence Tracking (src/ingester/persist/persisted_indexed_merkle_tree.rs):

    • Address Tree (V1) updates are now also persisted to the state_tree_histories table.
    • This unifies the sequence number tracking for both State and Address trees into a single table, which simplifies the gap detection logic by providing a single source of truth.
  • Snapshot Analysis Tool (analyze_snapshot.rs):

    • A new standalone binary for analyzing snapshot directories.
    • Provides statistics on total blocks, transactions, and compression-related transactions.
    • Shows the distribution of transactions across different Merkle trees.
    • Calculates the percentage of blocks that could be skipped if a specific tree filter were applied.
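
To make the rewind path concrete, below is a minimal, hypothetical sketch of how the gap detector could hand a rewind command to the block fetcher over a channel. The command shape, field names, and the RewindController API shown here are illustrative assumptions, not the exact code in this PR.

use tokio::sync::mpsc;

/// Hypothetical rewind command sent from the gap detector to the block fetcher.
#[derive(Debug)]
pub struct RewindCommand {
    /// Slot the fetcher should restart streaming from.
    pub to_slot: u64,
    /// Human-readable reason, e.g. which tree and field type had a gap.
    pub reason: String,
}

/// Hypothetical controller held by the ingester; the block fetcher owns the receiver.
pub struct RewindController {
    sender: mpsc::UnboundedSender<RewindCommand>,
}

impl RewindController {
    pub fn new() -> (Self, mpsc::UnboundedReceiver<RewindCommand>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        (Self { sender }, receiver)
    }

    /// Ask the fetcher to reposition its block stream to `to_slot`.
    pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> {
        self.sender
            .send(RewindCommand { to_slot, reason })
            .map_err(|e| format!("block fetcher dropped the rewind receiver: {}", e))
    }
}

On the fetcher side, the receiver would be polled between batches; when a command arrives, the fetcher clears its block cache and restarts streaming from the requested slot, as described above.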

Summary by CodeRabbit

  • New Features

    • Added support for filtering indexed data by a specific tree public key and integrated a rewind controller for gap detection during block indexing.
    • Introduced a command-line tool for analyzing snapshot directories, providing detailed statistics and per-tree breakdowns.
    • Implemented comprehensive gap detection and filling for blockchain snapshots, including sequence consistency validation across multiple StateUpdate fields.
    • Added new binary for snapshot analysis.
  • Improvements

    • Enhanced error handling and logging for better diagnostics.
    • Improved block indexing to support dynamic rewinding and selective processing by tree.
    • Extended data structures to include transaction signatures for better traceability.
  • Bug Fixes

    • Converted certain runtime panics into controlled error responses for more robust error handling.
  • Tests

    • Added extensive integration tests and utilities for snapshot creation, validation, gap filling, and sequence consistency checking.
  • Chores

    • Updated dependencies and configuration for improved compatibility and reproducibility.
    • Expanded .gitignore to exclude .txt files.

@sergeytimoshin sergeytimoshin requested a review from Copilot July 31, 2025 00:12
@Copilot Copilot AI left a comment

Pull Request Overview

This PR implements a comprehensive gap detection and rewind system for the Photon indexer to ensure sequence consistency across all state update types. The system automatically detects missing sequence numbers in V1/V2 trees, nullifications, and batch events, then triggers a rewind to re-fetch missing blocks.

Key changes include:

  • Gap detection system that tracks sequence numbers across all state update types
  • Rewind controller that repositions block streams when gaps are detected
  • Tree filtering capability to process only blocks relevant to specific Merkle trees
  • Unified sequence tracking for both State and Address trees
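
The tree-filtering fast path can be sketched as a small predicate over the parsed instruction groups. The struct shapes below are simplified assumptions (mirroring the involves_tree refactor suggested later in this review), not the parser's actual types.

use solana_sdk::pubkey::Pubkey;

// Simplified stand-ins for the parser's instruction types (assumed shapes).
struct Instruction {
    accounts: Vec<Pubkey>,
}
struct InstructionGroup {
    outer_instruction: Instruction,
    inner_instructions: Vec<Instruction>,
}

/// Returns true if any outer or inner instruction references the filtered tree.
fn involves_tree(groups: &[InstructionGroup], tree: &Pubkey) -> bool {
    groups.iter().any(|group| {
        group.outer_instruction.accounts.contains(tree)
            || group
                .inner_instructions
                .iter()
                .any(|ix| ix.accounts.contains(tree))
    })
}

Blocks whose transactions never touch the filtered tree can then be skipped before any parsing or database work.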

Reviewed Changes

Copilot reviewed 30 out of 32 changed files in this pull request and generated 7 comments.

Summary per file:

  • src/ingester/detect_gaps.rs: Implements comprehensive gap detection across all StateUpdate field types
  • src/ingester/rewind_controller.rs: Creates the rewind system that handles gap-triggered stream repositioning
  • src/ingester/fetchers/*.rs: Adds rewind command handling to the gRPC and poller block fetchers
  • src/main.rs: Adds the --tree CLI argument for filtering the indexer to specific trees
  • tests/integration_tests/*.rs: Adds comprehensive gap detection validation and gap-filling test utilities
  • analyze_snapshot.rs: New tool for analyzing snapshot files and tree distribution

Comments suppressed due to low confidence (1)

src/ingester/detect_gaps.rs:48

  • The field name _tree_type_string with leading underscore suggests it's unused, but it's part of a public struct. Either remove it if truly unused or rename without the underscore prefix.
    //  pub tree_type_string: Option<String>, // Tree type string (for indexed tree updates)

Comment on lines 216 to 229
log::info!(
    "amt sequence state {:?}",
    SEQUENCE_STATE
        .lock()
        .unwrap()
        .get("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2")
);
log::info!(
    "smt sequence state {:?}",
    SEQUENCE_STATE
        .lock()
        .unwrap()
        .get("smt1NamzXdq4AMqS2fS2F1i5KTYPZRhoHgWx38d8WsT")
);
Copilot AI Jul 31, 2025

These debug log statements with hardcoded pubkeys appear to be temporary debugging code that should be removed before production deployment.

Suggested change

-log::info!(
-    "amt sequence state {:?}",
-    SEQUENCE_STATE
-        .lock()
-        .unwrap()
-        .get("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2")
-);
-log::info!(
-    "smt sequence state {:?}",
-    SEQUENCE_STATE
-        .lock()
-        .unwrap()
-        .get("smt1NamzXdq4AMqS2fS2F1i5KTYPZRhoHgWx38d8WsT")
-);
+if cfg!(debug_assertions) {
+    log::debug!(
+        "amt sequence state {:?}",
+        SEQUENCE_STATE
+            .lock()
+            .unwrap()
+            .get(&get_amt_pubkey())
+    );
+    log::debug!(
+        "smt sequence state {:?}",
+        SEQUENCE_STATE
+            .lock()
+            .unwrap()
+            .get(&get_smt_pubkey())
+    );
+}


@@ -76,7 +77,7 @@ pub struct AddressQueueUpdate
 impl From<NewAddress> for AddressQueueUpdate {
     fn from(new_address: NewAddress) -> Self {
         AddressQueueUpdate {
-            tree: SerializablePubkey::from(new_address.mt_pubkey),
+            tree: SerializablePubkey::from(new_address.tree_pubkey),
Copilot AI Jul 31, 2025

The field access changed from mt_pubkey to tree_pubkey but there's no indication in the diff that the NewAddress struct was updated to match. This could cause compilation errors if the field doesn't exist.



coderabbitai bot commented Jul 31, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

This update introduces comprehensive enhancements to the ingester pipeline, focusing on robust sequence gap detection, rewind support, and selective processing by tree public key. It adds new modules for gap tracking and rewind control, extends core data structures and function signatures, and incorporates extensive integration tests for sequence consistency and gap filling. A new command-line tool for snapshot analysis is also included.

Changes

  • Ingester Gap Detection & Rewind Modules (src/ingester/gap/mod.rs, src/ingester/gap/sequences.rs, src/ingester/gap/treetype_seq.rs, src/ingester/gap/rewind.rs): Introduce gap tracking, sequence state management, and rewind control modules. Define data structures for sequence gaps, sequence entries, and tree type sequences. Implement extraction, aggregation, and gap detection logic, and add a controller for issuing rewind commands.
  • Ingester Core Enhancements (src/ingester/mod.rs, src/ingester/indexer/mod.rs, src/ingester/error.rs): Refactor batch indexing, block state derivation, and error handling to support gap detection, rewind, and tree filtering. Update function signatures and add new helper functions for selective block processing and improved logging. Introduce a custom error variant.
  • Block Fetching & Stream Control (src/ingester/fetchers/mod.rs, src/ingester/fetchers/grpc.rs, src/ingester/fetchers/poller.rs): Extend block stream configuration and poller logic to support optional rewind receivers. Integrate rewind command handling for dynamic stream restarting. Update method signatures and stream creation logic accordingly.
  • Parser & State Update Modifications (src/ingester/parser/mod.rs, src/ingester/parser/merkle_tree_events_parser.rs, src/ingester/parser/state_update.rs, src/ingester/parser/tx_event_parser_v2.rs): Add tree filtering and signature tracking to transaction parsing. Update data structures and parsing logic to propagate signatures, filter state updates by tree, and improve handling of batch accounts and addresses.
  • Persistence Layer Update (src/ingester/persist/persisted_indexed_merkle_tree.rs): Add signature field handling when inserting indexed tree updates and ensure signature persistence in the state tree histories table.
  • Main Application Integration (src/main.rs, Cargo.toml): Add a command-line argument for tree filtering, propagate the rewind controller and tree filter through continuous indexing. Register the new binary tool and update dependencies to specific Git revisions.
  • Snapshot & Tooling (src/snapshot/mod.rs, src/snapshot/snapshotter/main.rs, src/tools/analyze_snapshot.rs): Refactor snapshot update logic, explicitly set rewind support in the snapshotter, and add a new CLI tool for analyzing snapshot directories with tree filtering and summary reporting.
  • API & Error Handling (src/api/method/get_multiple_compressed_accounts.rs, src/api/method/get_transaction_with_compression_info.rs): Improve error handling in API methods and update function calls to match new parser signatures.
  • Test Infrastructure & Utilities (tests/integration_tests/main.rs, tests/integration_tests/utils.rs, tests/integration_tests/mock_tests.rs, tests/integration_tests/e2e_tests.rs, tests/integration_tests/zeroeth_element_fix_test.rs): Update test modules and helpers to accommodate new data structure fields and function signatures. Add new test modules for snapshot utilities, gap filling, and sequence consistency validation.
  • Snapshot Integration Tests (tests/integration_tests/snapshot_test_utils.rs, tests/integration_tests/snapshot_tests.rs): Add utility functions and integration tests for snapshot creation, validation, and round-trip integrity using real blockchain data.
  • V1 Address Tree Gap & Sequence Tests (tests/integration_tests/test_v1_address_tree_gap_filler.rs, tests/integration_tests/test_v1_address_tree_sequence_consistency.rs): Implement comprehensive tests for gap filling and sequence consistency across all StateUpdate fields in blockchain snapshots, including utilities for gap analysis, block fetching, and verification.
  • Project Configuration (.gitignore): Add .txt files to ignored patterns.

Sequence Diagram(s)

sequenceDiagram
    participant CLI as User/CLI
    participant Main as main.rs
    participant Ingester as Ingester
    participant Gap as gap/mod.rs
    participant Poller as fetchers/poller.rs

    CLI->>Main: Start with optional tree filter & rewind
    Main->>Ingester: Start block indexing (with tree_filter, rewind_controller)
    loop Block Stream
        Ingester->>Poller: Fetch next block batch (with rewind_receiver)
        Poller->>Gap: Check for rewind commands
        alt Rewind Command Received
            Poller->>Poller: Restart poller from rewind slot
        else No Command
            Poller->>Ingester: Return next block batch
        end
        Ingester->>Ingester: Derive state updates, detect gaps
        alt Gap Detected
            Ingester->>Gap: Request rewind for detected gaps
        end
        Ingester->>Main: Index blocks, update metrics
    end
sequenceDiagram
    participant Test as Integration Test
    participant Snapshot as Snapshot Utils
    participant RPC as Solana RPC
    participant Analyzer as analyze_snapshot.rs

    Test->>Snapshot: Create/validate snapshot from compression txs
    Snapshot->>RPC: Fetch compression signatures/blocks
    Snapshot->>Snapshot: Serialize, write snapshot
    Test->>Analyzer: Analyze snapshot (CLI tool)
    Analyzer->>Snapshot: Load blocks, parse transactions
    Analyzer->>Analyzer: Count, summarize, report stats

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~90+ minutes

Poem

A rabbit hopped through code so deep,
Detecting gaps where data sleep.
With rewind magic, blocks replayed,
And every tree’s update surveyed.
Snapshots checked, sequences tight—
The blockchain’s garden now feels right.
🐇✨ Hooray for gaps all filled tonight!


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 15

♻️ Duplicate comments (7)
tests/integration_tests/zeroeth_element_fix_test.rs (2)

119-119: Consider using a more explicit test signature.

Using Default::default() creates an all-zero signature which may not be appropriate for test data clarity.

Consider using a more explicit test signature:

-        signature: Default::default(),
+        signature: [1u8; 64], // Mock signature for testing purposes

215-215: Same signature concern as above.

Same issue with using Default::default() for signature field - consider using a more explicit test signature for clarity.

src/api/method/get_multiple_compressed_accounts.rs (1)

151-155: Good error handling improvement, but unrelated to main PR.

Replacing the panic with proper error handling is definitely an improvement for API robustness. However, this change appears unrelated to the gap detection and rewind functionality that's the main focus of this PR.

tests/integration_tests/mock_tests.rs (1)

1643-1643: Consider removing the explicit assignment.

Following up on the previous review comment, the let _ = assignment can be removed since the return value of insert() doesn't need to be captured in this test context.

-            let _ = indexed_leaf_updates.insert(
+            indexed_leaf_updates.insert(
src/ingester/mod.rs (1)

216-229: Remove debug log statements with hardcoded pubkeys.

These debug statements appear to be temporary debugging code that should be removed before production deployment.

src/ingester/detect_gaps.rs (2)

107-110: Debug print statements should use the tracing crate.

-            println!(
-                "DEBUG: Updating batch_address_queue_indexes for tree: {}, sequence: {}",
-                tree_str, max_entry.sequence
-            );
+            tracing::debug!(
+                "Updating batch_address_queue_indexes for tree: {}, sequence: {}",
+                tree_str, max_entry.sequence
+            );

219-223: Error logging should use tracing::error!.

-                    println!("ERROR: AddressV1 tree {} found in batch_new_addresses - this should not happen!", tree_str);
-                    println!(
-                        "  queue_index: {}, slot: {}, signature: {}",
-                        address.queue_index, slot, signature
-                    );
+                    tracing::error!(
+                        tree = %tree_str,
+                        queue_index = address.queue_index,
+                        slot = slot,
+                        signature = %signature,
+                        "AddressV1 tree found in batch_new_addresses - this should not happen!"
+                    );
🧹 Nitpick comments (16)
Cargo.toml (1)

89-93: Consider the implications of Git dependencies.

The change from version specifications to Git dependencies for light-protocol libraries points to a specific commit. While this may be necessary for accessing unreleased features, it introduces build stability risks.

Consider:

  1. Documentation: Add comments explaining why these specific Git dependencies are needed
  2. Migration plan: Plan to migrate back to published versions once the required features are released
  3. Version pinning: The commit hash provides good reproducibility, which is positive
# Required for [specific feature/fix] - migrate to published version when available
light-batched-merkle-tree = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
src/ingester/parser/tree_info.rs (1)

15-24: Improve documentation for sequence types

The inline comments could be more descriptive about what these sequence values represent and their purpose in gap detection.

Consider expanding the comments to explain:

  • What each sequence entry represents
  • Why StateV2 has three different sequence types
  • The relationship between input queue, batch event, and output queue sequences
 #[derive(Debug, Clone)]
 pub enum TreeTypeSeq {
     StateV1(SequenceEntry),
-    // Output queue (leaf index), Input queue index, Batch event seq with context
+    // StateV2 tracks three sequence types for comprehensive gap detection:
+    // - Output queue: tracks leaf index sequences
+    // - Input queue: tracks input processing sequences  
+    // - Batch event: tracks batch processing sequences with additional context
     StateV2(StateV2SeqWithContext),
-    // event seq with complete context
+    // AddressV1: tracks address tree event sequences with complete context
     AddressV1(SequenceEntry),
-    // Input queue index, Batch event seq with context
+    // AddressV2: tracks both input queue and batch event sequences separately
     AddressV2(SequenceEntry, SequenceEntry), // (input_queue_entry, batch_event_entry)
 }
src/ingester/indexer/mod.rs (1)

115-124: Fragile error detection using string matching

The error handling relies on checking if the error message contains "Gap detection triggered rewind", which is fragile and could break if the error message changes.

Consider using a dedicated error variant for rewind errors instead of string matching:

-                if e.to_string().contains("Gap detection triggered rewind") {
+                if matches!(e, IngesterError::GapDetectionRewind { .. }) {
                     // Gap detected, rewind triggered - the slot stream should handle repositioning
                     log::info!("Gap detection triggered rewind");
                     continue;

This would require adding a new variant to the IngesterError enum for more type-safe error handling.
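
For illustration, such a variant could look roughly like the following, assuming IngesterError derives thiserror::Error (not confirmed from the diff shown here):

use thiserror::Error;

#[derive(Debug, Error)]
pub enum IngesterError {
    // ... existing variants, including the new CustomError(String) ...
    #[error("Custom error: {0}")]
    CustomError(String),
    /// Hypothetical dedicated variant so callers can match on rewinds
    /// instead of string-matching the error message.
    #[error("Gap detection triggered rewind to slot {to_slot}")]
    GapDetectionRewind { to_slot: u64 },
}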

src/main.rs (1)

241-245: Duplicated tree filter parsing logic

The tree pubkey parsing logic is duplicated in two places. Consider extracting it to a helper function to follow DRY principles.

+fn parse_tree_filter(tree_str: Option<&String>) -> Option<Pubkey> {
+    tree_str.map(|s| {
+        s.parse::<Pubkey>()
+            .expect("Invalid tree pubkey format")
+    })
+}

 // Then replace both occurrences with:
-let tree_filter = args.tree.as_ref().map(|tree_str| {
-    tree_str
-        .parse::<Pubkey>()
-        .expect("Invalid tree pubkey format")
-});
+let tree_filter = parse_tree_filter(args.tree.as_ref());

Also applies to: 302-306

tests/integration_tests/snapshot_test_utils.rs (2)

16-197: Consider breaking down the large function into smaller, focused functions.

The create_test_snapshot_from_compression_transactions function is quite long (~180 lines) and handles multiple responsibilities. This makes it harder to test and maintain.

Consider extracting logical segments into separate functions:

async fn fetch_compression_slots(
    client: &RpcClient,
    target_slot: u64,
) -> Result<Vec<u64>> {
    let (signatures, signature_to_slot_map) = 
        fetch_compression_signatures_until_slot(client, target_slot).await?;
    // ... extract unique slots logic
}

async fn fetch_blocks_for_slots(
    client: &RpcClient,
    slots: &[u64],
) -> Result<Vec<BlockInfo>> {
    // ... block fetching logic
}

async fn create_snapshot_from_blocks(
    blocks: Vec<BlockInfo>,
    snapshot_dir_path: &str,
) -> Result<()> {
    // ... snapshot creation logic
}

94-101: Document the error handling strategy for failed block fetches.

The current implementation logs errors but continues processing when blocks fail to fetch or parse. This could result in incomplete snapshots.

Consider collecting errors and reporting them at the end, or making the error handling configurable:

let mut fetch_errors = Vec::new();
// In the loop:
Err(e) => {
    let error_msg = format!("Failed to fetch block at slot {}: {}", slot, e);
    eprintln!("{}", error_msg);
    fetch_errors.push(error_msg);
}

// After the loop:
if !fetch_errors.is_empty() && blocks.is_empty() {
    return Err(anyhow::anyhow!(
        "Failed to fetch any blocks. Errors: {:?}", 
        fetch_errors
    ));
}
tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (1)

42-42: Fix inconsistent field naming for tree_type_string.

The field is declared as _tree_type_string with an underscore prefix (line 42) but the underscore typically indicates unused fields in Rust. Either remove the underscore or mark it with #[allow(dead_code)] if truly unused.

-    pub _tree_type_string: Option<String>, // Tree type string (for indexed tree updates)
+    pub tree_type_string: Option<String>, // Tree type string (for indexed tree updates)

And update the usage:

-                _tree_type_string: tree_type_string.clone(),
+                tree_type_string: tree_type_string.clone(),

Also applies to: 297-297

src/ingester/detect_gaps.rs (7)

12-15: Consider alternatives to global mutable state for better testability and maintainability.

The global SEQUENCE_STATE makes the code harder to test and reason about. Consider passing state through function parameters or using a state management pattern that doesn't rely on global mutability.


17-24: Replace magic numbers with named constants.

The function uses magic numbers (1, 2, 3, 0) without clear meaning. Consider using an enum or named constants for better readability.

+const BATCH_APPEND_TYPE_ID: u8 = 1;
+const BATCH_NULLIFY_TYPE_ID: u8 = 2;
+const BATCH_ADDRESS_APPEND_TYPE_ID: u8 = 3;
+const OTHER_EVENT_TYPE_ID: u8 = 0;
+
 fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 {
     match event {
-        MerkleTreeEvent::BatchAppend(_) => 1,
-        MerkleTreeEvent::BatchNullify(_) => 2,
-        MerkleTreeEvent::BatchAddressAppend(_) => 3,
-        _ => 0, // Other event types we don't care about
+        MerkleTreeEvent::BatchAppend(_) => BATCH_APPEND_TYPE_ID,
+        MerkleTreeEvent::BatchNullify(_) => BATCH_NULLIFY_TYPE_ID,
+        MerkleTreeEvent::BatchAddressAppend(_) => BATCH_ADDRESS_APPEND_TYPE_ID,
+        _ => OTHER_EVENT_TYPE_ID,
     }
 }

48-48: Remove commented-out code.

Dead code should be removed rather than left as comments.

     // Tree/context metadata
     pub tree_pubkey: Option<Pubkey>, // Tree pubkey (unified for all tree operations)
-    //  pub tree_type_string: Option<String>, // Tree type string (for indexed tree updates)
     pub field_type: StateUpdateFieldType,

211-214: Use structured logging instead of println for debugging.

-            println!(
-                "DEBUG: Extracting batch_new_address for tree: {}, queue_index: {}",
-                tree_str, address.queue_index
-            );
+            tracing::debug!(
+                tree = %tree_str,
+                queue_index = address.queue_index,
+                "Extracting batch_new_address"
+            );

273-276: Remove unnecessary wrapper function.

The detect_gaps_from_sequences function is just a wrapper that adds no value. Consider removing it and using detect_all_sequence_gaps directly.

-/// Detects gaps from a single StateUpdateSequences struct
-pub fn detect_gaps_from_sequences(sequences: &StateUpdateSequences) -> Vec<SequenceGap> {
-    detect_all_sequence_gaps(sequences)
-}

285-288: Replace debug println with structured logging.

-        println!(
-            "DEBUG: Processing indexed_tree_seqs - tree: {}, tree_type_id: {}",
-            tree_pubkey, tree_type_id
-        );
+        tracing::debug!(
+            tree = %tree_pubkey,
+            tree_type_id = tree_type_id,
+            "Processing indexed_tree_seqs"
+        );

384-387: Replace all println debug statements with structured logging.

Multiple debug println statements should use the tracing crate for consistency.

Example for line 384-387:

-            println!(
-                "DEBUG: Using current sequence state for tree {}: {:?}",
-                tree_str, current_seq
-            );
+            tracing::debug!(
+                tree = %tree_str,
+                current_seq = ?current_seq,
+                "Using current sequence state for tree"
+            );

Also applies to: 403-407, 419-422, 427-429

tests/integration_tests/test_v1_address_tree_gap_filler.rs (2)

124-124: Make snapshot path configurable.

The hardcoded path should be configurable via environment variable or parameter for flexibility in different test environments.

-    let snapshot_path = "target/snapshot_local";
+    let snapshot_path = std::env::var("SNAPSHOT_PATH")
+        .unwrap_or_else(|_| "target/snapshot_local".to_string());

335-376: Simplify nested logic for checking compression activity.

The nested loops and conditions make this code hard to follow. Consider extracting helper functions.

fn has_compression_activity(state_update: &StateUpdate) -> bool {
    !state_update.indexed_merkle_tree_updates.is_empty()
        || !state_update.leaf_nullifications.is_empty()
        || !state_update.batch_nullify_context.is_empty()
        || !state_update.batch_new_addresses.is_empty()
        || !state_update.batch_merkle_tree_events.is_empty()
        || !state_update.out_accounts.is_empty()
}

fn extract_v1_address_updates(state_update: &StateUpdate) -> Vec<IndexedTreeLeafUpdate> {
    state_update.indexed_merkle_tree_updates
        .iter()
        .filter_map(|((tree_pubkey, _), leaf_update)| {
            if leaf_update.tree_type == TreeType::AddressV1 && *tree_pubkey == V1_ADDRESS_TREE {
                Some(leaf_update.clone())
            } else {
                None
            }
        })
        .collect()
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0a45ee and b4bc44b.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (31)
  • .gitignore (1 hunks)
  • Cargo.toml (2 hunks)
  • analyze_snapshot.rs (1 hunks)
  • src/api/method/get_multiple_compressed_accounts.rs (1 hunks)
  • src/api/method/get_transaction_with_compression_info.rs (2 hunks)
  • src/ingester/detect_gaps.rs (1 hunks)
  • src/ingester/error.rs (1 hunks)
  • src/ingester/fetchers/grpc.rs (7 hunks)
  • src/ingester/fetchers/mod.rs (4 hunks)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/indexer/mod.rs (3 hunks)
  • src/ingester/mod.rs (3 hunks)
  • src/ingester/parser/merkle_tree_events_parser.rs (3 hunks)
  • src/ingester/parser/mod.rs (3 hunks)
  • src/ingester/parser/state_update.rs (2 hunks)
  • src/ingester/parser/tree_info.rs (2 hunks)
  • src/ingester/parser/tx_event_parser_v2.rs (2 hunks)
  • src/ingester/persist/persisted_indexed_merkle_tree.rs (4 hunks)
  • src/ingester/rewind_controller.rs (1 hunks)
  • src/main.rs (7 hunks)
  • src/snapshot/mod.rs (1 hunks)
  • src/snapshot/snapshotter/main.rs (1 hunks)
  • tests/integration_tests/e2e_tests.rs (1 hunks)
  • tests/integration_tests/main.rs (1 hunks)
  • tests/integration_tests/mock_tests.rs (1 hunks)
  • tests/integration_tests/snapshot_test_utils.rs (1 hunks)
  • tests/integration_tests/snapshot_tests.rs (2 hunks)
  • tests/integration_tests/test_v1_address_tree_gap_filler.rs (1 hunks)
  • tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (1 hunks)
  • tests/integration_tests/utils.rs (2 hunks)
  • tests/integration_tests/zeroeth_element_fix_test.rs (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (5)
src/ingester/parser/state_update.rs (1)
src/common/typedefs/serializable_pubkey.rs (6)
  • from (87-89)
  • from (93-95)
  • from (99-101)
  • from (105-107)
  • from (111-113)
  • from (127-129)
tests/integration_tests/utils.rs (1)
src/ingester/parser/mod.rs (1)
  • parse_transaction (42-175)
src/ingester/rewind_controller.rs (2)
src/snapshot/mod.rs (1)
  • new (249-263)
src/ingester/parser/indexer_events.rs (1)
  • tree_pubkey (34-39)
src/ingester/parser/mod.rs (2)
src/ingester/fetchers/grpc.rs (1)
  • parse_transaction (306-384)
src/ingester/parser/state_update.rs (1)
  • new (106-108)
src/ingester/indexer/mod.rs (2)
src/common/mod.rs (1)
  • fetch_current_slot_with_infinite_retry (135-147)
src/ingester/mod.rs (1)
  • index_block_batch_with_infinite_retries (209-251)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (48)
.gitignore (1)

16-16: LGTM! Sensible addition for development artifacts.

Adding *.txt to gitignore is appropriate for excluding text files that may be generated during debugging, logging, or testing phases related to the new gap detection and analysis features.

src/snapshot/snapshotter/main.rs (1)

259-259: LGTM! Correct exclusion of rewind support for snapshotter.

The snapshotter handles static snapshot creation rather than continuous indexing, so explicitly setting rewind_receiver: None is appropriate. This maintains consistency with the new rewind architecture while correctly excluding functionality that isn't needed for snapshot generation.

src/api/method/get_transaction_with_compression_info.rs (2)

213-213: LGTM! Consistent with updated parse_transaction signature.

The addition of None as the third parameter aligns with the new tree_filter parameter in parse_transaction. Since API methods should parse all transactions regardless of tree filtering, passing None is correct.


364-364: LGTM! Consistent parameter addition.

Same as the v1 helper function - correctly passing None for the tree filter parameter.

src/ingester/error.rs (1)

17-18: LGTM! Useful addition for enhanced error handling.

The CustomError(String) variant provides flexibility for handling various error conditions from the new gap detection and rewind features without requiring specific error types for every scenario.

src/snapshot/mod.rs (1)

454-459: LGTM! Good refactoring for clarity.

Extracting last_indexed_slot into a local variable improves code readability and makes the function parameter more explicit.

tests/integration_tests/e2e_tests.rs (1)

715-715: LGTM! Correctly updated for new function signature.

The addition of None as the third parameter correctly adapts to the new parse_transaction signature that includes an optional tree filter parameter.

tests/integration_tests/main.rs (1)

11-14: Excellent test coverage expansion for new functionality.

The addition of these three test modules provides comprehensive testing for the gap detection and rewind features:

  • snapshot_test_utils: Utilities for snapshot validation
  • test_v1_address_tree_gap_filler: End-to-end gap detection and filling tests
  • test_v1_address_tree_sequence_consistency: Sequence consistency validation

This directly supports the main PR objectives and ensures reliability of the new features.

tests/integration_tests/mock_tests.rs (1)

1651-1651: LGTM: Test data updated to match struct changes.

The addition of the signature field with a default value correctly reflects the structural changes to IndexedTreeLeafUpdate in the production code.

src/ingester/parser/state_update.rs (2)

66-66: LGTM: Addition of signature field enhances traceability.

The new signature field provides necessary cryptographic signature information for each leaf update, supporting the gap detection and rewind control system introduced in this PR.


80-80: Confirm NewAddress Struct Has tree_pubkey Field

Ensure that the NewAddress type imported from the light_compressed_account crate defines a tree_pubkey field (replacing the former mt_pubkey), otherwise this change will break compilation.

  • Verify in the external crate’s NewAddress definition that:
    • pub struct NewAddress { …, tree_pubkey: Pubkey, … } exists.
tests/integration_tests/utils.rs (2)

446-446: LGTM: Function call updated for new signature.

The addition of None as the third argument correctly matches the updated parse_transaction signature that now accepts an optional tree filter parameter.


465-465: LGTM: Consistent function call update.

This change maintains consistency with the updated parse_transaction signature across the test utility functions.

Cargo.toml (1)

36-38: LGTM: New snapshot analysis binary added.

The addition of the photon-analyze-snapshot binary target correctly supports the new snapshot analysis functionality described in the PR objectives.

src/ingester/fetchers/mod.rs (3)

21-21: LGTM: Rewind receiver field enables gap detection support.

The optional rewind_receiver field correctly integrates rewind command functionality into the block stream configuration, supporting the gap detection and rewind mechanism described in the PR objectives.


25-25: LGTM: Method signature change supports ownership transfer.

Changing from &self to mut self is necessary to enable take() operations on the rewind receiver, allowing proper ownership transfer to the stream creation functions.


34-34: LGTM: Proper ownership transfer of rewind receiver.

The use of self.rewind_receiver.take() correctly transfers ownership of the receiver to the active stream (either gRPC or poller). The second take() call will safely return None since only one stream is used at a time.

Also applies to: 43-43

src/ingester/parser/merkle_tree_events_parser.rs (9)

38-38: LGTM: Transaction signature correctly propagated to parsing function

The change correctly passes the transaction signature to parse_indexed_merkle_tree_update, enabling signature-aware state updates for gap detection.


112-138: LGTM: Signature parameter integration is well-implemented

The function signature update and subsequent usage in the IndexedTreeLeafUpdate struct (line 138) correctly associates each leaf update with its originating transaction signature. This enables precise gap detection and rewind functionality as described in the PR objectives.


134-153: LGTM: Intelligent filtering prevents duplicate account transactions

The logic correctly filters out batch input accounts that are being created in the same transaction by checking against output_account_hashes. This prevents duplicate AccountTransaction entries and improves data consistency.


155-166: LGTM: Address filtering aligns with unified sequence tracking

Filtering out addresses with queue_index == u64::MAX (AddressV1 trees) and using tree_pubkey correctly implements the unified sequence tracking approach mentioned in the PR objectives for gap detection.


121-154: LGTM: Comprehensive integration test for snapshot functionality

This test provides excellent end-to-end coverage of the snapshot creation and parsing pipeline using real compression transaction data. The use of #[ignore] is appropriate for tests requiring external API access, and the API key requirement is clearly documented.


48-59: LGTM: Rewind receiver properly integrated into main polling stream

The rewind receiver parameter is correctly added and passed through to the block poller stream, enabling dynamic rewind functionality as part of the gap detection system.


122-153: LGTM: Fallback scenarios correctly disable rewind

Passing None for the rewind receiver in fallback scenarios (timeout, out-of-order blocks, unhealthy gRPC) is the correct approach. During recovery situations, disabling rewind prevents additional complexity and ensures stable operation.


84-84: LGTM: Appropriate placeholder signatures for synthetic elements

Using Signature::from([0; 64]) as placeholder signatures for synthetic zeroeth and top elements is correct since these elements don't originate from actual transactions.

Also applies to: 129-129


238-265: LGTM: Unified gap detection through state_tree_histories integration

The addition of address tree entries to the state_tree_histories table enables unified sequence tracking across all tree types, which is essential for the comprehensive gap detection system. The "on conflict do nothing" strategy appropriately handles duplicate entries.

src/ingester/indexer/mod.rs (1)

77-113: Well-structured integration of rewind controller and tree filter

The implementation correctly passes the new parameters through the indexing pipeline and provides appropriate logging for filtered indexing operations.

src/ingester/fetchers/poller.rs (1)

43-109: Well-implemented rewind mechanism for block polling

The rewind implementation correctly:

  • Checks for rewind commands without blocking
  • Clears the block cache to prevent stale data
  • Resets slot positions appropriately
  • Uses proper control flow to restart the stream

One minor suggestion for clarity:

-                                log::error!("Rewinding block stream to {}: {}", to_slot, reason);
+                                log::info!("Rewinding block stream to slot {}: {}", to_slot, reason);

Since rewinding is an expected operation when gaps are detected, consider using log::info instead of log::error.
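
For reference, a minimal sketch of that non-blocking check, assuming the poller holds a tokio::sync::mpsc::UnboundedReceiver for rewind commands (type and field names are illustrative, not the actual poller code):

use std::collections::BTreeMap;
use tokio::sync::mpsc::UnboundedReceiver;

// Hypothetical command type, mirroring the rewind sketch in the PR description.
struct RewindCommand {
    to_slot: u64,
    reason: String,
}

/// Drains any pending rewind commands without blocking the polling loop.
/// Clears cached blocks and moves the next slot back to the earliest requested slot.
fn apply_pending_rewinds(
    rx: &mut UnboundedReceiver<RewindCommand>,
    block_cache: &mut BTreeMap<u64, Vec<u8>>, // simplified stand-in for the poller's cache
    next_slot: &mut u64,
) {
    while let Ok(cmd) = rx.try_recv() {
        log::info!("Rewinding block stream to slot {}: {}", cmd.to_slot, cmd.reason);
        block_cache.clear();
        *next_slot = (*next_slot).min(cmd.to_slot);
    }
}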

src/main.rs (1)

103-106: Clear documentation for tree filtering feature

The CLI argument is well-documented with clear explanation of its purpose.

src/ingester/parser/mod.rs (2)

47-70: Efficient early return for non-matching transactions

Good optimization to check tree involvement early and return an empty state update for irrelevant transactions.

Minor suggestion to simplify the nested loop logic:

-        let mut involves_tree = false;
-        for instruction_group in &tx.instruction_groups {
-            if instruction_group.outer_instruction.accounts.contains(tree) {
-                involves_tree = true;
-                break;
-            }
-            for inner_instruction in &instruction_group.inner_instructions {
-                if inner_instruction.accounts.contains(tree) {
-                    involves_tree = true;
-                    break;
-                }
-            }
-            if involves_tree {
-                break;
-            }
-        }
-
-        if !involves_tree {
+        let involves_tree = tx.instruction_groups.iter().any(|group| {
+            group.outer_instruction.accounts.contains(tree) ||
+            group.inner_instructions.iter().any(|ix| ix.accounts.contains(tree))
+        });
+
+        if !involves_tree {

183-221: Comprehensive state update filtering implementation

The filter_state_update_by_tree function thoroughly filters all relevant fields and correctly clears transactions when no tree-specific data remains.

src/ingester/rewind_controller.rs (1)

55-63: Consider the implications of returning 0 as fallback in determine_rewind_slot_from_gaps.

When no valid slots are found (all are zero or the list is empty after filtering), the function returns 0. This could potentially trigger a rewind to the genesis block, which might not be the intended behavior.

Consider returning an Option<u64> instead:

-fn determine_rewind_slot_from_gaps(gaps: &[SequenceGap]) -> u64 {
+fn determine_rewind_slot_from_gaps(gaps: &[SequenceGap]) -> Option<u64> {
     gaps.iter()
         .map(|gap| gap.before_slot)
         .filter(|&slot| slot > 0)
         .min()
-        .unwrap_or(0)
 }

Then handle the None case appropriately in request_rewind_for_gaps.
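
A possible shape for the caller once the function returns an Option (the request_rewind call here is illustrative; the controller's actual API may differ):

match determine_rewind_slot_from_gaps(&gaps) {
    Some(slot) => {
        rewind_controller.request_rewind(slot, format!("{} sequence gaps detected", gaps.len()))?;
    }
    None => {
        // No gap carried a usable slot; log instead of rewinding to slot 0 (genesis).
        log::warn!("Gaps detected but no valid rewind slot could be determined");
    }
}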

tests/integration_tests/test_v1_address_tree_gap_filler.rs (4)

24-119: Well-structured integration test with comprehensive gap filling logic.

The test follows a clear phase-based approach with proper error handling and fallback mechanisms. Good use of helper functions to keep the main test readable.


241-306: Well-implemented pagination logic for signature fetching.

Good handling of RPC pagination with proper termination conditions and reasonable batch size.


419-487: Good implementation of fallback gap filling mechanism.

The function properly handles edge cases where the signature-based approach might miss blocks, with clear logging and progress tracking.


724-762: Well-implemented verification with comprehensive reporting.

Good approach to group remaining gaps by field type and provide clear feedback about the gap filling results.

src/ingester/parser/tx_event_parser_v2.rs (3)

8-8: LGTM! Necessary imports for enhanced functionality.

The new imports support the filtering logic for batch input accounts and address queue updates.

Also applies to: 11-12


134-153: Excellent filtering logic to prevent duplicate account transactions.

The implementation correctly:

  • Uses a HashSet for efficient O(1) lookups when filtering
  • Prevents creating AccountTransaction entries for accounts being created in the same transaction
  • Maintains proper type conversions and error handling

This optimization avoids unnecessary database entries and processing overhead.


155-166: LGTM! Proper filtering and field mapping for address queue updates.

The logic correctly:

  • Excludes AddressV1 trees using the u64::MAX sentinel value
  • Updates field mapping from mt_pubkey to tree_pubkey as mentioned in the PR objectives
  • Maintains consistent filtering pattern with the batch input accounts logic

The comment clearly explains the business logic for the exclusion.

tests/integration_tests/snapshot_tests.rs (2)

10-12: LGTM! Clean imports for test utilities.

The new imports support the compression snapshot testing functionality.


121-154: Well-structured integration test with good practices.

The test implementation correctly:

  • Uses #[ignore] for network-dependent test
  • Provides clear error message for missing API key
  • Includes proper assertions and informative output
  • Follows async test patterns

Minor considerations for future maintenance:

  • The hardcoded slot (391843372) may become stale over time
  • Consider cleanup of test snapshot directory
  • Test depends on external API availability

These are typical trade-offs for integration tests using real network data.

src/ingester/fetchers/grpc.rs (4)

18-18: LGTM! Necessary imports for rewind functionality.

The new imports support the rewind command receiver integration.

Also applies to: 34-34


48-48: Good design choice for rewind receiver parameter.

The optional parameter maintains backward compatibility while enabling rewind functionality when needed.


59-59: Correct integration of rewind receiver in main stream.

The rewind receiver is properly passed to the primary RPC polling stream, enabling rewind functionality.


122-122: Appropriate design to disable rewind in fallback scenarios.

Explicitly passing None for rewind receiver in fallback streams is the right approach because:

  • Fallback streams are temporary and already handling error conditions
  • Keeps fallback logic simple and focused
  • Main stream handles rewind functionality appropriately

The implementation is consistent across all fallback scenarios.

Also applies to: 140-140, 153-153

src/ingester/persist/persisted_indexed_merkle_tree.rs (3)

11-11: LGTM! Necessary import for signature support.

The Signature import supports the new signature field in IndexedTreeLeafUpdate.


84-84: Appropriate use of placeholder signatures for synthetic elements.

Using Signature::from([0; 64]) with clear comments is the right approach for synthetic zeroeth and top elements that don't originate from actual transactions.

Also applies to: 129-129


238-265: Excellent implementation of unified gap detection persistence.

The state_tree_histories persistence logic correctly:

  • Maps all required fields from IndexedTreeLeafUpdate to the database model
  • Uses appropriate conflict resolution (do_nothing) to prevent overwrites
  • Keys conflict detection on (tree, seq) which is perfect for gap detection
  • Includes proper error handling and empty batch guards
  • Converts signature to byte vector format correctly

This implementation effectively enables unified sequence tracking across state and address trees as described in the PR objectives.
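
As a rough sketch of what such an upsert looks like with sea-orm (the entity and column identifiers below are assumed from the state_tree_histories table name; the real module paths may differ):

use sea_orm::{sea_query::OnConflict, DatabaseTransaction, DbErr, EntityTrait};

async fn persist_tree_histories(
    txn: &DatabaseTransaction,
    models: Vec<state_tree_histories::ActiveModel>,
) -> Result<(), DbErr> {
    if models.is_empty() {
        return Ok(()); // empty-batch guard, as noted above
    }
    state_tree_histories::Entity::insert_many(models)
        .on_conflict(
            // Conflict keyed on (tree, seq); existing rows are left untouched.
            OnConflict::columns([
                state_tree_histories::Column::Tree,
                state_tree_histories::Column::Seq,
            ])
            .do_nothing()
            .to_owned(),
        )
        .exec(txn)
        .await?;
    Ok(())
}

Note that on some sea-orm versions an insert where every row conflicts surfaces as DbErr::RecordNotInserted, which callers may need to treat as success.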

sergeytimoshin and others added 5 commits July 31, 2025 01:52
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
src/ingester/detect_gaps.rs (4)

212-215: Replace debug print with tracing

Debug print statements should use the tracing crate for consistency.

-            println!(
-                "DEBUG: Extracting batch_new_address for tree: {}, queue_index: {}",
-                tree_str, address.queue_index
-            );
+            tracing::debug!(
+                "Extracting batch_new_address for tree: {}, queue_index: {}",
+                tree_str, address.queue_index
+            );

286-289: Replace all debug prints with tracing

Multiple debug print statements should use the tracing crate.

-        println!(
-            "DEBUG: Processing indexed_tree_seqs - tree: {}, tree_type_id: {}",
-            tree_pubkey, tree_type_id
-        );
+        tracing::debug!(
+            "Processing indexed_tree_seqs - tree: {}, tree_type_id: {}",
+            tree_pubkey, tree_type_id
+        );

And at lines 404-408:

-        println!("field_type: {:?}", field_type);
-        println!(
-            "tree_pubkey: {:?}, queue_pubkey: {:?}",
-            tree_pubkey, queue_pubkey
-        );
+        tracing::debug!("field_type: {:?}", field_type);
+        tracing::debug!(
+            "tree_pubkey: {:?}, queue_pubkey: {:?}",
+            tree_pubkey, queue_pubkey
+        );

Also applies to: 404-408


361-573: Refactor this large function for better maintainability

This 200+ line function should be broken down into smaller, focused functions as suggested in the past review.

Consider extracting:

  1. State lookup logic (lines 379-414)
  2. Sequence unpacking logic (lines 416-532)
  3. Gap detection logic (lines 534-572)

This will improve readability and make the code easier to test and maintain.


416-532: Extract repetitive sequence unpacking logic

The match statement contains highly repetitive patterns that should be refactored.

Create helper functions to reduce duplication:

fn unpack_sequence_entry(
    tree_type_seq: &TreeTypeSeq,
    field_type: &StateUpdateFieldType,
) -> (u64, Option<SequenceEntry>) {
    match (tree_type_seq, field_type) {
        (TreeTypeSeq::AddressV1(entry), StateUpdateFieldType::IndexedTreeUpdate) => {
            tracing::debug!("IndexedTreeUpdate with AddressV1, seq: {}", entry.sequence);
            (entry.sequence, Some(entry.clone()))
        }
        (TreeTypeSeq::AddressV2(_, entry), StateUpdateFieldType::BatchMerkleTreeEventAddressAppend) |
        (TreeTypeSeq::AddressV2(_, entry), StateUpdateFieldType::BatchNewAddress) => {
            (entry.sequence, Some(entry.clone()))
        }
        // ... other cases
        _ => {
            tracing::warn!("Sequence type mismatch for field type: {:?}", field_type);
            (u64::MAX, None)
        }
    }
}
🧹 Nitpick comments (2)
src/ingester/detect_gaps.rs (2)

48-48: Remove commented code

This commented field appears to be leftover from refactoring and should be removed.

    pub tree_pubkey: Option<Pubkey>, // Tree pubkey (unified for all tree operations)
-    //  pub tree_type_string: Option<String>, // Tree type string (for indexed tree updates)
    pub field_type: StateUpdateFieldType,

293-293: Address TODO comment

The TODO indicates missing functionality for queue pubkey handling.

Would you like me to help implement the queue pubkey logic or create an issue to track this?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b4bc44b and 5ffa8f0.

📒 Files selected for processing (4)
  • src/ingester/detect_gaps.rs (1 hunks)
  • src/ingester/mod.rs (3 hunks)
  • tests/integration_tests/mock_tests.rs (1 hunks)
  • tests/integration_tests/zeroeth_element_fix_test.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • tests/integration_tests/mock_tests.rs
  • tests/integration_tests/zeroeth_element_fix_test.rs
  • src/ingester/mod.rs
🔇 Additional comments (3)
src/ingester/detect_gaps.rs (3)

1-15: LGTM!

The imports and global state declaration are appropriate for the gap detection functionality.


17-36: LGTM!

The helper function and enum definitions are well-structured and serve their purpose effectively.


70-162: Well-structured state update logic

The function properly handles different tree types and correctly uses tracing::debug for logging.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (2)
tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (1)

11-18: Replace magic numbers with named constants.

The function still uses hardcoded numbers (1, 2, 3) to represent event types, which reduces code readability as noted in previous reviews.

Define constants for better clarity:

+const EVENT_TYPE_BATCH_APPEND: u8 = 1;
+const EVENT_TYPE_BATCH_NULLIFY: u8 = 2;
+const EVENT_TYPE_BATCH_ADDRESS_APPEND: u8 = 3;
+
 fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 {
     match event {
-        MerkleTreeEvent::BatchAppend(_) => 1,
-        MerkleTreeEvent::BatchNullify(_) => 2,
-        MerkleTreeEvent::BatchAddressAppend(_) => 3,
+        MerkleTreeEvent::BatchAppend(_) => EVENT_TYPE_BATCH_APPEND,
+        MerkleTreeEvent::BatchNullify(_) => EVENT_TYPE_BATCH_NULLIFY,
+        MerkleTreeEvent::BatchAddressAppend(_) => EVENT_TYPE_BATCH_ADDRESS_APPEND,
         _ => 0,
     }
 }
src/ingester/detect_gaps.rs (1)

344-551: Function is still too long and complex.

The detect_sequence_gaps_with_metadata function remains overly long (200+ lines) with complex nested logic, as noted in previous reviews. The repetitive sequence unpacking logic (lines 394-510) also persists.

Consider breaking this into smaller functions:

  1. get_current_sequence_state() - handle state lookup logic
  2. unpack_sequence_by_field_type() - handle sequence unpacking
  3. detect_gaps_in_sorted_sequences() - core gap detection logic
fn detect_sequence_gaps_with_metadata(
    sequences: &[SequenceEntry],
    tree_pubkey: Option<Pubkey>,
    queue_pubkey: Option<Pubkey>,
    field_type: StateUpdateFieldType,
) -> Vec<SequenceGap> {
    if sequences.len() < 2 {
        return Vec::new();
    }

    let mut sorted_sequences = sequences.to_vec();
    sorted_sequences.sort_by_key(|entry| entry.sequence);
    
+   let start_seq = get_current_sequence_state(tree_pubkey, queue_pubkey);
+   let (unpacked_start_seq, start_entry) = unpack_sequence_by_field_type(&start_seq, &field_type, tree_pubkey, queue_pubkey);
+   
+   detect_gaps_in_sorted_sequences(&sorted_sequences, unpacked_start_seq, start_entry, tree_pubkey, field_type)
-   // ... rest of the complex logic
}
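
For illustration, one possible shape for the third helper is sketched below. This is a hypothetical sketch: the SequenceEntry and SequenceGap field names it uses (sequence, slot, and the before_/after_ fields) are assumptions, not the actual struct layout.

// Hypothetical sketch of the core gap-scanning helper. Assumes SequenceEntry
// exposes `sequence` and `slot`, SequenceGap has before_/after_ fields, and
// StateUpdateFieldType derives Clone.
fn detect_gaps_in_sorted_sequences(
    sorted: &[SequenceEntry],
    tree_pubkey: Option<Pubkey>,
    field_type: StateUpdateFieldType,
) -> Vec<SequenceGap> {
    let mut gaps = Vec::new();
    for pair in sorted.windows(2) {
        let (prev, next) = (&pair[0], &pair[1]);
        // Consecutive observed sequences should differ by exactly one.
        if next.sequence > prev.sequence.saturating_add(1) {
            gaps.push(SequenceGap {
                tree_pubkey,
                field_type: field_type.clone(),
                before_sequence: prev.sequence,
                after_sequence: next.sequence,
                before_slot: prev.slot,
                after_slot: next.slot,
            });
        }
    }
    gaps
}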
🧹 Nitpick comments (3)
src/tools/analyze_snapshot.rs (2)

55-55: Consider adding tree filtering to parse_transaction call.

The parse_transaction call passes None for the tree filter parameter, but since this tool supports tree filtering analysis, consider passing the target_tree_pubkey to potentially improve performance when analyzing specific trees.

-            match parse_transaction(tx, block.metadata.slot, None) {
+            match parse_transaction(tx, block.metadata.slot, target_tree_pubkey.as_ref()) {

136-140: Consider making tree display limit configurable.

The hardcoded limit of 10 trees in the output could be made configurable via a command-line argument for better flexibility in analysis.

-        if i < 10 || target_tree.as_ref().map(|t| t == tree).unwrap_or(false) {
+        const MAX_TREES_TO_DISPLAY: usize = 10;
+        if i < MAX_TREES_TO_DISPLAY || target_tree.as_ref().map(|t| t == tree).unwrap_or(false) {
tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (1)

340-350: Consider more specific error handling.

The transaction parsing silently continues on errors. While this is appropriate for compression parsing issues, consider logging the error types or counts to help diagnose parsing problems.

                match parse_transaction(transaction, slot, None) {
                    Ok(state_update) => {
                        parsed_transactions += 1;
                        sequences.extract_state_update_sequences(&state_update, slot, &signature);
                    }
-                   Err(_) => {
-                       // Skip failed parsing - compression transactions might have parsing issues
-                       continue;
-                   }
+                   Err(e) => {
+                       // Skip failed parsing - compression transactions might have parsing issues
+                       tracing::debug!("Failed to parse transaction {}: {}", signature, e);
+                       continue;
+                   }
                }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5ffa8f0 and d6685b3.

📒 Files selected for processing (7)
  • Cargo.toml (2 hunks)
  • src/ingester/detect_gaps.rs (1 hunks)
  • src/ingester/mod.rs (3 hunks)
  • src/ingester/parser/tree_info.rs (2 hunks)
  • src/tools/analyze_snapshot.rs (1 hunks)
  • tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (1 hunks)
  • tests/integration_tests/zeroeth_element_fix_test.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • tests/integration_tests/zeroeth_element_fix_test.rs
  • Cargo.toml
  • src/ingester/parser/tree_info.rs
  • src/ingester/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (5)
src/tools/analyze_snapshot.rs (2)

7-16: LGTM! Clean CLI argument validation.

The argument parsing and validation logic is well-structured with clear usage instructions and proper exit handling.


42-46: Good error handling for pubkey parsing.

The optional target tree pubkey parsing correctly uses transpose() to handle the Option<Result<T, E>> pattern and provides a clear error message.

tests/integration_tests/test_v1_address_tree_sequence_consistency.rs (1)

312-313: Good improvement on hardcoded path issue.

The previous hardcoded local path has been properly replaced with an environment variable approach, which resolves the portability concern from earlier reviews.

src/ingester/detect_gaps.rs (2)

101-105: Good use of tracing for debug logging.

The debug logging has been properly updated to use tracing::debug! instead of println!, addressing the previous review feedback.


207-214: Good use of tracing for error logging.

The error logging correctly uses tracing::error! instead of println!, which addresses the previous feedback about proper log level management.

Comment on lines 18 to 25
fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 {
match event {
MerkleTreeEvent::BatchAppend(_) => 1,
MerkleTreeEvent::BatchNullify(_) => 2,
MerkleTreeEvent::BatchAddressAppend(_) => 3,
_ => 0, // Other event types we don't care about
}
}

🛠️ Refactor suggestion

Replace magic numbers with named constants.

Same issue as in the test file - hardcoded numbers reduce code readability and maintainability.

+const EVENT_TYPE_BATCH_APPEND: u8 = 1;
+const EVENT_TYPE_BATCH_NULLIFY: u8 = 2;
+const EVENT_TYPE_BATCH_ADDRESS_APPEND: u8 = 3;
+
 fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 {
     match event {
-        MerkleTreeEvent::BatchAppend(_) => 1,
-        MerkleTreeEvent::BatchNullify(_) => 2,
-        MerkleTreeEvent::BatchAddressAppend(_) => 3,
+        MerkleTreeEvent::BatchAppend(_) => EVENT_TYPE_BATCH_APPEND,
+        MerkleTreeEvent::BatchNullify(_) => EVENT_TYPE_BATCH_NULLIFY,
+        MerkleTreeEvent::BatchAddressAppend(_) => EVENT_TYPE_BATCH_ADDRESS_APPEND,
         _ => 0,
     }
 }
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs around lines 18 to 25, the function uses
hardcoded numeric literals to represent event type IDs, which reduces
readability. Define named constants for each event type ID (e.g.,
BATCH_APPEND_ID = 1) at the top of the file or in a constants module, then
replace the numeric literals in the match arms with these constants to improve
clarity and maintainability.

Comment on lines 318 to 323
let field_type = match event_type {
1 => StateUpdateFieldType::BatchMerkleTreeEventAppend,
2 => StateUpdateFieldType::BatchMerkleTreeEventNullify,
3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
_ => continue,
};

🛠️ Refactor suggestion

Same magic numbers used again.

The magic numbers (1, 2, 3) are repeated here, creating the same maintainability issue as noted in previous comments.

         let field_type = match event_type {
-            1 => StateUpdateFieldType::BatchMerkleTreeEventAppend,
-            2 => StateUpdateFieldType::BatchMerkleTreeEventNullify,
-            3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
+            EVENT_TYPE_BATCH_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAppend,
+            EVENT_TYPE_BATCH_NULLIFY => StateUpdateFieldType::BatchMerkleTreeEventNullify,
+            EVENT_TYPE_BATCH_ADDRESS_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
             _ => continue,
         };
🤖 Prompt for AI Agents
In src/ingester/detect_gaps.rs around lines 318 to 323, replace the magic
numbers 1, 2, and 3 used in the match statement with named constants or enum
variants that clearly represent these event types. Define these constants or enum
variants elsewhere in the codebase if not already present, and use them here to
improve code readability and maintainability.

Comment on lines +245 to +250
let field_type = match event_type {
1 => StateUpdateFieldType::BatchMerkleTreeEventAppend,
2 => StateUpdateFieldType::BatchMerkleTreeEventNullify,
3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
_ => continue,
};

🛠️ Refactor suggestion

Inconsistent magic number usage.

The same magic numbers (1, 2, 3) are used again in the gap detection logic, creating a maintenance burden if these values ever change.

         let field_type = match event_type {
-            1 => StateUpdateFieldType::BatchMerkleTreeEventAppend,
-            2 => StateUpdateFieldType::BatchMerkleTreeEventNullify,
-            3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
+            EVENT_TYPE_BATCH_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAppend,
+            EVENT_TYPE_BATCH_NULLIFY => StateUpdateFieldType::BatchMerkleTreeEventNullify,
+            EVENT_TYPE_BATCH_ADDRESS_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
             _ => continue,
         };
🤖 Prompt for AI Agents
In tests/integration_tests/test_v1_address_tree_sequence_consistency.rs around
lines 245 to 250, the magic numbers 1, 2, and 3 are used directly in the match
statement for event_type. To fix this, define constants or an enum representing
these event types and replace the magic numbers with these named constants
throughout the file, including the gap detection logic, to improve
maintainability and reduce duplication.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/ingester/mod.rs (1)

199-227: Good error handling for rewind scenarios, but consider improving error type detection.

The logic correctly distinguishes between rewind-triggered errors (which should propagate) and other errors (which should be retried). However, the string matching approach for detecting rewind errors is somewhat fragile.

Consider using a more robust error type checking approach:

-                // Check if this is a gap-triggered rewind error
-                if e.to_string().contains("Gap detection triggered rewind") {
-                    // Don't retry, propagate the rewind error up
-                    return Err(e);
-                }
+                // Check if this is a gap-triggered rewind error
+                if matches!(e, IngesterError::RewindTriggered(_)) {
+                    // Don't retry, propagate the rewind error up
+                    return Err(e);
+                }

This would require adding a RewindTriggered variant to the IngesterError enum for more type-safe error handling.
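
A minimal sketch of such a variant, assuming the error enum uses thiserror (the extra variant shown is a placeholder; the PR later appears to settle on a GapDetectedRewind variant instead):

use thiserror::Error;

// Hypothetical sketch only: the real IngesterError has more variants than shown.
#[derive(Debug, Error)]
pub enum IngesterError {
    #[error("custom error: {0}")]
    CustomError(String),
    #[error("gap detection triggered rewind: {0}")]
    RewindTriggered(String),
}

fn should_stop_retrying(e: &IngesterError) -> bool {
    // Type-safe check in place of matching on the error's string form.
    matches!(e, IngesterError::RewindTriggered(_))
}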

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d6685b3 and cff013b.

📒 Files selected for processing (1)
  • src/ingester/mod.rs (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (3)
src/ingester/mod.rs (3)

11-11: LGTM! Clean import organization and module additions.

The consolidation of ConnectionTrait and QueryTrait imports improves readability, and the new detect_gaps and rewind_controller modules align perfectly with the PR's gap detection and rewind functionality objectives.

Also applies to: 23-23, 25-25, 31-31


34-89: Excellent integration of gap detection and rewind logic.

The implementation correctly:

  • Tracks sequences with proper slot and signature context during transaction parsing
  • Detects gaps using the new detect_all_sequence_gaps function
  • Handles rewind scenarios by returning early after requesting rewind
  • Updates the global sequence state after gap processing
  • Maintains clean separation of concerns with optional parameters

The error handling for rewind-triggered scenarios is particularly well done, ensuring the processing pipeline halts appropriately when gaps are detected.


148-197: Well-implemented block filtering and batch processing.

The implementation effectively:

  • Pre-filters blocks based on tree criteria, avoiding unnecessary processing
  • Handles empty filtered batches efficiently with early return
  • Maintains accurate metrics for both indexed and skipped blocks
  • Provides clear logging for operational visibility
  • Properly passes the new parameters through the processing pipeline

The separation between original batch size and filtered block count in metrics and logging is particularly well done.

Comment on lines +94 to +95
derive_block_state_update(block, None, None)?;
persist_state_update(&txn, derive_block_state_update(block, None, None)?).await?;

🛠️ Refactor suggestion

Eliminate duplicate function call for better performance.

The derive_block_state_update function is called twice with identical parameters, but the first result is discarded. This is inefficient since the function performs transaction parsing, which can be expensive.

-    derive_block_state_update(block, None, None)?;
-    persist_state_update(&txn, derive_block_state_update(block, None, None)?).await?;
+    let state_update = derive_block_state_update(block, None, None)?;
+    persist_state_update(&txn, state_update).await?;
🤖 Prompt for AI Agents
In src/ingester/mod.rs around lines 94 to 95, the function
derive_block_state_update is called twice with the same parameters, causing
unnecessary repeated computation. To fix this, call derive_block_state_update
once, store its result in a variable, and then use that variable in both places
where the result is needed. This eliminates the duplicate call and improves
performance.

Comment on lines +133 to +146
fn block_contains_tree(block: &BlockInfo, tree_filter: &solana_pubkey::Pubkey) -> bool {
for tx in &block.transactions {
for instruction_group in &tx.instruction_groups {
if instruction_group
.outer_instruction
.accounts
.contains(tree_filter)
{
return true;
}
}
}
false
}

💡 Verification agent

🧩 Analysis chain

Verify that checking only outer instructions is sufficient for tree filtering.

The function only examines accounts in outer_instruction but not in any potential inner instructions. Please confirm this approach captures all relevant tree-related transactions.


🏁 Script executed:

#!/bin/bash
# Description: Check if there are any inner instruction patterns that might contain tree references
# Expected: Find examples of instruction structures to validate the filtering approach

ast-grep --pattern $'struct InstructionGroup {
  $$$
  inner_instructions: $$$
  $$$
}'

# Also check for any existing tree filtering logic in the codebase
rg -A 5 -B 5 "inner_instruction" --type rust

Length of output: 9396


Include inner instructions in block_contains_tree

The block_contains_tree helper currently only checks outer_instruction.accounts but ignores inner_instructions, causing it to miss tree references in nested instructions. The parser and other logic (e.g., is_compression_transaction) explicitly iterate inner instructions when filtering by tree pubkey.

Please update src/ingester/mod.rs (lines 133–146) to also scan instruction_group.inner_instructions, for example:

 fn block_contains_tree(block: &BlockInfo, tree_filter: &solana_pubkey::Pubkey) -> bool {
     for tx in &block.transactions {
         for instruction_group in &tx.instruction_groups {
             if instruction_group
-                .outer_instruction
-                .accounts
-                .contains(tree_filter)
+                .outer_instruction.accounts
+                .contains(tree_filter)
             {
                 return true;
             }
+            // Check inner instructions as well
+            for inner in &instruction_group.inner_instructions {
+                if inner.accounts.contains(tree_filter) {
+                    return true;
+                }
+            }
         }
     }
     false
 }

This ensures consistency with existing parsing logic and catches all tree-related instructions.

🤖 Prompt for AI Agents
In src/ingester/mod.rs around lines 133 to 146, the function block_contains_tree
only checks the accounts of outer_instruction but misses checking
inner_instructions within each instruction_group. To fix this, update the
function to also iterate over instruction_group.inner_instructions and check if
any of their accounts contain the tree_filter pubkey, returning true if found.
This will ensure all nested instructions are scanned consistently with other
parsing logic.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
src/ingester/mod.rs (2)

90-91: Eliminate duplicate function call for better performance.

The derive_block_state_update function is called twice with identical parameters, but the first result is discarded. This is inefficient since the function performs transaction parsing, which can be expensive.

-    derive_block_state_update(block, None, None)?;
-    persist_state_update(&txn, derive_block_state_update(block, None, None)?).await?;
+    let state_update = derive_block_state_update(block, None, None)?;
+    persist_state_update(&txn, state_update).await?;

129-142: Include inner instructions in block_contains_tree

The block_contains_tree helper currently only checks outer_instruction.accounts but ignores inner_instructions, causing it to miss tree references in nested instructions. The parser and other logic (e.g., is_compression_transaction) explicitly iterate inner instructions when filtering by tree pubkey.

 fn block_contains_tree(block: &BlockInfo, tree_filter: &solana_pubkey::Pubkey) -> bool {
     for tx in &block.transactions {
         for instruction_group in &tx.instruction_groups {
             if instruction_group
                 .outer_instruction
                 .accounts
                 .contains(tree_filter)
             {
                 return true;
             }
+            // Check inner instructions as well
+            for inner in &instruction_group.inner_instructions {
+                if inner.accounts.contains(tree_filter) {
+                    return true;
+                }
+            }
         }
     }
     false
 }
🧹 Nitpick comments (5)
src/ingester/gap/rewind.rs (2)

24-27: Consider using a bounded channel for production resilience.

While unbounded channels are convenient, they can lead to memory exhaustion if rewind commands accumulate faster than they're consumed. Consider using mpsc::channel(capacity) with a reasonable buffer size to apply backpressure when necessary.

 pub fn new() -> (Self, mpsc::UnboundedReceiver<RewindCommand>) {
-    let (sender, receiver) = mpsc::unbounded_channel();
+    let (sender, receiver) = mpsc::channel(100); // Adjust capacity based on expected load
     (Self { sender }, receiver)
 }

Note: This would require changing the receiver type and handling SendError differently in request_rewind.
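
For illustration, a bounded variant could look roughly like this; it is a sketch assuming tokio's mpsc, and the RewindCommand fields and String error type are simplifications rather than the crate's actual definitions:

use tokio::sync::mpsc;

// Hypothetical sketch of a bounded RewindController; the real types live in
// src/ingester/gap/rewind.rs and may differ.
#[derive(Debug)]
pub struct RewindCommand {
    pub to_slot: u64,
    pub reason: String,
}

pub struct RewindController {
    sender: mpsc::Sender<RewindCommand>,
}

impl RewindController {
    pub fn new(capacity: usize) -> (Self, mpsc::Receiver<RewindCommand>) {
        let (sender, receiver) = mpsc::channel(capacity);
        (Self { sender }, receiver)
    }

    // try_send applies backpressure by failing fast when the queue is full
    // instead of letting commands accumulate without bound.
    pub fn request_rewind(&self, command: RewindCommand) -> Result<(), String> {
        self.sender
            .try_send(command)
            .map_err(|e| format!("failed to queue rewind command: {e}"))
    }
}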


65-127: Consider adding edge case tests.

The existing tests cover the main functionality well. Consider adding tests for:

  • Empty gaps array handling
  • All gaps having zero before_slot
  • Error propagation when channel is closed

Would you like me to generate additional test cases for these edge scenarios?
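
As one example, a sketch of the empty-gaps case is shown below. It assumes request_rewind_for_gaps(&[]) is a no-op that returns Ok, which is exactly the behavior the test would pin down, and it uses tokio's UnboundedReceiver::try_recv (tokio 1.12+).

#[tokio::test]
async fn rewind_not_requested_for_empty_gap_slice() {
    // Hypothetical test: assumes the existing RewindController::new() and a
    // request_rewind_for_gaps method that accepts an empty slice.
    let (controller, mut receiver) = RewindController::new();

    controller
        .request_rewind_for_gaps(&[])
        .expect("an empty gap slice should not be an error");

    // No rewind command should have been queued.
    assert!(receiver.try_recv().is_err());
}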

src/ingester/fetchers/poller.rs (1)

71-73: Use appropriate log level for operational events.

Rewinding is an expected operational behavior for gap handling, not an error condition. Consider using log::info! or log::warn! instead of log::error!.

-                            log::error!("Rewinding block stream to {}: {}", to_slot, reason);
+                            log::warn!("Rewinding block stream to {}: {}", to_slot, reason);
src/ingester/gap/treetype_seq.rs (1)

69-82: Consider using named constants for special sequence values.

While the documentation clearly explains the meaning of u64::MAX and 0, using named constants would improve code readability and maintainability.

Add these constants at the module level:

+/// Represents an invalid state due to tree type mismatch or unexpected configuration
+const SEQUENCE_INVALID: u64 = u64::MAX;
+
+/// Represents a valid but uninitialized sequence state
+const SEQUENCE_UNINITIALIZED: u64 = 0;

 impl TreeTypeSeq {
     // ... existing methods ...

Then use them in the method:

-                    (u64::MAX, None)
+                    (SEQUENCE_INVALID, None)

-                        (0, None)
+                        (SEQUENCE_UNINITIALIZED, None)
src/ingester/gap/mod.rs (1)

17-20: Consider dependency injection for better testability.

While the global state with lazy_static and RwLock works, it makes unit testing more difficult and creates hidden dependencies. Consider refactoring to pass the sequence state as a parameter through the call chain.

If refactoring is not feasible now, at least consider adding a test helper to reset the global state between tests to prevent test interdependencies.
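
If the global stays, a reset helper could look roughly like this (a sketch assuming SEQUENCE_STATE is an RwLock<HashMap<String, TreeTypeSeq>>, as the surrounding discussion suggests):

// Hypothetical test helper: clears the global sequence map so one test's state
// cannot leak into the next.
#[cfg(test)]
pub fn reset_sequence_state_for_tests() {
    match SEQUENCE_STATE.write() {
        Ok(mut state) => state.clear(),
        // A poisoned lock in tests is best recovered and cleared immediately.
        Err(poisoned) => poisoned.into_inner().clear(),
    }
}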

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c169afe and 9e053ba.

📒 Files selected for processing (10)
  • src/ingester/fetchers/grpc.rs (7 hunks)
  • src/ingester/fetchers/mod.rs (4 hunks)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/gap/mod.rs (1 hunks)
  • src/ingester/gap/rewind.rs (1 hunks)
  • src/ingester/gap/sequences.rs (1 hunks)
  • src/ingester/gap/treetype_seq.rs (1 hunks)
  • src/ingester/indexer/mod.rs (4 hunks)
  • src/ingester/mod.rs (2 hunks)
  • src/main.rs (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/ingester/fetchers/mod.rs
  • src/ingester/fetchers/grpc.rs
  • src/ingester/indexer/mod.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/ingester/gap/treetype_seq.rs (1)
src/ingester/parser/indexer_events.rs (1)
  • seq (40-45)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (12)
src/ingester/gap/rewind.rs (1)

55-63: LGTM! Clear and correct gap handling logic.

The function correctly identifies the earliest valid slot for rewinding, properly filtering out zero slots from initialization. The implementation ensures all missing data will be captured by using the minimum valid slot.

src/ingester/fetchers/poller.rs (1)

74-88: Well-structured rewind state management.

The implementation correctly handles all state updates during rewind:

  • Cache clearing prevents stale block data
  • Slot adjustments ensure proper restart position
  • Flag-based control flow is clean and maintainable
src/main.rs (1)

299-300: Clean RewindController integration.

The RewindController is properly integrated into the indexing pipeline with clear separation of concerns between the controller (for sending commands) and receiver (for listening to commands).

Also applies to: 313-313, 322-323

src/ingester/gap/treetype_seq.rs (1)

30-68: Well-designed helper methods with proper Option handling.

The helper methods demonstrate good functional programming practices:

  • Safe pattern matching prevents type confusion
  • Graceful handling of None cases with defaults
  • Clear separation of concerns between different tree types
src/ingester/gap/mod.rs (1)

67-93: Clear state retrieval logic with appropriate precedence.

The function correctly prioritizes tree pubkey over queue pubkey and provides helpful debug logging throughout. The fallback to default values is well-handled.

src/ingester/mod.rs (3)

32-85: Well-structured gap detection and rewind integration!

The enhanced derive_block_state_update function properly integrates sequence tracking, gap detection, and rewind control. The error handling ensures that gap-triggered rewinds are propagated correctly to stop further processing.


144-193: Excellent implementation of tree filtering and batch processing!

The updated index_block_batch function efficiently pre-filters blocks when a tree filter is specified, properly handles empty batches, and maintains good metrics tracking. The integration with rewind control is seamless.


195-223: Proper error handling for rewind-triggered errors!

The updated retry logic correctly distinguishes between gap-triggered rewind errors (which should be propagated) and other transient errors (which should be retried). This ensures the rewind mechanism works as intended.

src/ingester/gap/sequences.rs (4)

24-130: Comprehensive sequence extraction with proper validation!

The extract_state_update_sequences method thoroughly extracts sequences from all StateUpdate fields and includes important validation logic to detect and skip invalid AddressV1 trees in batch operations. The error logging provides good visibility into data quality issues.


132-226: Excellent concurrent state management!

The update_sequence_state method demonstrates proper lock handling by:

  1. Using appropriate error handling for lock acquisition failures
  2. Dropping the read lock before acquiring the write lock to prevent deadlocks
  3. Applying all updates atomically

The use of specialized constructors for different tree types ensures proper state transitions.


228-313: Thorough gap detection across all sequence types!

The detect_all_sequence_gaps method systematically checks all sequence collections and properly maps event types to their corresponding field types. The aggregation of all gaps provides a complete picture for the rewind mechanism.


315-374: Robust gap detection logic with proper edge case handling!

The detect_sequence_gaps_with_metadata method correctly:

  1. Handles initialization cases where no previous state exists
  2. Detects gaps both from the current state and between consecutive sequences
  3. Includes full metadata (slots and signatures) for accurate gap filling
  4. Properly sorts sequences before processing

Comment on lines +59 to +65
let state = match SEQUENCE_STATE.read() {
Ok(state) => state,
Err(e) => {
debug!("Failed to acquire sequence state read lock: {}", e);
return TreeTypeSeq::default();
}
};

⚠️ Potential issue

Reconsider error handling strategy for poisoned locks.

A poisoned RwLock indicates a panic occurred in another thread while holding the lock, which is a critical error. Silently returning a default value could mask serious issues and lead to incorrect gap detection.

Consider either:

  1. Propagating the error up to the caller
  2. Logging at error level instead of debug
  3. Attempting to recover the lock with into_inner()
 let state = match SEQUENCE_STATE.read() {
     Ok(state) => state,
     Err(e) => {
-        debug!("Failed to acquire sequence state read lock: {}", e);
-        return TreeTypeSeq::default();
+        error!("Critical: Failed to acquire sequence state read lock: {}", e);
+        // Option 1: panic to fail fast
+        panic!("Sequence state lock poisoned: {}", e);
+        // Option 2: attempt recovery (if appropriate)
+        // e.into_inner()
     }
 };
🤖 Prompt for AI Agents
In src/ingester/gap/mod.rs around lines 59 to 65, the current error handling for
a poisoned RwLock silently returns a default value, which can mask critical
issues. Instead, change the error handling to either propagate the error to the
caller, log the error at error level rather than debug, or attempt to recover
the lock using into_inner(). Choose one of these approaches to properly handle
the poisoned lock scenario and avoid hiding serious concurrency problems.

Comment on lines +242 to +246
let tree_filter = args.tree.as_ref().map(|tree_str| {
tree_str
.parse::<Pubkey>()
.expect("Invalid tree pubkey format")
});

🛠️ Refactor suggestion

⚠️ Potential issue

Eliminate code duplication and improve error handling.

The tree pubkey parsing logic is duplicated. Additionally, using expect() causes a panic on invalid input, which provides poor user experience.

Extract the parsing logic into a helper function with proper error handling:

+fn parse_tree_filter(tree_str: Option<&String>) -> Result<Option<Pubkey>, String> {
+    tree_str
+        .map(|s| s.parse::<Pubkey>().map_err(|e| format!("Invalid tree pubkey '{}': {}", s, e)))
+        .transpose()
+}

 // Replace lines 242-246:
-            let tree_filter = args.tree.as_ref().map(|tree_str| {
-                tree_str
-                    .parse::<Pubkey>()
-                    .expect("Invalid tree pubkey format")
-            });
+            let tree_filter = match parse_tree_filter(args.tree.as_ref()) {
+                Ok(filter) => filter,
+                Err(e) => {
+                    error!("{}", e);
+                    std::process::exit(1);
+                }
+            };

 // Replace lines 302-306:
-            let tree_filter = args.tree.as_ref().map(|tree_str| {
-                tree_str
-                    .parse::<Pubkey>()
-                    .expect("Invalid tree pubkey format")
-            });
+            let tree_filter = match parse_tree_filter(args.tree.as_ref()) {
+                Ok(filter) => filter,
+                Err(e) => {
+                    error!("{}", e);
+                    std::process::exit(1);
+                }
+            };

Also applies to: 302-306

🤖 Prompt for AI Agents
In src/main.rs around lines 242 to 246 and also lines 302 to 306, the code
duplicates the tree pubkey parsing logic and uses expect(), which panics on
invalid input. Refactor by extracting the parsing logic into a single helper
function that returns a Result or Option with proper error handling. Replace all
duplicated parsing code with calls to this helper, and handle errors gracefully
by returning user-friendly error messages instead of panicking.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9e053ba and 0bdcfa3.

📒 Files selected for processing (1)
  • src/ingester/gap/sequences.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (3)
src/ingester/gap/sequences.rs (3)

12-21: Well-structured data model for sequence tracking.

The use of composite keys and HashMap collections provides efficient lookups and appropriate grouping of sequence data by tree and type.


76-92: Good defensive programming with tree type validation.

The validation logic correctly ensures only AddressV2 trees are processed in batch_new_addresses, with appropriate error logging and skipping of invalid entries.


332-391: Robust gap detection implementation.

The method properly handles edge cases including initialization state, overflow protection with saturating_add, and maintains full metadata for gap analysis.

Comment on lines +139 to +145
let current_state = match SEQUENCE_STATE.read() {
Ok(state) => state,
Err(e) => {
debug!("Failed to acquire read lock for sequence state: {}", e);
return;
}
};

🛠️ Refactor suggestion

Consider propagating lock acquisition failures.

The current implementation silently returns on lock failures, which could hide synchronization issues. Consider returning a Result to allow callers to handle these failures appropriately.

-    pub fn update_sequence_state(&self) {
+    pub fn update_sequence_state(&self) -> Result<(), String> {
         let current_state = match SEQUENCE_STATE.read() {
             Ok(state) => state,
             Err(e) => {
-                debug!("Failed to acquire read lock for sequence state: {}", e);
-                return;
+                let err_msg = format!("Failed to acquire read lock for sequence state: {}", e);
+                debug!("{}", err_msg);
+                return Err(err_msg);
             }
         };
🤖 Prompt for AI Agents
In src/ingester/gap/sequences.rs around lines 139 to 145, the code currently
returns silently when failing to acquire the read lock on SEQUENCE_STATE, which
can hide synchronization issues. Modify the function to return a Result type and
propagate the lock acquisition error instead of returning early. Update the
caller to handle this Result accordingly, allowing proper error handling of lock
failures.

Comment on lines +154 to +166
match info.tree_type {
light_compressed_account::TreeType::AddressV1 => {
updates.insert(tree_str, TreeTypeSeq::AddressV1(max_entry.clone()));
}
tree_type => {
tracing::error!(
"Unhandled tree type {:?} for tree {} in indexed_tree_seqs",
tree_type,
tree_str
);
}
}
}

💡 Verification agent

🧩 Analysis chain

Verify tree type handling for indexed_tree_seqs.

Currently only AddressV1 is handled for indexed_tree_seqs, while other tree types log errors. Please verify if this is intentional or if StateV1 should also be handled here.


🏁 Script executed:

#!/bin/bash
# Search for other usages of indexed_tree_seqs to understand expected tree types
rg -A 5 -B 5 "indexed_tree_seqs" --type rust

# Check what tree types are used with indexed_merkle_tree_updates
ast-grep --pattern 'indexed_merkle_tree_updates'

Length of output: 10125


🏁 Script executed:

#!/bin/bash
# Inspect QUEUE_TREE_MAPPING to determine which TreeType variants it covers
rg -n "QUEUE_TREE_MAPPING" -C 5 --type rust

Length of output: 5235


🏁 Script executed:

#!/bin/bash
# Display QUEUE_TREE_MAPPING contents to see all tree types included
sed -n '1,200p' src/ingester/parser/tree_info.rs | sed -n '35,120p'

Length of output: 3748


🏁 Script executed:

#!/bin/bash
# List all TreeType assignments in the QUEUE_TREE_MAPPING initializer
rg -n "TreeType::" -C 5 src/ingester/parser/tree_info.rs

Length of output: 2897


🏁 Script executed:

#!/bin/bash
# Find definition and type of indexed_merkle_tree_updates in parser/state_update.rs
rg -n "indexed_merkle_tree_updates" -C 10 src/ingester/parser/state_update.rs

Length of output: 2393


Handle StateV1 updates in indexed_tree_seqs

The indexed_tree_seqs loop currently only matches TreeType::AddressV1. Since QUEUE_TREE_MAPPING includes legacy StateV1 trees and indexed_merkle_tree_updates carries v1 Merkle-leaf updates for both state and address trees, you should add a branch for TreeType::StateV1 so those sequences aren't dropped.

Locations:

  • src/ingester/gap/sequences.rs lines 153–158

Suggested change:

 match info.tree_type {
     light_compressed_account::TreeType::AddressV1 => {
         updates.insert(tree_str.clone(), TreeTypeSeq::AddressV1(max_entry.clone()));
     }
+    light_compressed_account::TreeType::StateV1 => {
+        updates.insert(tree_str.clone(), TreeTypeSeq::StateV1(max_entry.clone()));
+    }
     tree_type => {
         tracing::error!(
             "Unhandled tree type {:?} for tree {} in indexed_tree_seqs",
             tree_type,
             tree_str
         );
     }
 }
🤖 Prompt for AI Agents
In src/ingester/gap/sequences.rs around lines 153 to 158, the match on
info.tree_type only handles TreeType::AddressV1, causing StateV1 updates to be
dropped. Add a match branch for TreeType::StateV1 that inserts the corresponding
TreeTypeSeq::StateV1 with max_entry into updates, similar to the AddressV1 case,
to ensure StateV1 sequences are properly included.

Comment on lines +302 to +307
let field_type = match event_type {
1 => StateUpdateFieldType::BatchMerkleTreeEventAppend,
2 => StateUpdateFieldType::BatchMerkleTreeEventNullify,
3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
_ => continue,
};

🛠️ Refactor suggestion

Replace magic numbers with named constants.

The event type mapping uses hard-coded values that should be defined as constants for better maintainability.

Define constants at the module level:

const EVENT_TYPE_APPEND: u8 = 1;
const EVENT_TYPE_NULLIFY: u8 = 2;
const EVENT_TYPE_ADDRESS_APPEND: u8 = 3;

Then update the match:

             let field_type = match event_type {
-                1 => StateUpdateFieldType::BatchMerkleTreeEventAppend,
-                2 => StateUpdateFieldType::BatchMerkleTreeEventNullify,
-                3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
+                EVENT_TYPE_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAppend,
+                EVENT_TYPE_NULLIFY => StateUpdateFieldType::BatchMerkleTreeEventNullify,
+                EVENT_TYPE_ADDRESS_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
                 _ => continue,
             };
🤖 Prompt for AI Agents
In src/ingester/gap/sequences.rs around lines 302 to 307, the match statement
uses hard-coded numeric literals for event_type values. Define module-level
constants for these values (e.g., EVENT_TYPE_APPEND = 1, EVENT_TYPE_NULLIFY = 2,
EVENT_TYPE_ADDRESS_APPEND = 3) and replace the numeric literals in the match
arms with these constants to improve code clarity and maintainability.

Comment on lines +393 to +400
fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 {
match event {
MerkleTreeEvent::BatchAppend(_) => 1,
MerkleTreeEvent::BatchNullify(_) => 2,
MerkleTreeEvent::BatchAddressAppend(_) => 3,
_ => 0, // Other event types we don't care about
}
}

🛠️ Refactor suggestion

Use the same constants for consistency.

This function should use the same named constants suggested earlier for event type mapping.

 fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 {
     match event {
-        MerkleTreeEvent::BatchAppend(_) => 1,
-        MerkleTreeEvent::BatchNullify(_) => 2,
-        MerkleTreeEvent::BatchAddressAppend(_) => 3,
+        MerkleTreeEvent::BatchAppend(_) => EVENT_TYPE_APPEND,
+        MerkleTreeEvent::BatchNullify(_) => EVENT_TYPE_NULLIFY,
+        MerkleTreeEvent::BatchAddressAppend(_) => EVENT_TYPE_ADDRESS_APPEND,
         _ => 0, // Other event types we don't care about
     }
 }
🤖 Prompt for AI Agents
In src/ingester/gap/sequences.rs around lines 393 to 400, the function
merkle_event_to_type_id uses hardcoded numeric literals for event type IDs.
Replace these numeric literals with the previously defined named constants for
event types to ensure consistency and maintainability across the codebase.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
src/ingester/mod.rs (1)

1-2: Avoid blocking the async runtime: use tokio::time::sleep

This async function currently uses std::thread::sleep, which blocks the executor thread. Switch to tokio::time::sleep and await it.

-use std::thread::sleep;
-use std::time::Duration;
+use tokio::time::{sleep, Duration};
...
-                sleep(Duration::from_secs(1));
+                sleep(Duration::from_secs(1)).await;

Also applies to: 217-217

♻️ Duplicate comments (2)
src/ingester/mod.rs (2)

88-89: Eliminate duplicate derive_block_state_update call

derive_block_state_update is called twice with identical parameters; the first result is discarded. Cache once and reuse.

-    derive_block_state_update(block, None, None)?;
-    persist_state_update(&txn, derive_block_state_update(block, None, None)?).await?;
+    let state_update = derive_block_state_update(block, None, None)?;
+    persist_state_update(&txn, state_update).await?;

127-140: Tree filter misses inner instructions; scan inner_instructions too

Current block_contains_tree only checks outer_instruction.accounts. This can miss relevant matches contained in inner instructions.

 fn block_contains_tree(block: &BlockInfo, tree_filter: &solana_pubkey::Pubkey) -> bool {
     for tx in &block.transactions {
         for instruction_group in &tx.instruction_groups {
             if instruction_group
                 .outer_instruction
                 .accounts
                 .contains(tree_filter)
             {
                 return true;
             }
+            // Also check inner instructions
+            for inner in &instruction_group.inner_instructions {
+                if inner.accounts.contains(tree_filter) {
+                    return true;
+                }
+            }
         }
     }
     false
 }
🧹 Nitpick comments (3)
src/ingester/mod.rs (3)

55-77: When gaps are detected without a rewind controller, decide on explicit handling and add metrics

Right now, gaps are logged, but if rewind_controller is None, execution proceeds silently. At minimum, emit a metric; optionally, surface an error upstream so callers can decide what to do.

-    let gaps = sequences.detect_all_sequence_gaps();
+    let gaps = sequences.detect_all_sequence_gaps();
+    // Metric for visibility into gap occurrences
+    metric! {
+        statsd_count!("gaps_detected", gaps.len() as i64);
+    }
     if !gaps.is_empty() {
         tracing::warn!(
             "Gaps detected in block {} sequences: {gaps:?}",
             block.metadata.slot
         );

         // Request rewind if controller is available
         if let Some(controller) = rewind_controller {
+            metric! { statsd_count!("rewind_requests", gaps.len() as i64); }
             if let Err(e) = controller.request_rewind_for_gaps(&gaps) {
                 tracing::error!(
                     "Failed to request rewind for gaps in block {}: {}",
                     block.metadata.slot,
                     e
                 );
+                metric! { statsd_count!("rewind_request_failures", gaps.len() as i64); }
                 return Err(IngesterError::CustomError(
                     "Gap detection triggered rewind failure".to_string(),
                 ));
             }
             // Return early after requesting rewind - don't continue processing
             return Err(IngesterError::GapDetectedRewind);
-        }
+        } else {
+            // Option: escalate instead of continuing silently
+            // return Err(IngesterError::CustomError("Gap detected but no rewind controller configured".into()));
+        }
     }

Please confirm the desired behavior in the no-controller path. If index_block is used only in contexts where gaps are acceptable, keep as-is but keep the metrics.


79-81: Update sequence state only after successful DB commit (consider refactor)

sequences.update_sequence_state() mutates global state before persistence. If persistence fails, in-memory state advances while DB state doesn’t. Consider deferring the state update until after successful commit, which likely entails returning sequences from derive_block_state_update and applying them post-persist.

High-level approach:

  • Change derive_block_state_update to return (StateUpdate, StateUpdateSequences).
  • Collect sequences across filtered_blocks alongside state updates.
  • After persist succeeds and tx.commit().await?, apply sequences.update_sequence_state() once.

This avoids transient divergence between in-memory sequence state and DB.
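
A rough sketch of that flow, assuming a hypothetical derive_block_state_update signature that also returns the sequences it collected:

// Sketch only: apply in-memory sequence state after the database transaction
// commits, so a failed persist cannot advance the tracker past stored data.
let mut state_updates = Vec::new();
let mut all_sequences = Vec::new();
for block in &filtered_blocks {
    let (state_update, sequences) =
        derive_block_state_update(block, rewind_controller, tree_filter)?;
    state_updates.push(state_update);
    all_sequences.push(sequences);
}
persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;
tx.commit().await?;
for sequences in &all_sequences {
    sequences.update_sequence_state();
}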

Also applies to: 172-178, 179-179, 189-191


179-179: Unify call style for persist_state_update for consistency

You import persist_state_update at Line 14 but call it via the module path at Line 179. Prefer one style throughout; using the imported function is concise.

-    persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;
+    persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;

Also applies to: 14-14

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0bdcfa3 and 11d16da.

📒 Files selected for processing (4)
  • src/ingester/error.rs (1 hunks)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/indexer/mod.rs (4 hunks)
  • src/ingester/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/ingester/error.rs
  • src/ingester/fetchers/poller.rs
  • src/ingester/indexer/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (2)
src/ingester/mod.rs (2)

199-201: Guard against empty batches to avoid unwrap() panics

first()/last().unwrap() will panic if block_batch is empty. Add an early return.

 ) -> Result<(), IngesterError> {
-    loop {
+    if block_batch.is_empty() {
+        return Ok(());
+    }
+    loop {
         match index_block_batch(db, &block_batch, rewind_controller, tree_filter).await {

Also applies to: 209-210


82-82: Verify merge_updates behavior on empty input

With a tree filter, a block may yield zero state updates. Ensure StateUpdate::merge_updates(state_updates) handles an empty vector safely (no panic) and returns a sensible “no-op” update. If not, short-circuit when empty.

Potential guard (if StateUpdate implements Default):

if state_updates.is_empty() {
    return Ok(StateUpdate::default());
}

The new `--disable-gap-detection` flag allows users to opt out of the
sequence gap detection and rewinding behavior, simplifying the indexing
process in cases where it's not needed.
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (4)
src/ingester/gap/sequences.rs (4)

411-418: Use the same constants in merkle_event_to_type_id for consistency.

Keep definitions centralized and avoid literal drift.

     match event {
-        MerkleTreeEvent::BatchAppend(_) => 1,
-        MerkleTreeEvent::BatchNullify(_) => 2,
-        MerkleTreeEvent::BatchAddressAppend(_) => 3,
+        MerkleTreeEvent::BatchAppend(_) => EVENT_TYPE_APPEND,
+        MerkleTreeEvent::BatchNullify(_) => EVENT_TYPE_NULLIFY,
+        MerkleTreeEvent::BatchAddressAppend(_) => EVENT_TYPE_ADDRESS_APPEND,
         _ => 0, // Other event types we don't care about
     }

138-146: Don’t silently swallow lock acquisition failures; propagate errors.

Return a Result so callers can handle lock poisoning or contention explicitly rather than returning early.

-    pub fn update_sequence_state(&self) {
-        let current_state = match SEQUENCE_STATE.read() {
-            Ok(state) => state,
-            Err(e) => {
-                debug!("Failed to acquire read lock for sequence state: {}", e);
-                return;
-            }
-        };
+    pub fn update_sequence_state(&self) -> Result<(), String> {
+        let mut state = match SEQUENCE_STATE.write() {
+            Ok(state) => state,
+            Err(e) => {
+                let err_msg = format!("Failed to acquire write lock for sequence state: {}", e);
+                debug!("{}", err_msg);
+                return Err(err_msg);
+            }
+        };

Follow-on changes are in later suggestions within this function. Remember to update callers to handle Result<()>.


150-169: Handle StateV1 for indexed_tree_seqs to avoid dropping sequences.

Currently only AddressV1 is handled; StateV1 entries get logged as “Unhandled” and discarded.

                 if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) {
                     match info.tree_type {
                         light_compressed_account::TreeType::AddressV1 => {
                             updates.insert(tree_str, TreeTypeSeq::AddressV1(max_entry.clone()));
                         }
+                        light_compressed_account::TreeType::StateV1 => {
+                            updates.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone()));
+                        }
                         tree_type => {
                             tracing::error!(
                                 "Unhandled tree type {:?} for tree {} in indexed_tree_seqs",
                                 tree_type,
                                 tree_str
                             );
                         }
                     }
                 }

302-307: Replace magic numbers for event types with named constants.

Use named constants to avoid brittle literals and ensure consistency with merkle_event_to_type_id.

-            let field_type = match event_type {
-                1 => StateUpdateFieldType::BatchMerkleTreeEventAppend,
-                2 => StateUpdateFieldType::BatchMerkleTreeEventNullify,
-                3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
+            let field_type = match event_type {
+                EVENT_TYPE_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAppend,
+                EVENT_TYPE_NULLIFY => StateUpdateFieldType::BatchMerkleTreeEventNullify,
+                EVENT_TYPE_ADDRESS_APPEND => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend,
                 _ => continue,
             };

Add these constants near the top of the file (outside of this hunk):

const EVENT_TYPE_APPEND: u8 = 1;
const EVENT_TYPE_NULLIFY: u8 = 2;
const EVENT_TYPE_ADDRESS_APPEND: u8 = 3;

Optional: add a small helper fn event_type_to_field_type(u8) -> Option<StateUpdateFieldType> to DRY both places.
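
A sketch of that helper, using the constants proposed just above (the StateUpdateFieldType variant names are the ones already used in this file):

// Maps a raw event type id to its field type, returning None for ids the gap
// detector does not track.
fn event_type_to_field_type(event_type: u8) -> Option<StateUpdateFieldType> {
    match event_type {
        EVENT_TYPE_APPEND => Some(StateUpdateFieldType::BatchMerkleTreeEventAppend),
        EVENT_TYPE_NULLIFY => Some(StateUpdateFieldType::BatchMerkleTreeEventNullify),
        EVENT_TYPE_ADDRESS_APPEND => Some(StateUpdateFieldType::BatchMerkleTreeEventAddressAppend),
        _ => None,
    }
}

The call sites could then use `let Some(field_type) = event_type_to_field_type(event_type) else { continue };`, removing the duplicated match arms.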

🧹 Nitpick comments (2)
src/ingester/gap/sequences.rs (2)

55-66: Verify whether the Pubkey round-trip conversion is necessary.

If context.tree_pubkey can be converted to Pubkey directly (e.g., it implements Into/From), prefer that over to_bytes() + new_from_array to avoid the extra copy and reduce noise.

If supported by the type, change to:

-            let tree = Pubkey::new_from_array(context.tree_pubkey.to_bytes());
+            let tree = Pubkey::from(context.tree_pubkey);

68-102: Add a log when QUEUE_TREE_MAPPING has no entry.

When QUEUE_TREE_MAPPING.get(&tree_str) returns None, we silently accept the address entry. Consider logging at least a warn-level message to surface potentially misconfigured or unexpected trees.

-            if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) {
+            if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) {
                 // batch_new_addresses should only contain AddressV2 trees
                 if info.tree_type != light_compressed_account::TreeType::AddressV2 {
                     tracing::error!(
                         "{:?} wrong tree {tree_str} found in batch_new_addresses \
                         Only AddressV2 trees should be in batch new address operations. \
                         queue_index: {}, slot: {}, signature: {}",
                         info.tree_type,
                         address.queue_index,
                         slot,
                         signature
                     );
                     // Skip this invalid data
                     continue;
                 }
             }
+            else {
+                tracing::warn!(
+                    "Tree {} not found in QUEUE_TREE_MAPPING for batch_new_addresses; \
+                     queue_index: {}, slot: {}, signature: {}",
+                    tree_str, address.queue_index, slot, signature
+                );
+            }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 718f407 and 4db2da2.

📒 Files selected for processing (2)
  • src/ingester/fetchers/poller.rs (2 hunks)
  • src/ingester/gap/sequences.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/ingester/fetchers/poller.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/ingester/gap/sequences.rs (6)
src/ingester/gap/mod.rs (1)
  • get_current_sequence_state (67-107)
src/ingester/parser/indexer_events.rs (2)
  • tree_pubkey (34-39)
  • seq (40-45)
src/ingester/gap/rewind.rs (1)
  • new (24-27)
src/main.rs (2)
  • tree_str (248-249)
  • tree_str (314-315)
src/ingester/parser/tx_event_parser_v2.rs (1)
  • events (31-114)
src/ingester/gap/treetype_seq.rs (2)
  • new_address_v2_with_output (61-67)
  • new_state_v2_with_output (51-58)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (2)
src/ingester/gap/sequences.rs (2)

31-41: Good: sequence entries carry slot and signature context.

Including slot and signature alongside sequence numbers will materially improve diagnostics when reporting gaps. LGTM.


171-177: Confirm nullifications are always StateV1.

All nullifications are currently funneled into StateV1. If V2 nullifications exist (or could in the future), they will be misclassified, skewing gap detection and the tracked sequence state.

Would you like me to scan the codebase for V2 nullification producers/usages and open a follow-up if found?

Comment on lines +179 to +196
        // Process batch address queue indexes (AddressV2)
        for (tree_pubkey, entries) in &self.batch_address_queue_indexes {
            if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) {
                let tree_str = tree_pubkey.to_string();
                debug!(
                    "Updating batch_address_queue_indexes for tree: {}, sequence: {}",
                    tree_str, max_entry.sequence
                );

                updates.insert(
                    tree_str.clone(),
                    TreeTypeSeq::new_address_v2_with_output(
                        current_state.get(&tree_str),
                        max_entry.clone(),
                    ),
                );
            }
        }

🛠️ Refactor suggestion

⚠️ Potential issue

Fix potential lost update due to TOCTOU between read and write of SEQUENCE_STATE.

The code reads current_state, computes V2 updates that depend on that snapshot, drops the read lock, and later acquires a write lock to persist. Concurrent writers can race in that window, producing stale merges and lost fields (e.g., an AddressV2 input preserved from an outdated snapshot).

Hold a single write lock for the entire update computation and use it for both reads and writes. This closes the race at the cost of holding the write lock slightly longer.

Apply the diffs below:

  1. Replace use of read lock with a write lock at the start and remove the later write-lock block.
-        let current_state = match SEQUENCE_STATE.read() {
-            Ok(state) => state,
-            Err(e) => {
-                debug!("Failed to acquire read lock for sequence state: {}", e);
-                return;
-            }
-        };
+        let mut state = match SEQUENCE_STATE.write() {
+            Ok(state) => state,
+            Err(e) => {
+                debug!("Failed to acquire write lock for sequence state: {}", e);
+                return;
+            }
+        };
  2. Use the up-to-date state when building V2 updates:
                 updates.insert(
                     tree_str.clone(),
                     TreeTypeSeq::new_address_v2_with_output(
-                        current_state.get(&tree_str),
+                        state.get(&tree_str),
                         max_entry.clone(),
                     ),
                 );
                             updates.insert(
                                 tree_str.clone(),
                                 TreeTypeSeq::new_state_v2_with_output(
-                                    current_state.get(&tree_str),
+                                    state.get(&tree_str),
                                     max_entry.clone(),
                                 ),
                             );
  3. Remove the explicit read-lock drop and the second write-lock acquisition, and insert directly:
-        // Drop read lock before acquiring write lock
-        drop(current_state);
-
-        // Apply all updates atomically
-        if !updates.is_empty() {
-            match SEQUENCE_STATE.write() {
-                Ok(mut state) => {
-                    for (key, value) in updates {
-                        state.insert(key, value);
-                    }
-                }
-                Err(e) => {
-                    debug!("Failed to acquire write lock for sequence state: {}", e);
-                }
-            }
-        }
+        // Apply all updates atomically under the same write lock
+        if !updates.is_empty() {
+            for (key, value) in updates {
+                state.insert(key, value);
+            }
+        }
+        return ();

Note: If you adopt the earlier Result return, adjust the final return accordingly.

Also applies to: 202-224, 228-243

🤖 Prompt for AI Agents
In src/ingester/gap/sequences.rs around lines 179 to 196, the code currently
reads from current_state using a read lock, computes updates, then later
acquires a write lock to persist changes, causing a TOCTOU race condition. To
fix this, acquire a single write lock at the start of the update process and use
it for both reading and writing state throughout the update computation. Replace
all read lock usages with this write lock, remove any explicit read lock drops
and subsequent write lock acquisitions, and ensure updates are built using the
most current state from the write lock. Apply the same pattern to lines 202-224
and 228-243 as well.

Comment on lines +334 to +409
    fn detect_sequence_gaps_with_metadata(
        sequences: &[SequenceEntry],
        tree_pubkey: Option<Pubkey>,
        queue_pubkey: Option<Pubkey>,
        field_type: StateUpdateFieldType,
    ) -> Vec<SequenceGap> {
        if sequences.len() < 2 {
            return Vec::new();
        }

        let mut sorted_sequences = sequences.to_vec();
        sorted_sequences.sort_by_key(|entry| entry.sequence);
        let mut gaps = Vec::new();

        let start_seq = get_current_sequence_state(tree_pubkey, queue_pubkey, &field_type);
        let (unpacked_start_seq, start_entry) = start_seq.extract_sequence_info(&field_type);

        // Skip gap detection for tree initialization (when unpacked_start_seq == 0)
        // because there's no previous sequence to compare against
        // Also skip if unpacked_start_seq is u64::MAX (no state found)
        if unpacked_start_seq > 0 && unpacked_start_seq != u64::MAX {
            // Check for any missing sequences between global state and the minimum sequence in this block
            let min_seq_in_block = sorted_sequences[0].sequence;

            // Check if there's a gap between the global state and the sequences in this block
            // A gap exists if the minimum sequence in the block is more than 1 away from global state
            // AND the missing sequences are not present anywhere in this block
            if min_seq_in_block > unpacked_start_seq.saturating_add(1) {
                // Check if ALL missing sequences are present in this block
                let mut has_real_gap = false;
                for missing_seq in (unpacked_start_seq + 1)..min_seq_in_block {
                    let found = sorted_sequences.iter().any(|e| e.sequence == missing_seq);
                    if !found {
                        has_real_gap = true;
                        break;
                    }
                }

                if has_real_gap {
                    let (before_slot, before_signature) = if let Some(entry) = start_entry {
                        (entry.slot, entry.signature)
                    } else {
                        (0, String::new())
                    };

                    gaps.push(SequenceGap {
                        before_slot,
                        after_slot: sorted_sequences[0].slot,
                        before_signature,
                        after_signature: sorted_sequences[0].signature.clone(),
                        tree_pubkey,
                        field_type: field_type.clone(),
                    });
                }
            }
        }

        for i in 1..sorted_sequences.len() {
            let prev_entry = &sorted_sequences[i - 1];
            let curr_entry = &sorted_sequences[i];

            if curr_entry.sequence - prev_entry.sequence > 1 {
                gaps.push(SequenceGap {
                    before_slot: prev_entry.slot,
                    after_slot: curr_entry.slot,
                    before_signature: prev_entry.signature.clone(),
                    after_signature: curr_entry.signature.clone(),
                    tree_pubkey,
                    field_type: field_type.clone(),
                });
            }
        }

        gaps
    }
}

🛠️ Refactor suggestion

Add unit tests for boundary cases in gap detection.

Recommend adding tests for:

  • Single-entry block with prior state gap (covered by the previous fix).
  • Duplicate sequences within the same block (ensure no false gaps).
  • Non-monotonic sequences within a block that still contain missing numbers (ensure real gap detection).

I can provide focused tests using minimal StateUpdateSequences construction to cover these.
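
As one example, a duplicate-sequence case could look roughly like this. It assumes SequenceEntry is a plain struct with sequence, slot, and signature fields, that detect_sequence_gaps_with_metadata is reachable from the test (it may need a Self:: or type prefix if it is an associated function), and that no prior global state exists for a None tree; adapt the construction to the real types:

#[test]
fn duplicate_sequences_do_not_report_false_gaps() {
    let entry = |sequence: u64, slot: u64, signature: &str| SequenceEntry {
        sequence,
        slot,
        signature: signature.to_string(),
    };
    // 5 appears twice, then 6: after sorting the values are consecutive, so no gap is expected.
    let sequences = vec![entry(5, 100, "sig_a"), entry(5, 100, "sig_a"), entry(6, 101, "sig_b")];

    let gaps = detect_sequence_gaps_with_metadata(
        &sequences,
        None,
        None,
        StateUpdateFieldType::BatchMerkleTreeEventAppend,
    );

    assert!(gaps.is_empty());
}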

🤖 Prompt for AI Agents
In src/ingester/gap/sequences.rs around lines 334 to 409, the
detect_sequence_gaps_with_metadata function lacks unit tests for important
boundary cases. Add unit tests to cover these scenarios: a single-entry block
with a prior state gap to verify gap detection after a previous sequence, blocks
containing duplicate sequences to ensure no false gaps are reported, and blocks
with non-monotonic sequences that still have missing numbers to confirm real
gaps are detected. Use minimal StateUpdateSequences constructions in the tests
to isolate and validate each case effectively.

Comment on lines +340 to +346
        if sequences.len() < 2 {
            return Vec::new();
        }

        let mut sorted_sequences = sequences.to_vec();
        sorted_sequences.sort_by_key(|entry| entry.sequence);
        let mut gaps = Vec::new();

⚠️ Potential issue

Bug: early-return prevents detecting gaps vs previous state when only one sequence is present in a block.

With a single sequence in sequences, we still need to check for a gap between the global state and the block’s minimum sequence. The current < 2 early return skips that entirely.

-        if sequences.len() < 2 {
+        if sequences.is_empty() {
             return Vec::new();
         }

Add a unit test for a block containing exactly one sequence where there is a gap between global state and the block’s min sequence to prevent regression. I can draft this if helpful.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-        if sequences.len() < 2 {
+        if sequences.is_empty() {
             return Vec::new();
         }

         let mut sorted_sequences = sequences.to_vec();
         sorted_sequences.sort_by_key(|entry| entry.sequence);
         let mut gaps = Vec::new();
🤖 Prompt for AI Agents
In src/ingester/gap/sequences.rs around lines 340 to 346, the early return when
sequences.len() is less than 2 prevents checking for gaps between the global
state and the block's minimum sequence if only one sequence is present. Remove
or modify this early return to allow gap detection in the single-sequence case.
Additionally, add a unit test for a block with exactly one sequence where a gap
exists between the global state and the block's minimum sequence to ensure this
case is handled and prevent regressions.
