001/* 002 * Copyright 2002-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.jadira.jms.container; 017 018import java.util.ArrayList; 019import java.util.List; 020 021import javax.jms.Connection; 022import javax.jms.JMSException; 023import javax.jms.Message; 024import javax.jms.MessageConsumer; 025import javax.jms.MessageListener; 026import javax.jms.Session; 027 028import org.jadira.jms.mdp.AbstractMessageDriven; 029import org.springframework.jms.connection.ConnectionFactoryUtils; 030import org.springframework.jms.connection.JmsResourceHolder; 031import org.springframework.jms.connection.SingleConnectionFactory; 032import org.springframework.jms.listener.AbstractMessageListenerContainer; 033import org.springframework.jms.listener.AbstractPollingMessageListenerContainer; 034import org.springframework.jms.listener.DefaultMessageListenerContainer; 035import org.springframework.jms.support.JmsUtils; 036import org.springframework.transaction.TransactionStatus; 037import org.springframework.transaction.support.TransactionSynchronizationManager; 038 039/** 040 * BatchedMessageListenerContainer is an extension of Spring's {@link DefaultMessageListenerContainer} which batches multiple message reads into each single transaction. Batching message reads 041 * typically provides significant enhancement to the throughput of message reads in message queue environments. 042 * <p> 043 * To use this class you must inject a transaction manager via {@link AbstractPollingMessageListenerContainer#setTransactionManager(org.springframework.transaction.PlatformTransactionManager)}. 044 * </p> 045 * <p> 046 * The class performs a blocking read for the first message read in any transaction. The blocking duration is determined by {@link AbstractPollingMessageListenerContainer#setReceiveTimeout(long)}. 047 * Subsequent messages up to the configured {@link #setMaxMessagesPerTransaction(int)} batch limit (which defaults to 150) are performed as non-blocking reads, with the bach completing as soon as the 048 * message queue cannot provide further messages. 049 * </p> 050 * <p> 051 * Users of this class must handle rollback appropriately. A rollback triggered by failure processing a single message will cause all the messages in the transaction to rollback. It is recommended to 052 * design you message processing so that rollback only occurs for fatal, unexpected and unrecoverable errors such as a failure in the infrastructure. You should handle other errors by, for example, 053 * delivering messages directly to an error queue rather than throwing an exception. To assist in constructing this pattern, the {@link AbstractMessageDriven} POJO is also provided which provides the 054 * basic framework for implementing a {@link MessageListener} that is aligned with this contract. 055 * </p> 056 * <p> 057 * The class contains an optional feature called RetryMitigation which is enabled by default. RetryMitigation prevents further messages being read in a batch if any message is identified as being one that 058 * is redelivered. When retryMitigation is enabled, any failure in processing will also trigger a pessimistic message mode. Once in pessimistic message mode messages are read one message at a time. This mode remains in place 059 * until either the number of messages in a batch multiplied by the number of concurrent consumers have been read since the last failure, or the queue cannot provide further messages (i.e. is empty). 060 * The aim of this feature is to reduce the likelihood of messages reaching the redelivery limit due to a bad message in the batch. 061 * </p> 062 * You can also configure the class to conclude any batch when a redelivered message is encountered (again the default behaviour). This feature complements RetryMitigation. 063 * <p> 064 * NB. Due to the design and structure of Spring's {@link DefaultMessageListenerContainer} and its superclasses, implementing this class must by necessity duplicate certain parts of 065 * {@link DefaultMessageListenerContainer}. Consequently, this class has been managed at a source code level as a derivative of {@link DefaultMessageListenerContainer} and copyright messages and 066 * attributions reflect this. 067 * </p> 068 * @author Juergen Hoeller was the original author of the {@link DefaultMessageListenerContainer}. The class was modified, extended and renamed to enable batching by Chris Pheby. 069 */ 070public class BatchedMessageListenerContainer extends DefaultMessageListenerContainer { 071 072 /** 073 * Default number of messages to read per transaction 074 */ 075 public static final int DEFAULT_BATCH_SIZE = 150; 076 077 /** 078 * Number of messages to read per transaction 079 */ 080 private int maxMessagesPerTransaction = DEFAULT_BATCH_SIZE; 081 082 private final MessageListenerContainerResourceFactory transactionalResourceFactory = new MessageListenerContainerResourceFactory(); 083 084 private boolean retryMitigation = true; 085 086 private boolean concludeBatchOnRedeliveredMessage = true; 087 088 private volatile boolean pessimisticMessageMode = false; 089 090 private volatile int pessimisticMessageReads = 0; 091 092 /** 093 * Create a new instance 094 */ 095 public BatchedMessageListenerContainer() { 096 } 097 098 /** 099 * Configures the maximum number of messages that can be read in a single transaction 100 * @param maxMessagesPerTransaction The requested maximum number of messages per transaction 101 */ 102 public void setMaxMessagesPerTransaction(int maxMessagesPerTransaction) { 103 this.maxMessagesPerTransaction = maxMessagesPerTransaction; 104 } 105 106 /** 107 * Get the configured maximum number of messages per transaction 108 * @return The maximum number of messages per transaction 109 */ 110 public int getMaxMessagesPerTransaction() { 111 return maxMessagesPerTransaction; 112 } 113 114 /** 115 * True if the instance attempts to mitigate the problems arising when messages in a batch 116 * are all retried when a poison message is encountered 117 * @return True if RetryMitigation is enabled 118 */ 119 public boolean isRetryMitigation() { 120 return retryMitigation; 121 } 122 123 /** 124 * Enables or disables the RetryMitigation functionality 125 * @param retryMitigation True if RetryMitigation is to be enabled, false otherwise 126 */ 127 public void setRetryMitigation(boolean retryMitigation) { 128 this.retryMitigation = retryMitigation; 129 } 130 131 /** 132 * True if seeing a redelivered message will conclude the current batch 133 * @return True if ConcludeBatchOnRedeliveredMessage is enabled 134 */ 135 public boolean isConcludeBatchOnRedeliveredMessage() { 136 return concludeBatchOnRedeliveredMessage; 137 } 138 139 /** 140 * Enables or disables the ConcludeBatchOnRedeliveredMessage functionality 141 * @param concludeBatchOnRedeliveredMessage True if ConcludeBatchOnRedeliveredMessage is to be enabled, false otherwise 142 */ 143 public void setConcludeBatchOnRedeliveredMessage(boolean concludeBatchOnRedeliveredMessage) { 144 this.concludeBatchOnRedeliveredMessage = concludeBatchOnRedeliveredMessage; 145 } 146 147 @Override 148 protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, 149 TransactionStatus status) throws JMSException { 150 151 Connection connectionToClose = null; 152 Session sessionToClose = null; 153 154 MessageConsumer consumerToClose = null; 155 156 final List<Message> messages; 157 Message message; 158 159 try { 160 boolean transactional = false; 161 162 if (session == null) { 163 164 session = ConnectionFactoryUtils.doGetTransactionalSession(getConnectionFactory(), 165 transactionalResourceFactory, true); 166 transactional = (session != null); 167 } 168 169 if (session == null) { 170 171 final Connection connection; 172 if (sharedConnectionEnabled()) { 173 connection = getSharedConnection(); 174 } else { 175 connection = createConnection(); 176 connectionToClose = connection; 177 connection.start(); 178 } 179 180 session = createSession(connection); 181 sessionToClose = session; 182 } 183 184 if (consumer == null) { 185 186 consumer = createListenerConsumer(session); 187 consumerToClose = consumer; 188 } 189 190 messages = new ArrayList<Message>(); 191 192 message = receiveMessage(consumer); 193 if (message != null) { 194 195 messages.add(message); 196 if (logger.isDebugEnabled()) { 197 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumer 198 + "] of " + (transactional ? "transactional " : "") + "session [" + session + "]"); 199 } 200 if (pessimisticMessageMode) { 201 pessimisticMessageReads = pessimisticMessageReads + 1; 202 } 203 } else { 204 pessimisticMessageMode = false; 205 } 206 207 int count = 0; 208 209 // Check the delivery account so we can stop batching when we hit a redelivered message 210 final int deliveryCount = (concludeBatchOnRedeliveredMessage && message.propertyExists("JMSXDeliveryCount")) ? message.getIntProperty("JMSXDeliveryCount") : -1; 211 while ((message != null) && (++count < maxMessagesPerTransaction) && (!retryMitigation || !pessimisticMessageMode) && (!concludeBatchOnRedeliveredMessage || deliveryCount < 2)) { 212 213 message = receiveMessageNoWait(consumer); 214 215 if (message != null) { 216 messages.add(message); 217 if (logger.isDebugEnabled()) { 218 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumer 219 + "] of " + (transactional ? "transactional " : "") + "session [" + session + "]"); 220 } 221 if (pessimisticMessageMode) { 222 pessimisticMessageReads = pessimisticMessageReads + 1; 223 } 224 } else { 225 pessimisticMessageMode = false; 226 } 227 } 228 229 if (pessimisticMessageMode && (pessimisticMessageReads == (maxMessagesPerTransaction * this.getConcurrentConsumers()))) { 230 pessimisticMessageMode = false; 231 } 232 233 if (messages.size() > 0) { 234 235 // Only if messages were collected, notify the listener to consume the same. 236 boolean exposeResource = (!transactional && isExposeListenerSession() && !TransactionSynchronizationManager 237 .hasResource(getConnectionFactory())); 238 if (exposeResource) { 239 TransactionSynchronizationManager.bindResource(getConnectionFactory(), new JmsResourceHolder( 240 session)); 241 } 242 243 try { 244 doExecuteListener(session, messages); 245 } catch (Throwable ex) { 246 247 if (status != null) { 248 if (logger.isDebugEnabled()) { 249 logger.debug("Rolling back transaction because of listener exception thrown: " + ex); 250 } 251 status.setRollbackOnly(); 252 } 253 254 handleListenerException(ex); 255 256 if (ex instanceof JMSException) { 257 throw (JMSException) ex; 258 } 259 260 } finally { 261 262 if (exposeResource) { 263 TransactionSynchronizationManager.unbindResource(getConnectionFactory()); 264 } 265 266 } 267 return true; 268 } else { 269 if (logger.isTraceEnabled()) { 270 logger.trace("Consumer [" + consumer + "] of " + (transactional ? "transactional " : "") 271 + "session [" + session + "] did not receive a message"); 272 } 273 noMessageReceived(invoker, session); 274 275 // Nevertheless call commit, in order to reset the transaction timeout (if any). 276 // However, don't do this on Tibco since this may lead to a deadlock there. 277 if (shouldCommitAfterNoMessageReceived(session)) { 278 commitIfNecessary(session, message); 279 } 280 // Indicate that no message has been received. 281 return false; 282 } 283 } catch (JMSException e) { 284 // We record that we last saw an exception - that ensures that only single messages will 285 // be read until we hit the redelivered messages 286 if (retryMitigation) { 287 this.pessimisticMessageMode = true; 288 pessimisticMessageReads = 0; 289 } 290 throw e; 291 } catch (RuntimeException e) { 292 // We record that we last saw an exception - that ensures that only single messages will 293 // be read until we hit the redelivered messages 294 if (retryMitigation) { 295 this.pessimisticMessageMode = true; 296 pessimisticMessageReads = 0; 297 } 298 throw e; 299 } finally { 300 JmsUtils.closeMessageConsumer(consumerToClose); 301 JmsUtils.closeSession(sessionToClose); 302 ConnectionFactoryUtils.releaseConnection(connectionToClose, getConnectionFactory(), true); 303 } 304 } 305 306 /** 307 * A batched variant of {@link DefaultMessageListenerContainer#doExecuteListener(Session, Message)}. 308 * 309 * @param session The session 310 * @param messages A list of messages 311 * @throws JMSException Indicates a problem during processing 312 */ 313 protected void doExecuteListener(Session session, List<Message> messages) throws JMSException { 314 if (!isAcceptMessagesWhileStopping() && !isRunning()) { 315 if (logger.isWarnEnabled()) { 316 logger.warn("Rejecting received messages because of the listener container " 317 + "having been stopped in the meantime: " + messages); 318 } 319 rollbackIfNecessary(session); 320 throw new MessageRejectedWhileStoppingException(); 321 } 322 323 try { 324 for (Message message : messages) { 325 invokeListener(session, message); 326 } 327 } catch (JMSException ex) { 328 rollbackOnExceptionIfNecessary(session, ex); 329 throw ex; 330 } catch (RuntimeException ex) { 331 rollbackOnExceptionIfNecessary(session, ex); 332 throw ex; 333 } catch (Error err) { 334 rollbackOnExceptionIfNecessary(session, err); 335 throw err; 336 } 337 commitIfNecessary(session, messages); 338 } 339 340 /** 341 * Variant of {@link AbstractMessageListenerContainer#commitIfNecessary(Session, Message)} that performs the activity for a batch of messages. 342 * @param session the JMS Session to commit 343 * @param messages the messages to acknowledge 344 * @throws javax.jms.JMSException in case of commit failure 345 */ 346 protected void commitIfNecessary(Session session, List<Message> messages) throws JMSException { 347 // Commit session or acknowledge message. 348 if (session.getTransacted()) { 349 // Commit necessary - but avoid commit call within a JTA transaction. 350 if (isSessionLocallyTransacted(session)) { 351 // Transacted session created by this container -> commit. 352 JmsUtils.commitIfNecessary(session); 353 } 354 } else if (messages != null && isClientAcknowledge(session)) { 355 for (Message message : messages) { 356 message.acknowledge(); 357 } 358 } 359 } 360 361 @Override 362 protected void validateConfiguration() { 363 if (maxMessagesPerTransaction < 1) { 364 throw new IllegalArgumentException("maxMessagesPerTransaction property must have a value of at least 1"); 365 } 366 } 367 368 /** 369 * This is the {@link BatchedMessageListenerContainer}'s equivalent to {@link AbstractPollingMessageListenerContainer#receiveMessage}. Does not block if no message is available. 370 * @param consumer The MessageConsumer to use 371 * @return The Message, if any 372 * @throws JMSException Indicates a problem occurred 373 */ 374 protected Message receiveMessageNoWait(MessageConsumer consumer) throws JMSException { 375 return consumer.receiveNoWait(); 376 } 377 378 /** 379 * Internal exception class that indicates a rejected message on shutdown. Used to trigger a rollback for an external transaction manager in that case. 380 */ 381 private static class MessageRejectedWhileStoppingException extends RuntimeException { 382 private static final long serialVersionUID = -318011666513960841L; 383 } 384 385 /** 386 * ResourceFactory implementation that delegates to this listener container's protected callback methods. 387 */ 388 private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory { 389 390 public Connection getConnection(JmsResourceHolder holder) { 391 return BatchedMessageListenerContainer.this.getConnection(holder); 392 } 393 394 public Session getSession(JmsResourceHolder holder) { 395 return BatchedMessageListenerContainer.this.getSession(holder); 396 } 397 398 public Connection createConnection() throws JMSException { 399 if (BatchedMessageListenerContainer.this.sharedConnectionEnabled()) { 400 Connection sharedCon = BatchedMessageListenerContainer.this.getSharedConnection(); 401 return new SingleConnectionFactory(sharedCon).createConnection(); 402 } else { 403 return BatchedMessageListenerContainer.this.createConnection(); 404 } 405 } 406 407 public Session createSession(Connection con) throws JMSException { 408 return BatchedMessageListenerContainer.this.createSession(con); 409 } 410 411 public boolean isSynchedLocalTransactionAllowed() { 412 return BatchedMessageListenerContainer.this.isSessionTransacted(); 413 } 414 } 415}