3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17 #import "MessageQueue.h"
19 #include <dispatch/dispatch.h>
20 #include <netinet/in.h>
21 #include <sys/socket.h>
// Implementation of MessageQueue and its instance variables.
// NOTE(review): the methods in this file also read |_port| and |_socket|, and two
// of the comments below (port number, connected flag) are not followed by a
// visible declaration in this view -- confirm those ivars against the full file.
23 @implementation MessageQueue
{
24 // The port number on which to open a listening socket.
27 // The dispatch queue for this instance.
// Created as a serial queue in the initializer; the socket setup, send and
// teardown work visible in this file is dispatched onto it.
28 dispatch_queue_t _dispatchQueue
;
30 // Whether or not the message queue is connected to a client.
33 // A queue of messages that are waiting to be sent.
// Holds the NSString messages added by -sendMessage: until -dequeueAndSend
// removes them.
34 NSMutableArray
* _messageQueue
;
36 // The delegate for this class.
// Stored as a BSProtocolThreadInvoker wrapping the caller-supplied delegate.
37 BSProtocolThreadInvoker
<MessageQueueDelegate
>* _delegate
;
43 // The two dispatch sources for the |_child|.
// NOTE(review): no |_child| ivar is visible here. |_readSource| is first created
// on |_socket| (to accept a connection) and later re-created on the accepted
// connection; |_writeSource| is created on the accepted connection.
44 dispatch_source_t _readSource
;
45 dispatch_source_t _writeSource
;
47 // When a message is being read, this temporary buffer is used to build up
48 // the complete message from successive reads.
49 NSMutableString
* _message
;
// Size announced by the message header, as parsed with atoi().
50 NSUInteger _totalMessageSize
;
// Number of payload bytes appended to |_message| so far.
51 NSUInteger _messageSize
;
// Initializes a message queue.
//
// |port|     -- NOTE(review): not visibly stored in this view; -openListeningSocket
//               reads |_port|, so presumably it is assigned here. Confirm.
// |delegate| -- receiver of MessageQueueDelegate callbacks. It is wrapped in a
//               BSProtocolThreadInvoker constructed with the thread that calls
//               this initializer.
54 - (id)initWithPort
:(NSUInteger
)port delegate
:(id<MessageQueueDelegate
>)delegate
{
55 if ((self = [super init
])) {
// Serial dispatch queue whose label embeds this instance's address (%p).
57 _dispatchQueue
= dispatch_queue_create(
58 [[NSString stringWithFormat
:@
"org.bluestatic.MacGDBp.MessageQueue.%p", self] UTF8String
],
59 DISPATCH_QUEUE_SERIAL
);
// Starts with no pending outgoing messages.
60 _messageQueue
= [[NSMutableArray alloc
] init
];
// Wrap the delegate in an invoker bound to [NSThread currentThread].
// Presumably the invoker forwards delegate messages to that thread -- verify
// against BSProtocolThreadInvoker.
61 _delegate
= (BSProtocolThreadInvoker
<MessageQueueDelegate
>*)
62 [[BSProtocolThreadInvoker alloc
] initWithObject
:delegate
63 protocol
:@protocol(MessageQueueDelegate
)
64 thread
:[NSThread currentThread
]];
// NOTE(review): the method signatures enclosing the statements below are not
// visible in this view; the comments describe only what each statement does.

// Releases the dispatch queue and the pending-message array created in the
// initializer (manual reference counting). Presumably part of -dealloc -- confirm.
70 dispatch_release(_dispatchQueue
);
71 [_messageQueue release
];
// Asynchronously opens the listening socket on the instance's dispatch queue.
85 dispatch_async(_dispatchQueue
, ^
{ [self openListeningSocket
]; });
// Asynchronously tears down the client connection on the instance's dispatch queue.
89 dispatch_async(_dispatchQueue
, ^
{ [self disconnectClient
]; });
// Queues |message| for sending.
//
// The call returns without waiting: the work is dispatched asynchronously onto
// |_dispatchQueue|, where the message is appended to |_messageQueue| and
// -dequeueAndSend is invoked to attempt a send.
93 - (void)sendMessage
:(NSString
*)message
{
94 dispatch_async(_dispatchQueue
, ^
{
95 [_messageQueue addObject
:message
];
96 [self dequeueAndSend
];
100 // Private /////////////////////////////////////////////////////////////////////
// Creates the listening socket and arranges for incoming connections to be
// accepted.
//
// Steps visible here: create an IPv4 stream socket, set SO_REUSEADDR and
// SO_REUSEPORT, bind to INADDR_ANY on |_port|, listen with a backlog of 1, then
// install a read dispatch source on the socket whose handler calls
// -acceptConnection.
//
// NOTE(review): the conditions guarding each NSLog and whatever follows a
// failure (early return, cleanup) are not visible in this view, nor are the
// declarations of |yes| and |rv|.
102 - (void)openListeningSocket
{
105 _socket
= socket(PF_INET
, SOCK_STREAM
, 0);
// Logged when socket creation fails (guard not visible here).
107 NSLog(@
"Could not connect to socket: %d %s", errno
, strerror(errno
));
111 // Allow old, yet-to-be recycled sockets to be reused.
113 setsockopt(_socket
, SOL_SOCKET
, SO_REUSEADDR
, &yes
, sizeof(int));
114 setsockopt(_socket
, SOL_SOCKET
, SO_REUSEPORT
, &yes
, sizeof(int));
116 // Bind to the address.
// Zero-initialized IPv4 address: any local interface, port in network byte order.
117 struct sockaddr_in address
= {0};
118 address.sin_len
= sizeof(address
);
119 address.sin_family
= AF_INET
;
120 address.sin_port
= htons(_port
);
121 address.sin_addr.s_addr
= htonl(INADDR_ANY
);
125 rv
= bind(_socket
, &address
, sizeof(address
));
127 NSLog(@
"Could not bind to socket: %d, %s", errno
, strerror(errno
));
131 // Listen for a connection.
// Backlog of 1 pending connection.
132 rv
= listen(_socket
, 1);
134 NSLog(@
"Could not listen on socket: %d, %s", errno
, strerror(errno
));
// Watch the listening socket for readability on |_dispatchQueue|; the handler
// accepts the incoming connection.
139 _readSource
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, _socket
, 0, _dispatchQueue
);
140 dispatch_source_set_event_handler(_readSource
, ^
{
141 [self acceptConnection
];
// Resume the source so its handler can run.
143 dispatch_resume(_readSource
);
146 // Closes down the listening socket, the child socket, and the streams.
// In the statements visible here this cancels and releases both dispatch
// sources, discards all pending outgoing messages, and notifies the delegate via
// -messageQueueDidDisconnect:.
// NOTE(review): any guards around the source teardown and any close() of the
// socket descriptors are not visible in this view -- confirm against the full file.
147 - (void)disconnectClient
{
// Stop and release the read source.
149 dispatch_source_cancel(_readSource
);
150 dispatch_release(_readSource
);
// Stop and release the write source.
155 dispatch_source_cancel(_writeSource
);
156 dispatch_release(_writeSource
);
// Drop anything that was still waiting to be sent.
166 [_messageQueue removeAllObjects
];
168 [_delegate messageQueueDidDisconnect
:self];
171 // If the write stream is ready and there is data to send, sends the next message.
// Takes the message at index 0 of |_messageQueue|, passes it to -performSend:,
// then removes it from the array. -performSend: returns void, so the message is
// removed regardless of whether the write succeeded.
172 - (void)dequeueAndSend
{
// Nothing to do when no messages are pending.
// NOTE(review): the statement controlled by this check is not visible in this
// view; presumably an early return -- confirm.
173 if (![_messageQueue count
])
176 NSString
* message
= [_messageQueue objectAtIndex
:0];
177 [self performSend
:message
];
178 [_messageQueue removeObjectAtIndex
:0];
181 // Writes the string into the write stream.
// Encodes |message| into a heap buffer sized for the worst-case encoded length
// plus one byte, writes the bytes to |_socket| in a loop until |bufferSize|
// bytes have been written, then reports the send to the delegate via
// -messageQueue:didSendMessage:.
// NOTE(review): several statements are not visible in this view -- the remaining
// -getBytes: arguments, the handling when encoding fails, the step that counts
// the trailing NUL, what happens after a failed write(), and the release of
// |buffer|. Confirm against the full file.
182 - (void)performSend
:(NSString
*)message
{
183 // TODO: May need to negotiate with the server as to the string encoding.
184 const NSStringEncoding kEncoding
= NSUTF8StringEncoding
;
185 // Add space for the NUL byte.
186 NSUInteger maxBufferSize
= [message maximumLengthOfBytesUsingEncoding
:kEncoding
] + 1;
// Zero-filled scratch buffer, so any bytes past the encoded string are NUL.
188 UInt8
* buffer
= malloc(maxBufferSize
);
189 bzero(buffer
, maxBufferSize
);
// |bufferSize| receives the number of bytes actually produced by the encoding.
191 NSUInteger bufferSize
= 0;
192 if (![message getBytes
:buffer
193 maxLength
:maxBufferSize
194 usedLength
:&bufferSize
197 range
:NSMakeRange(0, [message length
])
198 remainingRange
:NULL
]) {
203 // Include a NUL byte.
206 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
207 // method is only ever called in response to a stream ready event.
208 NSUInteger totalWritten
= 0;
209 while (totalWritten
< bufferSize
) {
// write() may accept fewer bytes than requested; continue from where it stopped.
210 ssize_t bytesWritten
= write(_socket
, buffer
+ totalWritten
, bufferSize
- totalWritten
);
211 if (bytesWritten
< 0) {
212 NSLog(@
"Failed to write to stream: %d, %s", errno
, strerror(errno
));
215 totalWritten
+= bytesWritten
;
218 [_delegate messageQueue
:self didSendMessage
:message
];
223 // Reads bytes out of the read stream. This may be called multiple times if the
224 // message cannot be read in one pass.
// Performs a single read() of up to 1024 bytes from |_socket| and walks the
// result in NUL-delimited parts. A part is either parsed as the decimal size
// header of a new message or appended to |_message|. Once |_messageSize| reaches
// |_totalMessageSize| the message is handed to the delegate via
// -messageQueue:didReceiveMessage: and -dequeueAndSend is invoked.
// NOTE(review): the conditions selecting the header branch, any resets of
// |_message| / |_messageSize|, and the code following the zero-byte-read branch
// are not visible in this view -- confirm against the full file.
225 - (void)readMessageFromStream
{
226 const NSUInteger kBufferSize
= 1024;
227 UInt8 buffer
[kBufferSize
];
228 CFIndex bufferOffset
= 0; // Starting point in |buffer| to work with.
229 ssize_t bytesRead
= read(_socket
, buffer
, kBufferSize
);
// A zero-byte read triggers teardown of the client connection.
230 if (bytesRead
== 0) {
231 [self disconnectClient
];
234 const char* charBuffer
= (const char*)buffer
;
236 // The read loop works by going through the buffer until all the bytes have been processed.
238 while (bufferOffset
< bytesRead
) {
239 // Find the NUL separator, or the end of the string.
// |partLength| counts the bytes from |bufferOffset| up to, but not including,
// the next NUL or the end of the bytes read.
240 NSUInteger partLength
= 0;
241 for (CFIndex i
= bufferOffset
; i
< bytesRead
&& charBuffer
[i
] != '\0'; ++i
, ++partLength
) ;
243 // If there is not a current packet, set some state.
245 // Read the message header: the size. This will be |partLength| bytes.
// NOTE(review): atoi() stops at the first non-digit character rather than at
// |partLength|; confirm the header is always NUL-terminated within this read.
246 _totalMessageSize
= atoi(charBuffer
+ bufferOffset
);
248 _message
= [[NSMutableString alloc
] initWithCapacity
:_totalMessageSize
];
249 bufferOffset
+= partLength
+ 1; // Pass over the NUL byte.
250 continue; // Spin the loop to begin reading actual data.
253 // Substring the byte stream and append it to the packet string.
254 CFStringRef bufferString
= CFStringCreateWithBytes(kCFAllocatorDefault
,
255 buffer
+ bufferOffset
, // Byte pointer, offset by start index.
256 partLength
, // Length.
257 kCFStringEncodingUTF8
,
259 [_message appendString
:(NSString
*)bufferString
];
260 CFRelease(bufferString
);
// Account for the bytes consumed and step past the separator.
263 _messageSize
+= partLength
;
264 bufferOffset
+= partLength
+ 1;
266 // If this read finished the packet, handle it and reset.
267 if (_messageSize
>= _totalMessageSize
) {
// The accumulated string is autoreleased as it is passed to the delegate.
268 [_delegate messageQueue
:self didReceiveMessage
:[_message autorelease
]];
271 // Process any outgoing messages.
272 [self dequeueAndSend
];
// Accepts a pending connection on the listening socket and switches the queue
// over to it.
//
// On success, in the statements visible here: the existing read source is
// cancelled, new read and write dispatch sources are created on the accepted
// descriptor (handlers call -readMessageFromStream and -dequeueAndSend
// respectively), |_socket| is replaced with the accepted descriptor, both
// sources are resumed, and the delegate is told via -messageQueueDidConnect:.
// NOTE(review): what follows the failure branch, and any release of the old
// source or close() of the listening descriptor, are not visible in this view --
// confirm against the full file.
277 - (void)acceptConnection
{
280 struct sockaddr_in address
= {0};
281 socklen_t addressLength
= sizeof(address
);
282 int connection
= accept(_socket
, &address
, &addressLength
);
// accept() failure: log errno and tear down.
283 if (connection
< 0) {
284 NSLog(@
"Failed to accept connection: %d, %s", errno
, strerror(errno
));
285 [self disconnectClient
];
// Stop watching the listening socket.
289 dispatch_source_cancel(_readSource
);
// Incoming data on the accepted connection is handled by -readMessageFromStream.
292 _readSource
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, connection
, 0, _dispatchQueue
);
293 dispatch_source_set_event_handler(_readSource
, ^
{
294 [self readMessageFromStream
];
// Writability of the accepted connection triggers -dequeueAndSend.
297 _writeSource
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
, connection
, 0, _dispatchQueue
);
298 dispatch_source_set_event_handler(_writeSource
, ^
{
299 [self dequeueAndSend
];
// From here on |_socket| refers to the accepted connection.
302 _socket
= connection
;
304 dispatch_resume(_readSource
);
305 dispatch_resume(_writeSource
);
307 [_delegate messageQueueDidConnect
:self];