37 #include "ip/UdpSocket.h"
44 #include <sys/types.h>
45 #include <sys/socket.h>
47 #include <netinet/in.h>
60 #include "ip/PacketListener.h"
61 #include "ip/TimerListener.h"
// Fallback definition of socklen_t for Apple builds where _SOCKLEN_T is not
// defined. socklen_t is used below for the length arguments of getsockname()
// and recvfrom().
// NOTE(review): the matching #endif is not visible in this view.
64 #if defined(__APPLE__) && !defined(_SOCKLEN_T)
66 typedef ssize_t socklen_t;
// Fills sockAddr from endpoint.
// The structure is zeroed first and its family set to AF_INET. The address and
// port are converted to network byte order with htonl()/htons() unless they
// equal IpEndpointName::ANY_ADDRESS / IpEndpointName::ANY_PORT.
// NOTE(review): the values used for the ANY_* cases and the target of the port
// assignment are elided from this view.
70 static void SockaddrFromIpEndpointName(
struct sockaddr_in& sockAddr,
const IpEndpointName& endpoint )
72 std::memset( (
char *)&sockAddr, 0,
sizeof(sockAddr ) );
73 sockAddr.sin_family = AF_INET;
75 sockAddr.sin_addr.s_addr =
76 (endpoint.address == IpEndpointName::ANY_ADDRESS)
78 : htonl( endpoint.address );
81 (endpoint.port == IpEndpointName::ANY_PORT)
83 : htons( endpoint.port );
87 static IpEndpointName IpEndpointNameFromSockaddr(
const struct sockaddr_in& sockAddr )
89 return IpEndpointName(
90 (sockAddr.sin_addr.s_addr == INADDR_ANY)
91 ? IpEndpointName::ANY_ADDRESS
92 : ntohl( sockAddr.sin_addr.s_addr ),
93 (sockAddr.sin_port == 0)
94 ? IpEndpointName::ANY_PORT
95 : ntohs( sockAddr.sin_port )
// Private implementation of UdpSocket on top of an AF_INET / SOCK_DGRAM socket.
// NOTE(review): this view of the class has elided lines (some member
// declarations, the constructor/destructor headers, braces and a few
// statements are not visible); comments below describe only what is shown.
100 class UdpSocket::Implementation{
// Address last passed to connect() by Connect(); also used by
// LocalEndpointFor() when it re-connects the socket.
105 struct sockaddr_in connectedAddr_;
// Destination address rewritten on every SendTo() call.
106 struct sockaddr_in sendToAddr_;
112 , isConnected_( false )
// Create the datagram socket; failure is reported as std::runtime_error.
115 if( (socket_ = socket( AF_INET, SOCK_DGRAM, 0 )) == -1 ){
116 throw std::runtime_error(
"unable to create udp socket\n");
// Pre-initialise the SendTo() address: zeroed, family AF_INET.
119 std::memset( &sendToAddr_, 0,
sizeof(sendToAddr_) );
120 sendToAddr_.sin_family = AF_INET;
// Close the descriptor if one was opened.
125 if (socket_ != -1) close(socket_);
// Sets SO_BROADCAST to 1 or 0. The setsockopt() result is not checked.
128 void SetEnableBroadcast(
bool enableBroadcast )
130 int broadcast = (enableBroadcast) ? 1 : 0;
131 setsockopt(socket_, SOL_SOCKET, SO_BROADCAST, &broadcast,
sizeof(broadcast));
// Sets SO_REUSEADDR and SO_REUSEPORT to 1 or 0. The setsockopt() results are
// not checked.
// NOTE(review): any preprocessor guard around the SO_REUSEPORT part is not
// visible in this view.
134 void SetAllowReuse(
bool allowReuse )
136 int reuseAddr = (allowReuse) ? 1 : 0;
137 setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuseAddr,
sizeof(reuseAddr));
141 int reusePort = (allowReuse) ? 1 : 0;
142 setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reusePort,
sizeof(reusePort));
// Returns the socket's local endpoint while it is connected to remoteEndpoint.
// Throws std::runtime_error when connect() or getsockname() fails.
146 IpEndpointName LocalEndpointFor(
const IpEndpointName& remoteEndpoint )
const
// Step 1: connect the socket to the remote endpoint.
152 struct sockaddr_in connectSockAddr;
153 SockaddrFromIpEndpointName( connectSockAddr, remoteEndpoint );
155 if (connect(socket_, (
struct sockaddr *)&connectSockAddr,
sizeof(connectSockAddr)) < 0) {
156 throw std::runtime_error(
"unable to connect udp socket\n");
// Step 2: query the local address of the now-connected socket.
161 struct sockaddr_in sockAddr;
162 std::memset( (
char *)&sockAddr, 0,
sizeof(sockAddr ) );
163 socklen_t length =
sizeof(sockAddr);
164 if (getsockname(socket_, (
struct sockaddr *)&sockAddr, &length) < 0) {
165 throw std::runtime_error(
"unable to getsockname\n");
// Step 3: undo the temporary connect. Two paths are shown: connect() again to
// connectedAddr_, or connect() to a zeroed AF_UNSPEC address.
// NOTE(review): the condition selecting between the two paths is elided here;
// presumably it is isConnected_ - confirm.
171 if (connect(socket_, (
struct sockaddr *)&connectedAddr_,
sizeof(connectedAddr_)) < 0) {
172 throw std::runtime_error(
"unable to connect udp socket\n");
178 struct sockaddr_in unconnectSockAddr;
179 std::memset( (
char *)&unconnectSockAddr, 0,
sizeof(unconnectSockAddr ) );
180 unconnectSockAddr.sin_family = AF_UNSPEC;
182 int connectResult = connect(socket_, (
struct sockaddr *)&unconnectSockAddr,
sizeof(unconnectSockAddr));
// A failure with errno == EAFNOSUPPORT is tolerated; any other failure throws.
183 if ( connectResult < 0 && errno != EAFNOSUPPORT ) {
184 throw std::runtime_error(
"unable to un-connect udp socket\n");
188 return IpEndpointNameFromSockaddr( sockAddr );
// Connects the socket to remoteEndpoint, remembering the address in
// connectedAddr_. Throws std::runtime_error when connect() fails.
191 void Connect(
const IpEndpointName& remoteEndpoint )
193 SockaddrFromIpEndpointName( connectedAddr_, remoteEndpoint );
195 if (connect(socket_, (
struct sockaddr *)&connectedAddr_,
sizeof(connectedAddr_)) < 0) {
196 throw std::runtime_error(
"unable to connect udp socket\n");
// Sends size bytes from data with send(); asserts that the socket is
// connected. The send() result is not checked.
202 void Send(
const char *data, std::size_t size )
204 assert( isConnected_ );
206 send( socket_, data, size, 0 );
// Sends size bytes from data to remoteEndpoint with sendto(). The endpoint's
// address and port are converted to network byte order as-is (no ANY_*
// mapping). The sendto() result is not checked.
209 void SendTo(
const IpEndpointName& remoteEndpoint,
const char *data, std::size_t size )
211 sendToAddr_.sin_addr.s_addr = htonl( remoteEndpoint.address );
212 sendToAddr_.sin_port = htons( remoteEndpoint.port );
214 sendto( socket_, data, size, 0, (sockaddr*)&sendToAddr_,
sizeof(sendToAddr_) );
// Binds the socket to localEndpoint. Throws std::runtime_error when bind()
// fails.
217 void Bind(
const IpEndpointName& localEndpoint )
219 struct sockaddr_in bindSockAddr;
220 SockaddrFromIpEndpointName( bindSockAddr, localEndpoint );
222 if (bind(socket_, (
struct sockaddr *)&bindSockAddr,
sizeof(bindSockAddr)) < 0) {
223 throw std::runtime_error(
"unable to bind udp socket\n");
// Returns the isBound_ flag.
229 bool IsBound()
const {
return isBound_; }
// Receives one datagram of at most size bytes into data with recvfrom(),
// stores the sender (host byte order) in remoteEndpoint and returns the
// recvfrom() result cast to std::size_t.
// NOTE(review): any handling of a negative recvfrom() result is not visible in
// this view.
231 std::size_t ReceiveFrom( IpEndpointName& remoteEndpoint,
char *data, std::size_t size )
235 struct sockaddr_in fromAddr;
236 socklen_t fromAddrLen =
sizeof(fromAddr);
238 ssize_t result = recvfrom(socket_, data, size, 0,
239 (
struct sockaddr *) &fromAddr, (socklen_t*)&fromAddrLen);
243 remoteEndpoint.address = ntohl(fromAddr.sin_addr.s_addr);
244 remoteEndpoint.port = ntohs(fromAddr.sin_port);
246 return (std::size_t)result;
// Exposes the raw descriptor; used by the receive multiplexer for select().
249 int Socket() {
return socket_; }
252 UdpSocket::UdpSocket()
254 impl_ =
new Implementation();
// Destructor.
// NOTE(review): the body is not visible in this view; presumably it releases
// the Implementation allocated by the constructor - confirm.
257 UdpSocket::~UdpSocket()
262 void UdpSocket::SetEnableBroadcast(
bool enableBroadcast )
264 impl_->SetEnableBroadcast( enableBroadcast );
267 void UdpSocket::SetAllowReuse(
bool allowReuse )
269 impl_->SetAllowReuse( allowReuse );
272 IpEndpointName UdpSocket::LocalEndpointFor(
const IpEndpointName& remoteEndpoint )
const
274 return impl_->LocalEndpointFor( remoteEndpoint );
277 void UdpSocket::Connect(
const IpEndpointName& remoteEndpoint )
279 impl_->Connect( remoteEndpoint );
282 void UdpSocket::Send(
const char *data, std::size_t size )
284 impl_->Send( data, size );
287 void UdpSocket::SendTo(
const IpEndpointName& remoteEndpoint,
const char *data, std::size_t size )
289 impl_->SendTo( remoteEndpoint, data, size );
292 void UdpSocket::Bind(
const IpEndpointName& localEndpoint )
294 impl_->Bind( localEndpoint );
297 bool UdpSocket::IsBound()
const
299 return impl_->IsBound();
302 std::size_t UdpSocket::ReceiveFrom( IpEndpointName& remoteEndpoint,
char *data, std::size_t size )
304 return impl_->ReceiveFrom( remoteEndpoint, data, size );
// Record describing one registered timer. Elsewhere in this file its
// initialDelayMs, periodMs and listener members are read by the receive
// multiplexer when scheduling and firing timers.
// NOTE(review): some member declarations and constructor initializers are
// elided from this view.
308 struct AttachedTimerListener{
// id is stored in initialDelayMs; p and tl are presumably stored in periodMs
// and listener - the corresponding initializers are not visible here.
309 AttachedTimerListener(
int id,
int p, TimerListener *tl )
310 : initialDelayMs( id )
// Listener whose TimerExpired() is called when the timer fires.
315 TimerListener *listener;
319 static bool CompareScheduledTimerCalls(
320 const std::pair< double, AttachedTimerListener > & lhs,
const std::pair< double, AttachedTimerListener > & rhs )
322 return lhs.first < rhs.first;
// Multiplexer instance that InterruptSignalHandler() breaks on SIGINT.
// Set to `this` and reset to 0 by SocketReceiveMultiplexer::RunUntilSigInt().
326 SocketReceiveMultiplexer *multiplexerInstanceToAbortWithSigInt_ = 0;
328 extern "C" void InterruptSignalHandler(
int );
329 void InterruptSignalHandler(
int )
331 multiplexerInstanceToAbortWithSigInt_->AsynchronousBreak();
332 signal( SIGINT, SIG_DFL );
// Private implementation of SocketReceiveMultiplexer: a select()-based loop
// that dispatches incoming datagrams to PacketListeners and fires periodic
// TimerListeners.
// NOTE(review): this view of the class has elided lines (some members, the
// constructor/destructor and event-loop headers, braces and several
// statements are not visible); comments below describe only what is shown.
336 class SocketReceiveMultiplexer::Implementation{
// Registered (listener, socket) pairs.
337 std::vector< std::pair< PacketListener*, UdpSocket* > > socketListeners_;
// Registered periodic timers.
338 std::vector< AttachedTimerListener > timerListeners_;
340 volatile bool break_;
// Returns the gettimeofday() wall-clock time in milliseconds as a double.
343 double GetCurrentTimeMs()
const
347 gettimeofday( &t, 0 );
349 return ((
double)t.tv_sec*1000.) + ((double)t.tv_usec / 1000.);
// Create the pipe used to wake the event loop; failure throws.
355 if( pipe(breakPipe_) != 0 )
356 throw std::runtime_error(
"creation of asynchronous break pipes failed\n" );
// Close both ends of the break pipe.
361 close( breakPipe_[0] );
362 close( breakPipe_[1] );
// Registers a (listener, socket) pair; asserts it is not already registered.
365 void AttachSocketListener( UdpSocket *socket, PacketListener *listener )
367 assert( std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) ) == socketListeners_.end() );
369 socketListeners_.push_back( std::make_pair( listener, socket ) );
// Removes a previously registered (listener, socket) pair; asserts it exists.
372 void DetachSocketListener( UdpSocket *socket, PacketListener *listener )
374 std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i =
375 std::find( socketListeners_.begin(), socketListeners_.end(), std::make_pair(listener, socket) );
376 assert( i != socketListeners_.end() );
378 socketListeners_.erase( i );
// Registers a timer whose initial delay equals its period.
381 void AttachPeriodicTimerListener(
int periodMilliseconds, TimerListener *listener )
383 timerListeners_.push_back( AttachedTimerListener( periodMilliseconds, periodMilliseconds, listener ) );
// Registers a timer with a separate initial delay and period.
386 void AttachPeriodicTimerListener(
int initialDelayMilliseconds,
int periodMilliseconds, TimerListener *listener )
388 timerListeners_.push_back( AttachedTimerListener( initialDelayMilliseconds, periodMilliseconds, listener ) );
// Removes the timer entry for listener; asserts that one was found.
// NOTE(review): the statements inside the search loop after the comparison
// are elided from this view.
391 void DetachPeriodicTimerListener( TimerListener *listener )
393 std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
394 while( i != timerListeners_.end() ){
395 if( i->listener == listener )
400 assert( i != timerListeners_.end() );
402 timerListeners_.erase( i );
// --- Event loop (the enclosing method header is not visible in this view) ---
414 fd_set masterfds, tempfds;
415 FD_ZERO( &masterfds );
// The read end of the break pipe is always watched, so the byte written by
// AsynchronousBreak() wakes select().
421 FD_SET( breakPipe_[0], &masterfds );
422 int fdmax = breakPipe_[0];
// Add every attached socket and track the highest descriptor for select().
424 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
425 i != socketListeners_.end(); ++i ){
427 if( fdmax < i->second->impl_->Socket() )
428 fdmax = i->second->impl_->Socket();
429 FD_SET( i->second->impl_->Socket(), &masterfds );
// Schedule each timer at now + its initial delay and order the queue by due
// time.
434 double currentTimeMs = GetCurrentTimeMs();
437 std::vector< std::pair< double, AttachedTimerListener > > timerQueue_;
438 for( std::vector< AttachedTimerListener >::iterator i = timerListeners_.begin();
439 i != timerListeners_.end(); ++i )
440 timerQueue_.push_back( std::make_pair( currentTimeMs + i->initialDelayMs, *i ) );
441 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
// Receive buffer shared by all sockets.
// NOTE(review): the declaration of data and its release are not visible in
// this view.
443 const int MAX_BUFFER_SIZE = 4098;
444 data =
new char[ MAX_BUFFER_SIZE ];
445 IpEndpointName remoteEndpoint;
447 struct timeval timeout;
// With pending timers select() gets a timeout until the earliest one is due;
// otherwise timeoutPtr stays null.
452 struct timeval *timeoutPtr = 0;
453 if( !timerQueue_.empty() ){
454 double timeoutMs = timerQueue_.front().first - GetCurrentTimeMs();
// Split the millisecond timeout into whole seconds and remaining microseconds.
458 long timoutSecondsPart = (long)(timeoutMs * .001);
459 timeout.tv_sec = (time_t)timoutSecondsPart;
461 timeout.tv_usec = (suseconds_t)((timeoutMs - (timoutSecondsPart * 1000)) * 1000);
462 timeoutPtr = &timeout;
// Wait for readable descriptors. A select() failure with EINTR is handled
// separately from other failures, which throw.
// NOTE(review): the population of tempfds and the bodies of the first two
// error branches are elided from this view.
465 if( select( fdmax + 1, &tempfds, 0, 0, timeoutPtr ) < 0 ){
468 }
else if( errno == EINTR ){
475 throw std::runtime_error(
"select failed\n");
// Consume one byte from the break pipe when it is readable.
479 if( FD_ISSET( breakPipe_[0], &tempfds ) ){
482 read( breakPipe_[0], &c, 1 );
// Receive one datagram from each readable socket and hand it to its listener.
488 for( std::vector< std::pair< PacketListener*, UdpSocket* > >::iterator i = socketListeners_.begin();
489 i != socketListeners_.end(); ++i ){
491 if( FD_ISSET( i->second->impl_->Socket(), &tempfds ) ){
493 std::size_t size = i->second->ReceiveFrom( remoteEndpoint, data, MAX_BUFFER_SIZE );
495 i->first->ProcessPacket( data, (
int)size, remoteEndpoint );
// Fire every timer that is due, push its due time forward by one period, then
// restore the queue ordering.
503 currentTimeMs = GetCurrentTimeMs();
505 for( std::vector< std::pair< double, AttachedTimerListener > >::iterator i = timerQueue_.begin();
506 i != timerQueue_.end() && i->first <= currentTimeMs; ++i ){
508 i->second.listener->TimerExpired();
512 i->first += i->second.periodMs;
516 std::sort( timerQueue_.begin(), timerQueue_.end(), CompareScheduledTimerCalls );
// Writes one byte to the write end of the break pipe. The write() result is
// not checked.
532 void AsynchronousBreak()
537 write( breakPipe_[1],
"!", 1 );
543 SocketReceiveMultiplexer::SocketReceiveMultiplexer()
545 impl_ =
new Implementation();
// Destructor.
// NOTE(review): the body is not visible in this view; presumably it releases
// the Implementation allocated by the constructor - confirm.
548 SocketReceiveMultiplexer::~SocketReceiveMultiplexer()
553 void SocketReceiveMultiplexer::AttachSocketListener( UdpSocket *socket, PacketListener *listener )
555 impl_->AttachSocketListener( socket, listener );
558 void SocketReceiveMultiplexer::DetachSocketListener( UdpSocket *socket, PacketListener *listener )
560 impl_->DetachSocketListener( socket, listener );
563 void SocketReceiveMultiplexer::AttachPeriodicTimerListener(
int periodMilliseconds, TimerListener *listener )
565 impl_->AttachPeriodicTimerListener( periodMilliseconds, listener );
568 void SocketReceiveMultiplexer::AttachPeriodicTimerListener(
int initialDelayMilliseconds,
int periodMilliseconds, TimerListener *listener )
570 impl_->AttachPeriodicTimerListener( initialDelayMilliseconds, periodMilliseconds, listener );
573 void SocketReceiveMultiplexer::DetachPeriodicTimerListener( TimerListener *listener )
575 impl_->DetachPeriodicTimerListener( listener );
// Runs the multiplexer.
// NOTE(review): the body is not visible in this view; presumably it forwards
// to the implementation object like the neighbouring methods - confirm.
578 void SocketReceiveMultiplexer::Run()
583 void SocketReceiveMultiplexer::RunUntilSigInt()
585 assert( multiplexerInstanceToAbortWithSigInt_ == 0 );
586 multiplexerInstanceToAbortWithSigInt_ =
this;
587 signal( SIGINT, InterruptSignalHandler );
589 signal( SIGINT, SIG_DFL );
590 multiplexerInstanceToAbortWithSigInt_ = 0;
// Requests that the multiplexer stop running.
// NOTE(review): the body is not visible in this view; presumably it forwards
// to the implementation object like the neighbouring methods - confirm.
593 void SocketReceiveMultiplexer::Break()
598 void SocketReceiveMultiplexer::AsynchronousBreak()
600 impl_->AsynchronousBreak();