From 0965dca2ed0247bd4d5ae99b4da5034f617e0fcf Mon Sep 17 00:00:00 2001 From: Robert Sesek Date: Sat, 10 Oct 2015 01:15:51 -0400 Subject: [PATCH] Cleanup from the MessageQueue rewrite. --- Source/MessageQueue.m | 78 +++++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/Source/MessageQueue.m b/Source/MessageQueue.m index f66a06b..9860e68 100644 --- a/Source/MessageQueue.m +++ b/Source/MessageQueue.m @@ -18,13 +18,19 @@ #include #include +#include #include +#include +#include + +#import "BSProtocolThreadInvoker.h" @implementation MessageQueue { // The port number on which to open a listening socket. NSUInteger _port; - // The dispatch queue for this instance. + // All the ivars beneath this must be accessed from this queue. + ////////////////////////////////////////////////////////////////////////////// dispatch_queue_t _dispatchQueue; // Whether or not the message queue is connected to a client. @@ -36,11 +42,13 @@ // The delegate for this class. BSProtocolThreadInvoker* _delegate; - BOOL _listening; - + // The socket for the queue. This will either be a listening socket, waiting + // to accept connections. Or it will be a connected socket with a server. int _socket; - // The two dispatch sources for the |_child|. + // The dispatch sources for |_socket|, run on |_dispatchQueue|. If this is + // for a listening socket, only |_readSource| will be non-NULL. If + // |_connected| is false, both will be NULL. dispatch_source_t _readSource; dispatch_source_t _writeSource; @@ -62,11 +70,13 @@ [[BSProtocolThreadInvoker alloc] initWithObject:delegate protocol:@protocol(MessageQueueDelegate) thread:[NSThread currentThread]]; + _socket = -1; } return self; } - (void)dealloc { + dispatch_sync(_dispatchQueue, ^{ [self disconnectClient]; }); dispatch_release(_dispatchQueue); [_messageQueue release]; [_delegate release]; @@ -74,20 +84,22 @@ } - (BOOL)isConnected { - return _connected; + BOOL __block connected; + dispatch_sync(_dispatchQueue, ^{ connected = _connected; }); + return connected; } - (void)connect { - if (_connected) - return; + dispatch_async(_dispatchQueue, ^{ + if (_connected) + return; - _connected = YES; - dispatch_async(_dispatchQueue, ^{ [self openListeningSocket]; }); + [self openListeningSocket]; + }); } - (void)disconnect { dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; }); - _connected = NO; } - (void)sendMessage:(NSString*)message { @@ -101,12 +113,11 @@ - (void)openListeningSocket { // Create a socket. - do { - _socket = socket(PF_INET, SOCK_STREAM, 0); - if (_socket < 0) { - NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno)); - } - } while (!_socket); + _socket = socket(PF_INET, SOCK_STREAM, 0); + if (_socket < 0) { + NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno)); + return; + } // Allow old, yet-to-be recycled sockets to be reused. int yes = 1; @@ -133,7 +144,7 @@ if (rv < 0) { NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno)); close(_socket); - _connected = NO; + _socket = -1; return; } _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue); @@ -141,6 +152,8 @@ [self acceptConnection]; }); dispatch_resume(_readSource); + + _connected = YES; } // Closes down the listening socket, the child socket, and the streams. @@ -157,14 +170,14 @@ _writeSource = NULL; } - if (_socket) { + if (_socket != -1) { close(_socket); _socket = -1; } - _listening = NO; [_messageQueue removeAllObjects]; + _connected = NO; [_delegate messageQueueDidDisconnect:self]; } @@ -185,9 +198,7 @@ // Add space for the NUL byte. NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1; - UInt8* buffer = malloc(maxBufferSize); - bzero(buffer, maxBufferSize); - + UInt8* buffer = calloc(maxBufferSize, sizeof(UInt8)); NSUInteger bufferSize = 0; if (![message getBytes:buffer maxLength:maxBufferSize @@ -224,8 +235,8 @@ // message cannot be read in one pass. - (void)readMessageFromStream { const NSUInteger kBufferSize = 1024; - UInt8 buffer[kBufferSize]; - CFIndex bufferOffset = 0; // Starting point in |buffer| to work with. + char buffer[kBufferSize]; + ssize_t bufferOffset = 0; // Starting point in |buffer| to work with. ssize_t bytesRead = read(_socket, buffer, kBufferSize); if (bytesRead == 0) { [self disconnectClient]; @@ -238,7 +249,7 @@ while (bufferOffset < bytesRead) { // Find the NUL separator, or the end of the string. NSUInteger partLength = 0; - for (CFIndex i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ; + for (ssize_t i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ; // If there is not a current packet, set some state. if (!_message) { @@ -251,13 +262,11 @@ } // Substring the byte stream and append it to the packet string. - CFStringRef bufferString = CFStringCreateWithBytes(kCFAllocatorDefault, - buffer + bufferOffset, // Byte pointer, offset by start index. - partLength, // Length. - kCFStringEncodingUTF8, - true); - [_message appendString:(NSString*)bufferString]; - CFRelease(bufferString); + NSString* bufferString = [[NSString alloc] initWithBytesNoCopy:buffer + bufferOffset + length:partLength + encoding:NSUTF8StringEncoding + freeWhenDone:NO]; + [_message appendString:[bufferString autorelease]]; // Advance counters. _messageSize += partLength; @@ -267,16 +276,11 @@ if (_messageSize >= _totalMessageSize) { [_delegate messageQueue:self didReceiveMessage:[_message autorelease]]; _message = nil; - - // Process any outgoing messages. - [self dequeueAndSend]; } } } - (void)acceptConnection { - _listening = NO; - struct sockaddr_in address = {0}; socklen_t addressLength = sizeof(address); int connection = accept(_socket, &address, &addressLength); -- 2.22.5