#import "MessageQueue.h"
+#include <dispatch/dispatch.h>
#include <netinet/in.h>
#include <sys/socket.h>
-@interface MessageQueue (Private)
-// Thread main function that is started from -connect.
-- (void)runMessageQueue;
+@implementation MessageQueue {
+ // The port number on which to open a listening socket.
+ NSUInteger _port;
-// All the following methods must be called from the -runMessageQueue thread.
+ // The dispatch queue for this instance.
+ dispatch_queue_t _dispatchQueue;
-// Creates a listening socket and schedules it in the run loop.
-- (void)listenForClient;
+ // Whether or not the message queue is connected to a client.
+ BOOL _connected;
-// Closes down the listening socket, the child socket, and the streams.
-- (void)disconnectClient;
-
-// This first calls -disconnectClient and then stops the run loop and terminates
-// the -runMessageQueue thread.
-- (void)stopRunLoop;
+ // A queue of messages that are waiting to be sent.
+ NSMutableArray* _messageQueue;
-// Adds a |message| to |_queue|.
-- (void)enqueueMessage:(NSString*)message;
+ // The delegate for this class.
+ BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;
-// If the write stream is ready and there is data to send, sends the next message.
-- (void)dequeueAndSend;
+ BOOL _listening;
-// Writes the string into the write stream.
-- (void)performSend:(NSString*)message;
+ int _socket;
-// Reads bytes out of the read stream. This may be called multiple times if the
-// message cannot be read in one pass.
-- (void)readMessageFromStream;
+ // The two dispatch sources for the |_child|.
+ dispatch_source_t _readSource;
+ dispatch_source_t _writeSource;
-// Converts a CFErrorRef to an NSError and passes it to the delegate.
-- (void)reportError:(CFErrorRef)error;
-
-// Forwarding methods from the CoreFoundation callbacks.
-- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child;
-- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event;
-- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event;
-@end
-
-// CoreFoundation Callbacks ////////////////////////////////////////////////////
-
-static void MessageQueueSocketAccept(CFSocketRef socket,
- CFSocketCallBackType callbackType,
- CFDataRef address,
- const void* data,
- void* self)
-{
- CFSocketNativeHandle child = *(CFSocketNativeHandle*)data;
- [(MessageQueue*)self listenSocket:socket acceptedSocket:child];
+ // When a message is being read, this temporary buffer is used to build up
+ // the complete message from successive reads.
+ NSMutableString* _message;
+ NSUInteger _totalMessageSize;
+ NSUInteger _messageSize;
}
-static void MessageQueueReadEvent(CFReadStreamRef stream,
- CFStreamEventType eventType,
- void* self)
-{
- [(MessageQueue*)self readStream:stream handleEvent:eventType];
-}
-
-static void MessageQueueWriteEvent(CFWriteStreamRef stream,
- CFStreamEventType eventType,
- void* self)
-{
- [(MessageQueue*)self writeStream:stream handleEvent:eventType];
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-@implementation MessageQueue
-
- (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
if ((self = [super init])) {
_port = port;
- _queue = [[NSMutableArray alloc] init];
+ _dispatchQueue = dispatch_queue_create(
+ [[NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String],
+ DISPATCH_QUEUE_SERIAL);
+ _messageQueue = [[NSMutableArray alloc] init];
_delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)
[[BSProtocolThreadInvoker alloc] initWithObject:delegate
protocol:@protocol(MessageQueueDelegate)
}
- (void)dealloc {
- [_queue release];
+ dispatch_release(_dispatchQueue);
+ [_messageQueue release];
[_delegate release];
[super dealloc];
}
}
- (void)connect {
- if (_thread)
+ if (_connected)
return;
- [NSThread detachNewThreadSelector:@selector(runMessageQueue)
- toTarget:self
- withObject:nil];
+ _connected = YES;
+ dispatch_async(_dispatchQueue, ^{ [self openListeningSocket]; });
}
- (void)disconnect {
- [self performSelector:@selector(stopRunLoop)
- onThread:_thread
- withObject:nil
- waitUntilDone:NO];
+ dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; });
+ _connected = NO;
}
- (void)sendMessage:(NSString*)message {
- [self performSelector:@selector(enqueueMessage:)
- onThread:_thread
- withObject:message
- waitUntilDone:NO];
+ dispatch_async(_dispatchQueue, ^{
+ [_messageQueue addObject:message];
+ [self dequeueAndSend];
+ });
}
// Private /////////////////////////////////////////////////////////////////////
-- (void)runMessageQueue {
- @autoreleasepool {
- _thread = [NSThread currentThread];
- _runLoop = [NSRunLoop currentRunLoop];
-
- _connected = NO;
- _shouldQuit = NO;
- [self scheduleListenSocket];
-
- // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic
- // stop routine.
- CFRunLoopRun();
+- (void)openListeningSocket {
+ // Create a socket.
+ do {
+ _socket = socket(PF_INET, SOCK_STREAM, 0);
+ if (_socket < 0) {
+ NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno));
+ }
+ } while (!_socket);
- _thread = nil;
- _runLoop = nil;
- }
-}
+ // Allow old, yet-to-be recycled sockets to be reused.
+ int yes = 1;
+ setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
+ setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
-- (void)scheduleListenSocket {
- // Create the address structure.
- struct sockaddr_in address;
- memset(&address, 0, sizeof(address));
+ // Bind to the address.
+ struct sockaddr_in address = {0};
address.sin_len = sizeof(address);
address.sin_family = AF_INET;
address.sin_port = htons(_port);
address.sin_addr.s_addr = htonl(INADDR_ANY);
- // Create the socket signature.
- CFSocketSignature signature;
- signature.protocolFamily = PF_INET;
- signature.socketType = SOCK_STREAM;
- signature.protocol = IPPROTO_TCP;
- signature.address = (CFDataRef)[NSData dataWithBytes:&address length:sizeof(address)];
-
- CFSocketContext context = { 0 };
- context.info = self;
-
+ int rv;
do {
- _socket =
- CFSocketCreateWithSocketSignature(kCFAllocatorDefault,
- &signature, // Socket signature.
- kCFSocketAcceptCallBack, // Callback types.
- &MessageQueueSocketAccept, // Callback function.
- &context); // Context to pass to callout.
- if (!_socket) {
- // Pump the run loop while waiting for the socket to be reusued. If told
- // to quit while waiting, then break out of the loop.
- if (CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1, FALSE) && _shouldQuit)
- return;
- NSLog(@"Could not open socket");
- //[connection_ errorEncountered:@"Could not open socket."];
+ rv = bind(_socket, &address, sizeof(address));
+ if (rv != 0) {
+ NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno));
}
- } while (!_socket);
+ } while (rv != 0);
- // Allow old, yet-to-be recycled sockets to be reused.
- int yes = 1;
- setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
- setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
-
- // Schedule the socket on the run loop.
- CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
- CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes);
- CFRelease(source);
+ // Listen for a connection.
+ rv = listen(_socket, 1);
+ if (rv < 0) {
+ NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno));
+ close(_socket);
+ _connected = NO;
+ return;
+ }
+ _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue);
+ dispatch_source_set_event_handler(_readSource, ^{
+ [self acceptConnection];
+ });
+ dispatch_resume(_readSource);
}
+// Closes down the listening socket, the child socket, and the streams.
- (void)disconnectClient {
- if (_socket) {
- CFSocketInvalidate(_socket);
- CFRelease(_socket);
- _socket = NULL;
+ if (_readSource) {
+ dispatch_source_cancel(_readSource);
+ dispatch_release(_readSource);
+ _readSource = NULL;
}
- if (_readStream) {
- CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
- CFReadStreamClose(_readStream);
- CFRelease(_readStream);
- _readStream = NULL;
+ if (_writeSource) {
+ dispatch_source_cancel(_writeSource);
+ dispatch_release(_writeSource);
+ _writeSource = NULL;
}
- if (_writeStream) {
- CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
- CFWriteStreamClose(_writeStream);
- CFRelease(_writeStream);
- _writeStream = NULL;
+ if (_socket) {
+ close(_socket);
+ _socket = -1;
}
- if (_child) {
- close(_child);
- _child = NULL;
- }
+ _listening = NO;
+ [_messageQueue removeAllObjects];
- _connected = NO;
[_delegate messageQueueDidDisconnect:self];
}
-- (void)stopRunLoop {
- _shouldQuit = YES;
- [self disconnectClient];
- CFRunLoopStop([_runLoop getCFRunLoop]);
-}
-
-- (void)enqueueMessage:(NSString*)message {
- [_queue addObject:message];
- [self dequeueAndSend];
-}
-
+// If the write stream is ready and there is data to send, sends the next message.
- (void)dequeueAndSend {
- if (![_queue count])
+ if (![_messageQueue count])
return;
- if (!CFWriteStreamCanAcceptBytes(_writeStream))
- return;
-
- NSString* message = [_queue objectAtIndex:0];
+ NSString* message = [_messageQueue objectAtIndex:0];
[self performSend:message];
- [_queue removeObjectAtIndex:0];
+ [_messageQueue removeObjectAtIndex:0];
}
+// Writes the string into the write stream.
- (void)performSend:(NSString*)message {
// TODO: May need to negotiate with the server as to the string encoding.
const NSStringEncoding kEncoding = NSUTF8StringEncoding;
// method is only ever called in response to a stream ready event.
NSUInteger totalWritten = 0;
while (totalWritten < bufferSize) {
- CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten);
+ ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten);
if (bytesWritten < 0) {
- [self reportError:CFWriteStreamCopyError(_writeStream)];
+ NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno));
break;
}
totalWritten += bytesWritten;
free(buffer);
}
+// Reads bytes out of the read stream. This may be called multiple times if the
+// message cannot be read in one pass.
- (void)readMessageFromStream {
const NSUInteger kBufferSize = 1024;
UInt8 buffer[kBufferSize];
CFIndex bufferOffset = 0; // Starting point in |buffer| to work with.
- CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize);
+ ssize_t bytesRead = read(_socket, buffer, kBufferSize);
+ if (bytesRead == 0) {
+ [self disconnectClient];
+ return;
+ }
const char* charBuffer = (const char*)buffer;
// The read loop works by going through the buffer until all the bytes have
}
}
-- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child {
- if (socket != _socket) {
- // TODO: error
- return;
- }
-
- _child = child;
-
- // Create the streams on the socket.
- CFStreamCreatePairWithSocket(kCFAllocatorDefault,
- _child, // Socket handle.
- &_readStream, // Read stream in-pointer.
- &_writeStream); // Write stream in-pointer.
-
- // Create struct to register callbacks for the stream.
- CFStreamClientContext context = { 0 };
- context.info = self;
-
- // Set the client of the read stream.
- CFOptionFlags readFlags = kCFStreamEventOpenCompleted |
- kCFStreamEventHasBytesAvailable |
- kCFStreamEventErrorOccurred |
- kCFStreamEventEndEncountered;
- if (CFReadStreamSetClient(_readStream, readFlags, &MessageQueueReadEvent, &context))
- // Schedule in run loop to do asynchronous communication with the engine.
- CFReadStreamScheduleWithRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
- else
- return;
-
- // Open the stream now that it's scheduled on the run loop.
- if (!CFReadStreamOpen(_readStream)) {
- [self reportError:CFReadStreamCopyError(_readStream)];
- return;
- }
-
- // Set the client of the write stream.
- CFOptionFlags writeFlags = kCFStreamEventOpenCompleted |
- kCFStreamEventCanAcceptBytes |
- kCFStreamEventErrorOccurred |
- kCFStreamEventEndEncountered;
- if (CFWriteStreamSetClient(_writeStream, writeFlags, &MessageQueueWriteEvent, &context))
- // Schedule it in the run loop to receive error information.
- CFWriteStreamScheduleWithRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
- else
- return;
+- (void)acceptConnection {
+ _listening = NO;
- // Open the write stream.
- if (!CFWriteStreamOpen(_writeStream)) {
- [self reportError:CFWriteStreamCopyError(_writeStream)];
+ struct sockaddr_in address = {0};
+ socklen_t addressLength = sizeof(address);
+ int connection = accept(_socket, &address, &addressLength);
+ if (connection < 0) {
+ NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno));
+ [self disconnectClient];
return;
}
- _connected = YES;
- [_delegate messageQueueDidConnect:self];
+ dispatch_source_cancel(_readSource);
+ close(_socket);
- CFSocketInvalidate(_socket);
- CFRelease(_socket);
- _socket = NULL;
-}
+ _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue);
+ dispatch_source_set_event_handler(_readSource, ^{
+ [self readMessageFromStream];
+ });
-- (void)reportError:(CFErrorRef)error
-{
- [_delegate messageQueue:self error:(NSError*)error];
- CFRelease(error);
-}
+ _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue);
+ dispatch_source_set_event_handler(_writeSource, ^{
+ [self dequeueAndSend];
+ });
-- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event
-{
- assert(stream == _readStream);
- switch (event)
- {
- case kCFStreamEventHasBytesAvailable:
- [self readMessageFromStream];
- break;
+ _socket = connection;
- case kCFStreamEventErrorOccurred:
- [self reportError:CFReadStreamCopyError(stream)];
- [self stopRunLoop];
- break;
-
- case kCFStreamEventEndEncountered:
- [self stopRunLoop];
- break;
-
- default:
- // TODO: error
- break;
- };
-}
+ dispatch_resume(_readSource);
+ dispatch_resume(_writeSource);
-- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event
-{
- assert(stream == _writeStream);
- switch (event) {
- case kCFStreamEventCanAcceptBytes:
- [self dequeueAndSend];
- break;
-
- case kCFStreamEventErrorOccurred:
- [self reportError:CFWriteStreamCopyError(stream)];
- [self stopRunLoop];
- break;
-
- case kCFStreamEventEndEncountered:
- [self stopRunLoop];
- break;
-
- default:
- // TODO: error
- break;
- }
+ [_delegate messageQueueDidConnect:self];
}
@end