3 * Copyright (c) 2013, Blue Static <http://www.bluestatic.org>
5 * This program is free software; you can redistribute it and/or modify it under the terms of the GNU
6 * General Public License as published by the Free Software Foundation; either version 2 of the
7 * License, or (at your option) any later version.
9 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
10 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * General Public License for more details.
13 * You should have received a copy of the GNU General Public License along with this program; if not,
14 * write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17 #import "MessageQueue.h"
19 #include <netinet/in.h>
20 #include <sys/socket.h>
22 @interface MessageQueue (Private
)
23 // Thread main function that is started from -connect.
24 - (void)runMessageQueue
;
26 // All the following methods must be called from the -runMessageQueue thread.
28 // Creates a listening socket and schedules it in the run loop.
29 - (void)listenForClient
;
31 // Closes down the listening socket, the child socket, and the streams.
32 - (void)disconnectClient
;
34 // This first calls -disconnectClient and then stops the run loop and terminates
35 // the -runMessageQueue thread.
38 // Adds a |message| to |_queue|.
39 - (void)enqueueMessage
:(NSString
*)message
;
41 // If the write stream is ready and there is data to send, sends the next message.
42 - (void)dequeueAndSend
;
44 // Writes the string into the write stream.
45 - (void)performSend
:(NSString
*)message
;
47 // Reads bytes out of the read stream. This may be called multiple times if the
48 // message cannot be read in one pass.
49 - (void)readMessageFromStream
;
51 // Converts a CFErrorRef to an NSError and passes it to the delegate.
52 - (void)reportError
:(CFErrorRef
)error
;
54 // Forwarding methods from the CoreFoundation callbacks.
55 - (void)listenSocket
:(CFSocketRef
)socket acceptedSocket
:(CFSocketNativeHandle
)child
;
56 - (void)readStream
:(CFReadStreamRef
)stream handleEvent
:(CFStreamEventType
)event
;
57 - (void)writeStream
:(CFWriteStreamRef
)stream handleEvent
:(CFStreamEventType
)event
;
60 // CoreFoundation Callbacks ////////////////////////////////////////////////////
62 static void MessageQueueSocketAccept(CFSocketRef socket
,
63 CFSocketCallBackType callbackType
,
68 CFSocketNativeHandle child
= *(CFSocketNativeHandle
*)data
;
69 [(MessageQueue
*)self listenSocket
:socket acceptedSocket
:child
];
72 static void MessageQueueReadEvent(CFReadStreamRef stream
,
73 CFStreamEventType eventType
,
76 [(MessageQueue
*)self readStream
:stream handleEvent
:eventType
];
79 static void MessageQueueWriteEvent(CFWriteStreamRef stream
,
80 CFStreamEventType eventType
,
83 [(MessageQueue
*)self writeStream
:stream handleEvent
:eventType
];
86 ////////////////////////////////////////////////////////////////////////////////
88 @implementation MessageQueue
90 - (id)initWithPort
:(NSUInteger
)port delegate
:(id<MessageQueueDelegate
>)delegate
{
91 if ((self = [super init
])) {
93 _queue
= [[NSMutableArray alloc
] init
];
94 _delegate
= (BSProtocolThreadInvoker
<MessageQueueDelegate
>*)
95 [[BSProtocolThreadInvoker alloc
] initWithObject
:delegate
96 protocol
:@protocol(MessageQueueDelegate
)
97 thread
:[NSThread currentThread
]];
108 - (BOOL)isConnected
{
116 [NSThread detachNewThreadSelector
:@selector(runMessageQueue
)
122 [self performSelector
:@selector(stopRunLoop
)
128 - (void)sendMessage
:(NSString
*)message
{
129 [self performSelector
:@selector(enqueueMessage
:)
135 // Private /////////////////////////////////////////////////////////////////////
137 - (void)runMessageQueue
{
139 _thread
= [NSThread currentThread
];
140 _runLoop
= [NSRunLoop currentRunLoop
];
144 [self scheduleListenSocket
];
146 // Use CFRunLoop instead of NSRunLoop because the latter has no programmatic
155 - (void)scheduleListenSocket
{
156 // Create the address structure.
157 struct sockaddr_in address
;
158 memset(&address
, 0, sizeof(address
));
159 address.sin_len
= sizeof(address
);
160 address.sin_family
= AF_INET
;
161 address.sin_port
= htons(_port
);
162 address.sin_addr.s_addr
= htonl(INADDR_ANY
);
164 // Create the socket signature.
165 CFSocketSignature signature
;
166 signature.protocolFamily
= PF_INET
;
167 signature.socketType
= SOCK_STREAM
;
168 signature.protocol
= IPPROTO_TCP
;
169 signature.address
= (CFDataRef
)[NSData dataWithBytes
:&address length
:sizeof(address
)];
171 CFSocketContext context
= { 0 };
176 CFSocketCreateWithSocketSignature(kCFAllocatorDefault
,
177 &signature
, // Socket signature.
178 kCFSocketAcceptCallBack
, // Callback types.
179 &MessageQueueSocketAccept
, // Callback function.
180 &context
); // Context to pass to callout.
182 // Pump the run loop while waiting for the socket to be reusued. If told
183 // to quit while waiting, then break out of the loop.
184 if (CFRunLoopRunInMode(kCFRunLoopDefaultMode
, 1, FALSE) && _shouldQuit
)
186 NSLog(@
"Could not open socket");
187 //[connection_ errorEncountered:@"Could not open socket."];
191 // Allow old, yet-to-be recycled sockets to be reused.
193 setsockopt(CFSocketGetNative(_socket
), SOL_SOCKET
, SO_REUSEADDR
, &yes
, sizeof(int));
194 setsockopt(CFSocketGetNative(_socket
), SOL_SOCKET
, SO_REUSEPORT
, &yes
, sizeof(int));
196 // Schedule the socket on the run loop.
197 CFRunLoopSourceRef source
= CFSocketCreateRunLoopSource(kCFAllocatorDefault
, _socket
, 0);
198 CFRunLoopAddSource([_runLoop getCFRunLoop
], source
, kCFRunLoopCommonModes
);
202 - (void)disconnectClient
{
204 CFSocketInvalidate(_socket
);
210 CFReadStreamUnscheduleFromRunLoop(_readStream
, [_runLoop getCFRunLoop
], kCFRunLoopCommonModes
);
211 CFReadStreamClose(_readStream
);
212 CFRelease(_readStream
);
217 CFWriteStreamUnscheduleFromRunLoop(_writeStream
, [_runLoop getCFRunLoop
], kCFRunLoopCommonModes
);
218 CFWriteStreamClose(_writeStream
);
219 CFRelease(_writeStream
);
229 [_delegate messageQueueDidDisconnect
:self];
232 - (void)stopRunLoop
{
234 [self disconnectClient
];
235 CFRunLoopStop([_runLoop getCFRunLoop
]);
238 - (void)enqueueMessage
:(NSString
*)message
{
239 [_queue addObject
:message
];
240 [self dequeueAndSend
];
243 - (void)dequeueAndSend
{
247 if (!CFWriteStreamCanAcceptBytes(_writeStream
))
250 NSString
* message
= [_queue objectAtIndex
:0];
251 [self performSend
:message
];
252 [_queue removeObjectAtIndex
:0];
255 - (void)performSend
:(NSString
*)message
{
256 // TODO: May need to negotiate with the server as to the string encoding.
257 const NSStringEncoding kEncoding
= NSUTF8StringEncoding
;
258 // Add space for the NUL byte.
259 NSUInteger maxBufferSize
= [message maximumLengthOfBytesUsingEncoding
:kEncoding
] + 1;
261 UInt8
* buffer
= malloc(maxBufferSize
);
262 bzero(buffer
, maxBufferSize
);
264 NSUInteger bufferSize
= 0;
265 if (![message getBytes
:buffer
266 maxLength
:maxBufferSize
267 usedLength
:&bufferSize
270 range
:NSMakeRange(0, [message length
])
271 remainingRange
:NULL
]) {
276 // Include a NUL byte.
279 // Write the packet out, and spin in a busy wait loop if the stream is not ready. This
280 // method is only ever called in response to a stream ready event.
281 NSUInteger totalWritten
= 0;
282 while (totalWritten
< bufferSize
) {
283 CFIndex bytesWritten
= CFWriteStreamWrite(_writeStream
, buffer
+ totalWritten
, bufferSize
- totalWritten
);
284 if (bytesWritten
< 0) {
285 [self reportError
:CFWriteStreamCopyError(_writeStream
)];
288 totalWritten
+= bytesWritten
;
291 [_delegate messageQueue
:self didSendMessage
:message
];
296 - (void)readMessageFromStream
{
297 const NSUInteger kBufferSize
= 1024;
298 UInt8 buffer
[kBufferSize
];
299 CFIndex bufferOffset
= 0; // Starting point in |buffer| to work with.
300 CFIndex bytesRead
= CFReadStreamRead(_readStream
, buffer
, kBufferSize
);
301 const char* charBuffer
= (const char*)buffer
;
303 // The read loop works by going through the buffer until all the bytes have
305 while (bufferOffset
< bytesRead
) {
306 // Find the NUL separator, or the end of the string.
307 NSUInteger partLength
= 0;
308 for (CFIndex i
= bufferOffset
; i
< bytesRead
&& charBuffer
[i
] != '\0'; ++i
, ++partLength
) ;
310 // If there is not a current packet, set some state.
312 // Read the message header: the size. This will be |partLength| bytes.
313 _totalMessageSize
= atoi(charBuffer
+ bufferOffset
);
315 _message
= [[NSMutableString alloc
] initWithCapacity
:_totalMessageSize
];
316 bufferOffset
+= partLength
+ 1; // Pass over the NUL byte.
317 continue; // Spin the loop to begin reading actual data.
320 // Substring the byte stream and append it to the packet string.
321 CFStringRef bufferString
= CFStringCreateWithBytes(kCFAllocatorDefault
,
322 buffer
+ bufferOffset
, // Byte pointer, offset by start index.
323 partLength
, // Length.
324 kCFStringEncodingUTF8
,
326 [_message appendString
:(NSString
*)bufferString
];
327 CFRelease(bufferString
);
330 _messageSize
+= partLength
;
331 bufferOffset
+= partLength
+ 1;
333 // If this read finished the packet, handle it and reset.
334 if (_messageSize
>= _totalMessageSize
) {
335 [_delegate messageQueue
:self didReceiveMessage
:[_message autorelease
]];
338 // Process any outgoing messages.
339 [self dequeueAndSend
];
344 - (void)listenSocket
:(CFSocketRef
)socket acceptedSocket
:(CFSocketNativeHandle
)child
{
345 if (socket
!= _socket
) {
352 // Create the streams on the socket.
353 CFStreamCreatePairWithSocket(kCFAllocatorDefault
,
354 _child
, // Socket handle.
355 &_readStream
, // Read stream in-pointer.
356 &_writeStream
); // Write stream in-pointer.
358 // Create struct to register callbacks for the stream.
359 CFStreamClientContext context
= { 0 };
362 // Set the client of the read stream.
363 CFOptionFlags readFlags
= kCFStreamEventOpenCompleted |
364 kCFStreamEventHasBytesAvailable |
365 kCFStreamEventErrorOccurred |
366 kCFStreamEventEndEncountered
;
367 if (CFReadStreamSetClient(_readStream
, readFlags
, &MessageQueueReadEvent
, &context
))
368 // Schedule in run loop to do asynchronous communication with the engine.
369 CFReadStreamScheduleWithRunLoop(_readStream
, [_runLoop getCFRunLoop
], kCFRunLoopCommonModes
);
373 // Open the stream now that it's scheduled on the run loop.
374 if (!CFReadStreamOpen(_readStream
)) {
375 [self reportError
:CFReadStreamCopyError(_readStream
)];
379 // Set the client of the write stream.
380 CFOptionFlags writeFlags
= kCFStreamEventOpenCompleted |
381 kCFStreamEventCanAcceptBytes |
382 kCFStreamEventErrorOccurred |
383 kCFStreamEventEndEncountered
;
384 if (CFWriteStreamSetClient(_writeStream
, writeFlags
, &MessageQueueWriteEvent
, &context
))
385 // Schedule it in the run loop to receive error information.
386 CFWriteStreamScheduleWithRunLoop(_writeStream
, [_runLoop getCFRunLoop
], kCFRunLoopCommonModes
);
390 // Open the write stream.
391 if (!CFWriteStreamOpen(_writeStream
)) {
392 [self reportError
:CFWriteStreamCopyError(_writeStream
)];
397 [_delegate messageQueueDidConnect
:self];
399 CFSocketInvalidate(_socket
);
404 - (void)reportError
:(CFErrorRef
)error
406 [_delegate messageQueue
:self error
:(NSError
*)error
];
410 - (void)readStream
:(CFReadStreamRef
)stream handleEvent
:(CFStreamEventType
)event
412 assert(stream
== _readStream
);
415 case kCFStreamEventHasBytesAvailable
:
416 [self readMessageFromStream
];
419 case kCFStreamEventErrorOccurred
:
420 [self reportError
:CFReadStreamCopyError(stream
)];
424 case kCFStreamEventEndEncountered
:
434 - (void)writeStream
:(CFWriteStreamRef
)stream handleEvent
:(CFStreamEventType
)event
436 assert(stream
== _writeStream
);
438 case kCFStreamEventCanAcceptBytes
:
439 [self dequeueAndSend
];
442 case kCFStreamEventErrorOccurred
:
443 [self reportError
:CFWriteStreamCopyError(stream
)];
447 case kCFStreamEventEndEncountered
: