# Business Events - Kafka Connector

The Kafka connector publishes Fineract business events to a Kafka topic using Spring Kafka's `KafkaTemplate`. Messages are Avro-serialized `MessageV1` envelopes. The connector is active when `FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED=true`.
See the Business Events overview for the pipeline architecture, message format, and event catalog.
## Environment Variables
| Variable | Default | Description |
|---|---|---|
| `FINERACT_EXTERNAL_EVENTS_ENABLED` | `false` | Master switch - must be `true` |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED` | `false` | Activate the Kafka connector |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Comma-separated broker addresses |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME` | `external-events` | Topic to publish to |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE` | `true` | Auto-create the topic on startup if it does not exist |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_PARTITIONS` | `10` | Partition count (used when auto-creating) |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_REPLICAS` | `1` | Replication factor (used when auto-creating) |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_TIMEOUT_IN_SECONDS` | `10` | Producer acknowledgement timeout per batch |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES` | `linger.ms=10\|batch.size=16384` | Additional producer properties, pipe-separated `key=value` pairs |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES_SEPARATOR` | `\|` | Separator character for extra properties |
| `FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES_KEY_VALUE_SEPARATOR` | `=` | Key-value separator for extra properties |
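The extra-properties string is split on the configured separators before being handed to the producer. The following is an illustrative sketch of that parsing (not Fineract's actual implementation; `parseExtraProperties` is a hypothetical name):

```java
import java.util.Properties;
import java.util.regex.Pattern;

public class ExtraProps {

    // Split e.g. "linger.ms=10|batch.size=16384" into producer properties.
    // Splitting on the FIRST key-value separator keeps values that themselves
    // contain '=' (such as sasl.jaas.config) intact.
    static Properties parseExtraProperties(String raw, String propSep, String kvSep) {
        Properties props = new Properties();
        for (String pair : raw.split(Pattern.quote(propSep))) {
            int idx = pair.indexOf(kvSep);
            if (idx > 0) {
                props.setProperty(pair.substring(0, idx).trim(),
                                  pair.substring(idx + kvSep.length()).trim());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = parseExtraProperties("linger.ms=10|batch.size=16384", "|", "=");
        System.out.println(p.getProperty("linger.ms"));   // prints 10
        System.out.println(p.getProperty("batch.size"));  // prints 16384
    }
}
```

This is why the JAAS configuration lines in the SASL examples below can safely contain `=` characters inside their values.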
## Minimal Configuration

```bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME=fineract-events
```

## Docker Compose Example
```yaml
services:
  fineract:
    image: apache/fineract:latest
    environment:
      FINERACT_EXTERNAL_EVENTS_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME: "fineract-events"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE: "true"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_PARTITIONS: "10"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_REPLICAS: "3"
```

## AWS MSK (IAM Authentication)
Pass the SASL/IAM properties through `FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES`:
```bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS=b-1.mycluster.kafka.eu-west-1.amazonaws.com:9098
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME=fineract-events
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false
FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES=security.protocol=SASL_SSL|sasl.mechanism=AWS_MSK_IAM|sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;|sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```

The MSK IAM auth library must be on the classpath. For Docker deployments, drop the `aws-msk-iam-auth-*.jar` into `/app/plugins/` (see the Plugin JAR guide).
## Confluent Cloud (SASL/PLAIN)

Confluent Cloud API keys authenticate with the `PLAIN` SASL mechanism:

```bash
FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS=pkc-xxxxx.eu-west-1.aws.confluent.cloud:9092
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false
FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES=security.protocol=SASL_SSL|sasl.mechanism=PLAIN|sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="API_SECRET";
```

Set `FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false` when using managed Kafka - create the topic in the Confluent Cloud console instead.
## Topic Design Recommendations

- **Partitions:** The default of 10 gives reasonable parallelism for most deployments. Kafka does not allow reducing the partition count after creation, so size generously upfront if you expect high throughput.
- **Retention:** Set topic retention by time or size based on your consumer SLAs. If your consumers can be offline for hours, retain at least 24-48 hours. Use `log.retention.hours`/`log.retention.bytes` at the broker level, or `retention.ms`/`retention.bytes` in the per-topic config.
- **Replication:** Use at least 3 replicas in production (`FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_REPLICAS=3`).
- **Separate topics per domain (optional):** By default all events go to one topic. If you need independent scaling or retention per domain, set `FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false`, create multiple topics manually, and route using a consumer-side filter. Fineract itself publishes to a single topic.
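A consumer-side filter of that kind can be as simple as prefix matching on the event-type name. A minimal sketch (the `DomainRouter` class, prefixes, and handler shape are all hypothetical, not part of Fineract):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Routes events from the single Fineract topic to per-domain handlers,
// matched by event-type name prefix in registration order.
public class DomainRouter {
    private final Map<String, BiConsumer<String, byte[]>> handlers = new LinkedHashMap<>();

    public void register(String eventTypePrefix, BiConsumer<String, byte[]> handler) {
        handlers.put(eventTypePrefix, handler);
    }

    /** Returns true if some registered handler accepted the event. */
    public boolean route(String eventType, byte[] payload) {
        for (Map.Entry<String, BiConsumer<String, byte[]>> e : handlers.entrySet()) {
            if (eventType.startsWith(e.getKey())) {
                e.getValue().accept(eventType, payload);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        DomainRouter router = new DomainRouter();
        router.register("Loan", (type, data) -> System.out.println("loan event: " + type));
        router.register("Client", (type, data) -> System.out.println("client event: " + type));
        router.route("LoanApprovedBusinessEvent", new byte[0]);  // prints "loan event: LoanApprovedBusinessEvent"
    }
}
```

The handlers could republish to per-domain topics or process events directly; either way, ordering per entity is preserved because the routing happens after the partition assignment described below.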
## Writing a Consumer

Add the `fineract-avro-schemas` dependency to decode event payloads:

```xml
<dependency>
    <groupId>org.apache.fineract</groupId>
    <artifactId>fineract-avro-schemas</artifactId>
    <version>1.10.0</version>
</dependency>
```

Consumer configuration (`application.yml`):
```yaml
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: my-integration-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE
```

Decoding the `MessageV1` envelope and routing by event type:
```java
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.avro.loan.v1.LoanAccountDataV1;
import org.apache.fineract.avro.loan.v1.LoanTransactionDataV1;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class FineractEventConsumer {

    @KafkaListener(topics = "fineract-events", groupId = "my-integration-service")
    public void consume(byte[] payload, Acknowledgment ack) throws Exception {
        MessageV1 message = decode(payload, MessageV1.class);
        String eventType = message.getType().toString();
        String tenantId = message.getTenantId().toString();
        String idempotencyKey = message.getIdempotencyKey().toString();
        byte[] data = message.getData().array();

        // Guard against duplicates
        if (alreadyProcessed(idempotencyKey)) {
            ack.acknowledge();
            return;
        }

        switch (eventType) {
            case "LoanApprovedBusinessEvent":
            case "LoanDisbursalBusinessEvent":
            case "LoanCloseBusinessEvent":
                handleLoanEvent(data, tenantId, eventType);
                break;
            case "LoanTransactionMakeRepaymentPostBusinessEvent":
                handleRepayment(data, tenantId);
                break;
            case "ClientCreateBusinessEvent":
                handleClientCreated(data, tenantId);
                break;
            default:
                // log unhandled event type
        }

        markProcessed(idempotencyKey);
        ack.acknowledge();
    }

    private void handleLoanEvent(byte[] data, String tenantId, String eventType) throws Exception {
        LoanAccountDataV1 loan = decode(data, LoanAccountDataV1.class);
        // loan.getId(), loan.getStatus(), loan.getPrincipalDisbursed(), etc.
    }

    private void handleRepayment(byte[] data, String tenantId) throws Exception {
        LoanTransactionDataV1 tx = decode(data, LoanTransactionDataV1.class);
        // tx.getId(), tx.getAmount(), tx.getDate(), etc.
    }

    private void handleClientCreated(byte[] data, String tenantId) throws Exception {
        // decode the client payload the same way and process it
    }

    private <T> T decode(byte[] bytes, Class<T> clazz) throws Exception {
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(clazz);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }

    private boolean alreadyProcessed(String key) { /* check your idempotency store */ return false; }

    private void markProcessed(String key) { /* store the key */ }
}
```

## Partition Key and Ordering
Fineract publishes each event with the entity's database ID (`aggregateRootId`) as the Kafka partition key. All events for the same loan, client, or savings account land on the same partition in creation order. Consumer instances processing the same partition see a single entity's events in sequence.
For consumers that need to project the full state of an entity (e.g. a loan ledger), assign one partition per consumer thread and process messages sequentially within each partition.
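The duplicate guard in the consumer above needs a store for seen idempotency keys. A minimal in-memory sketch is shown below; a production deployment should use a durable store (e.g. a database table keyed on the idempotency key), since an in-memory map is lost on restart:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory idempotency store. Not durable: keys are lost on
// restart, so re-delivered events would be processed again.
public class InMemoryIdempotencyStore {
    private final Map<String, Long> seenAt = new ConcurrentHashMap<>();

    /** Returns true if the key was already recorded. */
    public boolean alreadyProcessed(String idempotencyKey) {
        return seenAt.containsKey(idempotencyKey);
    }

    public void markProcessed(String idempotencyKey) {
        seenAt.put(idempotencyKey, System.currentTimeMillis());
    }

    public static void main(String[] args) {
        InMemoryIdempotencyStore store = new InMemoryIdempotencyStore();
        System.out.println(store.alreadyProcessed("abc"));  // prints false
        store.markProcessed("abc");
        System.out.println(store.alreadyProcessed("abc"));  // prints true
    }
}
```

Because the consumer marks the key only after processing succeeds, a crash between processing and acknowledgement results in at-least-once delivery; the store is what turns that into effectively-once processing.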
## Tuning

The default extra properties `linger.ms=10|batch.size=16384` are applied to the producer; they batch small bursts of events efficiently. Adjust via `FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES`:

```bash
# Larger batches for high-throughput deployments
FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES=linger.ms=50|batch.size=65536|compression.type=lz4
```