40 #include "../../os/MutexLock.hpp" 41 #include "../../Activity.hpp" 42 #include "../../base/ChannelElementBase.hpp" 43 #include "../../Logger.hpp" 45 #include <sys/select.h> 48 namespace RTT {
namespace mqueue {
class Dispatcher; } }
69 typedef std::map<mqd_t,base::ChannelElementBase*> MQMap;
82 highsock(0), do_exit(
false)
87 log(
Info) <<
"Dispacher cleans up: no more work."<<endlog();
92 void build_select_list() {
109 for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
110 FD_SET( it->first, &socks);
111 if (
int(it->first) > highsock)
112 highsock = int(it->first);
123 for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
124 if ( FD_ISSET( it->first, &socks) ) {
126 it->second->signal();
135 if ( DispatchI == 0) {
145 log(
Error) <<
"Invalid mqd_t given to MQueue Dispatcher." <<endlog();
148 log(
Debug) <<
"Dispatcher is monitoring mqdes "<< mqdes <<endlog();
151 if (mqmap.count(mqdes) == 0)
158 log(
Debug) <<
"Dispatcher drops mqdes "<< mqdes <<endlog();
160 if (mqmap.count(mqdes)) {
161 mqmap.erase( mqmap.find(mqdes) );
172 struct timeval timeout;
177 timeout.tv_usec = 50000;
182 readsocks = select(highsock+1, &socks, (fd_set *) 0,
183 (fd_set *) 0, &timeout);
197 log(
Error) <<
"Dispatcher failed to select on message queues. Stopped thread. error: "<<strerror(errno)<<endlog();
201 else if (readsocks == 0) {
C++ abstraction of atomic integer operations.
void intrusive_ptr_release(const RTT::mqueue::Dispatcher *p)
boost::intrusive_ptr< Dispatcher > shared_ptr
void removeQueue(mqd_t mqdes)
const int HighestPriority
An integer denoting the highest priority of the selected OS.
void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher *p)
This object waits on a set of open message queue file descriptors and signals the channel that has re...
An Activity is an object that represents a thread.
void addQueue(mqd_t mqdes, base::ChannelElementBase *chan)
Notify the Logger in which 'module' the message occurred.
An object oriented wrapper around a non recursive mutex.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
virtual bool start()
Start the activity.
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
static Dispatcher::shared_ptr Instance()