Write BSProtocolThreadInvoker to replace ThreadSafeDelegate.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <netinet/in.h>
20 #include <sys/socket.h>
21
// Private interface of MessageQueue. With the exception of -runMessageQueue
// itself, every method declared here is meant to be invoked on the thread that
// -runMessageQueue is executing on.
@interface MessageQueue (Private)
// Thread main function that is started from -connect.
- (void)runMessageQueue;

// All the following methods must be called from the -runMessageQueue thread.

// Creates a listening socket and schedules it in the run loop. (This
// declaration previously named a -listenForClient method that was never
// implemented; -scheduleListenSocket is the method that exists and is called.)
- (void)scheduleListenSocket;

// Closes down the listening socket, the child socket, and the streams.
- (void)disconnectClient;

// This first calls -disconnectClient and then stops the run loop and terminates
// the -runMessageQueue thread.
- (void)stopRunLoop;

// Adds a |message| to |_queue|.
- (void)enqueueMessage:(NSString*)message;

// If the write stream is ready and there is data to send, sends the next message.
- (void)dequeueAndSend;

// Writes the string into the write stream.
- (void)performSend:(NSString*)message;

// Reads bytes out of the read stream. This may be called multiple times if the
// message cannot be read in one pass.
- (void)readMessageFromStream;

