Rewrite MessageQueue to use libdispatch instead of CFStream and a dedicated thread.
authorRobert Sesek <rsesek@bluestatic.org>
Sat, 10 Oct 2015 03:18:37 +0000 (23:18 -0400)
committerRobert Sesek <rsesek@bluestatic.org>
Sat, 10 Oct 2015 03:18:37 +0000 (23:18 -0400)
Source/MessageQueue.h
Source/MessageQueue.m

index a038cbddc404be934b9e33844112ba026bf8b6df..e3ad629cb370f082027180249dd10ce5102ce15b 100644 (file)
 // program with which it exchanges UTF8 string messages. A message contains two
 // parts, both terminated by '\0'. The first is an ASCII integer number that is
 // the length of the second part. The second part is the actual string message.
-@interface MessageQueue : NSObject {
- @private
-  // The port number on which to open a listening socket.
-  NSUInteger _port;
-
-  // The thread and its run loop on which this class primarily operates.
-  NSThread* _thread;
-  NSRunLoop* _runLoop;
-
-  // Whether or not the run loop should quit.
-  BOOL _shouldQuit;
-
-  // Whether or not the message queue is connected to a client.
-  BOOL _connected;
-
-  // A queue of messages that are waiting to be sent.
-  NSMutableArray* _queue;
-
-  // The delegate for this class.
-  BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;
-
-  // The socket that listens for new incoming connections.
-  CFSocketRef _socket;
-
-  // The child socket that has been accepted from |_socket|.
-  CFSocketNativeHandle _child;
-
-  // The read and write streams that are created on the |_child| socket.
-  CFReadStreamRef _readStream;
-  CFWriteStreamRef _writeStream;
-
-  // When a message is being read, this temporary buffer is used to build up
-  // the complete message from successive reads.
-  NSMutableString* _message;
-  NSUInteger _totalMessageSize;
-  NSUInteger _messageSize;
-}
+@interface MessageQueue : NSObject
 
 // Creates a new MessasgeQueue that will listen on |port| and report information
 // to its |delegate|.
index d9e76a6c8a9631fc9c6ce56aa7d9bfb326743092..f66a06bc5bf2095873ee18780172a41a403d073a 100644 (file)
 
 #import "MessageQueue.h"
 
+#include <dispatch/dispatch.h>
 #include <netinet/in.h>
 #include <sys/socket.h>
 
