40 #include "../ExecutionEngine.hpp" 41 #include "../base/TaskCore.hpp" 42 #include "../Logger.hpp" 50 #define pipe(X) _pipe((X), 1024, _O_BINARY) 56 #include <sys/select.h> 63 #include <boost/cstdint.hpp> 66 using namespace extras;
68 const char FileDescriptorActivity::CMD_ANY_COMMAND;
84 , m_has_timeout(false)
87 , m_update_sets(false)
91 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
104 :
Activity(scheduler, priority, 0.0, _r, name)
109 , m_has_timeout(false)
110 , m_break_loop(false)
112 , m_update_sets(false)
116 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
120 :
Activity(scheduler, priority, 0.0, _r, name)
123 , m_period(period >= 0.0 ? period : 0.0)
125 , m_has_timeout(false)
126 , m_break_loop(false)
128 , m_update_sets(false)
132 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
136 :
Activity(scheduler, priority, 0.0, cpu_affinity, _r, name)
139 , m_period(period >= 0.0 ? period : 0.0)
141 , m_has_timeout(false)
142 , m_break_loop(false)
144 , m_update_sets(false)
148 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
170 {
return m_timeout_us / 1000; }
172 {
return m_timeout_us; }
181 m_timeout_us = timeout_us;
185 log(
Error) <<
"Ignoring invalid timeout (" << timeout_us <<
")" << endlog();
192 log(
Error) <<
"negative file descriptor given to FileDescriptorActivity::watch" << endlog();
196 m_watched_fds.insert(fd);
197 FD_SET(fd, &m_fd_set);
202 m_watched_fds.erase(fd);
203 FD_CLR(fd, &m_fd_set);
208 m_watched_fds.clear();
212 void FileDescriptorActivity::triggerUpdateSets()
215 m_update_sets =
true;
217 int unused; (void)unused;
218 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
221 {
return FD_ISSET(fd, &m_fd_work); }
223 {
return m_has_error; }
225 {
return m_has_timeout; }
228 return FD_ISSET(fd, &m_fd_set); }
235 if (pipe(m_interrupt_pipe) == -1)
237 log(
Error) <<
"FileDescriptorActivity: cannot create control pipe" << endlog();
244 if ((flags = fcntl(m_interrupt_pipe[0], F_GETFL, 0)) == -1 ||
245 fcntl(m_interrupt_pipe[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
246 (flags = fcntl(m_interrupt_pipe[1], F_GETFL, 0)) == -1 ||
247 fcntl(m_interrupt_pipe[1], F_SETFL, flags | O_NONBLOCK) == -1)
249 close(m_interrupt_pipe[0]);
250 close(m_interrupt_pipe[1]);
251 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
252 log(
Error) <<
"FileDescriptorActivity: could not set the control pipe to non-blocking mode" << endlog();
258 m_break_loop =
false;
260 m_update_sets =
false;
264 close(m_interrupt_pipe[0]);
265 close(m_interrupt_pipe[1]);
266 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
267 log(
Error) <<
"FileDescriptorActivity: Activity::start() failed" << endlog();
279 int unused; (void)unused;
280 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
305 int pipe = m_interrupt_pipe[0];
306 fd_watch watch_pipe_0(m_interrupt_pipe[0]);
307 fd_watch watch_pipe_1(m_interrupt_pipe[1]);
313 if (m_watched_fds.empty())
316 max_fd = std::max(pipe, *m_watched_fds.rbegin());
318 m_fd_work = m_fd_set;
320 FD_SET(pipe, &m_fd_work);
324 if (m_timeout_us == 0)
326 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, NULL);
330 static const int USECS_PER_SEC = 1000000;
331 timeval
timeout = { m_timeout_us / USECS_PER_SEC,
332 m_timeout_us % USECS_PER_SEC};
333 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
337 m_has_timeout =
false;
340 log(
Error) <<
"FileDescriptorActivity: error in select(), errno = " << errno << endlog();
346 m_has_timeout =
true;
350 if (ret > 0 && FD_ISSET(pipe, &m_fd_work))
359 int unused; (void)unused;
360 unused = read(pipe, &dummy, 1);
363 FD_ZERO(&watch_pipe);
364 FD_SET(pipe, &watch_pipe);
368 while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
372 bool do_trigger =
true;
373 bool user_trigger =
false;
382 m_update_sets =
false;
386 m_break_loop =
false;
399 else if ( user_trigger )
419 int unused; (void)unused;
420 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
450 fd_watch watch_pipe_0(m_interrupt_pipe[0]);
451 fd_watch watch_pipe_1(m_interrupt_pipe[1]);
virtual bool stop()
Stop the activity. This will stop the activity by removing it from the 'run-queue' of a thread or call...
double Seconds
Seconds are stored as a double-precision float.
A class for running a certain piece of code in a thread.
virtual void step()=0
The method that will be (periodically) executed when this object is run in an Activity.
RunnableInterface * runner
virtual bool isRunning() const
Query if the activity is initialized and executing.
An Activity executes a RunnableInterface object in a (periodic) thread.
NANO_TIME period
The period as it is passed to the operating system.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
virtual bool start()
Start the activity.
virtual void work(WorkReason reason)
Identical to step() but gives a reason why the function was called.
MutexLock is a scope-based Monitor, protecting critical sections with a Mutex object through locking ...
virtual bool isActive() const
Query if the activity is started.