Invalidate and release the listening socket in -[MessageQueue disconnectClient].
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <netinet/in.h>
20 #include <sys/socket.h>
21
// Private methods of MessageQueue. The declarations mirror the methods
// implemented in the @implementation below.
@interface MessageQueue (Private)
// Thread main function that is started from -connect.
- (void)runMessageQueue;

// All the following methods must be called from the -runMessageQueue thread.

// Creates a listening socket and schedules it in the run loop.
- (void)scheduleListenSocket;

// Closes down the listening socket, the child socket, and the streams.
- (void)disconnectClient;

// This first calls -disconnectClient and then stops the run loop and terminates
// the -runMessageQueue thread.
- (void)stopRunLoop;

// Adds a |message| to |_queue|.
- (void)enqueueMessage:(NSString*)message;

// If the write stream is ready and there is data to send, sends the next message.
- (void)dequeueAndSend;

// Writes the string into the write stream.
- (void)performSend:(NSString*)message;

// Reads bytes out of the read stream. This may be called multiple times if the
// message cannot be read in one pass.
- (void)readMessageFromStream;

// Passes a CFErrorRef to the delegate as an NSError and releases it.
- (void)reportError:(CFErrorRef)error;

// Forwarding methods from the CoreFoundation callbacks.
- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child;
- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event;
- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event;
@end
59
60 // CoreFoundation Callbacks ////////////////////////////////////////////////////
61
// CFSocket callback registered for kCFSocketAcceptCallBack. |data| points at
// the native handle of the accepted connection and |self| is the MessageQueue
// that was stored in the socket context's |info| field. The event is forwarded
// to -listenSocket:acceptedSocket:.
static void MessageQueueSocketAccept(CFSocketRef socket,
                                     CFSocketCallBackType callbackType,
                                     CFDataRef address,
                                     const void* data,
                                     void* self)
{
  MessageQueue* queue = (MessageQueue*)self;
  const CFSocketNativeHandle* acceptedHandle = (const CFSocketNativeHandle*)data;
  [queue listenSocket:socket acceptedSocket:*acceptedHandle];
}
71
// CFReadStream client callback. |self| is the MessageQueue stored in the
// stream client context's |info| field; the event is forwarded to
// -readStream:handleEvent:.
static void MessageQueueReadEvent(CFReadStreamRef stream,
                                  CFStreamEventType eventType,
                                  void* self)
{
  MessageQueue* queue = (MessageQueue*)self;
  [queue readStream:stream handleEvent:eventType];
}
78
// CFWriteStream client callback. |self| is the MessageQueue stored in the
// stream client context's |info| field; the event is forwarded to
// -writeStream:handleEvent:.
static void MessageQueueWriteEvent(CFWriteStreamRef stream,
                                   CFStreamEventType eventType,
                                   void* self)
{
  MessageQueue* queue = (MessageQueue*)self;
  [queue writeStream:stream handleEvent:eventType];
}
85
86 ////////////////////////////////////////////////////////////////////////////////
87
88 @implementation MessageQueue
89
// Initializes a queue that will listen on |port|. |delegate| is wrapped in a
// BSProtocolThreadInvoker created with the calling thread, and all delegate
// messages are sent through that wrapper (presumably so they are delivered on
// the creating thread -- confirm against BSProtocolThreadInvoker).
- (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
  self = [super init];
  if (!self)
    return nil;

  _port = port;
  _queue = [[NSMutableArray alloc] init];

  BSProtocolThreadInvoker* invoker =
      [[BSProtocolThreadInvoker alloc] initWithObject:delegate
                                             protocol:@protocol(MessageQueueDelegate)
                                               thread:[NSThread currentThread]];
  _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)invoker;

  return self;
}
101
// Releases the objects owned by the queue.
- (void)dealloc {
  // |_message| is allocated in -readMessageFromStream and is only handed off
  // once a message is complete, so a partially received message may still be
  // owned here.
  [_message release];
  [_queue release];
  [_delegate release];
  [super dealloc];
}
107
// Returns the value of |_connected|, which is set to YES once both streams
// have been opened for an accepted client and reset to NO in
// -disconnectClient. The flag is read directly, without synchronization.
- (BOOL)isConnected {
  return _connected;
}
111
// Starts the worker thread that runs -runMessageQueue. Does nothing when
// |_thread| is already set, i.e. a worker thread is currently running.
- (void)connect {
  if (!_thread) {
    [NSThread detachNewThreadSelector:@selector(runMessageQueue)
                             toTarget:self
                           withObject:nil];
  }
}
120
// Asks the worker thread to tear down the connection and exit by scheduling
// -stopRunLoop on it. Returns immediately; the teardown happens asynchronously.
- (void)disconnect {
  // |_thread| is nil before -connect has started the worker and after the
  // worker has exited. There is nothing to stop in that case, and
  // -performSelector:onThread: is not documented to accept a nil thread.
  NSThread* thread = _thread;
  if (!thread)
    return;

  [self performSelector:@selector(stopRunLoop)
               onThread:thread
             withObject:nil
          waitUntilDone:NO];
}
127
// Queues |message| for sending by scheduling -enqueueMessage: on the worker
// thread. Returns immediately.
- (void)sendMessage:(NSString*)message {
  // Without a running worker thread there is no queue thread to hand the
  // message to, and -performSelector:onThread: is not documented to accept a
  // nil thread.
  NSThread* thread = _thread;
  if (!thread)
    return;

  [self performSelector:@selector(enqueueMessage:)
               onThread:thread
             withObject:message
          waitUntilDone:NO];
}
134
135 // Private /////////////////////////////////////////////////////////////////////
136
// Thread main function. Records the current thread and run loop, sets up the
// listening socket, and then runs the run loop until -stopRunLoop stops it.
- (void)runMessageQueue {
  @autoreleasepool {
    _thread = [NSThread currentThread];
    _runLoop = [NSRunLoop currentRunLoop];

    _connected = NO;
    _shouldQuit = NO;
    [self scheduleListenSocket];

    // -scheduleListenSocket pumps the run loop while it retries, so
    // -stopRunLoop may already have run by the time it returns. In that case
    // the stop request has been consumed by the inner run and starting the
    // run loop again could leave this thread running indefinitely.
    if (!_shouldQuit) {
      // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic
      // stop routine.
      CFRunLoopRun();
    }

    _thread = nil;
    _runLoop = nil;
  }
}
154
// Creates a TCP socket listening on |_port| on all IPv4 interfaces, stores it
// in |_socket| and schedules it on |_runLoop|. Accepted connections are
// delivered to MessageQueueSocketAccept. If the socket cannot be set up, the
// run loop is pumped for up to a second and the attempt is repeated; the
// method gives up and returns without a socket once |_shouldQuit| is set.
- (void)scheduleListenSocket {
  // Create the address structure.
  struct sockaddr_in address;
  memset(&address, 0, sizeof(address));
  address.sin_len = sizeof(address);
  address.sin_family = AF_INET;
  address.sin_port = htons(_port);
  address.sin_addr.s_addr = htonl(INADDR_ANY);
  CFDataRef addressData = (CFDataRef)[NSData dataWithBytes:&address length:sizeof(address)];

  CFSocketContext context = { 0 };
  context.info = self;

  do {
    // The socket is created unbound so that the reuse options can be applied
    // before CFSocketSetAddress() binds it; setting them on an already-bound
    // socket does not help that bind succeed.
    CFSocketRef listener = CFSocketCreate(kCFAllocatorDefault,
                                          PF_INET,
                                          SOCK_STREAM,
                                          IPPROTO_TCP,
                                          kCFSocketAcceptCallBack,    // Callback types.
                                          &MessageQueueSocketAccept,  // Callback function.
                                          &context);                  // Context to pass to callout.
    if (listener) {
      // Allow old, yet-to-be recycled sockets to be reused.
      int yes = 1;
      setsockopt(CFSocketGetNative(listener), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
      setsockopt(CFSocketGetNative(listener), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));

      // Bind to the address and start listening.
      if (CFSocketSetAddress(listener, addressData) == kCFSocketSuccess) {
        _socket = listener;
      } else {
        CFSocketInvalidate(listener);
        CFRelease(listener);
        _socket = NULL;
      }
    } else {
      _socket = NULL;
    }

    if (!_socket) {
      // Pump the run loop while waiting for the socket to be reused. If told
      // to quit while waiting, then break out of the loop.
      CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1, FALSE);
      if (_shouldQuit)
        return;
      NSLog(@"Could not open socket");
      //[connection_ errorEncountered:@"Could not open socket."];
    }
  } while (!_socket);

  // Schedule the socket on the run loop.
  CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
  CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes);
  CFRelease(source);
}
201
// Tears down everything belonging to the current connection: the listening
// socket (if it is still open), both streams, the accepted child socket and
// any partially received message. Afterwards marks the queue as disconnected
// and notifies the delegate.
- (void)disconnectClient {
  if (_socket) {
    CFSocketInvalidate(_socket);
    CFRelease(_socket);
    _socket = NULL;
  }

  if (_readStream) {
    CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
    CFReadStreamClose(_readStream);
    CFRelease(_readStream);
    _readStream = NULL;
  }

  if (_writeStream) {
    CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
    CFWriteStreamClose(_writeStream);
    CFRelease(_writeStream);
    _writeStream = NULL;
  }

  if (_child) {
    close(_child);
    // |_child| is an integer socket handle, not a pointer; zero is the
    // "no child" value tested above.
    _child = 0;
  }

  // Drop any half-received message so that the next connection starts by
  // reading a fresh message header instead of appending to stale data.
  [_message release];
  _message = nil;

  _connected = NO;
  [_delegate messageQueueDidDisconnect:self];
}
231
// Flags the queue as quitting, tears down the connection via
// -disconnectClient and then stops the run loop of |_runLoop| so that
// -runMessageQueue can return.
- (void)stopRunLoop {
  _shouldQuit = YES;
  [self disconnectClient];

  CFRunLoopRef runLoop = [_runLoop getCFRunLoop];
  CFRunLoopStop(runLoop);
}
237
// Appends |message| to the end of |_queue| and immediately tries to send the
// message at the front of the queue.
- (void)enqueueMessage:(NSString*)message {
  NSMutableArray* pending = _queue;
  [pending addObject:message];

  [self dequeueAndSend];
}
242
// Sends the message at the front of |_queue| and removes it, provided there
// is a queued message and the write stream exists and can accept bytes.
// Otherwise does nothing; queued messages stay in |_queue|.
- (void)dequeueAndSend {
  if ([_queue count] == 0)
    return;

  // |_writeStream| is NULL until a client has been accepted and again after
  // -disconnectClient. CoreFoundation stream functions must not be passed
  // NULL, so check it before asking whether the stream is writable.
  if (!_writeStream || !CFWriteStreamCanAcceptBytes(_writeStream))
    return;

  NSString* message = [_queue objectAtIndex:0];
  [self performSend:message];
  [_queue removeObjectAtIndex:0];
}
254
// Encodes |message| as UTF-8, appends a terminating NUL byte and writes the
// result to |_writeStream|. The delegate receives -messageQueue:didSendMessage:
// only when every byte was written; a write error is passed to -reportError:
// instead. Nothing is written if the message cannot be encoded or the buffer
// cannot be allocated.
- (void)performSend:(NSString*)message {
  // TODO: May need to negotiate with the server as to the string encoding.
  const NSStringEncoding kEncoding = NSUTF8StringEncoding;
  // Add space for the NUL byte.
  NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;

  // Zero-filled so that the byte following the encoded text is already NUL.
  UInt8* buffer = calloc(maxBufferSize, sizeof(UInt8));
  if (!buffer)
    return;

  NSUInteger bufferSize = 0;
  BOOL encoded = [message getBytes:buffer
                         maxLength:maxBufferSize
                        usedLength:&bufferSize
                          encoding:kEncoding
                           options:0
                             range:NSMakeRange(0, [message length])
                    remainingRange:NULL];
  if (!encoded) {
    free(buffer);
    return;
  }

  // Include a NUL byte.
  ++bufferSize;

  // Write the packet out, repeating until all bytes have been accepted. This
  // method is only ever called in response to a stream ready event.
  BOOL sentAll = YES;
  NSUInteger totalWritten = 0;
  while (totalWritten < bufferSize) {
    CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten);
    if (bytesWritten < 0) {
      [self reportError:CFWriteStreamCopyError(_writeStream)];
      sentAll = NO;
      break;
    }
    if (bytesWritten == 0) {
      // The stream accepted nothing; stop rather than loop forever.
      sentAll = NO;
      break;
    }
    totalWritten += bytesWritten;
  }

  free(buffer);

  // Only report the message as sent when it actually went out in full.
  if (sentAll)
    [_delegate messageQueue:self didSendMessage:message];
}
295
// Reads up to 1024 bytes from |_readStream| and processes them as a sequence
// of NUL-separated parts. When no message is in progress, a part is parsed as
// the decimal size of the upcoming message; otherwise it is appended to
// |_message| as UTF-8 text. Once at least |_totalMessageSize| bytes have been
// collected, the message is handed to the delegate and the outgoing queue is
// serviced. State is kept in ivars so that a message may span several calls.
// NOTE(review): a size header that is itself split across two reads does not
// appear to be handled -- confirm whether that can occur in practice.
- (void)readMessageFromStream {
  enum { kBufferSize = 1024 };
  // One spare byte so that the received data can always be NUL-terminated,
  // which keeps atoi() below from reading past the bytes that were received.
  UInt8 buffer[kBufferSize + 1];
  CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize);
  if (bytesRead <= 0)
    return;  // Nothing was read (end of stream or error); nothing to process.
  buffer[bytesRead] = '\0';

  const char* charBuffer = (const char*)buffer;
  CFIndex bufferOffset = 0;  // Starting point in |buffer| to work with.

  // The read loop works by going through the buffer until all the bytes have
  // been processed.
  while (bufferOffset < bytesRead) {
    // Find the NUL separator, or the end of the string.
    NSUInteger partLength = 0;
    for (CFIndex i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i)
      ++partLength;

    // If there is not a current packet, set some state.
    if (!_message) {
      // Read the message header: the size. This will be |partLength| bytes.
      // A negative value is treated as zero so it is never used as a capacity.
      int announcedSize = atoi(charBuffer + bufferOffset);
      if (announcedSize < 0)
        announcedSize = 0;
      _totalMessageSize = announcedSize;
      _messageSize = 0;
      _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
      bufferOffset += partLength + 1;  // Pass over the NUL byte.
      continue;  // Spin the loop to begin reading actual data.
    }

    // Substring the byte stream and append it to the packet string.
    CFStringRef bufferString = CFStringCreateWithBytes(kCFAllocatorDefault,
                                                       buffer + bufferOffset,  // Byte pointer, offset by start index.
                                                       partLength,             // Length.
                                                       kCFStringEncodingUTF8,
                                                       true);
    // Creation returns NULL when the bytes cannot be decoded; appending or
    // releasing NULL would crash, so skip the part in that case.
    if (bufferString) {
      [_message appendString:(NSString*)bufferString];
      CFRelease(bufferString);
    }

    // Advance counters.
    _messageSize += partLength;
    bufferOffset += partLength + 1;

    // If this read finished the packet, handle it and reset.
    if (_messageSize >= _totalMessageSize) {
      [_delegate messageQueue:self didReceiveMessage:[_message autorelease]];
      _message = nil;

      // Process any outgoing messages.
      [self dequeueAndSend];
    }
  }
}
343
// Handles a connection accepted on the listening socket. Stores the native
// handle in |_child|, creates a read/write stream pair on it, registers this
// object as the client of both streams, schedules them on |_runLoop| and opens
// them. On success the queue is marked connected, the delegate is notified and
// the listening socket is invalidated and released. If registering or opening
// a stream fails the method returns early, reporting stream-open errors
// through -reportError:.
- (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child {
  if (socket != _socket) {
    // TODO: error
    return;
  }

  _child = child;

  // Create the streams on the socket.
  CFStreamCreatePairWithSocket(kCFAllocatorDefault, _child, &_readStream, &_writeStream);

  // Both streams call back into this object on the queue's run loop.
  CFStreamClientContext clientContext = { 0 };
  clientContext.info = self;
  CFRunLoopRef runLoop = [_runLoop getCFRunLoop];

  // Read stream: register the client, schedule for asynchronous communication
  // with the engine, then open.
  const CFOptionFlags readEvents = kCFStreamEventOpenCompleted |
                                   kCFStreamEventHasBytesAvailable |
                                   kCFStreamEventErrorOccurred |
                                   kCFStreamEventEndEncountered;
  if (!CFReadStreamSetClient(_readStream, readEvents, &MessageQueueReadEvent, &clientContext))
    return;
  CFReadStreamScheduleWithRunLoop(_readStream, runLoop, kCFRunLoopCommonModes);

  if (!CFReadStreamOpen(_readStream)) {
    [self reportError:CFReadStreamCopyError(_readStream)];
    return;
  }

  // Write stream: register the client, schedule to receive error information,
  // then open.
  const CFOptionFlags writeEvents = kCFStreamEventOpenCompleted |
                                    kCFStreamEventCanAcceptBytes |
                                    kCFStreamEventErrorOccurred |
                                    kCFStreamEventEndEncountered;
  if (!CFWriteStreamSetClient(_writeStream, writeEvents, &MessageQueueWriteEvent, &clientContext))
    return;
  CFWriteStreamScheduleWithRunLoop(_writeStream, runLoop, kCFRunLoopCommonModes);

  if (!CFWriteStreamOpen(_writeStream)) {
    [self reportError:CFWriteStreamCopyError(_writeStream)];
    return;
  }

  _connected = YES;
  [_delegate messageQueueDidConnect:self];

  // The listening socket is no longer needed once a client is connected.
  CFSocketInvalidate(_socket);
  CFRelease(_socket);
  _socket = NULL;
}
403
// Passes |error| to the delegate as an NSError and then releases it. Callers
// hand in the result of a CF*StreamCopyError() call, so ownership of |error|
// is taken here. A NULL |error| is ignored.
- (void)reportError:(CFErrorRef)error
{
  // The copy-error functions return NULL when the stream has no error, and
  // CFRelease() must not be called with NULL.
  if (!error)
    return;

  [_delegate messageQueue:self error:(NSError*)error];
  CFRelease(error);
}
409
// Handles an event delivered for |_readStream|: available bytes are read via
// -readMessageFromStream, an error is reported and then shuts the queue down,
// and end-of-stream shuts the queue down. Any other event is ignored.
- (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event
{
  assert(stream == _readStream);

  if (event == kCFStreamEventHasBytesAvailable) {
    [self readMessageFromStream];
  } else if (event == kCFStreamEventErrorOccurred) {
    [self reportError:CFReadStreamCopyError(stream)];
    [self stopRunLoop];
  } else if (event == kCFStreamEventEndEncountered) {
    [self stopRunLoop];
  } else {
    // TODO: error
  }
}
433
// Handles an event delivered for |_writeStream|: when the stream can accept
// bytes the next queued message is sent, an error is reported and then shuts
// the queue down, and end-of-stream shuts the queue down. Any other event is
// ignored.
- (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event
{
  assert(stream == _writeStream);

  if (event == kCFStreamEventCanAcceptBytes) {
    [self dequeueAndSend];
  } else if (event == kCFStreamEventErrorOccurred) {
    [self reportError:CFWriteStreamCopyError(stream)];
    [self stopRunLoop];
  } else if (event == kCFStreamEventEndEncountered) {
    [self stopRunLoop];
  } else {
    // TODO: error
  }
}
456
457 @end