40 #include "../ExecutionEngine.hpp" 41 #include "../base/TaskCore.hpp" 42 #include "../Logger.hpp" 50 #define pipe(X) _pipe((X), 1024, _O_BINARY) 56 #include <sys/select.h> 63 #include <boost/cstdint.hpp> 66 using namespace extras;
68 const char FileDescriptorActivity::CMD_ANY_COMMAND;
84 , m_has_timeout(false)
87 , m_update_sets(false)
91 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
104 :
Activity(scheduler, priority, 0.0, _r, name)
109 , m_has_timeout(false)
110 , m_break_loop(false)
112 , m_update_sets(false)
116 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
120 :
Activity(scheduler, priority, 0.0, _r, name)
123 , m_period(period >= 0.0 ? period : 0.0)
125 , m_has_timeout(false)
126 , m_break_loop(false)
128 , m_update_sets(false)
132 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
136 :
Activity(scheduler, priority, 0.0, cpu_affinity, _r, name)
139 , m_period(period >= 0.0 ? period : 0.0)
141 , m_has_timeout(false)
142 , m_break_loop(false)
144 , m_update_sets(false)
148 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
170 {
return m_timeout_us / 1000; }
172 {
return m_timeout_us; }
181 m_timeout_us = timeout_us;
185 log(
Error) <<
"Ignoring invalid timeout (" << timeout_us <<
")" << endlog();
192 log(
Error) <<
"negative file descriptor given to FileDescriptorActivity::watch" << endlog();
196 m_watched_fds.insert(fd);
197 FD_SET(fd, &m_fd_set);
202 m_watched_fds.erase(fd);
203 FD_CLR(fd, &m_fd_set);
208 m_watched_fds.clear();
212 void FileDescriptorActivity::triggerUpdateSets()
215 m_update_sets =
true;
217 int unused; (void)unused;
218 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
221 {
return FD_ISSET(fd, &m_fd_work); }
223 {
return m_has_error; }
225 {
return m_has_timeout; }
228 return FD_ISSET(fd, &m_fd_set); }
235 if (pipe(m_interrupt_pipe) == -1)
237 log(
Error) <<
"FileDescriptorActivity: cannot create control pipe" << endlog();
244 if ((flags = fcntl(m_interrupt_pipe[0], F_GETFL, 0)) == -1 ||
245 fcntl(m_interrupt_pipe[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
246 (flags = fcntl(m_interrupt_pipe[1], F_GETFL, 0)) == -1 ||
247 fcntl(m_interrupt_pipe[1], F_SETFL, flags | O_NONBLOCK) == -1)
249 close(m_interrupt_pipe[0]);
250 close(m_interrupt_pipe[1]);
251 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
252 log(
Error) <<
"FileDescriptorActivity: could not set the control pipe to non-blocking mode" << endlog();
258 m_break_loop =
false;
260 m_update_sets =
false;
264 close(m_interrupt_pipe[0]);
265 close(m_interrupt_pipe[1]);
266 m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
267 log(
Error) <<
"FileDescriptorActivity: Activity::start() failed" << endlog();
279 int unused; (void)unused;
280 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
299 int pipe = m_interrupt_pipe[0];
300 fd_watch watch_pipe_0(m_interrupt_pipe[0]);
301 fd_watch watch_pipe_1(m_interrupt_pipe[1]);
307 if (m_watched_fds.empty())
310 max_fd = std::max(pipe, *m_watched_fds.rbegin());
312 m_fd_work = m_fd_set;
314 FD_SET(pipe, &m_fd_work);
318 if (m_timeout_us == 0)
320 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, NULL);
324 static const int USECS_PER_SEC = 1000000;
325 timeval timeout = { m_timeout_us / USECS_PER_SEC,
326 m_timeout_us % USECS_PER_SEC};
327 ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
331 m_has_timeout =
false;
334 log(
Error) <<
"FileDescriptorActivity: error in select(), errno = " << errno << endlog();
339 log(
Error) <<
"FileDescriptorActivity: timeout in select()" << endlog();
340 m_has_timeout =
true;
344 if (ret > 0 && FD_ISSET(pipe, &m_fd_work))
353 int unused; (void)unused;
354 unused = read(pipe, &dummy, 1);
357 FD_ZERO(&watch_pipe);
358 FD_SET(pipe, &watch_pipe);
362 while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
366 bool do_trigger =
true;
374 m_update_sets =
false;
378 m_break_loop =
false;
405 int unused; (void)unused;
406 unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
428 fd_watch watch_pipe_0(m_interrupt_pipe[0]);
429 fd_watch watch_pipe_1(m_interrupt_pipe[1]);
virtual bool stop()
Stop the activity This will stop the activity by removing it from the 'run-queue' of a thread or call...
double Seconds
Seconds are stored as a double precision float.
A class for running a certain piece of code in a thread.
virtual void step()=0
The method that will be periodically executed when this class is run in a periodic thread...
RunnableInterface * runner
virtual bool isRunning() const
Query if the activity is initialized and executing.
An Activity is an object that represents a thread.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
virtual bool start()
Start the activity.
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
virtual bool isActive() const
Query if the activity is started.