Convert MessageQueue to use a ThreadSafeDelegate.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <netinet/in.h>
20 #include <sys/socket.h>
21
22 @interface MessageQueue (Private)
23 // Thread main function that is started from -connect.
24 - (void)runMessageQueue;
25
26 // All the following methods must be called from the -runMessageQueue thread.
27
28 // Creates a listening socket and schedules it in the run loop.
29 - (void)listenForClient;
30
31 // Closes down the listening socket, the child socket, and the streams.
32 - (void)disconnectClient;
33
34 // This first calls -disconnectClient and then stops the run loop and terminates
35 // the -runMessageQueue thread.
36 - (void)stopRunLoop;
37
38 // Adds a |message| to |_queue|.
39 - (void)enqueueMessage:(NSString*)message;
40
41 // If the write stream is ready and there is data to send, sends the next message.
42 - (void)dequeueAndSend;
43
44 // Writes the string into the write stream.
45 - (void)performSend:(NSString*)message;
46
47 // Reads bytes out of the read stream. This may be called multiple times if the
48 // message cannot be read in one pass.
49 - (void)readMessageFromStream;
50
51 // Forwarding methods from the CoreFoundation callbacks.
52 - (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child;
53 - (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event;
54 - (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event;
55 @end
56
57 // CoreFoundation Callbacks ////////////////////////////////////////////////////
58
59 static void MessageQueueSocketAccept(CFSocketRef socket,
60 CFSocketCallBackType callbackType,
61 CFDataRef address,
62 const void* data,
63 void* self)
64 {
65 CFSocketNativeHandle child = *(CFSocketNativeHandle*)data;
66 [(MessageQueue*)self listenSocket:socket acceptedSocket:child];
67 }
68
69 static void MessageQueueReadEvent(CFReadStreamRef stream,
70 CFStreamEventType eventType,
71 void* self)
72 {
73 [(MessageQueue*)self readStream:stream handleEvent:eventType];
74 }
75
76 static void MessageQueueWriteEvent(CFWriteStreamRef stream,
77 CFStreamEventType eventType,
78 void* self)
79 {
80 [(MessageQueue*)self writeStream:stream handleEvent:eventType];
81 }
82
83 ////////////////////////////////////////////////////////////////////////////////
84
85 @implementation MessageQueue
86
87 - (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
88 if ((self = [super init])) {
89 _port = port;
90 _queue = [[NSMutableArray alloc] init];
91 _delegate = (ThreadSafeDeleage<MessageQueueDelegate>*)
92 [[ThreadSafeDeleage alloc] initWithObject:delegate
93 protocol:@protocol(MessageQueueDelegate)
94 thread:[NSThread currentThread]
95 modes:@[ NSDefaultRunLoopMode ]];
96 }
97 return self;
98 }
99
100 - (void)dealloc {
101 [_queue release];
102 [_delegate release];
103 [super dealloc];
104 }
105
106 - (BOOL)isConnected {
107 return _connected;
108 }
109
110 - (void)connect {
111 if (_thread)
112 return;
113
114 [NSThread detachNewThreadSelector:@selector(runMessageQueue)
115 toTarget:self
116 withObject:nil];
117 }
118
119 - (void)disconnect {
120 [self performSelector:@selector(stopRunLoop)
121 onThread:_thread
122 withObject:nil
123 waitUntilDone:NO];
124 }
125
126 - (void)sendMessage:(NSString*)message {
127 [self performSelector:@selector(enqueueMessage:)
128 onThread:_thread
129 withObject:message
130 waitUntilDone:NO];
131 }
132
133 // Private /////////////////////////////////////////////////////////////////////
134
135 - (void)runMessageQueue {
136 @autoreleasepool {
137 _thread = [NSThread currentThread];
138 _runLoop = [NSRunLoop currentRunLoop];
139
140 _connected = NO;
141 [self scheduleListenSocket];
142
143 // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic
144 // stop routine.
145 CFRunLoopRun();
146
147 _thread = nil;
148 _runLoop = nil;
149 }
150 }
151
152 - (void)scheduleListenSocket {
153 // Create the address structure.
154 struct sockaddr_in address;
155 memset(&address, 0, sizeof(address));
156 address.sin_len = sizeof(address);
157 address.sin_family = AF_INET;
158 address.sin_port = htons(_port);
159 address.sin_addr.s_addr = htonl(INADDR_ANY);
160
161 // Create the socket signature.
162 CFSocketSignature signature;
163 signature.protocolFamily = PF_INET;
164 signature.socketType = SOCK_STREAM;
165 signature.protocol = IPPROTO_TCP;
166 signature.address = (CFDataRef)[NSData dataWithBytes:&address length:sizeof(address)];
167
168 CFSocketContext context = { 0 };
169 context.info = self;
170
171 do {
172 _socket =
173 CFSocketCreateWithSocketSignature(kCFAllocatorDefault,
174 &signature, // Socket signature.
175 kCFSocketAcceptCallBack, // Callback types.
176 &MessageQueueSocketAccept, // Callback function.
177 &context); // Context to pass to callout.
178 if (!_socket) {
179 //[connection_ errorEncountered:@"Could not open socket."];
180 sleep(1);
181 }
182 } while (!_socket);
183
184 // Allow old, yet-to-be recycled sockets to be reused.
185 int yes = 1;
186 setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
187 setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
188
189 // Schedule the socket on the run loop.
190 CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
191 CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes);
192 CFRelease(source);
193 }
194
195 - (void)disconnectClient {
196 if (_readStream) {
197 CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
198 CFReadStreamClose(_readStream);
199 CFRelease(_readStream);
200 _readStream = NULL;
201 }
202
203 if (_writeStream) {
204 CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
205 CFWriteStreamClose(_writeStream);
206 CFRelease(_writeStream);
207 _writeStream = NULL;
208 }
209
210 if (_child) {
211 close(_child);
212 _child = NULL;
213 }
214
215 _connected = NO;
216 [_delegate clientDidDisconnect:self];
217 }
218
219 - (void)stopRunLoop {
220 [self disconnectClient];
221 CFRunLoopStop([_runLoop getCFRunLoop]);
222 }
223
224 - (void)enqueueMessage:(NSString*)message {
225 [_queue addObject:message];
226 [self dequeueAndSend];
227 }
228
229 - (void)dequeueAndSend {
230 if (![_queue count])
231 return;
232
233 if (![(id<MessageQueueDelegate>)_delegate.object shouldSendMessage])
234 return;
235
236 if (!CFWriteStreamCanAcceptBytes(_writeStream))
237 return;
238
239 NSString* message = [_queue objectAtIndex:0];
240 [self performSend:message];
241 [_queue removeObjectAtIndex:0];
242 }
243
244 - (void)performSend:(NSString*)message {
245 // TODO: May need to negotiate with the server as to the string encoding.
246 const NSStringEncoding kEncoding = NSUTF8StringEncoding;
247 // Add space for the NUL byte.
248 NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;
249
250 UInt8* buffer = malloc(maxBufferSize);
251 bzero(buffer, maxBufferSize);
252
253 NSUInteger bufferSize = 0;
254 if (![message getBytes:buffer
255 maxLength:maxBufferSize
256 usedLength:&bufferSize
257 encoding:kEncoding
258 options:0
259 range:NSMakeRange(0, [message length])
260 remainingRange:NULL]) {
261 free(buffer);
262 return;
263 }
264
265 // Include a NUL byte.
266 ++bufferSize;
267
268 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
269 // method is only ever called in response to a stream ready event.
270 NSUInteger totalWritten = 0;
271 while (totalWritten < bufferSize) {
272 CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten);
273 if (bytesWritten < 0) {
274 CFErrorRef error = CFWriteStreamCopyError(_writeStream);
275 //ReportError(error);
276 break;
277 }
278 totalWritten += bytesWritten;
279 }
280
281 [_delegate didSendMessage:message];
282
283 free(buffer);
284 }
285
286 - (void)readMessageFromStream {
287 const NSUInteger kBufferSize = 1024;
288 UInt8 buffer[kBufferSize];
289 CFIndex bufferOffset = 0; // Starting point in |buffer| to work with.
290 CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize);
291 const char* charBuffer = (const char*)buffer;
292
293 // The read loop works by going through the buffer until all the bytes have
294 // been processed.
295 while (bufferOffset < bytesRead) {
296 // Find the NUL separator, or the end of the string.
297 NSUInteger partLength = 0;
298 for (CFIndex i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ;
299
300 // If there is not a current packet, set some state.
301 if (!_message) {
302 // Read the message header: the size. This will be |partLength| bytes.
303 _totalMessageSize = atoi(charBuffer + bufferOffset);
304 _messageSize = 0;
305 _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
306 bufferOffset += partLength + 1; // Pass over the NUL byte.
307 continue; // Spin the loop to begin reading actual data.
308 }
309
310 // Substring the byte stream and append it to the packet string.
311 CFStringRef bufferString = CFStringCreateWithBytes(kCFAllocatorDefault,
312 buffer + bufferOffset, // Byte pointer, offset by start index.
313 partLength, // Length.
314 kCFStringEncodingUTF8,
315 true);
316 [_message appendString:(NSString*)bufferString];
317 CFRelease(bufferString);
318
319 // Advance counters.
320 _messageSize += partLength;
321 bufferOffset += partLength + 1;
322
323 // If this read finished the packet, handle it and reset.
324 if (_messageSize >= _totalMessageSize) {
325 [_delegate didReceiveMessage:[_message autorelease]];
326 _message = nil;
327
328 // Process any outgoing messages.
329 [self dequeueAndSend];
330 }
331 }
332 }
333
334 - (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child {
335 if (socket != _socket) {
336 // TODO: error
337 return;
338 }
339
340 _child = child;
341
342 // Create the streams on the socket.
343 CFStreamCreatePairWithSocket(kCFAllocatorDefault,
344 _child, // Socket handle.
345 &_readStream, // Read stream in-pointer.
346 &_writeStream); // Write stream in-pointer.
347
348 // Create struct to register callbacks for the stream.
349 CFStreamClientContext context = { 0 };
350 context.info = self;
351
352 // Set the client of the read stream.
353 CFOptionFlags readFlags = kCFStreamEventOpenCompleted |
354 kCFStreamEventHasBytesAvailable |
355 kCFStreamEventErrorOccurred |
356 kCFStreamEventEndEncountered;
357 if (CFReadStreamSetClient(_readStream, readFlags, &MessageQueueReadEvent, &context))
358 // Schedule in run loop to do asynchronous communication with the engine.
359 CFReadStreamScheduleWithRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
360 else
361 return;
362
363 // Open the stream now that it's scheduled on the run loop.
364 if (!CFReadStreamOpen(_readStream)) {
365 //ReportError(CFReadStreamCopyError(readStream_));
366 return;
367 }
368
369 // Set the client of the write stream.
370 CFOptionFlags writeFlags = kCFStreamEventOpenCompleted |
371 kCFStreamEventCanAcceptBytes |
372 kCFStreamEventErrorOccurred |
373 kCFStreamEventEndEncountered;
374 if (CFWriteStreamSetClient(_writeStream, writeFlags, &MessageQueueWriteEvent, &context))
375 // Schedule it in the run loop to receive error information.
376 CFWriteStreamScheduleWithRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
377 else
378 return;
379
380 // Open the write stream.
381 if (!CFWriteStreamOpen(_writeStream)) {
382 // ReportError(CFWriteStreamCopyError(_writeStream));
383 return;
384 }
385
386 _connected = YES;
387 [_delegate clientDidConnect:self];
388
389 CFSocketInvalidate(_socket);
390 CFRelease(_socket);
391 _socket = NULL;
392 }
393
394 - (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event
395 {
396 assert(stream == _readStream);
397 switch (event)
398 {
399 case kCFStreamEventHasBytesAvailable:
400 [self readMessageFromStream];
401 break;
402
403 case kCFStreamEventErrorOccurred:
404 //ReportError(CFReadStreamCopyError(stream));
405 [self stopRunLoop];
406 break;
407
408 case kCFStreamEventEndEncountered:
409 [self stopRunLoop];
410 break;
411
412 default:
413 // TODO: error
414 break;
415 };
416 }
417
418 - (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event
419 {
420 assert(stream == _writeStream);
421 switch (event) {
422 case kCFStreamEventCanAcceptBytes:
423 [self dequeueAndSend];
424 break;
425
426 case kCFStreamEventErrorOccurred:
427 //ReportError(CFWriteStreamCopyError(stream));
428 [self stopRunLoop];
429 break;
430
431 case kCFStreamEventEndEncountered:
432 [self stopRunLoop];
433 break;
434
435 default:
436 // TODO: error
437 break;
438 }
439 }
440
441 @end