Zero-warning build.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <dispatch/dispatch.h>
20 #include <netinet/in.h>
21 #include <stdlib.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
24 #include <unistd.h>
25
26 #import "BSProtocolThreadInvoker.h"
27
28 @implementation MessageQueue {
29 // The port number on which to open a listening socket.
30 NSUInteger _port;
31
32 // All the ivars beneath this must be accessed from this queue.
33 //////////////////////////////////////////////////////////////////////////////
34 dispatch_queue_t _dispatchQueue;
35
36 // Whether or not the message queue is connected to a client.
37 BOOL _connected;
38
39 // A queue of messages that are waiting to be sent.
40 NSMutableArray* _messageQueue;
41
42 // The delegate for this class.
43 BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;
44
45 // The socket for the queue. This will either be a listening socket, waiting
46 // to accept connections. Or it will be a connected socket with a server.
47 int _socket;
48
49 // The dispatch sources for |_socket|, run on |_dispatchQueue|. If this is
50 // for a listening socket, only |_readSource| will be non-NULL. If
51 // |_connected| is false, both will be NULL.
52 dispatch_source_t _readSource;
53 dispatch_source_t _writeSource;
54
55 // Whether |_writeSource| has been suspended through |-dequeueAndSend|.
56 BOOL _writeSuspended;
57
58 // When a message is being read, this temporary buffer is used to build up
59 // the complete message from successive reads.
60 NSMutableString* _message;
61 NSUInteger _totalMessageSize;
62 NSUInteger _messageSize;
63 }
64
65 - (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
66 if ((self = [super init])) {
67 _port = port;
68 _dispatchQueue = dispatch_queue_create(
69 [[NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String],
70 DISPATCH_QUEUE_SERIAL);
71 _messageQueue = [[NSMutableArray alloc] init];
72 _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)
73 [[BSProtocolThreadInvoker alloc] initWithObject:delegate
74 protocol:@protocol(MessageQueueDelegate)
75 thread:[NSThread currentThread]];
76 _socket = -1;
77 }
78 return self;
79 }
80
81 - (void)dealloc {
82 dispatch_sync(_dispatchQueue, ^{ [self disconnectClient]; });
83 dispatch_release(_dispatchQueue);
84 [_messageQueue release];
85 [_delegate release];
86 [super dealloc];
87 }
88
89 - (BOOL)isConnected {
90 BOOL __block connected;
91 dispatch_sync(_dispatchQueue, ^{ connected = _connected; });
92 return connected;
93 }
94
95 - (void)connect {
96 dispatch_async(_dispatchQueue, ^{
97 if (_connected)
98 return;
99
100 [self openListeningSocket];
101 });
102 }
103
104 - (void)disconnect {
105 dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; });
106 }
107
108 - (void)sendMessage:(NSString*)message {
109 dispatch_async(_dispatchQueue, ^{
110 [_messageQueue addObject:message];
111 [self dequeueAndSend];
112 });
113 }
114
115 // Private /////////////////////////////////////////////////////////////////////
116
117 - (void)openListeningSocket {
118 // Create a socket.
119 _socket = socket(PF_INET, SOCK_STREAM, 0);
120 if (_socket < 0) {
121 NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno));
122 return;
123 }
124
125 // Allow old, yet-to-be recycled sockets to be reused.
126 int yes = 1;
127 setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
128 setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
129
130 // Bind to the address.
131 struct sockaddr_in address = {0};
132 address.sin_len = sizeof(address);
133 address.sin_family = AF_INET;
134 address.sin_port = htons(_port);
135 address.sin_addr.s_addr = htonl(INADDR_ANY);
136
137 int rv;
138 do {
139 rv = bind(_socket, (struct sockaddr*)&address, sizeof(address));
140 if (rv != 0) {
141 NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno));
142 }
143 } while (rv != 0);
144
145 // Listen for a connection.
146 rv = listen(_socket, 1);
147 if (rv < 0) {
148 NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno));
149 close(_socket);
150 _socket = -1;
151 return;
152 }
153 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue);
154 dispatch_source_set_event_handler(_readSource, ^{
155 [self acceptConnection];
156 });
157 dispatch_resume(_readSource);
158
159 _connected = YES;
160 }
161
162 // Closes down the listening socket, the child socket, and the streams.
163 - (void)disconnectClient {
164 if (!_connected)
165 return;
166
167 if (_readSource) {
168 dispatch_source_cancel(_readSource);
169 dispatch_release(_readSource);
170 _readSource = NULL;
171 }
172
173 if (_writeSource) {
174 if (_writeSuspended) {
175 _writeSuspended = NO;
176 dispatch_resume(_writeSource);
177 }
178 dispatch_source_cancel(_writeSource);
179 dispatch_release(_writeSource);
180 _writeSource = NULL;
181 }
182
183 if (_socket != -1) {
184 close(_socket);
185 _socket = -1;
186 }
187
188 [_messageQueue removeAllObjects];
189
190 _connected = NO;
191 [_delegate messageQueueDidDisconnect:self];
192 }
193
194 // If the write stream is ready and there is data to send, sends the next message.
195 - (void)dequeueAndSend {
196 if ([_messageQueue count] == 0) {
197 // There are no outgoing messages, so suspend the dispatch source to avoid
198 // needless callouts to this method.
199 if (_writeSource) {
200 _writeSuspended = YES;
201 dispatch_suspend(_writeSource);
202 }
203 return;
204 } else if (_writeSuspended) {
205 // A new message has arrived with the source suspended. Resume it, which
206 // will arrange for a callout back here when the socket is ready.
207 _writeSuspended = NO;
208 dispatch_resume(_writeSource);
209 return;
210 }
211
212 NSString* message = [_messageQueue objectAtIndex:0];
213 [self performSend:message];
214 [_messageQueue removeObjectAtIndex:0];
215 }
216
217 // Writes the string into the write stream.
218 - (void)performSend:(NSString*)message {
219 // TODO: May need to negotiate with the server as to the string encoding.
220 const NSStringEncoding kEncoding = NSUTF8StringEncoding;
221 // Add space for the NUL byte.
222 NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;
223
224 UInt8* buffer = calloc(maxBufferSize, sizeof(UInt8));
225 NSUInteger bufferSize = 0;
226 if (![message getBytes:buffer
227 maxLength:maxBufferSize
228 usedLength:&bufferSize
229 encoding:kEncoding
230 options:0
231 range:NSMakeRange(0, [message length])
232 remainingRange:NULL]) {
233 free(buffer);
234 return;
235 }
236
237 // Include a NUL byte.
238 ++bufferSize;
239
240 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
241 // method is only ever called in response to a stream ready event.
242 NSUInteger totalWritten = 0;
243 while (totalWritten < bufferSize) {
244 ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten);
245 if (bytesWritten < 0) {
246 NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno));
247 break;
248 }
249 totalWritten += bytesWritten;
250 }
251
252 [_delegate messageQueue:self didSendMessage:message];
253
254 free(buffer);
255 }
256
257 // Reads bytes out of the read stream. This may be called multiple times if the
258 // message cannot be read in one pass.
259 - (void)readMessageFromStream {
260 const NSUInteger kBufferSize = 1024;
261 char buffer[kBufferSize];
262 ssize_t bufferOffset = 0; // Starting point in |buffer| to work with.
263 ssize_t bytesRead = read(_socket, buffer, kBufferSize);
264 if (bytesRead == 0) {
265 [self disconnectClient];
266 return;
267 }
268 const char* charBuffer = (const char*)buffer;
269
270 // The read loop works by going through the buffer until all the bytes have
271 // been processed.
272 while (bufferOffset < bytesRead) {
273 // Find the NUL separator, or the end of the string.
274 NSUInteger partLength = 0;
275 for (ssize_t i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ;
276
277 // If there is not a current packet, set some state.
278 if (!_message) {
279 // Read the message header: the size. This will be |partLength| bytes.
280 _totalMessageSize = atoi(charBuffer + bufferOffset);
281 _messageSize = 0;
282 _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
283 bufferOffset += partLength + 1; // Pass over the NUL byte.
284 continue; // Spin the loop to begin reading actual data.
285 }
286
287 // Substring the byte stream and append it to the packet string.
288 NSString* bufferString = [[NSString alloc] initWithBytesNoCopy:buffer + bufferOffset
289 length:partLength
290 encoding:NSUTF8StringEncoding
291 freeWhenDone:NO];
292 [_message appendString:[bufferString autorelease]];
293
294 // Advance counters.
295 _messageSize += partLength;
296 bufferOffset += partLength + 1;
297
298 // If this read finished the packet, handle it and reset.
299 if (_messageSize >= _totalMessageSize) {
300 [_delegate messageQueue:self didReceiveMessage:[_message autorelease]];
301 _message = nil;
302 }
303 }
304 }
305
306 - (void)acceptConnection {
307 struct sockaddr_in address = {0};
308 socklen_t addressLength = sizeof(address);
309 int connection = accept(_socket, (struct sockaddr*)&address, &addressLength);
310 if (connection < 0) {
311 NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno));
312 [self disconnectClient];
313 return;
314 }
315
316 dispatch_source_cancel(_readSource);
317 close(_socket);
318
319 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue);
320 dispatch_source_set_event_handler(_readSource, ^{
321 [self readMessageFromStream];
322 });
323
324 _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue);
325 dispatch_source_set_event_handler(_writeSource, ^{
326 [self dequeueAndSend];
327 });
328
329 _socket = connection;
330
331 dispatch_resume(_readSource);
332 dispatch_resume(_writeSource);
333
334 [_delegate messageQueueDidConnect:self];
335 }
336
337 @end