Skip to content

feat: gap detection & rewind #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 44 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
88835d1
fix: filter nullifier queue updates by tree
sergeytimoshin Aug 19, 2025
d4640f8
stash
ananas-block Jul 24, 2025
86360f4
sucessfully created snapshot from tx, snapshot untested
ananas-block Jul 24, 2025
7feb60f
test snapshot file for address tree v1 gaps
ananas-block Jul 24, 2025
b194157
fixed indexed Merkle tree gaps
ananas-block Jul 24, 2025
4ea82f9
stash
ananas-block Jul 24, 2025
5ea15e0
refactor gap detection
ananas-block Jul 25, 2025
c03c857
snapshot gap filler works but super slow
ananas-block Jul 25, 2025
3b7f8d1
feat: add non working gap detection to indexer
ananas-block Jul 25, 2025
231bebf
stash
ananas-block Jul 25, 2025
e44945c
gap detection works as test
ananas-block Jul 25, 2025
578c8a6
add rewind controller
ananas-block Jul 25, 2025
8292b09
store claude code
ananas-block Jul 25, 2025
6778700
refactor: BatchNullifyContext
ananas-block Jul 25, 2025
74f5f56
replace panics with warnings
ananas-block Jul 25, 2025
2ca7b3c
remove claude session
ananas-block Jul 25, 2025
f253449
fix u64::max overflow
sergeytimoshin Jul 26, 2025
06cd7d2
Add batch input accounts to account transactions
sergeytimoshin Jul 28, 2025
9a32b95
address histories
sergeytimoshin Jul 28, 2025
d7b35dc
reindex with filter by tree
sergeytimoshin Jul 28, 2025
b834217
tx analyzer, filter optimisation
sergeytimoshin Jul 28, 2025
39f33e8
remove gap detection sql
sergeytimoshin Jul 30, 2025
f274019
uUpdate test file paths
sergeytimoshin Jul 30, 2025
d6a6dc2
format
sergeytimoshin Jul 30, 2025
9af8d2b
fix warnings
sergeytimoshin Jul 30, 2025
f072b71
ignore gap tests (they are more tools than tests)
sergeytimoshin Jul 31, 2025
6337b6e
Update tests/integration_tests/mock_tests.rs
sergeytimoshin Jul 31, 2025
29c4a4d
Update tests/integration_tests/zeroeth_element_fix_test.rs
sergeytimoshin Jul 31, 2025
d9c2f02
Update src/ingester/detect_gaps.rs
sergeytimoshin Jul 31, 2025
c93f2dd
Update src/ingester/detect_gaps.rs
sergeytimoshin Jul 31, 2025
21a1d3a
Remove AMT/SMT sequence state debug logging
sergeytimoshin Jul 31, 2025
b70eec6
Move analyze_snapshot tool into src/tools directory
sergeytimoshin Jul 31, 2025
a01bdfc
Use environment variable for test snapshot path
sergeytimoshin Jul 31, 2025
deeea87
Remove unused StateV2Seq struct
sergeytimoshin Jul 31, 2025
e42993f
cleanup
sergeytimoshin Jul 31, 2025
e889f35
cleanup
sergeytimoshin Jul 31, 2025
ecab5d4
remove unwraps
sergeytimoshin Jul 31, 2025
40b37d0
move gap logic into ingester/gap module
sergeytimoshin Jul 31, 2025
ab0684c
cleanup
sergeytimoshin Jul 31, 2025
3bb1f29
refactor: clear cache and sequence state on gap detection rewinds
sergeytimoshin Aug 8, 2025
1c70149
refactor: add clear_sequence_state fn for rewind recovery
sergeytimoshin Aug 8, 2025
1cb338e
fix gap detection
sergeytimoshin Aug 8, 2025
d1ecb05
Add flag to disable sequence gap detection and rewind
sergeytimoshin Aug 8, 2025
7823882
Remove sequence state clearing on block rewind
sergeytimoshin Aug 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ test-ledger/
minio
test.db
docker-compose.yml
*.txt
56 changes: 32 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ path = "src/snapshot/loader/main.rs"
name = "photon-tree-validator"
path = "src/tools/tree_validator/main.rs"

[[bin]]
name = "photon-analyze-snapshot"
path = "src/tools/analyze_snapshot.rs"

[dependencies]
ark-serialize = "0.5"
ark-bn254 = "0.5"
Expand Down Expand Up @@ -82,9 +86,11 @@ solana-pubkey = "2.3.0"
solana-transaction-status = "1.18.0"

