39 #ifndef ORO_CONN_FACTORY_HPP 40 #define ORO_CONN_FACTORY_HPP 47 #include "../base/PortInterface.hpp" 48 #include "../base/InputPortInterface.hpp" 49 #include "../base/OutputPortInterface.hpp" 50 #include "../DataFlowInterface.hpp" 52 #include "../base/DataObject.hpp" 53 #include "../base/DataObjectUnSync.hpp" 54 #include "../base/Buffer.hpp" 55 #include "../base/BufferUnSync.hpp" 56 #include "../Logger.hpp" 58 #include "../rtt-config.h" 164 #ifndef OROBLD_OS_NO_ASM 170 RTT::log(
Warning) <<
"lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
186 #ifndef OROBLD_OS_NO_ASM 192 RTT::log(
Warning) <<
"lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
225 bool pull = policy.
pull;
237 log(
Error) <<
"You tried to create a shared output buffer connection for output port " << port.
getName() <<
", " 238 <<
"but the port already has at least one incompatible outgoing connection." << endlog();
252 (buffer_policy.type != policy.
type) ||
253 (buffer_policy.size != policy.
size) ||
257 log(
Error) <<
"You mixed incompatible connection policies for the shared output buffer of port " << port.
getName() <<
": " 258 <<
"The new connection requests a " << policy <<
" connection, " 259 <<
"but the port already has a " << buffer_policy <<
" buffer." << endlog();
272 log(
Error) <<
"You mixed incompatible connection policies for output port " << port.
getName() <<
": " 273 <<
"The new connection requests a " << policy <<
" connection, " 274 <<
"but the port already has a " << buffer_policy <<
" buffer." << endlog();
303 bool pull = policy.
pull;
309 buffer = buildDataStorage<T>(policy, initial_value);
315 log(
Error) <<
"You tried to create a shared input buffer connection for input port " << port.
getName() <<
", " 316 <<
"but the port already has at least one incompatible incoming connection." << endlog();
330 (buffer_policy.type != policy.
type) ||
331 (buffer_policy.size != policy.
size) ||
335 log(
Error) <<
"You mixed incompatible connection policies for the shared input buffer of port " << port.
getName() <<
": " 336 <<
"The new connection requests a " << policy <<
" connection, " 337 <<
"but the port already has a " << buffer_policy <<
" buffer." << endlog();
350 log(
Error) <<
"You mixed incompatible connection policies for input port " << port.
getName() <<
": " 351 <<
"The new connection requests a " << policy <<
" connection, " 352 <<
"but the port already has a " << buffer_policy <<
" buffer." << endlog();
383 template <
typename T>
390 if (findSharedConnection(output_port, input_port, policy, shared_connection) && !shared_connection) {
395 if (input_port && !input_port->
isLocal()) {
397 log(
Error) <<
"Cannot create a shared connection for a remote input port or a non-standard transport without knowing the local output port." << endlog();
402 if (!shared_connection) {
405 log(
Error) <<
"Could not create a shared remote connection for input port '" << input_port->
getName() <<
"'." << endlog();
423 log(
Error) <<
"The remote side refused to connect the input port '" << input_port->
getName() <<
"' to the existing shared connection '" << shared_connection->getName() <<
"'." << endlog();
430 if (!shared_connection) {
437 return shared_connection;
452 if ( !output_port.
isLocal() ) {
453 log(
Error) <<
"Need a local OutputPort to create connections." <<endlog();
458 log(
Info) <<
"OutputPort " << output_port.
getName() <<
" is already connected to " << input_port.
getName() <<
", ignoring new connection." << endlog();
466 return createAndCheckSharedConnection(&output_port, &input_port, buildSharedConnection<T>(&output_port, &input_port, policy), policy);
476 log(
Error) <<
"Port " << input_port.
getName() <<
" is not compatible with " << output_port.
getName() << endlog();
490 output_half = buildRemoteChannelOutput( output_port, input_port, policy);
491 }
else if (input_p) {
492 return createOutOfBandConnection<T>( output_port, *input_p, policy);
494 log(
Error) <<
"Port " << input_port.
getName() <<
" is not compatible with " << output_port.
getName() << endlog();
505 channel_input = buildChannelInput<T>(output_port, policy);
507 if (!channel_input) {
508 output_half->disconnect(
true);
513 return createAndCheckConnection(output_port, input_port, channel_input, output_half, policy);
529 if (!chan)
return false;
530 return bool(createAndCheckStream(output_port, policy, chan, sid));
545 if (!outhalf)
return false;
546 return bool(createAndCheckStream(input_port, policy, outhalf, sid));
576 if (!channel_input)
return false;
579 if (!stream_input)
return false;
582 if (!channel_output)
return false;
585 if (!stream_output)
return false;
587 return stream_input->getOutputEndPoint()->connectTo(stream_output->getInputEndPoint(), policy.mandatory);
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
This class provides the basic tools to create channels that represent connections between two ports...
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
base::PortInterface const * ptr
A Lock-free buffer implementation to read and write data of type T in a FIFO way. ...
virtual bool isLocal() const
Returns true if this port is located on this process, and false otherwise.
virtual ConnID * clone() const
static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort< T > &port, ConnPolicy const &policy, T const &initial_value=T())
During the process of building a connection between two ports, this method builds the output part of ...
boost::shared_ptr< DataObjectInterface< T > > shared_ptr
Used for shared_ptr management.
static const int CIRCULAR_BUFFER
Represents a Stream connection created by the ConnFactory.
virtual const ConnPolicy * getConnPolicy() const
Get a pointer to the connection policy used to build this channel element, if available.
int lock_policy
This is the locking policy on the connection.
A connection element that can store a fixed number of data samples.
const std::string & getName() const
Get the name of this Port.
virtual bool isSameID(ConnID const &id) const
int type
DATA, BUFFER or CIRCULAR_BUFFER.
virtual internal::ConnInputEndpoint< T > * getEndpoint() const
Returns the input or output endpoint of this port (if any).
A connection policy object describes how a given connection should behave.
A class which provides unprotected (not thread-safe) access to one typed element of data...
The base class of each OutputPort.
boost::shared_ptr< ConnFactory > ConnFactoryPtr
int size
If the connection is a buffered connection, the size of the buffer.
Implements a non-thread-safe buffer.
Represents a local connection created by the ConnFactory.
bool pull
If true, then the sink will have to pull data.
StreamConnID(const std::string &name)
static bool createStream(InputPort< T > &input_port, ConnPolicy const &policy)
Creates, attaches and checks an inbound stream to an Input port.
bool mandatory
Whether the connection described by this connection policy is mandatory, which means that write opera...
static base::ChannelElement< T > * buildDataStorage(ConnPolicy const &policy, const T &initial_value=T())
This method creates the connection element that will store data inside the connection, based on the given policy.
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Connects a new output to this element.
A typed version of ChannelElementBase.
virtual bool connectedTo(PortInterface *port)
Returns true if this port is connected to the given port.
This DataObject is a Lock-Free implementation, such that reads and writes can happen concurrently wit...
static bool createStream(OutputPort< T > &output_port, ConnPolicy const &policy)
Creates, attaches and checks an outbound stream to an Output port.
static bool createOutOfBandConnection(OutputPort< T > &output_port, InputPort< T > &input_port, ConnPolicy const &policy)
This code is for setting up an in-process out-of-band connection.
static const int LOCK_FREE
boost::intrusive_ptr< ChannelElementBase > shared_ptr
Implements a very simple blocking thread-safe buffer, using mutexes (locks).
A class which provides locked/protected access to one typed element of data.
virtual base::ChannelElement< T >::shared_ptr getSharedBuffer() const
A connection element that stores a single data sample.
int transport
The preferred transport used.
LocalConnID(base::PortInterface const *obj)
A component's data output port.
static SharedConnectionBase::shared_ptr buildSharedConnection(OutputPort< T > *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy)
Tries to find an existing or creates a new shared connection object for the given output port...
int buffer_policy
The policy on how buffer elements will be installed for this connection, which influences the behavio...
This class is used in places where a permanent representation of a reference to a connection is neede...
boost::intrusive_ptr< ConnOutputEndpoint< T > > shared_ptr
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort< T > &port, ConnPolicy const &policy, bool force_unbuffered=false)
During the process of building a connection between two ports, this method builds the input half (sta...
The base class of every data flow port.
std::string name_id
The name of this connection.
T getLastWrittenValue() const
Returns the last value written to this port if the port keeps it; otherwise returns a default T().
static bool createConnection(OutputPort< T > &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
Creates a connection from a local output_port to a local or remote input_port.
virtual bool connected()
Returns true if this channel element has at least one output, independent of whether it has an input...
boost::shared_ptr< BufferInterface< T > > shared_ptr