Cleanup from the MessageQueue rewrite.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <dispatch/dispatch.h>
20 #include <netinet/in.h>
21 #include <stdlib.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
24 #include <unistd.h>
25
26 #import "BSProtocolThreadInvoker.h"
27
28 @implementation MessageQueue {
29 // The port number on which to open a listening socket.
30 NSUInteger _port;
31
32 // All the ivars beneath this must be accessed from this queue.
33 //////////////////////////////////////////////////////////////////////////////
34 dispatch_queue_t _dispatchQueue;
35
36 // Whether or not the message queue is connected to a client.
37 BOOL _connected;
38
39 // A queue of messages that are waiting to be sent.
40 NSMutableArray* _messageQueue;
41
42 // The delegate for this class.
43 BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;
44
45 // The socket for the queue. This will either be a listening socket, waiting
46 // to accept connections. Or it will be a connected socket with a server.
47 int _socket;
48
49 // The dispatch sources for |_socket|, run on |_dispatchQueue|. If this is
50 // for a listening socket, only |_readSource| will be non-NULL. If
51 // |_connected| is false, both will be NULL.
52 dispatch_source_t _readSource;
53 dispatch_source_t _writeSource;
54
55 // When a message is being read, this temporary buffer is used to build up
56 // the complete message from successive reads.
57 NSMutableString* _message;
58 NSUInteger _totalMessageSize;
59 NSUInteger _messageSize;
60 }
61
62 - (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
63 if ((self = [super init])) {
64 _port = port;
65 _dispatchQueue = dispatch_queue_create(
66 [[NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String],
67 DISPATCH_QUEUE_SERIAL);
68 _messageQueue = [[NSMutableArray alloc] init];
69 _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)
70 [[BSProtocolThreadInvoker alloc] initWithObject:delegate
71 protocol:@protocol(MessageQueueDelegate)
72 thread:[NSThread currentThread]];
73 _socket = -1;
74 }
75 return self;
76 }
77
78 - (void)dealloc {
79 dispatch_sync(_dispatchQueue, ^{ [self disconnectClient]; });
80 dispatch_release(_dispatchQueue);
81 [_messageQueue release];
82 [_delegate release];
83 [super dealloc];
84 }
85
86 - (BOOL)isConnected {
87 BOOL __block connected;
88 dispatch_sync(_dispatchQueue, ^{ connected = _connected; });
89 return connected;
90 }
91
92 - (void)connect {
93 dispatch_async(_dispatchQueue, ^{
94 if (_connected)
95 return;
96
97 [self openListeningSocket];
98 });
99 }
100
101 - (void)disconnect {
102 dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; });
103 }
104
105 - (void)sendMessage:(NSString*)message {
106 dispatch_async(_dispatchQueue, ^{
107 [_messageQueue addObject:message];
108 [self dequeueAndSend];
109 });
110 }
111
112 // Private /////////////////////////////////////////////////////////////////////
113
114 - (void)openListeningSocket {
115 // Create a socket.
116 _socket = socket(PF_INET, SOCK_STREAM, 0);
117 if (_socket < 0) {
118 NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno));
119 return;
120 }
121
122 // Allow old, yet-to-be recycled sockets to be reused.
123 int yes = 1;
124 setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
125 setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
126
127 // Bind to the address.
128 struct sockaddr_in address = {0};
129 address.sin_len = sizeof(address);
130 address.sin_family = AF_INET;
131 address.sin_port = htons(_port);
132 address.sin_addr.s_addr = htonl(INADDR_ANY);
133
134 int rv;
135 do {
136 rv = bind(_socket, &address, sizeof(address));
137 if (rv != 0) {
138 NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno));
139 }
140 } while (rv != 0);
141
142 // Listen for a connection.
143 rv = listen(_socket, 1);
144 if (rv < 0) {
145 NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno));
146 close(_socket);
147 _socket = -1;
148 return;
149 }
150 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue);
151 dispatch_source_set_event_handler(_readSource, ^{
152 [self acceptConnection];
153 });
154 dispatch_resume(_readSource);
155
156 _connected = YES;
157 }
158
159 // Closes down the listening socket, the child socket, and the streams.
160 - (void)disconnectClient {
161 if (_readSource) {
162 dispatch_source_cancel(_readSource);
163 dispatch_release(_readSource);
164 _readSource = NULL;
165 }
166
167 if (_writeSource) {
168 dispatch_source_cancel(_writeSource);
169 dispatch_release(_writeSource);
170 _writeSource = NULL;
171 }
172
173 if (_socket != -1) {
174 close(_socket);
175 _socket = -1;
176 }
177
178 [_messageQueue removeAllObjects];
179
180 _connected = NO;
181 [_delegate messageQueueDidDisconnect:self];
182 }
183
184 // If the write stream is ready and there is data to send, sends the next message.
185 - (void)dequeueAndSend {
186 if (![_messageQueue count])
187 return;
188
189 NSString* message = [_messageQueue objectAtIndex:0];
190 [self performSend:message];
191 [_messageQueue removeObjectAtIndex:0];
192 }
193
194 // Writes the string into the write stream.
195 - (void)performSend:(NSString*)message {
196 // TODO: May need to negotiate with the server as to the string encoding.
197 const NSStringEncoding kEncoding = NSUTF8StringEncoding;
198 // Add space for the NUL byte.
199 NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;
200
201 UInt8* buffer = calloc(maxBufferSize, sizeof(UInt8));
202 NSUInteger bufferSize = 0;
203 if (![message getBytes:buffer
204 maxLength:maxBufferSize
205 usedLength:&bufferSize
206 encoding:kEncoding
207 options:0
208 range:NSMakeRange(0, [message length])
209 remainingRange:NULL]) {
210 free(buffer);
211 return;
212 }
213
214 // Include a NUL byte.
215 ++bufferSize;
216
217 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
218 // method is only ever called in response to a stream ready event.
219 NSUInteger totalWritten = 0;
220 while (totalWritten < bufferSize) {
221 ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten);
222 if (bytesWritten < 0) {
223 NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno));
224 break;
225 }
226 totalWritten += bytesWritten;
227 }
228
229 [_delegate messageQueue:self didSendMessage:message];
230
231 free(buffer);
232 }
233
234 // Reads bytes out of the read stream. This may be called multiple times if the
235 // message cannot be read in one pass.
236 - (void)readMessageFromStream {
237 const NSUInteger kBufferSize = 1024;
238 char buffer[kBufferSize];
239 ssize_t bufferOffset = 0; // Starting point in |buffer| to work with.
240 ssize_t bytesRead = read(_socket, buffer, kBufferSize);
241 if (bytesRead == 0) {
242 [self disconnectClient];
243 return;
244 }
245 const char* charBuffer = (const char*)buffer;
246
247 // The read loop works by going through the buffer until all the bytes have
248 // been processed.
249 while (bufferOffset < bytesRead) {
250 // Find the NUL separator, or the end of the string.
251 NSUInteger partLength = 0;
252 for (ssize_t i = bufferOffset; i < bytesRead && charBuffer[i] != '\0'; ++i, ++partLength) ;
253
254 // If there is not a current packet, set some state.
255 if (!_message) {
256 // Read the message header: the size. This will be |partLength| bytes.
257 _totalMessageSize = atoi(charBuffer + bufferOffset);
258 _messageSize = 0;
259 _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
260 bufferOffset += partLength + 1; // Pass over the NUL byte.
261 continue; // Spin the loop to begin reading actual data.
262 }
263
264 // Substring the byte stream and append it to the packet string.
265 NSString* bufferString = [[NSString alloc] initWithBytesNoCopy:buffer + bufferOffset
266 length:partLength
267 encoding:NSUTF8StringEncoding
268 freeWhenDone:NO];
269 [_message appendString:[bufferString autorelease]];
270
271 // Advance counters.
272 _messageSize += partLength;
273 bufferOffset += partLength + 1;
274
275 // If this read finished the packet, handle it and reset.
276 if (_messageSize >= _totalMessageSize) {
277 [_delegate messageQueue:self didReceiveMessage:[_message autorelease]];
278 _message = nil;
279 }
280 }
281 }
282
283 - (void)acceptConnection {
284 struct sockaddr_in address = {0};
285 socklen_t addressLength = sizeof(address);
286 int connection = accept(_socket, &address, &addressLength);
287 if (connection < 0) {
288 NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno));
289 [self disconnectClient];
290 return;
291 }
292
293 dispatch_source_cancel(_readSource);
294 close(_socket);
295
296 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue);
297 dispatch_source_set_event_handler(_readSource, ^{
298 [self readMessageFromStream];
299 });
300
301 _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue);
302 dispatch_source_set_event_handler(_writeSource, ^{
303 [self dequeueAndSend];
304 });
305
306 _socket = connection;
307
308 dispatch_resume(_readSource);
309 dispatch_resume(_writeSource);
310
311 [_delegate messageQueueDidConnect:self];
312 }
313
314 @end