/*
 *  Copyright 2002-2013 the original author or authors.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.jadira.jms.template;

import java.util.ArrayList;
import java.util.List;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.jadira.jms.container.BatchedMessageListenerContainer;
import org.jadira.jms.mdp.AbstractMessageDriven;
import org.springframework.jms.JmsException;
import org.springframework.jms.connection.JmsResourceHolder;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/**
 * BatchedJmsTemplate customises Spring's {@link JmsTemplate} with additional methods that enable multiple items to be
 * processed in a single transaction for the various supported operations. The additional methods are identified by
 * the suffix 'Batch'.
 * <p>
 * As with {@link BatchedMessageListenerContainer}, within each transaction, the first read is a blocking read, that
 * blocks for {@link JmsTemplate#setReceiveTimeout(long)}. Subsequent messages up to the maximum batch size
 * {@link #setBatchSize(int)} are read as non-blocking reads, with the batch completing as soon as the queue cannot
 * service further messages.
 * </p>
 * <p>
 * Users of this class must handle rollback appropriately. A rollback triggered by failure processing a single message
 * will cause all the messages in the transaction to rollback. It is recommended to design your message processing so
 * that rollback only occurs for fatal, unexpected and unrecoverable errors such as a failure in the infrastructure.
 * You should handle other errors by, for example, delivering messages directly to an error queue rather than throwing
 * an exception. To assist in constructing this pattern, the {@link AbstractMessageDriven} POJO is also provided which
 * provides the basic framework for implementing a {@link MessageListener} that is aligned with this contract.
 * </p>
 * <p>
 * NB. Due to the design and structure of Spring's {@link DefaultMessageListenerContainer} and its superclasses,
 * implementing this class must by necessity duplicate certain parts of {@link DefaultMessageListenerContainer}.
 * Consequently, this class has been managed at a source code level as a derivative of
 * {@link DefaultMessageListenerContainer} and copyright messages and attributions reflect this.
 * </p>
 * @author Mark Pollack and Juergen Hoeller were the original authors of the {@link JmsTemplate}. Modifications to this
 *         class to enable batching were made by Chris Pheby.
 */
public class BatchedJmsTemplate extends JmsTemplate {

    /**
     * Shared, stateless synchronization that marks the start of a new batch once a transaction completes
     */
    private static final TransactionSynchronization BATCH_SYNCHRONIZATION = new BatchTransactionSynchronization();

