39 #include "../internal/Channels.hpp" 40 #include "../os/Atomic.hpp" 41 #include "../os/MutexLock.hpp" 42 #include <boost/lexical_cast.hpp> 71 if (!
addOutput(output, mandatory))
return false;
72 if (!output->addInput(
this)) {
82 if (this->output == output)
return true;
83 if (output && this->output)
return false;
91 if (!output || this->output == output) {
99 if (!input->addOutput(
this)) {
109 if (this->input == input)
return true;
110 if (input && this->input)
return false;
118 if (!input || this->input == input) {
141 if (!output->disconnect(
this,
true))
148 if (!input->disconnect(
this,
false))
160 return input ? input->getInputEndPoint() :
this;
166 return output ? output->getOutputEndPoint() :
this;
173 return output ? output->channelReady(
this, policy, conn_id) :
false;
176 bool ChannelElementBase::inputReady()
180 return input ? input->inputReady(
this) :
false;
199 return output->signalFrom(
this);
218 return std::string();
220 return output->getLocalURI();
224 return std::string(boost::lexical_cast<std::string>(
this));
228 return std::string(
"ChannelElementBase");
236 if (!input)
return false;
238 assert(std::find(inputs.begin(), inputs.end(),
input) == inputs.end());
239 if (std::find(inputs.begin(), inputs.end(),
input) != inputs.end())
return false;
240 inputs.push_back(input);
246 inputs.remove(input);
252 return inputs.size() > 0;
258 for (Inputs::const_iterator it = inputs.begin(); it != inputs.end(); ++it) {
259 if (!(*it)->inputReady(
this))
return false;
261 return !inputs.empty();
267 for (Inputs::const_iterator it = inputs.begin(); it != inputs.end(); ++it) {
275 bool was_last =
false;
279 Inputs::iterator found = std::find(inputs.begin(), inputs.end(), channel);
280 if (found == inputs.end()) {
286 if (!input->disconnect(
this, forward)) {
292 was_last = inputs.empty();
296 if (was_last && forward) {
302 }
else if (!forward) {
305 for (Inputs::iterator it = inputs.begin(); it != inputs.end(); ) {
307 input->disconnect(
this,
false);
310 assert(inputs.empty());
326 , mandatory(mandatory)
327 , disconnected(false)
332 return (this->channel == channel);
337 if (!output)
return false;
347 outputs.remove_if(boost::bind(&Output::operator==, _1, output));
360 output->channel->signalFrom(
this);
368 for (Outputs::const_iterator it =
outputs.begin(); it !=
outputs.end(); ++it) {
369 if (!it->channel->channelReady(
this, policy, conn_id))
return false;
378 bool was_last =
false;
388 if (!output.
channel->disconnect(
this, forward)) {
398 if (was_last && !forward) {
408 for (Outputs::iterator it =
outputs.begin(); it !=
outputs.end(); ) {
410 output.
channel->disconnect(
this,
true);
422 for (Outputs::iterator it =
outputs.begin(); it !=
outputs.end(); ) {
425 output.
channel->disconnect(
this,
true);
452 }
else if (forward) {
virtual void removeInput(shared_ptr const &input)
Remove an input from the inputs list.
virtual PortInterface * getPort() const
Gets the port this channel element is connected to.
void removeDisconnectedOutputs()
Iterate over all output channels and remove the ones that have been marked as disconnected (after a f...
virtual bool channelReady(ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id=0)
This is called on the output half of a new connection by the connection factory in order to notify th...
virtual bool addInput(shared_ptr const &input)
Sets the new input channel element of this element or adds a channel to the inputs list...
virtual void clear()
Clears any data stored by the channel.
virtual bool inputReady(ChannelElementBase::shared_ptr const &caller)
This is called by an input port when it is ready to receive data.
virtual const ConnPolicy * getConnPolicy() const
Get a pointer to the connection policy used to build this channel element, if available.
Output(ChannelElementBase::shared_ptr const &channel, bool mandatory=true)
void RTT_API intrusive_ptr_add_ref(ChannelElementBase *e)
virtual bool isRemoteElement() const
This function may be used to identify, if the current element uses a network transport, to send the data to the next Element in the logical chain.
virtual bool disconnect(ChannelElementBase::shared_ptr const &channel, bool forward=false)
Overridden implementation of ChannelElementBase::disconnect(forward, channel).
bool operator==(ChannelElementBase::shared_ptr const &channel) const
virtual shared_ptr getInputEndPoint()
Returns the first input channel element of this connection.
MultipleOutputsChannelElementBase()
ChannelElementBase()
A default constructed ChannelElementBase has no input nor output configured.
A connection policy object describes how a given connection should behave.
virtual bool addOutput(shared_ptr const &output, bool mandatory=true)
Sets the new output channel element of this element or adds a channel to the outputs list...
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual bool signal()
Signals that there is new data available on this channel By default, the channel element forwards the...
shared_ptr getInput()
Returns the current input channel element.
Convenient short notation for every sub-namespace of RTT.
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Connects a new output to this element.
virtual void disconnect(bool forward)
Performs a disconnection of this channel's endpoints.
ChannelElementBase::shared_ptr channel
void oro_atomic_inc(oro_atomic_t *a)
Increment a atomically.
void ORO_ATOMIC_SETUP(oro_atomic_t *a, int n)
Initializes the uninitialized atomic structure a with a counter value of 'n'.
virtual std::string getLocalURI() const
This function return the URI of this element.
virtual void removeOutput(ChannelElementBase::shared_ptr const &output)
Remove an output from the outputs list.
void ORO_ATOMIC_CLEANUP(oro_atomic_t *a)
Cleans up all resources allocated durint the setup of atomic structure a.
virtual bool addOutput(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Sets the new output channel element of this element or adds a channel to the outputs list...
virtual bool signal()
Overridden implementation of ChannelElementBase::signal() which forwards the signal to all outputs...
virtual bool connected()
Returns true, if this channel element is connected on the input or output side.
boost::intrusive_ptr< ChannelElementBase > shared_ptr
int oro_atomic_dec_and_test(oro_atomic_t *a)
Decrement a atomically and test for zero.
void deref()
Decreases the reference count, and deletes the object if it is zero.
SharedMutexLock is a scope based Monitor, protecting critical sections with a SharedMutex object thro...
void RTT_API intrusive_ptr_release(ChannelElementBase *e)
virtual bool connectFrom(ChannelElementBase::shared_ptr const &input)
Connects a new input to this element.
virtual bool channelReady(ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id=0)
Overridden implementation of ChannelElementBase::channelReady() which forwards the signal to all outp...
virtual ~ChannelElementBase()
virtual void removeOutput(shared_ptr const &output)
Remove an output from the outputs list.
virtual std::string getElementName() const
Returns the class name of this element.
This class is used in places where a permanent representation of a reference to a connection is neede...
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
virtual shared_ptr getOutputEndPoint()
Returns the last output channel element of this connection.
RTT::os::SharedMutex output_lock
RTT::os::SharedMutex outputs_lock
shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
void ref()
Increases the reference count.
RTT::os::SharedMutex input_lock
virtual bool connected()
Returns true, if this channel element has at least one output, independent of whether is has an input...
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...