]> src.bluestatic.org Git - macgdbp.git/blob - Source/MessageQueue.m
Remove StackFrame.routingID.
[macgdbp.git] / Source / MessageQueue.m
1 /*
2 * MacGDBp
3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
4 *
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 #import "MessageQueue.h"
18
19 #include <dispatch/dispatch.h>
20 #include <netinet/in.h>
21 #include <stdlib.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
24 #include <unistd.h>
25
26 #import "BSProtocolThreadInvoker.h"
27
@implementation MessageQueue {
  // The port number on which to open a listening socket. Converted with
  // htons() when the listening address is built.
  NSUInteger _port;

  // All the ivars beneath this must be accessed from this queue.
  //////////////////////////////////////////////////////////////////////////////
  // Serial queue created in the initializer; the public methods hop onto it
  // before touching any of the state below.
  dispatch_queue_t _dispatchQueue;

  // Whether or not the message queue is active. This is set to YES as soon as
  // the listening socket has been opened (i.e. possibly before any client has
  // been accepted) and reset to NO by -disconnectClient.
  BOOL _connected;

  // A queue of messages (NSStrings) that are waiting to be sent, oldest first.
  NSMutableArray* _messageQueue;

  // The delegate for this class, wrapped in a BSProtocolThreadInvoker that is
  // bound to the thread on which the initializer ran.
  // NOTE(review): presumably the invoker forwards delegate messages to that
  // thread; confirm against BSProtocolThreadInvoker.
  BSProtocolThreadInvoker<MessageQueueDelegate>* _delegate;

  // The socket for the queue. This is either a listening socket that is
  // waiting to accept a connection, or the connected socket returned by
  // accept(). It is -1 when no socket is open.
  int _socket;

  // The dispatch sources for |_socket|, run on |_dispatchQueue|. If this is
  // for a listening socket, only |_readSource| will be non-NULL. If
  // |_connected| is false, both will be NULL.
  dispatch_source_t _readSource;
  dispatch_source_t _writeSource;

  // When a message is being read, this temporary buffer is used to build up
  // the complete message from successive reads. It is nil when no message is
  // in progress.
  NSMutableString* _message;
  // The size of the current message, as announced by its header.
  NSUInteger _totalMessageSize;
  // The number of bytes of the current message appended to |_message| so far.
  NSUInteger _messageSize;
}
61
// Initializes a queue that will listen on |port| and report events to
// |delegate|. The delegate is wrapped in a BSProtocolThreadInvoker bound to the
// thread that calls this initializer. No socket is opened until -connect.
- (id)initWithPort:(NSUInteger)port delegate:(id<MessageQueueDelegate>)delegate {
  self = [super init];
  if (!self)
    return nil;

  _port = port;
  _socket = -1;  // No socket is open yet.
  _messageQueue = [[NSMutableArray alloc] init];

  // Give each instance its own uniquely-labelled serial queue.
  NSString* queueLabel =
      [NSString stringWithFormat:@"org.bluestatic.MacGDBp.MessageQueue.%p", self];
  _dispatchQueue = dispatch_queue_create([queueLabel UTF8String], DISPATCH_QUEUE_SERIAL);

  BSProtocolThreadInvoker* invoker =
      [[BSProtocolThreadInvoker alloc] initWithObject:delegate
                                             protocol:@protocol(MessageQueueDelegate)
                                               thread:[NSThread currentThread]];
  _delegate = (BSProtocolThreadInvoker<MessageQueueDelegate>*)invoker;

  return self;
}
77
// Tears down any open socket and dispatch sources, then releases everything
// this object owns.
- (void)dealloc {
  // The connection state may only be touched on |_dispatchQueue|, so perform
  // the teardown there and wait for it to finish before releasing anything.
  dispatch_sync(_dispatchQueue, ^{ [self disconnectClient]; });
  dispatch_release(_dispatchQueue);
  [_messageQueue release];
  // A partially read message may still be buffered if the object goes away in
  // the middle of a read; release it so it is not leaked. This is a no-op when
  // |_message| is nil.
  [_message release];
  [_delegate release];
  [super dealloc];
}
85
// Returns the current value of |_connected|. The flag is read synchronously on
// |_dispatchQueue|, so this blocks until previously enqueued work has run.
- (BOOL)isConnected {
  __block BOOL result = NO;
  dispatch_sync(_dispatchQueue, ^{
    result = _connected;
  });
  return result;
}
91
// Asynchronously opens the listening socket, unless the queue is already
// marked as connected.
- (void)connect {
  dispatch_async(_dispatchQueue, ^{
    if (!_connected) {
      [self openListeningSocket];
    }
  });
}
100
// Asynchronously closes the socket and dispatch sources on |_dispatchQueue|.
- (void)disconnect {
  dispatch_block_t teardown = ^{
    [self disconnectClient];
  };
  dispatch_async(_dispatchQueue, teardown);
}
104
// Appends |message| to the outgoing queue and immediately attempts to send the
// message at the head of the queue. All of the work happens asynchronously on
// |_dispatchQueue|.
- (void)sendMessage:(NSString*)message {
  dispatch_block_t enqueueAndFlush = ^{
    [_messageQueue addObject:message];
    [self dequeueAndSend];
  };
  dispatch_async(_dispatchQueue, enqueueAndFlush);
}
111
112 // Private /////////////////////////////////////////////////////////////////////
113
// Creates a TCP socket bound to |_port| on all interfaces, starts listening on
// it, and installs a read source that calls -acceptConnection when a client
// connects. On success |_connected| is set to YES. On any failure the socket is
// closed, |_socket| is left at -1 and |_connected| is left unchanged.
// Must be called on |_dispatchQueue|.
- (void)openListeningSocket {
  // Create a socket.
  _socket = socket(PF_INET, SOCK_STREAM, 0);
  if (_socket < 0) {
    NSLog(@"Could not connect to socket: %d %s", errno, strerror(errno));
    return;
  }

  // Allow old, yet-to-be recycled sockets to be reused. These are best-effort,
  // so failures are deliberately ignored.
  int yes = 1;
  setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
  setsockopt(_socket, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));

  // Bind to the address.
  struct sockaddr_in address = {0};
  address.sin_len = sizeof(address);
  address.sin_family = AF_INET;
  address.sin_port = htons((uint16_t)_port);
  address.sin_addr.s_addr = htonl(INADDR_ANY);

  // Retry a bounded number of times with a short pause in between. Retrying
  // forever would spin this serial queue at full speed whenever the failure is
  // persistent (e.g. the port is owned by another process), which would also
  // block every other operation on the queue.
  const int kMaxBindAttempts = 5;
  const useconds_t kBindRetryDelayMicroseconds = 100 * 1000;
  int rv = -1;
  for (int attempt = 0; attempt < kMaxBindAttempts && rv != 0; ++attempt) {
    if (attempt > 0)
      usleep(kBindRetryDelayMicroseconds);

    rv = bind(_socket, (struct sockaddr*)&address, sizeof(address));
    if (rv != 0) {
      NSLog(@"Could not bind to socket: %d, %s", errno, strerror(errno));
    }
  }
  if (rv != 0) {
    close(_socket);
    _socket = -1;
    return;
  }

  // Listen for a connection.
  rv = listen(_socket, 1);
  if (rv < 0) {
    NSLog(@"Could not listen on socket: %d, %s", errno, strerror(errno));
    close(_socket);
    _socket = -1;
    return;
  }

  // A listening socket becomes readable when a connection is pending.
  _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _socket, 0, _dispatchQueue);
  dispatch_source_set_event_handler(_readSource, ^{
    [self acceptConnection];
  });
  dispatch_resume(_readSource);

  _connected = YES;
}
158
// Cancels the dispatch sources, closes whichever socket is currently open
// (listening or connected), discards all queued and partially-read messages,
// and notifies the delegate. Does nothing if the queue is not connected.
// Must be called on |_dispatchQueue|.
- (void)disconnectClient {
  if (!_connected)
    return;

  if (_readSource) {
    dispatch_source_cancel(_readSource);
    dispatch_release(_readSource);
    _readSource = NULL;
  }

  if (_writeSource) {
    dispatch_source_cancel(_writeSource);
    dispatch_release(_writeSource);
    _writeSource = NULL;
  }

  // NOTE(review): the descriptor is closed immediately after the sources are
  // cancelled rather than from a cancellation handler; confirm this is safe
  // against the libdispatch documentation for descriptor-based sources.
  if (_socket != -1) {
    close(_socket);
    _socket = -1;
  }

  [_messageQueue removeAllObjects];

  // Throw away any message that was only partially read. Otherwise the stale
  // buffer would be mistaken for the start of the first message on the next
  // connection, and it would be leaked if this object were deallocated.
  [_message release];
  _message = nil;
  _totalMessageSize = 0;
  _messageSize = 0;

  _connected = NO;
  [_delegate messageQueueDidDisconnect:self];
}
186
// If a client is connected and there is data to send, sends the message at the
// head of the queue and removes it. Must be called on |_dispatchQueue|.
- (void)dequeueAndSend {
  // |_writeSource| only exists while a client connection is established. Before
  // that, |_socket| is either invalid or the listening socket, so a write could
  // only fail. Leave the messages queued; the write source's event handler
  // calls this method again once a client has been accepted.
  if (!_writeSource)
    return;

  if (![_messageQueue count])
    return;

  // The array keeps |message| alive until it is removed after the send.
  NSString* message = [_messageQueue objectAtIndex:0];
  [self performSend:message];
  [_messageQueue removeObjectAtIndex:0];
}
196
// Encodes |message| as UTF-8, appends a terminating NUL byte, and writes the
// result to |_socket|. The delegate is told that the message was sent only if
// every byte was written; encoding, allocation and write failures are logged
// or silently dropped without notifying the delegate.
- (void)performSend:(NSString*)message {
  // TODO: May need to negotiate with the server as to the string encoding.
  const NSStringEncoding kEncoding = NSUTF8StringEncoding;
  // Add space for the NUL byte.
  NSUInteger maxBufferSize = [message maximumLengthOfBytesUsingEncoding:kEncoding] + 1;

  // calloc() zero-fills the buffer, which provides the trailing NUL byte.
  UInt8* buffer = calloc(maxBufferSize, sizeof(UInt8));
  if (!buffer) {
    NSLog(@"Failed to allocate %lu bytes for an outgoing message",
          (unsigned long)maxBufferSize);
    return;
  }

  // Reserve the last byte of the buffer for the NUL terminator.
  NSUInteger bufferSize = 0;
  if (![message getBytes:buffer
               maxLength:maxBufferSize - 1
              usedLength:&bufferSize
                encoding:kEncoding
                 options:0
                   range:NSMakeRange(0, [message length])
          remainingRange:NULL]) {
    free(buffer);
    return;
  }

  // Include a NUL byte.
  ++bufferSize;

  // Write the packet out, looping because write() may accept only part of the
  // data in a single call.
  NSUInteger totalWritten = 0;
  while (totalWritten < bufferSize) {
    ssize_t bytesWritten = write(_socket, buffer + totalWritten, bufferSize - totalWritten);
    if (bytesWritten < 0) {
      // An interrupted call wrote nothing and can simply be retried.
      if (errno == EINTR)
        continue;
      NSLog(@"Failed to write to stream: %d, %s", errno, strerror(errno));
      break;
    }
    totalWritten += bytesWritten;
  }

  free(buffer);

  // Do not report a message as sent if the write was cut short by an error.
  if (totalWritten == bufferSize)
    [_delegate messageQueue:self didSendMessage:message];
}
236
// Reads the bytes that are available on |_socket| and feeds them to the message
// parser. This may be called multiple times if a message cannot be read in one
// pass; the partial state lives in |_message|, |_totalMessageSize| and
// |_messageSize|.
//
// The stream is parsed as a sequence of NUL-terminated parts: a decimal size
// header followed by the message body. When at least the announced number of
// bytes has been accumulated, the message is handed to the delegate.
//
// NOTE(review): a size header that is split across two reads is still parsed
// from its first fragment only. Fixing that needs extra parser state.
- (void)readMessageFromStream {
  const NSUInteger kBufferSize = 1024;
  // One extra byte so that the data can always be NUL-terminated, which keeps
  // atoi() below from running past the bytes that were actually read.
  char buffer[kBufferSize + 1];
  ssize_t bytesRead = read(_socket, buffer, kBufferSize);
  if (bytesRead < 0) {
    // Transient conditions: the read source fires again when data is ready.
    if (errno == EINTR || errno == EAGAIN)
      return;
    NSLog(@"Failed to read from stream: %d, %s", errno, strerror(errno));
    [self disconnectClient];
    return;
  }
  if (bytesRead == 0) {
    // The peer closed the connection.
    [self disconnectClient];
    return;
  }
  buffer[bytesRead] = '\0';

  // The read loop works by going through the buffer until all the bytes have
  // been processed.
  ssize_t bufferOffset = 0;  // Starting point in |buffer| to work with.
  while (bufferOffset < bytesRead) {
    // Find the NUL separator, or the end of the data.
    NSUInteger partLength = 0;
    for (ssize_t i = bufferOffset; i < bytesRead && buffer[i] != '\0'; ++i)
      ++partLength;

    // If there is not a current packet, this part is a message header.
    if (!_message) {
      if (partLength == 0) {
        // A lone NUL with no message in progress is the terminator of a message
        // whose body filled the previous read exactly. Skip it rather than
        // treating it as an empty header, which would cause the real header
        // that follows to be delivered as a message.
        ++bufferOffset;
        continue;
      }

      // Read the message header: the size. This will be |partLength| bytes.
      int announcedSize = atoi(buffer + bufferOffset);
      // A negative size is malformed; do not let it wrap to a huge unsigned value.
      _totalMessageSize = announcedSize > 0 ? (NSUInteger)announcedSize : 0;
      _messageSize = 0;
      _message = [[NSMutableString alloc] initWithCapacity:_totalMessageSize];
      bufferOffset += partLength + 1;  // Pass over the NUL byte.
      continue;  // Spin the loop to begin reading actual data.
    }

    // Decode this part of the byte stream and append it to the packet string.
    NSString* part = [[NSString alloc] initWithBytes:buffer + bufferOffset
                                              length:partLength
                                            encoding:NSUTF8StringEncoding];
    if (!part) {
      // The bytes are not valid UTF-8 on their own, e.g. because a multi-byte
      // character straddles two reads. Appending nil would throw, so fall back
      // to a byte-for-byte (lossy) decoding instead.
      part = [[NSString alloc] initWithBytes:buffer + bufferOffset
                                      length:partLength
                                    encoding:NSISOLatin1StringEncoding];
    }
    if (part) {
      [_message appendString:part];
      [part release];
    }

    // Advance counters. The announced size is compared against bytes, not
    // characters.
    _messageSize += partLength;
    bufferOffset += partLength + 1;

    // If this read finished the packet, handle it and reset.
    if (_messageSize >= _totalMessageSize) {
      [_delegate messageQueue:self didReceiveMessage:[_message autorelease]];
      _message = nil;
    }
  }
}
285
// Accepts the pending connection on the listening socket, then replaces the
// listening socket and its read source with the connected socket and a pair of
// read/write sources for it. Notifies the delegate once the sources are
// running. If accept() fails, the queue is disconnected instead.
// Called on |_dispatchQueue| from the listening socket's read source.
- (void)acceptConnection {
  struct sockaddr_in address = {0};
  socklen_t addressLength = sizeof(address);
  int connection = accept(_socket, (struct sockaddr*)&address, &addressLength);
  if (connection < 0) {
    NSLog(@"Failed to accept connection: %d, %s", errno, strerror(errno));
    [self disconnectClient];
    return;
  }

  // Stop listening. The source must be released as well as cancelled, or the
  // reference taken by dispatch_source_create() is leaked when |_readSource| is
  // overwritten below.
  dispatch_source_cancel(_readSource);
  dispatch_release(_readSource);
  _readSource = NULL;
  close(_socket);

  _readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, connection, 0, _dispatchQueue);
  dispatch_source_set_event_handler(_readSource, ^{
    [self readMessageFromStream];
  });

  _writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, connection, 0, _dispatchQueue);
  dispatch_source_set_event_handler(_writeSource, ^{
    [self dequeueAndSend];
  });

  // From here on |_socket| refers to the connected socket.
  _socket = connection;

  dispatch_resume(_readSource);
  dispatch_resume(_writeSource);

  [_delegate messageQueueDidConnect:self];
}
316
317 @end