Skip to content

feat: add index signatures from file #34

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ingester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod fetchers;
pub mod indexer;
pub mod parser;
pub mod persist;
pub mod signature_indexer;
pub mod typedefs;

fn derive_block_state_update(block: &BlockInfo) -> Result<StateUpdate, IngesterError> {
Expand Down
124 changes: 124 additions & 0 deletions src/ingester/signature_indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::str::FromStr;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

use log::{error, info, warn};
use sea_orm::{DatabaseConnection, TransactionTrait};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_config::RpcTransactionConfig;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::signature::Signature;
use solana_transaction_status::UiTransactionEncoding;

use super::error::IngesterError;
use super::parser::parse_transaction;
use super::persist::persist_state_update;
use super::typedefs::block_info::TransactionInfo;

/// Shared RPC request configuration used for every transaction fetch:
/// base64 encoding (compact and machine-parseable), `Confirmed` commitment,
/// and support for versioned (v0) transactions.
const RPC_CONFIG: RpcTransactionConfig = RpcTransactionConfig {
    encoding: Some(UiTransactionEncoding::Base64),
    commitment: Some(CommitmentConfig {
        commitment: CommitmentLevel::Confirmed,
    }),
    max_supported_transaction_version: Some(0),
};

/// Reads base58-encoded transaction signatures from `file_path`, one per line.
///
/// Blank lines are skipped, and lines that fail to parse are logged with a
/// warning and ignored, so a partially malformed file still yields every
/// valid signature it contains.
///
/// # Errors
/// Returns `IngesterError::ParserError` if the file cannot be opened or a
/// line cannot be read from it.
pub fn read_signatures_from_file(file_path: &str) -> Result<Vec<Signature>, IngesterError> {
    let handle = File::open(file_path).map_err(|e| {
        IngesterError::ParserError(format!("Failed to open signatures file {}: {}", file_path, e))
    })?;

    let mut signatures = Vec::new();
    for (idx, read_result) in BufReader::new(handle).lines().enumerate() {
        let raw = read_result.map_err(|e| {
            IngesterError::ParserError(format!(
                "Failed to read line {} from {}: {}",
                idx + 1,
                file_path,
                e
            ))
        })?;

        let candidate = raw.trim();
        if candidate.is_empty() {
            continue;
        }

        // Best-effort parse: collect valid signatures, warn on the rest.
        match Signature::from_str(candidate) {
            Ok(parsed) => signatures.push(parsed),
            Err(e) => warn!("Invalid signature on line {}: {} ({})", idx + 1, candidate, e),
        }
    }

    info!("Loaded {} signatures from {}", signatures.len(), file_path);
    Ok(signatures)
}

/// Fetches a confirmed transaction by `signature` over RPC and converts it
/// into the ingester's internal `TransactionInfo` representation.
///
/// Returns the parsed transaction together with the slot it landed in, as
/// reported by the RPC transaction context.
///
/// # Errors
/// Returns `IngesterError::ParserError` when the RPC fetch fails, or the
/// conversion error when the encoded payload cannot be turned into a
/// `TransactionInfo`.
pub async fn fetch_transaction_from_signature(
    rpc_client: &RpcClient,
    signature: &Signature,
) -> Result<(TransactionInfo, u64), IngesterError> {
    let fetch_result = rpc_client
        .get_transaction_with_config(signature, RPC_CONFIG)
        .await;

    let encoded = fetch_result.map_err(|e| {
        IngesterError::ParserError(format!("Failed to fetch transaction {}: {}", signature, e))
    })?;

    // Capture the slot before the conversion consumes the encoded value.
    let landed_slot = encoded.slot;
    Ok((TransactionInfo::try_from(encoded)?, landed_slot))
}

pub async fn index_signatures_from_file(
db: Arc<DatabaseConnection>,
rpc_client: Arc<RpcClient>,
file_path: &str,
) -> Result<(), IngesterError> {
let signatures = read_signatures_from_file(file_path)?;

info!("Starting to index {} signatures from file", signatures.len());

for (i, signature) in signatures.iter().enumerate() {
loop {
match process_single_signature(db.clone(), rpc_client.clone(), signature).await {
Ok(()) => {
if (i + 1) % 10 == 0 {
info!("Indexed {} / {} signatures", i + 1, signatures.len());
}
break;
}
Err(e) => {
error!("Failed to index signature {}: {}. Retrying in 1 second...", signature, e);
sleep(Duration::from_secs(1));
}
}
}
}

info!("Finished indexing all {} signatures", signatures.len());
Ok(())
}

/// Fetches, parses, and persists a single transaction's state update inside
/// one database transaction.
///
/// Persistence is atomic: if parsing or persisting fails, the database
/// transaction is dropped (rolled back) and the error is returned.
async fn process_single_signature(
    db: Arc<DatabaseConnection>,
    rpc_client: Arc<RpcClient>,
    signature: &Signature,
) -> Result<(), IngesterError> {
    let (transaction_info, slot) = fetch_transaction_from_signature(&rpc_client, signature).await?;

    // Parse with the slot reported by the RPC transaction context.
    let state_update = parse_transaction(&transaction_info, slot)?;

    let db_tx = db.as_ref().begin().await?;
    persist_state_update(&db_tx, state_update).await?;
    db_tx.commit().await?;

    Ok(())
}
120 changes: 72 additions & 48 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use photon_indexer::ingester::fetchers::BlockStreamConfig;
use photon_indexer::ingester::indexer::{
fetch_last_indexed_slot_with_infinite_retry, index_block_stream,
};
use photon_indexer::ingester::signature_indexer::index_signatures_from_file;
use photon_indexer::migration::{
sea_orm::{DatabaseBackend, DatabaseConnection, SqlxPostgresConnector, SqlxSqliteConnector},
Migrator, MigratorTrait,
Expand Down Expand Up @@ -98,6 +99,11 @@ struct Args {
/// If provided, metrics will be sent to the specified statsd server.
#[arg(long, default_value = None)]
metrics_endpoint: Option<String>,

/// Path to a file containing transaction signatures to index (one per line)
/// When provided, the indexer will fetch and index these specific transactions instead of polling for new blocks
#[arg(long, default_value = None)]
signatures_file: Option<String>,
}

async fn start_api_server(
Expand Down Expand Up @@ -245,57 +251,75 @@ async fn main() {
(None, None)
}
false => {
info!("Starting indexer...");
// For localnet we can safely use a large batch size to speed up indexing.
let max_concurrent_block_fetches = match args.max_concurrent_block_fetches {
Some(max_concurrent_block_fetches) => max_concurrent_block_fetches,
None => {
if is_rpc_node_local {
200
} else {
20
// Check if we should index from signatures file instead of normal block indexing
if let Some(signatures_file) = &args.signatures_file {
info!("Starting signature-based indexing from file: {}", signatures_file);
let signatures_file = signatures_file.clone();
let db_conn_clone = db_conn.clone();
let rpc_client_clone = rpc_client.clone();
let indexer_handle = tokio::spawn(async move {
if let Err(e) = index_signatures_from_file(
db_conn_clone,
rpc_client_clone,
&signatures_file,
).await {
error!("Signature indexing failed: {}", e);
}
}
};
let last_indexed_slot = match args.start_slot {
Some(start_slot) => match start_slot.as_str() {
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await,
_ => {
fetch_block_parent_slot(&rpc_client, start_slot.parse::<u64>().unwrap())
.await
});
(Some(indexer_handle), None)
} else {
info!("Starting indexer...");
// For localnet we can safely use a large batch size to speed up indexing.
let max_concurrent_block_fetches = match args.max_concurrent_block_fetches {
Some(max_concurrent_block_fetches) => max_concurrent_block_fetches,
None => {
if is_rpc_node_local {
200
} else {
20
}
}
},
None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref())
.await
.unwrap_or(
get_network_start_slot(&rpc_client)
.await
.try_into()
.unwrap(),
)
.try_into()
.unwrap(),
};

let block_stream_config = BlockStreamConfig {
rpc_client: rpc_client.clone(),
max_concurrent_block_fetches,
last_indexed_slot,
geyser_url: args.grpc_url,
};

(
Some(continously_index_new_blocks(
block_stream_config,
db_conn.clone(),
rpc_client.clone(),
};
let last_indexed_slot = match args.start_slot {
Some(start_slot) => match start_slot.as_str() {
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await,
_ => {
fetch_block_parent_slot(&rpc_client, start_slot.parse::<u64>().unwrap())
.await
}
},
None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref())
.await
.unwrap_or(
get_network_start_slot(&rpc_client)
.await
.try_into()
.unwrap(),
)
.try_into()
.unwrap(),
};

let block_stream_config = BlockStreamConfig {
rpc_client: rpc_client.clone(),
max_concurrent_block_fetches,
last_indexed_slot,
)),
Some(continously_monitor_photon(
db_conn.clone(),
rpc_client.clone(),
)),
)
geyser_url: args.grpc_url,
};

(
Some(continously_index_new_blocks(
block_stream_config,
db_conn.clone(),
rpc_client.clone(),
last_indexed_slot,
)),
Some(continously_monitor_photon(
db_conn.clone(),
rpc_client.clone(),
)),
)
}
}
};

Expand Down
Loading