Jamoma API  0.6.0.a19
posix/UdpSocket.cpp
1 /*
2  oscpack -- Open Sound Control (OSC) packet manipulation library
3  http://www.rossbencina.com/code/oscpack
4 
5  Copyright (c) 2004-2013 Ross Bencina <rossb@audiomulch.com>
6 
7  Permission is hereby granted, free of charge, to any person obtaining
8  a copy of this software and associated documentation files
9  (the "Software"), to deal in the Software without restriction,
10  including without limitation the rights to use, copy, modify, merge,
11  publish, distribute, sublicense, and/or sell copies of the Software,
12  and to permit persons to whom the Software is furnished to do so,
13  subject to the following conditions:
14 
15  The above copyright notice and this permission notice shall be
16  included in all copies or substantial portions of the Software.
17 
18  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
21  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
22  ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
23  CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 */
26 
27 /*
28  The text above constitutes the entire oscpack license; however,
29  the oscpack developer(s) also make the following non-binding requests:
30 
31  Any person wishing to distribute modifications to the Software is
32  requested to send the modifications to the original developer so that
33  they can be incorporated into the canonical version. It is also
34  requested that these non-binding requests be included whenever the
35  above license is reproduced.
36 */
37 #include "ip/UdpSocket.h"
38 
39 #include <pthread.h>
40 #include <unistd.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <netdb.h>
44 #include <sys/types.h>
45 #include <sys/socket.h>
46 #include <sys/time.h>
47 #include <netinet/in.h> // for sockaddr_in
48 
49 #include <signal.h>
50 #include <math.h>
51 #include <errno.h>
52 #include <string.h>
53 
54 #include <algorithm>
55 #include <cassert>
56 #include <cstring> // for memset
57 #include <stdexcept>
58 #include <vector>
59 
60 #include "ip/PacketListener.h"
61 #include "ip/TimerListener.h"
62 
63 
#if defined(__APPLE__) && !defined(_SOCKLEN_T)
// Apple systems older than 10.3 do not declare socklen_t, so supply one here.
typedef ssize_t socklen_t;
#endif
68 
69 
70 static void SockaddrFromIpEndpointName( struct sockaddr_in& sockAddr, const IpEndpointName& endpoint )
71 {
72  std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
73  sockAddr.sin_family = AF_INET;
74 
75  sockAddr.sin_addr.s_addr =
76  (endpoint.address == IpEndpointName::ANY_ADDRESS)
77  ? INADDR_ANY
78  : htonl( endpoint.address );
79 
80  sockAddr.sin_port =
81  (endpoint.port == IpEndpointName::ANY_PORT)
82  ? 0
83  : htons( endpoint.port );
84 }
85 
86 
87 static IpEndpointName IpEndpointNameFromSockaddr( const struct sockaddr_in& sockAddr )
88 {
89  return IpEndpointName(
90  (sockAddr.sin_addr.s_addr == INADDR_ANY)
91  ? IpEndpointName::ANY_ADDRESS
92  : ntohl( sockAddr.sin_addr.s_addr ),
93  (sockAddr.sin_port == 0)
94  ? IpEndpointName::ANY_PORT
95  : ntohs( sockAddr.sin_port )
96  );
97 }
98 
99 
100 class UdpSocket::Implementation{
101  bool isBound_;
102  bool isConnected_;
103 
104  int socket_;
105  struct sockaddr_in connectedAddr_;
106  struct sockaddr_in sendToAddr_;
107 
108 public:
109 
110  Implementation()
111  : isBound_( false )
112  , isConnected_( false )
113  , socket_( -1 )
114  {
115  if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){
116  throw std::runtime_error("unable to create udp socket\n");
117  }
118 
119  std::memset( &sendToAddr_, 0, sizeof(sendToAddr_) );
120  sendToAddr_.sin_family = AF_INET;
121  }
122 
123  ~Implementation()
124  {
125  if (socket_ != -1) close(socket_);
126  }
127 
128  void SetEnableBroadcast( bool enableBroadcast )
129  {
130  int broadcast = (enableBroadcast) ? 1 : 0; // int on posix
131  setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof(broadcast));
132  }
133 
134  void SetAllowReuse( bool allowReuse )
135  {
136  int reuseAddr = (allowReuse) ? 1 : 0; // int on posix
137  setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));
138 
139 #ifdef __APPLE__
140  // needed also for OS X - enable multiple listeners for a single port on same network interface
141  int reusePort = (allowReuse) ? 1 : 0; // int on posix
142  setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reusePort, sizeof(reusePort));
143 #endif
144  }
145 
146  IpEndpointName LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
147  {
148  assert( isBound_ );
149 
150  // first connect the socket to the remote server
151 
152  struct sockaddr_in connectSockAddr;
153  SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint );
154 
155  if (connect(socket_, (struct sockaddr *)&connectSockAddr, sizeof(connectSockAddr)) < 0) {
156  throw std::runtime_error("unable to connect udp socket\n");
157  }
158 
159  // get the address
160 
161  struct sockaddr_in sockAddr;
162  std::memset( (char *)&sockAddr, 0, sizeof(sockAddr ) );
163  socklen_t length = sizeof(sockAddr);
164  if (getsockname(socket_, (struct sockaddr *)&sockAddr, &length) < 0) {
165  throw std::runtime_error("unable to getsockname\n");
166  }
167 
168  if( isConnected_ ){
169  // reconnect to the connected address
170 
171  if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
172  throw std::runtime_error("unable to connect udp socket\n");
173  }
174 
175  }else{
176  // unconnect from the remote address
177 
178  struct sockaddr_in unconnectSockAddr;
179  std::memset( (char *)&unconnectSockAddr, 0, sizeof(unconnectSockAddr ) );
180  unconnectSockAddr.sin_family = AF_UNSPEC;
181  // address fields are zero
182  int connectResult = connect(socket_, (struct sockaddr *)&unconnectSockAddr, sizeof(unconnectSockAddr));
183  if ( connectResult < 0 && errno != EAFNOSUPPORT ) {
184  throw std::runtime_error("unable to un-connect udp socket\n");
185  }
186  }
187 
188  return IpEndpointNameFromSockaddr( sockAddr );
189  }
190 
191  void Connect( const IpEndpointName& remoteEndpoint )
192  {
193  SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint );
194 
195  if (connect(socket_, (struct sockaddr *)&connectedAddr_, sizeof(connectedAddr_)) < 0) {
196  throw std::runtime_error("unable to connect udp socket\n");
197  }
198 
199  isConnected_ = true;
200  }
201 
202  void Send( const char *data, std::size_t size )
203  {
204  assert( isConnected_ );
205 
206  send( socket_, data, size, 0 );
207  }
208 
209  void SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
210  {
211  sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address );
212  sendToAddr_.sin_port = htons( remoteEndpoint.port );
213 
214  sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_, sizeof(sendToAddr_) );
215  }
216 
217  void Bind( const IpEndpointName& localEndpoint )
218  {
219  struct sockaddr_in bindSockAddr;
220  SockaddrFromIpEndpointName( bindSockAddr, localEndpoint );
221 
222  if (bind(socket_, (struct sockaddr *)&bindSockAddr, sizeof(bindSockAddr)) < 0) {
223  throw std::runtime_error("unable to bind udp socket\n");
224  }
225 
226  isBound_ = true;
227  }
228 
229  bool IsBound() const { return isBound_; }
230 
231  std::size_t ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
232  {
233  assert( isBound_ );
234 
235  struct sockaddr_in fromAddr;
236  socklen_t fromAddrLen = sizeof(fromAddr);
237 
238  ssize_t result = recvfrom(socket_, data, size, 0,
239  (struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen);
240  if( result < 0 )
241  return 0;
242 
243  remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr);
244  remoteEndpoint.port = ntohs(fromAddr.sin_port);
245 
246  return (std::size_t)result;
247  }
248 
249  int Socket() { return socket_; }
250 };
251 
252 UdpSocket::UdpSocket()
253 {
254  impl_ = new Implementation();
255 }
256 
257 UdpSocket::~UdpSocket()
258 {
259  delete impl_;
260 }
261 
262 void UdpSocket::SetEnableBroadcast( bool enableBroadcast )
263 {
264  impl_->SetEnableBroadcast( enableBroadcast );
265 }
266 
267 void UdpSocket::SetAllowReuse( bool allowReuse )
268 {
269  impl_->SetAllowReuse( allowReuse );
270 }
271 
272 IpEndpointName UdpSocket::LocalEndpointFor( const IpEndpointName& remoteEndpoint ) const
273 {
274  return impl_->LocalEndpointFor( remoteEndpoint );
275 }
276 
277 void UdpSocket::Connect( const IpEndpointName& remoteEndpoint )
278 {
279  impl_->Connect( remoteEndpoint );
280 }
281 
282 void UdpSocket::Send( const char *data, std::size_t size )
283 {
284  impl_->Send( data, size );
285 }
286 
287 void UdpSocket::SendTo( const IpEndpointName& remoteEndpoint, const char *data, std::size_t size )
288 {
289  impl_->SendTo( remoteEndpoint, data, size );
290 }
291 
292 void UdpSocket::Bind( const IpEndpointName& localEndpoint )
293 {
294  impl_->Bind( localEndpoint );
295 }
296 
297 bool UdpSocket::IsBound() const
298 {
299  return impl_->IsBound();
300 }
301 
302 std::size_t UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint, char *data, std::size_t size )
303 {
304  return impl_->ReceiveFrom( remoteEndpoint, data, size );
305 }
306 
307 
308 struct AttachedTimerListener{
309  AttachedTimerListener( int id, int p, TimerListener *tl )
310  : initialDelayMs( id )
311  , periodMs( p )
312  , listener( tl ) {}
313  int initialDelayMs;
314  int periodMs;
315  TimerListener *listener;
316 };
317 
318 
319 static bool CompareScheduledTimerCalls(
320  const std::pair< double, AttachedTimerListener > & lhs, const std::pair< double, AttachedTimerListener > & rhs )
321 {
322  return lhs.first < rhs.first;
323 }
324 
325 
326 SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0;
327 
328 extern "C" /*static*/ void InterruptSignalHandler( int );
329 /*static*/ void InterruptSignalHandler( int )
330 {
331  multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak();
332  signal( SIGINT, SIG_DFL );
333 }
334 
335 
336 class SocketReceiveMultiplexer::Implementation{
337  std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_;
338  std::vector< AttachedTimerListener > timerListeners_;
339 
340  volatile bool break_;
341  int breakPipe_[2]; // [0] is the reader descriptor and [1] the writer
342 
343  double GetCurrentTimeMs() const
344  {
345  struct timeval t;
346 
347  gettimeofday( &t, 0 );
348 
349  return ((double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.);
350  }
351 
352 public:
353  Implementation()
354  {
355  if( pipe(breakPipe_) != 0 )
356  throw std::runtime_error( "creation of asynchronous break pipes failed\n" );
357  }
358 
359  ~Implementation()
360  {
361  close( breakPipe_[0] );
362  close( breakPipe_[1] );
363  }
364 
365  void AttachSocketListener( UdpSocket *socket, PacketListener *listener )
366  {
367  assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() );
368  // we don't check that the same socket has been added multiple times, even though this is an error
369  socketListeners_.push_back( std::make_pair( listener, socket ) );
370  }
371 
372  void DetachSocketListener( UdpSocket *socket, PacketListener *listener )
373  {
374  std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i =
375  std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) );
376  assert( i != socketListeners_.end() );
377 
378  socketListeners_.erase( i );
379  }
380 
381  void AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
382  {
383  timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) );
384  }
385 
386  void AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
387  {
388  timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) );
389  }
390 
391  void DetachPeriodicTimerListener( TimerListener *listener )
392  {
393  std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
394  while( i != timerListeners_.end() ){
395  if( i->listener == listener )
396  break;
397  ++i;
398  }
399 
400  assert( i != timerListeners_.end() );
401 
402  timerListeners_.erase( i );
403  }
404 
405  void Run()
406  {
407  break_ = false;
408  char *data = 0;
409 
410  try{
411 
412  // configure the master fd_set for select()
413 
414  fd_set masterfds, tempfds;
415  FD_ZERO( &masterfds );
416  FD_ZERO( &tempfds );
417 
418  // in addition to listening to the inbound sockets we
419  // also listen to the asynchronous break pipe, so that AsynchronousBreak()
420  // can break us out of select() from another thread.
421  FD_SET( breakPipe_[0], &masterfds );
422  int fdmax = breakPipe_[0];
423 
424  for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
425  i != socketListeners_.end(); ++i ){
426 
427  if( fdmax < i->second->impl_->Socket() )
428  fdmax = i->second->impl_->Socket();
429  FD_SET( i->second->impl_->Socket(), &masterfds );
430  }
431 
432 
433  // configure the timer queue
434  double currentTimeMs = GetCurrentTimeMs();
435 
436  // expiry time ms, listener
437  std::vector< std::pair< double, AttachedTimerListener > > timerQueue_;
438  for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
439  i != timerListeners_.end(); ++i )
440  timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) );
441  std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
442 
443  const int MAX_BUFFER_SIZE = 4098;
444  data = new char[ MAX_BUFFER_SIZE ];
445  IpEndpointName remoteEndpoint;
446 
447  struct timeval timeout;
448 
449  while( !break_ ){
450  tempfds = masterfds;
451 
452  struct timeval *timeoutPtr = 0;
453  if( !timerQueue_.empty() ){
454  double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs();
455  if( timeoutMs < 0 )
456  timeoutMs = 0;
457 
458  long timoutSecondsPart = (long)(timeoutMs * .001);
459  timeout.tv_sec = (time_t)timoutSecondsPart;
460  // 1000000 microseconds in a second
461  timeout.tv_usec = (suseconds_t)((timeoutMs - (timoutSecondsPart * 1000)) * 1000);
462  timeoutPtr = &timeout;
463  }
464 
465  if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 ){
466  if( break_ ){
467  break;
468  }else if( errno == EINTR ){
469  // on returning an error, select() doesn't clear tempfds.
470  // so tempfds would remain all set, which would cause read( breakPipe_[0]...
471  // below to block indefinitely. therefore if select returns EINTR we restart
472  // the while() loop instead of continuing on to below.
473  continue;
474  }else{
475  throw std::runtime_error("select failed\n");
476  }
477  }
478 
479  if( FD_ISSET( breakPipe_[0], &tempfds ) ){
480  // clear pending data from the asynchronous break pipe
481  char c;
482  read( breakPipe_[0], &c, 1 );
483  }
484 
485  if( break_ )
486  break;
487 
488  for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
489  i != socketListeners_.end(); ++i ){
490 
491  if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){
492 
493  std::size_t size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE );
494  if( size > 0 ){
495  i->first->ProcessPacket( data, (int)size, remoteEndpoint );
496  if( break_ )
497  break;
498  }
499  }
500  }
501 
502  // execute any expired timers
503  currentTimeMs = GetCurrentTimeMs();
504  bool resort = false;
505  for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin();
506  i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){
507 
508  i->second.listener->TimerExpired();
509  if( break_ )
510  break;
511 
512  i->first += i->second.periodMs;
513  resort = true;
514  }
515  if( resort )
516  std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
517  }
518 
519  delete [] data;
520  }catch(...){
521  if( data )
522  delete [] data;
523  throw;
524  }
525  }
526 
527  void Break()
528  {
529  break_ = true;
530  }
531 
532  void AsynchronousBreak()
533  {
534  break_ = true;
535 
536  // Send a termination message to the asynchronous break pipe, so select() will return
537  write( breakPipe_[1], "!", 1 );
538  }
539 };
540 
541 
542 
543 SocketReceiveMultiplexer::SocketReceiveMultiplexer()
544 {
545  impl_ = new Implementation();
546 }
547 
548 SocketReceiveMultiplexer::~SocketReceiveMultiplexer()
549 {
550  delete impl_;
551 }
552 
553 void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener )
554 {
555  impl_->AttachSocketListener( socket, listener );
556 }
557 
558 void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener )
559 {
560  impl_->DetachSocketListener( socket, listener );
561 }
562 
563 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int periodMilliseconds, TimerListener *listener )
564 {
565  impl_->AttachPeriodicTimerListener( periodMilliseconds, listener );
566 }
567 
568 void SocketReceiveMultiplexer::AttachPeriodicTimerListener( int initialDelayMilliseconds, int periodMilliseconds, TimerListener *listener )
569 {
570  impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener );
571 }
572 
573 void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener )
574 {
575  impl_->DetachPeriodicTimerListener( listener );
576 }
577 
578 void SocketReceiveMultiplexer::Run()
579 {
580  impl_->Run();
581 }
582 
583 void SocketReceiveMultiplexer::RunUntilSigInt()
584 {
585  assert( multiplexerInstanceToAbortWithSigInt_ == 0 ); /* at present we support only one multiplexer instance running until sig int */
586  multiplexerInstanceToAbortWithSigInt_ = this;
587  signal( SIGINT, InterruptSignalHandler );
588  impl_->Run();
589  signal( SIGINT, SIG_DFL );
590  multiplexerInstanceToAbortWithSigInt_ = 0;
591 }
592 
593 void SocketReceiveMultiplexer::Break()
594 {
595  impl_->Break();
596 }
597 
598 void SocketReceiveMultiplexer::AsynchronousBreak()
599 {
600  impl_->AsynchronousBreak();
601 }
602