public class BatchedMessageListenerContainer extends org.springframework.jms.listener.DefaultMessageListenerContainer
DefaultMessageListenerContainer
which batches multiple message reads into each single transaction. Batching message reads
typically provides significant enhancement to the throughput of message reads in message queue environments.
To use this class you must inject a transaction manager via AbstractPollingMessageListenerContainer.setTransactionManager(org.springframework.transaction.PlatformTransactionManager)
.
The class performs a blocking read for the first message read in any transaction. The blocking duration is determined by AbstractPollingMessageListenerContainer.setReceiveTimeout(long)
.
Subsequent messages up to the configured setMaxMessagesPerTransaction(int)
batch limit (which defaults to 150) are performed as non-blocking reads, with the bach completing as soon as the
message queue cannot provide further messages.
Users of this class must handle rollback appropriately. A rollback triggered by failure processing a single message will cause all the messages in the transaction to rollback. It is recommended to
design you message processing so that rollback only occurs for fatal, unexpected and unrecoverable errors such as a failure in the infrastructure. You should handle other errors by, for example,
delivering messages directly to an error queue rather than throwing an exception. To assist in constructing this pattern, the AbstractMessageDriven
POJO is also provided which provides the
basic framework for implementing a MessageListener
that is aligned with this contract.
The class contains an optional feature called RetryMitigation which is enabled by default. RetryMitigation prevents further messages being read in a batch if any message is identified as being one that is redelivered. When retryMitigation is enabled, any failure in processing will also trigger a pessimistic message mode. Once in pessimistic message mode messages are read one message at a time. This mode remains in place until either the number of messages in a batch multiplied by the number of concurrent consumers have been read since the last failure, or the queue cannot provide further messages (i.e. is empty). The aim of this feature is to reduce the likelihood of messages reaching the redelivery limit due to a bad message in the batch.
You can also configure the class to conclude any batch when a redelivered message is encountered (again the default behaviour). This feature complements RetryMitigation.
NB. Due to the design and structure of Spring's DefaultMessageListenerContainer
and its superclasses, implementing this class must by necessity duplicate certain parts of
DefaultMessageListenerContainer
. Consequently, this class has been managed at a source code level as a derivative of DefaultMessageListenerContainer
and copyright messages and
attributions reflect this.
DefaultMessageListenerContainer
. The class was modified, extended and renamed to enable batching by Chris Pheby.Modifier and Type | Field and Description |
---|---|
static int |
DEFAULT_BATCH_SIZE
Default number of messages to read per transaction
|
CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE, CACHE_SESSION, DEFAULT_RECOVERY_INTERVAL, DEFAULT_THREAD_NAME_PREFIX
DEFAULT_RECEIVE_TIMEOUT
lifecycleMonitor, sharedConnectionMonitor
Constructor and Description |
---|
BatchedMessageListenerContainer()
Create a new instance
|
Modifier and Type | Method and Description |
---|---|
protected void |
commitIfNecessary(javax.jms.Session session,
List<javax.jms.Message> messages)
Variant of
AbstractMessageListenerContainer.commitIfNecessary(Session, Message) that performs the activity for a batch of messages. |
protected void |
doExecuteListener(javax.jms.Session session,
List<javax.jms.Message> messages)
A batched variant of
AbstractMessageListenerContainer.doExecuteListener(Session, Message) . |
protected boolean |
doReceiveAndExecute(Object invoker,
javax.jms.Session session,
javax.jms.MessageConsumer consumer,
org.springframework.transaction.TransactionStatus status) |
int |
getMaxMessagesPerTransaction()
Get the configured maximum number of messages per transaction
|
boolean |
isConcludeBatchOnRedeliveredMessage()
True if seeing a redelivered message will conclude the current batch
|
boolean |
isRetryMitigation()
True if the instance attempts to mitigate the problems arising when messages in a batch
are all retried when a poison message is encountered
|
protected javax.jms.Message |
receiveMessageNoWait(javax.jms.MessageConsumer consumer)
This is the
BatchedMessageListenerContainer 's equivalent to AbstractPollingMessageListenerContainer.receiveMessage(javax.jms.MessageConsumer) . |
void |
setConcludeBatchOnRedeliveredMessage(boolean concludeBatchOnRedeliveredMessage)
Enables or disables the ConcludeBatchOnRedeliveredMessage functionality
|
void |
setMaxMessagesPerTransaction(int maxMessagesPerTransaction)
Configures the maximum number of messages that can be read in a single transaction
|
void |
setRetryMitigation(boolean retryMitigation)
Enables or disables the RetryMitigation functionality
|
protected void |
validateConfiguration() |
applyBackOffTime, createDefaultTaskExecutor, doInitialize, doRescheduleTask, doShutdown, establishSharedConnection, getActiveConsumerCount, getCacheLevel, getConcurrentConsumers, getIdleConsumerLimit, getIdleTaskExecutionLimit, getMaxConcurrentConsumers, getMaxMessagesPerTask, getScheduledConsumerCount, handleListenerSetupFailure, initialize, isRecovering, isRegisteredWithDestination, messageReceived, noMessageReceived, recoverAfterListenerSetupFailure, refreshConnectionUntilSuccessful, refreshDestination, scheduleNewInvokerIfAppropriate, setBackOff, setCacheLevel, setCacheLevelName, setConcurrency, setConcurrentConsumers, setIdleConsumerLimit, setIdleTaskExecutionLimit, setMaxConcurrentConsumers, setMaxMessagesPerTask, setRecoveryInterval, setTaskExecutor, sharedConnectionEnabled, start, startSharedConnection, stop, stopSharedConnection
createListenerConsumer, getConnection, getReceiveTimeout, getSession, getTransactionManager, isSessionLocallyTransacted, receiveAndExecute, receiveMessage, setReceiveTimeout, setSessionTransacted, setTransactionManager, setTransactionName, setTransactionTimeout, shouldCommitAfterNoMessageReceived
checkMessageListener, commitIfNecessary, createConsumer, doExecuteListener, doInvokeListener, doInvokeListener, executeListener, getDefaultSubscriptionName, getDestination, getDestinationDescription, getDestinationName, getDurableSubscriptionName, getErrorHandler, getExceptionListener, getMessageConverter, getMessageListener, getMessageSelector, getReplyQosSettings, getSubscriptionName, handleListenerException, invokeErrorHandler, invokeExceptionListener, invokeListener, isAcceptMessagesWhileStopping, isExposeListenerSession, isPubSubNoLocal, isReplyPubSubDomain, isSubscriptionDurable, isSubscriptionShared, rollbackIfNecessary, rollbackOnExceptionIfNecessary, setAcceptMessagesWhileStopping, setDestination, setDestinationName, setDurableSubscriptionName, setErrorHandler, setExceptionListener, setExposeListenerSession, setMessageConverter, setMessageListener, setMessageSelector, setPubSubNoLocal, setReplyPubSubDomain, setReplyQosSettings, setSubscriptionDurable, setSubscriptionName, setSubscriptionShared, setupMessageListener
afterPropertiesSet, createSharedConnection, destroy, doStart, doStop, getBeanName, getClientId, getPausedTaskCount, getPhase, getSharedConnection, isActive, isAutoStartup, isRunning, logRejectedTask, prepareSharedConnection, refreshSharedConnection, rescheduleTaskIfNecessary, resumePausedTasks, runningAllowed, setAutoStartup, setBeanName, setClientId, setPhase, shutdown, stop
getDestinationResolver, isPubSubDomain, receiveFromConsumer, resolveDestinationName, setDestinationResolver, setPubSubDomain
convertJmsAccessException, createConnection, createSession, getConnectionFactory, getSessionAcknowledgeMode, isClientAcknowledge, isSessionTransacted, obtainConnectionFactory, setConnectionFactory, setSessionAcknowledgeMode, setSessionAcknowledgeModeName
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
public static final int DEFAULT_BATCH_SIZE
public BatchedMessageListenerContainer()
public void setMaxMessagesPerTransaction(int maxMessagesPerTransaction)
maxMessagesPerTransaction
- The requested maximum number of messages per transactionpublic int getMaxMessagesPerTransaction()
public boolean isRetryMitigation()
public void setRetryMitigation(boolean retryMitigation)
retryMitigation
- True if RetryMitigation is to be enabled, false otherwisepublic boolean isConcludeBatchOnRedeliveredMessage()
public void setConcludeBatchOnRedeliveredMessage(boolean concludeBatchOnRedeliveredMessage)
concludeBatchOnRedeliveredMessage
- True if ConcludeBatchOnRedeliveredMessage is to be enabled, false otherwiseprotected boolean doReceiveAndExecute(Object invoker, javax.jms.Session session, javax.jms.MessageConsumer consumer, org.springframework.transaction.TransactionStatus status) throws javax.jms.JMSException
doReceiveAndExecute
in class org.springframework.jms.listener.AbstractPollingMessageListenerContainer
javax.jms.JMSException
protected void doExecuteListener(javax.jms.Session session, List<javax.jms.Message> messages) throws javax.jms.JMSException
AbstractMessageListenerContainer.doExecuteListener(Session, Message)
.session
- The sessionmessages
- A list of messagesjavax.jms.JMSException
- Indicates a problem during processingprotected void commitIfNecessary(javax.jms.Session session, List<javax.jms.Message> messages) throws javax.jms.JMSException
AbstractMessageListenerContainer.commitIfNecessary(Session, Message)
that performs the activity for a batch of messages.session
- the JMS Session to commitmessages
- the messages to acknowledgejavax.jms.JMSException
- in case of commit failureprotected void validateConfiguration()
validateConfiguration
in class org.springframework.jms.listener.AbstractMessageListenerContainer
protected javax.jms.Message receiveMessageNoWait(javax.jms.MessageConsumer consumer) throws javax.jms.JMSException
BatchedMessageListenerContainer
's equivalent to AbstractPollingMessageListenerContainer.receiveMessage(javax.jms.MessageConsumer)
. Does not block if no message is available.consumer
- The MessageConsumer to usejavax.jms.JMSException
- Indicates a problem occurredCopyright © 2010–2018 Jadira Systems. All rights reserved.