Business Events - ActiveMQ / JMS Connector
The JMS connector publishes Fineract business events to an Apache ActiveMQ destination using the JMSMultiExternalEventProducer. It supports both queue and topic delivery, parallel producers for throughput, and optional async send. Messages carry the same Avro MessageV1 payload as the Kafka connector.
See the Business Events overview for the pipeline architecture, message format, and event catalog.
Environment Variables
| Variable | Default | Description |
|---|---|---|
| FINERACT_EXTERNAL_EVENTS_ENABLED | false | Master switch - must be true |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED | false | Activate the JMS connector |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL | tcp://127.0.0.1:61616 | ActiveMQ broker URL |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME | (empty) | Username if the broker requires authentication |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD | (empty) | Password if the broker requires authentication |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME | (empty) | Queue destination name - set this for point-to-point delivery |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_TOPIC_NAME | (empty) | Topic destination name - set this for publish-subscribe delivery |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT | 1 | Number of parallel JMS producers (sessions) |
| FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ASYNC_SEND_ENABLED | false | Fire-and-forget send (no acknowledgement wait) |
| FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE | 10 | Core thread pool size for JMS producer executor |
| FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE | 100 | Max thread pool size for JMS producer executor |
Queue vs Topic
Set exactly one of FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME or FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_TOPIC_NAME. Setting both or neither will cause a startup error.
- Queue (ActiveMQQueue) - point-to-point. Each message is delivered to exactly one consumer. Use when a single downstream service processes all events.
- Topic (ActiveMQTopic) - publish-subscribe. Each message is delivered to all active subscribers. Use when multiple independent services each need their own copy of every event.
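The "exactly one destination" rule can be checked before deployment with a small helper. The sketch below is illustrative only: `JmsDestinationCheck` and `resolve` are hypothetical names, not Fineract classes, and the connector's own startup validation may differ in wording and behaviour.

```java
// Hypothetical pre-deployment check, not part of Fineract.
// It mirrors the documented rule: set exactly one of queue name or topic name.
final class JmsDestinationCheck {

    /** Returns "queue:<name>" or "topic:<name>"; throws if both or neither are set. */
    static String resolve(String queueName, String topicName) {
        boolean hasQueue = queueName != null && !queueName.isBlank();
        boolean hasTopic = topicName != null && !topicName.isBlank();
        if (hasQueue == hasTopic) {
            // Both set or both empty: the connector is documented to fail at startup.
            throw new IllegalStateException(
                "Set exactly one of FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME or "
                    + "FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_TOPIC_NAME");
        }
        return hasQueue ? "queue:" + queueName : "topic:" + topicName;
    }

    public static void main(String[] args) {
        // Queue configured, topic left empty.
        System.out.println(resolve("fineract.events", "")); // prints queue:fineract.events
    }
}
```

In practice the two arguments would come from the environment (for example via `System.getenv`), so a misconfiguration surfaces in a deployment script rather than at Fineract startup.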
Minimal Configuration - Queue
bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=tcp://activemq:61616
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME=fineract.events
Minimal Configuration - Topic
bash
FINERACT_EXTERNAL_EVENTS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED=true
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=tcp://activemq:61616
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_TOPIC_NAME=fineract.events
Docker Compose Example
yaml
services:
  activemq:
    image: apache/activemq-classic:latest
    ports:
      - "61616:61616"
      - "8161:8161" # Web console
  fineract:
    image: apache/fineract:latest
    environment:
      FINERACT_EXTERNAL_EVENTS_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED: "true"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL: "tcp://activemq:61616"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME: "fineract.events"
      FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT: "4"
    depends_on:
      - activemq
Authenticated Broker
bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=tcp://activemq:61616
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME=fineract
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD=changeme
When both username and password are provided, the connector enables broker authentication on the ActiveMQConnectionFactory. If only one is set, authentication is not applied - set both or neither.
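Because a half-configured credential pair silently disables authentication, it can be worth flagging that case explicitly. The following is a minimal sketch of such a check; `BrokerCredentialCheck` and its `Status` values are hypothetical names introduced for illustration and do not exist in Fineract.

```java
// Hypothetical helper, not part of Fineract: classifies the username/password
// pair according to the "both or neither" rule described above.
final class BrokerCredentialCheck {

    enum Status { APPLIED, NOT_APPLIED, HALF_CONFIGURED }

    static Status check(String username, String password) {
        boolean hasUser = username != null && !username.isEmpty();
        boolean hasPass = password != null && !password.isEmpty();
        if (hasUser && hasPass) {
            return Status.APPLIED;          // connector authenticates against the broker
        }
        if (!hasUser && !hasPass) {
            return Status.NOT_APPLIED;      // anonymous connection, as intended
        }
        return Status.HALF_CONFIGURED;      // only one value set: authentication is skipped
    }

    public static void main(String[] args) {
        System.out.println(check("fineract", "changeme")); // prints APPLIED
        System.out.println(check("fineract", ""));         // prints HALF_CONFIGURED
    }
}
```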
SSL / TLS Broker
ActiveMQ Classic supports SSL on port 61617. Use the ssl:// protocol prefix:
bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=ssl://activemq:61617
For mutual TLS, pass keystore and truststore paths in the URL:
bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=ssl://activemq:61617?socket.keyStoreLocation=/app/certs/keystore.jks&socket.keyStorePassword=changeme&socket.trustStoreLocation=/app/certs/truststore.jks&socket.trustStorePassword=changeme
Parallel Producers
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT controls how many JMS sessions (producers) are created per send batch. The connector distributes events across producers using consistent hashing on the entity ID, preserving ordering per entity while parallelising across different entities.
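To illustrate why hashing on the entity ID preserves per-entity ordering, here is a minimal sketch that maps an entity ID to a producer index. It uses plain hash-modulo rather than a hash ring, and `ProducerSelector` is a hypothetical name; the connector's actual distribution logic is not shown in this document and may differ.

```java
// Illustrative sketch only, not Fineract code. Uses simple hash-modulo
// partitioning as a stand-in for the connector's hashing scheme.
final class ProducerSelector {

    /** Maps an entity ID to a producer index in [0, producerCount). */
    static int producerIndex(long entityId, int producerCount) {
        if (producerCount < 1) {
            throw new IllegalArgumentException("producerCount must be >= 1");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Long.hashCode(entityId), producerCount);
    }

    public static void main(String[] args) {
        int producers = 4;
        // Two events for the same entity always land on the same producer,
        // so they are sent in order relative to each other.
        System.out.println(producerIndex(42L, producers)); // prints 2
        System.out.println(producerIndex(42L, producers)); // prints 2
        // A different entity may land on a different producer and be sent in parallel.
        System.out.println(producerIndex(43L, producers)); // prints 3
    }
}
```

Note that with hash-modulo, changing the producer count remaps most entities to a different producer. That does not affect correctness within a single batch, since the mapping stays stable while the count is fixed.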
A count of 1 is sufficient for low-to-moderate event volumes. Increase it (typically to 4-8) if the broker's throughput is a bottleneck and you observe growing TO_BE_SENT backlogs in m_external_event.
bash
# Higher throughput for busy deployments
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_PRODUCER_COUNT=8
FINERACT_EVENT_TASK_EXECUTOR_CORE_POOL_SIZE=16
FINERACT_EVENT_TASK_EXECUTOR_MAX_POOL_SIZE=32
Async Send
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ASYNC_SEND_ENABLED=true sets useAsyncSend=true on the ActiveMQConnectionFactory. The producer does not wait for broker acknowledgement before returning. This improves throughput but means delivery is not confirmed synchronously - a broker crash after the send returns but before the broker persists the message can lose events.
Use async send only when event throughput is the priority and you have durable queues configured on the broker side (ActiveMQ persistent messaging).
Writing a Consumer
The JMS payload is a BytesMessage containing the Avro-encoded MessageV1 envelope. Consume it with a standard JMS MessageListener or Spring's @JmsListener; the steps below use Spring.
Add the fineract-avro-schemas dependency:
xml
<dependency>
  <groupId>org.apache.fineract</groupId>
  <artifactId>fineract-avro-schemas</artifactId>
  <version>1.10.0</version>
</dependency>
Consumer configuration (application.yml):
yaml
spring:
  activemq:
    broker-url: tcp://activemq:61616
  jms:
    pub-sub-domain: false # false = queue, true = topic
Spring JMS listener:
java
import jakarta.jms.BytesMessage;
import jakarta.jms.Message;
import java.nio.ByteBuffer;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.avro.loan.v1.LoanAccountDataV1;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

@Service
public class FineractJmsEventConsumer {

    @JmsListener(destination = "fineract.events", concurrency = "4")
    public void onMessage(Message message) throws Exception {
        // The connector sends byte messages; ignore anything else.
        if (!(message instanceof BytesMessage bytesMessage)) return;

        byte[] payload = new byte[(int) bytesMessage.getBodyLength()];
        bytesMessage.readBytes(payload);

        // Outer envelope: metadata plus the Avro-encoded event body.
        MessageV1 envelope = decode(payload, MessageV1.class);
        String eventType = envelope.getType().toString();
        String tenantId = envelope.getTenantId().toString();
        String idempotencyKey = envelope.getIdempotencyKey().toString();

        // Copy only the readable bytes; array() can expose a larger backing array.
        ByteBuffer buffer = envelope.getData().duplicate();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);

        if (alreadyProcessed(idempotencyKey)) return;

        switch (eventType) {
            case "LoanApprovedBusinessEvent":
                LoanAccountDataV1 loan = decode(data, LoanAccountDataV1.class);
                // handle loan approval
                break;
            case "SavingsDepositBusinessEvent":
                // decode with SavingsAccountTransactionDataV1
                break;
            default:
                // event types this consumer does not handle
                break;
        }
        markProcessed(idempotencyKey);
    }

    // Decodes Avro binary into the given generated class.
    private <T> T decode(byte[] bytes, Class<T> clazz) throws Exception {
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(clazz);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }

    // Placeholders: back these with a database or cache in a real consumer.
    private boolean alreadyProcessed(String key) { return false; }
    private void markProcessed(String key) {}
}
For topic subscriptions, set spring.jms.pub-sub-domain: true and use durable subscriptions if your consumer needs to receive messages published while it was offline:
java
@JmsListener(destination = "fineract.events", subscription = "my-service-sub", concurrency = "1")
Idempotency
ActiveMQ does not provide built-in deduplication. Always implement consumer-side idempotency using the idempotencyKey from the MessageV1 envelope. Store processed keys in a database or cache and skip re-processing on duplicates. This is especially important when using async send or when the consumer restarts mid-processing.
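A minimal sketch of a consumer-side idempotency guard is shown below. It keeps processed keys in memory purely for illustration; as noted above, a real consumer would store them in a database or cache so they survive restarts. `IdempotencyGuard` and `processOnce` are hypothetical names, not part of Fineract or ActiveMQ.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory guard; swap the Set for a database or cache in production.
final class IdempotencyGuard {

    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler at most once per idempotency key.
     * Returns true if the handler ran, false if the key was already seen.
     */
    boolean processOnce(String idempotencyKey, Runnable handler) {
        // add() is atomic: only one caller per key gets true.
        if (!processedKeys.add(idempotencyKey)) {
            return false;
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the key so a redelivered message can be retried.
            processedKeys.remove(idempotencyKey);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotencyGuard guard = new IdempotencyGuard();
        Runnable handler = () -> System.out.println("handling event");
        System.out.println(guard.processOnce("key-1", handler)); // handler runs, prints true
        System.out.println(guard.processOnce("key-1", handler)); // duplicate, prints false
    }
}
```

Claiming the key before running the handler prevents two concurrent listener threads from processing the same event, at the cost of needing the rollback in the `catch` block when handling fails.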
Amazon MQ
Amazon MQ for ActiveMQ uses the same wire protocol. Replace the broker URL with your Amazon MQ endpoint:
bash
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_URL=ssl://b-xxxx.mq.eu-west-1.amazonaws.com:61617
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_USERNAME=fineractuser
FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_BROKER_PASSWORD=changeme
Amazon MQ endpoints use SSL on port 61617 by default.