Kafka Module
The @opra/nestjs-kafka package integrates the OPRA Kafka adapter into a NestJS application. It discovers your OPRA message queue controllers automatically from the NestJS provider tree, builds the ApiDocument, wires up the Kafka consumer lifecycle through NestJS hooks, and resolves class-based interceptors through the NestJS DI container — all transparently.
Installation
npm install @opra/nestjs-kafka @opra/kafka @opra/common
Setup
Import OpraKafkaModule in your root application module:
import { Module } from '@nestjs/common';
import { OpraKafkaModule } from '@opra/nestjs-kafka';
import { OrdersController } from './orders/orders.controller.js';
import { OrdersService } from './orders/orders.service.js';
import * as models from './models/models.js';

@Module({
  imports: [
    OpraKafkaModule.forRoot({
      providers: [OrdersController, OrdersService],
      name: 'OrderApi',
      info: { title: 'Order API', version: '1.0' },
      types: [...Object.values(models)],
      client: {
        bootstrapBrokers: ['localhost:9092'],
      },
    }),
  ],
})
export class AppModule {}
Controllers decorated with @MQController are discovered automatically from the NestJS provider tree — no explicit controllers list is needed in the module options.
Options
| Option | Type | Description |
|---|---|---|
| name | string | API name. |
| info | object | Document metadata: title, version, description. |
| types | any[] | Data types to register (decorated classes, EnumType results). |
| references | Record<string, ReferenceThunk> | Namespaced references to other ApiDocument instances or async thunks. |
| client | ClientOptions | Kafka client connection options. Must include bootstrapBrokers. |
| consumers | Record<string, ConsumerOptions> | Named consumer group configurations. |
| defaults | { consumer?, subscribe? } | Default consumer and subscription options applied to all operations unless overridden. |
| scope | string | Validation scope applied during message decoding. |
| interceptors | (InterceptorFunction \| IKafkaInterceptor \| Type<IKafkaInterceptor>)[] | Interceptor chain executed on every message. Class types are resolved through the NestJS DI container. |
| logger | Logger | Custom NestJS logger instance. Defaults to a logger named after the API. |
| logExtra | boolean | Log additional diagnostic output from the Kafka client. |
| imports | any[] | NestJS modules to import into the OPRA module context. |
| providers | Provider[] | NestJS providers available for injection inside controllers. |
| exports | any[] | Providers to export from the OPRA module to the rest of the application. |
| global | boolean | Register the module as a NestJS global module. |
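The "unless overridden" rule for defaults can be pictured as a simple precedence merge. The sketch below is illustrative only: the ConsumerOptionsSketch shape, its groupId and sessionTimeout fields, and the resolveConsumerOptions function are hypothetical and are not taken from @opra/kafka.

```typescript
// Hypothetical option shape; the real ConsumerOptions type comes from @opra/kafka.
interface ConsumerOptionsSketch {
  groupId?: string;
  sessionTimeout?: number;
}

// Per-operation values win; anything left unset falls back to the defaults.
function resolveConsumerOptions(
  defaults: ConsumerOptionsSketch,
  overrides: ConsumerOptionsSketch = {},
): ConsumerOptionsSketch {
  return {
    groupId: overrides.groupId ?? defaults.groupId,
    sessionTimeout: overrides.sessionTimeout ?? defaults.sessionTimeout,
  };
}

// An operation that only overrides the timeout keeps the default group id.
const resolved = resolveConsumerOptions(
  { groupId: 'orders', sessionTimeout: 30000 },
  { sessionTimeout: 10000 },
);
```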
Async configuration
Use forRootAsync() when client, info, types, or other options depend on injected services:
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { OpraKafkaModule } from '@opra/nestjs-kafka';
import { OrdersController } from './orders/orders.controller.js';
import { OrdersService } from './orders/orders.service.js';
import * as models from './models/models.js';

@Module({
  imports: [
    ConfigModule.forRoot(),
    OpraKafkaModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      providers: [OrdersController, OrdersService],
      // The factory receives the services listed in `inject`
      useFactory: (config: ConfigService) => ({
        name: 'OrderApi',
        info: {
          title: 'Order API',
          version: config.get('API_VERSION'),
        },
        types: [...Object.values(models)],
        client: {
          bootstrapBrokers: config.get<string[]>('KAFKA_BROKERS'),
        },
      }),
    }),
  ],
})
export class AppModule {}
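One detail to watch in a factory like the one above: when a value comes from an environment variable or a .env file it is always a string, and the type argument in config.get<string[]>() is only a compile-time annotation, not a conversion. A minimal sketch of one way to handle this, assuming KAFKA_BROKERS holds a comma-separated list (the parseBrokers helper is hypothetical and not part of OPRA or NestJS):

```typescript
// Hypothetical helper: splits a comma-separated broker list into an array.
function parseBrokers(raw: string | undefined): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map(entry => entry.trim())
    .filter(entry => entry.length > 0);
  // Fail early instead of handing an empty broker list to the Kafka client.
  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKERS must list at least one host:port entry');
  }
  return brokers;
}
```

Inside useFactory this could replace the direct lookup, for example bootstrapBrokers: parseBrokers(config.get<string>('KAFKA_BROKERS')).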
How it works
OpraKafkaModule scans all NestJS providers in its module context for classes decorated with @MQController and builds the ApiDocument from them automatically. No explicit controller list is required.
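To make the discovery step concrete, here is a simplified, self-contained sketch of decorator-based scanning. It is not the actual implementation of OpraKafkaModule: the MarkAsController decorator, the CONTROLLER_MARK symbol, and the discoverControllers function are hypothetical stand-ins for the @MQController metadata and the module's provider scan.

```typescript
// Hypothetical marker standing in for the metadata written by @MQController.
const CONTROLLER_MARK = Symbol('mq-controller');

type Constructor = new (...args: any[]) => object;

// Stand-in decorator: tags the class so a scanner can recognise it later.
function MarkAsController() {
  return (target: Constructor): void => {
    Object.defineProperty(target, CONTROLLER_MARK, { value: true });
  };
}

// Scans a provider list and keeps only the tagged classes.
function discoverControllers(providers: Constructor[]): Constructor[] {
  return providers.filter(provider => (provider as any)[CONTROLLER_MARK] === true);
}

class OrdersService {}
class OrdersController {}
// Applied as a plain function call so the sketch runs without decorator compiler flags.
MarkAsController()(OrdersController);

// Only the tagged class survives the scan; OrdersService is ignored.
const controllers = discoverControllers([OrdersController, OrdersService]);
```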
Once the document is ready, the module creates a KafkaAdapter and manages its lifecycle through NestJS hooks:
- onApplicationBootstrap: calls adapter.start(), which connects to brokers, fetches available topics, and begins consuming messages.
- onApplicationShutdown: calls adapter.close(true) to force-close all consumers gracefully.
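The lifecycle wiring described above can be pictured with the following sketch. It assumes only what this section states (start() on bootstrap, close(true) on shutdown); the AdapterLike interface, the LifecycleBridge class, and the RecordingAdapter stub are hypothetical and stand in for the real KafkaAdapter and the module internals.

```typescript
// Minimal shape of the two adapter methods this section mentions.
interface AdapterLike {
  start(): Promise<void>;
  close(force?: boolean): Promise<void>;
}

// Hypothetical bridge exposing the two NestJS lifecycle hook method names.
class LifecycleBridge {
  private readonly adapter: AdapterLike;

  constructor(adapter: AdapterLike) {
    this.adapter = adapter;
  }

  // Called once the application has finished bootstrapping.
  async onApplicationBootstrap(): Promise<void> {
    await this.adapter.start();
  }

  // Called while the application shuts down.
  async onApplicationShutdown(): Promise<void> {
    await this.adapter.close(true);
  }
}

// Stub adapter that records calls instead of talking to Kafka.
class RecordingAdapter implements AdapterLike {
  readonly calls: string[] = [];

  async start(): Promise<void> {
    this.calls.push('start');
  }

  async close(force?: boolean): Promise<void> {
    this.calls.push(`close(${force})`);
  }
}
```

Keep in mind that NestJS runs shutdown hooks on process signals such as SIGTERM only after app.enableShutdownHooks() has been called; calling app.close() triggers them directly.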
Dependency injection in controllers
OPRA MQ controllers are registered as NestJS providers, so you can inject services directly into them:
import { MQController, MQOperation } from '@opra/common';
import { KafkaContext } from '@opra/kafka';
import { Injectable } from '@nestjs/common';
import { OrdersService } from './orders.service.js';
// Assumed location of the OrderPayload data type; adjust to your project layout
import { OrderPayload } from '../models/models.js';

@Injectable()
@MQController()
export class OrdersController {
  // OrdersService is supplied by the NestJS DI container
  constructor(private readonly service: OrdersService) {}

  @MQOperation({ channel: 'orders.created', type: OrderPayload })
  async onOrderCreated(ctx: KafkaContext) {
    await this.service.process(ctx.payload);
  }
}
Register the controller as a NestJS provider in the module options:
OpraKafkaModule.forRoot({
  providers: [OrdersController, OrdersService],
  // ...
})
Class-based interceptors
Class-based interceptors passed to the interceptors option are resolved through the NestJS DI container, so they can have injected dependencies:
import { IKafkaInterceptor, KafkaContext } from '@opra/kafka';
import { Injectable } from '@nestjs/common';
import { MetricsService } from './metrics.service.js';

@Injectable()
class MetricsInterceptor implements IKafkaInterceptor {
  constructor(private readonly metrics: MetricsService) {}

  async intercept(ctx: KafkaContext, next: () => Promise<any>) {
    const start = Date.now();
    // Hand control to the rest of the chain and keep whatever it returns
    const result = await next();
    // Record how long the rest of the chain took for this topic
    this.metrics.record(ctx.topic, Date.now() - start);
    return result;
  }
}
OpraKafkaModule.forRoot({
  providers: [OrdersController, OrdersService, MetricsService, MetricsInterceptor],
  interceptors: [MetricsInterceptor], // resolved via DI
  // ...
})
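How next() hands control down an interceptor chain can be illustrated with a small, self-contained sketch. This is a generic composition pattern, not OPRA's actual dispatcher; the Interceptor type, the runChain function, and the { topic } context shape are hypothetical.

```typescript
type Next = () => Promise<unknown>;
type Interceptor<C> = (ctx: C, next: Next) => Promise<unknown>;

// Runs interceptors in order; the final next() call invokes the handler.
function runChain<C>(
  interceptors: Interceptor<C>[],
  ctx: C,
  handler: (ctx: C) => Promise<unknown>,
): Promise<unknown> {
  const dispatch = (index: number): Promise<unknown> => {
    if (index === interceptors.length) return handler(ctx);
    return interceptors[index](ctx, () => dispatch(index + 1));
  };
  return dispatch(0);
}

// Example interceptor: logs around the rest of the chain, even if it throws.
const trace: string[] = [];
const timing: Interceptor<{ topic: string }> = async (ctx, next) => {
  trace.push(`before ${ctx.topic}`);
  try {
    return await next();
  } finally {
    trace.push(`after ${ctx.topic}`);
  }
};
```

Wrapping next() in try/finally is a design choice: the "after" step still runs when the handler rejects, which is usually what a metrics interceptor wants.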
Injecting the adapter
The KafkaAdapter instance is exported from the module and can be injected elsewhere in your application:
import { Injectable } from '@nestjs/common';
import { KafkaAdapter } from '@opra/kafka';

@Injectable()
export class HealthService {
  constructor(private readonly adapter: KafkaAdapter) {}

  isReady() {
    return this.adapter.status === 'started';
  }
}