diff --git a/forester/src/processor/tx_cache.rs b/forester/src/processor/tx_cache.rs index f15e5c9e77..4b7c7fdb63 100644 --- a/forester/src/processor/tx_cache.rs +++ b/forester/src/processor/tx_cache.rs @@ -1,10 +1,17 @@ use std::{collections::HashMap, time::Duration}; use tokio::time::Instant; +use tracing::warn; + +#[derive(Debug, Clone)] +struct CacheEntry { + timestamp: Instant, + timeout: Duration, +} #[derive(Debug, Clone)] pub struct ProcessedHashCache { - entries: HashMap<String, Instant>, + entries: HashMap<String, CacheEntry>, ttl: Duration, } @@ -17,18 +24,66 @@ impl ProcessedHashCache { } pub fn add(&mut self, hash: &str) { - self.entries.insert(hash.to_string(), Instant::now()); + self.entries.insert( + hash.to_string(), + CacheEntry { + timestamp: Instant::now(), + timeout: self.ttl, + }, + ); + } + + pub fn add_with_timeout(&mut self, hash: &str, timeout: Duration) { + self.entries.insert( + hash.to_string(), + CacheEntry { + timestamp: Instant::now(), + timeout, + }, + ); + } + + pub fn extend_timeout(&mut self, hash: &str, new_timeout: Duration) { + if let Some(entry) = self.entries.get_mut(hash) { + entry.timeout = new_timeout; + } } pub fn contains(&mut self, hash: &str) -> bool { self.cleanup(); - self.entries.contains_key(hash) + if let Some(entry) = self.entries.get(hash) { + let age = Instant::now().duration_since(entry.timestamp); + if age > Duration::from_secs(60) && age < entry.timeout { + warn!( + "Cache entry {} has been processing for {:?} (timeout: {:?})", + hash, age, entry.timeout + ); + } + true + } else { + false + } + } + + pub fn get_age(&self, hash: &str) -> Option<Duration> { + self.entries + .get(hash) + .map(|entry| Instant::now().duration_since(entry.timestamp)) } pub fn cleanup(&mut self) { let now = Instant::now(); - self.entries - .retain(|_, timestamp| now.duration_since(*timestamp) < self.ttl); + self.entries.retain(|hash, entry| { + let age = now.duration_since(entry.timestamp); + let should_keep = age < entry.timeout; + if !should_keep && age < 
Duration::from_secs(30) { + warn!( + "Removing cache entry {} after {:?} timeout (was: {:?})", + hash, age, entry.timeout + ); + } + should_keep + }); } pub fn cleanup_by_key(&mut self, key: &str) { diff --git a/forester/src/processor/v1/tx_builder.rs b/forester/src/processor/v1/tx_builder.rs index 7f3b0075fa..13f0d620f1 100644 --- a/forester/src/processor/v1/tx_builder.rs +++ b/forester/src/processor/v1/tx_builder.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use account_compression::processor::initialize_address_merkle_tree::Pubkey; use async_trait::async_trait; @@ -91,10 +91,18 @@ impl TransactionBuilder for EpochManagerTransactions { }) .collect(); + // Add items with short timeout (15 seconds) for processing for item in &work_items { let hash_str = bs58::encode(&item.queue_item_data.hash).into_string(); - cache.add(&hash_str); + cache.add_with_timeout(&hash_str, Duration::from_secs(15)); + trace!("Added {} to cache with 15s timeout", hash_str); } + + let work_item_hashes: Vec<String> = work_items + .iter() + .map(|item| bs58::encode(&item.queue_item_data.hash).into_string()) + .collect(); + drop(cache); if work_items.is_empty() { @@ -143,6 +151,18 @@ impl TransactionBuilder for EpochManagerTransactions { .await?; transactions.push(transaction); } + + if !transactions.is_empty() { + let mut cache = self.processed_hash_cache.lock().await; + for hash_str in work_item_hashes { + cache.extend_timeout(&hash_str, Duration::from_secs(30)); + trace!( + "Extended cache timeout for {} to 30s after successful transaction creation", + hash_str + ); + } + } + Ok((transactions, last_valid_block_height)) } } diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index f6b4d05f12..f2c4ee395f 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -25,7 +25,17 @@ pub async fn fetch_queue_item_data( queue_length: u16, ) -> Result<Vec<QueueItemData>> { trace!("Fetching queue data for {:?}", queue_pubkey); - let mut account = 
rpc.get_account(*queue_pubkey).await?.unwrap(); + let account = rpc.get_account(*queue_pubkey).await?; + let mut account = match account { + Some(acc) => acc, + None => { + tracing::warn!( + "Queue account {} not found - may have been deleted or not yet created", + queue_pubkey + ); + return Ok(Vec::new()); + } + }; let queue: HashSet = unsafe { HashSet::from_bytes_copy(&mut account.data[8 + size_of::<QueueAccount>()..])? };   let end_index = (start_index + processing_length).min(queue_length);