Rewrite MessageQueue to use libdispatch instead of CFStream and a dedicated thread.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <dispatch/dispatch.h>
20 #include <netinet/in.h>
21 #include <sys/socket.h>
22
23 @implementation MessageQueue {
24 // The port number on which to open a listening socket.
25 NSUInteger _port;
26
27 // The dispatch queue for this instance.
28 dispatch_queue_t _dispatchQueue;
29
30 // Whether or not the message queue is connected to a client.
31 BOOL _connected;
32
33 // A queue of messages that are waiting to be sent.
34 NSMutableArray* _messageQueue;
35
36 // The delegate for this class.
37 BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;
38
39 BOOL _listening;
40
41 int _socket;
42
43 // The two dispatch sources for the |_child|.
44 dispatch_source_t _readSource;
45 dispatch_source_t _writeSource;
46
47 // When a message is being read, this temporary buffer is used to build up
48 // the complete message from successive reads.
49 NSMutableString* _message;
50 NSUInteger _totalMessageSize;
51 NSUInteger _messageSize;
52 }
53
54 - (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
55 if ((self = [super init])) {
56 _port = port;
57 _dispatchQueue = dispatch_queue_create(
58 [[NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String],
59 DISPATCH_QUEUE_SERIAL);
60 _messageQueue = [[NSMutableArray alloc] init];
61 _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)
62 [[BSProtocolThreadInvoker alloc] initWithObject:delegate
63 protocol:@protocol(MessageQueueDelegate)
64 thread:[NSThread currentThread]];
65 }
66 return self;
67 }
68
69 - (void)dealloc {
70 dispatch_release(_dispatchQueue);
71 [_messageQueue release];
72 [_delegate release];
73 [super dealloc];
74 }
75
76 - (BOOL)isConnected {
77 return _connected;
78 }
79
80 - (void)connect {
81 if (_connected)
82 return;
83
84 _connected = YES;
85 dispatch_async(_dispatchQueue, ^{ [self openListeningSocket]; });
86 }
87
88 - (void)disconnect {
89 dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; });
90 _connected = NO;
91 }
92
93 - (void)sendMessage:(NSString*)message {
94 dispatch_async(_dispatchQueue, ^{
95 [_messageQueue addObject:message];
96 [self dequeueAndSend];
97 });
98 }
99
100 // Private /////////////////////////////////////////////////////////////////////
101
102 - (void)openListeningSocket {
103 // Create a socket.
104 do {
105 _socket = socket(PF_INET, SOCK_STREAM, 0);
106 if (_socket < 0) {
107 NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno));
108 }
109 } while (!_socket);
110
111 // Allow old, yet-to-be recycled sockets to be reused.
112 int yes = 1;
113 setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
114 setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
115
116 // Bind to the address.
117 struct sockaddr_in address = {0};
118 address.sin_len = sizeof(address);
119 address.sin_family = AF_INET;
120 address.sin_port = htons(_port);
121 address.sin_addr.s_addr = htonl(INADDR_ANY);
122
123 int rv;
124 do {
125 rv = bind(_socket, &address, sizeof(address));
126 if (rv != 0) {
127 NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno));
128 }
129 } while (rv != 0);
130
131 // Listen for a connection.
132 rv = listen(_socket, 1);
133 if (rv < 0) {
134 NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno));
135 close(_socket);
136 _connected = NO;
137 return;
138 }
139 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue);
140 dispatch_source_set_event_handler(_readSource, ^{
141 [self acceptConnection];
142 });
143 dispatch_resume(_readSource);
144 }
145
146 // Closes down the listening socket, the child socket, and the streams.
147 - (void)disconnectClient {
148 if (_readSource) {
149 dispatch_source_cancel(_readSource);
150 dispatch_release(_readSource);
151 _readSource = NULL;
152 }
153
154 if (_writeSource) {
155 dispatch_source_cancel(_writeSource);
156 dispatch_release(_writeSource);
157 _writeSource = NULL;
158 }
159
160 if (_socket) {
161 close(_socket);
162 _socket = -1;
163 }
164
165 _listening = NO;
166 [_messageQueue removeAllObjects];
167
168 [_delegate messageQueueDidDisconnect:self];
169 }
170
171 // If the write stream is ready and there is data to send, sends the next message.
172 - (void)dequeueAndSend {
173 if (![_messageQueue count])
174 return;
175
176 NSString* message = [_messageQueue objectAtIndex:0];
177 [self performSend:message];
178 [_messageQueue removeObjectAtIndex:0];
179 }
180
181 // Writes the string into the write stream.
182 - (void)performSend:(NSString*)message {
183 // TODO: May need to negotiate with the server as to the string encoding.
184 const NSStringEncoding kEncoding = NSUTF8StringEncoding;
185 // Add space for the NUL byte.
186 NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;
187
188 UInt8* buffer = malloc(maxBufferSize);
189 bzero(buffer, maxBufferSize);
190
191 NSUInteger bufferSize = 0;
192 if (![message getBytes:buffer
193 maxLength:maxBufferSize
194 usedLength:&bufferSize
195 encoding:kEncoding
196 options:0
197 range:NSMakeRange(0, [message length])
198 remainingRange:NULL]) {
199 free(buffer);
200 return;
201 }
202
203 // Include a NUL byte.
204 ++bufferSize;
205
206 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
207 // method is only ever called in response to a stream ready event.
208 NSUInteger totalWritten = 0;
209 while (totalWritten < bufferSize) {
210 ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten);
211 if (bytesWritten < 0) {
212 NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno));
213 break;
214 }
215 totalWritten += bytesWritten;
216 }
217
218 [_delegate messageQueue:self didSendMessage:message];
219
220 free(buffer);
221 }
222
223 // Reads bytes out of the read stream. This may be called multiple times if the
224 // message cannot be read in one pass.
225 - (void)readMessageFromStream {
226 const NSUInteger kBufferSize = 1024;
227 UInt8 buffer[kBufferSize];
228 CFIndex bufferOffset = 0; // Starting point in |buffer| to work with.
229 ssize_t bytesRead = read(_socket, buffer, kBufferSize);
230 if (bytesRead == 0) {
231 [self disconnectClient];
232 return;
233 }
234 const char* charBuffer = (const char*)buffer;
235
236 // The read loop works by going through the buffer until all the bytes have
237 // been processed.
238 while (bufferOffset < bytesRead) {
239 // Find the NUL separator, or the end of the string.
240 NSUInteger partLength = 0;
241 for (CFIndex i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ;
242
243 // If there is not a current packet, set some state.
244 if (!_message) {
245 // Read the message header: the size. This will be |partLength| bytes.
246 _totalMessageSize = atoi(charBuffer + bufferOffset);
247 _messageSize = 0;
248 _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
249 bufferOffset += partLength + 1; // Pass over the NUL byte.
250 continue; // Spin the loop to begin reading actual data.
251 }
252
253 // Substring the byte stream and append it to the packet string.
254 CFStringRef bufferString = CFStringCreateWithBytes(kCFAllocatorDefault,
255 buffer + bufferOffset, // Byte pointer, offset by start index.
256 partLength, // Length.
257 kCFStringEncodingUTF8,
258 true);
259 [_message appendString:(NSString*)bufferString];
260 CFRelease(bufferString);
261
262 // Advance counters.
263 _messageSize += partLength;
264 bufferOffset += partLength + 1;
265
266 // If this read finished the packet, handle it and reset.
267 if (_messageSize >= _totalMessageSize) {
268 [_delegate messageQueue:self didReceiveMessage:[_message autorelease]];
269 _message = nil;
270
271 // Process any outgoing messages.
272 [self dequeueAndSend];
273 }
274 }
275 }
276
277 - (void)acceptConnection {
278 _listening = NO;
279
280 struct sockaddr_in address = {0};
281 socklen_t addressLength = sizeof(address);
282 int connection = accept(_socket, &address, &addressLength);
283 if (connection < 0) {
284 NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno));
285 [self disconnectClient];
286 return;
287 }
288
289 dispatch_source_cancel(_readSource);
290 close(_socket);
291
292 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue);
293 dispatch_source_set_event_handler(_readSource, ^{
294 [self readMessageFromStream];
295 });
296
297 _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue);
298 dispatch_source_set_event_handler(_writeSource, ^{
299 [self dequeueAndSend];
300 });
301
302 _socket = connection;
303
304 dispatch_resume(_readSource);
305 dispatch_resume(_writeSource);
306
307 [_delegate messageQueueDidConnect:self];
308 }
309
310 @end