// Forwarding methods from the CoreFoundation callbacks.
- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child;
- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event;
- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event;
@end
56
57 // CoreFoundation Callbacks ////////////////////////////////////////////////////
58
// CFSocket callout registered for the listening socket. |data| is read as a
// pointer to the native handle of the accepted connection, and |info| is the
// MessageQueue instance the event is forwarded to.
static void MessageQueueSocketAccept(CFSocketRef socket,
                                     CFSocketCallBackType callbackType,
                                     CFDataRef address,
                                     const void* data,
                                     void* info)
{
  MessageQueue* queue = (MessageQueue*)info;
  const CFSocketNativeHandle* acceptedHandle = (const CFSocketNativeHandle*)data;
  [queue listenSocket:socket acceptedSocket:*acceptedHandle];
}
68
// Read stream client callout. Forwards the event to the MessageQueue instance
// passed in |info|.
static void MessageQueueReadEvent(CFReadStreamRef stream,
                                  CFStreamEventType eventType,
                                  void* info)
{
  MessageQueue* queue = (MessageQueue*)info;
  [queue readStream:stream handleEvent:eventType];
}
75
// Write stream client callout. Forwards the event to the MessageQueue instance
// passed in |info|.
static void MessageQueueWriteEvent(CFWriteStreamRef stream,
                                   CFStreamEventType eventType,
                                   void* info)
{
  MessageQueue* queue = (MessageQueue*)info;
  [queue writeStream:stream handleEvent:eventType];
}
82
83 ////////////////////////////////////////////////////////////////////////////////
84
85 @implementation MessageQueue
86
// Initializes the queue to listen on |port|. The |delegate| is not stored
// directly; it is wrapped in a BSProtocolThreadInvoker that is bound to the
// thread calling this initializer (presumably so that delegate messages are
// delivered on that thread -- see BSProtocolThreadInvoker to confirm).
- (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
  self = [super init];
  if (!self)
    return nil;

  _port = port;
  _queue = [[NSMutableArray alloc] init];

  BSProtocolThreadInvoker* invoker =
      [[BSProtocolThreadInvoker alloc] initWithObject:delegate
                                             protocol:@protocol(MessageQueueDelegate)
                                               thread:[NSThread currentThread]];
  _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)invoker;

  return self;
}
98
// Releases the objects owned by the queue.
- (void)dealloc {
  // |_message| is allocated in -readMessageFromStream and may still hold a
  // partially received packet; release it so it does not leak. Messaging nil
  // is a no-op when no packet is in flight.
  [_message release];
  [_queue release];
  [_delegate release];
  [super dealloc];
}
104
// Returns the current value of the |_connected| flag, which is set to YES once
// both streams to an accepted client are open and reset to NO on disconnect.
- (BOOL)isConnected {
  BOOL connected = _connected;
  return connected;
}
108
// Starts the message queue thread, whose entry point is -runMessageQueue.
// Does nothing when |_thread| is already set.
- (void)connect {
  if (!_thread) {
    SEL threadMain = @selector(runMessageQueue);
    [NSThread detachNewThreadSelector:threadMain toTarget:self withObject:nil];
  }
}
117
// Asks the message queue thread to run -stopRunLoop. The request is posted
// asynchronously, so this returns without waiting for the teardown.
- (void)disconnect {
  SEL stopSelector = @selector(stopRunLoop);
  [self performSelector:stopSelector onThread:_thread withObject:nil waitUntilDone:NO];
}
124
// Hands |message| to the message queue thread, where -enqueueMessage: appends
// it to the outgoing queue. The hand-off is asynchronous.
- (void)sendMessage:(NSString*)message {
  SEL enqueueSelector = @selector(enqueueMessage:);
  [self performSelector:enqueueSelector onThread:_thread withObject:message waitUntilDone:NO];
}
131
132 // Private /////////////////////////////////////////////////////////////////////
133
// Entry point of the message queue thread. Records the thread and its run
// loop, starts listening for a client, and then blocks in the run loop until
// it is stopped (see -stopRunLoop). The recorded thread and run loop are
// cleared again before the thread exits.
- (void)runMessageQueue {
  @autoreleasepool {
    // Remember where this queue runs so that other threads can post work here.
    _thread = [NSThread currentThread];
    _runLoop = [NSRunLoop currentRunLoop];
    _connected = NO;

    [self scheduleListenSocket];

    // CFRunLoopRun() is used rather than -[NSRunLoop run] because the latter
    // offers no programmatic way to be stopped.
    CFRunLoopRun();

    // The run loop has been stopped; forget the thread state.
    _thread = nil;
    _runLoop = nil;
  }
}
150
// Creates a TCP socket listening on |_port| for any local IPv4 address and
// schedules it on |_runLoop|. Accepted connections are reported through
// MessageQueueSocketAccept. If the socket cannot be created or bound, this
// retries once per second until it succeeds.
- (void)scheduleListenSocket {
  // Create the address structure.
  struct sockaddr_in address;
  memset(&address, 0, sizeof(address));
  address.sin_len = sizeof(address);
  address.sin_family = AF_INET;
  address.sin_port = htons(_port);
  address.sin_addr.s_addr = htonl(INADDR_ANY);
  NSData* addressData = [NSData dataWithBytes:&address length:sizeof(address)];

  CFSocketContext context = { 0 };
  context.info = self;

  do {
    // Create the socket unbound. CFSocketCreateWithSocketSignature() would
    // bind it immediately, which makes address-reuse options set afterwards
    // ineffective for that bind.
    _socket = CFSocketCreate(kCFAllocatorDefault,
                             PF_INET,
                             SOCK_STREAM,
                             IPPROTO_TCP,
                             kCFSocketAcceptCallBack,    // Callback types.
                             &MessageQueueSocketAccept,  // Callback function.
                             &context);                  // Context to pass to callout.
    if (_socket) {
      // Allow old, yet-to-be recycled sockets to be reused. These options must
      // be in place before the address is bound.
      int yes = 1;
      setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
      setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));

      // Bind the address and start listening.
      if (CFSocketSetAddress(_socket, (CFDataRef)addressData) != kCFSocketSuccess) {
        CFSocketInvalidate(_socket);
        CFRelease(_socket);
        _socket = NULL;
      }
    }

    if (!_socket) {
      //[connection_ errorEncountered:@"Could not open socket."];
      sleep(1);
    }
  } while (!_socket);

  // Schedule the socket on the run loop.
  CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
  CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes);
  CFRelease(source);
}
193
// Closes down the listening socket (if it is still open), the streams and the
// child socket, discards any partially received message, marks the queue as
// disconnected and notifies the delegate.
- (void)disconnectClient {
  // The listening socket is normally torn down once a client has attached. If
  // the queue is stopped before that happens it is still open here and has to
  // be closed, otherwise it leaks and keeps the port occupied.
  if (_socket) {
    CFSocketInvalidate(_socket);
    CFRelease(_socket);
    _socket = NULL;
  }

  if (_readStream) {
    CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
    CFReadStreamClose(_readStream);
    CFRelease(_readStream);
    _readStream = NULL;
  }

  if (_writeStream) {
    CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
    CFWriteStreamClose(_writeStream);
    CFRelease(_writeStream);
    _writeStream = NULL;
  }

  if (_child) {
    close(_child);
    // |_child| is a native socket handle (an integer), not a pointer.
    _child = 0;
  }

  // Drop a packet that was only partially received so that it neither leaks
  // nor gets mixed into the data of a later connection.
  [_message release];
  _message = nil;

  _connected = NO;
  [_delegate messageQueueDidDisconnect:self];
}
217
// Tears down the client connection and then stops the run loop stored in
// |_runLoop|, which lets CFRunLoopRun() in -runMessageQueue return.
- (void)stopRunLoop {
  [self disconnectClient];

  CFRunLoopRef queueRunLoop = [_runLoop getCFRunLoop];
  CFRunLoopStop(queueRunLoop);
}
222
// Appends |message| to the end of |_queue| and then immediately attempts to
// send the message at the head of the queue.
- (void)enqueueMessage:(NSString*)message {
  NSMutableArray* pendingMessages = _queue;
  [pendingMessages addObject:message];

  [self dequeueAndSend];
}
227
// Sends the message at the head of |_queue|, provided there is one and the
// write stream can currently accept bytes. The message is removed from the
// queue after -performSend: returns.
- (void)dequeueAndSend {
  if (![_queue count])
    return;

  // Messages can be enqueued before a client has connected (or after it has
  // gone away), in which case there is no write stream. CoreFoundation stream
  // functions must not be called with NULL, so leave the message queued.
  if (!_writeStream || !CFWriteStreamCanAcceptBytes(_writeStream))
    return;

  // The queue keeps the message alive until it is removed below.
  NSString* message = [_queue objectAtIndex:0];
  [self performSend:message];
  [_queue removeObjectAtIndex:0];
}
239
// Encodes |message| as UTF-8, appends a terminating NUL byte and writes the
// result to |_writeStream|. The delegate is told about the send afterwards,
// unless the message could not be encoded, in which case nothing is written
// and the delegate is not notified.
- (void)performSend:(NSString*)message {
  // TODO: May need to negotiate with the server as to the string encoding.
  const NSStringEncoding kEncoding = NSUTF8StringEncoding;
  // Add space for the NUL byte.
  NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;

  // Zero-filled, so the byte following the encoded string is already NUL.
  UInt8* buffer = calloc(maxBufferSize, sizeof(UInt8));
  if (!buffer)
    return;

  NSUInteger bufferSize = 0;
  if (![message getBytes:buffer
               maxLength:maxBufferSize
              usedLength:&bufferSize
                encoding:kEncoding
                 options:0
                   range:NSMakeRange(0, [message length])
          remainingRange:NULL]) {
    free(buffer);
    return;
  }

  // Include a NUL byte.
  ++bufferSize;

  // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
  // method is only ever called in response to a stream ready event.
  NSUInteger totalWritten = 0;
  while (totalWritten < bufferSize) {
    CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten);
    if (bytesWritten < 0) {
      // The copied error is owned by this method and must be released.
      CFErrorRef error = CFWriteStreamCopyError(_writeStream);
      //ReportError(error);
      if (error)
        CFRelease(error);
      break;
    }
    totalWritten += bytesWritten;
  }

  [_delegate messageQueue:self didSendMessage:message];

  free(buffer);
}
281
// Reads up to 1024 bytes from |_readStream| and feeds them into the packet
// parser. As parsed here, a packet is a decimal size header, a NUL byte, the
// message body and a final NUL byte. State is kept in |_message|,
// |_messageSize| and |_totalMessageSize|, so a body may span several calls.
// Once the number of body bytes reaches the announced size, the message is
// handed to the delegate and any pending outgoing message is sent.
//
// NOTE(review): a size header that is itself split across two reads is still
// not reassembled; doing so would require additional parser state.
- (void)readMessageFromStream {
  const NSUInteger kBufferSize = 1024;
  UInt8 buffer[kBufferSize];
  CFIndex bufferOffset = 0;  // Starting point in |buffer| to work with.
  CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize);

  // The read loop works by going through the buffer until all the bytes have
  // been processed. A failed or empty read skips the loop entirely.
  while (bufferOffset < bytesRead) {
    // Find the NUL separator, or the end of the received data.
    NSUInteger partLength = 0;
    for (CFIndex i = bufferOffset; i < bytesRead && buffer[i] != '\0'; ++i, ++partLength) ;

    // If there is not a current packet, set some state.
    if (!_message) {
      if (partLength == 0) {
        // A lone NUL where a header is expected is the terminator of the
        // previous packet arriving in a later read. Skip it rather than
        // treating it as an empty header.
        ++bufferOffset;
        continue;
      }

      // Read the message header: the size. This will be |partLength| bytes.
      // Parse from a bounded, NUL-terminated copy, because the received bytes
      // are not guaranteed to contain a terminator for atoi() to stop at.
      char header[32] = { 0 };
      size_t headerLength = MIN(partLength, sizeof(header) - 1);
      memcpy(header, buffer + bufferOffset, headerLength);
      _totalMessageSize = atoi(header);
      _messageSize = 0;
      _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
      bufferOffset += partLength + 1;  // Pass over the NUL byte.
      continue;  // Spin the loop to begin reading actual data.
    }

    // Substring the byte stream and append it to the packet string.
    CFStringRef bufferString = CFStringCreateWithBytes(kCFAllocatorDefault,
                                                       buffer + bufferOffset,  // Byte pointer, offset by start index.
                                                       partLength,             // Length.
                                                       kCFStringEncodingUTF8,
                                                       true);
    if (!bufferString) {
      // The bytes are not valid UTF-8 on their own, for instance because a
      // multi-byte character was cut at the end of this read. Decode them
      // byte-for-byte instead of dropping them (appending nil would raise).
      bufferString = CFStringCreateWithBytes(kCFAllocatorDefault,
                                             buffer + bufferOffset,
                                             partLength,
                                             kCFStringEncodingISOLatin1,
                                             false);
    }
    if (bufferString) {
      [_message appendString:(NSString*)bufferString];
      CFRelease(bufferString);
    }

    // Advance counters. The byte count advances even if nothing could be
    // appended, so that the packet framing stays in sync.
    _messageSize += partLength;
    bufferOffset += partLength + 1;

    // If this read finished the packet, handle it and reset.
    if (_messageSize >= _totalMessageSize) {
      [_delegate messageQueue:self didReceiveMessage:[_message autorelease]];
      _message = nil;

      // Process any outgoing messages.
      [self dequeueAndSend];
    }
  }
}
329
// Handles a connection accepted on the listening socket. Creates a read/write
// stream pair on |child|, registers this object as the client of both streams,
// schedules them on |_runLoop| and opens them. When every step succeeds the
// queue is marked connected, the delegate is notified and the listening socket
// is torn down. If any step fails the method returns early without doing the
// remaining steps.
- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child {
  // Ignore callbacks from any socket other than the one being listened on.
  if (socket != _socket) {
    // TODO: error
    return;
  }

  _child = child;

  // Create the streams on the socket.
  CFStreamCreatePairWithSocket(kCFAllocatorDefault, _child, &_readStream, &_writeStream);

  // Both streams report their events back to this object.
  CFStreamClientContext context = { 0 };
  context.info = self;
  CFRunLoopRef runLoop = NULL;

  // Read stream: register the client, schedule for asynchronous communication
  // with the engine, then open.
  const CFOptionFlags readFlags = kCFStreamEventOpenCompleted |
                                  kCFStreamEventHasBytesAvailable |
                                  kCFStreamEventErrorOccurred |
                                  kCFStreamEventEndEncountered;
  if (!CFReadStreamSetClient(_readStream, readFlags, &MessageQueueReadEvent, &context)) {
    return;
  }
  runLoop = [_runLoop getCFRunLoop];
  CFReadStreamScheduleWithRunLoop(_readStream, runLoop, kCFRunLoopCommonModes);

  if (!CFReadStreamOpen(_readStream)) {
    //ReportError(CFReadStreamCopyError(readStream_));
    return;
  }

  // Write stream: register the client, schedule to receive error information,
  // then open.
  const CFOptionFlags writeFlags = kCFStreamEventOpenCompleted |
                                   kCFStreamEventCanAcceptBytes |
                                   kCFStreamEventErrorOccurred |
                                   kCFStreamEventEndEncountered;
  if (!CFWriteStreamSetClient(_writeStream, writeFlags, &MessageQueueWriteEvent, &context)) {
    return;
  }
  runLoop = [_runLoop getCFRunLoop];
  CFWriteStreamScheduleWithRunLoop(_writeStream, runLoop, kCFRunLoopCommonModes);

  if (!CFWriteStreamOpen(_writeStream)) {
    // ReportError(CFWriteStreamCopyError(_writeStream));
    return;
  }

  _connected = YES;
  [_delegate messageQueueDidConnect:self];

  // A client is attached, so the listening socket is shut down and released.
  CFSocketInvalidate(_socket);
  CFRelease(_socket);
  _socket = NULL;
}
389
// Dispatches an event reported for the read stream: available bytes are read
// and parsed, while an error or the end of the stream stops the run loop.
// Every other event is ignored.
- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event
{
  assert(stream == _readStream);

  if (event == kCFStreamEventHasBytesAvailable) {
    [self readMessageFromStream];
  } else if (event == kCFStreamEventErrorOccurred) {
    //ReportError(CFReadStreamCopyError(stream));
    [self stopRunLoop];
  } else if (event == kCFStreamEventEndEncountered) {
    [self stopRunLoop];
  } else {
    // TODO: error
  }
}
413
// Dispatches an event reported for the write stream: when the stream can
// accept bytes the next queued message is sent, while an error or the end of
// the stream stops the run loop. Every other event is ignored.
- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event
{
  assert(stream == _writeStream);

  if (event == kCFStreamEventCanAcceptBytes) {
    [self dequeueAndSend];
  } else if (event == kCFStreamEventErrorOccurred) {
    //ReportError(CFWriteStreamCopyError(stream));
    [self stopRunLoop];
  } else if (event == kCFStreamEventEndEncountered) {
    [self stopRunLoop];
  } else {
    // TODO: error
  }
}
436
437 @end