3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17 #import "MessageQueue.h"
19 #include <dispatch/dispatch.h>
20 #include <netinet/in.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
26 #import "BSProtocolThreadInvoker.h"
// Instance state for MessageQueue.
// NOTE(review): some comments below describe ivars (port, connected flag,
// socket, write-suspended flag) whose declarations are not visible in this
// view of the file; confirm they are declared next to their comments.
28 @implementation MessageQueue
{
29 // The port number on which to open a listening socket.
32 // All the ivars beneath this must be accessed from this queue.
33 //////////////////////////////////////////////////////////////////////////////
// Created as a serial queue by the initializer.
34 dispatch_queue_t _dispatchQueue
;
36 // Whether or not the message queue is connected to a client.
39 // A queue of messages that are waiting to be sent.
40 NSMutableArray
* _messageQueue
;
42 // The delegate for this class, wrapped in a BSProtocolThreadInvoker that the
// initializer creates with the thread calling it (presumably so delegate
// messages are delivered on that thread; confirm against
// BSProtocolThreadInvoker).
43 BSProtocolThreadInvoker
<MessageQueueDelegate
>* _delegate
;
45 // The socket for the queue. This is either a listening socket waiting to
46 // accept connections, or a connected socket with a server.
49 // The dispatch sources for |_socket|, run on |_dispatchQueue|. If this is
50 // for a listening socket, only |_readSource| will be non-NULL. If
51 // |_connected| is false, both will be NULL.
52 dispatch_source_t _readSource
;
53 dispatch_source_t _writeSource
;
55 // Whether |_writeSource| has been suspended through |-dequeueAndSend|.
58 // When a message is being read, this temporary buffer is used to build up
59 // the complete message from successive reads.
60 NSMutableString
* _message
;
// Size of the message being read, parsed from its header by
// |-readMessageFromStream|.
61 NSUInteger _totalMessageSize
;
// Number of bytes appended to |_message| so far.
62 NSUInteger _messageSize
;
// Initializer. Sets up the serial dispatch queue, the outgoing message array,
// and the thread invoker that wraps |delegate|.
//   port: NOTE(review): the statement that consumes |port| is not visible in
//         this view. |-openListeningSocket| binds to |_port|, so it is
//         presumably stored from this argument; confirm.
//   delegate: Wrapped in a BSProtocolThreadInvoker for the
//             MessageQueueDelegate protocol, together with the thread that is
//             calling this initializer.
65 - (id)initWithPort
:(NSUInteger
)port delegate
:(id<MessageQueueDelegate
>)delegate
{
66 if ((self = [super init
])) {
// Serial queue whose label embeds this instance's address.
68 _dispatchQueue
= dispatch_queue_create(
69 [[NSString stringWithFormat
:@
"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String
],
70 DISPATCH_QUEUE_SERIAL
);
71 _messageQueue
= [[NSMutableArray alloc
] init
];
// The invoker is cast to the delegate protocol so delegate messages can be
// sent to it directly.
72 _delegate
= (BSProtocolThreadInvoker
<MessageQueueDelegate
>*)
73 [[BSProtocolThreadInvoker alloc
] initWithObject
:delegate
74 protocol
:@protocol(MessageQueueDelegate
)
75 thread
:[NSThread currentThread
]];
// NOTE(review): the signature of the method containing these statements is not
// visible in this view; they look like object teardown (presumably -dealloc).
// Confirm against the full file.
// Runs |-disconnectClient| on the queue and waits for it to finish, then
// releases the queue and the message array (manual retain/release).
82 dispatch_sync(_dispatchQueue
, ^
{ [self disconnectClient
]; });
83 dispatch_release(_dispatchQueue
);
84 [_messageQueue release
];
// NOTE(review): the signature and return statement of the method containing
// these statements are not visible in this view.
// Copies |_connected| into a local by reading it synchronously on
// |_dispatchQueue|. Because the queue is serial, this must not run from a
// block already executing on |_dispatchQueue|, or dispatch_sync will deadlock.
90 BOOL __block connected
;
91 dispatch_sync(_dispatchQueue
, ^
{ connected
= _connected
; });
// NOTE(review): the signature of the method containing these statements, and
// the lines between the start of the block and the call below, are not visible
// in this view.
// Asynchronously, on |_dispatchQueue|, opens the listening socket.
96 dispatch_async(_dispatchQueue
, ^
{
100 [self openListeningSocket
];
// NOTE(review): the signature of the method containing this statement is not
// visible in this view.
// Asynchronously, on |_dispatchQueue|, runs |-disconnectClient|.
105 dispatch_async(_dispatchQueue
, ^
{ [self disconnectClient
]; });
// Queues |message| for sending. The work is dispatched asynchronously to
// |_dispatchQueue|, where the message is appended to |_messageQueue| and
// |-dequeueAndSend| is invoked. This method does not wait for the send.
108 - (void)sendMessage
:(NSString
*)message
{
109 dispatch_async(_dispatchQueue
, ^
{
110 [_messageQueue addObject
:message
];
111 [self dequeueAndSend
];
115 // Private /////////////////////////////////////////////////////////////////////
// Creates an IPv4 stream socket, sets address/port reuse options, binds it to
// |_port| on INADDR_ANY, starts listening with a backlog of 1, and installs a
// read dispatch source on |_dispatchQueue| whose handler calls
// |-acceptConnection|.
// NOTE(review): the conditions guarding the NSLog calls, any early returns,
// and the declarations of |yes| and |rv| are not visible in this view.
117 - (void)openListeningSocket
{
119 _socket
= socket(PF_INET
, SOCK_STREAM
, 0);
// NOTE(review): this message says "connect", but the call above is socket().
121 NSLog(@
"Could not connect to socket: %d %s", errno
, strerror(errno
));
125 // Allow old, yet-to-be recycled sockets to be reused.
// NOTE(review): the return values of these setsockopt() calls are not checked
// in the visible code.
127 setsockopt(_socket
, SOL_SOCKET
, SO_REUSEADDR
, &yes
, sizeof(int));
128 setsockopt(_socket
, SOL_SOCKET
, SO_REUSEPORT
, &yes
, sizeof(int));
130 // Bind to the address.
131 struct sockaddr_in address
= {0};
132 address.sin_len
= sizeof(address
);
133 address.sin_family
= AF_INET
;
// Port and address are converted to network byte order.
134 address.sin_port
= htons(_port
);
135 address.sin_addr.s_addr
= htonl(INADDR_ANY
);
139 rv
= bind(_socket
, (struct sockaddr
*)&address
, sizeof(address
));
141 NSLog(@
"Could not bind to socket: %d, %s", errno
, strerror(errno
));
145 // Listen for a connection.
146 rv
= listen(_socket
, 1);
148 NSLog(@
"Could not listen on socket: %d, %s", errno
, strerror(errno
));
// Watch the listening socket for readability on |_dispatchQueue|; each event
// invokes |-acceptConnection|.
153 _readSource
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, _socket
, 0, _dispatchQueue
);
154 dispatch_source_set_event_handler(_readSource
, ^
{
155 [self acceptConnection
];
// Dispatch sources are created suspended; start delivering events.
157 dispatch_resume(_readSource
);
162 // Closes down the listening socket, the child socket, and the streams.
// In the visible code this cancels and releases the read and write dispatch
// sources, discards all pending outgoing messages, and sends
// |-messageQueueDidDisconnect:| to the delegate.
// NOTE(review): the conditions guarding the source teardown, and any close()
// of the socket itself, are not visible in this view; confirm.
163 - (void)disconnectClient
{
168 dispatch_source_cancel(_readSource
);
169 dispatch_release(_readSource
);
// A source suspended by |-dequeueAndSend| is resumed before it is cancelled
// and released, so the suspend is balanced.
174 if (_writeSuspended
) {
175 _writeSuspended
= NO
;
176 dispatch_resume(_writeSource
);
178 dispatch_source_cancel(_writeSource
);
179 dispatch_release(_writeSource
);
// Drop anything that was still waiting to be sent.
188 [_messageQueue removeAllObjects
];
191 [_delegate messageQueueDidDisconnect
:self];
194 // If the write stream is ready and there is data to send, sends the next message.
// Invoked from |-sendMessage:| and from the |_writeSource| event handler.
// Three cases are visible: an empty queue suspends |_writeSource|; pending
// messages with the source suspended resume it; otherwise the message at
// index 0 is passed to |-performSend:| and then removed from the queue.
// NOTE(review): the lines separating these branches are not visible in this
// view; confirm the send does not run in the same pass as the resume.
// NOTE(review): |-sendMessage:| can reach this method; confirm |_writeSource|
// is non-NULL (a client is connected) before it is suspended or resumed.
195 - (void)dequeueAndSend
{
196 if ([_messageQueue count
] == 0) {
197 // There are no outgoing messages, so suspend the dispatch source to avoid
198 // needless callouts to this method.
200 _writeSuspended
= YES
;
201 dispatch_suspend(_writeSource
);
204 } else if (_writeSuspended
) {
205 // A new message has arrived with the source suspended. Resume it, which
206 // will arrange for a callout back here when the socket is ready.
207 _writeSuspended
= NO
;
208 dispatch_resume(_writeSource
);
// Send the oldest message first, then drop it from the queue.
212 NSString
* message
= [_messageQueue objectAtIndex
:0];
213 [self performSend
:message
];
214 [_messageQueue removeObjectAtIndex
:0];
217 // Writes the string into the write stream.
// Converts |message| to bytes in a heap buffer sized for |kEncoding| (UTF-8)
// plus one NUL byte, writes the bytes to |_socket| until all of them have been
// written, and then sends |-messageQueue:didSendMessage:| to the delegate.
// NOTE(review): the result of calloc() is not checked, and no free() of
// |buffer| is visible in this view; confirm the buffer is released on every
// path.
218 - (void)performSend
:(NSString
*)message
{
219 // TODO: May need to negotiate with the server as to the string encoding.
220 const NSStringEncoding kEncoding
= NSUTF8StringEncoding
;
221 // Add space for the NUL byte.
222 NSUInteger maxBufferSize
= [message maximumLengthOfBytesUsingEncoding
:kEncoding
] + 1;
// calloc() zero-fills the buffer.
224 UInt8
* buffer
= calloc(maxBufferSize
, sizeof(UInt8
));
225 NSUInteger bufferSize
= 0;
// |bufferSize| receives the number of bytes actually produced.
226 if (![message getBytes
:buffer
227 maxLength
:maxBufferSize
228 usedLength
:&bufferSize
231 range
:NSMakeRange(0, [message length
])
232 remainingRange
:NULL
]) {
237 // Include a NUL byte.
240 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
241 // method is only ever called in response to a stream ready event.
242 NSUInteger totalWritten
= 0;
243 while (totalWritten
< bufferSize
) {
// write() may accept fewer bytes than requested, so continue from the first
// unsent byte.
244 ssize_t bytesWritten
= write(_socket
, buffer
+ totalWritten
, bufferSize
- totalWritten
);
245 if (bytesWritten
< 0) {
246 NSLog(@
"Failed to write to stream: %d, %s", errno
, strerror(errno
));
// NOTE(review): the rest of the error branch is not visible in this view;
// confirm it leaves the loop, otherwise a negative |bytesWritten| would be
// added to the unsigned |totalWritten| below.
249 totalWritten
+= bytesWritten
;
252 [_delegate messageQueue
:self didSendMessage
:message
];
257 // Reads bytes out of the read stream. This may be called multiple times if the
258 // message cannot be read in one pass.
// One call performs a single read() of up to |kBufferSize| bytes from |_socket|
// and walks the result as NUL-separated parts: a decimal size header parsed
// with atoi(), then message data that is appended to |_message|. When
// |_messageSize| reaches |_totalMessageSize| the delegate is sent
// |-messageQueue:didReceiveMessage:|.
// NOTE(review): several lines of this method (branch conditions, returns, and
// the state reset after delivery) are not visible in this view.
259 - (void)readMessageFromStream
{
260 const NSUInteger kBufferSize
= 1024;
261 char buffer
[kBufferSize
];
262 ssize_t bufferOffset
= 0; // Starting point in |buffer| to work with.
263 ssize_t bytesRead
= read(_socket
, buffer
, kBufferSize
);
// A zero-length read is treated as the peer having disconnected.
264 if (bytesRead
== 0) {
265 [self disconnectClient
];
269 // The read loop walks the buffer until every byte returned by read() has been consumed.
// A negative |bytesRead| (read error) fails the loop condition, so the loop
// body does not run.
271 while (bufferOffset
< bytesRead
) {
272 // Find the NUL separator, or the end of the string.
273 NSUInteger partLength
= 0;
274 for (ssize_t i
= bufferOffset
; i
< bytesRead
&& buffer
[i
] != '\0'; ++i
, ++partLength
) ;
276 // If there is not a current packet, set some state.
278 // Read the message header: the size. This will be |partLength| bytes.
// NOTE(review): read() does not NUL-terminate |buffer|. If the header has no
// NUL within the bytes read, atoi() can scan past the valid data; confirm how
// a header split across two reads is handled.
279 _totalMessageSize
= atoi(buffer
+ bufferOffset
);
281 _message
= [[NSMutableString alloc
] initWithCapacity
:_totalMessageSize
];
282 bufferOffset
+= partLength
+ 1; // Pass over the NUL byte.
283 continue; // Spin the loop to begin reading actual data.
286 // Substring the byte stream and append it to the packet string.
// NOTE(review): the length: and freeWhenDone: arguments are not visible in
// this view. If this part ends in the middle of a multi-byte UTF-8 sequence
// the initializer may return nil; confirm that case cannot reach
// -appendString:.
287 NSString
* bufferString
= [[NSString alloc
] initWithBytesNoCopy
:buffer
+ bufferOffset
289 encoding
:NSUTF8StringEncoding
291 [_message appendString
:[bufferString autorelease
]];
// Account for the bytes consumed and skip the separator after them.
294 _messageSize
+= partLength
;
295 bufferOffset
+= partLength
+ 1;
297 // If this read finished the packet, handle it and reset.
298 if (_messageSize
>= _totalMessageSize
) {
299 [_delegate messageQueue
:self didReceiveMessage
:[_message autorelease
]];
// Event handler for the listening socket's read source. Accepts one incoming
// connection, cancels the listening read source, creates read and write
// dispatch sources for the accepted descriptor on |_dispatchQueue|, stores
// that descriptor in |_socket|, resumes both sources, and sends
// |-messageQueueDidConnect:| to the delegate.
// NOTE(review): the exit from the error branch, and any release of the old
// read source or close() of the listening socket, are not visible in this
// view; confirm.
305 - (void)acceptConnection
{
306 struct sockaddr_in address
= {0};
307 socklen_t addressLength
= sizeof(address
);
308 int connection
= accept(_socket
, (struct sockaddr
*)&address
, &addressLength
);
309 if (connection
< 0) {
310 NSLog(@
"Failed to accept connection: %d, %s", errno
, strerror(errno
));
311 [self disconnectClient
];
// Stop watching the listening socket; |_readSource| is replaced below.
315 dispatch_source_cancel(_readSource
);
// Incoming data on the connection is handled by |-readMessageFromStream|.
318 _readSource
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, connection
, 0, _dispatchQueue
);
319 dispatch_source_set_event_handler(_readSource
, ^
{
320 [self readMessageFromStream
];
// Writability of the connection drives |-dequeueAndSend|.
323 _writeSource
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
, connection
, 0, _dispatchQueue
);
324 dispatch_source_set_event_handler(_writeSource
, ^
{
325 [self dequeueAndSend
];
// From here on |_socket| refers to the accepted connection rather than the
// listening socket.
328 _socket
= connection
;
330 dispatch_resume(_readSource
);
331 dispatch_resume(_writeSource
);
333 [_delegate messageQueueDidConnect
:self];