001/*
002 * Copyright 2002-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.jadira.jms.container;
017
018import java.util.ArrayList;
019import java.util.List;
020
021import javax.jms.Connection;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageConsumer;
025import javax.jms.MessageListener;
026import javax.jms.Session;
027
028import org.jadira.jms.mdp.AbstractMessageDriven;
029import org.springframework.jms.connection.ConnectionFactoryUtils;
030import org.springframework.jms.connection.JmsResourceHolder;
031import org.springframework.jms.connection.SingleConnectionFactory;
032import org.springframework.jms.listener.AbstractMessageListenerContainer;
033import org.springframework.jms.listener.AbstractPollingMessageListenerContainer;
034import org.springframework.jms.listener.DefaultMessageListenerContainer;
035import org.springframework.jms.support.JmsUtils;
036import org.springframework.transaction.TransactionStatus;
037import org.springframework.transaction.support.TransactionSynchronizationManager;
038
039/**
040 * BatchedMessageListenerContainer is an extension of Spring's {@link DefaultMessageListenerContainer} which batches multiple message reads into each single transaction. Batching message reads
041 * typically provides significant enhancement to the throughput of message reads in message queue environments.
042 * <p>
043 * To use this class you must inject a transaction manager via {@link AbstractPollingMessageListenerContainer#setTransactionManager(org.springframework.transaction.PlatformTransactionManager)}.
044 * </p>
045 * <p>
046 * The class performs a blocking read for the first message read in any transaction. The blocking duration is determined by {@link AbstractPollingMessageListenerContainer#setReceiveTimeout(long)}.
 * Subsequent messages up to the configured {@link #setMaxMessagesPerTransaction(int)} batch limit (which defaults to 150) are performed as non-blocking reads, with the batch completing as soon as the
048 * message queue cannot provide further messages.
049 * </p>
050 * <p>
051 * Users of this class must handle rollback appropriately. A rollback triggered by failure processing a single message will cause all the messages in the transaction to rollback. It is recommended to
 * design your message processing so that rollback only occurs for fatal, unexpected and unrecoverable errors such as a failure in the infrastructure. You should handle other errors by, for example,
053 * delivering messages directly to an error queue rather than throwing an exception. To assist in constructing this pattern, the {@link AbstractMessageDriven} POJO is also provided which provides the
054 * basic framework for implementing a {@link MessageListener} that is aligned with this contract.
055 * </p>
056 * <p>
057 * The class contains an optional feature called RetryMitigation which is enabled by default. RetryMitigation prevents further messages being read in a batch if any message is identified as being one that
058 * is redelivered. When retryMitigation is enabled, any failure in processing will also trigger a pessimistic message mode. Once in pessimistic message mode messages are read one message at a time. This mode remains in place
059 * until either the number of messages in a batch multiplied by the number of concurrent consumers have been read since the last failure, or the queue cannot provide further messages (i.e. is empty).
060 * The aim of this feature is to reduce the likelihood of messages reaching the redelivery limit due to a bad message in the batch.
061 * </p>
062 * You can also configure the class to conclude any batch when a redelivered message is encountered (again the default behaviour). This feature complements RetryMitigation.
063 * <p>
064 * NB. Due to the design and structure of Spring's {@link DefaultMessageListenerContainer} and its superclasses, implementing this class must by necessity duplicate certain parts of
065 * {@link DefaultMessageListenerContainer}. Consequently, this class has been managed at a source code level as a derivative of {@link DefaultMessageListenerContainer} and copyright messages and
066 * attributions reflect this.
067 * </p>
068 * @author Juergen Hoeller was the original author of the {@link DefaultMessageListenerContainer}. The class was modified, extended and renamed to enable batching by Chris Pheby.
069 */
070public class BatchedMessageListenerContainer extends DefaultMessageListenerContainer {
071
072    /**
073     * Default number of messages to read per transaction
074     */
075    public static final int DEFAULT_BATCH_SIZE = 150;
076
077    /**
078     * Number of messages to read per transaction
079     */
080    private int maxMessagesPerTransaction = DEFAULT_BATCH_SIZE;
081    
082    private final MessageListenerContainerResourceFactory transactionalResourceFactory = new MessageListenerContainerResourceFactory();
083
084    private boolean retryMitigation = true;
085    
086    private boolean concludeBatchOnRedeliveredMessage = true;
087    
088    private volatile boolean pessimisticMessageMode = false;
089
090    private volatile int pessimisticMessageReads = 0;
091    
092    /**
093     * Create a new instance
094     */
095    public BatchedMessageListenerContainer() {
096    }
097
098    /**
099     * Configures the maximum number of messages that can be read in a single transaction
100     * @param maxMessagesPerTransaction The requested maximum number of messages per transaction
101     */
102    public void setMaxMessagesPerTransaction(int maxMessagesPerTransaction) {
103        this.maxMessagesPerTransaction = maxMessagesPerTransaction;
104    }
105
106    /**
107     * Get the configured maximum number of messages per transaction
108     * @return The maximum number of messages per transaction
109     */
110    public int getMaxMessagesPerTransaction() {
111        return maxMessagesPerTransaction;
112    }
113
114    /**
115     * True if the instance attempts to mitigate the problems arising when messages in a batch
116     * are all retried when a poison message is encountered
117     * @return True if RetryMitigation is enabled
118     */
119    public boolean isRetryMitigation() {
120        return retryMitigation;
121    }
122
123    /**
124     * Enables or disables the RetryMitigation functionality
125     * @param retryMitigation True if RetryMitigation is to be enabled, false otherwise
126     */
127    public void setRetryMitigation(boolean retryMitigation) {
128        this.retryMitigation = retryMitigation;
129    }
130
131    /**
132     * True if seeing a redelivered message will conclude the current batch
133     * @return True if ConcludeBatchOnRedeliveredMessage is enabled
134     */
135    public boolean isConcludeBatchOnRedeliveredMessage() {
136        return concludeBatchOnRedeliveredMessage;
137    }
138
139    /**
140     * Enables or disables the ConcludeBatchOnRedeliveredMessage functionality
141     * @param concludeBatchOnRedeliveredMessage True if ConcludeBatchOnRedeliveredMessage is to be enabled, false otherwise
142     */
143    public void setConcludeBatchOnRedeliveredMessage(boolean concludeBatchOnRedeliveredMessage) {
144        this.concludeBatchOnRedeliveredMessage = concludeBatchOnRedeliveredMessage;
145    }
146
147    @Override
148    protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer,
149            TransactionStatus status) throws JMSException {
150
151        Connection connectionToClose = null;
152        Session sessionToClose = null;
153                
154        MessageConsumer consumerToClose = null;
155
156        final List<Message> messages;
157        Message message;
158        
159        try {           
160            boolean transactional = false;
161
162            if (session == null) {
163
164                session = ConnectionFactoryUtils.doGetTransactionalSession(getConnectionFactory(),
165                        transactionalResourceFactory, true);
166                transactional = (session != null);
167            }
168
169            if (session == null) {
170
171                final Connection connection;
172                if (sharedConnectionEnabled()) {
173                    connection = getSharedConnection();
174                } else {
175                    connection = createConnection();
176                    connectionToClose = connection;
177                    connection.start();
178                }
179
180                session = createSession(connection);
181                sessionToClose = session;
182            }
183
184            if (consumer == null) {
185
186                consumer = createListenerConsumer(session);
187                consumerToClose = consumer;
188            }
189
190            messages = new ArrayList<Message>();
191
192            message = receiveMessage(consumer);
193            if (message != null) {
194
195                messages.add(message);
196                if (logger.isDebugEnabled()) {
197                    logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumer
198                            + "] of " + (transactional ? "transactional " : "") + "session [" + session + "]");
199                }
200                if (pessimisticMessageMode) {
201                    pessimisticMessageReads = pessimisticMessageReads + 1;
202                }
203            } else {
204                pessimisticMessageMode = false;
205            }
206
207            int count = 0;
208
209            // Check the delivery account so we can stop batching when we hit a redelivered message
210            final int deliveryCount = (concludeBatchOnRedeliveredMessage && message.propertyExists("JMSXDeliveryCount")) ? message.getIntProperty("JMSXDeliveryCount") : -1;
211            while ((message != null) && (++count < maxMessagesPerTransaction) && (!retryMitigation || !pessimisticMessageMode) && (!concludeBatchOnRedeliveredMessage || deliveryCount < 2)) {
212                
213                message = receiveMessageNoWait(consumer);
214
215                if (message != null) {
216                    messages.add(message);
217                    if (logger.isDebugEnabled()) {
218                        logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumer
219                                + "] of " + (transactional ? "transactional " : "") + "session [" + session + "]");
220                    }
221                    if (pessimisticMessageMode) {
222                        pessimisticMessageReads = pessimisticMessageReads + 1;
223                    }
224                } else {
225                    pessimisticMessageMode = false;
226                }
227            }
228            
229            if (pessimisticMessageMode && (pessimisticMessageReads == (maxMessagesPerTransaction * this.getConcurrentConsumers()))) {
230                pessimisticMessageMode = false;
231            }
232
233            if (messages.size() > 0) {
234
235                // Only if messages were collected, notify the listener to consume the same.
236                boolean exposeResource = (!transactional && isExposeListenerSession() && !TransactionSynchronizationManager
237                        .hasResource(getConnectionFactory()));
238                if (exposeResource) {
239                    TransactionSynchronizationManager.bindResource(getConnectionFactory(), new JmsResourceHolder(
240                            session));
241                }
242
243                try {
244                    doExecuteListener(session, messages);
245                } catch (Throwable ex) {
246
247                    if (status != null) {
248                        if (logger.isDebugEnabled()) {
249                            logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
250                        }
251                        status.setRollbackOnly();
252                    }
253
254                    handleListenerException(ex);
255
256                    if (ex instanceof JMSException) {
257                        throw (JMSException) ex;
258                    }
259
260                } finally {
261
262                    if (exposeResource) {
263                        TransactionSynchronizationManager.unbindResource(getConnectionFactory());
264                    }
265
266                }
267                return true;
268            } else {
269                if (logger.isTraceEnabled()) {
270                    logger.trace("Consumer [" + consumer + "] of " + (transactional ? "transactional " : "")
271                            + "session [" + session + "] did not receive a message");
272                }
273                noMessageReceived(invoker, session);
274
275                // Nevertheless call commit, in order to reset the transaction timeout (if any).
276                // However, don't do this on Tibco since this may lead to a deadlock there.
277                if (shouldCommitAfterNoMessageReceived(session)) {
278                    commitIfNecessary(session, message);
279                }
280                // Indicate that no message has been received.
281                return false;
282            }
283        } catch (JMSException e) {
284            // We record that we last saw an exception - that ensures that only single messages will
285            // be read until we hit the redelivered messages
286            if (retryMitigation) {
287                this.pessimisticMessageMode = true;
288                pessimisticMessageReads = 0;
289            }
290            throw e;
291        } catch (RuntimeException e) {
292            // We record that we last saw an exception - that ensures that only single messages will
293            // be read until we hit the redelivered messages
294            if (retryMitigation) {
295                this.pessimisticMessageMode = true;
296                pessimisticMessageReads = 0;
297            }
298            throw e;
299        } finally {
300            JmsUtils.closeMessageConsumer(consumerToClose);
301            JmsUtils.closeSession(sessionToClose);
302            ConnectionFactoryUtils.releaseConnection(connectionToClose, getConnectionFactory(), true);
303        }
304    }
305
306    /**
307     * A batched variant of {@link DefaultMessageListenerContainer#doExecuteListener(Session, Message)}.
308     * 
309     * @param session The session
310     * @param messages A list of messages
311     * @throws JMSException Indicates a problem during processing
312     */
313    protected void doExecuteListener(Session session, List<Message> messages) throws JMSException {
314        if (!isAcceptMessagesWhileStopping() && !isRunning()) {
315            if (logger.isWarnEnabled()) {
316                logger.warn("Rejecting received messages because of the listener container "
317                        + "having been stopped in the meantime: " + messages);
318            }
319            rollbackIfNecessary(session);
320            throw new MessageRejectedWhileStoppingException();
321        }
322
323        try {
324            for (Message message : messages) {
325                invokeListener(session, message);
326            }
327        } catch (JMSException ex) {
328            rollbackOnExceptionIfNecessary(session, ex);
329            throw ex;
330        } catch (RuntimeException ex) {
331            rollbackOnExceptionIfNecessary(session, ex);
332            throw ex;
333        } catch (Error err) {
334            rollbackOnExceptionIfNecessary(session, err);
335            throw err;
336        }
337        commitIfNecessary(session, messages);
338    }
339
340    /**
341     * Variant of {@link AbstractMessageListenerContainer#commitIfNecessary(Session, Message)} that performs the activity for a batch of messages.
342     * @param session the JMS Session to commit
343     * @param messages the messages to acknowledge
344     * @throws javax.jms.JMSException in case of commit failure
345     */
346    protected void commitIfNecessary(Session session, List<Message> messages) throws JMSException {
347        // Commit session or acknowledge message.
348        if (session.getTransacted()) {
349            // Commit necessary - but avoid commit call within a JTA transaction.
350            if (isSessionLocallyTransacted(session)) {
351                // Transacted session created by this container -> commit.
352                JmsUtils.commitIfNecessary(session);
353            }
354        } else if (messages != null && isClientAcknowledge(session)) {
355            for (Message message : messages) {
356                message.acknowledge();
357            }
358        }
359    }
360
361    @Override
362    protected void validateConfiguration() {
363        if (maxMessagesPerTransaction < 1) {
364            throw new IllegalArgumentException("maxMessagesPerTransaction property must have a value of at least 1");
365        }
366    }
367
368    /**
369     * This is the {@link BatchedMessageListenerContainer}'s equivalent to {@link AbstractPollingMessageListenerContainer#receiveMessage}. Does not block if no message is available.
370     * @param consumer The MessageConsumer to use
371     * @return The Message, if any
372     * @throws JMSException Indicates a problem occurred
373     */
374    protected Message receiveMessageNoWait(MessageConsumer consumer) throws JMSException {
375        return consumer.receiveNoWait();
376    }
377
378    /**
379     * Internal exception class that indicates a rejected message on shutdown. Used to trigger a rollback for an external transaction manager in that case.
380     */
381    private static class MessageRejectedWhileStoppingException extends RuntimeException {
382        private static final long serialVersionUID = -318011666513960841L;
383    }
384
385    /**
386     * ResourceFactory implementation that delegates to this listener container's protected callback methods.
387     */
388    private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
389
390        public Connection getConnection(JmsResourceHolder holder) {
391            return BatchedMessageListenerContainer.this.getConnection(holder);
392        }
393
394        public Session getSession(JmsResourceHolder holder) {
395            return BatchedMessageListenerContainer.this.getSession(holder);
396        }
397
398        public Connection createConnection() throws JMSException {
399            if (BatchedMessageListenerContainer.this.sharedConnectionEnabled()) {
400                Connection sharedCon = BatchedMessageListenerContainer.this.getSharedConnection();
401                return new SingleConnectionFactory(sharedCon).createConnection();
402            } else {
403                return BatchedMessageListenerContainer.this.createConnection();
404            }
405        }
406
407        public Session createSession(Connection con) throws JMSException {
408            return BatchedMessageListenerContainer.this.createSession(con);
409        }
410
411        public boolean isSynchedLocalTransactionAllowed() {
412            return BatchedMessageListenerContainer.this.isSessionTransacted();
413        }
414    }
415}