001/*
002 *  Copyright 2013 Chris Pheby
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.jadira.lang.io.buffered;
017
018import java.io.FilterInputStream;
019import java.io.IOException;
020import java.io.InputStream;
021
/**
 * Provides the basic structure for implementing buffered input streams. Subclasses typically differ in
 * terms of how they provide the buffer; this class owns the read position, the count of valid bytes and
 * the mark state, and talks to the buffer only through the abstract methods declared below.
 * Some of the functionality needs to be effectively reproduced from {@link java.io.BufferedInputStream} -
 * such methods reference the equivalents in that class.
 */
public abstract class AbstractBufferedInputStream extends FilterInputStream {

    /**
     * The default buffer size in bytes. Deliberately left non-final so that existing code which
     * assigns this field keeps compiling.
     */
    public static int DEFAULT_BUFFER_SIZE = 8192;

    /** Index one greater than the last valid byte held in the buffer. */
    private int count;

    /** Index in the buffer of the next byte to hand out. */
    private int pos;

    /** Value of {@code pos} when {@link #mark(int)} was last called, or -1 when there is no valid mark. */
    private int markpos = -1;

    /** Read-ahead limit passed to the last {@link #mark(int)} call. */
    private int marklimit;

    /** Whether {@link #restart()} is permitted; never changed within this class. */
    private boolean isRestartable = true;

    /**
     * Verifies that the buffer may still be used.
     * @throws IOException If the buffer is no longer usable (e.g. the stream was closed)
     */
    protected abstract void assertBufferOpen() throws IOException;

