/* istanbul ignore file */ import { Injectable, Logger } from '@nestjs/common'; import { KafkaJS } from '@confluentinc/kafka-javascript'; import { KafkaSettings } from '@shared/constants/config'; @Injectable() export class KafkaConsumerService { private kafka: KafkaJS.Kafka; private consumer: KafkaJS.Consumer; private readonly logger = new Logger(KafkaConsumerService.name); constructor() { this.kafka = new KafkaJS.Kafka({ kafkaJS: { clientId: KafkaSettings.CLIENT_ID, brokers: KafkaSettings.KAFKA_BROKER, }, }); this.consumer = this.kafka.consumer({ kafkaJS: { groupId: KafkaSettings.ConsumerConfiguration.groupId, autoCommit: false, }, }); } async subscribe(topic: string) { await this.consumer.connect(); this.logger.log('Kafka Consumer connected successfully.'); await this.consumer.subscribe({ topic }); this.logger.log(`Kafka Consumer subscribed to topic: ${topic}`); } async run(eachMessageHandler: (payload: KafkaJS.EachMessagePayload) => Promise) { await this.consumer.run({ eachMessage: async (payload) => { const { topic, partition } = payload; try { ///HERE IS THE PROCESSING LOGIC await eachMessageHandler(payload); } catch (error) { this.logger.error(`Error processing message from ${topic}[${partition}]`, error.stack); } finally { this.logger.debug(`Resumed partition ${partition} on topic ${topic}`); } }, }); } async commitOffset(topic: string, partition: number, offset?: string) { if (!offset) { this.logger.warn('Offset not found. Skipping manual commit.'); return; } const nextOffset = (BigInt(offset) + BigInt(1)).toString(); try { await this.consumer.commitOffsets([ { topic, partition, offset: nextOffset }, ]); this.logger.debug( `Manually committed offset ${nextOffset} for topic ${topic} partition ${partition}`, ); } catch (error) { this.logger.error('Failed to manually commit offset.', error.stack); } } async disconnect() { await this.consumer.disconnect(); this.logger.log('Kafka Consumer disconnected successfully.'); } }