-@interface MessageQueue (Private)
-// Thread main function that is started from -connect.
-- (void)runMessageQueue;
+@implementation MessageQueue {
+  // The port number on which to open a listening socket.
+  NSUInteger _port;
 
-// All the following methods must be called from the -runMessageQueue thread.
+  // The dispatch queue for this instance.
+  dispatch_queue_t _dispatchQueue;
 
-// Creates a listening socket and schedules it in the run loop.
-- (void)listenForClient;
+  // Whether or not the message queue is connected to a client.
+  BOOL _connected;
 
-// Closes down the listening socket, the child socket, and the streams.
-- (void)disconnectClient;
-
-// This first calls -disconnectClient and then stops the run loop and terminates
-// the -runMessageQueue thread.
-- (void)stopRunLoop;
+  // A queue of messages that are waiting to be sent.
+  NSMutableArray* _messageQueue;
 
-// Adds a |message| to |_queue|.
-- (void)enqueueMessage:(NSString*)message;
+  // The delegate for this class.
+  BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;
 
-// If the write stream is ready and there is data to send, sends the next message.
-- (void)dequeueAndSend;
+  BOOL _listening;
 
-// Writes the string into the write stream.
-- (void)performSend:(NSString*)message;
+  int _socket;
 
-// Reads bytes out of the read stream. This may be called multiple times if the
-// message cannot be read in one pass.
-- (void)readMessageFromStream;
+  // The two dispatch sources for the |_child|.
+  dispatch_source_t _readSource;
+  dispatch_source_t _writeSource;
 
-// Converts a CFErrorRef to an NSError and passes it to the delegate.
-- (void)reportError:(CFErrorRef)error;
-
-// Forwarding methods from the CoreFoundation callbacks.
-- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child;
-- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event;
-- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event;
-@end
-
-// CoreFoundation Callbacks ////////////////////////////////////////////////////
-
-static void MessageQueueSocketAccept(CFSocketRef socket,
-                                     CFSocketCallBackType callbackType,
-                                     CFDataRef address,
-                                     const void* data,
-                                     void* self)
-{
-  CFSocketNativeHandle child = *(CFSocketNativeHandle*)data;
-  [(MessageQueue*)self listenSocket:socket acceptedSocket:child];
+  // When a message is being read, this temporary buffer is used to build up
+  // the complete message from successive reads.
+  NSMutableString* _message;
+  NSUInteger _totalMessageSize;
+  NSUInteger _messageSize;
 }
 
-static void MessageQueueReadEvent(CFReadStreamRef stream,
-                                  CFStreamEventType eventType,
-                                  void* self)
-{
-  [(MessageQueue*)self readStream:stream handleEvent:eventType];
-}
-
-static void MessageQueueWriteEvent(CFWriteStreamRef stream,
-                                   CFStreamEventType eventType,
-                                   void* self)
-{
-  [(MessageQueue*)self writeStream:stream handleEvent:eventType];
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-@implementation MessageQueue
-
 - (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
   if ((self = [super init])) {
     _port = port;
-    _queue = [[NSMutableArray alloc] init];
+    _dispatchQueue = dispatch_queue_create(
+        [[NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String],
+        DISPATCH_QUEUE_SERIAL);
+    _messageQueue = [[NSMutableArray alloc] init];
     _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)
         [[BSProtocolThreadInvoker alloc] initWithObject:delegate
                                          protocol:@protocol(MessageQueueDelegate)
@@ -100,7 +67,8 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream,
 }
 
 - (void)dealloc {
-  [_queue release];
+  dispatch_release(_dispatchQueue);
+  [_messageQueue release];
   [_delegate release];
   [super dealloc];
 }
@@ -110,148 +78,107 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream,
 }
 
 - (void)connect {
-  if (_thread)
+  if (_connected)
     return;
 
-  [NSThread detachNewThreadSelector:@selector(runMessageQueue)
-                           toTarget:self
-                         withObject:nil];
+  _connected = YES;
+  dispatch_async(_dispatchQueue, ^{ [self openListeningSocket]; });
 }
 
 - (void)disconnect {
-  [self performSelector:@selector(stopRunLoop)
-               onThread:_thread
-             withObject:nil
-          waitUntilDone:NO];
+  dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; });
+  _connected = NO;
 }
 
 - (void)sendMessage:(NSString*)message {
-  [self performSelector:@selector(enqueueMessage:)
-               onThread:_thread
-             withObject:message
-          waitUntilDone:NO];
+  dispatch_async(_dispatchQueue, ^{
+    [_messageQueue addObject:message];
+    [self dequeueAndSend];
+  });
 }
 
 // Private /////////////////////////////////////////////////////////////////////
 
-- (void)runMessageQueue {
-  @autoreleasepool {
-    _thread = [NSThread currentThread];
-    _runLoop = [NSRunLoop currentRunLoop];
-
-    _connected = NO;
-    _shouldQuit = NO;
-    [self scheduleListenSocket];
-
-    // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic
-    // stop routine.
-    CFRunLoopRun();
+- (void)openListeningSocket {
+  // Create a socket.
+  do {
+    _socket = socket(PF_INET, SOCK_STREAM, 0);
+    if (_socket < 0) {
+      NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno));
+    }
+  } while (!_socket);
 
-    _thread = nil;
-    _runLoop = nil;
-  }
-}
+  // Allow old, yet-to-be recycled sockets to be reused.
+  int yes = 1;
+  setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
+  setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
 
-- (void)scheduleListenSocket {
-  // Create the address structure.
-  struct sockaddr_in address;
-  memset(&address, 0, sizeof(address));
+  // Bind to the address.
+  struct sockaddr_in address = {0};
   address.sin_len = sizeof(address);
   address.sin_family = AF_INET;
   address.sin_port = htons(_port);
   address.sin_addr.s_addr = htonl(INADDR_ANY);
 
-  // Create the socket signature.
-  CFSocketSignature signature;
-  signature.protocolFamily = PF_INET;
-  signature.socketType = SOCK_STREAM;
-  signature.protocol = IPPROTO_TCP;
-  signature.address = (CFDataRef)[NSData dataWithBytes:&address length:sizeof(address)];
-
-  CFSocketContext context = { 0 };
-  context.info = self;
-
+  int rv;
   do {
-    _socket =
-        CFSocketCreateWithSocketSignature(kCFAllocatorDefault,
-                                          &signature,  // Socket signature.
-                                          kCFSocketAcceptCallBack,  // Callback types.
-                                          &MessageQueueSocketAccept,  // Callback function.
-                                          &context);  // Context to pass to callout.
-    if (!_socket) {
-      // Pump the run loop while waiting for the socket to be reusued. If told
-      // to quit while waiting, then break out of the loop.
-      if (CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1, FALSE) && _shouldQuit)
-        return;
-      NSLog(@"Could not open socket");
-      //[connection_ errorEncountered:@"Could not open socket."];
+    rv = bind(_socket, &address, sizeof(address));
+    if (rv !=  0) {
+      NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno));
     }
