Skip to content

Commit ff7add5

Browse files
Moved reddit lib into jobs folder + added jobs & web3career libs.
1 parent a6d62da commit ff7add5

File tree

3 files changed

+241
-39
lines changed

3 files changed

+241
-39
lines changed

src/lib/jobs/jobs.ts

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { PrismaClient } from '@prisma/client'
2+
const prisma = new PrismaClient()
3+
4+
/**
5+
* Input structure for creating or updating a job record.
6+
*/
7+
export type JobInput = {
8+
title: string
9+
company: string
10+
author?: string
11+
location?: string
12+
url: string
13+
postedAt?: Date | string
14+
description?: string
15+
isRemote?: boolean
16+
tags?: string[]
17+
metadata?: Record<string, string>
18+
source: {
19+
name: string
20+
externalId?: string
21+
rawUrl?: string
22+
data?: any
23+
}
24+
}
25+
26+
/**
27+
* Creates or updates a job record along with its tags, metadata, and source.
28+
*/
29+
export async function upsertJob(input: JobInput) {
30+
// Upsert the company record if present
31+
let companyRecord = null
32+
if (input.company) {
33+
companyRecord = await prisma.company.upsert({
34+
where: { name: input.company },
35+
create: { name: input.company },
36+
update: {},
37+
})
38+
}
39+
40+
// Upsert the job by its unique URL
41+
const job = await prisma.job.upsert({
42+
where: { url: input.url },
43+
update: {
44+
title: input.title,
45+
author: input.author,
46+
location: input.location,
47+
postedAt: input.postedAt ? new Date(input.postedAt) : undefined,
48+
description: input.description,
49+
isRemote: input.isRemote,
50+
company: companyRecord ? { connect: { id: companyRecord.id } } : undefined,
51+
updatedAt: new Date(),
52+
},
53+
create: {
54+
title: input.title,
55+
author: input.author,
56+
location: input.location,
57+
postedAt: input.postedAt ? new Date(input.postedAt) : undefined,
58+
description: input.description,
59+
isRemote: input.isRemote,
60+
url: input.url,
61+
company: companyRecord ? { connect: { id: companyRecord.id } } : undefined,
62+
},
63+
})
64+
65+
// Upsert tags (many-to-many via JobTag)
66+
if (input.tags && input.tags.length) {
67+
for (const tagName of input.tags) {
68+
const tag = await prisma.tag.upsert({
69+
where: { name: tagName },
70+
update: {},
71+
create: { name: tagName },
72+
})
73+
await prisma.jobTag.upsert({
74+
where: { jobId_tagId: { jobId: job.id, tagId: tag.id } },
75+
update: {},
76+
create: { jobId: job.id, tagId: tag.id },
77+
})
78+
}
79+
}
80+
81+
// Upsert metadata (requires a compound unique constraint in the model)
82+
// schema.prisma (JobMetadata) must have: @@unique([jobId, name])
83+
if (input.metadata) {
84+
for (const [name, value] of Object.entries(input.metadata)) {
85+
await prisma.jobMetadata.upsert({
86+
where: { jobId_name: { jobId: job.id, name } },
87+
update: { value },
88+
create: { jobId: job.id, name, value },
89+
})
90+
}
91+
}
92+
93+
// Upsert job source (also requires a compound unique constraint for source + externalId)
94+
await prisma.jobSource.upsert({
95+
where: {
96+
source_externalId: {
97+
source: input.source.name,
98+
externalId: input.source.externalId || '',
99+
},
100+
},
101+
update: {
102+
rawUrl: input.source.rawUrl,
103+
data: input.source.data,
104+
jobId: job.id,
105+
},
106+
create: {
107+
source: input.source.name,
108+
externalId: input.source.externalId,
109+
rawUrl: input.source.rawUrl,
110+
data: input.source.data,
111+
jobId: job.id,
112+
},
113+
})
114+
115+
return job
116+
}

