Skip to content

Commit dc3e5e7

Browse files
refactor: forester: timeouts & unwrap (#1907)
* refactor: timeouts & unwrap - added timeouts for transaction processing phases and warns about long-running items - removed unwrap * formatting
1 parent 7fb1a83 commit dc3e5e7

File tree

3 files changed

+93
-8
lines changed

3 files changed

+93
-8
lines changed

forester/src/processor/tx_cache.rs

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
use std::{collections::HashMap, time::Duration};
22

33
use tokio::time::Instant;
4+
use tracing::warn;
5+
6+
#[derive(Debug, Clone)]
7+
struct CacheEntry {
8+
timestamp: Instant,
9+
timeout: Duration,
10+
}
411

512
#[derive(Debug, Clone)]
613
pub struct ProcessedHashCache {
7-
entries: HashMap<String, Instant>,
14+
entries: HashMap<String, CacheEntry>,
815
ttl: Duration,
916
}
1017

@@ -17,18 +24,66 @@ impl ProcessedHashCache {
1724
}
1825

1926
pub fn add(&mut self, hash: &str) {
20-
self.entries.insert(hash.to_string(), Instant::now());
27+
self.entries.insert(
28+
hash.to_string(),
29+
CacheEntry {
30+
timestamp: Instant::now(),
31+
timeout: self.ttl,
32+
},
33+
);
34+
}
35+
36+
pub fn add_with_timeout(&mut self, hash: &str, timeout: Duration) {
37+
self.entries.insert(
38+
hash.to_string(),
39+
CacheEntry {
40+
timestamp: Instant::now(),
41+
timeout,
42+
},
43+
);
44+
}
45+
46+
pub fn extend_timeout(&mut self, hash: &str, new_timeout: Duration) {
47+
if let Some(entry) = self.entries.get_mut(hash) {
48+
entry.timeout = new_timeout;
49+
}
2150
}
2251

2352
pub fn contains(&mut self, hash: &str) -> bool {
2453
self.cleanup();
25-
self.entries.contains_key(hash)
54+
if let Some(entry) = self.entries.get(hash) {
55+
let age = Instant::now().duration_since(entry.timestamp);
56+
if age > Duration::from_secs(60) && age < entry.timeout {
57+
warn!(
58+
"Cache entry {} has been processing for {:?} (timeout: {:?})",
59+
hash, age, entry.timeout
60+
);
61+
}
62+
true
63+
} else {
64+
false
65+
}
66+
}
67+
68+
pub fn get_age(&self, hash: &str) -> Option<Duration> {
69+
self.entries
70+
.get(hash)
71+
.map(|entry| Instant::now().duration_since(entry.timestamp))
2672
}
2773

2874
pub fn cleanup(&mut self) {
2975
let now = Instant::now();
30-
self.entries
31-
.retain(|_, timestamp| now.duration_since(*timestamp) < self.ttl);
76+
self.entries.retain(|hash, entry| {
77+
let age = now.duration_since(entry.timestamp);
78+
let should_keep = age < entry.timeout;
79+
if !should_keep && age < Duration::from_secs(30) {
80+
warn!(
81+
"Removing cache entry {} after {:?} timeout (was: {:?})",
82+
hash, age, entry.timeout
83+
);
84+
}
85+
should_keep
86+
});
3287
}
3388

3489
pub fn cleanup_by_key(&mut self, key: &str) {

forester/src/processor/v1/tx_builder.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use account_compression::processor::initialize_address_merkle_tree::Pubkey;
44
use async_trait::async_trait;
@@ -91,10 +91,18 @@ impl<R: Rpc> TransactionBuilder for EpochManagerTransactions<R> {
9191
})
9292
.collect();
9393

94+
// Add items with short timeout (30 seconds) for processing
9495
for item in &work_items {
9596
let hash_str = bs58::encode(&item.queue_item_data.hash).into_string();
96-
cache.add(&hash_str);
97+
cache.add_with_timeout(&hash_str, Duration::from_secs(15));
98+
trace!("Added {} to cache with 15s timeout", hash_str);
9799
}
100+
101+
let work_item_hashes: Vec<String> = work_items
102+
.iter()
103+
.map(|item| bs58::encode(&item.queue_item_data.hash).into_string())
104+
.collect();
105+
98106
drop(cache);
99107

100108
if work_items.is_empty() {
@@ -143,6 +151,18 @@ impl<R: Rpc> TransactionBuilder for EpochManagerTransactions<R> {
143151
.await?;
144152
transactions.push(transaction);
145153
}
154+
155+
if !transactions.is_empty() {
156+
let mut cache = self.processed_hash_cache.lock().await;
157+
for hash_str in work_item_hashes {
158+
cache.extend_timeout(&hash_str, Duration::from_secs(30));
159+
trace!(
160+
"Extended cache timeout for {} to 30s after successful transaction creation",
161+
hash_str
162+
);
163+
}
164+
}
165+
146166
Ok((transactions, last_valid_block_height))
147167
}
148168
}

forester/src/queue_helpers.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,17 @@ pub async fn fetch_queue_item_data<R: Rpc>(
2525
queue_length: u16,
2626
) -> Result<Vec<QueueItemData>> {
2727
trace!("Fetching queue data for {:?}", queue_pubkey);
28-
let mut account = rpc.get_account(*queue_pubkey).await?.unwrap();
28+
let account = rpc.get_account(*queue_pubkey).await?;
29+
let mut account = match account {
30+
Some(acc) => acc,
31+
None => {
32+
tracing::warn!(
33+
"Queue account {} not found - may have been deleted or not yet created",
34+
queue_pubkey
35+
);
36+
return Ok(Vec::new());
37+
}
38+
};
2939
let queue: HashSet =
3040
unsafe { HashSet::from_bytes_copy(&mut account.data[8 + size_of::<QueueAccount>()..])? };
3141
let end_index = (start_index + processing_length).min(queue_length);

0 commit comments

Comments
 (0)