001/*
002 *  Copyright 2010, 2013, 2014 Chris Pheby
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.jadira.jms.mdp;
017
018import java.io.ByteArrayOutputStream;
019import java.io.PrintWriter;
020import java.io.StringWriter;
021import java.util.Enumeration;
022import java.util.HashMap;
023import java.util.Map;
024
025import javax.jms.BytesMessage;
026import javax.jms.JMSException;
027import javax.jms.MapMessage;
028import javax.jms.Message;
029import javax.jms.MessageFormatException;
030import javax.jms.ObjectMessage;
031import javax.jms.Session;
032import javax.jms.StreamMessage;
033import javax.jms.TextMessage;
034
035import org.springframework.jms.core.MessageCreator;
036
037/**
038 * A MessageCreator that takes a given Message and related Exeption. This class copies the source message and enriches the copy with information about the original headers and properties as well as
039 * the causing exception Priority and CorrelationID are preserved on the error message.
040 */
041public class FatalJmsExceptionMessageCreator implements MessageCreator {
042
043    private static final String EXCEPTION_MESSAGE_PROPERTY = "cause_exceptionMessage";
044    private static final String EXCEPTION_STACKTRACE_PROPERTY = "cause_exceptionStackTrace";
045
046    private static final String ORIGINAL_DELIVERY_MODE = "original_JMSDeliveryMode";
047    private static final String ORIGINAL_EXPIRATION = "original_JMSExpiration";
048    private static final String ORIGINAL_MESSAGE_ID = "original_JMSMessageID";
049    private static final String ORIGINAL_REPLY_TO = "original_JMSReplyTo";
050    private static final String ORIGINAL_REDELIVERED = "original_JMSRedelivered";
051    private static final String ORIGINAL_CORRELATIONID = "original_JMSCorrelationID";
052    private static final String ORIGINAL_PRIORITY = "original_JMSPriority";
053
054    private static final int BUFFER_CAPACITY_BYTES = 4096;
055
056    private final Message message;
057
058    private final Exception e;
059
060    public FatalJmsExceptionMessageCreator(Message message, Exception e) {
061        this.message = message;
062        this.e = e;
063    }
064
065    @Override
066    public Message createMessage(Session session) throws JMSException {
067
068        final Message copyMessage = copyMessage(session, message);
069        enrichMessage(copyMessage, message);
070        return copyMessage;
071    }
072
073    protected void enrichMessage(Message copyMessage, Message originalMessage) throws JMSException {
074
075        Map<String, Object> messageProps = buildErrorMessageProperties(originalMessage);
076
077        applyMessageProperties(copyMessage, messageProps);
078        applyMessageHeaders(copyMessage, originalMessage);
079        applyErrorDetails(copyMessage, e);
080    }
081
082    protected Map<String, Object> buildErrorMessageProperties(Message msg) throws JMSException {
083
084        Map<String, Object> properties = new HashMap<String, Object>();
085
086        @SuppressWarnings("unchecked")
087        Enumeration<String> srcProperties = msg.getPropertyNames();
088
089        while (srcProperties.hasMoreElements()) {
090            String propertyName = srcProperties.nextElement();
091            properties.put("original_" + propertyName, msg.getObjectProperty(propertyName));
092        }
093
094        properties.put(ORIGINAL_DELIVERY_MODE, msg.getJMSDeliveryMode());
095        properties.put(ORIGINAL_EXPIRATION, msg.getJMSExpiration());
096        properties.put(ORIGINAL_MESSAGE_ID, msg.getJMSMessageID());
097        properties.put(ORIGINAL_REPLY_TO, msg.getJMSReplyTo());
098        properties.put(ORIGINAL_REDELIVERED, msg.getJMSRedelivered());
099        properties.put(ORIGINAL_CORRELATIONID, msg.getJMSCorrelationID());
100        properties.put(ORIGINAL_PRIORITY, msg.getJMSPriority());
101
102        return properties;
103    }
104
105    private void applyMessageProperties(Message destinationMessage, Map<String, Object> properties) throws JMSException {
106
107        if (properties == null) {
108            return;
109        }
110
111        for (Map.Entry<String, Object> entry : properties.entrySet()) {
112            destinationMessage.setObjectProperty(entry.getKey(), entry.getValue());
113        }
114    }
115
116    protected void applyMessageHeaders(Message destinationMessage, Message sourceMessage) {
117
118        try {
119            destinationMessage.setJMSCorrelationIDAsBytes(sourceMessage.getJMSCorrelationIDAsBytes());
120        } catch (JMSException e) {
121        }
122        try {
123            destinationMessage.setJMSPriority(sourceMessage.getJMSPriority());
124        } catch (JMSException e) {
125        }
126    }
127
128    protected void applyErrorDetails(final Message destinationMessage, final Exception exception) throws JMSException {
129
130        destinationMessage.setStringProperty(EXCEPTION_MESSAGE_PROPERTY, exception.getMessage());
131
132        StringWriter stackTraceWriter = new StringWriter();
133        exception.printStackTrace(new PrintWriter(stackTraceWriter));
134
135        destinationMessage.setStringProperty(EXCEPTION_STACKTRACE_PROPERTY, stackTraceWriter.toString());
136    }
137
138    private static Message copyMessage(Session session, Message originalMessage) throws JMSException {
139
140        Message copyMessage;
141
142        if (originalMessage instanceof BytesMessage) {
143
144            final BytesMessage theMessage = session.createBytesMessage();
145            final byte[] bytes = extractByteArrayFromMessage((BytesMessage) originalMessage);
146            theMessage.writeBytes(bytes);
147
148            copyMessage = theMessage;
149        } else if (originalMessage instanceof StreamMessage) {
150
151            final StreamMessage theMessage = session.createStreamMessage();
152            final byte[] bytes = extractByteArrayFromMessage((StreamMessage) originalMessage);
153            theMessage.writeBytes(bytes);
154
155            copyMessage = theMessage;
156        } else if (originalMessage instanceof ObjectMessage) {
157
158            copyMessage = session.createObjectMessage(((ObjectMessage) originalMessage).getObject());
159        } else if (originalMessage instanceof TextMessage) {
160
161            copyMessage = session.createTextMessage(((TextMessage) originalMessage).getText());
162        } else if (originalMessage instanceof MapMessage) {
163
164            MapMessage theMessage = session.createMapMessage();
165
166            @SuppressWarnings("unchecked")
167            Enumeration<String> keys = ((MapMessage) originalMessage).getMapNames();
168            while (keys.hasMoreElements()) {
169                String next = keys.nextElement();
170                theMessage.setObject(next, ((MapMessage) originalMessage).getObject(next));
171            }
172
173            copyMessage = theMessage;
174        } else {
175            throw new MessageFormatException("Unexpected Message Type received, was: " + originalMessage.getClass());
176        }
177
178        return copyMessage;
179    }
180
181    private static byte[] extractByteArrayFromMessage(BytesMessage message) throws JMSException {
182
183        ByteArrayOutputStream oStream = new ByteArrayOutputStream(BUFFER_CAPACITY_BYTES);
184
185        byte[] buffer = new byte[BUFFER_CAPACITY_BYTES];
186
187        int bufferCount = -1;
188
189        while ((bufferCount = message.readBytes(buffer)) >= 0) {
190            oStream.write(buffer, 0, bufferCount);
191            if (bufferCount < BUFFER_CAPACITY_BYTES) {
192                break;
193            }
194        }
195
196        return oStream.toByteArray();
197    }
198
199    private static byte[] extractByteArrayFromMessage(StreamMessage message) throws JMSException {
200
201        ByteArrayOutputStream oStream = new ByteArrayOutputStream(BUFFER_CAPACITY_BYTES);
202
203        byte[] buffer = new byte[BUFFER_CAPACITY_BYTES];
204
205        int bufferCount = -1;
206
207        while ((bufferCount = message.readBytes(buffer)) >= 0) {
208            oStream.write(buffer, 0, bufferCount);
209            if (bufferCount < BUFFER_CAPACITY_BYTES) {
210                break;
211            }
212        }
213
214        return oStream.toByteArray();
215    }
216}