-  } while (!_socket);
+  } while (rv != 0);
 
-  // Allow old, yet-to-be recycled sockets to be reused.
-  int yes = 1;
-  setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
-  setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
-
-  // Schedule the socket on the run loop.
-  CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
-  CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes);
-  CFRelease(source);
+  // Listen for a connection.
+  rv = listen(_socket, 1);
+  if (rv < 0) {
+    NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno));
+    close(_socket);
+    _connected = NO;
+    return;
+  }
+  _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue);
+  dispatch_source_set_event_handler(_readSource, ^{
+    [self acceptConnection];
+  });
+  dispatch_resume(_readSource);
 }
 
+// Closes down the listening socket, the child socket, and the streams.
 - (void)disconnectClient {
-  if (_socket) {
-    CFSocketInvalidate(_socket);
-    CFRelease(_socket);
-    _socket = NULL;
+  if (_readSource) {
+    dispatch_source_cancel(_readSource);
+    dispatch_release(_readSource);
+    _readSource = NULL;
   }
 
-  if (_readStream) {
-    CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
-    CFReadStreamClose(_readStream);
-    CFRelease(_readStream);
-    _readStream = NULL;
+  if (_writeSource) {
+    dispatch_source_cancel(_writeSource);
+    dispatch_release(_writeSource);
+    _writeSource = NULL;
   }
 
-  if (_writeStream) {
-    CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
-    CFWriteStreamClose(_writeStream);
-    CFRelease(_writeStream);
-    _writeStream = NULL;
+  if (_socket) {
+    close(_socket);
+    _socket = -1;
   }
 
-  if (_child) {
-    close(_child);
-    _child = NULL;
-  }
+  _listening = NO;
+  [_messageQueue removeAllObjects];
 
-  _connected = NO;
   [_delegate messageQueueDidDisconnect:self];
 }
 
-- (void)stopRunLoop {
-  _shouldQuit = YES;
-  [self disconnectClient];
-  CFRunLoopStop([_runLoop getCFRunLoop]);
-}
-
-- (void)enqueueMessage:(NSString*)message {
-  [_queue addObject:message];
-  [self dequeueAndSend];
-}
-
+// If the write stream is ready and there is data to send, sends the next message.
 - (void)dequeueAndSend {
-  if (![_queue count])
+  if (![_messageQueue count])
     return;
 
-  if (!CFWriteStreamCanAcceptBytes(_writeStream))
-    return;
-
-  NSString* message = [_queue objectAtIndex:0];
+  NSString* message = [_messageQueue objectAtIndex:0];
   [self performSend:message];
-  [_queue removeObjectAtIndex:0];
+  [_messageQueue removeObjectAtIndex:0];
 }
 
+// Writes the string into the write stream.
 - (void)performSend:(NSString*)message {
   // TODO: May need to negotiate with the server as to the string encoding.
   const NSStringEncoding kEncoding = NSUTF8StringEncoding;
@@ -280,9 +207,9 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream,
   // method is only ever called in response to a stream ready event.
   NSUInteger totalWritten = 0;
   while (totalWritten < bufferSize) {
-    CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten);
+    ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten);
     if (bytesWritten < 0) {
-      [self reportError:CFWriteStreamCopyError(_writeStream)];
+      NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno));
       break;
     }
     totalWritten += bytesWritten;
@@ -293,11 +220,17 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream,
   free(buffer);
 }
 
+// Reads bytes out of the read stream. This may be called multiple times if the
+// message cannot be read in one pass.
 - (void)readMessageFromStream {
   const NSUInteger kBufferSize = 1024;
   UInt8 buffer[kBufferSize];
   CFIndex bufferOffset = 0;  // Starting point in |buffer| to work with.
-  CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize);
+  ssize_t bytesRead = read(_socket, buffer, kBufferSize);
+  if (bytesRead == 0) {
+    [self disconnectClient];
+    return;
+  }
   const char* charBuffer = (const char*)buffer;
 
   // The read loop works by going through the buffer until all the bytes have
