Skip to content

Commit 40a5f8e

Browse files
feat: async queue monitor (helius-labs#297)
* fix: skip transactions with errors during parsing * clean up * feat: add queue hash chains caching * wip * cleanup * add v2 queue indexes and cleanup queue monitor * invalidate queues on insertion * feat: queue verification with parallel processing
1 parent 3a0c764 commit 40a5f8e

File tree

2 files changed

+293
-292
lines changed

2 files changed

+293
-292
lines changed

src/monitor/mod.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ pub fn continously_monitor_photon(
5959
let mut has_been_healthy = false;
6060
start_latest_slot_updater(rpc_client.clone()).await;
6161

62+
// Use interval timer to ensure fixed intervals regardless of execution time
63+
let mut interval = interval(Duration::from_millis(5000));
64+
6265
loop {
66+
interval.tick().await;
67+
6368
let latest_slot = LATEST_SLOT.load(Ordering::SeqCst);
6469
let last_indexed_slot = fetch_last_indexed_slot_with_infinite_retry(db.as_ref()).await;
6570
let lag = if latest_slot > last_indexed_slot {
@@ -79,23 +84,43 @@ pub fn continously_monitor_photon(
7984
error!("Indexing lag is too high: {}", lag);
8085
}
8186
} else {
82-
let tree_roots = load_db_tree_roots_with_infinite_retry(db.as_ref()).await;
83-
validate_tree_roots(rpc_client.as_ref(), tree_roots).await;
87+
let db_clone = db.clone();
88+
let rpc_clone = rpc_client.clone();
8489

90+
tokio::spawn(async move {
91+
let tree_roots =
92+
load_db_tree_roots_with_infinite_retry(db_clone.as_ref()).await;
93+
validate_tree_roots(rpc_clone.as_ref(), tree_roots).await;
94+
});
95+
96+
// Spawn parallel verification tasks for each V2 tree
8597
let v2_trees = queue_monitor::collect_v2_trees().await;
86-
if !v2_trees.is_empty() {
87-
if let Err(divergences) =
88-
queue_monitor::verify_queues(rpc_client.as_ref(), db.as_ref(), v2_trees)
98+
for (tree_pubkey, queue_type) in v2_trees {
99+
let db_clone = db.clone();
100+
let rpc_clone = rpc_client.clone();
101+
102+
tokio::spawn(async move {
103+
if let Err(divergences) = queue_monitor::verify_single_queue(
104+
rpc_clone.as_ref(),
105+
db_clone.as_ref(),
106+
tree_pubkey,
107+
queue_type,
108+
)
89109
.await
90110
{
111+
if !divergences.is_empty() {
112+
for divergence in &divergences {
113+
queue_monitor::log_divergence(divergence);
114+
}
91115
error!(
92-
"V2 queue verification failed with {} divergences",
93-
divergences.len()
116+
"Queue verification failed for tree {} type {:?} with {} divergences",
117+
tree_pubkey, queue_type, divergences.len()
94118
);
95119
}
96120
}
121+
});
122+
}
97123
}
98-
sleep(Duration::from_millis(5000)).await;
99124
}
100125
})
101126
}

0 commit comments

Comments
 (0)