initial import
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b0a098b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+mqttIO-iOS-SDK
+==============
+
+iOS SDK for MQTT
+
+Copyright © 2011, 2013 2lemetry, LLC
+
+All rights reserved. 
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that 
+the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the distribution.
+ 
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, 
+INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 
+STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/MqttSDK/MQTTDecoder.h b/src/MqttSDK/MQTTDecoder.h
new file mode 100644
index 0000000..aaeb232
--- /dev/null
+++ b/src/MqttSDK/MQTTDecoder.h
@@ -0,0 +1,62 @@
+//
+// MQTTDecoder.h
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import <Foundation/Foundation.h>
+#import "MQTTMessage.h"
+
+@interface MQTTDecoder : NSObject <NSStreamDelegate> {
+    NSInteger       status;
+    NSInputStream*  stream;
+    NSRunLoop*      runLoop;
+    NSString*       runLoopMode;
+    id              delegate;
+    UInt8           header;
+    UInt32          length;
+    UInt32          lengthMultiplier;
+    NSMutableData*  dataBuffer;
+}
+
+typedef enum {
+    MQTTDecoderEventProtocolError,
+    MQTTDecoderEventConnectionClosed,
+    MQTTDecoderEventConnectionError
+} MQTTDecoderEvent;
+
+enum {
+    MQTTDecoderStatusInitializing,
+    MQTTDecoderStatusDecodingHeader,
+    MQTTDecoderStatusDecodingLength,
+    MQTTDecoderStatusDecodingData,
+    MQTTDecoderStatusConnectionClosed,
+    MQTTDecoderStatusConnectionError,
+    MQTTDecoderStatusProtocolError
+};
+
+- (id)initWithStream:(NSInputStream*)aStream
+             runLoop:(NSRunLoop*)aRunLoop
+         runLoopMode:(NSString*)aMode;
+- (void)setDelegate:(id)aDelegate;
+- (void)open;
+- (void)close;
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode;
+@end
+
+@interface NSObject (MQTTDecoderDelegate)
+- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg;
+- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode;
+
+@end
diff --git a/src/MqttSDK/MQTTDecoder.m b/src/MqttSDK/MQTTDecoder.m
new file mode 100644
index 0000000..ea12f57
--- /dev/null
+++ b/src/MqttSDK/MQTTDecoder.m
@@ -0,0 +1,147 @@
+//
+// MQTTDecoder.m
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import "MQTTDecoder.h"
+
+@implementation MQTTDecoder
+
+- (id)initWithStream:(NSInputStream*)aStream
+             runLoop:(NSRunLoop*)aRunLoop
+         runLoopMode:(NSString*)aMode {
+    status = MQTTDecoderStatusInitializing;
+    stream = aStream;
+    [stream setDelegate:self];
+    runLoop = aRunLoop;
+    runLoopMode = aMode;
+    return self;
+}
+
+- (void)setDelegate:(id)aDelegate {
+    delegate = aDelegate;
+}
+
+- (void)open {
+    [stream setDelegate:self];
+    [stream scheduleInRunLoop:runLoop forMode:runLoopMode];
+    [stream open];
+}
+
+- (void)close {
+    [stream setDelegate:nil];
+    [stream close];
+    [stream removeFromRunLoop:runLoop forMode:runLoopMode];
+    stream = nil;
+}
+
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
+    if(stream == nil)
+        return;
+    switch (eventCode) {
+        case NSStreamEventOpenCompleted:
+            status = MQTTDecoderStatusDecodingHeader;
+            break;
+        case NSStreamEventHasBytesAvailable:
+            if (status == MQTTDecoderStatusDecodingHeader) {
+                NSInteger n = [stream read:&header maxLength:1];
+                if (n == -1) {
+                    status = MQTTDecoderStatusConnectionError;
+                    [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+                }
+                else if (n == 1) {
+                    length = 0;
+                    lengthMultiplier = 1;
+                    status = MQTTDecoderStatusDecodingLength;
+                }
+            }
+            while (status == MQTTDecoderStatusDecodingLength) {
+                UInt8 digit;
+                NSInteger n = [stream read:&digit maxLength:1];
+                if (n == -1) {
+                    status = MQTTDecoderStatusConnectionError;
+                    [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+                    break;
+                }
+                else if (n == 0) {
+                    break;
+                }
+                length += (digit & 0x7f) * lengthMultiplier;
+                if ((digit & 0x80) == 0x00) {
+                    dataBuffer = [NSMutableData dataWithCapacity:length];
+                    status = MQTTDecoderStatusDecodingData;
+                }
+                else {
+                    lengthMultiplier *= 128;
+                }
+            }
+            if (status == MQTTDecoderStatusDecodingData) {
+                if (length > 0) {
+                    NSInteger n, toRead;
+                    UInt8 buffer[768];
+                    toRead = length - [dataBuffer length];
+                    if (toRead > sizeof buffer) {
+                        toRead = sizeof buffer;
+                    }
+                    n = [stream read:buffer maxLength:toRead];
+                    if (n == -1) {
+                        status = MQTTDecoderStatusConnectionError;
+                        [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+                    }
+                    else {
+                        [dataBuffer appendBytes:buffer length:n];
+                    }
+                }
+                if ([dataBuffer length] == length) {
+                    MQTTMessage* msg;
+                    UInt8 type, qos;
+                    BOOL isDuplicate, retainFlag;
+                    type = (header >> 4) & 0x0f;
+                    isDuplicate = NO;
+                    if ((header & 0x08) == 0x08) {
+                        isDuplicate = YES;
+                    }
+                    // XXX qos > 2
+                    qos = (header >> 1) & 0x03;
+                    retainFlag = NO;
+                    if ((header & 0x01) == 0x01) {
+                        retainFlag = YES;
+                    }
+                    msg = [[MQTTMessage alloc] initWithType:type
+                                                        qos:qos
+                                                 retainFlag:retainFlag
+                                                    dupFlag:isDuplicate
+                                                       data:dataBuffer];
+                    [delegate decoder:self newMessage:msg];
+                    dataBuffer = NULL;
+                    status = MQTTDecoderStatusDecodingHeader;
+                }
+            }
+            break;
+        case NSStreamEventEndEncountered:
+            status = MQTTDecoderStatusConnectionClosed;
+            [delegate decoder:self handleEvent:MQTTDecoderEventConnectionClosed];
+            break;
+        case NSStreamEventErrorOccurred:
+            status = MQTTDecoderStatusConnectionError;
+            [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+            break;
+        default:
+            NSLog(@"unhandled event code");
+            break;
+    }
+}
+
+@end
diff --git a/src/MqttSDK/MQTTEncoder.h b/src/MqttSDK/MQTTEncoder.h
new file mode 100644
index 0000000..e063fbd
--- /dev/null
+++ b/src/MqttSDK/MQTTEncoder.h
@@ -0,0 +1,59 @@
+//
+// MQTTEncoder.h
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import <Foundation/Foundation.h>
+#import "MQTTMessage.h"
+
+@interface MQTTEncoder : NSObject <NSStreamDelegate> {
+    NSInteger       status;
+    NSOutputStream* stream;
+    NSRunLoop*      runLoop;
+    NSString*       runLoopMode;
+    NSMutableData*  buffer;
+    NSInteger       byteIndex;
+    id              delegate;
+}
+
+typedef enum {
+    MQTTEncoderEventReady,
+    MQTTEncoderEventErrorOccurred
+} MQTTEncoderEvent;
+
+typedef enum {
+    MQTTEncoderStatusInitializing,
+    MQTTEncoderStatusReady,
+    MQTTEncoderStatusSending,
+    MQTTEncoderStatusEndEncountered,
+    MQTTEncoderStatusError
+} MQTTEncoderStatus;
+
+- (id)initWithStream:(NSOutputStream*)aStream
+             runLoop:(NSRunLoop*)aRunLoop
+         runLoopMode:(NSString*)aMode;
+- (void)setDelegate:(id)aDelegate;
+- (void)open;
+- (void)close;
+- (MQTTEncoderStatus)status;
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode;
+- (void)encodeMessage:(MQTTMessage*)msg;
+
+@end
+
+@interface NSObject (MQTTEncoderDelegate)
+- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent)eventCode;
+
+@end
diff --git a/src/MqttSDK/MQTTEncoder.m b/src/MqttSDK/MQTTEncoder.m
new file mode 100644
index 0000000..c99e0c2
--- /dev/null
+++ b/src/MqttSDK/MQTTEncoder.m
@@ -0,0 +1,161 @@
+//
+// MQTTEncoder.m
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import "MQTTEncoder.h"
+
+@implementation MQTTEncoder
+
+- (id)initWithStream:(NSOutputStream*)aStream
+             runLoop:(NSRunLoop*)aRunLoop
+         runLoopMode:(NSString*)aMode {
+    status = MQTTEncoderStatusInitializing;
+    stream = aStream;
+    [stream setDelegate:self];
+    runLoop = aRunLoop;
+    runLoopMode = aMode;
+    return self;
+}
+
+- (void)setDelegate:(id)aDelegate {
+    delegate = aDelegate;
+}
+
+- (MQTTEncoderStatus)status {
+    return status;
+}
+
+- (void)open {
+    [stream setDelegate:self];
+    [stream scheduleInRunLoop:runLoop forMode:runLoopMode];
+    [stream open];
+}
+
+- (void)close {
+    [stream close];
+    [stream setDelegate:nil];
+    [stream removeFromRunLoop:runLoop forMode:runLoopMode];
+    stream = nil;
+}
+
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
+    if(stream == nil)
+        return;
+    assert(sender == stream);
+    switch (eventCode) {
+        case NSStreamEventOpenCompleted:
+            break;
+        case NSStreamEventHasSpaceAvailable:
+            if (status == MQTTEncoderStatusInitializing) {
+                status = MQTTEncoderStatusReady;
+                [delegate encoder:self handleEvent:MQTTEncoderEventReady];
+            }
+            else if (status == MQTTEncoderStatusReady) {
+                [delegate encoder:self handleEvent:MQTTEncoderEventReady];
+            }
+            else if (status == MQTTEncoderStatusSending) {
+                UInt8* ptr;
+                NSInteger n, length;
+                
+                ptr = (UInt8*) [buffer bytes] + byteIndex;
+                // Number of bytes pending for transfer
+                length = [buffer length] - byteIndex;
+                n = [stream write:ptr maxLength:length];
+                if (n == -1) {
+                    status = MQTTEncoderStatusError;
+                    [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred];
+                }
+                else if (n < length) {
+                    byteIndex += n;
+                }
+                else {
+                    buffer = NULL;
+                    byteIndex = 0;
+                    status = MQTTEncoderStatusReady;
+                }
+            }
+            break;
+        case NSStreamEventErrorOccurred:
+        case NSStreamEventEndEncountered:
+            if (status != MQTTEncoderStatusError) {
+                status = MQTTEncoderStatusError;
+                [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred];
+            }
+            break;
+        default:
+            NSLog(@"Oops, event code not handled: 0x%02x", eventCode);
+            break;
+    }
+}
+
+- (void)encodeMessage:(MQTTMessage*)msg {
+    UInt8 header;
+    NSInteger n, length;
+    
+    if (status != MQTTEncoderStatusReady) {
+        NSLog(@"Encoder not ready");
+        return;
+    }
+    
+    assert (buffer == NULL);
+    assert (byteIndex == 0);
+    
+    buffer = [[NSMutableData alloc] init];
+    
+    // encode fixed header
+    header = [msg type] << 4;
+    if ([msg isDuplicate]) {
+        header |= 0x08;
+    }
+    header |= [msg qos] << 1;
+    if ([msg retainFlag]) {
+        header |= 0x01;
+    }
+    [buffer appendBytes:&header length:1];
+    
+    // encode remaining length
+    length = [[msg data] length];
+    do {
+        UInt8 digit = length % 128;
+        length /= 128;
+        if (length > 0) {
+            digit |= 0x80;
+        }
+        [buffer appendBytes:&digit length:1];
+    }
+    while (length > 0);
+    
+    // encode message data
+    if ([msg data] != NULL) {
+        [buffer appendData:[msg data]];
+    }
+    
+    n = [stream write:[buffer bytes] maxLength:[buffer length]];
+    if (n == -1) {
+        status = MQTTEncoderStatusError;
+        [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred];
+    }
+    else if (n < [buffer length]) {
+        byteIndex += n;
+        status = MQTTEncoderStatusSending;
+    }
+    else {
+        buffer = NULL;
+        // XXX [delegate encoder:self handleEvent:MQTTEncoderEventReady];
+    }
+}
+
+@end
diff --git a/src/MqttSDK/MQTTMessage.h b/src/MqttSDK/MQTTMessage.h
new file mode 100644
index 0000000..35f98bd
--- /dev/null
+++ b/src/MqttSDK/MQTTMessage.h
@@ -0,0 +1,104 @@
+//
+// MQTTMessage.h
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import <Foundation/Foundation.h>
+
+@interface MQTTMessage : NSObject {
+    UInt8    type;
+    UInt8    qos;
+    BOOL     retainFlag;
+    BOOL     dupFlag;
+}
+
+enum {
+    MQTTConnect = 1,
+    MQTTConnack = 2,
+    MQTTPublish = 3,
+    MQTTPuback = 4,
+    MQTTPubrec = 5,
+    MQTTPubrel = 6,
+    MQTTPubcomp = 7,
+    MQTTSubscribe = 8,
+    MQTTSuback = 9,
+    MQTTUnsubscribe = 10,
+    MQTTUnsuback = 11,
+    MQTTPingreq = 12,
+    MQTTPingresp = 13,
+    MQTTDisconnect = 14
+};
+
+// instance methods
++ (id)connectMessageWithClientId:(NSString*)clientId
+                        userName:(NSString*)userName
+                        password:(NSString*)password
+                       keepAlive:(NSInteger)keeplive
+                    cleanSession:(BOOL)cleanSessionFlag;
++ (id)connectMessageWithClientId:(NSString*)clientId
+                        userName:(NSString*)userName
+                        password:(NSString*)password
+                       keepAlive:(NSInteger)keeplive
+                    cleanSession:(BOOL)cleanSessionFlag
+                       willTopic:(NSString*)willTopic
+                         willMsg:(NSData*)willData
+                         willQoS:(UInt8)willQoS
+                      willRetain:(BOOL)willRetainFlag;
+
++ (id)pingreqMessage;
++ (id)subscribeMessageWithMessageId:(UInt16)msgId
+                              topic:(NSString*)topic
+                                qos:(UInt8)qos;
++ (id)unsubscribeMessageWithMessageId:(UInt16)msgId
+                                topic:(NSString*)topic;
++ (id)publishMessageWithData:(NSData*)payload
+                     onTopic:(NSString*)theTopic
+                     retainFlag:(BOOL)retain;
++ (id)publishMessageWithData:(NSData*)payload
+                     onTopic:(NSString*)topic
+                         qos:(UInt8)qosLevel
+                       msgId:(UInt16)msgId
+                  retainFlag:(BOOL)retain
+                     dupFlag:(BOOL)dup;
++ (id)pubackMessageWithMessageId:(UInt16)msgId;
++ (id)pubrecMessageWithMessageId:(UInt16)msgId;
++ (id)pubrelMessageWithMessageId:(UInt16)msgId;
++ (id)pubcompMessageWithMessageId:(UInt16)msgId;
+
+- (id)initWithType:(UInt8)aType;
+- (id)initWithType:(UInt8)aType data:(NSData*)aData;
+- (id)initWithType:(UInt8)aType
+               qos:(UInt8)aQos
+              data:(NSData*)aData;
+- (id)initWithType:(UInt8)aType
+               qos:(UInt8)aQos
+        retainFlag:(BOOL)aRetainFlag
+           dupFlag:(BOOL)aDupFlag
+              data:(NSData*)aData;
+- (void)setDupFlag;
+- (UInt8)type;
+- (UInt8)qos;
+- (BOOL)retainFlag;
+- (BOOL)isDuplicate;
+@property (strong,nonatomic) NSData * data;
+
+@end
+
+@interface NSMutableData (MQTT)
+- (void)appendByte:(UInt8)byte;
+- (void)appendUInt16BigEndian:(UInt16)val;
+- (void)appendMQTTString:(NSString*)s;
+
+@end
diff --git a/src/MqttSDK/MQTTMessage.m b/src/MqttSDK/MQTTMessage.m
new file mode 100644
index 0000000..5871bdb
--- /dev/null
+++ b/src/MqttSDK/MQTTMessage.m
@@ -0,0 +1,268 @@
+//
+// MQTTMessage.m
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import "MQTTMessage.h"
+
+@implementation MQTTMessage
+
++ (id)connectMessageWithClientId:(NSString*)clientId
+                        userName:(NSString*)userName
+                        password:(NSString*)password
+                       keepAlive:(NSInteger)keepAlive
+                    cleanSession:(BOOL)cleanSessionFlag {
+    MQTTMessage* msg;
+    UInt8 flags = 0x00;
+
+    if (cleanSessionFlag) {
+        flags |= 0x02;
+    }
+    if ([userName length] > 0) {
+        flags |= 0x80;
+        if ([password length] > 0) {
+            flags |= 0x40;
+        }
+    }
+
+    NSMutableData* data = [NSMutableData data];
+    [data appendMQTTString:@"MQIsdp"];
+    [data appendByte:3];
+    [data appendByte:flags];
+    [data appendUInt16BigEndian:keepAlive];
+    [data appendMQTTString:clientId];
+    if ([userName length] > 0) {
+        [data appendMQTTString:userName];
+        if ([password length] > 0) {
+            [data appendMQTTString:password];
+        }
+    }
+
+    msg = [[MQTTMessage alloc] initWithType:MQTTConnect data:data];
+    return msg;
+}
+
++ (id)connectMessageWithClientId:(NSString*)clientId
+                        userName:(NSString*)userName
+                        password:(NSString*)password
+                       keepAlive:(NSInteger)keepAlive
+                    cleanSession:(BOOL)cleanSessionFlag
+                       willTopic:(NSString*)willTopic
+                         willMsg:(NSData*)willMsg
+                         willQoS:(UInt8)willQoS
+                      willRetain:(BOOL)willRetainFlag {
+    UInt8 flags = 0x04 | (willQoS << 4 & 0x18);
+
+    if (willRetainFlag) {
+        flags |= 0x20;
+    }
+    if (cleanSessionFlag) {
+        flags |= 0x02;
+    }
+    if ([userName length] > 0) {
+        flags |= 0x80;
+        if ([password length] > 0) {
+            flags |= 0x40;
+        }
+    }
+
+    NSMutableData* data = [NSMutableData data];
+    [data appendMQTTString:@"MQIsdp"];
+    [data appendByte:3];
+    [data appendByte:flags];
+    [data appendUInt16BigEndian:keepAlive];
+    [data appendMQTTString:clientId];
+    [data appendMQTTString:willTopic];
+    [data appendUInt16BigEndian:[willMsg length]];
+    [data appendData:willMsg];
+    if ([userName length] > 0) {
+        [data appendMQTTString:userName];
+        if ([password length] > 0) {
+            [data appendMQTTString:password];
+        }
+    }
+
+    MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTConnect
+                                                    data:data];
+    return msg;
+}
+
++ (id)pingreqMessage {
+    return [[MQTTMessage alloc] initWithType:MQTTPingreq];
+}
+
++ (id)subscribeMessageWithMessageId:(UInt16)msgId
+                              topic:(NSString*)topic
+                                qos:(UInt8)qos {
+    NSMutableData* data = [NSMutableData data];
+    [data appendUInt16BigEndian:msgId];
+    [data appendMQTTString:topic];
+    [data appendByte:qos];
+    MQTTMessage* msg = [[MQTTMessage alloc] initWithType:MQTTSubscribe
+                                                     qos:1
+                                                    data:data];
+    return msg;
+}
+
++ (id)unsubscribeMessageWithMessageId:(UInt16)msgId
+                                topic:(NSString*)topic {
+    NSMutableData* data = [NSMutableData data];
+    [data appendUInt16BigEndian:msgId];
+    [data appendMQTTString:topic];
+    MQTTMessage* msg = [[MQTTMessage alloc] initWithType:MQTTUnsubscribe
+                                                     qos:1
+                                                    data:data];
+    return msg;
+}
+
++ (id)publishMessageWithData:(NSData*)payload
+                     onTopic:(NSString*)topic
+                  retainFlag:(BOOL)retain {
+    NSMutableData* data = [NSMutableData data];
+    [data appendMQTTString:topic];
+    [data appendData:payload];
+    MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTPublish
+                                                     qos:0
+                                              retainFlag:retain
+                                                 dupFlag:false
+                                                    data:data];
+    return msg;
+}
+
++ (id)publishMessageWithData:(NSData*)payload
+                     onTopic:(NSString*)topic
+                         qos:(UInt8)qosLevel
+                       msgId:(UInt16)msgId
+                  retainFlag:(BOOL)retain
+                     dupFlag:(BOOL)dup {
+    NSMutableData* data = [NSMutableData data];
+    [data appendMQTTString:topic];
+    [data appendUInt16BigEndian:msgId];
+    [data appendData:payload];
+    MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTPublish
+                                                     qos:qosLevel
+                                              retainFlag:retain
+                                                 dupFlag:dup
+                                                    data:data];
+    return msg;
+}
+
++ (id)pubackMessageWithMessageId:(UInt16)msgId {
+    NSMutableData* data = [NSMutableData data];
+    [data appendUInt16BigEndian:msgId];
+    return [[MQTTMessage alloc] initWithType:MQTTPuback
+                                         data:data];
+}
+
++ (id)pubrecMessageWithMessageId:(UInt16)msgId {
+    NSMutableData* data = [NSMutableData data];
+    [data appendUInt16BigEndian:msgId];
+    return [[MQTTMessage alloc] initWithType:MQTTPubrec
+                                         data:data];
+}
+
++ (id)pubrelMessageWithMessageId:(UInt16)msgId {
+    NSMutableData* data = [NSMutableData data];
+    [data appendUInt16BigEndian:msgId];
+    return [[MQTTMessage alloc] initWithType:MQTTPubrel
+                                         data:data];
+}
+
++ (id)pubcompMessageWithMessageId:(UInt16)msgId {
+    NSMutableData* data = [NSMutableData data];
+    [data appendUInt16BigEndian:msgId];
+    return [[MQTTMessage alloc] initWithType:MQTTPubcomp
+                                         data:data];
+}
+
+- (id)initWithType:(UInt8)aType {
+    type = aType;
+    self.data = nil;
+    return self;
+}
+
+- (id)initWithType:(UInt8)aType data:(NSData*)aData {
+    type = aType;
+    self.data = aData;
+    return self;
+}
+
+- (id)initWithType:(UInt8)aType
+               qos:(UInt8)aQos
+              data:(NSData*)aData {
+    type = aType;
+    qos = aQos;
+    self.data = aData;
+    return self;
+}
+
+- (id)initWithType:(UInt8)aType
+               qos:(UInt8)aQos
+        retainFlag:(BOOL)aRetainFlag
+           dupFlag:(BOOL)aDupFlag
+              data:(NSData*)aData {
+    type = aType;
+    qos = aQos;
+    retainFlag = aRetainFlag;
+    dupFlag = aDupFlag;
+    self.data = aData;
+    return self;
+}
+
+- (void)setDupFlag {
+    dupFlag = true;
+}
+
+- (UInt8)type {
+    return type;
+}
+
+- (UInt8)qos {
+    return qos;
+}
+
+- (BOOL)retainFlag {
+    return retainFlag;
+}
+
+- (BOOL)isDuplicate {
+    return dupFlag;
+}
+
+
+@end
+
+@implementation NSMutableData (MQTT)
+
+- (void)appendByte:(UInt8)byte {
+    [self appendBytes:&byte length:1];
+}
+
+- (void)appendUInt16BigEndian:(UInt16)val {
+    [self appendByte:val / 256];
+    [self appendByte:val % 256];
+}
+
+- (void)appendMQTTString:(NSString*)string {
+    UInt8 buf[2];
+    const char* utf8String = [string UTF8String];
+    int strLen = strlen(utf8String);
+    buf[0] = strLen / 256;
+    buf[1] = strLen % 256;
+    [self appendBytes:buf length:2];
+    [self appendBytes:utf8String length:strLen];
+}
+
+@end
diff --git a/src/MqttSDK/MQTTSession.h b/src/MqttSDK/MQTTSession.h
new file mode 100644
index 0000000..bb01e03
--- /dev/null
+++ b/src/MqttSDK/MQTTSession.h
@@ -0,0 +1,148 @@
+//
+// MQTTSession.h
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import <Foundation/Foundation.h>
+#import "MQTTDecoder.h"
+#import "MQTTEncoder.h"
+
+typedef enum {
+    MQTTSessionStatusCreated,
+    MQTTSessionStatusConnecting,
+    MQTTSessionStatusConnected,
+    MQTTSessionStatusError
+} MQTTSessionStatus;
+
+typedef enum {
+    MQTTSessionEventConnected,
+    MQTTSessionEventConnectionRefused,
+    MQTTSessionEventConnectionClosed,
+    MQTTSessionEventConnectionError,
+    MQTTSessionEventProtocolError
+} MQTTSessionEvent;
+
+@interface MQTTSession : NSObject {
+    MQTTSessionStatus    status;
+    NSString*            clientId;
+    //NSString*            userName;
+    //NSString*            password;
+    UInt16               keepAliveInterval;
+    BOOL                 cleanSessionFlag;
+    MQTTMessage*         connectMessage;
+    NSRunLoop*           runLoop;
+    NSString*            runLoopMode;
+    NSTimer*             timer;
+    NSInteger            idleTimer;
+    MQTTEncoder*         encoder;
+    MQTTDecoder*         decoder;
+    UInt16               txMsgId;
+    id                   delegate;
+    NSMutableDictionary* txFlows;
+    NSMutableDictionary* rxFlows;
+    unsigned int         ticks;
+}
+
+- (id)initWithClientId:(NSString*)theClientId;
+- (id)initWithClientId:(NSString*)theClientId runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode;
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUsername
+              password:(NSString*)thePassword;
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode;
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUsername
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAliveInterval
+          cleanSession:(BOOL)cleanSessionFlag;
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUsername
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAlive
+          cleanSession:(BOOL)theCleanSessionFlag
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theMode;
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAliveInterval
+          cleanSession:(BOOL)theCleanSessionFlag
+             willTopic:(NSString*)willTopic
+               willMsg:(NSData*)willMsg
+               willQoS:(UInt8)willQoS
+        willRetainFlag:(BOOL)willRetainFlag;
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAliveInterval
+          cleanSession:(BOOL)theCleanSessionFlag
+             willTopic:(NSString*)willTopic
+               willMsg:(NSData*)willMsg
+               willQoS:(UInt8)willQoS
+        willRetainFlag:(BOOL)willRetainFlag
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode;
+- (id)initWithClientId:(NSString*)theClientId
+             keepAlive:(UInt16)theKeepAliveInterval
+        connectMessage:(MQTTMessage*)theConnectMessage
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode;
+
+- (void)dealloc;
+- (void)close;
+- (void)setDelegate:aDelegate;
+- (void)connectToHost:(NSString*)ip port:(UInt32)port;
+- (void)connectToHost:(NSString*)ip port:(UInt32)port usingSSL:(BOOL)usingSSL;
+- (void)subscribeTopic:(NSString*)theTopic;
+- (void)subscribeToTopic:(NSString*)topic atLevel:(UInt8)qosLevel;
+- (void)unsubscribeTopic:(NSString*)theTopic;
+- (void)publishData:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataAtLeastOnce:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataAtLeastOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag;
+- (void)publishDataAtMostOnce:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataAtMostOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag;
+- (void)publishDataExactlyOnce:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataExactlyOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag;
+- (void)publishJson:(id)payload onTopic:(NSString*)theTopic;
+- (void)timerHandler:(NSTimer*)theTimer;
+- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode;
+- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent) eventCode;
+- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*) msg;
+
+// private methods
+- (void)newMessage:(MQTTMessage*)msg;
+- (void)error:(MQTTSessionEvent)event;
+- (void)handlePublish:(MQTTMessage*)msg;
+- (void)handlePuback:(MQTTMessage*)msg;
+- (void)handlePubrec:(MQTTMessage*)msg;
+- (void)handlePubrel:(MQTTMessage*)msg;
+- (void)handlePubcomp:(MQTTMessage*)msg;
+- (void)send:(MQTTMessage*)msg;
+- (UInt16)nextMsgId;
+
+@property (strong,atomic) NSMutableArray* queue;
+@property (strong,atomic) NSMutableArray* timerRing;
+
+@end
+
+@interface NSObject (MQTTSessionDelegate)
+- (void)session:(MQTTSession*)session handleEvent:(MQTTSessionEvent)eventCode;
+- (void)session:(MQTTSession*)session newMessage:(NSData*)data onTopic:(NSString*)topic;
+
+@end
diff --git a/src/MqttSDK/MQTTSession.m b/src/MqttSDK/MQTTSession.m
new file mode 100644
index 0000000..9dd1fc2
--- /dev/null
+++ b/src/MqttSDK/MQTTSession.m
@@ -0,0 +1,630 @@
+//
+// MQTTSession.m
+// MQtt Client
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import "MQTTSession.h"
+#import "MQttTxFlow.h"
+
+@implementation MQTTSession
+
+- (id)initWithClientId:(NSString*)theClientId {
+    return [self initWithClientId:theClientId userName:@"" password:@""];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword {
+    return [self initWithClientId:theClientId
+                         userName:theUserName
+                         password:thePassword
+                        keepAlive:60
+                     cleanSession:YES];
+}
+
+- (id)initWithClientId:(NSString*)theClientId runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode {
+    return [self initWithClientId:theClientId userName:@"" password:@"" runLoop:theRunLoop forMode:theRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode {
+    return [self initWithClientId:theClientId
+                         userName:theUserName
+                         password:thePassword
+                        keepAlive:60
+                     cleanSession:YES
+                          runLoop:theRunLoop
+                          forMode:theRunLoopMode];
+}
+
+
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAliveInterval
+          cleanSession:(BOOL)theCleanSessionFlag {
+    return [self initWithClientId:theClientId
+                         userName:theUserName
+                         password:thePassword
+                        keepAlive:theKeepAliveInterval
+                     cleanSession:theCleanSessionFlag
+                          runLoop:[NSRunLoop currentRunLoop]
+                          forMode:NSDefaultRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAliveInterval
+          cleanSession:(BOOL)theCleanSessionFlag
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode {
+    MQTTMessage *msg = [MQTTMessage connectMessageWithClientId:theClientId
+                                                      userName:theUserName
+                                                      password:thePassword
+                                                     keepAlive:theKeepAliveInterval
+                                                  cleanSession:theCleanSessionFlag];
+    return [self initWithClientId:theClientId
+                        keepAlive:theKeepAliveInterval
+                   connectMessage:msg
+                          runLoop:theRunLoop
+                          forMode:theRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAliveInterval
+          cleanSession:(BOOL)theCleanSessionFlag
+             willTopic:(NSString*)willTopic
+               willMsg:(NSData*)willMsg
+               willQoS:(UInt8)willQoS
+        willRetainFlag:(BOOL)willRetainFlag {
+    return [self initWithClientId:theClientId
+                         userName:theUserName
+                         password:thePassword
+                        keepAlive:theKeepAliveInterval
+                     cleanSession:theCleanSessionFlag
+                        willTopic:willTopic
+                          willMsg:willMsg
+                          willQoS:willQoS
+                   willRetainFlag:willRetainFlag
+                          runLoop:[NSRunLoop currentRunLoop]
+                          forMode:NSDefaultRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+              userName:(NSString*)theUserName
+              password:(NSString*)thePassword
+             keepAlive:(UInt16)theKeepAliveInterval
+          cleanSession:(BOOL)theCleanSessionFlag
+             willTopic:(NSString*)willTopic
+               willMsg:(NSData*)willMsg
+               willQoS:(UInt8)willQoS
+        willRetainFlag:(BOOL)willRetainFlag
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode {
+    MQTTMessage *msg = [MQTTMessage connectMessageWithClientId:theClientId
+                                                      userName:theUserName
+                                                      password:thePassword
+                                                     keepAlive:theKeepAliveInterval
+                                                  cleanSession:theCleanSessionFlag
+                                                     willTopic:willTopic
+                                                       willMsg:willMsg
+                                                       willQoS:willQoS
+                                                    willRetain:willRetainFlag];
+    return [self initWithClientId:theClientId
+                        keepAlive:theKeepAliveInterval
+                   connectMessage:msg
+                          runLoop:theRunLoop
+                          forMode:theRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+             keepAlive:(UInt16)theKeepAliveInterval
+        connectMessage:(MQTTMessage*)theConnectMessage
+               runLoop:(NSRunLoop*)theRunLoop
+               forMode:(NSString*)theRunLoopMode {
+    clientId = theClientId;
+    keepAliveInterval = theKeepAliveInterval;
+    connectMessage = theConnectMessage;
+    runLoop = theRunLoop;
+    runLoopMode = theRunLoopMode;
+    
+    self.queue = [NSMutableArray array];
+    txMsgId = 1;
+    txFlows = [[NSMutableDictionary alloc] init];
+    rxFlows = [[NSMutableDictionary alloc] init];
+    self.timerRing = [[NSMutableArray alloc] initWithCapacity:60];
+    int i;
+    for (i = 0; i < 60; i++) {
+        [self.timerRing addObject:[NSMutableSet set]];
+    }
+    ticks = 0;
+    
+    return self;
+}
+
+- (void)dealloc {
+    [encoder close];
+    encoder = nil;
+    [decoder close];
+    decoder = nil;
+    if (timer != nil) {
+        [timer invalidate];
+        timer = nil;
+    }
+}
+
+- (void)close {
+    [encoder close];
+    [decoder close];
+    encoder = nil;
+    decoder = nil;
+     if (timer != nil) {
+        [timer invalidate];
+        timer = nil;
+        }
+    [self error:MQTTSessionEventConnectionClosed];
+}
+
+- (void)setDelegate:(id)aDelegate {
+    delegate = aDelegate;
+}
+
+- (void)connectToHost:(NSString*)ip port:(UInt32)port {
+    [self connectToHost:ip port:port usingSSL:false];
+}
+
+- (void)connectToHost:(NSString*)ip port:(UInt32)port usingSSL:(BOOL)usingSSL {
+
+    status = MQTTSessionStatusCreated;
+
+    CFReadStreamRef readStream;
+    CFWriteStreamRef writeStream;
+
+    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)ip, port, &readStream, &writeStream);
+
+    if (usingSSL) {
+        const void *keys[] = { kCFStreamSSLLevel,
+                               kCFStreamSSLPeerName };
+
+        const void *vals[] = { kCFStreamSocketSecurityLevelNegotiatedSSL,
+                               kCFNull };
+
+        CFDictionaryRef sslSettings = CFDictionaryCreate(kCFAllocatorDefault, keys, vals, 2,
+                                                         &kCFTypeDictionaryKeyCallBacks,
+                                                         &kCFTypeDictionaryValueCallBacks);
+
+        CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, sslSettings);
+        CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, sslSettings);
+
+        CFRelease(sslSettings);
+    }
+
+    encoder = [[MQTTEncoder alloc] initWithStream:(__bridge NSOutputStream*)writeStream
+                                          runLoop:runLoop
+                                      runLoopMode:runLoopMode];
+
+    decoder = [[MQTTDecoder alloc] initWithStream:(__bridge NSInputStream*)readStream
+                                          runLoop:runLoop
+                                      runLoopMode:runLoopMode];
+
+    [encoder setDelegate:self];
+    [decoder setDelegate:self];
+    
+    [encoder open];
+    [decoder open];
+}
+
+- (void)subscribeTopic:(NSString*)theTopic {
+    [self subscribeToTopic:theTopic atLevel:0];
+}
+
+- (void) subscribeToTopic:(NSString*)topic
+                  atLevel:(UInt8)qosLevel {
+    [self send:[MQTTMessage subscribeMessageWithMessageId:[self nextMsgId]
+                                                    topic:topic
+                                                      qos:qosLevel]];
+}
+
+- (void)unsubscribeTopic:(NSString*)theTopic {
+    [self send:[MQTTMessage unsubscribeMessageWithMessageId:[self nextMsgId]
+                                                      topic:theTopic]];
+}
+
+- (void)publishData:(NSData*)data onTopic:(NSString*)topic {
+    [self publishDataAtMostOnce:data onTopic:topic];
+}
+
+- (void)publishDataAtMostOnce:(NSData*)data
+                      onTopic:(NSString*)topic {
+  [self publishDataAtMostOnce:data onTopic:topic retain:false];
+}
+
+- (void)publishDataAtMostOnce:(NSData*)data
+                      onTopic:(NSString*)topic
+                     retain:(BOOL)retainFlag {
+    [self send:[MQTTMessage publishMessageWithData:data
+                                           onTopic:topic
+                                        retainFlag:retainFlag]];
+}
+
+- (void)publishDataAtLeastOnce:(NSData*)data
+                       onTopic:(NSString*)topic {
+    [self publishDataAtLeastOnce:data onTopic:topic retain:false];
+}
+
+- (void)publishDataAtLeastOnce:(NSData*)data
+                       onTopic:(NSString*)topic
+                        retain:(BOOL)retainFlag {
+    UInt16 msgId = [self nextMsgId];
+    MQTTMessage *msg = [MQTTMessage publishMessageWithData:data
+                                                   onTopic:topic
+                                                       qos:1
+                                                     msgId:msgId
+                                                retainFlag:retainFlag
+                                                   dupFlag:false];
+    MQttTxFlow *flow = [MQttTxFlow flowWithMsg:msg
+                                      deadline:(ticks + 60)];
+    [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
+    [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
+    [self send:msg];
+}
+
+- (void)publishDataExactlyOnce:(NSData*)data
+                       onTopic:(NSString*)topic {
+    [self publishDataExactlyOnce:data onTopic:topic retain:false];
+}
+
+- (void)publishDataExactlyOnce:(NSData*)data
+                       onTopic:(NSString*)topic
+                        retain:(BOOL)retainFlag {
+    UInt16 msgId = [self nextMsgId];
+    MQTTMessage *msg = [MQTTMessage publishMessageWithData:data
+                                                   onTopic:topic
+                                                       qos:2
+                                                     msgId:msgId
+                                                retainFlag:retainFlag
+                                                   dupFlag:false];
+    MQttTxFlow *flow = [MQttTxFlow flowWithMsg:msg
+                                      deadline:(ticks + 60)];
+    [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
+    [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
+    [self send:msg];
+}
+
+- (void)publishJson:(id)payload onTopic:(NSString*)theTopic {
+    NSError * error = nil;
+    NSData * data = [NSJSONSerialization dataWithJSONObject:payload options:0 error:&error];
+    if(error!=nil){
+        //NSLog(@"Error creating JSON: %@",error.description);
+        return;
+    }
+    [self publishData:data onTopic:theTopic];
+}
+
+- (void)timerHandler:(NSTimer*)theTimer {
+    idleTimer++;
+    if (idleTimer >= keepAliveInterval) {
+        if ([encoder status] == MQTTEncoderStatusReady) {
+            //NSLog(@"sending PINGREQ");
+            [encoder encodeMessage:[MQTTMessage pingreqMessage]];
+            idleTimer = 0;
+        }
+    }
+    ticks++;
+    NSEnumerator *e = [[self.timerRing objectAtIndex:(ticks % 60)] objectEnumerator];
+    id msgId;
+
+    while ((msgId = [e nextObject])) {
+        MQttTxFlow *flow = [txFlows objectForKey:msgId];
+        MQTTMessage *msg = [flow msg];
+        [flow setDeadline:(ticks + 60)];
+        [msg setDupFlag];
+        [self send:msg];
+    }
+}
+
+- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode {
+   // NSLog(@"encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode ");
+    if(sender == encoder) {
+        switch (eventCode) {
+            case MQTTEncoderEventReady:
+                switch (status) {
+                    case MQTTSessionStatusCreated:
+                        //NSLog(@"Encoder has been created. Sending Auth Message");
+                        [sender encodeMessage:connectMessage];
+                        status = MQTTSessionStatusConnecting;
+                        break;
+                    case MQTTSessionStatusConnecting:
+                        break;
+                    case MQTTSessionStatusConnected:
+                        if ([self.queue count] > 0) {
+                            MQTTMessage *msg = [self.queue objectAtIndex:0];
+                            [self.queue removeObjectAtIndex:0];
+                            [encoder encodeMessage:msg];
+                        }
+                        break;
+                    case MQTTSessionStatusError:
+                        break;
+                }
+                break;
+            case MQTTEncoderEventErrorOccurred:
+                [self error:MQTTSessionEventConnectionError];
+                break;
+        }
+    }
+}
+
+- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode {
+    //NSLog(@"decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode");
+    if(sender == decoder) {
+        MQTTSessionEvent event;
+        switch (eventCode) {
+            case MQTTDecoderEventConnectionClosed:
+                event = MQTTSessionEventConnectionError;
+                break;
+            case MQTTDecoderEventConnectionError:
+                event = MQTTSessionEventConnectionError;
+                break;
+            case MQTTDecoderEventProtocolError:
+                event = MQTTSessionEventProtocolError;
+                break;
+        }
+        [self error:event];
+    }
+}
+
+- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg {
+    //NSLog(@"decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg ");
+    if(sender == decoder){
+        switch (status) {
+            case MQTTSessionStatusConnecting:
+                switch ([msg type]) {
+                    case MQTTConnack:
+                        if ([[msg data] length] != 2) {
+                            [self error:MQTTSessionEventProtocolError];
+                        }
+                        else {
+                            const UInt8 *bytes = [[msg data] bytes];
+                            if (bytes[1] == 0) {
+                                status = MQTTSessionStatusConnected;
+                                timer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:1.0]
+                                                                 interval:1.0
+                                                                   target:self
+                                                                 selector:@selector(timerHandler:)
+                                                                 userInfo:nil
+                                                                  repeats:YES];
+                                if ([delegate respondsToSelector:@selector(session:handleEvent:)]) {
+                                    [delegate session:self handleEvent:MQTTSessionEventConnected];
+                                }
+                                [runLoop addTimer:timer forMode:runLoopMode];
+                            }
+                            else {
+                                [self error:MQTTSessionEventConnectionRefused];
+                            }
+                        }
+                        break;
+                    default:
+                        [self error:MQTTSessionEventProtocolError];
+                        break;
+                }
+                break;
+            case MQTTSessionStatusConnected:
+                [self newMessage:msg];
+                break;
+            default:
+                break;
+        }
+    }
+}
+
+- (void)newMessage:(MQTTMessage*)msg {
+    switch ([msg type]) {
+    case MQTTPublish:
+        [self handlePublish:msg];
+        break;
+    case MQTTPuback:
+        [self handlePuback:msg];
+        break;
+    case MQTTPubrec:
+        [self handlePubrec:msg];
+        break;
+    case MQTTPubrel:
+        [self handlePubrel:msg];
+        break;
+    case MQTTPubcomp:
+        [self handlePubcomp:msg];
+        break;
+    default:
+        return;
+    }
+}
+
+- (void)handlePublish:(MQTTMessage*)msg {
+    if (![delegate respondsToSelector:@selector(session:newMessage:onTopic:)]) {
+        return;
+    }
+    NSData *data = [msg data];
+    if ([data length] < 2) {
+        return;
+    }
+    UInt8 const *bytes = [data bytes];
+    UInt16 topicLength = 256 * bytes[0] + bytes[1];
+    if ([data length] < 2 + topicLength) {
+        return;
+    }
+    NSData *topicData = [data subdataWithRange:NSMakeRange(2, topicLength)];
+    NSString *topic = [[NSString alloc] initWithData:topicData
+                                            encoding:NSUTF8StringEncoding];
+    NSRange range = NSMakeRange(2 + topicLength, [data length] - topicLength - 2);
+    data = [data subdataWithRange:range];
+    if ([msg qos] == 0) {
+        [delegate session:self newMessage:data onTopic:topic];
+    }
+    else {
+        if ([data length] < 2) {
+            return;
+        }
+        bytes = [data bytes];
+        UInt16 msgId = 256 * bytes[0] + bytes[1];
+        if (msgId == 0) {
+            return;
+        }
+        data = [data subdataWithRange:NSMakeRange(2, [data length] - 2)];
+        if ([msg qos] == 1) {
+            [delegate session:self newMessage:data onTopic:topic];
+            [self send:[MQTTMessage pubackMessageWithMessageId:msgId]];
+        }
+        else {
+            NSDictionary *dict = [NSDictionary dictionaryWithObjectsAndKeys:
+                data, @"data", topic, @"topic", nil];
+            [rxFlows setObject:dict forKey:[NSNumber numberWithUnsignedInt:msgId]];
+            [self send:[MQTTMessage pubrecMessageWithMessageId:msgId]];
+        }
+    }
+}
+
+- (void)handlePuback:(MQTTMessage*)msg {
+    if ([[msg data] length] != 2) {
+        return;
+    }
+    UInt8 const *bytes = [[msg data] bytes];
+    NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+    if ([msgId unsignedIntValue] == 0) {
+        return;
+    }
+    MQttTxFlow *flow = [txFlows objectForKey:msgId];
+    if (flow == nil) {
+        return;
+    }
+
+    if ([[flow msg] type] != MQTTPublish || [[flow msg] qos] != 1) {
+        return;
+    }
+
+    [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
+    [txFlows removeObjectForKey:msgId];
+}
+
+- (void)handlePubrec:(MQTTMessage*)msg {
+    if ([[msg data] length] != 2) {
+        return;
+    }
+    UInt8 const *bytes = [[msg data] bytes];
+    NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+    if ([msgId unsignedIntValue] == 0) {
+        return;
+    }
+    MQttTxFlow *flow = [txFlows objectForKey:msgId];
+    if (flow == nil) {
+        return;
+    }
+    msg = [flow msg];
+    if ([msg type] != MQTTPublish || [msg qos] != 2) {
+        return;
+    }
+    msg = [MQTTMessage pubrelMessageWithMessageId:[msgId unsignedIntValue]];
+    [flow setMsg:msg];
+    [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
+    [flow setDeadline:(ticks + 60)];
+    [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:msgId];
+
+    [self send:msg];
+}
+
+- (void)handlePubrel:(MQTTMessage*)msg {
+    if ([[msg data] length] != 2) {
+        return;
+    }
+    UInt8 const *bytes = [[msg data] bytes];
+    NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+    if ([msgId unsignedIntValue] == 0) {
+        return;
+    }
+    NSDictionary *dict = [rxFlows objectForKey:msgId];
+    if (dict != nil) {
+        [delegate session:self
+               newMessage:[dict valueForKey:@"data"]
+                  onTopic:[dict valueForKey:@"topic"]];
+        [rxFlows removeObjectForKey:msgId];
+    }
+    [self send:[MQTTMessage pubcompMessageWithMessageId:[msgId unsignedIntegerValue]]];
+}
+
+- (void)handlePubcomp:(MQTTMessage*)msg {
+    if ([[msg data] length] != 2) {
+        return;
+    }
+    UInt8 const *bytes = [[msg data] bytes];
+    NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+    if ([msgId unsignedIntValue] == 0) {
+        return;
+    }
+    MQttTxFlow *flow = [txFlows objectForKey:msgId];
+    if (flow == nil || [[flow msg] type] != MQTTPubrel) {
+        return;
+    }
+
+    [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
+    [txFlows removeObjectForKey:msgId];
+}
+
+- (void)error:(MQTTSessionEvent)eventCode {
+    
+    [encoder close];
+    encoder = nil;
+    
+    [decoder close];
+    decoder = nil;
+    
+    if (timer != nil) {
+        [timer invalidate];
+        
+        timer = nil;
+    }
+    status = MQTTSessionStatusError;
+    
+    usleep(1000000); // 1 sec delay
+    
+    if ([delegate respondsToSelector:@selector(session:handleEvent:)]) {
+        [delegate session:self handleEvent:eventCode];
+    }
+
+}
+
+- (void)send:(MQTTMessage*)msg {
+    if ([encoder status] == MQTTEncoderStatusReady) {
+        [encoder encodeMessage:msg];
+    }
+    else {
+        [self.queue addObject:msg];
+    }
+}
+
+- (UInt16)nextMsgId {
+    txMsgId++;
+    while (txMsgId == 0 || [txFlows objectForKey:[NSNumber numberWithUnsignedInt:txMsgId]] != nil) {
+        txMsgId++;
+    }
+    return txMsgId;
+}
+
+@end
diff --git a/src/MqttSDK/MQttTxFlow.h b/src/MqttSDK/MQttTxFlow.h
new file mode 100644
index 0000000..59d9986
--- /dev/null
+++ b/src/MqttSDK/MQttTxFlow.h
@@ -0,0 +1,36 @@
+//
+// MQttTxFlow.h
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import <Foundation/Foundation.h>
+#import "MQTTMessage.h"
+
+@interface MQttTxFlow : NSObject {
+    MQTTMessage *   msg;
+    unsigned int    deadline;
+}
+
++ (id)flowWithMsg:(MQTTMessage*)aMsg
+         deadline:(unsigned int)aDeadline;
+
+- (id)initWithMsg:(MQTTMessage*)aMsg deadline:(unsigned int)aDeadline;
+
+- (void)setMsg:(MQTTMessage*)aMsg;
+- (void)setDeadline:(unsigned int)newValue;
+- (MQTTMessage*)msg;
+- (unsigned int)deadline;
+
+@end
+
diff --git a/src/MqttSDK/MQttTxFlow.m b/src/MqttSDK/MQttTxFlow.m
new file mode 100644
index 0000000..80604f6
--- /dev/null
+++ b/src/MqttSDK/MQttTxFlow.m
@@ -0,0 +1,50 @@
+//
+// MQtttTxFlow.m
+// 
+// Copyright (c) 2011, 2013, 2lemetry LLC
+// 
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+// 
+// Contributors:
+//    Kyle Roche - initial API and implementation and/or initial documentation
+// 
+
+#import "MQttTxFlow.h"
+
+@implementation MQttTxFlow
+
++ (id)flowWithMsg:(MQTTMessage*)msg
+         deadline:(unsigned int)deadline {
+   return [[MQttTxFlow alloc] initWithMsg:msg deadline:deadline];
+}
+
+- (id)initWithMsg:(MQTTMessage*)aMsg
+         deadline:(unsigned int)aDeadline {
+   msg = aMsg;
+   deadline = aDeadline;
+   return self;
+}
+
+- (void)setMsg:(MQTTMessage*)aMsg {
+    msg = aMsg;
+}
+
+- (void)setDeadline:(unsigned int)newDeadline {
+    deadline = newDeadline;
+}
+
+- (MQTTMessage*)msg {
+    return msg;
+}
+
+- (unsigned int) deadline {
+    return deadline;
+}
+
+
+@end