@@ -341,117 +274,37 @@ static void MessageQueueWriteEvent(CFWriteStreamRef stream,
   }
 }
 
-- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child {
-  if (socket != _socket) {
-    // TODO: error
-    return;
-  }
-
-  _child = child;
-
-  // Create the streams on the socket.
-  CFStreamCreatePairWithSocket(kCFAllocatorDefault,
-                               _child,  // Socket handle.
-                               &_readStream,  // Read stream in-pointer.
-                               &_writeStream);  // Write stream in-pointer.
-
-  // Create struct to register callbacks for the stream.
-  CFStreamClientContext context = { 0 };
-  context.info = self;
-
-  // Set the client of the read stream.
-  CFOptionFlags readFlags = kCFStreamEventOpenCompleted |
-                            kCFStreamEventHasBytesAvailable |
-                            kCFStreamEventErrorOccurred |
-                            kCFStreamEventEndEncountered;
-  if (CFReadStreamSetClient(_readStream, readFlags, &MessageQueueReadEvent, &context))
-    // Schedule in run loop to do asynchronous communication with the engine.
-    CFReadStreamScheduleWithRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
-  else
-    return;
-
-  // Open the stream now that it's scheduled on the run loop.
-  if (!CFReadStreamOpen(_readStream)) {
-    [self reportError:CFReadStreamCopyError(_readStream)];
-    return;
-  }
-
-  // Set the client of the write stream.
-  CFOptionFlags writeFlags = kCFStreamEventOpenCompleted |
-                             kCFStreamEventCanAcceptBytes |
-                             kCFStreamEventErrorOccurred |
-                             kCFStreamEventEndEncountered;
-  if (CFWriteStreamSetClient(_writeStream, writeFlags, &MessageQueueWriteEvent, &context))
-    // Schedule it in the run loop to receive error information.
-    CFWriteStreamScheduleWithRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
-  else
-    return;
+- (void)acceptConnection {
+  _listening = NO;
 
-  // Open the write stream.
-  if (!CFWriteStreamOpen(_writeStream)) {
-    [self reportError:CFWriteStreamCopyError(_writeStream)];
+  struct sockaddr_in address = {0};
+  socklen_t addressLength = sizeof(address);
+  int connection = accept(_socket, &address, &addressLength);
+  if (connection < 0) {
+    NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno));
+    [self disconnectClient];
     return;
   }
 
-  _connected = YES;
-  [_delegate messageQueueDidConnect:self];
+  dispatch_source_cancel(_readSource);
+  close(_socket);
 
-  CFSocketInvalidate(_socket);
-  CFRelease(_socket);
-  _socket = NULL;
-}
+  _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue);
+  dispatch_source_set_event_handler(_readSource, ^{
+    [self readMessageFromStream];
+  });
 
-- (void)reportError:(CFErrorRef)error
-{
-  [_delegate messageQueue:self error:(NSError*)error];
-  CFRelease(error);
-}
+  _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue);
+  dispatch_source_set_event_handler(_writeSource, ^{
+    [self dequeueAndSend];
+  });
 
-- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event
-{
-  assert(stream == _readStream);
-  switch (event)
-  {
-    case kCFStreamEventHasBytesAvailable:
-      [self readMessageFromStream];
-      break;
+  _socket = connection;
 
-    case kCFStreamEventErrorOccurred:
-      [self reportError:CFReadStreamCopyError(stream)];
-      [self stopRunLoop];
-      break;
-
-    case kCFStreamEventEndEncountered:
-      [self stopRunLoop];
-      break;
-
-    default:
-      // TODO: error
-      break;
-  };
-}
+  dispatch_resume(_readSource);
+  dispatch_resume(_writeSource);
 
-- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event
-{
-  assert(stream == _writeStream);
-  switch (event) {
-    case kCFStreamEventCanAcceptBytes:
-      [self dequeueAndSend];
-      break;
-
-    case kCFStreamEventErrorOccurred:
-      [self reportError:CFWriteStreamCopyError(stream)];
-      [self stopRunLoop];
-      break;
-
-    case kCFStreamEventEndEncountered:
-      [self stopRunLoop];
-      break;
-
-    default:
-      // TODO: error
-      break;
-  }
+  [_delegate messageQueueDidConnect:self];
 }
 
 @end