001/*
002 * Copyright 2002-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.jadira.jms.template;
017
018import java.util.ArrayList;
019import java.util.List;
020
021import javax.jms.Destination;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageConsumer;
025import javax.jms.MessageListener;
026import javax.jms.Session;
027
028import org.jadira.jms.container.BatchedMessageListenerContainer;
029import org.jadira.jms.mdp.AbstractMessageDriven;
030import org.springframework.jms.JmsException;
031import org.springframework.jms.connection.JmsResourceHolder;
032import org.springframework.jms.core.JmsTemplate;
033import org.springframework.jms.core.SessionCallback;
034import org.springframework.jms.listener.DefaultMessageListenerContainer;
035import org.springframework.jms.support.JmsUtils;
036import org.springframework.transaction.support.TransactionSynchronization;
037import org.springframework.transaction.support.TransactionSynchronizationAdapter;
038import org.springframework.transaction.support.TransactionSynchronizationManager;
039
040/**
041 * BatchedJmsTemplate customises Spring's {@link JmsTemplate} with additional methods that enable multiple items to be processed in a single transaction for the various supported operations. The
042 * additional methods are identified by the suffix 'batch'.
043 * <p>
044 * As with {@link BatchedMessageListenerContainer}, Within each transaction, the first read is a blocking read, that blocks for {@link JmsTemplate#setReceiveTimeout(long)}. Subsequent messages up to
045 * the maximum batch size {@link #setBatchSize(int)} are read as non-blocking reads, with the batch completing as soon as the queue cannot service further messages.
046 * </p>
047 * <p>
048 * Users of this class must handle rollback appropriately. A rollback triggered by failure processing a single message will cause all the messages in the transaction to rollback. It is recommended to
049 * design you message processing so that rollback only occurs for fatal, unexpected and unrecoverable errors such as a failure in the infrastructure. You should handle other errors by, for example,
050 * delivering messages directly to an error queue rather than throwing an exception. To assist in constructing this pattern, the {@link AbstractMessageDriven} POJO is also provided which provides the
051 * basic framework for implementing a {@link MessageListener} that is aligned with this contract.
052 * </p>
053 * <p>
054 * NB. Due to the design and structure of Spring's {@link DefaultMessageListenerContainer} and its superclasses, implementing this class must by necessity duplicate certain parts of
055 * {@link DefaultMessageListenerContainer}. Consequently, this class has been managed at a source code level as a derivative of {@link DefaultMessageListenerContainer} and copyright messages and
056 * attributions reflect this.
057 * </p>
058 * @author Mark Pollack and Juergen Hoeller were the original authors of the {@link JmsTemplate}. Modifications to this class to enable batching were made by Chris Pheby.
059 */
060public class BatchedJmsTemplate extends JmsTemplate {
061
062    private static final TransactionSynchronization BATCH_SYNCHRONIZATION = new BatchTransactionSynchronization();
063
064    /**
065     * Flag to indicate the start of a batch
066     */
067    private static final ThreadLocal<Boolean> IS_START_OF_BATCH = new ThreadLocal<Boolean>() {
068        protected Boolean initialValue() {
069            return Boolean.TRUE;
070        }
071    };
072
073    /**
074     * The default maximum size for a transactional batch
075     */
076    public static final int DEFAULT_BATCH_SIZE = 150;
077
078    /**
079     * The configured maximum batch size, must be at least 1
080     */
081    private int batchSize = DEFAULT_BATCH_SIZE;
082
083    /**
084     * Creates a new instance
085     */
086    public BatchedJmsTemplate() {
087    }
088
089    /**
090     * Configures the maximum number of messages that can be read in a transaction
091     * @param batchSize The maximum batch size
092     */
093    public void setBatchSize(int batchSize) {
094        this.batchSize = batchSize;
095    }
096
097    /**
098     * Get the maximum number of messages that can be read in a transaction
099     * @return The maximum batch size
100     */
101    public int getBatchSize() {
102        return batchSize;
103    }
104
105    /**
106     * Receive a batch of up to default batch size for the default destination. Other than batching this method is the same as {@link JmsTemplate#receive()}
107     * @return A list of {@link Message}
108     * @throws JmsException The {@link JmsException}
109     */
110    public List<Message> receiveBatch() throws JmsException {
111
112        return receiveBatch(batchSize);
113    }
114
115    /**
116     * Receive a batch of up to batchSize. Other than batching this method is the same as {@link JmsTemplate#receive()}
117     * @return A list of {@link Message}
118     * @param batchSize The batch size
119     * @throws JmsException The {@link JmsException}
120     */
121    public List<Message> receiveBatch(int batchSize) throws JmsException {
122
123        Destination defaultDestination = getDefaultDestination();
124
125        if (defaultDestination != null) {
126            return receiveBatch(defaultDestination, batchSize);
127        } else {
128            return receiveBatch(getRequiredDefaultDestinationName(), batchSize);
129        }
130    }
131
132    /**
133     * Receive a batch of up to default batch size for given destination. Other than batching this method is the same as {@link JmsTemplate#receive(Destination)}
134     * @return A list of {@link Message}
135     * @param destination The Destination
136     * @throws JmsException The {@link JmsException}
137     */
138    public List<Message> receiveBatch(Destination destination) throws JmsException {
139        return receiveBatch(destination, batchSize);
140    }
141
142    /**
143     * Receive a batch of up to batchSize for given destination. Other than batching this method is the same as {@link JmsTemplate#receive(Destination)}
144     * @return A list of {@link Message}
145     * @param destination The Destination
146     * @param batchSize The batch size
147     * @throws JmsException The {@link JmsException}
148     */
149    public List<Message> receiveBatch(final Destination destination, final int batchSize) throws JmsException {
150        return execute(new SessionCallback<List<Message>>() {
151            public List<Message> doInJms(Session session) throws JMSException {
152                return doBatchReceive(session, destination, null, batchSize);
153            }
154        }, true);
155    }
156
157    /**
158     * Receive a batch of up to default batch size for given destinationName. Other than batching this method is the same as {@link JmsTemplate#receive(String)}
159     * @return A list of {@link Message}
160     * @param destinationName The Destination name
161     * @throws JmsException The {@link JmsException}
162     */
163    public List<Message> receiveBatch(String destinationName) throws JmsException {
164        return receiveBatch(destinationName, getBatchSize());
165    }
166
167    /**
168     * Receive a batch of up to default batch size for given destination. Other than batching this method is the same as {@link JmsTemplate#receive(String)}
169     * @return A list of {@link Message}
170     * @param destinationName The Destination
171     * @param batchSize The batch size
172     * @throws JmsException The {@link JmsException}
173     */
174    public List<Message> receiveBatch(final String destinationName, final int batchSize) throws JmsException {
175        return execute(new SessionCallback<List<Message>>() {
176            public List<Message> doInJms(Session session) throws JMSException {
177                Destination destination = resolveDestinationName(session, destinationName);
178                return doBatchReceive(session, destination, null, batchSize);
179            }
180        }, true);
181    }
182
183    /**
184     * Receive a batch of up to default batch size for default destination and given message selector. Other than batching this method is the same as {@link JmsTemplate#receiveSelected(String)}
185     * @return A list of {@link Message}
186     * @param messageSelector The Selector
187     * @throws JmsException The {@link JmsException}
188     */
189    public List<Message> receiveSelectedBatch(String messageSelector) throws JmsException {
190        return receiveSelectedBatch(messageSelector, getBatchSize());
191    }
192
193    /**
194     * Receive a batch of up to batchSize for default destination and given message selector. Other than batching this method is the same as {@link JmsTemplate#receiveSelected(String)}
195     * @return A list of {@link Message}
196     * @param messageSelector The Selector
197     * @param batchSize The batch size
198     * @throws JmsException The {@link JmsException}
199     */
200    public List<Message> receiveSelectedBatch(String messageSelector, int batchSize) throws JmsException {
201        Destination defaultDestination = getDefaultDestination();
202        if (defaultDestination != null) {
203            return receiveSelectedBatch(defaultDestination, messageSelector, batchSize);
204        } else {
205            return receiveSelectedBatch(getRequiredDefaultDestinationName(), messageSelector, batchSize);
206        }
207    }
208
209    /**
210     * Receive a batch of up to default batch size for given destination and message selector. Other than batching this method is the same as {@link JmsTemplate#receiveSelected(Destination, String)}
211     * @return A list of {@link Message}
212     * @param destination The Destination
213     * @param messageSelector The Selector
214     * @throws JmsException The {@link JmsException}
215     */
216    public List<Message> receiveSelectedBatch(Destination destination, String messageSelector) throws JmsException {
217        return receiveSelectedBatch(destination, messageSelector, getBatchSize());
218    }
219
220    /**
221     * Receive a batch of up to batchSize for given destination and message selector. Other than batching this method is the same as {@link JmsTemplate#receiveSelected(Destination, String)}
222     * @return A list of {@link Message}
223     * @param destination The Destination
224     * @param messageSelector The Selector
225     * @param batchSize The batch size
226     * @throws JmsException The {@link JmsException}
227     */
228    public List<Message> receiveSelectedBatch(final Destination destination, final String messageSelector,
229            final int batchSize) throws JmsException {
230        return execute(new SessionCallback<List<Message>>() {
231            public List<Message> doInJms(Session session) throws JMSException {
232
233                return doBatchReceive(session, destination, messageSelector, batchSize);
234            }
235        }, true);
236    }
237
238    /**
239     * Receive a batch of up to default batch size for given destination name and message selector. Other than batching this method is the same as {@link JmsTemplate#receiveSelected(String, String)}
240     * @return A list of {@link Message}
241     * @param destinationName The destination name
242     * @param messageSelector The Selector
243     * @throws JmsException The {@link JmsException}
244     */
245    public List<Message> receiveSelectedBatch(String destinationName, String messageSelector) throws JmsException {
246        return receiveSelectedBatch(destinationName, messageSelector, getBatchSize());
247    }
248
249    /**
250     * Receive a batch of up to batchSize for given destination name and message selector. Other than batching this method is the same as {@link JmsTemplate#receiveSelected(String, String)}
251     * @return A list of {@link Message}
252     * @param destinationName The destination name
253     * @param messageSelector The Selector
254     * @param batchSize The batch size
255     * @throws JmsException The {@link JmsException}
256     */
257    public List<Message> receiveSelectedBatch(final String destinationName, final String messageSelector,
258            final int batchSize) throws JmsException {
259        return execute(new SessionCallback<List<Message>>() {
260            public List<Message> doInJms(Session session) throws JMSException {
261                Destination destination = resolveDestinationName(session, destinationName);
262                return doBatchReceive(session, destination, messageSelector, batchSize);
263            }
264        }, true);
265    }
266
267    /**
268     * Receive a batch of up to default batch size for default destination and convert each message in the batch. Other than batching this method is the same as {@link JmsTemplate#receiveAndConvert()}
269     * @return A list of {@link Message}
270     * @throws JmsException The {@link JmsException}
271     */
272    public List<Object> receiveAndConvertBatch() throws JmsException {
273        return receiveAndConvertBatch(getBatchSize());
274    }
275
276    /**
277     * Receive a batch of up to batchSize for default destination and convert each message in the batch. Other than batching this method is the same as {@link JmsTemplate#receiveAndConvert()}
278     * @return A list of {@link Message}
279     * @param batchSize The batch size
280     * @throws JmsException The {@link JmsException}
281     */
282    public List<Object> receiveAndConvertBatch(int batchSize) throws JmsException {
283        List<Message> messages = receiveBatch(batchSize);
284        List<Object> result = new ArrayList<Object>(messages.size());
285        for (Message next : messages) {
286            result.add(doConvertFromMessage(next));
287        }
288        return result;
289    }
290
291    /**
292     * Receive a batch of up to default batch size for the given Destination and convert each message in the batch. Other than batching this method is the same as
293     * {@link JmsTemplate#receiveAndConvert(Destination)}
294     * @return A list of {@link Message}
295     * @param destination The Destination
296     * @throws JmsException The {@link JmsException}
297     */
298    public List<Object> receiveAndConvertBatch(Destination destination) throws JmsException {
299        return receiveAndConvertBatch(destination, getBatchSize());
300    }
301
302    /**
303     * Receive a batch of up to batchSize for given Destination and convert each message in the batch. Other than batching this method is the same as {@link JmsTemplate#receiveAndConvert(Destination)}
304     * @return A list of {@link Message}
305     * @param destination The Destination
306     * @param batchSize The batch size
307     * @throws JmsException The {@link JmsException}
308     */
309    public List<Object> receiveAndConvertBatch(Destination destination, int batchSize) throws JmsException {
310        List<Message> messages = receiveBatch(destination, batchSize);
311        List<Object> result = new ArrayList<Object>(messages.size());
312        for (Message next : messages) {
313            result.add(doConvertFromMessage(next));
314        }
315        return result;
316    }
317
318    /**
319     * Receive a batch of up to default batch size for given destination name and convert each message in the batch. Other than batching this method is the same as
320     * {@link JmsTemplate#receiveAndConvert(String)}
321     * @return A list of {@link Message}
322     * @param destinationName The destination name
323     * @throws JmsException The {@link JmsException}
324     */
325    public List<Object> receiveAndConvertBatch(String destinationName) throws JmsException {
326        return receiveAndConvertBatch(destinationName, getBatchSize());
327    }
328
329    /**
330     * Receive a batch of up to batchSize for given destination name and convert each message in the batch. Other than batching this method is the same as {@link JmsTemplate#receiveAndConvert(String)}
331     * @return A list of {@link Message}
332     * @param destinationName The destination name
333     * @param batchSize The batch size
334     * @throws JmsException The {@link JmsException}
335     */
336    public List<Object> receiveAndConvertBatch(String destinationName, int batchSize) throws JmsException {
337        List<Message> messages = receiveBatch(destinationName, batchSize);
338        List<Object> result = new ArrayList<Object>(messages.size());
339        for (Message next : messages) {
340            result.add(doConvertFromMessage(next));
341        }
342        return result;
343    }
344
345    /**
346     * Receive a batch of up to default batch size for default destination and message selector and convert each message in the batch. Other than batching this method is the same as
347     * {@link JmsTemplate#receiveSelectedAndConvert(String)}
348     * @return A list of {@link Message}
349     * @param messageSelector The Selector
350     * @throws JmsException The {@link JmsException}
351     */
352    public List<Object> receiveSelectedAndConvertBatch(String messageSelector) throws JmsException {
353        return receiveSelectedAndConvertBatch(messageSelector, getBatchSize());
354    }
355
356    /**
357     * Receive a batch of up to batchSize for default destination and message selector and convert each message in the batch. Other than batching this method is the same as
358     * {@link JmsTemplate#receiveSelectedAndConvert(String)}
359     * @return A list of {@link Message}
360     * @param messageSelector The Selector
361     * @param batchSize The batch size
362     * @throws JmsException The {@link JmsException}
363     */
364    public List<Object> receiveSelectedAndConvertBatch(String messageSelector, int batchSize) throws JmsException {
365        List<Message> messages = receiveSelectedBatch(messageSelector, batchSize);
366        List<Object> result = new ArrayList<Object>(messages.size());
367        for (Message next : messages) {
368            result.add(doConvertFromMessage(next));
369        }
370        return result;
371    }
372
373    /**
374     * Receive a batch of up to default batch size for given Destination and message selector and convert each message in the batch. Other than batching this method is the same as
375     * {@link JmsTemplate#receiveSelectedAndConvert(Destination, String)}
376     * @return A list of {@link Message}
377     * @param destination The Destination
378     * @param messageSelector The Selector
379     * @throws JmsException The {@link JmsException}
380     */
381    public List<Object> receiveSelectedAndConvertBatch(Destination destination, String messageSelector)
382            throws JmsException {
383        return receiveSelectedAndConvertBatch(destination, messageSelector, getBatchSize());
384    }
385
386    /**
387     * Receive a batch of up to batchSize for given Destination and message selector and convert each message in the batch. Other than batching this method is the same as
388     * {@link JmsTemplate#receiveSelectedAndConvert(Destination, String)}
389     * @return A list of {@link Message}
390     * @param destination The Destination
391     * @param messageSelector The Selector
392     * @param batchSize The batch size
393     * @throws JmsException The {@link JmsException}
394     */
395    public List<Object> receiveSelectedAndConvertBatch(Destination destination, String messageSelector, int batchSize)
396            throws JmsException {
397        List<Message> messages = receiveSelectedBatch(destination, messageSelector, batchSize);
398        List<Object> result = new ArrayList<Object>(messages.size());
399        for (Message next : messages) {
400            result.add(doConvertFromMessage(next));
401        }
402        return result;
403    }
404
405    /**
406     * Receive a batch of up to default batch size for given destination name and message selector and convert each message in the batch. Other than batching this method is the same as
407     * {@link JmsTemplate#receiveSelectedAndConvert(String, String)}
408     * @return A list of {@link Message}
409     * @param destinationName The destination name
410     * @param messageSelector The Selector
411     * @throws JmsException The {@link JmsException}
412     */
413    public List<Object> receiveSelectedAndConvertBatch(String destinationName, String messageSelector)
414            throws JmsException {
415        return receiveSelectedAndConvertBatch(destinationName, messageSelector, getBatchSize());
416    }
417
418    /**
419     * Receive a batch of up to batchSize for given destination name and message selector and convert each message in the batch. Other than batching this method is the same as
420     * {@link JmsTemplate#receiveSelectedAndConvert(String, String)}
421     * @return A list of {@link Message}
422     * @param destinationName The destination name
423     * @param messageSelector The Selector
424     * @param batchSize The batch size
425     * @throws JmsException The {@link JmsException}
426     */
427    public List<Object> receiveSelectedAndConvertBatch(String destinationName, String messageSelector, int batchSize)
428            throws JmsException {
429        List<Message> messages = receiveSelectedBatch(destinationName, batchSize);
430        List<Object> result = new ArrayList<Object>(messages.size());
431        for (Message next : messages) {
432            result.add(doConvertFromMessage(next));
433        }
434        return result;
435    }
436
437    /**
438     * {@inheritDoc}
439     */
440    @Override
441    public Message receive() throws JmsException {
442        Destination defaultDestination = getDefaultDestination();
443        if (defaultDestination != null) {
444            return receive(defaultDestination);
445        } else {
446            return receive(getRequiredDefaultDestinationName());
447        }
448    }
449
450    /**
451     * {@inheritDoc}
452     */
453    @Override
454    public Message receive(final Destination destination) throws JmsException {
455        return execute(new SessionCallback<Message>() {
456            public Message doInJms(Session session) throws JMSException {
457                return doSingleReceive(session, destination, null);
458            }
459        }, true);
460    }
461
462    /**
463     * {@inheritDoc}
464     */
465    @Override
466    public Message receive(final String destinationName) throws JmsException {
467        return execute(new SessionCallback<Message>() {
468            public Message doInJms(Session session) throws JMSException {
469                Destination destination = resolveDestinationName(session, destinationName);
470                return doSingleReceive(session, destination, null);
471            }
472        }, true);
473    }
474
475    /**
476     * {@inheritDoc}
477     */
478    @Override
479    public Message receiveSelected(String messageSelector) throws JmsException {
480        Destination defaultDestination = getDefaultDestination();
481        if (defaultDestination != null) {
482            return receiveSelected(defaultDestination, messageSelector);
483        } else {
484            return receiveSelected(getRequiredDefaultDestinationName(), messageSelector);
485        }
486    }
487
488    /**
489     * {@inheritDoc}
490     */
491    @Override
492    public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException {
493        return execute(new SessionCallback<Message>() {
494            public Message doInJms(Session session) throws JMSException {
495
496                return doSingleReceive(session, destination, messageSelector);
497            }
498        }, true);
499    }
500
501    /**
502     * {@inheritDoc}
503     */
504    @Override
505    public Message receiveSelected(final String destinationName, final String messageSelector) throws JmsException {
506        return execute(new SessionCallback<Message>() {
507            public Message doInJms(Session session) throws JMSException {
508                Destination destination = resolveDestinationName(session, destinationName);
509                return doSingleReceive(session, destination, messageSelector);
510            }
511        }, true);
512    }
513
514    /**
515     * Method replicates the simple logic from JmsTemplate#getRequiredDefaultDestinationName which is private and therefore cannot be accessed from this class
516     * @return The default destination name
517     * @throws IllegalStateException Example if no destination or destination name is specified
518     */
519    private String getRequiredDefaultDestinationName() throws IllegalStateException {
520        String name = getDefaultDestinationName();
521        if (name == null) {
522            throw new IllegalStateException(
523                    "No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsTemplate.");
524        }
525        return name;
526    }
527
528    /**
529     * Method replicates the simple logic from JmsTemplate#doReceive(MessageConsumer, long) which is private and therefore cannot be accessed from this class
530     * @param consumer The consumer to use
531     * @param timeout The timeout to apply
532     * @return A Message
533     * @throws JMSException Indicates an error occurred
534     */
535    private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
536
537        if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
538            return consumer.receiveNoWait();
539        } else if (timeout > 0) {
540            return consumer.receive(timeout);
541        } else {
542            return consumer.receive();
543        }
544    }
545
546    protected List<Message> doBatchReceive(Session session, Destination destination, String messageSelector,
547            int batchSize) throws JMSException {
548        return doBatchReceive(session, createConsumer(session, destination, messageSelector), batchSize);
549    }
550
551    protected List<Message> doBatchReceive(Session session, MessageConsumer consumer, int batchSize)
552            throws JMSException {
553
554        try {
555            final List<Message> result;
556            long timeout = determineTimeout();
557
558            Message message = doReceive(consumer, timeout);
559
560            if (message == null) {
561                result = new ArrayList<Message>(0);
562            } else {
563                result = new ArrayList<Message>(batchSize);
564                result.add(message);
565                for (int i = 1; i < batchSize; i++) {
566                    message = doReceive(consumer, RECEIVE_TIMEOUT_NO_WAIT);
567                    if (message == null) {
568                        break;
569                    }
570                    result.add(message);
571                }
572            }
573
574            if (session.getTransacted()) {
575                if (isSessionLocallyTransacted(session)) {
576                    JmsUtils.commitIfNecessary(session);
577                }
578            } else if (isClientAcknowledge(session)) {
579                if (message != null) {
580                    message.acknowledge();
581                }
582            }
583            return result;
584        } finally {
585            JmsUtils.closeMessageConsumer(consumer);
586        }
587    }
588
589    protected Message doSingleReceive(Session session, Destination destination, String messageSelector)
590            throws JMSException {
591        return doSingleReceive(session, createConsumer(session, destination, messageSelector));
592    }
593
594    protected Message doSingleReceive(Session session, MessageConsumer consumer) throws JMSException {
595
596        if (!session.getTransacted() || isSessionLocallyTransacted(session)) {
597            // If we are not using JTA we should use standard JmsTemplate behaviour
598            return super.doReceive(session, consumer);
599        }
600
601        // Otherwise batching - the batch can span multiple receive() calls, until you commit the
602        // batch
603        try {
604            final Message message;
605            if (Boolean.TRUE.equals(IS_START_OF_BATCH.get())) {
606                // Register Synchronization
607                TransactionSynchronizationManager.registerSynchronization(BATCH_SYNCHRONIZATION);
608
609                // Use transaction timeout (if available).
610                long timeout = determineTimeout();
611
612                message = doReceive(consumer, timeout);
613                IS_START_OF_BATCH.set(Boolean.FALSE);
614            } else {
615                message = doReceive(consumer, RECEIVE_TIMEOUT_NO_WAIT);
616            }
617
618            if (isClientAcknowledge(session)) {
619                // Manually acknowledge message, if any.
620                if (message != null) {
621                    message.acknowledge();
622                }
623            }
624            return message;
625        } finally {
626            JmsUtils.closeMessageConsumer(consumer);
627        }
628    }
629
630/**
631         * Determines receive timeout, using logic equivalent to that of {@link JmsTemplate#doReceive(Session, MessageConsumer) 
632         * @return The timeout determined
633         */
634    private long determineTimeout() {
635
636        long timeout = getReceiveTimeout();
637
638        JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
639                .getResource(getConnectionFactory());
640        if (resourceHolder != null && resourceHolder.hasTimeout()) {
641            timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
642        }
643        return timeout;
644    }
645
646    /**
647     * A simple TransactionSynchronization implementation that resets the batch indicator so that the next read begins a new batch
648     */
649    private static class BatchTransactionSynchronization extends TransactionSynchronizationAdapter {
650
651        @Override
652        public void afterCompletion(int status) {
653            IS_START_OF_BATCH.set(Boolean.TRUE);
654        }
655    }
656}