42 #include <sys/types.h> 48 #include <boost/algorithm/string.hpp> 51 #include "../../types/TypeTransporter.hpp" 52 #include "../../types/TypeMarshaller.hpp" 53 #include "../../Logger.hpp" 55 #include "../../base/PortInterface.hpp" 56 #include "../../DataFlowInterface.hpp" 57 #include "../../TaskContext.hpp" 65 mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
82 throw std::runtime_error(
"MQ name_id not set, and the port is either not attached to a task, or said task has no name. Cannot create a reasonably unique MQ name automatically");
84 std::stringstream name_stream;
86 std::string name = name_stream.str();
87 boost::algorithm::replace_all(name,
"/",
"_");
92 mattr.mq_maxmsg = policy.
size ? policy.
size : 10;
96 throw std::runtime_error(
"Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one.");
98 throw std::runtime_error(
"Could not open message queue with zero message size.");
101 oflag |= O_WRONLY | O_NONBLOCK;
104 mqdes = mq_open(policy.
name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
108 int the_error = errno;
109 log(
Error) <<
"FAILED opening '" << policy.
name_id <<
"' with message size " << mattr.mq_msgsize <<
", buffer size " << mattr.mq_maxmsg <<
" for " 110 << (is_sender ?
"writing :" :
"reading :") << endlog();
115 log(
Error) <<
"The queue exists, but the caller does not have permission to open it in the specified mode." << endlog();
119 log(
Error) <<
"Wrong mqueue name given OR, In a process that is unprivileged (does not have the " 120 <<
"CAP_SYS_RESOURCE capability), attr->mq_maxmsg must be less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit. In addition, even in a privileged process, " 121 <<
"attr->mq_maxmsg cannot exceed the HARD_MAX limit. (See mq_overview(7) for details of these limits.)" << endlog();
124 log(
Error) <<
"The process already has the maximum number of files and message queues open." << endlog();
127 log(
Error) <<
"Name was too long." << endlog();
130 log(
Error) <<
"The system limit on the total number of open files and message queues has been reached." << endlog();
134 <<
"Insufficient space for the creation of a new message queue. This probably occurred because the queues_max limit was encountered; see mq_overview(7)." 138 log(
Error) <<
"Insufficient memory." << endlog();
141 log(
Error) <<
"Submit a bug report. An unexpected mq error occured with errno=" << errno <<
": " << strerror(errno) << endlog();
143 throw std::runtime_error(
"Could not open message queue: mq_open returned -1.");
146 log(
Debug) <<
"Opened '" << policy.
name_id <<
"' with mqdes='" <<
mqdes <<
"', msg size='"<<mattr.mq_msgsize<<
"' an queue length='"<<mattr.mq_maxmsg<<
"' for " << (is_sender ?
"writing." :
"reading.") << endlog();
172 mq_unlink(
mqname.c_str());
209 struct timespec abs_timeout;
212 abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
213 abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
227 log(
Error) <<
"Failed to initialize MQ Channel Element with initial data sample." << endlog();
233 log(
Error) <<
"Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) << endlog();
249 struct timespec abs_timeout;
252 abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
253 abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
261 log(
Error) <<
"Failed to read from MQ Channel Element: no data received within 500ms!" <<endlog();
276 log(
Error) <<
"MQChannel: failed to marshal sample" << endlog();
280 char* lbuf = (
char*) blob.first;
281 if (mq_send(
mqdes, lbuf, blob.second, 0) == -1)
286 log(
Error) <<
"MQChannel "<<
mqdes <<
" became invalid (mq length="<<
max_size<<
", msg length="<<blob.second<<
"): " << strerror(errno) << endlog();
virtual bool updateFromBlob(const void *blob, int size, base::DataSourceBase::shared_ptr target, void *cookie=0) const =0
Update target with the contents of blob which is an object of a protocol.
types::TypeMarshaller const & mtransport
Transport marshaller used for size calculations and data updates.
virtual void * createCookie() const
Overload in subclasses for marshallers that need to allocate some internal data.
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
Adapts the mq send/receive buffer size according to the data in mqdata_source, or the value set in md...
bool mqRead(base::DataSourceBase::shared_ptr ds)
Read from the message queue.
void * marshaller_cookie
A private blob that is returned by mtransport.createCookie().
bool mis_sender
True if this object is a sender.
int data_size
Suggest the payload size of the data sent over this channel.
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
std::string mqname
The name of the queue, as specified in the ConnPolicy when creating the stream, or self-calculated wh...
int max_size
The size of buf.
const std::string & getName() const
Get the name of this Port.
A connection policy object describes how a given connection should behave.
bool mqWrite(base::DataSourceBase::shared_ptr ds)
Write to the message queue.
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
Works only in receive mode, waits for a new sample and adapts the receive buffer to match its size...
int size
If the connection is a buffered connection, the size of the buffer.
DataFlowInterface * getInterface() const
Returns the DataFlowInterface this port belongs to or null if it was not added to such an interface...
Convenient short notation for every sub-namespace of RTT.
Objects implementing this interface have the capability to convert data sources to and from a binary ...
int mdata_size
The size of the data, as specified in the ConnPolicy when creating the stream, or calculated using th...
char * buf
Send/Receive buffer.
virtual void deleteCookie(void *cookie) const
Called to delete a cookie created with createCookie.
virtual std::pair< void const *, int > fillBlob(base::DataSourceBase::shared_ptr source, void *blob, int size, void *cookie=0) const =0
Create a transportable object for a protocol which contains the value of source. ...
bool minit_done
True if setupStream() was called, false after cleanupStream().
MQSendRecv(types::TypeMarshaller const &transport)
Create a channel element for remote data exchange.
Notify the Logger in which 'module' the message occurred.
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
Returns the size in bytes of a marshalled data element.
nsecs Seconds_to_nsecs(const Seconds s)
boost::intrusive_ptr< DataSourceBase > shared_ptr
Use this type to store a pointer to a DataSourceBase.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
mqd_t mqdes
MQueue file descriptor.
std::string name_id
The name of this connection.
virtual const std::string & getName() const
Returns the name of this TaskContext.
TaskContext * getOwner() const
Returns the component this interface belongs to.
static Dispatcher::shared_ptr Instance()