Skip to content

Business Events - ActiveMQ / JMS Connector

The JMS connector publishes Fineract business events to an Apache ActiveMQ destination using the JMSMultiExternalEventProducer. It supports both queue and topic delivery, parallel producers for throughput, and optional async send. Messages carry the same Avro MessageV1 payload as the Kafka connector.

See the Business Events overview for the pipeline architecture, message format, and event catalog.

Environment Variables

VariableDefaultDescription
FINERACT_EXTERNAL_EVENTS_ENABLEDfalseMaster switch - must be true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLEDfalseActivate the JMS connector
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URLtcp://127.0.0.1:61616ActiveMQ broker URL
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME(empty)Username if the broker requires authentication
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD(empty)Password if the broker requires authentication
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME(empty)Queue destination name - set this for point-to-point delivery
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_TOPIC_NAME(empty)Topic destination name - set this for publish-subscribe delivery
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT1Number of parallel JMS producers (sessions)
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ASYNC_SEND_ENABLEDfalseFire-and-forget send (no acknowledgement wait)
FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE10Core thread pool size for JMS producer executor
FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE100Max thread pool size for JMS producer executor

Queue vs Topic

Set exactly one of FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME or FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_TOPIC_NAME. Setting both or neither will cause a startup error.

  • Queue (ActiveMQQueue) - point-to-point. Each message is delivered to exactly one consumer. Use when a single downstream service processes all events.
  • Topic (ActiveMQTopic) - publish-subscribe. Each message is delivered to all active subscribers. Use when multiple independent services each need their own copy of every event.

Minimal Configuration - Queue

bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=tcp://activemq:61616
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME=fineract.events

Minimal Configuration - Topic

bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=tcp://activemq:61616
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_TOPIC_NAME=fineract.events

Docker Compose Example

yaml
services:
  activemq:
    image: apache/activemq-classic:latest
    ports:
      - "61616:61616"
      - "8161:8161"   # Web console

  fineract:
    image: apache/fineract:latest
    environment:
      FINERACT_EXTERNAL_EVENTS_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL: "tcp://activemq:61616"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME: "fineract.events"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT: "4"
    depends_on:
      - activemq

Authenticated Broker

bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=tcp://activemq:61616
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME=fineract
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD=changeme

When both username and password are provided, the connector enables broker authentication on the ActiveMQConnectionFactory. If only one is set, authentication is not applied - set both or neither.

SSL / TLS Broker

ActiveMQ Classic supports SSL on port 61617. Use the ssl:// protocol prefix:

bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=ssl://activemq:61617

For mutual TLS, pass keystore and truststore paths in the URL:

bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=ssl://activemq:61617?socket.keyStoreLocation=/app/certs/keystore.jks&socket.keyStorePassword=changeme&socket.trustStoreLocation=/app/certs/truststore.jks&socket.trustStorePassword=changeme

Parallel Producers

FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT controls how many JMS sessions (producers) are created per send batch. The connector distributes events across producers using consistent hashing on the entity ID, preserving ordering per entity while parallelising across different entities.

A count of 1 is sufficient for low-to-moderate event volumes. Increase it (typically to 4-8) if the broker's throughput is a bottleneck and you observe growing TO_BE_SENT backlogs in m_external_event.

bash
# Higher throughput for busy deployments
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT=8
FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE=16
FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE=32

Async Send

FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ASYNC_SEND_ENABLED=true sets useAsyncSend=true on the ActiveMQConnectionFactory. The producer does not wait for broker acknowledgement before returning. This improves throughput but means delivery is not confirmed synchronously - a broker crash after the send returns but before the broker persists the message can lose events.

Use async send only when event throughput is the priority and you have durable queues configured on the broker side (ActiveMQ persistent messaging).

Writing a Consumer

The JMS payload is a byte message containing the Avro-encoded MessageV1 envelope. Use a standard JMS MessageListener or Spring's @JmsListener:

Add the fineract-avro-schemas dependency:

xml
<dependency>
    <groupId>org.apache.fineract</groupId>
    <artifactId>fineract-avro-schemas</artifactId>
    <version>1.10.0</version>
</dependency>

Consumer configuration (application.yml):

yaml
spring:
  activemq:
    broker-url: tcp://activemq:61616
  jms:
    pub-sub-domain: false   # false = queue, true = topic

Spring JMS listener:

java
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.avro.loan.v1.LoanAccountDataV1;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

@Service
public class FineractJmsEventConsumer {

    @JmsListener(destination = "fineract.events", concurrency = "4")
    public void onMessage(Message message) throws JMSException, Exception {
        if (!(message instanceof BytesMessage bytesMessage)) return;

        byte[] payload = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(payload);

        MessageV1 envelope = decode(payload, MessageV1.class);

        String eventType      = envelope.getType().toString();
        String tenantId       = envelope.getTenantId().toString();
        String idempotencyKey = envelope.getIdempotencyKey().toString();
        byte[] data           = envelope.getData().array();

        if (alreadyProcessed(idempotencyKey)) return;

        switch (eventType) {
            case "LoanApprovedBusinessEvent":
                LoanAccountDataV1 loan = decode(data, LoanAccountDataV1.class);
                // handle loan approval
                break;
            case "SavingsDepositBusinessEvent":
                // decode with SavingsAccountTransactionDataV1
                break;
        }

        markProcessed(idempotencyKey);
    }

    private <T> T decode(byte[] bytes, Class<T> clazz) throws Exception {
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(clazz);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }

    private boolean alreadyProcessed(String key) { return false; }
    private void markProcessed(String key) {}
}

For topic subscriptions, set spring.jms.pub-sub-domain: true and use durable subscriptions if your consumer needs to receive messages published while it was offline:

java
@JmsListener(destination = "fineract.events", subscription = "my-service-sub", concurrency = "1")

Idempotency

ActiveMQ does not provide built-in deduplication. Always implement consumer-side idempotency using the idempotencyKey from the MessageV1 envelope. Store processed keys in a database or cache and skip re-processing on duplicates. This is especially important when using async send or when the consumer restarts mid-processing.

Amazon MQ

Amazon MQ for ActiveMQ uses the same wire protocol. Replace the broker URL with your Amazon MQ endpoint:

bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=ssl://b-xxxx.mq.eu-west-1.amazonaws.com:61617
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME=fineractuser
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD=changeme

Amazon MQ endpoints use SSL on port 61617 by default.