Business Events - Kafka Connector

The Kafka connector publishes Fineract business events to a Kafka topic using Spring Kafka's KafkaTemplate. Messages are Avro-serialized MessageV1 envelopes. The connector is active when both FINERACT_EXTERNAL_EVENTS_ENABLED=true (the master switch) and FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED=true are set.

See the Business Events overview for the pipeline architecture, message format, and event catalog.

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| FINERACT_EXTERNAL_EVENTS_ENABLED | false | Master switch - must be true |
| FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED | false | Activate the Kafka connector |
| FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Comma-separated broker addresses |
| FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME | external-events | Topic to publish to |
| FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE | true | Auto-create the topic on startup if it does not exist |
| FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_PARTITIONS | 10 | Partition count (used when auto-creating) |
| FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_REPLICAS | 1 | Replication factor (used when auto-creating) |
| FINERACT_EXTERNAL_EVENTS_KAFKA_TIMEOUT_IN_SECONDS | 10 | Producer acknowledgement timeout per batch, in seconds |
| FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES | linger.ms=10\|batch.size=16384 | Additional producer properties, pipe-separated key=value pairs |
| FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES_SEPARATOR | \| | Separator character for extra properties |
| FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES_KEY_VALUE_SEPARATOR | = | Key-value separator for extra properties |
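
To make the extra-properties format concrete, the following is a minimal sketch of how a pipe-separated string of key=value pairs could be turned into producer properties. It is not Fineract's parsing code: the class name ExtraPropertiesSketch and the choice to split each pair on the first key-value separator only are assumptions made for illustration. If a value itself contains the separator (for example a sasl.jaas.config with username="..."), check how your Fineract version handles it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Illustrative only: a hypothetical helper, not Fineract's actual implementation.
class ExtraPropertiesSketch {

    // Splits "k1=v1|k2=v2" into an ordered map. Each pair is split on the FIRST
    // key-value separator only (an assumption), so values may contain '='.
    static Map<String, String> parse(String raw, String pairSeparator, String keyValueSeparator) {
        Map<String, String> props = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return props;
        }
        // Pattern.quote is required because '|' is a regex metacharacter.
        for (String pair : raw.split(Pattern.quote(pairSeparator))) {
            int idx = pair.indexOf(keyValueSeparator);
            if (idx <= 0) {
                throw new IllegalArgumentException("Malformed property: " + pair);
            }
            String key = pair.substring(0, idx).trim();
            String value = pair.substring(idx + keyValueSeparator.length()).trim();
            props.put(key, value);
        }
        return props;
    }

    public static void main(String[] args) {
        String raw = "linger.ms=50|batch.size=65536|compression.type=lz4";
        parse(raw, "|", "=").forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

The two separator variables exist so that a value containing the default character can still be passed: pick a separator that does not occur in any key or value.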

Minimal Configuration

bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME=fineract-events

Docker Compose Example

yaml
services:
  fineract:
    image: apache/fineract:latest
    environment:
      FINERACT_EXTERNAL_EVENTS_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME: "fineract-events"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE: "true"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_PARTITIONS: "10"
      FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_REPLICAS: "3"

AWS MSK (IAM Authentication)

Pass the SASL/IAM properties through FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES:

bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_ENABLED=true
FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS=b-1.mycluster.kafka.eu-west-1.amazonaws.com:9098
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_NAME=fineract-events
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false
# Quoted so the shell does not interpret the '|' and ';' characters
FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES='security.protocol=SASL_SSL|sasl.mechanism=AWS_MSK_IAM|sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;|sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler'

The MSK IAM auth library must be on the classpath. For Docker deployments, drop the aws-msk-iam-auth-*.jar into /app/plugins/ (see Plugin JAR guide).

Confluent Cloud (SASL/PLAIN)

bash
FINERACT_EXTERNAL_EVENTS_KAFKA_BOOTSTRAP_SERVERS=pkc-xxxxx.eu-west-1.aws.confluent.cloud:9092
FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false
# Quoted so the shell does not interpret the '|', ';' and '"' characters
FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES='security.protocol=SASL_SSL|sasl.mechanism=PLAIN|sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="API_SECRET";'

Set FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false when using a managed Kafka service, and create the topic in the provider's console (here, the Confluent Cloud console) instead.

Topic Design Recommendations

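Partitions: The default of 10 gives reasonable parallelism for most deployments. Kafka does not allow reducing the partition count after creation, and adding partitions later changes which partition a given key maps to, which breaks per-entity ordering across the change. Size generously upfront if you expect high throughput.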

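Retention: Set topic retention by time or size based on your consumer SLAs. If your consumer can be offline for hours, retain at least 24-48 hours. Use log.retention.hours or log.retention.bytes as the broker-wide default, or retention.ms and retention.bytes as per-topic overrides. Note that retention.bytes applies per partition, not per topic.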
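
As a rough worked example of the retention guidance above, the sketch below converts a retention window into a retention.ms value and estimates the disk footprint. The workload figures (50 events per second, 2 KiB per event, 3 replicas) and the class name RetentionSizingSketch are hypothetical; substitute measurements from your own deployment.

```java
import java.time.Duration;

// Back-of-the-envelope sizing; all workload numbers are hypothetical.
class RetentionSizingSketch {

    // Topic-level retention.ms value for a retention window given in hours.
    static long retentionMs(long hours) {
        return Duration.ofHours(hours).toMillis();
    }

    // Approximate bytes held across all replicas for the window.
    // Ignores compression, index files, and segment rounding.
    static long estimatedBytes(long eventsPerSecond, long avgEventBytes, long hours, int replicas) {
        long seconds = Duration.ofHours(hours).getSeconds();
        return eventsPerSecond * avgEventBytes * seconds * replicas;
    }

    public static void main(String[] args) {
        System.out.println("retention.ms = " + retentionMs(48));
        System.out.println("approx bytes = " + estimatedBytes(50, 2048, 48, 3));
    }
}
```

For a 48-hour window this gives retention.ms = 172800000, and roughly 53 GB of disk across three replicas under the assumed workload.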

Replication: Use at least 3 replicas in production (FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_REPLICAS=3).

Separate topics per domain (optional): Fineract itself publishes all events to a single topic. If you need independent scaling or retention per domain, set FINERACT_EXTERNAL_EVENTS_KAFKA_TOPIC_AUTO_CREATE=false, create the topics manually, and route events into the per-domain topics with a consumer-side filter, that is, a relay that reads the single topic and republishes each event according to its type.
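
A relay of that kind needs a routing rule from event type to topic. The sketch below shows one possible rule based on the prefix of the event type string. The topic names, the prefix list, and the class name DomainTopicRouterSketch are all hypothetical, and it assumes that event type names start with their domain (as LoanApprovedBusinessEvent and ClientCreateBusinessEvent do).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical routing rule for a relay service; not part of Fineract.
class DomainTopicRouterSketch {

    // Placeholder topic names keyed by event-type prefix.
    private static final Map<String, String> TOPIC_BY_PREFIX = new LinkedHashMap<>();
    static {
        TOPIC_BY_PREFIX.put("Loan", "fineract-loan-events");
        TOPIC_BY_PREFIX.put("Client", "fineract-client-events");
        TOPIC_BY_PREFIX.put("Savings", "fineract-savings-events");
    }

    static final String FALLBACK_TOPIC = "fineract-other-events";

    // Returns the destination topic for an event type, or the fallback topic
    // when no prefix matches.
    static String topicFor(String eventType) {
        for (Map.Entry<String, String> entry : TOPIC_BY_PREFIX.entrySet()) {
            if (eventType.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return FALLBACK_TOPIC;
    }

    public static void main(String[] args) {
        System.out.println(topicFor("LoanApprovedBusinessEvent"));
        System.out.println(topicFor("ClientCreateBusinessEvent"));
    }
}
```

When republishing, keep the original record key so that per-entity ordering is preserved in the destination topics.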

Writing a Consumer

Add the fineract-avro-schemas dependency to decode event payloads:

xml
<dependency>
    <groupId>org.apache.fineract</groupId>
    <artifactId>fineract-avro-schemas</artifactId>
    <version>1.10.0</version>
</dependency>

Consumer configuration (application.yml):

yaml
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: my-integration-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE

Decoding the MessageV1 envelope and routing by event type:

java
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.avro.loan.v1.LoanAccountDataV1;
import org.apache.fineract.avro.loan.v1.LoanTransactionDataV1;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class FineractEventConsumer {

    @KafkaListener(topics = "fineract-events", groupId = "my-integration-service")
    public void consume(byte[] payload, Acknowledgment ack) throws Exception {
        MessageV1 message = decode(payload, MessageV1.class);

        String eventType      = message.getType().toString();
        String tenantId       = message.getTenantId().toString();
        String idempotencyKey = message.getIdempotencyKey().toString();
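        // Copy only the readable bytes; array() would return the whole backing array
        java.nio.ByteBuffer buffer = message.getData();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);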

        // Guard against duplicates
        if (alreadyProcessed(idempotencyKey)) {
            ack.acknowledge();
            return;
        }

        switch (eventType) {
            case "LoanApprovedBusinessEvent":
            case "LoanDisbursalBusinessEvent":
            case "LoanCloseBusinessEvent":
                handleLoanEvent(data, tenantId, eventType);
                break;

            case "LoanTransactionMakeRepaymentPostBusinessEvent":
                handleRepayment(data, tenantId);
                break;

            case "ClientCreateBusinessEvent":
                handleClientCreated(data, tenantId);
                break;

            default:
                // log unhandled event type
                break;
        }

        markProcessed(idempotencyKey);
        ack.acknowledge();
    }

    private void handleLoanEvent(byte[] data, String tenantId, String eventType) throws Exception {
        LoanAccountDataV1 loan = decode(data, LoanAccountDataV1.class);
        // loan.getId(), loan.getStatus(), loan.getPrincipalDisbursed(), etc.
    }

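    private void handleRepayment(byte[] data, String tenantId) throws Exception {
        LoanTransactionDataV1 tx = decode(data, LoanTransactionDataV1.class);
        // tx.getId(), tx.getAmount(), tx.getDate(), etc.
    }

    private void handleClientCreated(byte[] data, String tenantId) throws Exception {
        // Decode with the matching client schema class from fineract-avro-schemas,
        // in the same way as the loan handlers above.
    }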

    private <T> T decode(byte[] bytes, Class<T> clazz) throws Exception {
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(clazz);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }

    private boolean alreadyProcessed(String key) { /* check your idempotency store */ return false; }
    private void markProcessed(String key) { /* store the key */ }
}
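
The alreadyProcessed and markProcessed stubs above need a backing store. As a minimal sketch, the class below keeps the most recent idempotency keys in memory and evicts the oldest once a capacity limit is reached. The class name InMemoryIdempotencyStore and the capacity parameter are hypothetical. An in-memory store loses its state on restart and is not shared between instances, so a production consumer would more likely use a database table or a cache with a TTL.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical bounded, thread-safe set of recently seen idempotency keys.
class InMemoryIdempotencyStore {

    private final Set<String> seen;

    InMemoryIdempotencyStore(final int maxEntries) {
        // Insertion-ordered map that drops its eldest key once capacity is exceeded.
        Map<String, Boolean> backing = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
        this.seen = Collections.synchronizedSet(Collections.newSetFromMap(backing));
    }

    boolean alreadyProcessed(String key) {
        return seen.contains(key);
    }

    void markProcessed(String key) {
        seen.add(key);
    }

    public static void main(String[] args) {
        InMemoryIdempotencyStore store = new InMemoryIdempotencyStore(10_000);
        System.out.println(store.alreadyProcessed("key-1")); // not seen yet
        store.markProcessed("key-1");
        System.out.println(store.alreadyProcessed("key-1")); // seen now
    }
}
```

Size the capacity to cover at least the longest redelivery window you expect, otherwise an evicted key can be processed a second time.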

Partition Key and Ordering

Fineract publishes each event with the entity's database ID (aggregateRootId) as the Kafka partition key. All events for the same loan, client, or savings account land on the same partition in creation order. Within a consumer group each partition is assigned to exactly one consumer instance at a time, so that instance sees a single entity's events in sequence.

For consumers that need to project the full state of an entity (e.g. a loan ledger), assign one partition per consumer thread and process messages sequentially within each partition.
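
To illustrate why a fixed key always lands on the same partition, the sketch below mimics how Kafka's default partitioner maps a keyed record: hash the serialized key with murmur2, clear the sign bit, and take the result modulo the partition count. It assumes the producer uses the default partitioner and serializes the key as an 8-byte big-endian long (consistent with the LongDeserializer in the consumer configuration). The murmur2 routine is written from memory of Kafka's org.apache.kafka.common.utils.Utils, so verify it against that class before relying on exact partition numbers. The class name PartitionKeySketch is hypothetical.

```java
import java.nio.ByteBuffer;

// Illustration of key-to-partition mapping; not Kafka's or Fineract's code.
class PartitionKeySketch {

    // 32-bit murmur2 hash, modelled on Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int blocks = length / 4;
        for (int i = 0; i < blocks; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes; the cases fall through intentionally.
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
            default: break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a long key, assuming big-endian 8-byte serialization.
    static int partitionFor(long aggregateRootId, int numPartitions) {
        byte[] keyBytes = ByteBuffer.allocate(Long.BYTES).putLong(aggregateRootId).array();
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        long loanId = 42L; // hypothetical entity ID
        System.out.println("10 partitions: " + partitionFor(loanId, 10));
        System.out.println("12 partitions: " + partitionFor(loanId, 12));
    }
}
```

The mapping depends only on the key and the partition count, which is why events for one entity stay together, and also why changing the partition count can move an entity to a different partition.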

Tuning

The default extra properties linger.ms=10|batch.size=16384 are applied to the producer. These batch small bursts of events efficiently. Adjust via FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES:

bash
# Larger batches for high-throughput deployments
# Quoted so the shell does not interpret '|' as a pipe
FINERACT_EXTERNAL_EVENTS_KAFKA_PRODUCER_EXTRA_PROPERTIES='linger.ms=50|batch.size=65536|compression.type=lz4'