
Commit f295d7c

Merge pull request #9 from topcoder-platform/feat/kafka-integration
Feat/kafka integration
2 parents 4743bfb + 3ca9d86 commit f295d7c

11 files changed: +755, -2 lines

.env.sample

Lines changed: 28 additions & 0 deletions

@@ -1,6 +1,34 @@
 POSTGRES_SCHEMA="public"
 DATABASE_URL="postgresql://johndoe:randompassword@localhost:5432/mydb?schema=${POSTGRES_SCHEMA}"

+# Kafka Configuration
+KAFKA_BROKERS=localhost:9092
+KAFKA_CLIENT_ID=tc-review-api
+KAFKA_GROUP_ID=tc-review-consumer-group
+KAFKA_SSL_ENABLED=false
+
+# SASL Configuration (optional - uncomment if needed)
+# KAFKA_SASL_MECHANISM=plain
+# KAFKA_SASL_USERNAME=
+# KAFKA_SASL_PASSWORD=
+
+# Consumer Configuration
+KAFKA_SESSION_TIMEOUT=30000
+KAFKA_HEARTBEAT_INTERVAL=3000
+KAFKA_MAX_WAIT_TIME=5000
+KAFKA_CONNECTION_TIMEOUT=10000
+KAFKA_REQUEST_TIMEOUT=30000
+
+# Retry Configuration
+KAFKA_RETRY_ATTEMPTS=5
+KAFKA_INITIAL_RETRY_TIME=100
+KAFKA_MAX_RETRY_TIME=30000
+
+# Dead Letter Queue Configuration
+KAFKA_DLQ_ENABLED=true
+KAFKA_DLQ_TOPIC_SUFFIX=.dlq
+KAFKA_DLQ_MAX_RETRIES=3
+
 # API configs
 BUS_API_URL="https://api.topcoder-dev.com/v5/bus/events"
 CHALLENGE_API_URL="https://api.topcoder-dev.com/v5/challenges/"

KAFKA_SETUP.md

Lines changed: 188 additions & 0 deletions (new file)

# Kafka Development Setup

This document describes how to set up and test the Kafka consumer functionality in the TC Review API.

## Quick Start

### 1. Start Kafka Services

```bash
# Start Kafka and related services
docker compose -f docker-compose.kafka.yml up -d

# Verify services are running
docker compose -f docker-compose.kafka.yml ps
```

This will start:

- **Zookeeper** on port 2181
- **Kafka** on port 9092
- **Kafka UI** on port 8080 (web interface)

### 2. Configure Environment

```bash
# Copy the sample environment file
cp .env.sample .env

# Update the .env file with your database and other configurations.
# The Kafka settings are pre-configured for local development.
```

### 3. Start the Application

```bash
# Install dependencies
pnpm install

# Start in development mode
pnpm run start:dev
```

The application will automatically:

- Connect to Kafka on startup
- Subscribe to registered topics
- Start consuming messages
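For orientation, that startup sequence maps onto kafkajs roughly as sketched below. The class name `KafkaConsumerService` and its wiring are illustrative assumptions; the actual consumer service is not shown in this diff.

```typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Consumer, Kafka } from 'kafkajs';

// Hypothetical bootstrap: connect on startup, subscribe to registered
// topics, and start consuming (mirrors the three steps listed above).
@Injectable()
export class KafkaConsumerService implements OnModuleInit {
  private consumer!: Consumer;

  async onModuleInit(): Promise<void> {
    const kafka = new Kafka({
      clientId: process.env.KAFKA_CLIENT_ID ?? 'tc-review-api',
      brokers: (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','),
    });

    this.consumer = kafka.consumer({
      groupId: process.env.KAFKA_GROUP_ID ?? 'tc-review-consumer-group',
      sessionTimeout: Number(process.env.KAFKA_SESSION_TIMEOUT ?? 30000),
      heartbeatInterval: Number(process.env.KAFKA_HEARTBEAT_INTERVAL ?? 3000),
    });

    await this.consumer.connect();
    // In the real module the topic list comes from the handler registry.
    await this.consumer.subscribe({ topics: ['avscan.action.scan'] });
    await this.consumer.run({
      eachMessage: async ({ topic, message }) => {
        // Parse the payload and dispatch it to the handler for `topic`.
        const payload = JSON.parse(message.value?.toString() ?? 'null');
        console.log(`Received message on ${topic}:`, payload);
      },
    });
  }
}
```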
## Testing Kafka Events

### Using Kafka UI (Recommended)

1. Open http://localhost:8080 in your browser
2. Navigate to Topics
3. Create or select the `avscan.action.scan` topic
4. Produce a test message with a JSON payload:

```json
{
  "scanId": "test-123",
  "submissionId": "sub-456",
  "status": "initiated",
  "timestamp": "2025-01-01T12:00:00Z"
}
```

### Using the Command Line

```bash
# Create the topic (optional - topics are auto-created)
docker exec -it kafka kafka-topics --create --topic avscan.action.scan --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

# Produce a test message
docker exec -it kafka kafka-console-producer --topic avscan.action.scan --bootstrap-server localhost:9092
# Then type your JSON message and press Enter

# Consume messages (for debugging)
docker exec -it kafka kafka-console-consumer --topic avscan.action.scan --from-beginning --bootstrap-server localhost:9092
```
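The same test message can also be produced programmatically with kafkajs; this is a standalone sketch against the local broker, not code from this PR:

```typescript
import { Kafka } from 'kafkajs';

// Send the documented test payload to avscan.action.scan.
async function produceTestMessage(): Promise<void> {
  const kafka = new Kafka({ clientId: 'test-producer', brokers: ['localhost:9092'] });
  const producer = kafka.producer();

  await producer.connect();
  await producer.send({
    topic: 'avscan.action.scan',
    messages: [
      {
        value: JSON.stringify({
          scanId: 'test-123',
          submissionId: 'sub-456',
          status: 'initiated',
          timestamp: new Date().toISOString(),
        }),
      },
    ],
  });
  await producer.disconnect();
}

produceTestMessage().catch(console.error);
```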
## Development Workflow

### Adding New Event Handlers

1. Create a new handler class extending `BaseEventHandler`:

```typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
// Adjust these import paths to your module layout:
// import { BaseEventHandler } from '../kafka/base-event.handler';
// import { KafkaHandlerRegistry } from '../kafka/kafka-handler.registry';
// import { LoggerService } from '../global/logger.service';

@Injectable()
export class MyCustomHandler extends BaseEventHandler implements OnModuleInit {
  private readonly topic = 'my.custom.topic';

  constructor(private readonly handlerRegistry: KafkaHandlerRegistry) {
    super(LoggerService.forRoot('MyCustomHandler'));
  }

  onModuleInit() {
    // Register this handler for its topic as soon as the module loads.
    this.handlerRegistry.registerHandler(this.topic, this);
  }

  getTopic(): string {
    return this.topic;
  }

  async handle(message: any): Promise<void> {
    // Your custom logic here
  }
}
```

2. Register the handler in the `KafkaModule` providers array (see the sketch below).
3. The handler is then registered automatically and starts consuming messages.
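Step 2 looks roughly like the following; the KafkaModule in this PR is dynamic (`KafkaModule.forRoot()`), so the real registration point may be assembled differently:

```typescript
import { Module } from '@nestjs/common';
// Hypothetical path; adjust to wherever the handler from step 1 lives.
import { MyCustomHandler } from './handlers/my-custom.handler';

// Illustrative only: the PR's actual provider list is not shown in this diff.
@Module({
  providers: [
    // ...existing Kafka providers (consumer service, handler registry)
    MyCustomHandler,
  ],
})
export class KafkaModule {}
```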
### Dead Letter Queue (DLQ) Support

The application includes a robust Dead Letter Queue implementation for handling message-processing failures:

1. **Configuration**:

```
# DLQ Configuration in .env
KAFKA_DLQ_ENABLED=true
KAFKA_DLQ_TOPIC_SUFFIX=.dlq
KAFKA_DLQ_MAX_RETRIES=3
```

2. **Retry mechanism**:
   - Failed messages are automatically retried up to the configured maximum number of retries
   - The retry count is tracked per message, using a unique key based on topic, partition, and offset
   - Exponential backoff is applied between retries

3. **DLQ processing** (see the sketch after this list):
   - After retries are exhausted, the message is sent to a DLQ topic (the original topic name plus the configured suffix)
   - DLQ messages include:
     - The original message content
     - Error information
     - The original topic, partition, and offset
     - The timestamp of the failure
     - The original message headers

4. **Monitoring the DLQ**:
   - Use Kafka UI to monitor DLQ topics (they follow the pattern `<original-topic>.dlq`)
   - Check the application logs for messages with "Message sent to DLQ" or "Failed to send message to DLQ"
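Put together, the failure path behaves roughly like this sketch. The retry bookkeeping and producer wiring are assumptions based on the behavior described above, not the PR's exact implementation:

```typescript
import { KafkaMessage, Producer } from 'kafkajs';

const MAX_RETRIES = Number(process.env.KAFKA_DLQ_MAX_RETRIES ?? 3);
const DLQ_SUFFIX = process.env.KAFKA_DLQ_TOPIC_SUFFIX ?? '.dlq';

// Retry counts tracked per topic/partition/offset, as described above.
const retryCounts = new Map<string, number>();

// Returns true once the message has been moved to the DLQ; otherwise the
// caller should retry the handler (with exponential backoff in between).
async function handleFailure(
  producer: Producer,
  topic: string,
  partition: number,
  message: KafkaMessage,
  error: Error,
): Promise<boolean> {
  const key = `${topic}:${partition}:${message.offset}`;
  const attempts = (retryCounts.get(key) ?? 0) + 1;
  retryCounts.set(key, attempts);

  if (attempts <= MAX_RETRIES) {
    return false;
  }

  retryCounts.delete(key);
  await producer.send({
    topic: `${topic}${DLQ_SUFFIX}`,
    messages: [
      {
        value: JSON.stringify({
          originalMessage: message.value?.toString(),
          error: error.message,
          originalTopic: topic,
          partition,
          offset: message.offset,
          failedAt: new Date().toISOString(),
        }),
        headers: message.headers, // preserve the original headers
      },
    ],
  });
  return true;
}
```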
### Monitoring and Debugging

- **Application logs**: Check the console output for Kafka connection status and message processing
- **Kafka UI**: Monitor topics, partitions, and consumer groups at http://localhost:8080
- **Health checks**: Kafka connection status is included in the application health checks
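If the API exposes a health endpoint, the Kafka status might surface along these lines. This is speculative: the health-check wiring is not part of this diff, and `isConnected()` is a hypothetical method on the (equally hypothetical) consumer service sketched earlier:

```typescript
import { Controller, Get } from '@nestjs/common';
// Hypothetical import; see the consumer sketch earlier in this document.
import { KafkaConsumerService } from './kafka-consumer.service';

@Controller('health')
export class HealthController {
  constructor(private readonly kafkaConsumer: KafkaConsumerService) {}

  @Get()
  check() {
    return {
      status: 'ok',
      kafka: this.kafkaConsumer.isConnected() ? 'connected' : 'disconnected',
    };
  }
}
```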
### Environment Variables

All Kafka-related environment variables are documented in `.env.sample`:

- `KAFKA_BROKERS`: Comma-separated list of Kafka brokers
- `KAFKA_CLIENT_ID`: Unique client identifier
- `KAFKA_GROUP_ID`: Consumer group ID
- `KAFKA_SSL_ENABLED`: Enable SSL encryption
- Connection timeout and retry settings (see `.env.sample` for the full list)
- **DLQ configuration**:
  - `KAFKA_DLQ_ENABLED`: Enable or disable the Dead Letter Queue feature
  - `KAFKA_DLQ_TOPIC_SUFFIX`: Suffix appended to the original topic name to form the DLQ topic
  - `KAFKA_DLQ_MAX_RETRIES`: Maximum number of retries before a message is sent to the DLQ
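Mapped onto a kafkajs client, these variables line up roughly as follows (a sketch; the PR's actual configuration service is not visible in this diff):

```typescript
import { Kafka, KafkaConfig } from 'kafkajs';

// Illustrative mapping from the .env.sample variables to a kafkajs config.
export function buildKafkaClient(): Kafka {
  const config: KafkaConfig = {
    clientId: process.env.KAFKA_CLIENT_ID ?? 'tc-review-api',
    brokers: (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','),
    ssl: process.env.KAFKA_SSL_ENABLED === 'true',
    connectionTimeout: Number(process.env.KAFKA_CONNECTION_TIMEOUT ?? 10000),
    requestTimeout: Number(process.env.KAFKA_REQUEST_TIMEOUT ?? 30000),
    retry: {
      retries: Number(process.env.KAFKA_RETRY_ATTEMPTS ?? 5),
      initialRetryTime: Number(process.env.KAFKA_INITIAL_RETRY_TIME ?? 100),
      maxRetryTime: Number(process.env.KAFKA_MAX_RETRY_TIME ?? 30000),
    },
  };

  // SASL is optional - enable it only when credentials are provided.
  if (process.env.KAFKA_SASL_USERNAME && process.env.KAFKA_SASL_PASSWORD) {
    config.sasl = {
      mechanism: 'plain',
      username: process.env.KAFKA_SASL_USERNAME,
      password: process.env.KAFKA_SASL_PASSWORD,
    };
  }

  // KAFKA_SESSION_TIMEOUT, KAFKA_HEARTBEAT_INTERVAL, and KAFKA_MAX_WAIT_TIME
  // apply to kafka.consumer(...) / consumer.run(...), not the client itself.
  return new Kafka(config);
}
```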
## Troubleshooting

### Common Issues

1. **Connection refused**: Ensure Kafka is running: `docker compose -f docker-compose.kafka.yml ps`
2. **Topic not found**: Topics are auto-created by default; otherwise, create the topic manually in Kafka UI
3. **Consumer group issues**: Check the consumer group status in Kafka UI under "Consumers"
4. **Missing DLQ topics**: DLQ topics are created automatically when the first message is sent to them

### Cleanup

```bash
# Stop and remove the Kafka services
docker compose -f docker-compose.kafka.yml down

# Also remove volumes (clears all Kafka data)
docker compose -f docker-compose.kafka.yml down -v
```
## Production Considerations

- Configure SSL/TLS and SASL authentication for production environments
- Set appropriate retention policies for topics (see the example below)
- Monitor consumer lag and processing metrics
- Ensure DLQ topics have retention policies longer than their source topics
- Set up alerts for:
  - Messages arriving in DLQ topics
  - High retry rates
  - Consumer failures
- Implement a process for reviewing and, where appropriate, reprocessing DLQ messages
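For example, retention can be set per topic with the standard Kafka CLI; the values below are placeholders (7 days for the source topic, 30 days for its DLQ) to adapt per environment:

```bash
# Source topic: 7 days (604800000 ms)
docker exec -it kafka kafka-configs --alter --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name avscan.action.scan \
  --add-config retention.ms=604800000

# DLQ topic: 30 days (2592000000 ms), longer than the source topic
docker exec -it kafka kafka-configs --alter --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name avscan.action.scan.dlq \
  --add-config retention.ms=2592000000
```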

docker-compose.kafka.yml

Lines changed: 54 additions & 0 deletions (new file)

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-network

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    networks:
      - kafka-network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

package.json

Lines changed: 2 additions & 1 deletion

@@ -39,7 +39,8 @@
     "nanoid": "~5.1.2",
     "reflect-metadata": "^0.2.2",
     "rxjs": "^7.8.1",
-    "tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1"
+    "tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1",
+    "kafkajs": "^2.2.4"
   },
   "devDependencies": {
     "@eslint/eslintrc": "^3.2.0",

pnpm-lock.yaml

Lines changed: 9 additions & 0 deletions
Generated file; diff not rendered.

src/shared/modules/global/globalProviders.module.ts

Lines changed: 2 additions & 1 deletion

@@ -10,12 +10,13 @@ import { M2MService } from './m2m.service';
 import { ChallengeApiService } from './challenge.service';
 import { EventBusService } from './eventBus.service';
 import { MemberService } from './member.service';
+import { KafkaModule } from '../kafka/kafka.module';

 // Global module for providing global providers
 // Add any provider you want to be global here
 @Global()
 @Module({
-  imports: [HttpModule],
+  imports: [HttpModule, KafkaModule.forRoot()],
   providers: [
     {
       provide: APP_GUARD,

Lines changed: 24 additions & 0 deletions (new file)

import { LoggerService } from '../global/logger.service';

// Common base class for Kafka event handlers: subclasses implement
// handle() and getTopic(), and inherit structured logging plus a
// basic null/undefined payload check.
export abstract class BaseEventHandler {
  protected logger: LoggerService;

  constructor(logger: LoggerService) {
    this.logger = logger;
  }

  abstract handle(message: any): Promise<void>;
  abstract getTopic(): string;

  protected logMessage(message: any): void {
    this.logger.log({
      message: 'Processing Kafka message',
      topic: this.getTopic(),
      payload: message,
    });
  }

  protected validateMessage(message: any): boolean {
    return message !== null && message !== undefined;
  }
}
