39 #include "../Port.hpp" 41 #include "../base/InputPortInterface.hpp" 42 #include "../DataFlowInterface.hpp" 43 #include "../types/TypeMarshaller.hpp" 49 bool LocalConnID::isSameID(
ConnID const&
id)
const 54 else return real_id->
ptr == this->ptr;
61 bool StreamConnID::isSameID(
ConnID const&
id)
const 66 else return real_id->
name_id == this->name_id;
69 ConnID* StreamConnID::clone()
const {
80 if (!type_info || input_port.
getTypeInfo() != type_info)
82 log(
Error) <<
"Type of port " << output_port.
getName() <<
" is not registered into the type system, cannot marshal it into the right transporter" << endlog();
88 log(
Error) <<
"Type " << type_info->
getTypeName() <<
" cannot be marshalled into the requested transporter (id:"<< transport<<
")." << endlog();
95 buildRemoteChannelOutput(output_port, type_info, input_port, policy);
104 if ( input_port.
channelReady( channel_input->getOutputEndPoint(), policy ) ==
false ) {
107 <<
" could not successfully read from the connection from output port " << output_port.
getName() <<endlog();
111 log(
Debug) <<
"Connected output port "<< output_port.
getName()
112 <<
" successfully to " << input_port.
getName() <<endlog();
116 channel_input->disconnect(
true);
117 log(
Error) <<
"The output port "<< output_port.
getName()
118 <<
" could not successfully use the connection to input port " << input_port.
getName() <<endlog();
124 log(
Error) <<
"Need a transport for creating streams." <<endlog();
129 log(
Error) <<
"Could not create transport stream for port "<< output_port.
getName() <<
" with transport id " << policy.
transport <<endlog();
130 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<endlog();
138 log(
Debug) <<
"Could not determine sample size for type " << type->
getTypeName() << endlog();
142 if ( !chan_stream ) {
143 log(
Error) <<
"Transport failed to create remote channel for output stream of port "<<output_port.
getName() << endlog();
146 chan->setOutput( chan_stream );
149 log(
Info) <<
"Created output stream for output port "<< output_port.
getName() <<endlog();
153 log(
Error) <<
"Failed to create output stream for output port "<< output_port.
getName() <<endlog();
159 log(
Error) <<
"Need a transport for creating streams." <<endlog();
164 log(
Error) <<
"Could not create transport stream for port "<< input_port.
getName() <<
" with transport id " << policy.
transport <<endlog();
165 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<endlog();
174 log(
Error) <<
"Transport failed to create remote channel for input stream of port "<<input_port.
getName() << endlog();
180 chan->getOutputEndPoint()->setOutput( outhalf );
181 if ( input_port.
channelReady( chan->getOutputEndPoint(), policy ) ==
true ) {
182 log(
Info) <<
"Created input stream for input port "<< input_port.
getName() <<endlog();
187 log(
Error) <<
"Failed to create input stream for input port "<< input_port.
getName() <<endlog();
200 log(
Error) <<
"Could not create out-of-band transport for port "<< output_port.
getName() <<
" with transport id " << policy.
transport <<endlog();
201 log(
Error) <<
"No such transport registered. Check your policy.transport settings or add the transport for type "<< type->
getTypeName() <<endlog();
207 policy2.
pull =
false;
215 log(
Debug) <<
"Could not determine sample size for type " << type->
getTypeName() << endlog();
221 log(
Info) <<
"Receiving data for port "<<input_port.
getName() <<
" from out-of-band protocol "<< policy.
transport <<
" with id "<< policy2.
name_id<<endlog();
223 log(
Error) <<
"The type transporter for type "<<type->
getTypeName()<<
" failed to create a remote channel for port " << input_port.
getName()<<endlog();
226 ceb_input->getOutputEndPoint()->setOutput(output_half);
227 output_half = ceb_input;
235 log(
Info) <<
"Redirecting data for port "<< output_port.
getName() <<
" to out-of-band protocol "<< policy.
transport <<
" with id "<< policy2.
name_id <<endlog();
237 log(
Error) <<
"The type transporter for type "<<type->
getTypeName()<<
" failed to create a remote channel for port " << output_port.
getName()<<endlog();
242 ceb_output->getOutputEndPoint()->setOutput(output_half);
243 output_half = ceb_output;
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port's type.
base::PortInterface const * ptr
virtual bool isLocal() const
Returns true if this port is located on this process, and false otherwise.
const std::string & getTypeName() const
Return the type name which was first registered.
Represents a Stream connection created by the ConnFactory.
int data_size
Suggest the payload size of the data sent over this channel.
const std::string & getName() const
Get the name of this Port.
virtual DataSourceBase::shared_ptr getDataSource() const =0
Returns a Data source that stores the last written value, or a null pointer if this port does not kee...
A connection policy object describes how a given connection should behave.
The base class of each OutputPort.
Represents a local connection created by the ConnFactory.
bool pull
If true, then the sink will have to pull data.
Objects implementing this interface have the capability to convert data sources to and from a binary ...
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
virtual int serverProtocol() const
Returns the protocol over which this port can be accessed.
boost::intrusive_ptr< ChannelElementBase > shared_ptr
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
Returns the size in bytes of a marshalled data element.
int transport
The preferred transport used.
virtual void disconnect()
Removes any connections that either go to or come from this port.
virtual internal::ConnID * getPortID() const
Returns the identity of this port in a ConnID object.
This class is used in places where a permanent representation of a reference to a connection is neede...
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
std::string name_id
The name of this connection.
static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)