From 276c4353f3d9fbb7db7784e899a3f08373bed6b5 Mon Sep 17 00:00:00 2001 From: ananas Date: Thu, 24 Jul 2025 00:59:57 +0100 Subject: [PATCH 1/2] feat: add index signatures from file --- src/ingester/mod.rs | 1 + src/ingester/signature_indexer.rs | 122 ++++++++++++++++++++++++++++++ src/main.rs | 120 +++++++++++++++++------------ 3 files changed, 195 insertions(+), 48 deletions(-) create mode 100644 src/ingester/signature_indexer.rs diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index f2934d47..15ebda91 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -27,6 +27,7 @@ pub mod fetchers; pub mod indexer; pub mod parser; pub mod persist; +pub mod signature_indexer; pub mod typedefs; fn derive_block_state_update(block: &BlockInfo) -> Result { diff --git a/src/ingester/signature_indexer.rs b/src/ingester/signature_indexer.rs new file mode 100644 index 00000000..aa01ea82 --- /dev/null +++ b/src/ingester/signature_indexer.rs @@ -0,0 +1,122 @@ +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::str::FromStr; +use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; + +use log::{error, info, warn}; +use sea_orm::{DatabaseConnection, TransactionTrait}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_config::RpcTransactionConfig; +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; +use solana_sdk::signature::Signature; +use solana_transaction_status::UiTransactionEncoding; + +use super::error::IngesterError; +use super::parser::parse_transaction; +use super::persist::persist_state_update; +use super::typedefs::block_info::TransactionInfo; + +const RPC_CONFIG: RpcTransactionConfig = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Base64), + commitment: Some(CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }), + max_supported_transaction_version: Some(0), +}; + +pub fn read_signatures_from_file(file_path: &str) -> Result, IngesterError> { + let file = File::open(file_path).map_err(|e| { + IngesterError::ParserError(format!("Failed to open signatures file {}: {}", file_path, e)) + })?; + + let reader = BufReader::new(file); + let mut signatures = Vec::new(); + + for (line_num, line) in reader.lines().enumerate() { + let line = line.map_err(|e| { + IngesterError::ParserError(format!("Failed to read line {} from {}: {}", line_num + 1, file_path, e)) + })?; + + let line = line.trim(); + if line.is_empty() { + continue; + } + + match Signature::from_str(line) { + Ok(signature) => signatures.push(signature), + Err(e) => { + warn!("Invalid signature on line {}: {} ({})", line_num + 1, line, e); + continue; + } + } + } + + info!("Loaded {} signatures from {}", signatures.len(), file_path); + Ok(signatures) +} + +pub async fn fetch_transaction_from_signature( + rpc_client: &RpcClient, + signature: &Signature, +) -> Result { + let encoded_transaction = rpc_client + .get_transaction_with_config(signature, RPC_CONFIG) + .await + .map_err(|e| { + IngesterError::ParserError(format!( + "Failed to fetch transaction {}: {}", + signature, e + )) + })?; + + TransactionInfo::try_from(encoded_transaction) +} + +pub async fn index_signatures_from_file( + db: Arc, + rpc_client: Arc, + file_path: &str, +) -> Result<(), IngesterError> { + let signatures = read_signatures_from_file(file_path)?; + + info!("Starting to index {} signatures from file", signatures.len()); + + for (i, signature) in signatures.iter().enumerate() { + loop { + match process_single_signature(db.clone(), rpc_client.clone(), signature).await { + Ok(()) => { + if (i + 1) % 10 == 0 { + info!("Indexed {} / {} signatures", i + 1, signatures.len()); + } + break; + } + Err(e) => { + error!("Failed to index signature {}: {}. Retrying in 1 second...", signature, e); + sleep(Duration::from_secs(1)); + } + } + } + } + + info!("Finished indexing all {} signatures", signatures.len()); + Ok(()) +} + +async fn process_single_signature( + db: Arc, + rpc_client: Arc, + signature: &Signature, +) -> Result<(), IngesterError> { + let transaction_info = fetch_transaction_from_signature(&rpc_client, signature).await?; + + // Use slot 0 since we don't have block context + let state_update = parse_transaction(&transaction_info, 0)?; + + let tx = db.as_ref().begin().await?; + persist_state_update(&tx, state_update).await?; + tx.commit().await?; + + Ok(()) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 185013ac..554785c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use photon_indexer::ingester::fetchers::BlockStreamConfig; use photon_indexer::ingester::indexer::{ fetch_last_indexed_slot_with_infinite_retry, index_block_stream, }; +use photon_indexer::ingester::signature_indexer::index_signatures_from_file; use photon_indexer::migration::{ sea_orm::{DatabaseBackend, DatabaseConnection, SqlxPostgresConnector, SqlxSqliteConnector}, Migrator, MigratorTrait, @@ -98,6 +99,11 @@ struct Args { /// If provided, metrics will be sent to the specified statsd server. #[arg(long, default_value = None)] metrics_endpoint: Option, + + /// Path to a file containing transaction signatures to index (one per line) + /// When provided, the indexer will fetch and index these specific transactions instead of polling for new blocks + #[arg(long, default_value = None)] + signatures_file: Option, } async fn start_api_server( @@ -245,57 +251,75 @@ async fn main() { (None, None) } false => { - info!("Starting indexer..."); - // For localnet we can safely use a large batch size to speed up indexing. - let max_concurrent_block_fetches = match args.max_concurrent_block_fetches { - Some(max_concurrent_block_fetches) => max_concurrent_block_fetches, - None => { - if is_rpc_node_local { - 200 - } else { - 20 + // Check if we should index from signatures file instead of normal block indexing + if let Some(signatures_file) = &args.signatures_file { + info!("Starting signature-based indexing from file: {}", signatures_file); + let signatures_file = signatures_file.clone(); + let db_conn_clone = db_conn.clone(); + let rpc_client_clone = rpc_client.clone(); + let indexer_handle = tokio::spawn(async move { + if let Err(e) = index_signatures_from_file( + db_conn_clone, + rpc_client_clone, + &signatures_file, + ).await { + error!("Signature indexing failed: {}", e); } - } - }; - let last_indexed_slot = match args.start_slot { - Some(start_slot) => match start_slot.as_str() { - "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, - _ => { - fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) - .await + }); + (Some(indexer_handle), None) + } else { + info!("Starting indexer..."); + // For localnet we can safely use a large batch size to speed up indexing. + let max_concurrent_block_fetches = match args.max_concurrent_block_fetches { + Some(max_concurrent_block_fetches) => max_concurrent_block_fetches, + None => { + if is_rpc_node_local { + 200 + } else { + 20 + } } - }, - None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) - .await - .unwrap_or( - get_network_start_slot(&rpc_client) - .await - .try_into() - .unwrap(), - ) - .try_into() - .unwrap(), - }; - - let block_stream_config = BlockStreamConfig { - rpc_client: rpc_client.clone(), - max_concurrent_block_fetches, - last_indexed_slot, - geyser_url: args.grpc_url, - }; - - ( - Some(continously_index_new_blocks( - block_stream_config, - db_conn.clone(), - rpc_client.clone(), + }; + let last_indexed_slot = match args.start_slot { + Some(start_slot) => match start_slot.as_str() { + "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, + _ => { + fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) + .await + } + }, + None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) + .await + .unwrap_or( + get_network_start_slot(&rpc_client) + .await + .try_into() + .unwrap(), + ) + .try_into() + .unwrap(), + }; + + let block_stream_config = BlockStreamConfig { + rpc_client: rpc_client.clone(), + max_concurrent_block_fetches, last_indexed_slot, - )), - Some(continously_monitor_photon( - db_conn.clone(), - rpc_client.clone(), - )), - ) + geyser_url: args.grpc_url, + }; + + ( + Some(continously_index_new_blocks( + block_stream_config, + db_conn.clone(), + rpc_client.clone(), + last_indexed_slot, + )), + Some(continously_monitor_photon( + db_conn.clone(), + rpc_client.clone(), + )), + ) + } } }; From a40ea123478f9f09a75e547be3f398c35a9bb8ce Mon Sep 17 00:00:00 2001 From: ananas Date: Thu, 24 Jul 2025 01:31:55 +0100 Subject: [PATCH 2/2] add slot info --- src/ingester/signature_indexer.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/ingester/signature_indexer.rs b/src/ingester/signature_indexer.rs index aa01ea82..8b47111b 100644 --- a/src/ingester/signature_indexer.rs +++ b/src/ingester/signature_indexer.rs @@ -60,7 +60,7 @@ pub fn read_signatures_from_file(file_path: &str) -> Result, Inge pub async fn fetch_transaction_from_signature( rpc_client: &RpcClient, signature: &Signature, -) -> Result { +) -> Result<(TransactionInfo, u64), IngesterError> { let encoded_transaction = rpc_client .get_transaction_with_config(signature, RPC_CONFIG) .await @@ -71,7 +71,9 @@ pub async fn fetch_transaction_from_signature( )) })?; - TransactionInfo::try_from(encoded_transaction) + let slot = encoded_transaction.slot; + let transaction_info = TransactionInfo::try_from(encoded_transaction)?; + Ok((transaction_info, slot)) } pub async fn index_signatures_from_file( @@ -109,10 +111,10 @@ async fn process_single_signature( rpc_client: Arc, signature: &Signature, ) -> Result<(), IngesterError> { - let transaction_info = fetch_transaction_from_signature(&rpc_client, signature).await?; + let (transaction_info, slot) = fetch_transaction_from_signature(&rpc_client, signature).await?; - // Use slot 0 since we don't have block context - let state_update = parse_transaction(&transaction_info, 0)?; + // Use the actual slot from the transaction context + let state_update = parse_transaction(&transaction_info, slot)?; let tx = db.as_ref().begin().await?; persist_state_update(&tx, state_update).await?;