light-concurrent-merkle-tree = "2.1.0"
light-batched-merkle-tree = "0.3.0"
light-merkle-tree-metadata = "0.3.0"
light-compressed-account = { version = "0.3.0", features = ["anchor"] }
light-batched-merkle-tree = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
light-merkle-tree-metadata = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
light-compressed-account = { version = "0.3.0", features = [
"anchor",
], git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" }
light-hasher = { version = "3.1.0" }

light-poseidon = "0.3.0"
Expand Down
6 changes: 5 additions & 1 deletion src/api/method/get_multiple_compressed_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,11 @@ pub async fn get_multiple_compressed_accounts(
}
fetch_account_from_addresses(conn, addresses).await?
}
_ => panic!("Either hashes or addresses must be provided"),
_ => {
return Err(PhotonApiError::ValidationError(
"Either hashes or addresses must be provided".to_string(),
));
}
};

Ok(GetMultipleCompressedAccountsResponse {
Expand Down
2 changes: 2 additions & 0 deletions src/api/method/get_transaction_with_compression_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ pub async fn get_transaction_helper(
PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", signature.0))
})?,
slot,
None,
)
.map_err(|_e| {
PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", signature.0))
Expand Down Expand Up @@ -360,6 +361,7 @@ pub async fn get_transaction_helper_v2(
PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", signature.0))
})?,
slot,
None,
)
.map_err(|_e| {
PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", signature.0))
Expand Down
4 changes: 4 additions & 0 deletions src/ingester/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub enum IngesterError {
EmptyBatchEvent,
#[error("Invalid event.")]
InvalidEvent,
#[error("Custom error: {0}")]
CustomError(String),
#[error("Gap detected, triggering rewind")]
GapDetectedRewind,
}

impl From<sea_orm::error::DbErr> for IngesterError {
Expand Down
7 changes: 7 additions & 0 deletions src/ingester/fetchers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_pubkey::Pubkey;
use solana_sdk::pubkey::Pubkey as SdkPubkey;
use solana_sdk::signature::Signature;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::error;
use yellowstone_grpc_client::{GeyserGrpcBuilderResult, GeyserGrpcClient, Interceptor};
Expand All @@ -30,6 +31,7 @@ use yellowstone_grpc_proto::solana::storage::confirmed_block::InnerInstructions;
use crate::api::method::get_indexer_health::HEALTH_CHECK_SLOT_DISTANCE;
use crate::common::typedefs::hash::Hash;
use crate::ingester::fetchers::poller::get_block_poller_stream;
use crate::ingester::gap::RewindCommand;
use crate::ingester::typedefs::block_info::{
BlockInfo, BlockMetadata, Instruction, InstructionGroup, TransactionInfo,
};
Expand All @@ -43,6 +45,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client: Arc<RpcClient>,
mut last_indexed_slot: u64,
max_concurrent_block_fetches: usize,
rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
) -> impl Stream<Item = Vec<BlockInfo>> {
stream! {
start_latest_slot_updater(rpc_client.clone()).await;
Expand All @@ -53,6 +56,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
rewind_receiver,
))
);

Expand Down Expand Up @@ -115,6 +119,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for timeout fallback
)));
continue;
}
Expand All @@ -132,6 +137,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for out-of-order fallback
)));
continue;
}
Expand All @@ -144,6 +150,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for unhealthy fallback
)));
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/ingester/fetchers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use std::sync::Arc;
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use solana_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::mpsc;

use super::typedefs::block_info::BlockInfo;

pub mod grpc;
pub mod poller;

use crate::ingester::gap::RewindCommand;
use grpc::get_grpc_stream_with_rpc_fallback;
use poller::get_block_poller_stream;

Expand All @@ -17,10 +19,11 @@ pub struct BlockStreamConfig {
pub geyser_url: Option<String>,
pub max_concurrent_block_fetches: usize,
pub last_indexed_slot: u64,
pub rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
}

impl BlockStreamConfig {
pub fn load_block_stream(&self) -> impl Stream<Item = Vec<BlockInfo>> {
pub fn load_block_stream(mut self) -> impl Stream<Item = Vec<BlockInfo>> {
let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| {
let auth_header = std::env::var("GRPC_X_TOKEN").unwrap();
get_grpc_stream_with_rpc_fallback(
Expand All @@ -29,6 +32,7 @@ impl BlockStreamConfig {
self.rpc_client.clone(),
self.last_indexed_slot,
self.max_concurrent_block_fetches,
self.rewind_receiver.take(),
)
});

Expand All @@ -37,6 +41,7 @@ impl BlockStreamConfig {
self.rpc_client.clone(),
self.last_indexed_slot,
self.max_concurrent_block_fetches,
self.rewind_receiver.take(),
))
} else {
None
Expand Down
Loading
Loading