    /**
     * Create a new instance with the default buffer size
     * @param in InputStream to be wrapped
     * @see java.io.BufferedInputStream#BufferedInputStream(InputStream)
     */
    public AbstractBufferedInputStream(InputStream in) {
        this(in, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a new instance with the given buffer size in bytes. The size is only validated here;
     * this class does not store it or allocate a buffer itself.
     * @param in InputStream to be wrapped
     * @param size The size of the buffer in bytes
     * @throws IllegalArgumentException If size is less than or equal to zero
     * @see java.io.BufferedInputStream#BufferedInputStream(InputStream, int)
     */
    public AbstractBufferedInputStream(InputStream in, int size) {
        super(in);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size may not be less than or equal to zero");
        }
    }

    /**
     * Refills the buffer from the wrapped stream, taking the mark into account. Callers invoke this
     * only once every buffered byte has been consumed.
     * @throws IOException If the wrapped stream is closed or the read fails
     */
    private void fill() throws IOException {

        if (markpos < 0) {

            // No mark exists, so throw away the buffer
            pos = 0;

        } else if (pos >= limit()) {

            // Buffer exhausted
            if (markpos > 0 && !isRestartable) {

                // Throw away unnecessary early bit of buffer
                int sz = pos - markpos;
                compact(markpos, sz);
                pos = sz;
                markpos = 0;

            } else if (limit() >= marklimit) {

                // Buffer became too big so invalidate the mark and drop the buffer contents
                markpos = -1;
                pos = 0;

            } else {

                // Grow buffer. The doubling is done in 64-bit arithmetic: with a plain int,
                // pos * 2 wraps to a negative value once pos exceeds 2^30, which would slip
                // past the marklimit clamp and reach resizeBuffer as a negative size.
                int newSize = (int) Math.min(2L * pos, (long) marklimit);

                resizeBuffer(pos, newSize);
            }
        }

        count = pos;

        int n = getInputStreamIfOpen().read(toArray(), pos, limit() - pos);

        if (n > 0) {
            count = n + pos;
        }
    }

    /**
     * Returns the wrapped stream, failing if the reference has been cleared.
     * @return The wrapped stream
     * @throws IOException If the wrapped stream reference is null
     */
    private InputStream getInputStreamIfOpen() throws IOException {

        InputStream input = in;

        if (input == null) {
            throw new IOException("Stream closed");
        }

        return input;
    }

    /**
     * Reads a single byte, refilling the buffer when it is exhausted.
     * @return The byte as a value in the range 0-255, or -1 when no further byte could be obtained
     * @throws IOException If the buffer is no longer open or the underlying read fails
     * @see java.io.BufferedInputStream#read()
     */
    @Override
    public int read() throws IOException {

        // Same guard as read(byte[], int, int), skip and reset: without it a read after close()
        // could reach getInt on a buffer that clean() has already released.
        assertBufferOpen();

        if (pos >= count) {

            fill();
            if (pos >= count) {
                return -1;
            }

        }

        return getInt(pos++) & 0xff;
    }

    /**
     * Performs one read step: serves bytes from the buffer, refilling it at most once, or bypasses
     * the buffer entirely when the request is at least as large as the buffer and no mark is set.
     * @param b Destination array
     * @param offset Index in b of the first byte to write
     * @param length Maximum number of bytes to read
     * @return The number of bytes read, or -1 if none could be obtained
     * @throws IOException If the underlying read fails
     */
    private int read1(byte[] b, int offset, int length) throws IOException {

        int avail = count - pos;

        if (avail <= 0) {

            if (length >= limit() && markpos < 0) {
                // Buffering would only add a copy here, so read straight into the caller's array
                return getInputStreamIfOpen().read(b, offset, length);
            }

            fill();

            avail = count - pos;

            if (avail <= 0) {
                return -1;
            }

        }

        int countSize = (avail < length) ? avail : length;

        if (offset == 0) {

            get(b, pos, countSize);

        } else {

            // get(...) is handed no destination offset, so stage the bytes and copy them into place
            byte[] bc = new byte[countSize];
            get(bc, pos, countSize);
            System.arraycopy(bc, 0, b, offset, countSize);

        }

        pos += countSize;
        return countSize;
    }

    /**
     * Reads up to len bytes into b, repeating single read steps until the request is satisfied, the
     * end of the stream is reached, or the wrapped stream reports that nothing more is available.
     * @param b Destination array
     * @param off Index in b of the first byte to write
     * @param len Maximum number of bytes to read
     * @return The number of bytes read, 0 if len is 0, or -1 if no byte could be read
     * @throws IOException If the buffer is no longer open or the underlying read fails
     * @throws IndexOutOfBoundsException If off or len is negative, or off + len exceeds b.length
     * @see java.io.BufferedInputStream#read(byte[], int, int)
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {

        assertBufferOpen(); // Cause exception if closed

        // A single sign test covers negative off, negative len, overflow of off + len and overrun of b
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int n = 0;

        while (true) {

            int numberRead = read1(b, off + n, len - n);

            if (numberRead <= 0) {
                return (n == 0) ? numberRead : n;
            }

            n += numberRead;

            if (n >= len) {
                return n;
            }

            // In case of no bytes available, but not closed, return
            InputStream input = in;
            if (input != null && input.available() <= 0) {
                return n;
            }
        }
    }

    /**
     * Skips up to n bytes. Buffered bytes are skipped first; when the buffer is empty and no mark is
     * set the request is delegated to the wrapped stream.
     * @param n The number of bytes to skip
     * @return The number of bytes actually skipped, which may be fewer than n
     * @throws IOException If the buffer is no longer open or the underlying stream fails
     * @see java.io.BufferedInputStream#skip(long)
     */
    @Override
    public long skip(long n) throws IOException {

        assertBufferOpen(); // Cause exception if closed
        if (n <= 0) {
            return 0;
        }

        long available = count - pos;

        if (available <= 0) {

            if (markpos < 0) {
                return getInputStreamIfOpen().skip(n);
            }

            // A mark is set, so the skipped bytes must pass through the buffer
            fill();
            available = count - pos;
            if (available <= 0) {
                return 0;
            }
        }

        long skipped = (available < n) ? available : n;
        // skipped never exceeds count - pos, so it fits in an int
        pos += (int) skipped;

        return skipped;
    }

    /**
     * Returns the number of buffered bytes plus the wrapped stream's own estimate, saturating at
     * {@link Integer#MAX_VALUE}.
     * @return The estimated number of bytes that can be read
     * @throws IOException If the wrapped stream reference is null or its available() call fails
     * @see java.io.BufferedInputStream#available()
     */
    @Override
    public int available() throws IOException {

        int n = count - pos;
        int avail = getInputStreamIfOpen().available();

        return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail;
    }

    /**
     * Records the current position as the mark.
     * @param readlimit The read-ahead limit; once the buffer limit reaches this value a refill
     *            invalidates the mark
     * @see java.io.BufferedInputStream#mark(int)
     */
    @Override
    public void mark(int readlimit) {
        marklimit = readlimit;
        markpos = pos;
    }

    /**
     * Moves the read position back to the mark.
     * @throws IOException If the buffer is no longer open, or there is no valid mark
     * @see java.io.BufferedInputStream#reset()
     */
    @Override
    public void reset() throws IOException {

        assertBufferOpen(); // Cause exception if closed

        if (markpos < 0) {
            throw new IOException("Resetting to invalid mark");
        }

        pos = markpos;
    }

    /**
     * @return Always true
     * @see java.io.BufferedInputStream#markSupported()
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Closes the wrapped stream (when present) and then cleans the buffer; the buffer is cleaned
     * even if closing the wrapped stream throws.
     * @throws IOException If closing the wrapped stream fails
     * @see java.io.BufferedInputStream#close()
     */
    @Override
    public void close() throws IOException {

        try {
            if (in != null) {
                in.close();
            }
        } finally {
            clean();
        }
    }

    /**
     * Indicates whether the stream can be restarted
     * @return true if the stream can be restarted
     */
    public boolean isRestartable() {
        return isRestartable;
    }

    /**
     * Moves the read position back to position 0 of the buffer.
     * NOTE(review): a refill performed while no mark is set reuses the buffer from position 0, so
     * whether position 0 still holds the first byte of the stream depends on how the stream has been
     * used - confirm against the concrete subclasses.
     * @throws IOException Indicates a problem occurred (e.g. the stream was closed)
     * @throws IllegalStateException If the stream is not restartable
     */
    public synchronized void restart() throws IOException {

        if (!isRestartable()) {
            throw new IllegalStateException("Cannot restart as current buffer is not restartable");
        }
        assertBufferOpen(); // Cause exception if closed
        pos = 0;
    }

    /**
     * Compact the buffer reducing it to the given size, offset from the given mark position
     * @param markpos The mark position to use as the start of the buffer
     * @param size Required buffer size
     */
    protected abstract void compact(int markpos, int size);

    /**
     * Return the bytes in the buffer as a byte array. The refill logic passes this array to the
     * wrapped stream as the destination of a read.
     * @return The bytes
     */
    protected abstract byte[] toArray();

    /**
     * Enlarge the buffer to the new size, retaining the position
     * @param position The position
     * @param newSize The new size
     * @throws IOException Indicates a problem resizing
     */
    protected abstract void resizeBuffer(int position, int newSize) throws IOException;

    /**
     * Returns the buffer's limit
     * @return The limit
     */
    protected abstract int limit();

    /**
     * Reads a value from the buffer at the given offset. {@link #read()} keeps only the low-order
     * eight bits of the returned value.
     * @param position The offset
     * @return An int
     * @throws IOException Indicates a problem reading the int
     */
    protected abstract int getInt(int position) throws IOException;

    /**
     * Retrieve the given number of bytes from the buffer. As called from this class, pos is the
     * current read position within the buffer and the bytes are expected at the start (index 0) of
     * the supplied array - presumably the contract implementations follow; verify against them.
     * @param b The array into which bytes will be written
     * @param pos The position within the buffer of the first byte to be retrieved
     * @param cnt The maximum number of bytes to be written to the given array
     */
    protected abstract void get(byte[] b, int pos, int cnt);

    /**
     * Performs any necessary cleaning of the buffer and releasing of resources.
     */
    protected abstract void clean();

    /**
     * Returns the read-ahead limit passed to the most recent {@link #mark(int)} call.
     * @return The mark limit
     */
    protected int getMarkLimit() {
        return marklimit;
    }
}