39 #include "../Port.hpp" 41 #include "../base/InputPortInterface.hpp" 42 #include "../DataFlowInterface.hpp" 43 #include "../types/TypeMarshaller.hpp" 49 bool LocalConnID::isSameID(
ConnID const&
id)
const 54 else return real_id->
ptr == this->ptr;
61 bool StreamConnID::isSameID(
ConnID const&
id)
const 66 else return real_id->
name_id == this->name_id;
69 ConnID* StreamConnID::clone()
const {
80 if (!type_info || input_port.
getTypeInfo() != type_info)
82 log(
Error) <<
"Type of port " << output_port.
getName() <<
" is not registered into the type system, cannot marshal it into the right transporter" << endlog();
88 log(
Error) <<
"Type " << type_info->
getTypeName() <<
" cannot be marshalled into the requested transporter (id:"<< transport<<
")." << endlog();
95 buildRemoteChannelOutput(output_port, type_info, input_port, policy);
102 if (!channel_input->connectTo(channel_output, policy.
mandatory)) {
103 channel_input->disconnect(channel_output,
true);
104 channel_output->disconnect(channel_input,
false);
112 next_hop = channel_input;
113 while(next_hop->getInput() && next_hop->getInput() != output_port.
getEndpoint()) {
114 next_hop = next_hop->getInput();
119 log(
Error) <<
"The output port "<< output_port.
getName()
120 <<
" could not successfully use the connection to input port " << input_port.
getName() <<endlog();
121 channel_input->disconnect(channel_output,
true);
126 if ( !channel_output->channelReady( channel_input, policy, output_port.
getPortID() ) ) {
128 <<
" could not successfully read from the connection from output port " << output_port.
getName() <<endlog();
130 channel_output->disconnect(channel_input,
false);
134 log(
Debug) <<
"Connected output port "<< output_port.
getName()
135 <<
" successfully to " << input_port.
getName() <<endlog();
141 log(
Error) <<
"Need a transport for creating streams." <<endlog();
146 log(
Error) <<
"Could not create transport stream for port "<< output_port.
getName() <<
" with transport id " << policy.
transport <<endlog();
147 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<endlog();
155 log(
Debug) <<
"Could not determine sample size for type " << type->
getTypeName() << endlog();
159 if ( !chan_stream ) {
160 log(
Error) <<
"Transport failed to create remote channel for output stream of port "<<output_port.
getName() << endlog();
165 channel_input->connectTo( chan_stream, policy.
mandatory );
167 if ( !output_port.
addConnection( conn_id, chan_stream, policy ) ) {
169 channel_input->disconnect( chan_stream,
true );
170 log(
Error) <<
"Failed to create output stream for output port "<< output_port.
getName() <<endlog();
174 log(
Info) <<
"Created output stream for output port "<< output_port.
getName() <<endlog();
180 log(
Error) <<
"Need a transport for creating streams." <<endlog();
185 log(
Error) <<
"Could not create transport stream for port "<< input_port.
getName() <<
" with transport id " << policy.
transport <<endlog();
186 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<endlog();
195 log(
Error) <<
"Transport failed to create remote channel for input stream of port " << input_port.
getName() << endlog();
199 chan = chan->getOutputEndPoint();
202 chan->connectTo( outhalf, policy.
mandatory );
203 if ( !outhalf->channelReady(chan, policy, conn_id) ) {
205 chan->disconnect(
true);
206 log(
Error) <<
"Failed to create input stream for input port " << input_port.
getName() <<endlog();
210 log(
Info) <<
"Created input stream for input port " << input_port.
getName() <<endlog();
216 if (!shared_connection)
return false;
221 (shared_connection->getConnPolicy()->type != policy.
type) ||
222 (shared_connection->getConnPolicy()->size != policy.
size) ||
223 (shared_connection->getConnPolicy()->lock_policy != policy.
lock_policy)
226 log(
Error) <<
"You mixed incompatible connection policies for shared connection '" << shared_connection->getName() <<
"': " 227 <<
"The new connection requests a " << policy <<
" connection, " 228 <<
"but the existing connection is of type " << *(shared_connection->getConnPolicy()) <<
"." << endlog();
233 policy.
name_id = shared_connection->getName();
237 if ( !output_port->
addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
239 log(
Error) <<
"The output port "<< output_port->
getName()
240 <<
" could not successfully connect to shared connection '" << shared_connection->getName() <<
"'." << endlog();
249 if ( !input_port->
addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
252 <<
" could not successfully connect to shared connection '" << shared_connection->getName() <<
"'." << endlog();
264 shared_connection.reset();
271 if (!shared_connection) {
278 if (shared_connection == input_ports_shared_connection) {
279 RTT::log(
RTT::Info) <<
"Output port '" << output_port->
getName() <<
"' and input port '" << input_port->
getName() <<
"' are already connected to the same shared connection." << RTT::endlog();
281 }
else if (input_ports_shared_connection) {
282 RTT::log(
RTT::Error) <<
"Output port '" << output_port->
getName() <<
"' and input port '" << input_port->
getName() <<
"' are already connected to different shared connections!" << RTT::endlog();
283 shared_connection.reset();
290 if (!shared_connection) {
292 shared_connection = SharedConnectionRepository::Instance()->get(policy.
name_id);
293 }
else if (shared_connection->getName() != policy.
name_id) {
294 RTT::log(
RTT::Error) <<
"At least one of the given ports is already connected to shared connection '" << shared_connection->getName() <<
"' but you requested to connect to '" << policy.
name_id <<
"'!" << RTT::endlog();
295 shared_connection.reset();
300 return bool(shared_connection);
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port's type.
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
static base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(base::OutputPortInterface &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
base::PortInterface const * ptr
virtual bool isLocal() const
Returns true if this port is located on this process, and false otherwise.
const std::string & getTypeName() const
Return the type name which was first registered.
Represents a Stream connection created by the ConnFactory.
int data_size
Suggest the payload size of the data sent over this channel.
virtual internal::SharedConnectionBase::shared_ptr getSharedConnection() const
Returns a pointer to the shared connection element this port may be connected to. ...
int lock_policy
This is the locking policy on the connection.
const std::string & getName() const
Get the name of this Port.
virtual DataSourceBase::shared_ptr getDataSource() const =0
Returns a Data source that stores the last written value, or a null pointer if this port does not kee...
int type
DATA, BUFFER or CIRCULAR_BUFFER.
A connection policy object describes how a given connection should behave.
The base class of each OutputPort.
int size
If the connection is a buffered connection, the size of the buffer.
virtual ChannelElementBase * getEndpoint() const =0
Returns the input or output endpoint of this port (if any).
Represents a local connection created by the ConnFactory.
bool mandatory
Whether the connection described by this connection policy is mandatory, which means that write opera...
Objects implementing this interface have the capability to convert data sources to and from a binary ...
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Connects a new output to this element.
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
virtual int serverProtocol() const
Returns the protocol over which this port can be accessed.
boost::intrusive_ptr< ChannelElementBase > shared_ptr
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
Returns the size in bytes of a marshalled data element.
int transport
The preferred transport used.
virtual void disconnect()
Removes any connections that either go to or come from this port.
virtual internal::ConnID * getPortID() const
Returns the identity of this port in a ConnID object.
int buffer_policy
The policy on how buffer elements will be installed for this connection, which influences the behavio...
This class is used in places where a permanent representation of a reference to a connection is neede...
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
std::string name_id
The name of this connection.