| /** |
| * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.jivesoftware.smackx.bytestreams.ibb; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.SocketTimeoutException; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.jivesoftware.smack.Connection; |
| import org.jivesoftware.smack.PacketListener; |
| import org.jivesoftware.smack.XMPPException; |
| import org.jivesoftware.smack.filter.AndFilter; |
| import org.jivesoftware.smack.filter.PacketFilter; |
| import org.jivesoftware.smack.filter.PacketTypeFilter; |
| import org.jivesoftware.smack.packet.IQ; |
| import org.jivesoftware.smack.packet.Message; |
| import org.jivesoftware.smack.packet.Packet; |
| import org.jivesoftware.smack.packet.PacketExtension; |
| import org.jivesoftware.smack.packet.XMPPError; |
| import org.jivesoftware.smack.util.StringUtils; |
| import org.jivesoftware.smack.util.SyncPacketSend; |
| import org.jivesoftware.smackx.bytestreams.BytestreamSession; |
| import org.jivesoftware.smackx.bytestreams.ibb.packet.Close; |
| import org.jivesoftware.smackx.bytestreams.ibb.packet.Data; |
| import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension; |
| import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; |
| |
| /** |
| * InBandBytestreamSession class represents an In-Band Bytestream session. |
| * <p> |
| * In-band bytestreams are bidirectional and this session encapsulates the streams for both |
| * directions. |
| * <p> |
| * Note that closing the In-Band Bytestream session will close both streams. If both streams are |
| * closed individually the session will be closed automatically once the second stream is closed. |
| * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed |
| * automatically if one of them is closed. |
| * |
| * @author Henning Staib |
| */ |
| public class InBandBytestreamSession implements BytestreamSession { |
| |
| /* XMPP connection */ |
| private final Connection connection; |
| |
| /* the In-Band Bytestream open request for this session */ |
| private final Open byteStreamRequest; |
| |
| /* |
| * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream) |
| */ |
| private IBBInputStream inputStream; |
| |
| /* |
| * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream) |
| */ |
| private IBBOutputStream outputStream; |
| |
| /* JID of the remote peer */ |
| private String remoteJID; |
| |
| /* flag to close both streams if one of them is closed */ |
| private boolean closeBothStreamsEnabled = false; |
| |
| /* flag to indicate if session is closed */ |
| private boolean isClosed = false; |
| |
| /** |
| * Constructor. |
| * |
| * @param connection the XMPP connection |
| * @param byteStreamRequest the In-Band Bytestream open request for this session |
| * @param remoteJID JID of the remote peer |
| */ |
| protected InBandBytestreamSession(Connection connection, Open byteStreamRequest, |
| String remoteJID) { |
| this.connection = connection; |
| this.byteStreamRequest = byteStreamRequest; |
| this.remoteJID = remoteJID; |
| |
| // initialize streams dependent to the uses stanza type |
| switch (byteStreamRequest.getStanza()) { |
| case IQ: |
| this.inputStream = new IQIBBInputStream(); |
| this.outputStream = new IQIBBOutputStream(); |
| break; |
| case MESSAGE: |
| this.inputStream = new MessageIBBInputStream(); |
| this.outputStream = new MessageIBBOutputStream(); |
| break; |
| } |
| |
| } |
| |
| public InputStream getInputStream() { |
| return this.inputStream; |
| } |
| |
| public OutputStream getOutputStream() { |
| return this.outputStream; |
| } |
| |
| public int getReadTimeout() { |
| return this.inputStream.readTimeout; |
| } |
| |
| public void setReadTimeout(int timeout) { |
| if (timeout < 0) { |
| throw new IllegalArgumentException("Timeout must be >= 0"); |
| } |
| this.inputStream.readTimeout = timeout; |
| } |
| |
| /** |
| * Returns whether both streams should be closed automatically if one of the streams is closed. |
| * Default is <code>false</code>. |
| * |
| * @return <code>true</code> if both streams will be closed if one of the streams is closed, |
| * <code>false</code> if both streams can be closed independently. |
| */ |
| public boolean isCloseBothStreamsEnabled() { |
| return closeBothStreamsEnabled; |
| } |
| |
| /** |
| * Sets whether both streams should be closed automatically if one of the streams is closed. |
| * Default is <code>false</code>. |
| * |
| * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of |
| * the streams is closed, <code>false</code> if both streams should be closed |
| * independently |
| */ |
| public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) { |
| this.closeBothStreamsEnabled = closeBothStreamsEnabled; |
| } |
| |
| public void close() throws IOException { |
| closeByLocal(true); // close input stream |
| closeByLocal(false); // close output stream |
| } |
| |
| /** |
| * This method is invoked if a request to close the In-Band Bytestream has been received. |
| * |
| * @param closeRequest the close request from the remote peer |
| */ |
| protected void closeByPeer(Close closeRequest) { |
| |
| /* |
| * close streams without flushing them, because stream is already considered closed on the |
| * remote peers side |
| */ |
| this.inputStream.closeInternal(); |
| this.inputStream.cleanup(); |
| this.outputStream.closeInternal(false); |
| |
| // acknowledge close request |
| IQ confirmClose = IQ.createResultIQ(closeRequest); |
| this.connection.sendPacket(confirmClose); |
| |
| } |
| |
| /** |
| * This method is invoked if one of the streams has been closed locally, if an error occurred |
| * locally or if the whole session should be closed. |
| * |
| * @throws IOException if an error occurs while sending the close request |
| */ |
| protected synchronized void closeByLocal(boolean in) throws IOException { |
| if (this.isClosed) { |
| return; |
| } |
| |
| if (this.closeBothStreamsEnabled) { |
| this.inputStream.closeInternal(); |
| this.outputStream.closeInternal(true); |
| } |
| else { |
| if (in) { |
| this.inputStream.closeInternal(); |
| } |
| else { |
| // close stream but try to send any data left |
| this.outputStream.closeInternal(true); |
| } |
| } |
| |
| if (this.inputStream.isClosed && this.outputStream.isClosed) { |
| this.isClosed = true; |
| |
| // send close request |
| Close close = new Close(this.byteStreamRequest.getSessionID()); |
| close.setTo(this.remoteJID); |
| try { |
| SyncPacketSend.getReply(this.connection, close); |
| } |
| catch (XMPPException e) { |
| throw new IOException("Error while closing stream: " + e.getMessage()); |
| } |
| |
| this.inputStream.cleanup(); |
| |
| // remove session from manager |
| InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this); |
| } |
| |
| } |
| |
| /** |
| * IBBInputStream class is the base implementation of an In-Band Bytestream input stream. |
| * Subclasses of this input stream must provide a packet listener along with a packet filter to |
| * collect the In-Band Bytestream data packets. |
| */ |
| private abstract class IBBInputStream extends InputStream { |
| |
| /* the data packet listener to fill the data queue */ |
| private final PacketListener dataPacketListener; |
| |
| /* queue containing received In-Band Bytestream data packets */ |
| protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>(); |
| |
| /* buffer containing the data from one data packet */ |
| private byte[] buffer; |
| |
| /* pointer to the next byte to read from buffer */ |
| private int bufferPointer = -1; |
| |
| /* data packet sequence (range from 0 to 65535) */ |
| private long seq = -1; |
| |
| /* flag to indicate if input stream is closed */ |
| private boolean isClosed = false; |
| |
| /* flag to indicate if close method was invoked */ |
| private boolean closeInvoked = false; |
| |
| /* timeout for read operations */ |
| private int readTimeout = 0; |
| |
| /** |
| * Constructor. |
| */ |
| public IBBInputStream() { |
| // add data packet listener to connection |
| this.dataPacketListener = getDataPacketListener(); |
| connection.addPacketListener(this.dataPacketListener, getDataPacketFilter()); |
| } |
| |
| /** |
| * Returns the packet listener that processes In-Band Bytestream data packets. |
| * |
| * @return the data packet listener |
| */ |
| protected abstract PacketListener getDataPacketListener(); |
| |
| /** |
| * Returns the packet filter that accepts In-Band Bytestream data packets. |
| * |
| * @return the data packet filter |
| */ |
| protected abstract PacketFilter getDataPacketFilter(); |
| |
| public synchronized int read() throws IOException { |
| checkClosed(); |
| |
| // if nothing read yet or whole buffer has been read fill buffer |
| if (bufferPointer == -1 || bufferPointer >= buffer.length) { |
| // if no data available and stream was closed return -1 |
| if (!loadBuffer()) { |
| return -1; |
| } |
| } |
| |
| // return byte and increment buffer pointer |
| return ((int) buffer[bufferPointer++]) & 0xff; |
| } |
| |
| public synchronized int read(byte[] b, int off, int len) throws IOException { |
| if (b == null) { |
| throw new NullPointerException(); |
| } |
| else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) |
| || ((off + len) < 0)) { |
| throw new IndexOutOfBoundsException(); |
| } |
| else if (len == 0) { |
| return 0; |
| } |
| |
| checkClosed(); |
| |
| // if nothing read yet or whole buffer has been read fill buffer |
| if (bufferPointer == -1 || bufferPointer >= buffer.length) { |
| // if no data available and stream was closed return -1 |
| if (!loadBuffer()) { |
| return -1; |
| } |
| } |
| |
| // if more bytes wanted than available return all available |
| int bytesAvailable = buffer.length - bufferPointer; |
| if (len > bytesAvailable) { |
| len = bytesAvailable; |
| } |
| |
| System.arraycopy(buffer, bufferPointer, b, off, len); |
| bufferPointer += len; |
| return len; |
| } |
| |
| public synchronized int read(byte[] b) throws IOException { |
| return read(b, 0, b.length); |
| } |
| |
| /** |
| * This method blocks until a data packet is received, the stream is closed or the current |
| * thread is interrupted. |
| * |
| * @return <code>true</code> if data was received, otherwise <code>false</code> |
| * @throws IOException if data packets are out of sequence |
| */ |
| private synchronized boolean loadBuffer() throws IOException { |
| |
| // wait until data is available or stream is closed |
| DataPacketExtension data = null; |
| try { |
| if (this.readTimeout == 0) { |
| while (data == null) { |
| if (isClosed && this.dataQueue.isEmpty()) { |
| return false; |
| } |
| data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS); |
| } |
| } |
| else { |
| data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS); |
| if (data == null) { |
| throw new SocketTimeoutException(); |
| } |
| } |
| } |
| catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| return false; |
| } |
| |
| // handle sequence overflow |
| if (this.seq == 65535) { |
| this.seq = -1; |
| } |
| |
| // check if data packets sequence is successor of last seen sequence |
| long seq = data.getSeq(); |
| if (seq - 1 != this.seq) { |
| // packets out of order; close stream/session |
| InBandBytestreamSession.this.close(); |
| throw new IOException("Packets out of sequence"); |
| } |
| else { |
| this.seq = seq; |
| } |
| |
| // set buffer to decoded data |
| buffer = data.getDecodedData(); |
| bufferPointer = 0; |
| return true; |
| } |
| |
| /** |
| * Checks if this stream is closed and throws an IOException if necessary |
| * |
| * @throws IOException if stream is closed and no data should be read anymore |
| */ |
| private void checkClosed() throws IOException { |
| /* throw no exception if there is data available, but not if close method was invoked */ |
| if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) { |
| // clear data queue in case additional data was received after stream was closed |
| this.dataQueue.clear(); |
| throw new IOException("Stream is closed"); |
| } |
| } |
| |
| public boolean markSupported() { |
| return false; |
| } |
| |
| public void close() throws IOException { |
| if (isClosed) { |
| return; |
| } |
| |
| this.closeInvoked = true; |
| |
| InBandBytestreamSession.this.closeByLocal(true); |
| } |
| |
| /** |
| * This method sets the close flag and removes the data packet listener. |
| */ |
| private void closeInternal() { |
| if (isClosed) { |
| return; |
| } |
| isClosed = true; |
| } |
| |
| /** |
| * Invoked if the session is closed. |
| */ |
| private void cleanup() { |
| connection.removePacketListener(this.dataPacketListener); |
| } |
| |
| } |
| |
| /** |
| * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the |
| * data packets. |
| */ |
| private class IQIBBInputStream extends IBBInputStream { |
| |
| protected PacketListener getDataPacketListener() { |
| return new PacketListener() { |
| |
| private long lastSequence = -1; |
| |
| public void processPacket(Packet packet) { |
| // get data packet extension |
| DataPacketExtension data = (DataPacketExtension) packet.getExtension( |
| DataPacketExtension.ELEMENT_NAME, |
| InBandBytestreamManager.NAMESPACE); |
| |
| /* |
| * check if sequence was not used already (see XEP-0047 Section 2.2) |
| */ |
| if (data.getSeq() <= this.lastSequence) { |
| IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError( |
| XMPPError.Condition.unexpected_request)); |
| connection.sendPacket(unexpectedRequest); |
| return; |
| |
| } |
| |
| // check if encoded data is valid (see XEP-0047 Section 2.2) |
| if (data.getDecodedData() == null) { |
| // data is invalid; respond with bad-request error |
| IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError( |
| XMPPError.Condition.bad_request)); |
| connection.sendPacket(badRequest); |
| return; |
| } |
| |
| // data is valid; add to data queue |
| dataQueue.offer(data); |
| |
| // confirm IQ |
| IQ confirmData = IQ.createResultIQ((IQ) packet); |
| connection.sendPacket(confirmData); |
| |
| // set last seen sequence |
| this.lastSequence = data.getSeq(); |
| if (this.lastSequence == 65535) { |
| this.lastSequence = -1; |
| } |
| |
| } |
| |
| }; |
| } |
| |
| protected PacketFilter getDataPacketFilter() { |
| /* |
| * filter all IQ stanzas having type 'SET' (represented by Data class), containing a |
| * data packet extension, matching session ID and recipient |
| */ |
| return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter()); |
| } |
| |
| } |
| |
| /** |
| * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas |
| * encapsulating the data packets. |
| */ |
| private class MessageIBBInputStream extends IBBInputStream { |
| |
| protected PacketListener getDataPacketListener() { |
| return new PacketListener() { |
| |
| public void processPacket(Packet packet) { |
| // get data packet extension |
| DataPacketExtension data = (DataPacketExtension) packet.getExtension( |
| DataPacketExtension.ELEMENT_NAME, |
| InBandBytestreamManager.NAMESPACE); |
| |
| // check if encoded data is valid |
| if (data.getDecodedData() == null) { |
| /* |
| * TODO once a majority of XMPP server implementation support XEP-0079 |
| * Advanced Message Processing the invalid message could be answered with an |
| * appropriate error. For now we just ignore the packet. Subsequent packets |
| * with an increased sequence will cause the input stream to close the |
| * stream/session. |
| */ |
| return; |
| } |
| |
| // data is valid; add to data queue |
| dataQueue.offer(data); |
| |
| // TODO confirm packet once XMPP servers support XEP-0079 |
| } |
| |
| }; |
| } |
| |
| @Override |
| protected PacketFilter getDataPacketFilter() { |
| /* |
| * filter all message stanzas containing a data packet extension, matching session ID |
| * and recipient |
| */ |
| return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter()); |
| } |
| |
| } |
| |
| /** |
| * IBBDataPacketFilter class filters all packets from the remote peer of this session, |
| * containing an In-Band Bytestream data packet extension whose session ID matches this sessions |
| * ID. |
| */ |
| private class IBBDataPacketFilter implements PacketFilter { |
| |
| public boolean accept(Packet packet) { |
| // sender equals remote peer |
| if (!packet.getFrom().equalsIgnoreCase(remoteJID)) { |
| return false; |
| } |
| |
| // stanza contains data packet extension |
| PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME, |
| InBandBytestreamManager.NAMESPACE); |
| if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) { |
| return false; |
| } |
| |
| // session ID equals this session ID |
| DataPacketExtension data = (DataPacketExtension) packetExtension; |
| if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| } |
| |
| /** |
| * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream. |
| * Subclasses of this output stream must provide a method to send data over XMPP stream. |
| */ |
| private abstract class IBBOutputStream extends OutputStream { |
| |
| /* buffer with the size of this sessions block size */ |
| protected final byte[] buffer; |
| |
| /* pointer to next byte to write to buffer */ |
| protected int bufferPointer = 0; |
| |
| /* data packet sequence (range from 0 to 65535) */ |
| protected long seq = 0; |
| |
| /* flag to indicate if output stream is closed */ |
| protected boolean isClosed = false; |
| |
| /** |
| * Constructor. |
| */ |
| public IBBOutputStream() { |
| this.buffer = new byte[(byteStreamRequest.getBlockSize()/4)*3]; |
| } |
| |
| /** |
| * Writes the given data packet to the XMPP stream. |
| * |
| * @param data the data packet |
| * @throws IOException if an I/O error occurred while sending or if the stream is closed |
| */ |
| protected abstract void writeToXML(DataPacketExtension data) throws IOException; |
| |
| public synchronized void write(int b) throws IOException { |
| if (this.isClosed) { |
| throw new IOException("Stream is closed"); |
| } |
| |
| // if buffer is full flush buffer |
| if (bufferPointer >= buffer.length) { |
| flushBuffer(); |
| } |
| |
| buffer[bufferPointer++] = (byte) b; |
| } |
| |
| public synchronized void write(byte b[], int off, int len) throws IOException { |
| if (b == null) { |
| throw new NullPointerException(); |
| } |
| else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) |
| || ((off + len) < 0)) { |
| throw new IndexOutOfBoundsException(); |
| } |
| else if (len == 0) { |
| return; |
| } |
| |
| if (this.isClosed) { |
| throw new IOException("Stream is closed"); |
| } |
| |
| // is data to send greater than buffer size |
| if (len >= buffer.length) { |
| |
| // "byte" off the first chunk to write out |
| writeOut(b, off, buffer.length); |
| |
| // recursively call this method with the lesser amount |
| write(b, off + buffer.length, len - buffer.length); |
| } |
| else { |
| writeOut(b, off, len); |
| } |
| } |
| |
| public synchronized void write(byte[] b) throws IOException { |
| write(b, 0, b.length); |
| } |
| |
| /** |
| * Fills the buffer with the given data and sends it over the XMPP stream if the buffers |
| * capacity has been reached. This method is only called from this class so it is assured |
| * that the amount of data to send is <= buffer capacity |
| * |
| * @param b the data |
| * @param off the data |
| * @param len the number of bytes to write |
| * @throws IOException if an I/O error occurred while sending or if the stream is closed |
| */ |
| private synchronized void writeOut(byte b[], int off, int len) throws IOException { |
| if (this.isClosed) { |
| throw new IOException("Stream is closed"); |
| } |
| |
| // set to 0 in case the next 'if' block is not executed |
| int available = 0; |
| |
| // is data to send greater that buffer space left |
| if (len > buffer.length - bufferPointer) { |
| // fill buffer to capacity and send it |
| available = buffer.length - bufferPointer; |
| System.arraycopy(b, off, buffer, bufferPointer, available); |
| bufferPointer += available; |
| flushBuffer(); |
| } |
| |
| // copy the data left to buffer |
| System.arraycopy(b, off + available, buffer, bufferPointer, len - available); |
| bufferPointer += len - available; |
| } |
| |
| public synchronized void flush() throws IOException { |
| if (this.isClosed) { |
| throw new IOException("Stream is closed"); |
| } |
| flushBuffer(); |
| } |
| |
| private synchronized void flushBuffer() throws IOException { |
| |
| // do nothing if no data to send available |
| if (bufferPointer == 0) { |
| return; |
| } |
| |
| // create data packet |
| String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false); |
| DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(), |
| this.seq, enc); |
| |
| // write to XMPP stream |
| writeToXML(data); |
| |
| // reset buffer pointer |
| bufferPointer = 0; |
| |
| // increment sequence, considering sequence overflow |
| this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1); |
| |
| } |
| |
| public void close() throws IOException { |
| if (isClosed) { |
| return; |
| } |
| InBandBytestreamSession.this.closeByLocal(false); |
| } |
| |
| /** |
| * Sets the close flag and optionally flushes the stream. |
| * |
| * @param flush if <code>true</code> flushes the stream |
| */ |
| protected void closeInternal(boolean flush) { |
| if (this.isClosed) { |
| return; |
| } |
| this.isClosed = true; |
| |
| try { |
| if (flush) { |
| flushBuffer(); |
| } |
| } |
| catch (IOException e) { |
| /* |
| * ignore, because writeToXML() will not throw an exception if stream is already |
| * closed |
| */ |
| } |
| } |
| |
| } |
| |
| /** |
| * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating |
| * the data packets. |
| */ |
| private class IQIBBOutputStream extends IBBOutputStream { |
| |
| @Override |
| protected synchronized void writeToXML(DataPacketExtension data) throws IOException { |
| // create IQ stanza containing data packet |
| IQ iq = new Data(data); |
| iq.setTo(remoteJID); |
| |
| try { |
| SyncPacketSend.getReply(connection, iq); |
| } |
| catch (XMPPException e) { |
| // close session unless it is already closed |
| if (!this.isClosed) { |
| InBandBytestreamSession.this.close(); |
| throw new IOException("Error while sending Data: " + e.getMessage()); |
| } |
| } |
| |
| } |
| |
| } |
| |
| /** |
| * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas |
| * encapsulating the data packets. |
| */ |
| private class MessageIBBOutputStream extends IBBOutputStream { |
| |
| @Override |
| protected synchronized void writeToXML(DataPacketExtension data) { |
| // create message stanza containing data packet |
| Message message = new Message(remoteJID); |
| message.addExtension(data); |
| |
| connection.sendPacket(message); |
| |
| } |
| |
| } |
| |
| } |