From ba6c3defa661747388d0b0d0004efc1fe5b1ca93 Mon Sep 17 00:00:00 2001 From: Robert Sesek Date: Mon, 17 Jun 2013 22:29:10 -0400 Subject: [PATCH] Write MessageQueue to replace most NetworkCallbackController. This refactors pieces of NetworkConnection and NetworkCallbackController into a simpler class that merely deals with sequenced strings over a socket. --- MacGDBp.xcodeproj/project.pbxproj | 14 + Source/MessageQueue.h | 101 +++++++ Source/MessageQueue.m | 431 ++++++++++++++++++++++++++++++ 3 files changed, 546 insertions(+) create mode 100644 Source/MessageQueue.h create mode 100644 Source/MessageQueue.m diff --git a/MacGDBp.xcodeproj/project.pbxproj b/MacGDBp.xcodeproj/project.pbxproj index 7640dc9..3c91f76 100644 --- a/MacGDBp.xcodeproj/project.pbxproj +++ b/MacGDBp.xcodeproj/project.pbxproj @@ -39,6 +39,7 @@ 1EC6965712BBC6A700A8D984 /* LICENSE in Resources */ = {isa = PBXBuildFile; fileRef = 1EC6965112BBC6A700A8D984 /* LICENSE */; }; 1EC6965812BBC6A700A8D984 /* modp_b64.cc in Sources */ = {isa = PBXBuildFile; fileRef = 1EC6965212BBC6A700A8D984 /* modp_b64.cc */; }; 1EDA9CF812DD13B300596211 /* BSLineNumberRulerView.mm in Sources */ = {isa = PBXBuildFile; fileRef = 1EDA9CF712DD13B300596211 /* BSLineNumberRulerView.mm */; }; + 1EEBE842176FEA80003622C3 /* MessageQueue.m in Sources */ = {isa = PBXBuildFile; fileRef = 1EEBE841176FEA80003622C3 /* MessageQueue.m */; }; 1EEBFBE50D34C793008F835B /* Debugger.xib in Resources */ = {isa = PBXBuildFile; fileRef = 1EEBFBE30D34C793008F835B /* Debugger.xib */; }; 1EEBFC2B0D358EBD008F835B /* StepIn.png in Resources */ = {isa = PBXBuildFile; fileRef = 1EEBFC2A0D358EBD008F835B /* StepIn.png */; }; 1EEBFC370D358F1B008F835B /* StepOut.png in Resources */ = {isa = PBXBuildFile; fileRef = 1EEBFC360D358F1B008F835B /* StepOut.png */; }; @@ -121,6 +122,8 @@ 1EC6965412BBC6A700A8D984 /* modp_b64_data.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = modp_b64_data.h; sourceTree = ""; }; 1EDA9CF612DD13B300596211 /* BSLineNumberRulerView.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = BSLineNumberRulerView.h; path = Source/BSLineNumberRulerView.h; sourceTree = ""; }; 1EDA9CF712DD13B300596211 /* BSLineNumberRulerView.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; name = BSLineNumberRulerView.mm; path = Source/BSLineNumberRulerView.mm; sourceTree = ""; }; + 1EEBE840176FEA80003622C3 /* MessageQueue.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = MessageQueue.h; path = Source/MessageQueue.h; sourceTree = ""; }; + 1EEBE841176FEA80003622C3 /* MessageQueue.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = MessageQueue.m; path = Source/MessageQueue.m; sourceTree = ""; }; 1EEBFBE40D34C793008F835B /* English */ = {isa = PBXFileReference; lastKnownFileType = file.xib; name = English; path = English.lproj/Debugger.xib; sourceTree = ""; }; 1EEBFC2A0D358EBD008F835B /* StepIn.png */ = {isa = PBXFileReference; lastKnownFileType = image.png; name = StepIn.png; path = Icons/StepIn.png; sourceTree = ""; }; 1EEBFC360D358F1B008F835B /* StepOut.png */ = {isa = PBXFileReference; lastKnownFileType = image.png; name = StepOut.png; path = Icons/StepOut.png; sourceTree = ""; }; @@ -161,6 +164,7 @@ 1E02C3D30C60EC2C006F1752 /* AppDelegate.h */, 1E02C3D40C60EC2C006F1752 /* AppDelegate.m */, 1E9582640E252480001A3D89 /* Preferences */, + 1EEBE83F176FEA5C003622C3 /* Protocol */, 1E1E52C10DF9B1FB00D334F9 /* Connection */, 1EB7BEB90ECF3BC60033283A /* Debugger */, 1E1E52C00DF9B1E700D334F9 /* Breakpoints */, @@ -310,6 +314,15 @@ path = modp_b64; sourceTree = ""; }; + 1EEBE83F176FEA5C003622C3 /* Protocol */ = { + isa = PBXGroup; + children = ( + 1EEBE840176FEA80003622C3 /* MessageQueue.h */, + 1EEBE841176FEA80003622C3 /* MessageQueue.m */, + ); + name = Protocol; + sourceTree = ""; + }; 29B97314FDCFA39411CA2CEA /* MacGDBp */ = { isa = PBXGroup; children = ( @@ -474,6 +487,7 @@ 1E11814A1319805E003BFEF1 /* BSSourceViewTextView.m in Sources */, 1E108E40136CC8B9002E34E0 /* EvalController.m in Sources */, 1E109019136DD92D002E34E0 /* StripLineBreaksValueTransformer.m in Sources */, + 1EEBE842176FEA80003622C3 /* MessageQueue.m in Sources */, ); runOnlyForDeploymentPostprocessing = 0; }; diff --git a/Source/MessageQueue.h b/Source/MessageQueue.h new file mode 100644 index 0000000..a7878b0 --- /dev/null +++ b/Source/MessageQueue.h @@ -0,0 +1,101 @@ +/* + * MacGDBp + * Copyright (c) 2013, Blue Static + * + * This program is free software; you can redistribute it and/or modify it under the terms of the GNU + * General Public License as published by the Free Software Foundation; either version 2 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with this program; if not, + * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + */ + +#import + +@protocol MessageQueueDelegate; + +// MessageQueue operates a listening socket, that is connected to another +// program with which it exchanges UTF8 string messages. A message contains two +// parts, both terminated by '\0'. The first is an ASCII integer number that is +// the length of the second part. The second part is the actual string message. +@interface MessageQueue : NSObject { + @private + // The port number on which to open a listening socket. + NSUInteger _port; + + // The thread and its run loop on which this class primarily operates. + NSThread* _thread; + NSRunLoop* _runLoop; + + // Whether or not the message queue is connected to a client. + BOOL _connected; + + // A queue of messages that are waiting to be sent. + NSMutableArray* _queue; + + // The delegate for this class. + id _delegate; + + // The socket that listens for new incoming connections. + CFSocketRef _socket; + + // The child socket that has been accepted from |_socket|. + CFSocketNativeHandle _child; + + // The read and write streams that are created on the |_child| socket. + CFReadStreamRef _readStream; + CFWriteStreamRef _writeStream; + + // When a message is being read, this temporary buffer is used to build up + // the complete message from successive reads. + NSMutableString* _message; + NSUInteger _totalMessageSize; + NSUInteger _messageSize; +} + +// Creates a new MessasgeQueue that will listen on |port| and report information +// to its |delegate|. +- (id)initWithPort:(NSUInteger)port delegate:(id)delegate; + +// Whether or not the message queue has attached itself to a child. +- (BOOL)isConnected; + +// Opens a socket that will listen for connections. +- (void)connect; + +// Closes either the listening or child socket and completely disconnects. +- (void)disconnect; + +// Enqueues a |message| to be sent to the client. This may be called from any +// thread. +- (void)sendMessage:(NSString*)message; + +@end + +// Delegate //////////////////////////////////////////////////////////////////// + +// The delegate for the message queue. These methods may be called on any thread. +@protocol MessageQueueDelegate +// Callback for any errors that the MessageQueue encounters. +- (void)messageQueueError:(NSError*)error; + +// Called when the listening socket has accepted a child socket. +- (void)clientDidConnect:(MessageQueue*)queue; + +// Called when the child socket has been disconnected. +- (void)clientDidDisconnect:(MessageQueue*)queue; + +// If the write stream is ready, the delegate controls whether or not the next +// pending message should be sent via the result of this method. +- (BOOL)shouldSendMessage; + +// Callback for when a message has been sent. +- (void)didSendMessage:(NSString*)message; + +// Callback with the message content when one has been receieved. +- (void)didReceiveMessage:(NSString*)message; +@end diff --git a/Source/MessageQueue.m b/Source/MessageQueue.m new file mode 100644 index 0000000..7f533b4 --- /dev/null +++ b/Source/MessageQueue.m @@ -0,0 +1,431 @@ +/* + * MacGDBp + * Copyright (c) 2013, Blue Static + * + * This program is free software; you can redistribute it and/or modify it under the terms of the GNU + * General Public License as published by the Free Software Foundation; either version 2 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with this program; if not, + * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + */ + +#import "MessageQueue.h" + +#include +#include + +@interface MessageQueue (Private) +// Thread main function that is started from -connect. +- (void)runMessageQueue; + +// All the following methods must be called from the -runMessageQueue thread. + +// Creates a listening socket and schedules it in the run loop. +- (void)listenForClient; + +// Closes down the listening socket, the child socket, and the streams. +- (void)disconnectClient; + +// This first calls -disconnectClient and then stops the run loop and terminates +// the -runMessageQueue thread. +- (void)stopRunLoop; + +// Adds a |message| to |_queue|. +- (void)enqueueMessage:(NSString*)message; + +// If the write stream is ready and there is data to send, sends the next message. +- (void)dequeueAndSend; + +// Writes the string into the write stream. +- (void)performSend:(NSString*)message; + +// Reads bytes out of the read stream. This may be called multiple times if the +// message cannot be read in one pass. +- (void)readMessageFromStream; + +// Forwarding methods from the CoreFoundation callbacks. +- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child; +- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event; +- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event; +@end + +// CoreFoundation Callbacks //////////////////////////////////////////////////// + +static void MessageQueueSocketAccept(CFSocketRef socket, + CFSocketCallBackType callbackType, + CFDataRef address, + const void* data, + void* self) +{ + CFSocketNativeHandle child = *(CFSocketNativeHandle*)data; + [(MessageQueue*)self listenSocket:socket acceptedSocket:child]; +} + +static void MessageQueueReadEvent(CFReadStreamRef stream, + CFStreamEventType eventType, + void* self) +{ + [(MessageQueue*)self readStream:stream handleEvent:eventType]; +} + +static void MessageQueueWriteEvent(CFWriteStreamRef stream, + CFStreamEventType eventType, + void* self) +{ + [(MessageQueue*)self writeStream:stream handleEvent:eventType]; +} + +//////////////////////////////////////////////////////////////////////////////// + +@implementation MessageQueue + +- (id)initWithPort:(NSUInteger)port delegate:(id)delegate { + if ((self = [super init])) { + _port = port; + _queue = [[NSMutableArray alloc] init]; + _delegate = delegate; + } + return self; +} + +- (void)dealloc { + [_queue release]; + [super dealloc]; +} + +- (BOOL)isConnected { + return _connected; +} + +- (void)connect { + if (_thread) + return; + + [NSThread detachNewThreadSelector:@selector(runMessageQueue) + toTarget:self + withObject:nil]; +} + +- (void)disconnect { + [self performSelector:@selector(stopRunLoop) + onThread:_thread + withObject:nil + waitUntilDone:NO]; +} + +- (void)sendMessage:(NSString*)message { + [self performSelector:@selector(enqueueMessage:) + onThread:_thread + withObject:message + waitUntilDone:NO]; +} + +// Private ///////////////////////////////////////////////////////////////////// + +- (void)runMessageQueue { + @autoreleasepool { + _thread = [NSThread currentThread]; + _runLoop = [NSRunLoop currentRunLoop]; + + _connected = NO; + [self scheduleListenSocket]; + + // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic + // stop routine. + CFRunLoopRun(); + + _thread = nil; + _runLoop = nil; + } +} + +- (void)scheduleListenSocket { + // Create the address structure. + struct sockaddr_in address; + memset(&address, 0, sizeof(address)); + address.sin_len = sizeof(address); + address.sin_family = AF_INET; + address.sin_port = htons(_port); + address.sin_addr.s_addr = htonl(INADDR_ANY); + + // Create the socket signature. + CFSocketSignature signature; + signature.protocolFamily = PF_INET; + signature.socketType = SOCK_STREAM; + signature.protocol = IPPROTO_TCP; + signature.address = (CFDataRef)[NSData dataWithBytes:&address length:sizeof(address)]; + + CFSocketContext context = { 0 }; + context.info = self; + + do { + _socket = + CFSocketCreateWithSocketSignature(kCFAllocatorDefault, + &signature, // Socket signature. + kCFSocketAcceptCallBack, // Callback types. + &MessageQueueSocketAccept, // Callback function. + &context); // Context to pass to callout. + if (!_socket) { + //[connection_ errorEncountered:@"Could not open socket."]; + sleep(1); + } + } while (!_socket); + + // Allow old, yet-to-be recycled sockets to be reused. + int yes = 1; + setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int)); + + // Schedule the socket on the run loop. + CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0); + CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes); + CFRelease(source); +} + +- (void)disconnectClient { + if (_readStream) { + CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); + CFReadStreamClose(_readStream); + CFRelease(_readStream); + _readStream = NULL; + } + + if (_writeStream) { + CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); + CFWriteStreamClose(_writeStream); + CFRelease(_writeStream); + _writeStream = NULL; + } + + if (_child) { + close(_child); + _child = NULL; + } + + _connected = NO; + [_delegate clientDidDisconnect:self]; +} + +- (void)stopRunLoop { + [self disconnectClient]; + CFRunLoopStop([_runLoop getCFRunLoop]); +} + +- (void)enqueueMessage:(NSString*)message { + [_queue addObject:message]; + [self dequeueAndSend]; +} + +- (void)dequeueAndSend { + if (![_queue count]) + return; + + if (![_delegate shouldSendMessage]) + return; + + if (!CFWriteStreamCanAcceptBytes(_writeStream)) + return; + + NSString* message = [_queue objectAtIndex:0]; + [_queue removeObjectAtIndex:0]; + [self performSend:message]; +} + +- (void)performSend:(NSString*)message { + // TODO: May need to negotiate with the server as to the string encoding. + const NSStringEncoding kEncoding = NSUTF8StringEncoding; + // Add space for the NUL byte. + NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1; + + UInt8* buffer = malloc(maxBufferSize); + bzero(buffer, maxBufferSize); + + NSUInteger bufferSize = 0; + if (![message getBytes:buffer + maxLength:maxBufferSize + usedLength:&bufferSize + encoding:kEncoding + options:0 + range:NSMakeRange(0, [message length]) + remainingRange:NULL]) { + free(buffer); + return; + } + + // Include a NUL byte. + ++bufferSize; + + // Write the packet out, and spin in a busy wait loop if the stream is not ready. This + // method is only ever called in response to a stream ready event. + NSUInteger totalWritten = 0; + while (totalWritten < bufferSize) { + CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten); + if (bytesWritten < 0) { + CFErrorRef error = CFWriteStreamCopyError(_writeStream); + //ReportError(error); + break; + } + totalWritten += bytesWritten; + } + + [_delegate didSendMessage:message]; + + free(buffer); +} + +- (void)readMessageFromStream { + const NSUInteger kBufferSize = 1024; + UInt8 buffer[kBufferSize]; + CFIndex bufferOffset = 0; // Starting point in |buffer| to work with. + CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize); + const char* charBuffer = (const char*)buffer; + + // The read loop works by going through the buffer until all the bytes have + // been processed. + while (bufferOffset < bytesRead) { + // Find the NUL separator, or the end of the string. + NSUInteger partLength = 0; + for (CFIndex i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ; + + // If there is not a current packet, set some state. + if (!_message) { + // Read the message header: the size. This will be |partLength| bytes. + _totalMessageSize = atoi(charBuffer + bufferOffset); + _messageSize = 0; + _message = [NSMutableString stringWithCapacity:_totalMessageSize]; + bufferOffset += partLength + 1; // Pass over the NUL byte. + continue; // Spin the loop to begin reading actual data. + } + + // Substring the byte stream and append it to the packet string. + CFStringRef bufferString = CFStringCreateWithBytes(kCFAllocatorDefault, + buffer + bufferOffset, // Byte pointer, offset by start index. + partLength, // Length. + kCFStringEncodingUTF8, + true); + [_message appendString:(NSString*)bufferString]; + CFRelease(bufferString); + + // Advance counters. + _messageSize += partLength; + bufferOffset += partLength + 1; + + // If this read finished the packet, handle it and reset. + if (_messageSize >= _totalMessageSize) { + [_delegate didReceiveMessage:[_message autorelease]]; + _message = nil; + } + } +} + +- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child { + if (socket != _socket) { + // TODO: error + return; + } + + // Create the streams on the socket. + CFStreamCreatePairWithSocket(kCFAllocatorDefault, + socket, // Socket handle. + &_readStream, // Read stream in-pointer. + &_writeStream); // Write stream in-pointer. + + // Create struct to register callbacks for the stream. + CFStreamClientContext context = { 0 }; + context.info = self; + + // Set the client of the read stream. + CFOptionFlags readFlags = kCFStreamEventOpenCompleted | + kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | + kCFStreamEventEndEncountered; + if (CFReadStreamSetClient(_readStream, readFlags, &MessageQueueReadEvent, &context)) + // Schedule in run loop to do asynchronous communication with the engine. + CFReadStreamScheduleWithRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); + else + return; + + // Open the stream now that it's scheduled on the run loop. + if (!CFReadStreamOpen(_readStream)) { + //ReportError(CFReadStreamCopyError(readStream_)); + return; + } + + // Set the client of the write stream. + CFOptionFlags writeFlags = kCFStreamEventOpenCompleted | + kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | + kCFStreamEventEndEncountered; + if (CFWriteStreamSetClient(_writeStream, writeFlags, &MessageQueueWriteEvent, &context)) + // Schedule it in the run loop to receive error information. + CFWriteStreamScheduleWithRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes); + else + return; + + // Open the write stream. + if (!CFWriteStreamOpen(_writeStream)) { +// ReportError(CFWriteStreamCopyError(_writeStream)); + return; + } + + _connected = YES; + [_delegate clientDidConnect:self]; + + CFSocketInvalidate(_socket); + CFRelease(_socket); + _socket = NULL; +} + +- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event +{ + assert(stream == _readStream); + switch (event) + { + case kCFStreamEventHasBytesAvailable: + [self readMessageFromStream]; + break; + + case kCFStreamEventErrorOccurred: + //ReportError(CFReadStreamCopyError(stream)); + [self stopRunLoop]; + break; + + case kCFStreamEventEndEncountered: + [self stopRunLoop]; + break; + + default: + // TODO: error + break; + }; +} + +- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event +{ + assert(stream == _writeStream); + switch (event) { + case kCFStreamEventCanAcceptBytes: + [self dequeueAndSend]; + break; + + case kCFStreamEventErrorOccurred: + //ReportError(CFWriteStreamCopyError(stream)); + [self stopRunLoop]; + break; + + case kCFStreamEventEndEncountered: + [self stopRunLoop]; + break; + + default: + // TODO: error + break; + } +} + +@end -- 2.22.5