    /**
     * Flag to indicate the start of a batch. Held per thread and initially {@link Boolean#TRUE}.
     */
    private static final ThreadLocal<Boolean> IS_START_OF_BATCH = new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
            return Boolean.TRUE;
        }
    };

    /**
     * The default maximum size for a transactional batch
     */
    public static final int DEFAULT_BATCH_SIZE = 150;

    /**
     * The configured maximum batch size, must be at least 1
     */
    private int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * Creates a new instance
     */
    public BatchedJmsTemplate() {
    }

    /**
     * Configures the maximum number of messages that can be read in a transaction
     * @param batchSize The maximum batch size, must be at least 1
     * @throws IllegalArgumentException If batchSize is less than 1
     */
    public void setBatchSize(int batchSize) {
        checkBatchSize(batchSize);
        this.batchSize = batchSize;
    }

    /**
     * Get the maximum number of messages that can be read in a transaction
     * @return The maximum batch size
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Receive a batch of up to default batch size for the default destination. Other than batching this method is the
     * same as {@link JmsTemplate#receive()}
     * @return A list of {@link Message}
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveBatch() throws JmsException {
        return receiveBatch(getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for the default destination. Other than batching this method is the same as
     * {@link JmsTemplate#receive()}
     * @return A list of {@link Message}
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveBatch(int batchSize) throws JmsException {

        Destination defaultDestination = getDefaultDestination();

        if (defaultDestination != null) {
            return receiveBatch(defaultDestination, batchSize);
        } else {
            return receiveBatch(getRequiredDefaultDestinationName(), batchSize);
        }
    }

    /**
     * Receive a batch of up to default batch size for given destination. Other than batching this method is the same
     * as {@link JmsTemplate#receive(Destination)}
     * @return A list of {@link Message}
     * @param destination The Destination
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveBatch(Destination destination) throws JmsException {
        return receiveBatch(destination, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given destination. Other than batching this method is the same as
     * {@link JmsTemplate#receive(Destination)}
     * @return A list of {@link Message}
     * @param destination The Destination
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveBatch(final Destination destination, final int batchSize) throws JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session session) throws JMSException {
                return doBatchReceive(session, destination, null, batchSize);
            }
        }, true);
    }

    /**
     * Receive a batch of up to default batch size for given destinationName. Other than batching this method is the
     * same as {@link JmsTemplate#receive(String)}
     * @return A list of {@link Message}
     * @param destinationName The Destination name
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveBatch(String destinationName) throws JmsException {
        return receiveBatch(destinationName, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given destination name. Other than batching this method is the same as
     * {@link JmsTemplate#receive(String)}
     * @return A list of {@link Message}
     * @param destinationName The Destination name
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveBatch(final String destinationName, final int batchSize) throws JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                return doBatchReceive(session, destination, null, batchSize);
            }
        }, true);
    }

    /**
     * Receive a batch of up to default batch size for default destination and given message selector. Other than
     * batching this method is the same as {@link JmsTemplate#receiveSelected(String)}
     * @return A list of {@link Message}
     * @param messageSelector The Selector
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveSelectedBatch(String messageSelector) throws JmsException {
        return receiveSelectedBatch(messageSelector, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for default destination and given message selector. Other than batching this
     * method is the same as {@link JmsTemplate#receiveSelected(String)}
     * @return A list of {@link Message}
     * @param messageSelector The Selector
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveSelectedBatch(String messageSelector, int batchSize) throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receiveSelectedBatch(defaultDestination, messageSelector, batchSize);
        } else {
            return receiveSelectedBatch(getRequiredDefaultDestinationName(), messageSelector, batchSize);
        }
    }

    /**
     * Receive a batch of up to default batch size for given destination and message selector. Other than batching this
     * method is the same as {@link JmsTemplate#receiveSelected(Destination, String)}
     * @return A list of {@link Message}
     * @param destination The Destination
     * @param messageSelector The Selector
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveSelectedBatch(Destination destination, String messageSelector) throws JmsException {
        return receiveSelectedBatch(destination, messageSelector, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given destination and message selector. Other than batching this method
     * is the same as {@link JmsTemplate#receiveSelected(Destination, String)}
     * @return A list of {@link Message}
     * @param destination The Destination
     * @param messageSelector The Selector
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveSelectedBatch(final Destination destination, final String messageSelector,
            final int batchSize) throws JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session session) throws JMSException {
                return doBatchReceive(session, destination, messageSelector, batchSize);
            }
        }, true);
    }

    /**
     * Receive a batch of up to default batch size for given destination name and message selector. Other than batching
     * this method is the same as {@link JmsTemplate#receiveSelected(String, String)}
     * @return A list of {@link Message}
     * @param destinationName The destination name
     * @param messageSelector The Selector
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveSelectedBatch(String destinationName, String messageSelector) throws JmsException {
        return receiveSelectedBatch(destinationName, messageSelector, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given destination name and message selector. Other than batching this
     * method is the same as {@link JmsTemplate#receiveSelected(String, String)}
     * @return A list of {@link Message}
     * @param destinationName The destination name
     * @param messageSelector The Selector
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Message> receiveSelectedBatch(final String destinationName, final String messageSelector,
            final int batchSize) throws JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                return doBatchReceive(session, destination, messageSelector, batchSize);
            }
        }, true);
    }

    /**
     * Receive a batch of up to default batch size for default destination and convert each message in the batch. Other
     * than batching this method is the same as {@link JmsTemplate#receiveAndConvert()}
     * @return A list of the converted message payloads
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveAndConvertBatch() throws JmsException {
        return receiveAndConvertBatch(getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for default destination and convert each message in the batch. Other than
     * batching this method is the same as {@link JmsTemplate#receiveAndConvert()}
     * @return A list of the converted message payloads
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveAndConvertBatch(int batchSize) throws JmsException {
        return convertMessages(receiveBatch(batchSize));
    }

    /**
     * Receive a batch of up to default batch size for the given Destination and convert each message in the batch.
     * Other than batching this method is the same as {@link JmsTemplate#receiveAndConvert(Destination)}
     * @return A list of the converted message payloads
     * @param destination The Destination
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveAndConvertBatch(Destination destination) throws JmsException {
        return receiveAndConvertBatch(destination, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given Destination and convert each message in the batch. Other than
     * batching this method is the same as {@link JmsTemplate#receiveAndConvert(Destination)}
     * @return A list of the converted message payloads
     * @param destination The Destination
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveAndConvertBatch(Destination destination, int batchSize) throws JmsException {
        return convertMessages(receiveBatch(destination, batchSize));
    }

    /**
     * Receive a batch of up to default batch size for given destination name and convert each message in the batch.
     * Other than batching this method is the same as {@link JmsTemplate#receiveAndConvert(String)}
     * @return A list of the converted message payloads
     * @param destinationName The destination name
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveAndConvertBatch(String destinationName) throws JmsException {
        return receiveAndConvertBatch(destinationName, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given destination name and convert each message in the batch. Other than
     * batching this method is the same as {@link JmsTemplate#receiveAndConvert(String)}
     * @return A list of the converted message payloads
     * @param destinationName The destination name
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveAndConvertBatch(String destinationName, int batchSize) throws JmsException {
        return convertMessages(receiveBatch(destinationName, batchSize));
    }

    /**
     * Receive a batch of up to default batch size for default destination and message selector and convert each
     * message in the batch. Other than batching this method is the same as
     * {@link JmsTemplate#receiveSelectedAndConvert(String)}
     * @return A list of the converted message payloads
     * @param messageSelector The Selector
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveSelectedAndConvertBatch(String messageSelector) throws JmsException {
        return receiveSelectedAndConvertBatch(messageSelector, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for default destination and message selector and convert each message in the
     * batch. Other than batching this method is the same as {@link JmsTemplate#receiveSelectedAndConvert(String)}
     * @return A list of the converted message payloads
     * @param messageSelector The Selector
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveSelectedAndConvertBatch(String messageSelector, int batchSize) throws JmsException {
        return convertMessages(receiveSelectedBatch(messageSelector, batchSize));
    }

    /**
     * Receive a batch of up to default batch size for given Destination and message selector and convert each message
     * in the batch. Other than batching this method is the same as
     * {@link JmsTemplate#receiveSelectedAndConvert(Destination, String)}
     * @return A list of the converted message payloads
     * @param destination The Destination
     * @param messageSelector The Selector
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveSelectedAndConvertBatch(Destination destination, String messageSelector)
            throws JmsException {
        return receiveSelectedAndConvertBatch(destination, messageSelector, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given Destination and message selector and convert each message in the
     * batch. Other than batching this method is the same as
     * {@link JmsTemplate#receiveSelectedAndConvert(Destination, String)}
     * @return A list of the converted message payloads
     * @param destination The Destination
     * @param messageSelector The Selector
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveSelectedAndConvertBatch(Destination destination, String messageSelector, int batchSize)
            throws JmsException {
        return convertMessages(receiveSelectedBatch(destination, messageSelector, batchSize));
    }

    /**
     * Receive a batch of up to default batch size for given destination name and message selector and convert each
     * message in the batch. Other than batching this method is the same as
     * {@link JmsTemplate#receiveSelectedAndConvert(String, String)}
     * @return A list of the converted message payloads
     * @param destinationName The destination name
     * @param messageSelector The Selector
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveSelectedAndConvertBatch(String destinationName, String messageSelector)
            throws JmsException {
        return receiveSelectedAndConvertBatch(destinationName, messageSelector, getBatchSize());
    }

    /**
     * Receive a batch of up to batchSize for given destination name and message selector and convert each message in
     * the batch. Other than batching this method is the same as
     * {@link JmsTemplate#receiveSelectedAndConvert(String, String)}
     * @return A list of the converted message payloads
     * @param destinationName The destination name
     * @param messageSelector The Selector
     * @param batchSize The batch size
     * @throws JmsException The {@link JmsException}
     */
    public List<Object> receiveSelectedAndConvertBatch(String destinationName, String messageSelector, int batchSize)
            throws JmsException {
        // All three arguments must be passed on: the two-argument (String, int) overload would
        // interpret destinationName as a message selector against the default destination.
        return convertMessages(receiveSelectedBatch(destinationName, messageSelector, batchSize));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Message receive() throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receive(defaultDestination);
        } else {
            return receive(getRequiredDefaultDestinationName());
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Message receive(final Destination destination) throws JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session session) throws JMSException {
                return doSingleReceive(session, destination, null);
            }
        }, true);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Message receive(final String destinationName) throws JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                return doSingleReceive(session, destination, null);
            }
        }, true);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Message receiveSelected(String messageSelector) throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receiveSelected(defaultDestination, messageSelector);
        } else {
            return receiveSelected(getRequiredDefaultDestinationName(), messageSelector);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session session) throws JMSException {
                return doSingleReceive(session, destination, messageSelector);
            }
        }, true);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Message receiveSelected(final String destinationName, final String messageSelector) throws JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                return doSingleReceive(session, destination, messageSelector);
            }
        }, true);
    }

    /**
     * Method replicates the simple logic from JmsTemplate#getRequiredDefaultDestinationName which is private and
     * therefore cannot be accessed from this class
     * @return The default destination name
     * @throws IllegalStateException If no default destination name is configured
     */
    private String getRequiredDefaultDestinationName() throws IllegalStateException {
        String name = getDefaultDestinationName();
        if (name == null) {
            throw new IllegalStateException(
                    "No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsTemplate.");
        }
        return name;
    }

    /**
     * Method replicates the simple logic from JmsTemplate#doReceive(MessageConsumer, long) which is private and
     * therefore cannot be accessed from this class
     * @param consumer The consumer to use
     * @param timeout The timeout to apply; {@link #RECEIVE_TIMEOUT_NO_WAIT} performs a non-blocking read, a positive
     *            value blocks for at most that many milliseconds, any other value blocks without a time limit
     * @return A Message, or null if none was received
     * @throws JMSException Indicates an error occurred
     */
    private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {

        if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
            return consumer.receiveNoWait();
        } else if (timeout > 0) {
            return consumer.receive(timeout);
        } else {
            return consumer.receive();
        }
    }

    /**
     * Converts every message of a received batch using {@link JmsTemplate#doConvertFromMessage(Message)}, preserving
     * the order of the batch
     * @param messages The received messages
     * @return A list of the converted message payloads, of the same size as the input
     */
    private List<Object> convertMessages(List<Message> messages) {
        List<Object> result = new ArrayList<Object>(messages.size());
        for (Message next : messages) {
            result.add(doConvertFromMessage(next));
        }
        return result;
    }

    /**
     * Verifies that a batch size honours the documented minimum
     * @param batchSize The batch size to check
     * @throws IllegalArgumentException If batchSize is less than 1
     */
    private static void checkBatchSize(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1, but was " + batchSize);
        }
    }

    /**
     * Creates a consumer for the given destination and selector and receives a batch through it
     * @param session The session to receive with
     * @param destination The destination to read from
     * @param messageSelector The selector to apply, may be null
     * @param batchSize The maximum number of messages to read, must be at least 1
     * @return The messages received, empty if the first read yielded no message
     * @throws JMSException Indicates an error occurred
     */
    protected List<Message> doBatchReceive(Session session, Destination destination, String messageSelector,
            int batchSize) throws JMSException {
        return doBatchReceive(session, createConsumer(session, destination, messageSelector), batchSize);
    }

    /**
     * Receives a batch of messages from the given consumer. The first read uses the timeout from
     * {@link #determineTimeout()}; the remaining reads are non-blocking and the batch ends at the first read that
     * yields no message or when batchSize messages have been read. Afterwards a locally transacted session is
     * committed, or, for a client-acknowledge session, the batch is acknowledged. The consumer is always closed
     * before this method returns.
     * @param session The session the consumer belongs to
     * @param consumer The consumer to read from; closed by this method
     * @param batchSize The maximum number of messages to read, must be at least 1
     * @return The messages received, empty if the first read yielded no message
     * @throws JMSException Indicates an error occurred
     * @throws IllegalArgumentException If batchSize is less than 1
     */
    protected List<Message> doBatchReceive(Session session, MessageConsumer consumer, int batchSize)
            throws JMSException {

        try {
            // Validate inside the try block so that the consumer is still closed on failure,
            // and before any read so that no message is consumed for an invalid request.
            checkBatchSize(batchSize);

            final List<Message> result;
            long timeout = determineTimeout();

            Message message = doReceive(consumer, timeout);

            if (message == null) {
                result = new ArrayList<Message>(0);
            } else {
                result = new ArrayList<Message>(batchSize);
                result.add(message);
                for (int i = 1; i < batchSize; i++) {
                    message = doReceive(consumer, RECEIVE_TIMEOUT_NO_WAIT);
                    if (message == null) {
                        break;
                    }
                    result.add(message);
                }
            }

            if (session.getTransacted()) {
                if (isSessionLocallyTransacted(session)) {
                    JmsUtils.commitIfNecessary(session);
                }
            } else if (isClientAcknowledge(session)) {
                // Acknowledge via the last message actually received. The loop variable is null
                // whenever the batch ended early, which previously left partial batches
                // unacknowledged. Acknowledging one message covers all messages consumed so far
                // by the session.
                if (!result.isEmpty()) {
                    result.get(result.size() - 1).acknowledge();
                }
            }
            return result;
        } finally {
            JmsUtils.closeMessageConsumer(consumer);
        }
    }

    /**
     * Creates a consumer for the given destination and selector and receives a single message through it
     * @param session The session to receive with
     * @param destination The destination to read from
     * @param messageSelector The selector to apply, may be null
     * @return The message received, or null if none was available
     * @throws JMSException Indicates an error occurred
     */
    protected Message doSingleReceive(Session session, Destination destination, String messageSelector)
            throws JMSException {
        return doSingleReceive(session, createConsumer(session, destination, messageSelector));
    }

    /**
     * Receives a single message from the given consumer. If the session is not transacted, or is locally transacted,
     * this delegates to {@link JmsTemplate#doReceive(Session, MessageConsumer)}. Otherwise the receive is treated as
     * part of a batch spanning several calls on the current thread: the first call of a batch registers a transaction
     * synchronization and performs a read using {@link #determineTimeout()}, later calls perform non-blocking reads,
     * and the batch is restarted once the transaction completes.
     * @param session The session the consumer belongs to
     * @param consumer The consumer to read from
     * @return The message received, or null if none was available
     * @throws JMSException Indicates an error occurred
     */
    protected Message doSingleReceive(Session session, MessageConsumer consumer) throws JMSException {

        if (!session.getTransacted() || isSessionLocallyTransacted(session)) {
            // If we are not using JTA we should use standard JmsTemplate behaviour
            return super.doReceive(session, consumer);
        }

        // Otherwise batching - the batch can span multiple receive() calls, until you commit the
        // batch
        try {
            final Message message;
            if (Boolean.TRUE.equals(IS_START_OF_BATCH.get())) {
                // Register Synchronization
                TransactionSynchronizationManager.registerSynchronization(BATCH_SYNCHRONIZATION);

                // Use transaction timeout (if available).
                long timeout = determineTimeout();

                message = doReceive(consumer, timeout);
                IS_START_OF_BATCH.set(Boolean.FALSE);
            } else {
                message = doReceive(consumer, RECEIVE_TIMEOUT_NO_WAIT);
            }

            if (isClientAcknowledge(session)) {
                // Manually acknowledge message, if any.
                if (message != null) {
                    message.acknowledge();
                }
            }
            return message;
        } finally {
            JmsUtils.closeMessageConsumer(consumer);
        }
    }

    /**
     * Determines receive timeout, using logic equivalent to that of
     * {@link JmsTemplate#doReceive(Session, MessageConsumer)}: the configured receive timeout, reduced to the
     * remaining time to live of the resource holder bound for the connection factory when that holder has a timeout
     * @return The timeout determined
     */
    private long determineTimeout() {

        long timeout = getReceiveTimeout();

        JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
                .getResource(getConnectionFactory());
        if (resourceHolder != null && resourceHolder.hasTimeout()) {
            timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
        }
        return timeout;
    }

    /**
     * A simple TransactionSynchronization implementation that resets the batch indicator so that the next read begins
     * a new batch
     */
    private static class BatchTransactionSynchronization extends TransactionSynchronizationAdapter {

        @Override
        public void afterCompletion(int status) {
            // Resets the flag for the thread running this callback, whatever the completion status.
            IS_START_OF_BATCH.set(Boolean.TRUE);
        }
    }
}