src/lib/reddit.ts renamed to src/lib/jobs/reddit.ts

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import axios from 'axios'
44
import Snoowrap from 'snoowrap'
55
import { CommentStream } from 'snoostorm'
6-
import { sendNotification } from './notifications'
6+
import { sendNotification } from '../notifications'
77
import prisma from '@/lib/db'
8+
import { logger } from '@/lib/logger'
9+
import { upsertJob } from './jobs'
810

911
// Initialize Reddit API client with environment variables
1012
export const redditClient = new Snoowrap({
@@ -35,7 +37,7 @@ function isPrivateMessage(
3537
*/
3638
export async function storeMessages(items: Array<Snoowrap.PrivateMessage | Snoowrap.Comment>) {
3739
const newMessages = []
38-
console.debug(`Processing ${items.length} message(s)`)
40+
logger.debug(`Processing ${items.length} message(s)`)
3941

4042
const subscriptions = await prisma.subscription.findMany()
4143

@@ -49,7 +51,7 @@ export async function storeMessages(items: Array<Snoowrap.PrivateMessage | Snoow
4951
})
5052

5153
if (existing) {
52-
console.debug(`Skipping duplicate ${type} [${item.name}]`)
54+
//logger.debug(`Skipping duplicate ${type} [${item.name}]`)
5355
continue
5456
}
5557

@@ -83,10 +85,10 @@ export async function storeMessages(items: Array<Snoowrap.PrivateMessage | Snoow
8385
)
8486
await Promise.all(notificationPromises)
8587

86-
console.debug(`Stored new ${type} [${createdMsg.redditId}] from /u/${createdMsg.author}`)
88+
logger.debug(`Stored new ${type} [${createdMsg.redditId}] from /u/${createdMsg.author}`)
8789
newMessages.push(createdMsg)
8890
} catch (error: any) {
89-
console.error(`Error processing message ${item.name}:`, error.message)
91+
logger.error(`Error processing message ${item.name}:`, error.message)
9092
}
9193
}
9294

@@ -99,17 +101,17 @@ export async function storeMessages(items: Array<Snoowrap.PrivateMessage | Snoow
99101
*/
100102
export async function checkRedditMessages() {
101103
try {
102-
console.debug('Fetching Reddit inbox...')
104+
logger.debug('Fetching Reddit inbox...')
103105

104106
const [commentReplies, messages] = await Promise.all([
105107
redditClient.getInbox({ filter: 'comments' }),
106108
redditClient.getInbox({ filter: 'messages' }),
107109
])
108110

109-
console.debug(`Found ${commentReplies.length} comment(s), ${messages.length} message(s)`)
111+
logger.debug(`Found ${commentReplies.length} comment(s), ${messages.length} message(s)`)
110112
return await storeMessages([...commentReplies, ...messages])
111113
} catch (error: any) {
112-
console.error('Reddit API Error:', error.message, error.stack)
114+
logger.error('Reddit API Error:', error.message, error.stack)
113115
throw new Error(`Failed to fetch messages: ${error.message}`)
114116
}
115117
}
@@ -119,9 +121,9 @@ export async function checkRedditMessages() {
119121
* @returns A promise that resolves to an array of unread messages.
120122
*/
121123
export const getUnreadMessages = async () => {
  logger.debug('Fetching unread messages...')
  // Pull at most one page (25) of unread inbox items from the Reddit API.
  const messages = await redditClient.getUnreadMessages({ limit: 25 })
  // NOTE(review): the "Found N unread" debug line was commented out in this
  // change — presumably to cut log noise; confirm before deleting for good.
  //logger.debug(`Found ${messages.length} unread message(s)`)
  return messages
}
127129

@@ -131,10 +133,10 @@ export const getUnreadMessages = async () => {
131133
*/
132134
/**
 * Marks a single Reddit message as read.
 *
 * @param messageId Reddit id of the message to mark as read.
 * @returns The marked message, or undefined when the API call fails
 *          (the failure is logged, not rethrown).
 */
export const markMessageRead = async (messageId: string) => {
  try {
    logger.debug(`Marking message ${messageId} as read...`)
    // BUG FIX: await the API call. Without the await, the returned promise
    // escaped the try/catch, so rejections were never caught by the handler
    // below. Making the function async keeps the return value thenable, so
    // existing callers are unaffected.
    return await redditClient.getMessage(messageId).markAsRead()
  } catch (error: any) {
    logger.error(`Error marking message ${messageId} as read:`, error.message)
  }
}
140142

@@ -145,11 +147,11 @@ export const markMessageRead = (messageId: string) => {
145147
*/
146148
export const fetchRedditPosts = async (subreddits: string[]) => {
147149
const allPosts = []
148-
console.debug(`Fetching posts from ${subreddits.length} subreddit(s)`)
150+
logger.debug(`Fetching posts from ${subreddits.length} subreddit(s)`)
149151

150152
for (const subreddit of subreddits) {
151153
try {
152-
console.debug(`Fetching /r/${subreddit}...`)
154+
logger.debug(`Fetching /r/${subreddit}...`)
153155
const response = await axios.get(`https://www.reddit.com/r/${subreddit}/new.json?limit=10`, {
154156
timeout: 5000,
155157
})
@@ -168,46 +170,61 @@ export const fetchRedditPosts = async (subreddits: string[]) => {
168170
downvotes: child.data.downs,
169171
}))
170172

171-
console.debug(`Found ${subredditPosts.length} post(s) in /r/${subreddit}`)
173+
logger.debug(`Found ${subredditPosts.length} post(s) in /r/${subreddit}`)
172174
allPosts.push(...subredditPosts)
173175
} catch (error: any) {
174-
console.error(`Error fetching /r/${subreddit}:`, error.message)
176+
logger.error(`Error fetching /r/${subreddit}:`, error.message)
175177
}
176178
}
177179

178180
return allPosts
179181
}
180182

181183
/**
182-
* Stores an array of Reddit posts in the database, preventing duplicates
183-
* and sending notifications for new items.
184+
* Stores an array of Reddit posts in the unified jobs table using the upsertJob function.
185+
* This replaces the old storePosts function to use the new unified jobs architecture.
184186
* @param posts Array of post objects from `fetchRedditPosts`.
185-
* @returns A promise that resolves to an array of newly created database records.
187+
* @returns A promise that resolves to an array of newly created/updated job records.
186188
*/
187-
export async function storePosts(posts: Array<any>) {
188-
const newPosts = []
189-
console.debug(`Processing ${posts.length} post(s)`)
189+
export async function storeRedditJobPosts(posts: Array<any>) {
190+
const newJobs = []
191+
logger.debug(`Processing ${posts.length} Reddit post(s) for jobs table`)
190192

191193
const subscriptions = await prisma.subscription.findMany()
192194

193195
for (const post of posts) {
194196
try {
195-
const existing = await prisma.redditPost.findUnique({
196-
where: { url: post.url },
197-
})
198-
199-
if (existing) {
200-
console.debug(`Skipping duplicate post [${post.url}]`)
201-
continue
197+
const jobInput = {
198+
title: post.title,
199+
company: '', // Reddit posts don't have company info
200+
author: post.author,
201+
location: '', // Reddit posts don't have structured location
202+
url: post.url,
203+
postedAt: post.postedAt,
204+
description: post.body || '',
205+
isRemote: null, // Can't determine from Reddit posts
206+
tags: [post.subreddit], // Use subreddit as a tag
207+
metadata: {
208+
subreddit: post.subreddit,
209+
bodyHtml: post.bodyHtml || '',
210+
upvotes: post.upvotes ? String(post.upvotes) : '0',
211+
downvotes: post.downvotes ? String(post.downvotes) : '0',
212+
},
213+
source: {
214+
name: 'reddit',
215+
externalId: post.url, // Use URL as external ID since Reddit doesn't provide a better ID
216+
rawUrl: post.url,
217+
data: post,
218+
},
202219
}
203220

204-
// This will now work correctly because the `post` object has the `postedAt` field.
205-
const createdPost = await prisma.redditPost.create({ data: post })
206-
221+
const upsertedJob = await upsertJob(jobInput)
222+
223+
// Send notifications for new jobs
207224
const notificationPayload = {
208-
title: `${createdPost.title} (${createdPost.subreddit})`,
209-
body: `Posted by /u/${createdPost.author}`,
210-
url: createdPost.url,
225+
title: `${post.title} (${post.subreddit})`,
226+
body: `Posted by /u/${post.author}`,
227+
url: post.url,
211228
icon: 'https://new.codebuilder.org/images/logo2.png',
212229
badge: 'https://new.codebuilder.org/images/logo2.png',
213230
}
@@ -217,12 +234,12 @@ export async function storePosts(posts: Array<any>) {
217234
)
218235
await Promise.all(notificationPromises)
219236

220-
console.debug(`Stored new post [${createdPost.url}] from /u/${createdPost.author}`)
221-
newPosts.push(createdPost)
237+
logger.debug(`Stored new job [${post.url}] from /u/${post.author}`)
238+
newJobs.push(upsertedJob)
222239
} catch (error: any) {
223-
console.error(`Error processing post ${post.url}:`, error)
240+
logger.error(`Error processing Reddit post ${post.url}:`, error)
224241
}
225242
}
226243

227-
return newPosts
244+
return newJobs
228245
}

src/lib/jobs/web3career.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { upsertJob } from './jobs'
2+
import { logger } from '@/lib/logger'
3+
4+
// API URL/config encapsulated here
5+
const WEB3CAREER_API_URL = 'https://web3.career/api/v1?token=Rg9PrsGP96Z2GB6T9tNZ1AzHzriQEwxa'
6+
7+
/**
8+
* Fetches job listings from Web3Career API.
9+
*/
10+
export async function fetchWeb3CareerJobs() {
11+
try {
12+
const response = await fetch(WEB3CAREER_API_URL)
13+
if (!response.ok)
14+
throw new Error(`Web3Career API request failed with status: ${response.status}`)
15+
const data = await response.json()
16+
const jobsArray = Array.isArray(data[2]) ? data[2] : []
17+
logger.info('Web3Career jobs fetched:', jobsArray.length)
18+
return jobsArray
19+
} catch (error: any) {
20+
logger.error('Error fetching Web3Career jobs:', error.message)
21+
throw error
22+
}
23+
}
24+
25+
/**
26+
* Stores a list of Web3Career jobs in the database.
27+
*/
28+
export async function storeWeb3CareerJobs(jobs: any[]) {
29+
const newJobs = []
30+
for (const job of jobs) {
31+
try {
32+
const jobInput = {
33+
title: job.title,
34+
company: job.company,
35+
author: '', // Web3Career does not provide author
36+
location: job.location,
37+
url: job.apply_url,
38+
postedAt: job.date,
39+
description: job.description,
40+
isRemote: !!job.is_remote,
41+
tags: Array.isArray(job.tags) ? job.tags : [],
42+
metadata: {
43+
country: job.country || '',
44+
city: job.city || '',
45+
date_epoch: job.date_epoch ? String(job.date_epoch) : '',
46+
},
47+
source: {
48+
name: 'web3career',
49+
externalId: job.id ? String(job.id) : undefined,
50+
rawUrl: job.apply_url,
51+
data: job,
52+
},
53+
}
54+
const upserted = await upsertJob(jobInput)
55+
newJobs.push(upserted)
56+
} catch (error: any) {
57+
logger.error('Error storing Web3Career job:', error.message)
58+
}
59+
}
60+
return newJobs
61+
}
62+
63+
/**
64+
* Fetches and stores jobs from Web3Career.
65+
*/
66+
export async function fetchAndStoreWeb3CareerJobs() {
67+
const jobs = await fetchWeb3CareerJobs()
68+
await storeWeb3CareerJobs(jobs)
69+
}

0 commit comments

Comments
 (0)