]> src.bluestatic.org Git - macgdbp.git/blob - Source/MessageQueue.m
Add a README.md.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <dispatch/dispatch.h>
20 #include <netinet/in.h>
21 #include <stdlib.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
24 #include <unistd.h>
25
26 #import "BSProtocolThreadInvoker.h"
27
28 @implementation MessageQueue {
29 // The port number on which to open a listening socket.
30 NSUInteger _port;
31
32 // All the ivars beneath this must be accessed from this queue.
33 //////////////////////////////////////////////////////////////////////////////
34 dispatch_queue_t _dispatchQueue;
35
36 // Whether or not the message queue is connected to a client.
37 BOOL _connected;
38
39 // A queue of messages that are waiting to be sent.
40 NSMutableArray* _messageQueue;
41
42 // The delegate for this class.
43 BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;
44
45 // The socket for the queue. This will either be a listening socket, waiting
46 // to accept connections. Or it will be a connected socket with a server.
47 int _socket;
48
49 // The dispatch sources for |_socket|, run on |_dispatchQueue|. If this is
50 // for a listening socket, only |_readSource| will be non-NULL. If
51 // |_connected| is false, both will be NULL.
52 dispatch_source_t _readSource;
53 dispatch_source_t _writeSource;
54
55 // Whether |_writeSource| has been suspended through |-dequeueAndSend|.
56 BOOL _writeSuspended;
57
58 // When a message is being read, this temporary buffer is used to build up
59 // the complete message from successive reads.
60 NSMutableString* _message;
61 NSUInteger _totalMessageSize;
62 NSUInteger _messageSize;
63 }
64
65 - (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
66 if ((self = [super init])) {
67 _port = port;
68 _dispatchQueue = dispatch_queue_create(
69 [[NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String],
70 DISPATCH_QUEUE_SERIAL);
71 _messageQueue = [[NSMutableArray alloc] init];
72 _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)
73 [[BSProtocolThreadInvoker alloc] initWithObject:delegate
74 protocol:@protocol(MessageQueueDelegate)
75 thread:[NSThread currentThread]];
76 _socket = -1;
77 }
78 return self;
79 }
80
81 - (void)dealloc {
82 dispatch_sync(_dispatchQueue, ^{ [self disconnectClient]; });
83 }
84
85 - (BOOL)isConnected {
86 BOOL __block connected;
87 dispatch_sync(_dispatchQueue, ^{ connected = self->_connected; });
88 return connected;
89 }
90
91 - (void)connect {
92 dispatch_async(_dispatchQueue, ^{
93 if (self->_connected)
94 return;
95
96 [self openListeningSocket];
97 });
98 }
99
100 - (void)disconnect {
101 dispatch_async(_dispatchQueue, ^{ [self disconnectClient]; });
102 }
103
104 - (void)sendMessage:(NSString*)message {
105 dispatch_async(_dispatchQueue, ^{
106 [self->_messageQueue addObject:message];
107 [self dequeueAndSend];
108 });
109 }
110
111 // Private /////////////////////////////////////////////////////////////////////
112
113 - (void)openListeningSocket {
114 // Create a socket.
115 _socket = socket(PF_INET, SOCK_STREAM, 0);
116 if (_socket < 0) {
117 NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno));
118 return;
119 }
120
121 // Allow old, yet-to-be recycled sockets to be reused.
122 int yes = 1;
123 setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
124 setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(int));
125
126 // Bind to the address.
127 struct sockaddr_in address = {0};
128 address.sin_len = sizeof(address);
129 address.sin_family = AF_INET;
130 address.sin_port = htons(_port);
131 address.sin_addr.s_addr = htonl(INADDR_ANY);
132
133 int rv;
134 do {
135 rv = bind(_socket, (struct sockaddr*)&address, sizeof(address));
136 if (rv != 0) {
137 NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno));
138 }
139 } while (rv != 0);
140
141 // Listen for a connection.
142 rv = listen(_socket, 1);
143 if (rv < 0) {
144 NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno));
145 close(_socket);
146 _socket = -1;
147 return;
148 }
149 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue);
150 dispatch_source_set_event_handler(_readSource, ^{
151 [self acceptConnection];
152 });
153 dispatch_resume(_readSource);
154
155 _connected = YES;
156 }
157
158 // Closes down the listening socket, the child socket, and the streams.
159 - (void)disconnectClient {
160 if (!_connected)
161 return;
162
163 if (_readSource) {
164 dispatch_source_cancel(_readSource);
165 _readSource = NULL;
166 }
167
168 if (_writeSource) {
169 if (_writeSuspended) {
170 _writeSuspended = NO;
171 dispatch_resume(_writeSource);
172 }
173 dispatch_source_cancel(_writeSource);
174 _writeSource = NULL;
175 }
176
177 if (_socket != -1) {
178 close(_socket);
179 _socket = -1;
180 }
181
182 [_messageQueue removeAllObjects];
183
184 _connected = NO;
185 [_delegate messageQueueDidDisconnect:self];
186 }
187
188 // If the write stream is ready and there is data to send, sends the next message.
189 - (void)dequeueAndSend {
190 if ([_messageQueue count] == 0) {
191 // There are no outgoing messages, so suspend the dispatch source to avoid
192 // needless callouts to this method.
193 if (_writeSource) {
194 _writeSuspended = YES;
195 dispatch_suspend(_writeSource);
196 }
197 return;
198 } else if (_writeSuspended) {
199 // A new message has arrived with the source suspended. Resume it, which
200 // will arrange for a callout back here when the socket is ready.
201 _writeSuspended = NO;
202 dispatch_resume(_writeSource);
203 return;
204 }
205
206 NSString* message = [_messageQueue objectAtIndex:0];
207 [self performSend:message];
208 [_messageQueue removeObjectAtIndex:0];
209 }
210
211 // Writes the string into the write stream.
212 - (void)performSend:(NSString*)message {
213 // TODO: May need to negotiate with the server as to the string encoding.
214 const NSStringEncoding kEncoding = NSUTF8StringEncoding;
215 // Add space for the NUL byte.
216 NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;
217
218 UInt8* buffer = calloc(maxBufferSize, sizeof(UInt8));
219 NSUInteger bufferSize = 0;
220 if (![message getBytes:buffer
221 maxLength:maxBufferSize
222 usedLength:&bufferSize
223 encoding:kEncoding
224 options:0
225 range:NSMakeRange(0, [message length])
226 remainingRange:NULL]) {
227 free(buffer);
228 return;
229 }
230
231 // Include a NUL byte.
232 ++bufferSize;
233
234 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
235 // method is only ever called in response to a stream ready event.
236 NSUInteger totalWritten = 0;
237 while (totalWritten < bufferSize) {
238 ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten);
239 if (bytesWritten < 0) {
240 NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno));
241 break;
242 }
243 totalWritten += bytesWritten;
244 }
245
246 [_delegate messageQueue:self didSendMessage:message];
247
248 free(buffer);
249 }
250
251 // Reads bytes out of the read stream. This may be called multiple times if the
252 // message cannot be read in one pass.
253 - (void)readMessageFromStream {
254 const NSUInteger kBufferSize = 1024;
255 char buffer[kBufferSize];
256 ssize_t bufferOffset = 0; // Starting point in |buffer| to work with.
257 ssize_t bytesRead = read(_socket, buffer, kBufferSize);
258 if (bytesRead == 0) {
259 [self disconnectClient];
260 return;
261 }
262
263 // The read loop works by going through the buffer until all the bytes have
264 // been processed.
265 while (bufferOffset < bytesRead) {
266 // Find the NUL separator, or the end of the string.
267 NSUInteger partLength = 0;
268 for (ssize_t i = bufferOffset; i < bytesRead && buffer[i] != '\0'; ++i, ++partLength) ;
269
270 // If there is not a current packet, set some state.
271 if (!_message) {
272 // Read the message header: the size. This will be |partLength| bytes.
273 _totalMessageSize = atoi(buffer + bufferOffset);
274 _messageSize = 0;
275 _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
276 bufferOffset += partLength + 1; // Pass over the NUL byte.
277 continue; // Spin the loop to begin reading actual data.
278 }
279
280 // Substring the byte stream and append it to the packet string.
281 NSString* bufferString = [[NSString alloc] initWithBytesNoCopy:buffer + bufferOffset
282 length:partLength
283 encoding:NSUTF8StringEncoding
284 freeWhenDone:NO];
285 [_message appendString:bufferString];
286
287 // Advance counters.
288 _messageSize += partLength;
289 bufferOffset += partLength + 1;
290
291 // If this read finished the packet, handle it and reset.
292 if (_messageSize >= _totalMessageSize) {
293 [_delegate messageQueue:self didReceiveMessage:_message];
294 _message = nil;
295 }
296 }
297 }
298
299 - (void)acceptConnection {
300 struct sockaddr_in address = {0};
301 socklen_t addressLength = sizeof(address);
302 int connection = accept(_socket, (struct sockaddr*)&address, &addressLength);
303 if (connection < 0) {
304 NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno));
305 [self disconnectClient];
306 return;
307 }
308
309 dispatch_source_cancel(_readSource);
310 close(_socket);
311
312 _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue);
313 dispatch_source_set_event_handler(_readSource, ^{
314 [self readMessageFromStream];
315 });
316
317 _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue);
318 dispatch_source_set_event_handler(_writeSource, ^{
319 [self dequeueAndSend];
320 });
321
322 _socket = connection;
323
324 dispatch_resume(_readSource);
325 dispatch_resume(_writeSource);
326
327 [_delegate messageQueueDidConnect:self];
328 }
329
330 @end