diff --git a/Cargo.lock b/Cargo.lock index b6bf893a0f..7ddd2a7aa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2129,6 +2129,7 @@ dependencies = [ "light-client", "light-compressed-account", "light-compressed-token", + "light-concurrent-merkle-tree", "light-hash-set", "light-hasher", "light-merkle-tree-metadata", @@ -2137,6 +2138,7 @@ dependencies = [ "light-prover-client", "light-registry", "light-sdk", + "light-sparse-merkle-tree", "light-system-program-anchor", "light-test-utils", "num-bigint 0.4.6", diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 79601efe11..d625c22142 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -18,6 +18,9 @@ light-hash-set = { workspace = true, features = ["solana"] } light-hasher = { workspace = true } light-merkle-tree-reference = { workspace = true } light-registry = { workspace = true } +light-concurrent-merkle-tree = { workspace = true } +light-sparse-merkle-tree = { workspace = true } +light-prover-client = { workspace = true } photon-api = { workspace = true } forester-utils = { workspace = true } light-client = { workspace = true, features = ["v2"] } diff --git a/forester/package.json b/forester/package.json index 5113c31257..7cde4bb6b0 100644 --- a/forester/package.json +++ b/forester/package.json @@ -4,7 +4,7 @@ "license": "GPL-3.0", "scripts": { "build": "cargo build", - "test": "source .env && RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester e2e_test -- --nocapture", + "test": "source ./.env && RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester e2e_test -- --nocapture", "docker:build": "docker build --tag forester -f Dockerfile .." }, "devDependencies": { diff --git a/forester/src/processor/v2/changelog_cache.rs b/forester/src/processor/v2/changelog_cache.rs new file mode 100644 index 0000000000..65bb313d78 --- /dev/null +++ b/forester/src/processor/v2/changelog_cache.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; +use std::collections::HashMap; + +use anyhow::Result; +use light_sparse_merkle_tree::changelog::ChangelogEntry; +use solana_sdk::pubkey::Pubkey; +use tokio::sync::RwLock; +use tracing::debug; + +pub static CHANGELOG_CACHE: tokio::sync::OnceCell = tokio::sync::OnceCell::const_new(); + +pub async fn get_changelog_cache() -> &'static ChangelogCache { + CHANGELOG_CACHE.get_or_init(|| async { ChangelogCache::new() }).await +} + +pub struct ChangelogCache { + changelogs: Arc>>>>, +} + +impl ChangelogCache { + pub fn new() -> Self { + Self { + changelogs: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn get_changelogs(&self, merkle_tree: &Pubkey) -> Vec> { + self.changelogs.read().await.get(merkle_tree).cloned().unwrap_or_default() + } + + + pub async fn append_changelogs( + &self, + merkle_tree: Pubkey, + new_changelogs: Vec>, + ) -> Result<()> { + let mut changelogs = self.changelogs.write().await; + let entries = changelogs.entry(merkle_tree).or_insert_with(Vec::new); + let count = new_changelogs.len(); + entries.extend(new_changelogs); + + debug!("Appended {} changelogs for {:?}, total entries: {}", + count, merkle_tree, entries.len()); + Ok(()) + } + + +} \ No newline at end of file diff --git a/forester/src/processor/v2/common.rs b/forester/src/processor/v2/common.rs index d7790299d2..e672cc8f83 100644 --- a/forester/src/processor/v2/common.rs +++ b/forester/src/processor/v2/common.rs @@ -5,7 +5,9 @@ use forester_utils::rpc_pool::SolanaRpcPool; pub use forester_utils::{ParsedMerkleTreeData, ParsedQueueData}; use futures::{pin_mut, stream::StreamExt, Stream}; use light_batched_merkle_tree::{ - batch::BatchState, merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, + batch::BatchState, + merkle_tree::{BatchedMerkleTreeAccount, InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs}, + queue::BatchedQueueAccount, }; use light_client::rpc::Rpc; use light_compressed_account::TreeType; @@ -13,7 +15,10 @@ use solana_sdk::{instruction::Instruction, pubkey::Pubkey, signature::Keypair, s use tokio::sync::Mutex; use tracing::{debug, error, info, trace}; -use super::{address, state}; +use super::{ + address, changelog_cache, + state_streams::{get_nullify_instruction_stream, get_append_instruction_stream} +}; use crate::{errors::ForesterError, processor::tx_cache::ProcessedHashCache, Result}; #[derive(Debug)] @@ -29,6 +34,10 @@ pub enum BatchReadyState { StateReadyForNullify { merkle_tree_data: ParsedMerkleTreeData, }, + BothReady { + merkle_tree_data: ParsedMerkleTreeData, + output_queue_data: ParsedQueueData, + }, } #[derive(Debug)] @@ -215,7 +224,7 @@ impl BatchProcessor { self.context.merkle_tree ); let result = self - .process_state_append_hybrid(merkle_tree_data, output_queue_data) + .process_state_append(merkle_tree_data, output_queue_data) .await; if let Err(ref e) = result { error!( @@ -230,7 +239,7 @@ impl BatchProcessor { "Processing batch for nullify, tree: {}", self.context.merkle_tree ); - let result = self.process_state_nullify_hybrid(merkle_tree_data).await; + let result = self.process_state_nullify(merkle_tree_data).await; if let Err(ref e) = result { error!( "State nullify failed for tree {}: {:?}", @@ -239,6 +248,17 @@ impl BatchProcessor { } result } + BatchReadyState::BothReady { + merkle_tree_data, + output_queue_data, + } => { + trace!( + "Processing both nullify and append in parallel for tree: {}", + self.context.merkle_tree + ); + self.process_parallel(merkle_tree_data, output_queue_data) + .await + } BatchReadyState::NotReady => { trace!( "Batch not ready for processing, tree: {}", @@ -322,35 +342,16 @@ impl BatchProcessor { }; } - // For State tree type, balance appends and nullifies operations - // based on the queue states match (input_ready, output_ready) { (true, true) => { if let (Some(mt_data), Some(oq_data)) = (merkle_tree_data, output_queue_data) { - // If both queues are ready, check their fill levels - let input_fill = Self::calculate_completion_from_parsed( - mt_data.num_inserted_zkps, - mt_data.current_zkp_batch_index, - ); - let output_fill = Self::calculate_completion_from_parsed( - oq_data.num_inserted_zkps, - oq_data.current_zkp_batch_index, + debug!( + "Both input and output queues ready for tree {}", + self.context.merkle_tree ); - - trace!( - "Input queue fill: {:.2}, Output queue fill: {:.2}", - input_fill, - output_fill - ); - if input_fill > output_fill { - BatchReadyState::StateReadyForNullify { - merkle_tree_data: mt_data, - } - } else { - BatchReadyState::StateReadyForAppend { - merkle_tree_data: mt_data, - output_queue_data: oq_data, - } + BatchReadyState::BothReady { + merkle_tree_data: mt_data, + output_queue_data: oq_data, } } else { BatchReadyState::NotReady @@ -379,14 +380,14 @@ impl BatchProcessor { } } - async fn process_state_nullify_hybrid( + async fn process_state_nullify( &self, merkle_tree_data: ParsedMerkleTreeData, ) -> Result { - let zkp_batch_size = merkle_tree_data.zkp_batch_size as usize; + info!("Processing state nullify with changelog cache"); let batch_hash = format!( - "state_nullify_hybrid_{}_{}", + "state_nullify_{}_{}", self.context.merkle_tree, self.context.epoch ); @@ -394,7 +395,7 @@ impl BatchProcessor { let mut cache = self.context.ops_cache.lock().await; if cache.contains(&batch_hash) { trace!( - "Skipping already processed state nullify batch (hybrid): {}", + "Skipping already processed state nullify batch: {}", batch_hash ); return Ok(0); @@ -402,51 +403,178 @@ impl BatchProcessor { cache.add(&batch_hash); } - state::perform_nullify(&self.context, merkle_tree_data).await?; - trace!( - "State nullify operation (hybrid) completed for tree: {}", - self.context.merkle_tree + let _ = changelog_cache::get_changelog_cache().await; + + // Create nullify stream + let nullify_future = get_nullify_instruction_stream( + self.context.rpc_pool.clone(), + self.context.merkle_tree, + self.context.prover_url.clone(), + self.context.prover_polling_interval, + self.context.prover_max_wait_time, + merkle_tree_data, + self.context.ixs_per_tx, ); + + // Process the stream + let result = process_stream( + &self.context, + nullify_future, + |data: &InstructionDataBatchNullifyInputs| { + light_registry::account_compression_cpi::sdk::create_batch_nullify_instruction( + self.context.authority.pubkey(), + self.context.derivation, + self.context.merkle_tree, + self.context.epoch, + data.try_to_vec().unwrap(), + ) + }, + "state", + Some("nullify") + ).await; + let mut cache = self.context.ops_cache.lock().await; cache.cleanup_by_key(&batch_hash); trace!("Cache cleaned up for batch: {}", batch_hash); - Ok(zkp_batch_size) + result } - async fn process_state_append_hybrid( + async fn process_state_append( &self, merkle_tree_data: ParsedMerkleTreeData, output_queue_data: ParsedQueueData, ) -> Result { - let zkp_batch_size = output_queue_data.zkp_batch_size as usize; + info!("Processing state append with changelog cache"); let batch_hash = format!( - "state_append_hybrid_{}_{}", + "state_append_{}_{}", self.context.merkle_tree, self.context.epoch ); { let mut cache = self.context.ops_cache.lock().await; if cache.contains(&batch_hash) { trace!( - "Skipping already processed state append batch (hybrid): {}", + "Skipping already processed state append batch: {}", batch_hash ); return Ok(0); } cache.add(&batch_hash); } - state::perform_append(&self.context, merkle_tree_data, output_queue_data).await?; - trace!( - "State append operation (hybrid) completed for tree: {}", - self.context.merkle_tree + + + let _ = changelog_cache::get_changelog_cache().await; + + // Create append stream + let append_future = get_append_instruction_stream( + self.context.rpc_pool.clone(), + self.context.merkle_tree, + self.context.prover_url.clone(), + self.context.prover_polling_interval, + self.context.prover_max_wait_time, + merkle_tree_data, + output_queue_data, + self.context.ixs_per_tx, ); + // Process the stream + let result = process_stream( + &self.context, + append_future, + |data: &InstructionDataBatchAppendInputs| { + light_registry::account_compression_cpi::sdk::create_batch_append_instruction( + self.context.authority.pubkey(), + self.context.derivation, + self.context.merkle_tree, + self.context.output_queue, + self.context.epoch, + data.try_to_vec().unwrap(), + ) + }, + "state", + Some("append") + ).await; + let mut cache = self.context.ops_cache.lock().await; cache.cleanup_by_key(&batch_hash); - Ok(zkp_batch_size) + result + } + + /// Process operations in parallel using cache-updating streams + async fn process_parallel( + &self, + merkle_tree_data: ParsedMerkleTreeData, + output_queue_data: ParsedQueueData, + ) -> Result { + info!("Processing state operations in parallel with changelog cache"); + + + let _ = changelog_cache::get_changelog_cache().await; + + // Create futures for stream creation + let nullify_future = get_nullify_instruction_stream( + self.context.rpc_pool.clone(), + self.context.merkle_tree, + self.context.prover_url.clone(), + self.context.prover_polling_interval, + self.context.prover_max_wait_time, + merkle_tree_data.clone(), + self.context.ixs_per_tx, + ); + + let append_future = get_append_instruction_stream( + self.context.rpc_pool.clone(), + self.context.merkle_tree, + self.context.prover_url.clone(), + self.context.prover_polling_interval, + self.context.prover_max_wait_time, + merkle_tree_data, + output_queue_data, + self.context.ixs_per_tx, + ); + + // Process both streams in parallel + let (nullify_result, append_result) = tokio::join!( + process_stream( + &self.context, + nullify_future, + |data: &InstructionDataBatchNullifyInputs| { + light_registry::account_compression_cpi::sdk::create_batch_nullify_instruction( + self.context.authority.pubkey(), + self.context.derivation, + self.context.merkle_tree, + self.context.epoch, + data.try_to_vec().unwrap(), + ) + }, + "state", + Some("nullify") + ), + process_stream( + &self.context, + append_future, + |data: &InstructionDataBatchAppendInputs| { + light_registry::account_compression_cpi::sdk::create_batch_append_instruction( + self.context.authority.pubkey(), + self.context.derivation, + self.context.merkle_tree, + self.context.output_queue, + self.context.epoch, + data.try_to_vec().unwrap(), + ) + }, + "state", + Some("append") + ) + ); + + let nullify_count = nullify_result?; + let append_count = append_result?; + + Ok(nullify_count + append_count) } /// Parse merkle tree account and check if batch is ready @@ -536,16 +664,5 @@ impl BatchProcessor { Ok((parsed_data, is_ready)) } - /// Calculate completion percentage from parsed data - fn calculate_completion_from_parsed( - num_inserted_zkps: u64, - current_zkp_batch_index: u64, - ) -> f64 { - let total = current_zkp_batch_index; - if total == 0 { - return 0.0; - } - let remaining = total - num_inserted_zkps; - remaining as f64 / total as f64 - } } + diff --git a/forester/src/processor/v2/mod.rs b/forester/src/processor/v2/mod.rs index 6e660ba4e4..cb87d4d0a2 100644 --- a/forester/src/processor/v2/mod.rs +++ b/forester/src/processor/v2/mod.rs @@ -1,6 +1,7 @@ mod address; mod common; -mod state; +mod state_streams; +mod changelog_cache; use common::BatchProcessor; use light_client::rpc::Rpc; diff --git a/forester/src/processor/v2/state.rs b/forester/src/processor/v2/state.rs deleted file mode 100644 index e99c5439a2..0000000000 --- a/forester/src/processor/v2/state.rs +++ /dev/null @@ -1,146 +0,0 @@ -use anyhow::{Error, Ok}; -use borsh::BorshSerialize; -use forester_utils::instructions::{ - state_batch_append::get_append_instruction_stream, - state_batch_nullify::get_nullify_instruction_stream, -}; -use futures::stream::{Stream, StreamExt}; -use light_batched_merkle_tree::merkle_tree::{ - InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, -}; -use light_client::rpc::Rpc; -use light_registry::account_compression_cpi::sdk::{ - create_batch_append_instruction, create_batch_nullify_instruction, -}; -use solana_program::instruction::Instruction; -use solana_sdk::signer::Signer; -use tracing::{info, instrument}; - -use super::common::{process_stream, BatchContext, ParsedMerkleTreeData, ParsedQueueData}; -use crate::Result; - -async fn create_nullify_stream_future( - ctx: &BatchContext, - merkle_tree_data: ParsedMerkleTreeData, -) -> Result<( - impl Stream>> + Send, - u16, -)> -where - R: Rpc, -{ - let (stream, size) = get_nullify_instruction_stream( - ctx.rpc_pool.clone(), - ctx.merkle_tree, - ctx.prover_url.clone(), - ctx.prover_polling_interval, - ctx.prover_max_wait_time, - merkle_tree_data, - ctx.ixs_per_tx, - ) - .await - .map_err(Error::from)?; - let stream = stream.map(|item| item.map_err(Error::from)); - Ok((stream, size)) -} - -async fn create_append_stream_future( - ctx: &BatchContext, - merkle_tree_data: ParsedMerkleTreeData, - output_queue_data: ParsedQueueData, -) -> Result<( - impl Stream>> + Send, - u16, -)> -where - R: Rpc, -{ - let (stream, size) = get_append_instruction_stream( - ctx.rpc_pool.clone(), - ctx.merkle_tree, - ctx.prover_url.clone(), - ctx.prover_polling_interval, - ctx.prover_max_wait_time, - merkle_tree_data, - output_queue_data, - ctx.ixs_per_tx, - ) - .await - .map_err(Error::from)?; - let stream = stream.map(|item| item.map_err(Error::from)); - Ok((stream, size)) -} - -#[instrument( - level = "debug", - skip(context, merkle_tree_data), - fields(merkle_tree = ?context.merkle_tree) -)] -pub(crate) async fn perform_nullify( - context: &BatchContext, - merkle_tree_data: ParsedMerkleTreeData, -) -> Result<()> { - info!( - "V2_TPS_METRIC: operation_start tree_type=StateV2 operation=nullify tree={} epoch={} (hybrid)", - context.merkle_tree, context.epoch - ); - - let instruction_builder = |data: &InstructionDataBatchNullifyInputs| -> Instruction { - create_batch_nullify_instruction( - context.authority.pubkey(), - context.derivation, - context.merkle_tree, - context.epoch, - data.try_to_vec().unwrap(), - ) - }; - - let stream_future = create_nullify_stream_future(context, merkle_tree_data); - - process_stream( - context, - stream_future, - instruction_builder, - "StateV2", - Some("nullify"), - ) - .await?; - Ok(()) -} - -#[instrument( - level = "debug", - skip(context, merkle_tree_data, output_queue_data), - fields(merkle_tree = ?context.merkle_tree) -)] -pub(crate) async fn perform_append( - context: &BatchContext, - merkle_tree_data: ParsedMerkleTreeData, - output_queue_data: ParsedQueueData, -) -> Result<()> { - info!( - "V2_TPS_METRIC: operation_start tree_type=StateV2 operation=append tree={} epoch={} (hybrid)", - context.merkle_tree, context.epoch - ); - let instruction_builder = |data: &InstructionDataBatchAppendInputs| -> Instruction { - create_batch_append_instruction( - context.authority.pubkey(), - context.derivation, - context.merkle_tree, - context.output_queue, - context.epoch, - data.try_to_vec().unwrap(), - ) - }; - - let stream_future = create_append_stream_future(context, merkle_tree_data, output_queue_data); - process_stream( - context, - stream_future, - instruction_builder, - "StateV2", - Some("append"), - ) - .await?; - Ok(()) -} diff --git a/forester/src/processor/v2/state_streams.rs b/forester/src/processor/v2/state_streams.rs new file mode 100644 index 0000000000..9985c3f9ab --- /dev/null +++ b/forester/src/processor/v2/state_streams.rs @@ -0,0 +1,427 @@ +use std::{pin::Pin, sync::Arc, time::Duration}; + +use async_stream::stream; +use futures::{ + stream::{FuturesOrdered, Stream}, + StreamExt, +}; +use light_batched_merkle_tree::{ + constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, + merkle_tree::{InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs}, +}; +use light_client::{indexer::Indexer, rpc::Rpc}; +use light_compressed_account::instruction_data::compressed_proof::CompressedProof; +use light_merkle_tree_metadata::QueueType; +use light_prover_client::{ + proof_client::ProofClient, + proof_types::{ + batch_append::{get_batch_append_inputs, BatchAppendsCircuitInputs}, + batch_update::{get_batch_update_inputs, BatchUpdateCircuitInputs}, + }, +}; +use light_sparse_merkle_tree::changelog::ChangelogEntry; +use solana_sdk::pubkey::Pubkey; +use tracing::{debug, trace, info}; + +use super::changelog_cache; +use forester_utils::{ + error::ForesterUtilsError, + rpc_pool::SolanaRpcPool, + ParsedMerkleTreeData, + ParsedQueueData, +}; +use anyhow::{anyhow, Result as AnyhowResult}; + +async fn generate_nullify_zkp_proof( + inputs: BatchUpdateCircuitInputs, + proof_client: Arc, +) -> Result { + let (proof, new_root) = proof_client + .generate_batch_update_proof(inputs) + .await + .map_err(|e| ForesterUtilsError::Prover(e.to_string()))?; + Ok(InstructionDataBatchNullifyInputs { + new_root, + compressed_proof: CompressedProof { + a: proof.a, + b: proof.b, + c: proof.c, + }, + }) +} + +async fn generate_append_zkp_proof( + circuit_inputs: BatchAppendsCircuitInputs, + proof_client: Arc, +) -> Result { + let (proof, new_root) = proof_client + .generate_batch_append_proof(circuit_inputs) + .await + .map_err(|e| ForesterUtilsError::Prover(e.to_string()))?; + Ok(InstructionDataBatchAppendInputs { + new_root, + compressed_proof: CompressedProof { + a: proof.a, + b: proof.b, + c: proof.c, + }, + }) +} + +#[allow(clippy::too_many_arguments)] +pub async fn get_nullify_instruction_stream<'a, R: Rpc>( + rpc_pool: Arc>, + merkle_tree_pubkey: Pubkey, + prover_url: String, + polling_interval: Duration, + max_wait_time: Duration, + merkle_tree_data: ParsedMerkleTreeData, + yield_batch_size: usize, +) -> AnyhowResult< + ( + Pin< + Box< + dyn Stream< + Item = Result, anyhow::Error>, + > + Send + + 'a, + >, + >, + u16, + ), +> { + let zkp_batch_size = merkle_tree_data.zkp_batch_size; + let leaves_hash_chains = merkle_tree_data.leaves_hash_chains.clone(); + + if leaves_hash_chains.is_empty() { + debug!("No hash chains to process for nullification"); + return Ok((Box::pin(futures::stream::empty()), zkp_batch_size)); + } + + let num_batches_to_process = leaves_hash_chains.len(); + let changelog_cache = changelog_cache::get_changelog_cache().await; + + let stream = stream! { + let total_elements = zkp_batch_size as usize * num_batches_to_process; + let current_root = merkle_tree_data.current_root; + let offset = merkle_tree_data.num_inserted_zkps * zkp_batch_size as u64; + + trace!("Starting nullify stream - total_elements: {}, offset: {}", total_elements, offset); + + // Get accumulated changelogs from cache + let previous_changelogs = changelog_cache.get_changelogs(&merkle_tree_pubkey).await; + info!("Using {} previous changelogs for nullify", previous_changelogs.len()); + + // Fetch queue elements with merkle proofs + let all_queue_elements = { + let mut connection = match rpc_pool.get_connection().await { + Ok(conn) => conn, + Err(e) => { + yield Err(anyhow!("RPC error: {}", e)); + return; + } + }; + + let indexer = match connection.indexer_mut() { + Ok(indexer) => indexer, + Err(e) => { + yield Err(anyhow!("Indexer error: {}", e)); + return; + } + }; + + match indexer.get_queue_elements( + merkle_tree_pubkey.to_bytes(), + QueueType::InputStateV2, + total_elements as u16, + Some(offset), + None, + ).await { + Ok(res) => res.value.items, + Err(e) => { + yield Err(anyhow!("Failed to get queue elements: {}", e)); + return; + } + } + }; + + trace!("Got {} queue elements in total", all_queue_elements.len()); + if all_queue_elements.len() != total_elements { + yield Err(anyhow!( + "Expected {} elements, got {}", + total_elements, all_queue_elements.len() + )); + return; + } + + if let Some(first_element) = all_queue_elements.first() { + if first_element.root != current_root { + yield Err(anyhow!("Root mismatch between indexer and on-chain state")); + return; + } + } + + let mut all_changelogs: Vec> = previous_changelogs.clone(); + let proof_client = Arc::new(ProofClient::with_config(prover_url.clone(), polling_interval, max_wait_time)); + let mut futures_ordered = FuturesOrdered::new(); + let mut pending_count = 0; + let mut proof_buffer = Vec::new(); + + for (batch_offset, leaves_hash_chain) in leaves_hash_chains.iter().enumerate() { + let start_idx = batch_offset * zkp_batch_size as usize; + let end_idx = start_idx + zkp_batch_size as usize; + let batch_elements = &all_queue_elements[start_idx..end_idx]; + + let mut leaves = Vec::new(); + let mut tx_hashes = Vec::new(); + let mut old_leaves = Vec::new(); + let mut path_indices = Vec::new(); + let mut merkle_proofs = Vec::new(); + + for leaf_info in batch_elements.iter() { + path_indices.push(leaf_info.leaf_index as u32); + leaves.push(leaf_info.account_hash); + old_leaves.push(leaf_info.leaf); + merkle_proofs.push(leaf_info.proof.clone()); + tx_hashes.push( + leaf_info + .tx_hash + .ok_or(ForesterUtilsError::Indexer(format!( + "Missing tx_hash for leaf index {}", + leaf_info.leaf_index + )))?, + ); + } + + // Pass previous changelogs to get_batch_update_inputs + let (circuit_inputs, batch_changelog) = match get_batch_update_inputs::< + { DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }, + >( + current_root, + tx_hashes, + leaves.clone(), + *leaves_hash_chain, + old_leaves, + merkle_proofs, + path_indices.clone(), + zkp_batch_size as u32, + &previous_changelogs, // Use cached changelogs + ) { + Ok(inputs) => inputs, + Err(e) => { + yield Err(anyhow!("Failed to get batch update inputs: {}", e)); + return; + } + }; + + all_changelogs.extend(batch_changelog); + + let proof_client = proof_client.clone(); + let future = Box::pin(generate_nullify_zkp_proof(circuit_inputs, proof_client)); + futures_ordered.push_back(future); + pending_count += 1; + + while pending_count >= yield_batch_size || (batch_offset == num_batches_to_process - 1 && pending_count > 0) { + match futures_ordered.next().await { + Some(Ok(proof_data)) => { + pending_count -= 1; + proof_buffer.push(proof_data); + + if proof_buffer.len() >= yield_batch_size || (batch_offset == num_batches_to_process - 1 && pending_count == 0) { + yield Ok(proof_buffer.clone()); + proof_buffer.clear(); + } + }, + Some(Err(e)) => { + yield Err(e.into()); + return; + }, + None => break, + } + } + } + + // Store only new changelogs in cache (skip the ones we started with) + let new_changelogs = all_changelogs.into_iter().skip(previous_changelogs.len()).collect::>(); + if !new_changelogs.is_empty() { + if let Err(e) = changelog_cache.append_changelogs(merkle_tree_pubkey, new_changelogs.clone()).await { + yield Err(anyhow!("Failed to update changelog cache: {}", e)); + return; + } + info!("Stored {} new changelogs for nullify", new_changelogs.len()); + } + + if !proof_buffer.is_empty() { + yield Ok(proof_buffer); + } + }; + + Ok((Box::pin(stream), zkp_batch_size)) +} + +#[allow(clippy::too_many_arguments)] +pub async fn get_append_instruction_stream<'a, R: Rpc>( + rpc_pool: Arc>, + merkle_tree_pubkey: Pubkey, + prover_url: String, + polling_interval: Duration, + max_wait_time: Duration, + merkle_tree_data: ParsedMerkleTreeData, + output_queue_data: ParsedQueueData, + yield_batch_size: usize, +) -> AnyhowResult< + ( + Pin< + Box< + dyn Stream< + Item = Result, anyhow::Error>, + > + Send + + 'a, + >, + >, + u16, + ), +> { + let zkp_batch_size = output_queue_data.zkp_batch_size; + let leaves_hash_chains = output_queue_data.leaves_hash_chains.clone(); + + if leaves_hash_chains.is_empty() { + debug!("No hash chains to process for append"); + return Ok((Box::pin(futures::stream::empty()), zkp_batch_size)); + } + + let num_batches_to_process = leaves_hash_chains.len(); + let changelog_cache = changelog_cache::get_changelog_cache().await; + + let stream = stream! { + let total_elements = zkp_batch_size as usize * num_batches_to_process; + let current_root = merkle_tree_data.current_root; + let offset = merkle_tree_data.next_index; + + trace!("Starting append stream - total_elements: {}, offset: {}", total_elements, offset); + + // Get accumulated changelogs from cache + let previous_changelogs = changelog_cache.get_changelogs(&merkle_tree_pubkey).await; + info!("Using {} previous changelogs for append", previous_changelogs.len()); + + let queue_elements = { + let mut connection = match rpc_pool.get_connection().await { + Ok(conn) => conn, + Err(e) => { + yield Err(anyhow!("RPC error: {}", e)); + return; + } + }; + + let indexer = match connection.indexer_mut() { + Ok(indexer) => indexer, + Err(e) => { + yield Err(anyhow!("Indexer error: {}", e)); + return; + } + }; + + match indexer.get_queue_elements( + merkle_tree_pubkey.to_bytes(), + QueueType::OutputStateV2, + total_elements as u16, + Some(offset), + None, + ).await { + Ok(res) => res.value.items, + Err(e) => { + yield Err(anyhow!("Failed to get queue elements: {}", e)); + return; + } + } + }; + + trace!("Got {} queue elements for append", queue_elements.len()); + if queue_elements.len() != total_elements { + yield Err(anyhow!( + "Expected {} elements, got {}", + total_elements, queue_elements.len() + )); + return; + } + + let mut all_changelogs: Vec> = previous_changelogs.clone(); + let proof_client = Arc::new(ProofClient::with_config(prover_url.clone(), polling_interval, max_wait_time)); + let mut futures_ordered = FuturesOrdered::new(); + let mut pending_count = 0; + let mut proof_buffer = Vec::new(); + + for (batch_idx, leaves_hash_chain) in leaves_hash_chains.iter().enumerate() { + let start_idx = batch_idx * zkp_batch_size as usize; + let end_idx = start_idx + zkp_batch_size as usize; + let batch_elements = &queue_elements[start_idx..end_idx]; + + let old_leaves: Vec<[u8; 32]> = batch_elements.iter().map(|x| x.leaf).collect(); + let leaves: Vec<[u8; 32]> = batch_elements.iter().map(|x| x.account_hash).collect(); + let merkle_proofs: Vec> = batch_elements.iter().map(|x| x.proof.clone()).collect(); + let adjusted_start_index = offset as u32 + (batch_idx * zkp_batch_size as usize) as u32; + + // Pass previous changelogs to get_batch_append_inputs + let (circuit_inputs, batch_changelogs) = match get_batch_append_inputs::< + { DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }, + >( + current_root, + adjusted_start_index, + leaves.clone(), + *leaves_hash_chain, + old_leaves, + merkle_proofs, + zkp_batch_size as u32, + &previous_changelogs, // Use cached changelogs + ) { + Ok(inputs) => inputs, + Err(e) => { + yield Err(anyhow!("Failed to get batch append inputs: {}", e)); + return; + } + }; + + all_changelogs.extend(batch_changelogs); + + let proof_client = proof_client.clone(); + let future = Box::pin(generate_append_zkp_proof(circuit_inputs, proof_client)); + futures_ordered.push_back(future); + pending_count += 1; + + while pending_count >= yield_batch_size || (batch_idx == num_batches_to_process - 1 && pending_count > 0) { + match futures_ordered.next().await { + Some(Ok(proof_data)) => { + pending_count -= 1; + proof_buffer.push(proof_data); + + if proof_buffer.len() >= yield_batch_size || (batch_idx == num_batches_to_process - 1 && pending_count == 0) { + yield Ok(proof_buffer.clone()); + proof_buffer.clear(); + } + }, + Some(Err(e)) => { + yield Err(e.into()); + return; + }, + None => break, + } + } + } + + // Store only new changelogs in cache (skip the ones we started with) + let new_changelogs = all_changelogs.into_iter().skip(previous_changelogs.len()).collect::>(); + if !new_changelogs.is_empty() { + if let Err(e) = changelog_cache.append_changelogs(merkle_tree_pubkey, new_changelogs.clone()).await { + yield Err(anyhow!("Failed to update changelog cache: {}", e)); + return; + } + info!("Stored {} new changelogs for append", new_changelogs.len()); + } + + if !proof_buffer.is_empty() { + yield Ok(proof_buffer); + } + }; + + Ok((Box::pin(stream), zkp_batch_size)) +} \ No newline at end of file diff --git a/forester/src/processor/v2/tests/mod.rs b/forester/src/processor/v2/tests/mod.rs new file mode 100644 index 0000000000..042c426f5a --- /dev/null +++ b/forester/src/processor/v2/tests/mod.rs @@ -0,0 +1,2 @@ +#[cfg(test)] +mod shared_tree_test; \ No newline at end of file diff --git a/forester/src/processor/v2/tests/shared_tree_test.rs b/forester/src/processor/v2/tests/shared_tree_test.rs new file mode 100644 index 0000000000..da70b831cf --- /dev/null +++ b/forester/src/processor/v2/tests/shared_tree_test.rs @@ -0,0 +1,172 @@ +#[cfg(test)] +mod tests { + use super::super::*; + use crate::processor::v2::tree_cache::TREE_CACHE; + use light_hasher::Poseidon; + use light_sparse_merkle_tree::SparseMerkleTree; + use solana_sdk::pubkey::Pubkey; + + #[tokio::test] + async fn test_tree_cache_basic() { + // Create a test tree + let tree = SparseMerkleTree::::new_empty(); + let pubkey = Pubkey::new_unique(); + + // Cache the tree + TREE_CACHE.update(pubkey, &tree).await.unwrap(); + + // Retrieve from cache + let snapshot = TREE_CACHE.get(&pubkey).await.unwrap(); + assert_eq!(snapshot.root, tree.root()); + assert_eq!(snapshot.next_index, tree.get_next_index()); + assert_eq!(snapshot.height, 32); + } + + #[tokio::test] + async fn test_tree_cache_update() { + let pubkey = Pubkey::new_unique(); + + // Create and cache initial tree + let mut tree = SparseMerkleTree::::new_empty(); + TREE_CACHE.update(pubkey, &tree).await.unwrap(); + + let initial_snapshot = TREE_CACHE.get(&pubkey).await.unwrap(); + let initial_seq = initial_snapshot.sequence_number; + + // Append some leaves + tree.append([1u8; 32]); + tree.append([2u8; 32]); + + // Update cache + TREE_CACHE.update(pubkey, &tree).await.unwrap(); + + // Verify updates + let updated_snapshot = TREE_CACHE.get(&pubkey).await.unwrap(); + assert_ne!(updated_snapshot.root, initial_snapshot.root); + assert_eq!(updated_snapshot.next_index, 2); + assert!(updated_snapshot.sequence_number > initial_seq); + } + + #[tokio::test] + async fn test_tree_snapshot_to_tree() { + let pubkey = Pubkey::new_unique(); + + // Create tree with some data + let mut original_tree = SparseMerkleTree::::new_empty(); + original_tree.append([10u8; 32]); + original_tree.append([20u8; 32]); + original_tree.append([30u8; 32]); + + // Cache it + TREE_CACHE.update(pubkey, &original_tree).await.unwrap(); + + // Get snapshot and recreate tree + let snapshot = TREE_CACHE.get(&pubkey).await.unwrap(); + let recreated_tree = snapshot.to_tree::().unwrap(); + + // Verify they match + assert_eq!(recreated_tree.root(), original_tree.root()); + assert_eq!(recreated_tree.get_next_index(), original_tree.get_next_index()); + assert_eq!(recreated_tree.get_subtrees(), original_tree.get_subtrees()); + } + + #[tokio::test] + async fn test_parallel_tree_operations() { + use tokio::sync::Arc; + + let pubkey = Pubkey::new_unique(); + let tree = SparseMerkleTree::::new_empty(); + + // Cache the tree + TREE_CACHE.update(pubkey, &tree).await.unwrap(); + + // Simulate parallel operations + let cache = Arc::new(TREE_CACHE.clone()); + let pk1 = pubkey.clone(); + let pk2 = pubkey.clone(); + + let handle1 = tokio::spawn(async move { + // Operation 1: Read tree state + let snapshot = TREE_CACHE.get(&pk1).await.unwrap(); + let tree1 = snapshot.to_tree::().unwrap(); + assert_eq!(tree1.get_next_index(), 0); + }); + + let handle2 = tokio::spawn(async move { + // Operation 2: Also read tree state + let snapshot = TREE_CACHE.get(&pk2).await.unwrap(); + let tree2 = snapshot.to_tree::().unwrap(); + assert_eq!(tree2.get_next_index(), 0); + }); + + // Both should succeed without conflicts + handle1.await.unwrap(); + handle2.await.unwrap(); + } +} + +#[cfg(test)] +mod shared_tree_tests { + use super::super::*; + use crate::processor::v2::{ + shared_tree_proof_generator::SharedTreeProofGenerator, + tree_cache::TREE_CACHE, + }; + use forester_utils::{ParsedMerkleTreeData, ParsedQueueData}; + use light_hasher::Poseidon; + use solana_sdk::pubkey::Pubkey; + + fn create_test_merkle_data() -> ParsedMerkleTreeData { + ParsedMerkleTreeData { + next_index: 0, + current_root: [0u8; 32], + root_history: vec![[0u8; 32]], + zkp_batch_size: 10, + pending_batch_index: 0, + num_inserted_zkps: 0, + current_zkp_batch_index: 0, + leaves_hash_chains: vec![], + } + } + + fn create_test_queue_data() -> ParsedQueueData { + ParsedQueueData { + zkp_batch_size: 10, + pending_batch_index: 0, + num_inserted_zkps: 0, + current_zkp_batch_index: 2, + leaves_hash_chains: vec![[1u8; 32], [2u8; 32]], // 2 leaves to append + } + } + + #[tokio::test] + async fn test_cache_invalidation() { + let pubkey = Pubkey::new_unique(); + let tree = light_sparse_merkle_tree::SparseMerkleTree::::new_empty(); + + // Cache the tree + TREE_CACHE.update(pubkey, &tree).await.unwrap(); + assert!(TREE_CACHE.get(&pubkey).await.is_some()); + + // Invalidate + TREE_CACHE.invalidate(&pubkey).await; + assert!(TREE_CACHE.get(&pubkey).await.is_none()); + } + + #[tokio::test] + async fn test_cache_clear() { + // Add multiple trees + for i in 0..5 { + let pubkey = Pubkey::new_unique(); + let tree = light_sparse_merkle_tree::SparseMerkleTree::::new_empty(); + TREE_CACHE.update(pubkey, &tree).await.unwrap(); + } + + // Clear all + TREE_CACHE.clear().await; + + // Verify all are gone + // (We can't easily check this without exposing internal state, + // but at least verify clear doesn't panic) + } +} \ No newline at end of file