From 40b7b852ee77a0707eb50c530b12f338d824a83c Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 9 Aug 2025 00:33:05 +0100 Subject: [PATCH 1/2] refactor: timeouts & unwrap - added timeouts for transaction processing phases and warns about long-running items - removed unwrap --- forester/src/processor/tx_cache.rs | 65 +++++++++++++++++++++++-- forester/src/processor/v1/tx_builder.rs | 21 +++++++- forester/src/queue_helpers.rs | 12 ++++- 3 files changed, 90 insertions(+), 8 deletions(-) diff --git a/forester/src/processor/tx_cache.rs b/forester/src/processor/tx_cache.rs index f15e5c9e77..4b7c7fdb63 100644 --- a/forester/src/processor/tx_cache.rs +++ b/forester/src/processor/tx_cache.rs @@ -1,10 +1,17 @@ use std::{collections::HashMap, time::Duration}; use tokio::time::Instant; +use tracing::warn; + +#[derive(Debug, Clone)] +struct CacheEntry { + timestamp: Instant, + timeout: Duration, +} #[derive(Debug, Clone)] pub struct ProcessedHashCache { - entries: HashMap, + entries: HashMap, ttl: Duration, } @@ -17,18 +24,66 @@ impl ProcessedHashCache { } pub fn add(&mut self, hash: &str) { - self.entries.insert(hash.to_string(), Instant::now()); + self.entries.insert( + hash.to_string(), + CacheEntry { + timestamp: Instant::now(), + timeout: self.ttl, + }, + ); + } + + pub fn add_with_timeout(&mut self, hash: &str, timeout: Duration) { + self.entries.insert( + hash.to_string(), + CacheEntry { + timestamp: Instant::now(), + timeout, + }, + ); + } + + pub fn extend_timeout(&mut self, hash: &str, new_timeout: Duration) { + if let Some(entry) = self.entries.get_mut(hash) { + entry.timeout = new_timeout; + } } pub fn contains(&mut self, hash: &str) -> bool { self.cleanup(); - self.entries.contains_key(hash) + if let Some(entry) = self.entries.get(hash) { + let age = Instant::now().duration_since(entry.timestamp); + if age > Duration::from_secs(60) && age < entry.timeout { + warn!( + "Cache entry {} has been processing for {:?} (timeout: {:?})", + hash, age, entry.timeout + ); + } + true + } else { + false + } + } + + pub fn get_age(&self, hash: &str) -> Option { + self.entries + .get(hash) + .map(|entry| Instant::now().duration_since(entry.timestamp)) } pub fn cleanup(&mut self) { let now = Instant::now(); - self.entries - .retain(|_, timestamp| now.duration_since(*timestamp) < self.ttl); + self.entries.retain(|hash, entry| { + let age = now.duration_since(entry.timestamp); + let should_keep = age < entry.timeout; + if !should_keep && age < Duration::from_secs(30) { + warn!( + "Removing cache entry {} after {:?} timeout (was: {:?})", + hash, age, entry.timeout + ); + } + should_keep + }); } pub fn cleanup_by_key(&mut self, key: &str) { diff --git a/forester/src/processor/v1/tx_builder.rs b/forester/src/processor/v1/tx_builder.rs index 7f3b0075fa..7cf2404360 100644 --- a/forester/src/processor/v1/tx_builder.rs +++ b/forester/src/processor/v1/tx_builder.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use account_compression::processor::initialize_address_merkle_tree::Pubkey; use async_trait::async_trait; @@ -91,10 +91,18 @@ impl TransactionBuilder for EpochManagerTransactions { }) .collect(); + // Add items with short timeout (30 seconds) for processing for item in &work_items { let hash_str = bs58::encode(&item.queue_item_data.hash).into_string(); - cache.add(&hash_str); + cache.add_with_timeout(&hash_str, Duration::from_secs(15)); + trace!("Added {} to cache with 15s timeout", hash_str); } + + let work_item_hashes: Vec = work_items + .iter() + .map(|item| bs58::encode(&item.queue_item_data.hash).into_string()) + .collect(); + drop(cache); if work_items.is_empty() { @@ -143,6 +151,15 @@ impl TransactionBuilder for EpochManagerTransactions { .await?; transactions.push(transaction); } + + if !transactions.is_empty() { + let mut cache = self.processed_hash_cache.lock().await; + for hash_str in work_item_hashes { + cache.extend_timeout(&hash_str, Duration::from_secs(30)); + trace!("Extended cache timeout for {} to 30s after successful transaction creation", hash_str); + } + } + Ok((transactions, last_valid_block_height)) } } diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index f6b4d05f12..f2c4ee395f 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -25,7 +25,17 @@ pub async fn fetch_queue_item_data( queue_length: u16, ) -> Result> { trace!("Fetching queue data for {:?}", queue_pubkey); - let mut account = rpc.get_account(*queue_pubkey).await?.unwrap(); + let account = rpc.get_account(*queue_pubkey).await?; + let mut account = match account { + Some(acc) => acc, + None => { + tracing::warn!( + "Queue account {} not found - may have been deleted or not yet created", + queue_pubkey + ); + return Ok(Vec::new()); + } + }; let queue: HashSet = unsafe { HashSet::from_bytes_copy(&mut account.data[8 + size_of::()..])? }; let end_index = (start_index + processing_length).min(queue_length); From c9676238b14f2504c96dda388dab4f7df591baff Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 9 Aug 2025 01:19:04 +0100 Subject: [PATCH 2/2] formatting --- forester/src/processor/v1/tx_builder.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/forester/src/processor/v1/tx_builder.rs b/forester/src/processor/v1/tx_builder.rs index 7cf2404360..13f0d620f1 100644 --- a/forester/src/processor/v1/tx_builder.rs +++ b/forester/src/processor/v1/tx_builder.rs @@ -97,12 +97,12 @@ impl TransactionBuilder for EpochManagerTransactions { cache.add_with_timeout(&hash_str, Duration::from_secs(15)); trace!("Added {} to cache with 15s timeout", hash_str); } - + let work_item_hashes: Vec = work_items .iter() .map(|item| bs58::encode(&item.queue_item_data.hash).into_string()) .collect(); - + drop(cache); if work_items.is_empty() { @@ -151,15 +151,18 @@ impl TransactionBuilder for EpochManagerTransactions { .await?; transactions.push(transaction); } - + if !transactions.is_empty() { let mut cache = self.processed_hash_cache.lock().await; for hash_str in work_item_hashes { cache.extend_timeout(&hash_str, Duration::from_secs(30)); - trace!("Extended cache timeout for {} to 30s after successful transaction creation", hash_str); + trace!( + "Extended cache timeout for {} to 30s after successful transaction creation", + hash_str + ); } } - + Ok((transactions, last_valid_block_height)) } }