Kafka

Overview

The @opra/kafka package provides KafkaAdapter, a platform adapter that connects an OPRA ApiDocument to Kafka consumers. The adapter subscribes to the topics named in your operations' channel declarations, decodes incoming messages against your schema, dispatches them to your operation handlers, and manages consumer lifecycles. All of this is driven by the declarations in your ApiDocument.


Installation

npm install @opra/kafka

Setup

Create an ApiDocument whose api section sets transport: 'mq' and platform: 'kafka', pass it to KafkaAdapter, then call start().

import { KafkaAdapter } from '@opra/kafka';
import { ApiDocumentFactory } from '@opra/common';
import { OrdersController } from './api/orders.controller.js';

const document = await ApiDocumentFactory.createDocument({
  info: { title: 'Order API', version: '1.0' },
  api: {
    transport: 'mq',
    platform: 'kafka',
    controllers: [OrdersController],
  },
});

const adapter = new KafkaAdapter(document, {
  client: {
    bootstrapBrokers: ['localhost:9092'],
  },
});

await adapter.start();

process.on('SIGTERM', async () => {
  await adapter.close();
  process.exit(0);
});

KafkaAdapter connects to the brokers, lists available topics, subscribes to matching channels, and starts dispatching messages — no manual consumer wiring needed.


Adapter Options (KafkaAdapter.Config)

Option        Type                                         Description
client        ClientOptions                                Kafka client connection options. Must include bootstrapBrokers.
consumers     Record<string, ConsumerOptions>              Named consumer group configurations. Operations can reference a group by name.
defaults      { consumer?, subscribe? }                    Default consumer and subscription options applied to all operations unless overridden.
scope         string                                       Validation scope applied during message decoding.
interceptors  (InterceptorFunction | IKafkaInterceptor)[]  Interceptor chain executed on every message.
logExtra      boolean                                      Log additional diagnostic output from the Kafka client.

Defining Operations

Use @MQOperation() on a controller method to declare an operation. The channel property maps to one or more Kafka topics. Import @opra/kafka to unlock the .Kafka() chained decorator for per-operation consumer configuration.

import { MQController, MQOperation, ApiField, ComplexType } from '@opra/common';
import '@opra/kafka'; // augments MQOperationDecorator with .Kafka()
import { KafkaContext } from '@opra/kafka'; // type of the handler argument

@ComplexType()
class OrderPayload {
  @ApiField() declare orderId: string;
  @ApiField() declare amount: number;
}

@MQController()
export class OrdersController {
  @MQOperation({ channel: 'orders.created', type: OrderPayload })
  async onOrderCreated(ctx: KafkaContext) {
    const { payload, key, headers, topic, partition } = ctx;
    console.log('Received order:', payload.orderId);
  }
}

Channel patterns

The channel field accepts a string, a RegExp, or an array of either. Regex patterns are matched against the list of available topics fetched from the Kafka admin at startup.

@MQOperation({ channel: /^orders\..+/, type: OrderPayload })
async onAnyOrder(ctx: KafkaContext) { ... }

@MQOperation({ channel: ['orders.created', 'orders.updated'], type: OrderPayload })
async onOrderChange(ctx: KafkaContext) { ... }

Per-operation Kafka config (.Kafka())

Chain .Kafka() on @MQOperation() to override the consumer group or subscription options for a specific operation.

@(MQOperation({ channel: 'payments.processed', type: PaymentPayload })
  .Kafka({
    consumer: { groupId: 'payments-group', bootstrapBrokers: ['localhost:9092'] },
    subscribe: { mode: 'latest' },
  }))
async onPayment(ctx: KafkaContext) { ... }

You can also pass a resolver function for dynamic configuration:

@(MQOperation({ channel: 'orders.created', type: OrderPayload })
  .Kafka(async () => {
    const cfg = await loadConfig();
    return { consumer: { groupId: cfg.groupId, bootstrapBrokers: cfg.brokers } };
  }))
async onOrderCreated(ctx: KafkaContext) { ... }
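
To make the override idea concrete, here is one plausible reading of how adapter-level defaults and a per-operation config (plain object or resolver function) could combine. Everything in it is an assumption for illustration: OperationKafkaConfig, ConfigOrResolver and resolveOperationConfig are hypothetical names, and the shallow per-section merge is a guess, not documented behavior of @opra/kafka.

```typescript
// Hypothetical shapes; only field names that appear on this page are used.
interface OperationKafkaConfig {
  consumer?: Record<string, unknown>;
  subscribe?: Record<string, unknown>;
}

type ConfigOrResolver =
  | OperationKafkaConfig
  | (() => OperationKafkaConfig | Promise<OperationKafkaConfig>);

// Assumption: per-operation values win over defaults, merged shallowly per section.
async function resolveOperationConfig(
  defaults: OperationKafkaConfig,
  perOperation?: ConfigOrResolver,
): Promise<OperationKafkaConfig> {
  const own =
    typeof perOperation === 'function' ? await perOperation() : perOperation ?? {};
  return {
    consumer: { ...defaults.consumer, ...own.consumer },
    subscribe: { ...defaults.subscribe, ...own.subscribe },
  };
}

const adapterDefaults: OperationKafkaConfig = {
  consumer: { groupId: 'default-group' },
  subscribe: { mode: 'latest' },
};
const effective = resolveOperationConfig(adapterDefaults, async () => ({
  consumer: { groupId: 'payments-group' },
}));
// effective resolves to:
// { consumer: { groupId: 'payments-group' }, subscribe: { mode: 'latest' } }
```

Accepting either a value or a resolver keeps static configuration terse while still allowing settings that are only known at startup.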

KafkaContext

Every operation handler receives a KafkaContext as its first argument.

import { KafkaContext } from '@opra/kafka';

async onOrderCreated(ctx: KafkaContext) {
  ctx.topic       // Kafka topic name
  ctx.partition   // partition number
  ctx.key         // decoded message key
  ctx.payload     // decoded message payload
  ctx.headers     // decoded message headers as a plain object
  ctx.rawMessage  // raw Kafka message object
}

Property    Type                  Description
topic       string                The topic the message arrived on.
partition   number                The partition number.
key         any                   Decoded message key (validated against the operation's keyType).
payload     any                   Decoded message payload (validated against the operation's type).
headers     Record<string, any>   Decoded message headers.
rawMessage  KafkaAdapter.Message  The raw message object from the Kafka client.

Lifecycle

Method                 Description
adapter.initialize()   Connects to brokers and creates all consumers. Called automatically by start().
adapter.start()        Initializes consumers and begins subscribing to topics.
adapter.close(force?)  Closes all consumers and resets the adapter to idle. Pass true to force-close.
adapter.status         Current status: 'idle' | 'starting' | 'started' | 'closing'.
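
One way the force flag on close() might be used is a shutdown routine that tries a graceful close first and falls back to a forced close after a timeout. The sketch below is self-contained and works against a minimal stand-in interface rather than the real adapter. ClosableAdapter and shutdown are hypothetical names, and it assumes close() returns a promise and that calling close(true) while a graceful close is still pending is safe.

```typescript
// Minimal stand-in for the part of the adapter this sketch touches.
interface ClosableAdapter {
  close(force?: boolean): Promise<void>;
}

// Hypothetical helper: graceful close first, forced close once timeoutMs elapses.
function shutdown(
  adapter: ClosableAdapter,
  timeoutMs: number,
): Promise<'graceful' | 'forced'> {
  return new Promise((resolve, reject) => {
    let forcing = false;
    const timer = setTimeout(() => {
      // The graceful close took too long; escalate.
      forcing = true;
      adapter.close(true).then(() => resolve('forced'), reject);
    }, timeoutMs);

    adapter.close().then(
      () => {
        clearTimeout(timer);
        if (!forcing) resolve('graceful');
      },
      (err) => {
        clearTimeout(timer);
        // Once escalated, the forced close decides the outcome instead.
        if (!forcing) reject(err);
      },
    );
  });
}
```

In a SIGTERM handler this could replace the bare adapter.close() call, for example awaiting shutdown(adapter, 10_000) before process.exit(0). The ten-second budget is an arbitrary placeholder.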

Interceptors

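Interceptors wrap the operation handler on every incoming message: each one receives the context and a next() function, and calling next() continues to the next interceptor or, after the last one, to the handler. Configure them once on the adapter.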

import { KafkaAdapter, KafkaContext } from '@opra/kafka';

const adapter = new KafkaAdapter(document, {
  client: { bootstrapBrokers: ['localhost:9092'] },
  interceptors: [
    // Function form
    async (ctx: KafkaContext, next) => {
      console.log('Received on topic:', ctx.topic);
      await next();
    },
  ],
});

Use the class form for interceptors that need shared state or dependency injection:

import { IKafkaInterceptor, KafkaAdapter, KafkaContext } from '@opra/kafka';

class LoggingInterceptor implements IKafkaInterceptor {
  async intercept(ctx: KafkaContext, next: () => Promise<any>) {
    console.log(`[${ctx.topic}] partition=${ctx.partition}`);
    await next();
  }
}

const adapter = new KafkaAdapter(document, {
  client: { bootstrapBrokers: ['localhost:9092'] },
  interceptors: [new LoggingInterceptor()],
});
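
For readers unfamiliar with the next() pattern, the following standalone sketch shows how such a chain is commonly composed. It is an illustration only and not taken from the adapter's source: compose, Next and InterceptorFn are hypothetical names, and whether the real adapter propagates return values the same way is an assumption.

```typescript
// Stand-in types for illustration; the real ones come from @opra/kafka.
type Next = () => Promise<unknown>;
type InterceptorFn<C> = (ctx: C, next: Next) => Promise<unknown>;

// Hypothetical composition helper: interceptors run in array order,
// with the operation handler as the innermost call.
function compose<C>(
  interceptors: InterceptorFn<C>[],
  handler: (ctx: C) => Promise<unknown>,
): (ctx: C) => Promise<unknown> {
  return (ctx: C) => {
    const run = (index: number): Promise<unknown> => {
      const current = interceptors[index];
      if (!current) return handler(ctx); // end of chain reached
      return current(ctx, () => run(index + 1));
    };
    return run(0);
  };
}

const trace: string[] = [];
const pipeline = compose<{ topic: string }>(
  [
    async (_ctx, next) => {
      trace.push('a:before');
      const result = await next();
      trace.push('a:after'); // runs after the handler has finished
      return result;
    },
    async (_ctx, next) => {
      trace.push('b:before');
      return next();
    },
  ],
  async (ctx) => {
    trace.push(`handler:${ctx.topic}`);
    return 'ok';
  },
);
```

In this model, code placed before await next() runs on the way in and code after it runs once the handler has completed. An interceptor that never calls next() would short-circuit the chain.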

Error Handling

Throw an OpraException (or any subclass of it) from a handler. The adapter catches it, emits an error event, and logs it without crashing the consumer.

import { OpraException } from '@opra/common';

async onOrderCreated(ctx: KafkaContext) {
  if (!ctx.payload.orderId) {
    throw new OpraException('Missing orderId');
  }
}

Listen to the adapter's error event to handle errors centrally:

adapter.on('error', (error, ctx) => {
  console.error('Kafka error:', error.message, ctx?.topic);
});

Events

Event    Payload                          Description
message  KafkaAdapter.Message             Emitted when a raw message is received, before decoding.
execute  KafkaContext                     Emitted just before the operation handler is called.
finish   KafkaContext, result             Emitted after the operation handler completes successfully.
error    Error, KafkaContext | undefined  Emitted when an error occurs in a handler or interceptor.
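
The event order implied by the table can be sketched as a small standalone model. This is not the adapter's code: TinyEmitter, Ctx and dispatch are hypothetical stand-ins, JSON is used as a placeholder for schema decoding, and emitting error with an undefined context when decoding fails is a guess at why the context is optional in the error payload.

```typescript
type Listener = (...args: unknown[]) => void;

// Tiny emitter so the sketch has no dependencies.
class TinyEmitter {
  private listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): this {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
    return this;
  }

  emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...args);
  }
}

interface Ctx {
  topic: string;
  payload: unknown;
}

// Hypothetical per-message flow: message -> execute -> finish, or error.
async function dispatch(
  emitter: TinyEmitter,
  raw: { topic: string; value: string },
  handler: (ctx: Ctx) => Promise<unknown>,
): Promise<void> {
  emitter.emit('message', raw); // raw message, before decoding
  let ctx: Ctx | undefined;
  try {
    ctx = { topic: raw.topic, payload: JSON.parse(raw.value) }; // decoding may throw
    emitter.emit('execute', ctx);
    const result = await handler(ctx);
    emitter.emit('finish', ctx, result);
  } catch (error) {
    // Reported rather than rethrown, so later messages are still processed.
    emitter.emit('error', error, ctx);
  }
}
```

A listener on error in this model receives the context only when the failure happened after decoding, which is why handlers like the one shown under Error Handling use ctx?.topic.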