From 3c20382fb673f1fd02aa9aa7cf20d4698b657146 Mon Sep 17 00:00:00 2001 From: Robert Sesek Date: Fri, 9 Oct 2015 23:18:37 -0400 Subject: [PATCH] Rewrite MessageQueue to use libdispatch instead of CFStream and a dedicated thread. --- Source/MessageQueue.h | 38 +---- Source/MessageQueue.m | 385 +++++++++++++----------------------------- 2 files changed, 120 insertions(+), 303 deletions(-) diff --git a/Source/MessageQueue.h b/Source/MessageQueue.h index a038cbd..e3ad629 100644 --- a/Source/MessageQueue.h +++ b/Source/MessageQueue.h @@ -24,43 +24,7 @@ // program with which it exchanges UTF8 string messages. A message contains two // parts, both terminated by '\0'. The first is an ASCII integer number that is // the length of the second part. The second part is the actual string message. -@interface MessageQueue : NSObject { - @private - // The port number on which to open a listening socket. - NSUInteger _port; - - // The thread and its run loop on which this class primarily operates. - NSThread* _thread; - NSRunLoop* _runLoop; - - // Whether or not the run loop should quit. - BOOL _shouldQuit; - - // Whether or not the message queue is connected to a client. - BOOL _connected; - - // A queue of messages that are waiting to be sent. - NSMutableArray* _queue; - - // The delegate for this class. - BSProtocolThreadInvoker* _delegate; - - // The socket that listens for new incoming connections. - CFSocketRef _socket; - - // The child socket that has been accepted from |_socket|. - CFSocketNativeHandle _child; - - // The read and write streams that are created on the |_child| socket. - CFReadStreamRef _readStream; - CFWriteStreamRef _writeStream; - - // When a message is being read, this temporary buffer is used to build up - // the complete message from successive reads. - NSMutableString* _message; - NSUInteger _totalMessageSize; - NSUInteger _messageSize; -} +@interface MessageQueue : NSObject // Creates a new MessasgeQueue that will listen on |port| and report information // to its |delegate|. diff --git a/Source/MessageQueue.m b/Source/MessageQueue.m index d9e76a6..f66a06b 100644 --- a/Source/MessageQueue.m +++ b/Source/MessageQueue.m @@ -16,81 +16,48 @@ #import "MessageQueue.h" +#include #include #include -@interface MessageQueue (Private) -// Thread main function that is started from -connect. -- (void)runMessageQueue; +@implementation MessageQueue { + // The port number on which to open a listening socket. + NSUInteger _port; -// All the following methods must be called from the -runMessageQueue thread. + // The dispatch queue for this instance. + dispatch_queue_t _dispatchQueue; -// Creates a listening socket and schedules it in the run loop. -- (void)listenForClient; + // Whether or not the message queue is connected to a client. + BOOL _connected; -// Closes down the listening socket, the child socket, and the streams. -- (void)disconnectClient; - -// This first calls -disconnectClient and then stops the run loop and terminates -// the -runMessageQueue thread. -- (void)stopRunLoop; + // A queue of messages that are waiting to be sent. + NSMutableArray* _messageQueue; -// Adds a |message| to |_queue|. -- (void)enqueueMessage:(NSString*)message; + // The delegate for this class. + BSProtocolThreadInvoker* _delegate; -// If the write stream is ready and there is data to send, sends the next message. -- (void)dequeueAndSend; + BOOL _listening; -// Writes the string into the write stream. -- (void)performSend:(NSString*)message; + int _socket; -// Reads bytes out of the read stream. This may be called multiple times if the -// message cannot be read in one pass. -- (void)readMessageFromStream; + // The two dispatch sources for the |_child|. + dispatch_source_t _readSource; + dispatch_source_t _writeSource; -// Converts a CFErrorRef to an NSError and passes it to the delegate. -- (void)reportError:(CFErrorRef)error; - -// Forwarding methods from the CoreFoundation callbacks. -- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child; -- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event; -- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event; -@end - -// CoreFoundation Callbacks //////////////////////////////////////////////////// - -static void MessageQueueSocketAccept(CFSocketRef socket, - CFSocketCallBackType callbackType, - CFDataRef address, - const void* data, - void* self) -{ - CFSocketNativeHandle child = *(CFSocketNativeHandle*)data; - [(MessageQueue*)self listenSocket:socket acceptedSocket:child]; + // When a message is being read, this temporary buffer is used to build up + // the complete message from successive reads. + NSMutableString* _message; + NSUInteger _totalMessageSize; + NSUInteger _messageSize; } -static void MessageQueueReadEvent(CFReadStreamRef stream, - CFStreamEventType eventType, - void* self) -{ - [(MessageQueue*)self readStream:stream handleEvent:eventType]; -} - -static void MessageQueueWriteEvent(CFWriteStreamRef stream, - CFStreamEventType eventType, - void* self) -{ - [(MessageQueue*)self writeStream:stream handleEvent:eventType]; -} - -//////////////////////////////////////////////////////////////////////////////// - -@implementation MessageQueue - - (id)initWithPort:(NSUInteger)port delegate:(id)delegate { if ((self = [super init])) { _port = port; - _queue = [[NSMutableArray alloc] init]; + _dispatchQueue = dispatch_queue_create( + [[NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String], + DISPATCH_QUEUE_SERIAL); + _messageQueue = [[NSMutableArray alloc] init]; _delegate = (BSProtocolThreadInvoker*) [[BSProtocolThreadInvoker alloc] initWithObject:delegate protocol:@protocol(MessageQueueDelegate) @@ -100,7 +67,8 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream, } - (void)dealloc { - [_queue release]; + dispatch_release(_dispatchQueue); + [_messageQueue release]; [_delegate release]; [super dealloc]; } @@ -110,148 +78,107 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream, } - (void)connect { - if (_thread) + if (_connected) return; - [NSThread detachNewThreadSelector:@selector(runMessageQueue) - toTarget:self - withObject:nil]; + _connected = YES; + dispatch_async(_dispatchQueue, ^{ [self openListeningSocket]; }); } - (void)disconnect { - [self performSelector:@selector(stopRunLoop) - onThread:_thread - withObject:nil - waitUntilDone:NO]; + dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; }); + _connected = NO; } - (void)sendMessage:(NSString*)message { - [self performSelector:@selector(enqueueMessage:) - onThread:_thread - withObject:message - waitUntilDone:NO]; + dispatch_async(_dispatchQueue, ^{ + [_messageQueue addObject:message]; + [self dequeueAndSend]; + }); } // Private ///////////////////////////////////////////////////////////////////// -- (void)runMessageQueue { - @autoreleasepool { - _thread = [NSThread currentThread]; - _runLoop = [NSRunLoop currentRunLoop]; - - _connected = NO; - _shouldQuit = NO; - [self scheduleListenSocket]; - - // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic - // stop routine. - CFRunLoopRun(); +- (void)openListeningSocket { + // Create a socket. + do { + _socket = socket(PF_INET, SOCK_STREAM, 0); + if (_socket < 0) { + NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno)); + } + } while (!_socket); - _thread = nil; - _runLoop = nil; - } -} + // Allow old, yet-to-be recycled sockets to be reused. + int yes = 1; + setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int)); -- (void)scheduleListenSocket { - // Create the address structure. - struct sockaddr_in address; - memset(&address, 0, sizeof(address)); + // Bind to the address. + struct sockaddr_in address = {0}; address.sin_len = sizeof(address); address.sin_family = AF_INET; address.sin_port = htons(_port); address.sin_addr.s_addr = htonl(INADDR_ANY); - // Create the socket signature. - CFSocketSignature signature; - signature.protocolFamily = PF_INET; - signature.socketType = SOCK_STREAM; - signature.protocol = IPPROTO_TCP; - signature.address = (CFDataRef)[NSData dataWithBytes:&address length:sizeof(address)]; - - CFSocketContext context = { 0 }; - context.info = self; - + int rv; do { - _socket = - CFSocketCreateWithSocketSignature(kCFAllocatorDefault, - &signature, // Socket signature. - kCFSocketAcceptCallBack, // Callback types. - &MessageQueueSocketAccept, // Callback function. - &context); // Context to pass to callout. - if (!_socket) { - // Pump the run loop while waiting for the socket to be reusued. If told - // to quit while waiting, then break out of the loop. - if (CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1, FALSE) && _shouldQuit) - return; - NSLog(@"Could not open socket"); - //[connection_ errorEncountered:@"Could not open socket."]; + rv = bind(_socket, &address, sizeof(address)); + if (rv != 0) { + NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno)); } - } while (!_socket); + } while (rv != 0); - // Allow old, yet-to-be recycled sockets to be reused. - int yes = 1; - setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); - setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int)); - - // Schedule the socket on the run loop. - CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0); - CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes); - CFRelease(source); + // Listen for a connection. + rv = listen(_socket, 1); + if (rv < 0) { + NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno)); + close(_socket); + _connected = NO; + return; + } + _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue); + dispatch_source_set_event_handler(_readSource, ^{ + [self acceptConnection]; + }); + dispatch_resume(_readSource); } +// Closes down the listening socket, the child socket, and the streams. - (void)disconnectClient { - if (_socket) { - CFSocketInvalidate(_socket); - CFRelease(_socket); - _socket = NULL; + if (_readSource) { + dispatch_source_cancel(_readSource); + dispatch_release(_readSource); + _readSource = NULL; } - if (_readStream) { - CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); - CFReadStreamClose(_readStream); - CFRelease(_readStream); - _readStream = NULL; + if (_writeSource) { + dispatch_source_cancel(_writeSource); + dispatch_release(_writeSource); + _writeSource = NULL; } - if (_writeStream) { - CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); - CFWriteStreamClose(_writeStream); - CFRelease(_writeStream); - _writeStream = NULL; + if (_socket) { + close(_socket); + _socket = -1; } - if (_child) { - close(_child); - _child = NULL; - } + _listening = NO; + [_messageQueue removeAllObjects]; - _connected = NO; [_delegate messageQueueDidDisconnect:self]; } -- (void)stopRunLoop { - _shouldQuit = YES; - [self disconnectClient]; - CFRunLoopStop([_runLoop getCFRunLoop]); -} - -- (void)enqueueMessage:(NSString*)message { - [_queue addObject:message]; - [self dequeueAndSend]; -} - +// If the write stream is ready and there is data to send, sends the next message. - (void)dequeueAndSend { - if (![_queue count]) + if (![_messageQueue count]) return; - if (!CFWriteStreamCanAcceptBytes(_writeStream)) - return; - - NSString* message = [_queue objectAtIndex:0]; + NSString* message = [_messageQueue objectAtIndex:0]; [self performSend:message]; - [_queue removeObjectAtIndex:0]; + [_messageQueue removeObjectAtIndex:0]; } +// Writes the string into the write stream. - (void)performSend:(NSString*)message { // TODO: May need to negotiate with the server as to the string encoding. const NSStringEncoding kEncoding = NSUTF8StringEncoding; @@ -280,9 +207,9 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream, // method is only ever called in response to a stream ready event. NSUInteger totalWritten = 0; while (totalWritten < bufferSize) { - CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten); + ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten); if (bytesWritten < 0) { - [self reportError:CFWriteStreamCopyError(_writeStream)]; + NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno)); break; } totalWritten += bytesWritten; @@ -293,11 +220,17 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream, free(buffer); } +// Reads bytes out of the read stream. This may be called multiple times if the +// message cannot be read in one pass. - (void)readMessageFromStream { const NSUInteger kBufferSize = 1024; UInt8 buffer[kBufferSize]; CFIndex bufferOffset = 0; // Starting point in |buffer| to work with. - CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize); + ssize_t bytesRead = read(_socket, buffer, kBufferSize); + if (bytesRead == 0) { + [self disconnectClient]; + return; + } const char* charBuffer = (const char*)buffer; // The read loop works by going through the buffer until all the bytes have @@ -341,117 +274,37 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream, } } -- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child { - if (socket != _socket) { - // TODO: error - return; - } - - _child = child; - - // Create the streams on the socket. - CFStreamCreatePairWithSocket(kCFAllocatorDefault, - _child, // Socket handle. - &_readStream, // Read stream in-pointer. - &_writeStream); // Write stream in-pointer. - - // Create struct to register callbacks for the stream. - CFStreamClientContext context = { 0 }; - context.info = self; - - // Set the client of the read stream. - CFOptionFlags readFlags = kCFStreamEventOpenCompleted | - kCFStreamEventHasBytesAvailable | - kCFStreamEventErrorOccurred | - kCFStreamEventEndEncountered; - if (CFReadStreamSetClient(_readStream, readFlags, &MessageQueueReadEvent, &context)) - // Schedule in run loop to do asynchronous communication with the engine. - CFReadStreamScheduleWithRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); - else - return; - - // Open the stream now that it's scheduled on the run loop. - if (!CFReadStreamOpen(_readStream)) { - [self reportError:CFReadStreamCopyError(_readStream)]; - return; - } - - // Set the client of the write stream. - CFOptionFlags writeFlags = kCFStreamEventOpenCompleted | - kCFStreamEventCanAcceptBytes | - kCFStreamEventErrorOccurred | - kCFStreamEventEndEncountered; - if (CFWriteStreamSetClient(_writeStream, writeFlags, &MessageQueueWriteEvent, &context)) - // Schedule it in the run loop to receive error information. - CFWriteStreamScheduleWithRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); - else - return; +- (void)acceptConnection { + _listening = NO; - // Open the write stream. - if (!CFWriteStreamOpen(_writeStream)) { - [self reportError:CFWriteStreamCopyError(_writeStream)]; + struct sockaddr_in address = {0}; + socklen_t addressLength = sizeof(address); + int connection = accept(_socket, &address, &addressLength); + if (connection < 0) { + NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno)); + [self disconnectClient]; return; } - _connected = YES; - [_delegate messageQueueDidConnect:self]; + dispatch_source_cancel(_readSource); + close(_socket); - CFSocketInvalidate(_socket); - CFRelease(_socket); - _socket = NULL; -} + _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue); + dispatch_source_set_event_handler(_readSource, ^{ + [self readMessageFromStream]; + }); -- (void)reportError:(CFErrorRef)error -{ - [_delegate messageQueue:self error:(NSError*)error]; - CFRelease(error); -} + _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue); + dispatch_source_set_event_handler(_writeSource, ^{ + [self dequeueAndSend]; + }); -- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event -{ - assert(stream == _readStream); - switch (event) - { - case kCFStreamEventHasBytesAvailable: - [self readMessageFromStream]; - break; + _socket = connection; - case kCFStreamEventErrorOccurred: - [self reportError:CFReadStreamCopyError(stream)]; - [self stopRunLoop]; - break; - - case kCFStreamEventEndEncountered: - [self stopRunLoop]; - break; - - default: - // TODO: error - break; - }; -} + dispatch_resume(_readSource); + dispatch_resume(_writeSource); -- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event -{ - assert(stream == _writeStream); - switch (event) { - case kCFStreamEventCanAcceptBytes: - [self dequeueAndSend]; - break; - - case kCFStreamEventErrorOccurred: - [self reportError:CFWriteStreamCopyError(stream)]; - [self stopRunLoop]; - break; - - case kCFStreamEventEndEncountered: - [self stopRunLoop]; - break; - - default: - // TODO: error - break; - } + [_delegate messageQueueDidConnect:self]; } @end -- 2.22.5