Skip to content

Commit eb42b41

Browse files
committed
fix(universalpubsub): fix pg driver
1 parent 94c5a4e commit eb42b41

File tree

22 files changed

+502
-293
lines changed

22 files changed

+502
-293
lines changed

docker/dev-host/grafana/dashboards/cache.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@
11701170
"timepicker": {},
11711171
"timezone": "browser",
11721172
"title": "Rivet Guard",
1173-
"uid": "cen785ige8fswd",
1173+
"uid": "cen785ige8fswd2",
11741174
"version": 1,
11751175
"weekStart": ""
1176-
}
1176+
}

docker/dev-multidc-multinode/core/grafana/dashboards/cache.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@
11701170
"timepicker": {},
11711171
"timezone": "browser",
11721172
"title": "Rivet Guard",
1173-
"uid": "cen785ige8fswd",
1173+
"uid": "cen785ige8fswd2",
11741174
"version": 1,
11751175
"weekStart": ""
1176-
}
1176+
}

docker/dev-multidc/core/grafana/dashboards/cache.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@
11701170
"timepicker": {},
11711171
"timezone": "browser",
11721172
"title": "Rivet Guard",
1173-
"uid": "cen785ige8fswd",
1173+
"uid": "cen785ige8fswd2",
11741174
"version": 1,
11751175
"weekStart": ""
1176-
}
1176+
}

docker/dev-multinode/grafana/dashboards/cache.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@
11701170
"timepicker": {},
11711171
"timezone": "browser",
11721172
"title": "Rivet Guard",
1173-
"uid": "cen785ige8fswd",
1173+
"uid": "cen785ige8fswd2",
11741174
"version": 1,
11751175
"weekStart": ""
1176-
}
1176+
}

docker/dev/grafana/dashboards/cache.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@
11701170
"timepicker": {},
11711171
"timezone": "browser",
11721172
"title": "Rivet Guard",
1173-
"uid": "cen785ige8fswd",
1173+
"uid": "cen785ige8fswd2",
11741174
"version": 1,
11751175
"weekStart": ""
1176-
}
1176+
}

packages/common/config/src/config/mod.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,14 @@ impl Root {
187187
}
188188

189189
pub fn validate_and_set_defaults(&mut self) -> Result<()> {
190-
// TODO: Add back
191-
//// Set default pubsub to Postgres if configured for database
192-
//if self.pubsub.is_none()
193-
// && let Some(Database::Postgres(pg)) = &self.database
194-
//{
195-
// self.pubsub = Some(PubSub::PostgresNotify(pubsub::Postgres {
196-
// url: pg.url.clone(),
197-
// }));
198-
//}
190+
// Set default pubsub to Postgres if configured for database
191+
if self.pubsub.is_none()
192+
&& let Some(Database::Postgres(pg)) = &self.database
193+
{
194+
self.pubsub = Some(PubSub::PostgresNotify(pubsub::Postgres {
195+
url: pg.url.clone(),
196+
}));
197+
}
199198

200199
Ok(())
201200
}

packages/common/config/src/config/pegboard_tunnel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::net::IpAddr;
33
use schemars::JsonSchema;
44
use serde::{Deserialize, Serialize};
55

6-
/// The tunnel service that forwards tunnel-protocol messages between NATS and WebSocket connections.
6+
/// The tunnel service that forwards tunnel-protocol messages between pubsub and WebSocket connections.
77
#[derive(Debug, Serialize, Deserialize, Clone, Default, JsonSchema)]
88
#[serde(deny_unknown_fields)]
99
pub struct PegboardTunnel {

packages/common/config/src/config/pubsub.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ pub struct Postgres {
2323
///
2424
/// See: https://docs.rs/postgres/0.19.10/postgres/config/struct.Config.html#url
2525
pub url: Secret<String>,
26+
#[serde(default = "default_mem_opt")]
27+
pub memory_optimization: bool,
2628
}
2729

2830
impl Default for Postgres {
2931
fn default() -> Self {
3032
Self {
3133
url: Secret::new("postgresql://postgres:postgres@127.0.0.1:5432/postgres".into()),
34+
memory_optimization: true,
3235
}
3336
}
3437
}
@@ -81,3 +84,7 @@ impl Memory {
8184
"default".to_string()
8285
}
8386
}
87+
88+
fn default_mem_opt() -> bool {
89+
true
90+
}

packages/common/gasoline/core/src/ctx/message.rs

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ use universalpubsub::{NextOutput, Subscriber};
1212

1313
use crate::{
1414
error::{WorkflowError, WorkflowResult},
15-
message::{Message, NatsMessage, NatsMessageWrapper},
15+
message::{Message, PubsubMessage, PubsubMessageWrapper},
1616
utils::{self, tags::AsTags},
1717
};
1818

