Fix a silly memory bug where MessageQueue._message was autoreleased.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <netinet/in.h>
20 #include <sys/socket.h>
21
22 @interface MessageQueue (Private)
23 // Thread main function that is started from -connect.
24 - (void)runMessageQueue;
25
26 // All the following methods must be called from the -runMessageQueue thread.
27
28 // Creates a listening socket and schedules it in the run loop.
29 - (void)listenForClient;
30
31 // Closes down the listening socket, the child socket, and the streams.
32 - (void)disconnectClient;
33
34 // This first calls -disconnectClient and then stops the run loop and terminates
35 // the -runMessageQueue thread.
36 - (void)stopRunLoop;
37
38 // Adds a |message| to |_queue|.
39 - (void)enqueueMessage:(NSString*)message;
40
41 // If the write stream is ready and there is data to send, sends the next message.
42 - (void)dequeueAndSend;
43
44 // Writes the string into the write stream.
45 - (void)performSend:(NSString*)message;
46
47 // Reads bytes out of the read stream. This may be called multiple times if the
48 // message cannot be read in one pass.
49 - (void)readMessageFromStream;
50
51 // Forwarding methods from the CoreFoundation callbacks.
52 - (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child;
53 - (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event;
54 - (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event;
55 @end
56
57 // CoreFoundation Callbacks ////////////////////////////////////////////////////
58
59 static void MessageQueueSocketAccept(CFSocketRef socket,
60 CFSocketCallBackType callbackType,
61 CFDataRef address,
62 const void* data,
63 void* self)
64 {
65 CFSocketNativeHandle child = *(CFSocketNativeHandle*)data;
66 [(MessageQueue*)self listenSocket:socket acceptedSocket:child];
67 }
68
69 static void MessageQueueReadEvent(CFReadStreamRef stream,
70 CFStreamEventType eventType,
71 void* self)
72 {
73 [(MessageQueue*)self readStream:stream handleEvent:eventType];
74 }
75
76 static void MessageQueueWriteEvent(CFWriteStreamRef stream,
77 CFStreamEventType eventType,
78 void* self)
79 {
80 [(MessageQueue*)self writeStream:stream handleEvent:eventType];
81 }
82
83 ////////////////////////////////////////////////////////////////////////////////
84
85 @implementation MessageQueue
86
87 - (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
88 if ((self = [super init])) {
89 _port = port;
90 _queue = [[NSMutableArray alloc] init];
91 _delegate = delegate;
92 }
93 return self;
94 }
95
96 - (void)dealloc {
97 [_queue release];
98 [super dealloc];
99 }
100
101 - (BOOL)isConnected {
102 return _connected;
103 }
104
105 - (void)connect {
106 if (_thread)
107 return;
108
109 [NSThread detachNewThreadSelector:@selector(runMessageQueue)
110 toTarget:self
111 withObject:nil];
112 }
113
114 - (void)disconnect {
115 [self performSelector:@selector(stopRunLoop)
116 onThread:_thread
117 withObject:nil
118 waitUntilDone:NO];
119 }
120
121 - (void)sendMessage:(NSString*)message {
122 [self performSelector:@selector(enqueueMessage:)
123 onThread:_thread
124 withObject:message
125 waitUntilDone:NO];
126 }
127
128 // Private /////////////////////////////////////////////////////////////////////
129
130 - (void)runMessageQueue {
131 @autoreleasepool {
132 _thread = [NSThread currentThread];
133 _runLoop = [NSRunLoop currentRunLoop];
134
135 _connected = NO;
136 [self scheduleListenSocket];
137
138 // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic
139 // stop routine.
140 CFRunLoopRun();
141
142 _thread = nil;
143 _runLoop = nil;
144 }
145 }
146
147 - (void)scheduleListenSocket {
148 // Create the address structure.
149 struct sockaddr_in address;
150 memset(&address, 0, sizeof(address));
151 address.sin_len = sizeof(address);
152 address.sin_family = AF_INET;
153 address.sin_port = htons(_port);
154 address.sin_addr.s_addr = htonl(INADDR_ANY);
155
156 // Create the socket signature.
157 CFSocketSignature signature;
158 signature.protocolFamily = PF_INET;
159 signature.socketType = SOCK_STREAM;
160 signature.protocol = IPPROTO_TCP;
161 signature.address = (CFDataRef)[NSData dataWithBytes:&address length:sizeof(address)];
162
163 CFSocketContext context = { 0 };
164 context.info = self;
165
166 do {
167 _socket =
168 CFSocketCreateWithSocketSignature(kCFAllocatorDefault,
169 &signature, // Socket signature.
170 kCFSocketAcceptCallBack, // Callback types.
171 &MessageQueueSocketAccept, // Callback function.
172 &context); // Context to pass to callout.
173 if (!_socket) {
174 //[connection_ errorEncountered:@"Could not open socket."];
175 sleep(1);
176 }
177 } while (!_socket);
178
179 // Allow old, yet-to-be recycled sockets to be reused.
180 int yes = 1;
181 setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
182 setsockopt(CFSocketGetNative(_socket), SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
183
184 // Schedule the socket on the run loop.
185 CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket, 0);
186 CFRunLoopAddSource([_runLoop getCFRunLoop], source, kCFRunLoopCommonModes);
187 CFRelease(source);
188 }
189
190 - (void)disconnectClient {
191 if (_readStream) {
192 CFReadStreamUnscheduleFromRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
193 CFReadStreamClose(_readStream);
194 CFRelease(_readStream);
195 _readStream = NULL;
196 }
197
198 if (_writeStream) {
199 CFWriteStreamUnscheduleFromRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
200 CFWriteStreamClose(_writeStream);
201 CFRelease(_writeStream);
202 _writeStream = NULL;
203 }
204
205 if (_child) {
206 close(_child);
207 _child = NULL;
208 }
209
210 _connected = NO;
211 [_delegate clientDidDisconnect:self];
212 }
213
214 - (void)stopRunLoop {
215 [self disconnectClient];
216 CFRunLoopStop([_runLoop getCFRunLoop]);
217 }
218
219 - (void)enqueueMessage:(NSString*)message {
220 [_queue addObject:message];
221 [self dequeueAndSend];
222 }
223
224 - (void)dequeueAndSend {
225 if (![_queue count])
226 return;
227
228 if (![_delegate shouldSendMessage])
229 return;
230
231 if (!CFWriteStreamCanAcceptBytes(_writeStream))
232 return;
233
234 NSString* message = [_queue objectAtIndex:0];
235 [self performSend:message];
236 [_queue removeObjectAtIndex:0];
237 }
238
239 - (void)performSend:(NSString*)message {
240 // TODO: May need to negotiate with the server as to the string encoding.
241 const NSStringEncoding kEncoding = NSUTF8StringEncoding;
242 // Add space for the NUL byte.
243 NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;
244
245 UInt8* buffer = malloc(maxBufferSize);
246 bzero(buffer, maxBufferSize);
247
248 NSUInteger bufferSize = 0;
249 if (![message getBytes:buffer
250 maxLength:maxBufferSize
251 usedLength:&bufferSize
252 encoding:kEncoding
253 options:0
254 range:NSMakeRange(0, [message length])
255 remainingRange:NULL]) {
256 free(buffer);
257 return;
258 }
259
260 // Include a NUL byte.
261 ++bufferSize;
262
263 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
264 // method is only ever called in response to a stream ready event.
265 NSUInteger totalWritten = 0;
266 while (totalWritten < bufferSize) {
267 CFIndex bytesWritten = CFWriteStreamWrite(_writeStream, buffer + totalWritten, bufferSize - totalWritten);
268 if (bytesWritten < 0) {
269 CFErrorRef error = CFWriteStreamCopyError(_writeStream);
270 //ReportError(error);
271 break;
272 }
273 totalWritten += bytesWritten;
274 }
275
276 [_delegate didSendMessage:message];
277
278 free(buffer);
279 }
280
281 - (void)readMessageFromStream {
282 const NSUInteger kBufferSize = 1024;
283 UInt8 buffer[kBufferSize];
284 CFIndex bufferOffset = 0; // Starting point in |buffer| to work with.
285 CFIndex bytesRead = CFReadStreamRead(_readStream, buffer, kBufferSize);
286 const char* charBuffer = (const char*)buffer;
287
288 // The read loop works by going through the buffer until all the bytes have
289 // been processed.
290 while (bufferOffset < bytesRead) {
291 // Find the NUL separator, or the end of the string.
292 NSUInteger partLength = 0;
293 for (CFIndex i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ;
294
295 // If there is not a current packet, set some state.
296 if (!_message) {
297 // Read the message header: the size. This will be |partLength| bytes.
298 _totalMessageSize = atoi(charBuffer + bufferOffset);
299 _messageSize = 0;
300 _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
301 bufferOffset += partLength + 1; // Pass over the NUL byte.
302 continue; // Spin the loop to begin reading actual data.
303 }
304
305 // Substring the byte stream and append it to the packet string.
306 CFStringRef bufferString = CFStringCreateWithBytes(kCFAllocatorDefault,
307 buffer + bufferOffset, // Byte pointer, offset by start index.
308 partLength, // Length.
309 kCFStringEncodingUTF8,
310 true);
311 [_message appendString:(NSString*)bufferString];
312 CFRelease(bufferString);
313
314 // Advance counters.
315 _messageSize += partLength;
316 bufferOffset += partLength + 1;
317
318 // If this read finished the packet, handle it and reset.
319 if (_messageSize >= _totalMessageSize) {
320 [_delegate didReceiveMessage:[_message autorelease]];
321 _message = nil;
322 }
323 }
324 }
325
326 - (void)listenSocket:(CFSocketRef)socket acceptedSocket:(CFSocketNativeHandle)child {
327 if (socket != _socket) {
328 // TODO: error
329 return;
330 }
331
332 _child = child;
333
334 // Create the streams on the socket.
335 CFStreamCreatePairWithSocket(kCFAllocatorDefault,
336 _child, // Socket handle.
337 &_readStream, // Read stream in-pointer.
338 &_writeStream); // Write stream in-pointer.
339
340 // Create struct to register callbacks for the stream.
341 CFStreamClientContext context = { 0 };
342 context.info = self;
343
344 // Set the client of the read stream.
345 CFOptionFlags readFlags = kCFStreamEventOpenCompleted |
346 kCFStreamEventHasBytesAvailable |
347 kCFStreamEventErrorOccurred |
348 kCFStreamEventEndEncountered;
349 if (CFReadStreamSetClient(_readStream, readFlags, &MessageQueueReadEvent, &context))
350 // Schedule in run loop to do asynchronous communication with the engine.
351 CFReadStreamScheduleWithRunLoop(_readStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
352 else
353 return;
354
355 // Open the stream now that it's scheduled on the run loop.
356 if (!CFReadStreamOpen(_readStream)) {
357 //ReportError(CFReadStreamCopyError(readStream_));
358 return;
359 }
360
361 // Set the client of the write stream.
362 CFOptionFlags writeFlags = kCFStreamEventOpenCompleted |
363 kCFStreamEventCanAcceptBytes |
364 kCFStreamEventErrorOccurred |
365 kCFStreamEventEndEncountered;
366 if (CFWriteStreamSetClient(_writeStream, writeFlags, &MessageQueueWriteEvent, &context))
367 // Schedule it in the run loop to receive error information.
368 CFWriteStreamScheduleWithRunLoop(_writeStream, [_runLoop getCFRunLoop], kCFRunLoopCommonModes);
369 else
370 return;
371
372 // Open the write stream.
373 if (!CFWriteStreamOpen(_writeStream)) {
374 // ReportError(CFWriteStreamCopyError(_writeStream));
375 return;
376 }
377
378 _connected = YES;
379 [_delegate clientDidConnect:self];
380
381 CFSocketInvalidate(_socket);
382 CFRelease(_socket);
383 _socket = NULL;
384 }
385
386 - (void)readStream:(CFReadStreamRef)stream handleEvent:(CFStreamEventType)event
387 {
388 assert(stream == _readStream);
389 switch (event)
390 {
391 case kCFStreamEventHasBytesAvailable:
392 [self readMessageFromStream];
393 break;
394
395 case kCFStreamEventErrorOccurred:
396 //ReportError(CFReadStreamCopyError(stream));
397 [self stopRunLoop];
398 break;
399
400 case kCFStreamEventEndEncountered:
401 [self stopRunLoop];
402 break;
403
404 default:
405 // TODO: error
406 break;
407 };
408 }
409
410 - (void)writeStream:(CFWriteStreamRef)stream handleEvent:(CFStreamEventType)event
411 {
412 assert(stream == _writeStream);
413 switch (event) {
414 case kCFStreamEventCanAcceptBytes:
415 [self dequeueAndSend];
416 break;
417
418 case kCFStreamEventErrorOccurred:
419 //ReportError(CFWriteStreamCopyError(stream));
420 [self stopRunLoop];
421 break;
422
423 case kCFStreamEventEndEncountered:
424 [self stopRunLoop];
425 break;
426
427 default:
428 // TODO: error
429 break;
430 }
431 }
432
433 @end