1919
#[derive(Clone)]
2020
pub struct MessageCtx {
21-
/// The connection used to communicate with NATS.
22-
nats: UpsPool,
21+
/// The connection used to communicate with pubsub.
22+
pubsub: UpsPool,
2323

2424
ray_id: Id,
2525

@@ -35,7 +35,7 @@ impl MessageCtx {
3535
ray_id: Id,
3636
) -> WorkflowResult<Self> {
3737
Ok(MessageCtx {
38-
nats: pools.ups().map_err(WorkflowError::PoolsGeneric)?,
38+
pubsub: pools.ups().map_err(WorkflowError::PoolsGeneric)?,
3939
ray_id,
4040
config: config.clone(),
4141
})
@@ -44,7 +44,7 @@ impl MessageCtx {
4444

4545
// MARK: Publishing messages
4646
impl MessageCtx {
47-
/// Publishes a message to NATS and to a durable message stream if a topic is
47+
/// Publishes a message to pubsub and to a durable message stream if a topic is
4848
/// set.
4949
///
5050
/// Use `subscribe` to consume these messages ephemerally and `tail` to read
@@ -94,7 +94,7 @@ impl MessageCtx {
9494
where
9595
M: Message,
9696
{
97-
let nats_subject = M::nats_subject();
97+
let subject = M::subject();
9898
let duration_since_epoch = std::time::SystemTime::now()
9999
.duration_since(std::time::UNIX_EPOCH)
100100
.unwrap_or_else(|err| unreachable!("time is broken: {}", err));
@@ -109,7 +109,7 @@ impl MessageCtx {
109109

110110
// Serialize message
111111
let req_id = Id::new_v1(self.config.dc_label());
112-
let message = NatsMessageWrapper {
112+
let message = PubsubMessageWrapper {
113113
req_id,
114114
ray_id: self.ray_id,
115115
tags: tags.as_tags()?,
@@ -119,7 +119,7 @@ impl MessageCtx {
119119
let message_buf = serde_json::to_vec(&message).map_err(WorkflowError::SerializeMessage)?;
120120

121121
tracing::debug!(
122-
%nats_subject,
122+
%subject,
123123
body_bytes = ?body_buf_len,
124124
message_bytes = ?message_buf.len(),
125125
"publish message"
@@ -128,15 +128,15 @@ impl MessageCtx {
128128
// It's important to write to the stream as fast as possible in order to
129129
// ensure messages are handled quickly.
130130
let message_buf = Arc::new(message_buf);
131-
self.message_publish_nats::<M>(&nats_subject, message_buf)
131+
self.message_publish_pubsub::<M>(&subject, message_buf)
132132
.await;
133133

134134
Ok(())
135135
}
136136

137-
/// Publishes the message to NATS.
137+
/// Publishes the message to pubsub.
138138
#[tracing::instrument(level = "debug", skip_all)]
139-
async fn message_publish_nats<M>(&self, nats_subject: &str, message_buf: Arc<Vec<u8>>)
139+
async fn message_publish_pubsub<M>(&self, subject: &str, message_buf: Arc<Vec<u8>>)
140140
where
141141
M: Message,
142142
{
@@ -146,19 +146,19 @@ impl MessageCtx {
146146
// Ignore for infinite backoff
147147
backoff.tick().await;
148148

149-
let nats_subject = nats_subject.to_owned();
149+
let subject = subject.to_owned();
150150

151151
tracing::trace!(
152-
%nats_subject,
152+
%subject,
153153
message_len = message_buf.len(),
154-
"publishing message to nats"
154+
"publishing message to pubsub"
155155
);
156-
if let Err(err) = self.nats.publish(&nats_subject, &(*message_buf)).await {
156+
if let Err(err) = self.pubsub.publish(&subject, &(*message_buf)).await {
157157
tracing::warn!(?err, "publish message failed, trying again");
158158
continue;
159159
}
160160

161-
tracing::debug!("publish nats message succeeded");
161+
tracing::debug!("publish pubsub message succeeded");
162162
break;
163163
}
164164
}
@@ -172,21 +172,21 @@ impl MessageCtx {
172172

173173
// MARK: Subscriptions
174174
impl MessageCtx {
175-
/// Listens for gasoline messages globally on NATS.
175+
/// Listens for gasoline messages globally on pubsub.
176176
#[tracing::instrument(skip_all, fields(message = M::NAME))]
177177
pub async fn subscribe<M>(&self, tags: impl AsTags) -> WorkflowResult<SubscriptionHandle<M>>
178178
where
179179
M: Message,
180180
{
181181
self.subscribe_opt::<M>(SubscribeOpts {
182182
tags: tags.as_tags()?,
183-
flush_nats: true,
183+
flush: true,
184184
})
185185
.in_current_span()
186186
.await
187187
}
188188

189-
/// Listens for gasoline messages globally on NATS.
189+
/// Listens for gasoline messages globally on pubsub.
190190
#[tracing::instrument(skip_all, fields(message = M::NAME))]
191191
pub async fn subscribe_opt<M>(
192192
&self,
@@ -195,32 +195,32 @@ impl MessageCtx {
195195
where
196196
M: Message,
197197
{
198-
let nats_subject = M::nats_subject();
198+
let subject = M::subject();
199199

200200
// Create subscription and flush immediately.
201-
tracing::debug!(%nats_subject, tags = ?opts.tags, "creating subscription");
201+
tracing::debug!(%subject, tags = ?opts.tags, "creating subscription");
202202
let subscription = self
203-
.nats
204-
.subscribe(&nats_subject)
203+
.pubsub
204+
.subscribe(&subject)
205205
.await
206206
.map_err(|x| WorkflowError::CreateSubscription(x.into()))?;
207-
if opts.flush_nats {
208-
self.nats
207+
if opts.flush {
208+
self.pubsub
209209
.flush()
210210
.await
211-
.map_err(|x| WorkflowError::FlushNats(x.into()))?;
211+
.map_err(|x| WorkflowError::FlushPubsub(x.into()))?;
212212
}
213213

214214
// Return handle
215-
let subscription = SubscriptionHandle::new(nats_subject, subscription, opts.tags.clone());
215+
let subscription = SubscriptionHandle::new(subject, subscription, opts.tags.clone());
216216
Ok(subscription)
217217
}
218218
}
219219

220220
#[derive(Debug)]
221221
pub struct SubscribeOpts {
222222
pub tags: serde_json::Value,
223-
pub flush_nats: bool,
223+
pub flush: bool,
224224
}
225225

226226
/// Used to receive messages from other contexts.
@@ -291,15 +291,15 @@ where
291291
///
292292
/// This future can be safely dropped.
293293
#[tracing::instrument(name="message_next", skip_all, fields(message = M::NAME))]
294-
pub async fn next(&mut self) -> WorkflowResult<NatsMessage<M>> {
294+
pub async fn next(&mut self) -> WorkflowResult<PubsubMessage<M>> {
295295
tracing::debug!("waiting for message");
296296

297297
loop {
298298
// Poll the subscription.
299299
//
300300
// Use blocking threads instead of `try_next`, since I'm not sure
301301
// try_next works as intended.
302-
let nats_message = match self.subscription.next().await {
302+
let message = match self.subscription.next().await {
303303
Ok(NextOutput::Message(msg)) => msg,
304304
Ok(NextOutput::Unsubscribed) => {
305305
tracing::debug!("unsubscribed");
@@ -311,11 +311,11 @@ where
311311
}
312312
};
313313

314-
let message_wrapper = NatsMessage::<M>::deserialize_wrapper(&nats_message.payload)?;
314+
let message_wrapper = PubsubMessage::<M>::deserialize_wrapper(&message.payload)?;
315315

316316
// Check if the subscription tags match a subset of the message tags
317317
if utils::is_value_subset(&self.tags, &message_wrapper.tags) {
318-
let message = NatsMessage::<M>::deserialize_from_wrapper(message_wrapper)?;
318+
let message = PubsubMessage::<M>::deserialize_from_wrapper(message_wrapper)?;
319319
tracing::debug!(?message, "received message");
320320

321321
return Ok(message);
@@ -326,7 +326,7 @@ where
326326
}
327327

328328
/// Converts the subscription in to a stream.
329-
pub fn into_stream(self) -> impl futures_util::Stream<Item = WorkflowResult<NatsMessage<M>>> {
329+
pub fn into_stream(self) -> impl futures_util::Stream<Item = WorkflowResult<PubsubMessage<M>>> {
330330
futures_util::stream::try_unfold(self, |mut sub| {
331331
async move {
332332
let message = sub.next().await?;

packages/common/gasoline/core/src/db/kv/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Implementation of a workflow database driver with UniversalDB and NATS.
1+
//! Implementation of a workflow database driver with UniversalDB and UniversalPubSub.
22
// TODO: Move code to smaller functions for readability
33

44
use std::{
@@ -43,7 +43,7 @@ mod keys;
4343
const WORKER_INSTANCE_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30);
4444
/// How long before overwriting an existing metrics lock.
4545
const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30);
46-
/// For NATS wake mechanism.
46+
/// For pubsub wake mechanism.
4747
const WORKER_WAKE_SUBJECT: &str = "gasoline.worker.wake";
4848

4949
pub struct DatabaseKv {
@@ -52,17 +52,17 @@ pub struct DatabaseKv {
5252
}
5353

5454
impl DatabaseKv {
55-
/// Spawns a new thread and publishes a worker wake message to nats.
55+
/// Spawns a new thread and publishes a worker wake message to pubsub.
5656
fn wake_worker(&self) {
57-
let Ok(nats) = self.pools.ups() else {
58-
tracing::debug!("failed to acquire nats pool");
57+
let Ok(pubsub) = self.pools.ups() else {
58+
tracing::debug!("failed to acquire pubsub pool");
5959
return;
6060
};
6161

6262
let spawn_res = tokio::task::Builder::new().name("wake").spawn(
6363
async move {
6464
// Fail gracefully
65-
if let Err(err) = nats.publish(WORKER_WAKE_SUBJECT, &Vec::new()).await {
65+
if let Err(err) = pubsub.publish(WORKER_WAKE_SUBJECT, &Vec::new()).await {
6666
tracing::warn!(?err, "failed to publish wake message");
6767
}
6868
}

0 commit comments

Comments
 (0)