Compare commits
16 Commits
mjcarroll/
...
asoragna/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8b94dc59e | ||
|
|
f8072f2fa2 | ||
|
|
fce021b149 | ||
|
|
c4f57a7998 | ||
|
|
d7fdb6184c | ||
|
|
58bcd3b822 | ||
|
|
26426adda9 | ||
|
|
6e1fea14e1 | ||
|
|
86c77143c9 | ||
|
|
b812790ee3 | ||
|
|
6ca1023ef7 | ||
|
|
77ede02251 | ||
|
|
a431256383 | ||
|
|
9d2849cb0a | ||
|
|
3610b68348 | ||
|
|
9c03a463c1 |
@@ -2,6 +2,26 @@
|
||||
Changelog for package rclcpp
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
21.1.1 (2023-05-11)
|
||||
-------------------
|
||||
* Fix race condition in events-executor (`#2177 <https://github.com/ros2/rclcpp/issues/2177>`_)
|
||||
* Add missing stdexcept include (`#2186 <https://github.com/ros2/rclcpp/issues/2186>`_)
|
||||
* Fix a format-security warning when building with clang (`#2171 <https://github.com/ros2/rclcpp/issues/2171>`_)
|
||||
* Fix delivered message kind (`#2175 <https://github.com/ros2/rclcpp/issues/2175>`_)
|
||||
* Contributors: Alberto Soragna, Chris Lalancette, methylDragon, Øystein Sture
|
||||
|
||||
21.1.0 (2023-04-27)
|
||||
-------------------
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
* Add support for logging service. (`#2122 <https://github.com/ros2/rclcpp/issues/2122>`_)
|
||||
* Picking ABI-incompatible executor changes (`#2170 <https://github.com/ros2/rclcpp/issues/2170>`_)
|
||||
* add events-executor and timers-manager in rclcpp (`#2155 <https://github.com/ros2/rclcpp/issues/2155>`_)
|
||||
* Create common structures for executors to use (`#2143 <https://github.com/ros2/rclcpp/issues/2143>`_)
|
||||
* Implement deliver message kind (`#2168 <https://github.com/ros2/rclcpp/issues/2168>`_)
|
||||
* Contributors: Alberto Soragna, Lei Liu, Michael Carroll, methylDragon
|
||||
|
||||
20.0.0 (2023-04-13)
|
||||
-------------------
|
||||
* applied tracepoints for ring_buffer (`#2091 <https://github.com/ros2/rclcpp/issues/2091>`_)
|
||||
|
||||
@@ -64,6 +64,7 @@ set(${PROJECT_NAME}_SRCS
|
||||
src/rclcpp/executors/executor_notify_waitable.cpp
|
||||
src/rclcpp/executors/multi_threaded_executor.cpp
|
||||
src/rclcpp/executors/single_threaded_executor.cpp
|
||||
src/rclcpp/executors/static_executor_entities_collector.cpp
|
||||
src/rclcpp/executors/static_single_threaded_executor.cpp
|
||||
src/rclcpp/expand_topic_or_service_name.cpp
|
||||
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
#include <stdexcept>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <variant> // NOLINT[build/include_order]
|
||||
#include <variant>
|
||||
|
||||
#include "rosidl_runtime_cpp/traits.hpp"
|
||||
#include "tracetools/tracetools.h"
|
||||
|
||||
@@ -187,35 +187,14 @@ public:
|
||||
RCLCPP_PUBLIC
|
||||
size_t size() const;
|
||||
|
||||
/// Return a reference to the 'can be taken' atomic boolean.
|
||||
/**
|
||||
* The resulting bool will be true in the case that no executor is currently
|
||||
* using an executable entity from this group.
|
||||
* The resulting bool will be false in the case that an executor is currently
|
||||
* using an executable entity from this group, and the group policy doesn't
|
||||
* allow a second take (eg mutual exclusion)
|
||||
* \return a reference to the flag
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::atomic_bool &
|
||||
can_be_taken_from();
|
||||
|
||||
/// Get the group type.
|
||||
/**
|
||||
* \return the group type
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
const CallbackGroupType &
|
||||
type() const;
|
||||
|
||||
/// Collect all of the entity pointers contained in this callback group.
|
||||
/**
|
||||
* \param[in] sub_func Function to execute for each subscription
|
||||
* \param[in] service_func Function to execute for each service
|
||||
* \param[in] client_func Function to execute for each client
|
||||
* \param[in] timer_func Function to execute for each timer
|
||||
* \param[in] waitable_fuinc Function to execute for each waitable
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void collect_all_ptrs(
|
||||
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
|
||||
|
||||
@@ -20,13 +20,13 @@
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional> // NOLINT, cpplint doesn't think this is a cpp std header
|
||||
#include <optional>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <variant> // NOLINT
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
#include "rcl/client.h"
|
||||
@@ -363,12 +363,16 @@ protected:
|
||||
std::shared_ptr<rclcpp::Context> context_;
|
||||
rclcpp::Logger node_logger_;
|
||||
|
||||
std::recursive_mutex callback_mutex_;
|
||||
// It is important to declare on_new_response_callback_ before
|
||||
// client_handle_, so on destruction the client is
|
||||
// destroyed first. Otherwise, the rmw client callback
|
||||
// would point briefly to a destroyed function.
|
||||
std::function<void(size_t)> on_new_response_callback_{nullptr};
|
||||
// Declare client_handle_ after callback
|
||||
std::shared_ptr<rcl_client_t> client_handle_;
|
||||
|
||||
std::atomic<bool> in_use_by_wait_set_{false};
|
||||
|
||||
std::recursive_mutex callback_mutex_;
|
||||
std::function<void(size_t)> on_new_response_callback_{nullptr};
|
||||
};
|
||||
|
||||
template<typename ServiceT>
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include <stdexcept>
|
||||
|
||||
#include "rcl/context.h"
|
||||
#include "rcl/guard_condition.h"
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
#include <cassert>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <deque>
|
||||
#include <iostream>
|
||||
#include <list>
|
||||
#include <map>
|
||||
@@ -30,24 +29,26 @@
|
||||
|
||||
#include "rcl/guard_condition.h"
|
||||
#include "rcl/wait.h"
|
||||
#include "rclcpp/executors/executor_notify_waitable.hpp"
|
||||
#include "rcpputils/scope_exit.hpp"
|
||||
|
||||
#include "rclcpp/context.hpp"
|
||||
#include "rclcpp/contexts/default_context.hpp"
|
||||
#include "rclcpp/guard_condition.hpp"
|
||||
#include "rclcpp/executor_options.hpp"
|
||||
#include "rclcpp/executors/executor_entities_collection.hpp"
|
||||
#include "rclcpp/executors/executor_entities_collector.hpp"
|
||||
#include "rclcpp/future_return_code.hpp"
|
||||
#include "rclcpp/memory_strategies.hpp"
|
||||
#include "rclcpp/memory_strategy.hpp"
|
||||
#include "rclcpp/node_interfaces/node_base_interface.hpp"
|
||||
#include "rclcpp/utilities.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
#include "rclcpp/wait_set.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
|
||||
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
|
||||
|
||||
// Forward declaration is used in convenience method signature.
|
||||
class Node;
|
||||
class ExecutorImplementation;
|
||||
@@ -402,6 +403,17 @@ public:
|
||||
void
|
||||
cancel();
|
||||
|
||||
/// Support dynamic switching of the memory strategy.
|
||||
/**
|
||||
* Switching the memory strategy while the executor is spinning in another threading could have
|
||||
* unintended consequences.
|
||||
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
|
||||
* \throws std::runtime_error if memory_strategy is null
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
|
||||
|
||||
/// Returns true if the executor is currently spinning.
|
||||
/**
|
||||
* This function can be called asynchronously from any thread.
|
||||
@@ -486,11 +498,6 @@ protected:
|
||||
static void
|
||||
execute_client(rclcpp::ClientBase::SharedPtr client);
|
||||
|
||||
/// Gather all of the waitable entities from associated nodes and callback groups.
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
collect_entities();
|
||||
|
||||
/// Block until more work becomes avilable or timeout is reached.
|
||||
/**
|
||||
* Builds a set of waitable entities, which are passed to the middleware.
|
||||
@@ -502,6 +509,62 @@ protected:
|
||||
void
|
||||
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
|
||||
/// Find node associated with a callback group
|
||||
/**
|
||||
* \param[in] weak_groups_to_nodes map of callback groups to nodes
|
||||
* \param[in] group callback group to find assocatiated node
|
||||
* \return Pointer to associated node if found, else nullptr
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
|
||||
get_node_by_group(
|
||||
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
|
||||
rclcpp::CallbackGroup::SharedPtr group);
|
||||
|
||||
/// Return true if the node has been added to this executor.
|
||||
/**
|
||||
* \param[in] node_ptr a shared pointer that points to a node base interface
|
||||
* \param[in] weak_groups_to_nodes map to nodes to lookup
|
||||
* \return true if the node is associated with the executor, otherwise false
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
has_node(
|
||||
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
|
||||
|
||||
/// Find the callback group associated with a timer
|
||||
/**
|
||||
* \param[in] timer Timer to find associated callback group
|
||||
* \return Pointer to callback group node if found, else nullptr
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::CallbackGroup::SharedPtr
|
||||
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
|
||||
|
||||
/// Add a callback group to an executor
|
||||
/**
|
||||
* \see rclcpp::Executor::add_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual void
|
||||
add_callback_group_to_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
|
||||
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
|
||||
|
||||
/// Remove a callback group from the executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::remove_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual void
|
||||
remove_callback_group_from_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
|
||||
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
|
||||
|
||||
/// Check for executable in ready state and populate union structure.
|
||||
/**
|
||||
* \param[out] any_executable populated union structure of ready executable
|
||||
@@ -512,6 +575,33 @@ protected:
|
||||
bool
|
||||
get_next_ready_executable(AnyExecutable & any_executable);
|
||||
|
||||
/// Check for executable in ready state and populate union structure.
|
||||
/**
|
||||
* This is the implementation of get_next_ready_executable that takes into
|
||||
* account the current state of callback groups' association with nodes and
|
||||
* executors.
|
||||
*
|
||||
* This checks in a particular order for available work:
|
||||
* * Timers
|
||||
* * Subscriptions
|
||||
* * Services
|
||||
* * Clients
|
||||
* * Waitable
|
||||
*
|
||||
* If the next executable is not associated with this executor/node pair,
|
||||
* then this method will return false.
|
||||
*
|
||||
* \param[out] any_executable populated union structure of ready executable
|
||||
* \param[in] weak_groups_to_nodes mapping of callback groups to nodes
|
||||
* \return true if an executable was ready and any_executable was populated,
|
||||
* otherwise false
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
get_next_ready_executable_from_map(
|
||||
AnyExecutable & any_executable,
|
||||
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
|
||||
|
||||
/// Wait for executable in ready state and populate union structure.
|
||||
/**
|
||||
* If an executable is ready, it will return immediately, otherwise
|
||||
@@ -529,6 +619,21 @@ protected:
|
||||
AnyExecutable & any_executable,
|
||||
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
|
||||
/// Add all callback groups that can be automatically added from associated nodes.
|
||||
/**
|
||||
* The executor, before collecting entities, verifies if any callback group from
|
||||
* nodes associated with the executor, which is not already associated to an executor,
|
||||
* can be automatically added to this executor.
|
||||
* This takes care of any callback group that has been added to a node but not explicitly added
|
||||
* to the executor.
|
||||
* It is important to note that in order for the callback groups to be automatically added to an
|
||||
* executor through this function, the node of the callback groups needs to have been added
|
||||
* through the `add_node` method.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual void
|
||||
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
|
||||
|
||||
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
|
||||
std::atomic_bool spinning;
|
||||
|
||||
@@ -538,8 +643,16 @@ protected:
|
||||
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
|
||||
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
|
||||
|
||||
/// Wait set for managing entities that the rmw layer waits on.
|
||||
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
|
||||
|
||||
// Mutex to protect the subsequent memory_strategy_.
|
||||
mutable std::mutex mutex_;
|
||||
|
||||
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
|
||||
memory_strategy::MemoryStrategy::SharedPtr
|
||||
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
|
||||
|
||||
/// The context associated with this executor.
|
||||
std::shared_ptr<rclcpp::Context> context_;
|
||||
|
||||
@@ -549,33 +662,39 @@ protected:
|
||||
virtual void
|
||||
spin_once_impl(std::chrono::nanoseconds timeout);
|
||||
|
||||
/// Waitable containing guard conditions controlling the executor flow.
|
||||
/**
|
||||
* This waitable contains the interrupt and shutdown guard condition, as well
|
||||
* as the guard condition associated with each node and callback group.
|
||||
* By default, if any change is detected in the monitored entities, the notify
|
||||
* waitable will awake the executor and rebuild the collections.
|
||||
*/
|
||||
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
|
||||
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
|
||||
const rclcpp::GuardCondition *,
|
||||
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
|
||||
WeakNodesToGuardConditionsMap;
|
||||
|
||||
std::atomic_bool entities_need_rebuild_;
|
||||
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
|
||||
const rclcpp::GuardCondition *,
|
||||
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
|
||||
WeakCallbackGroupsToGuardConditionsMap;
|
||||
|
||||
/// Collector used to associate executable entities from nodes and guard conditions
|
||||
rclcpp::executors::ExecutorEntitiesCollector collector_;
|
||||
/// maps nodes to guard conditions
|
||||
WeakNodesToGuardConditionsMap
|
||||
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
|
||||
/// Waitset to be waited on.
|
||||
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
/// maps callback groups to guard conditions
|
||||
WeakCallbackGroupsToGuardConditionsMap
|
||||
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
|
||||
/// Hold the current state of the collection being waited on by the waitset
|
||||
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
|
||||
mutex_);
|
||||
/// maps callback groups associated to nodes
|
||||
WeakCallbackGroupsToNodesMap
|
||||
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
|
||||
/// Hold the current state of the notify waitable being waited on by the waitset
|
||||
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_
|
||||
RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
/// maps callback groups to nodes associated with executor
|
||||
WeakCallbackGroupsToNodesMap
|
||||
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
|
||||
/// Hold the list of executables currently available to be executed.
|
||||
std::deque<rclcpp::AnyExecutable> ready_executables_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
/// maps all callback groups to nodes
|
||||
WeakCallbackGroupsToNodesMap
|
||||
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
|
||||
/// nodes that are associated with the executor
|
||||
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
|
||||
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
|
||||
|
||||
/// shutdown callback handle registered to Context
|
||||
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;
|
||||
|
||||
@@ -0,0 +1,357 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
|
||||
#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
|
||||
|
||||
#include <chrono>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "rcl/guard_condition.h"
|
||||
#include "rcl/wait.h"
|
||||
|
||||
#include "rclcpp/experimental/executable_list.hpp"
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/memory_strategy.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
#include "rclcpp/waitable.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace executors
|
||||
{
|
||||
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
|
||||
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
|
||||
|
||||
class StaticExecutorEntitiesCollector final
|
||||
: public rclcpp::Waitable,
|
||||
public std::enable_shared_from_this<StaticExecutorEntitiesCollector>
|
||||
{
|
||||
public:
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector)
|
||||
|
||||
// Constructor
|
||||
RCLCPP_PUBLIC
|
||||
StaticExecutorEntitiesCollector() = default;
|
||||
|
||||
// Destructor
|
||||
RCLCPP_PUBLIC
|
||||
~StaticExecutorEntitiesCollector();
|
||||
|
||||
/// Initialize StaticExecutorEntitiesCollector
|
||||
/**
|
||||
* \param p_wait_set A reference to the wait set to be used in the executor
|
||||
* \param memory_strategy Shared pointer to the memory strategy to set.
|
||||
* \throws std::runtime_error if memory strategy is null
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
init(
|
||||
rcl_wait_set_t * p_wait_set,
|
||||
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
|
||||
|
||||
/// Finalize StaticExecutorEntitiesCollector to clear resources
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
is_init() {return initialized_;}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
fini();
|
||||
|
||||
/// Execute the waitable.
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
execute(std::shared_ptr<void> & data) override;
|
||||
|
||||
/// Take the data so that it can be consumed with `execute`.
|
||||
/**
|
||||
* For `StaticExecutorEntitiesCollector`, this always return `nullptr`.
|
||||
* \sa rclcpp::Waitable::take_data()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::shared_ptr<void>
|
||||
take_data() override;
|
||||
|
||||
/// Function to add_handles_to_wait_set and wait for work and
|
||||
/**
|
||||
* block until the wait set is ready or until the timeout has been exceeded.
|
||||
* \throws std::runtime_error if wait set couldn't be cleared or filled.
|
||||
* \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
|
||||
/**
|
||||
* \throws std::runtime_error if it couldn't add guard condition to wait set
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_to_wait_set(rcl_wait_set_t * wait_set) override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
get_number_of_ready_guard_conditions() override;
|
||||
|
||||
/// Add a callback group to an executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::add_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
|
||||
|
||||
/// Add a callback group to an executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::add_callback_group
|
||||
* \return boolean whether the node from the callback group is new
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
|
||||
|
||||
/// Remove a callback group from the executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::remove_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr);
|
||||
|
||||
/// Remove a callback group from the executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::remove_callback_group_from_map
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
remove_callback_group_from_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
|
||||
|
||||
/**
|
||||
* \see rclcpp::Executor::add_node()
|
||||
* \throw std::runtime_error if node was already added
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
|
||||
|
||||
/**
|
||||
* \see rclcpp::Executor::remove_node()
|
||||
* \throw std::runtime_error if no guard condition is associated with node.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_all_callback_groups();
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::get_manually_added_callback_groups()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_manually_added_callback_groups();
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_automatically_added_callback_groups_from_nodes();
|
||||
|
||||
/// Complete all available queued work without blocking.
|
||||
/**
|
||||
* This function checks if after the guard condition was triggered
|
||||
* (or a spurious wakeup happened) we are really ready to execute
|
||||
* i.e. re-collect entities
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
is_ready(rcl_wait_set_t * wait_set) override;
|
||||
|
||||
/// Return number of timers
|
||||
/**
|
||||
* \return number of timers
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
get_number_of_timers() {return exec_list_.number_of_timers;}
|
||||
|
||||
/// Return number of subscriptions
|
||||
/**
|
||||
* \return number of subscriptions
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;}
|
||||
|
||||
/// Return number of services
|
||||
/**
|
||||
* \return number of services
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
get_number_of_services() {return exec_list_.number_of_services;}
|
||||
|
||||
/// Return number of clients
|
||||
/**
|
||||
* \return number of clients
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
get_number_of_clients() {return exec_list_.number_of_clients;}
|
||||
|
||||
/// Return number of waitables
|
||||
/**
|
||||
* \return number of waitables
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
get_number_of_waitables() {return exec_list_.number_of_waitables;}
|
||||
|
||||
/** Return a SubscritionBase Sharedptr by index.
|
||||
* \param[in] i The index of the SubscritionBase
|
||||
* \return a SubscritionBase shared pointer
|
||||
* \throws std::out_of_range if the argument is higher than the size of the structrue.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::SubscriptionBase::SharedPtr
|
||||
get_subscription(size_t i) {return exec_list_.subscription[i];}
|
||||
|
||||
/** Return a TimerBase Sharedptr by index.
|
||||
* \param[in] i The index of the TimerBase
|
||||
* \return a TimerBase shared pointer
|
||||
* \throws std::out_of_range if the argument is higher than the size.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::TimerBase::SharedPtr
|
||||
get_timer(size_t i) {return exec_list_.timer[i];}
|
||||
|
||||
/** Return a ServiceBase Sharedptr by index.
|
||||
* \param[in] i The index of the ServiceBase
|
||||
* \return a ServiceBase shared pointer
|
||||
* \throws std::out_of_range if the argument is higher than the size.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::ServiceBase::SharedPtr
|
||||
get_service(size_t i) {return exec_list_.service[i];}
|
||||
|
||||
/** Return a ClientBase Sharedptr by index
|
||||
* \param[in] i The index of the ClientBase
|
||||
* \return a ClientBase shared pointer
|
||||
* \throws std::out_of_range if the argument is higher than the size.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::ClientBase::SharedPtr
|
||||
get_client(size_t i) {return exec_list_.client[i];}
|
||||
|
||||
/** Return a Waitable Sharedptr by index
|
||||
* \param[in] i The index of the Waitable
|
||||
* \return a Waitable shared pointer
|
||||
* \throws std::out_of_range if the argument is higher than the size.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::Waitable::SharedPtr
|
||||
get_waitable(size_t i) {return exec_list_.waitable[i];}
|
||||
|
||||
private:
|
||||
/// Function to reallocate space for entities in the wait set.
|
||||
/**
|
||||
* \throws std::runtime_error if wait set couldn't be cleared or resized.
|
||||
*/
|
||||
void
|
||||
prepare_wait_set();
|
||||
|
||||
void
|
||||
fill_executable_list();
|
||||
|
||||
void
|
||||
fill_memory_strategy();
|
||||
|
||||
/// Return true if the node belongs to the collector
|
||||
/**
|
||||
* \param[in] node_ptr a node base interface shared pointer
|
||||
* \param[in] weak_groups_to_nodes map to nodes to lookup
|
||||
* \return boolean whether a node belongs the collector
|
||||
*/
|
||||
bool
|
||||
has_node(
|
||||
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
|
||||
|
||||
/// Add all callback groups that can be automatically added by any executor
|
||||
/// and is not already associated with an executor from nodes
|
||||
/// that are associated with executor
|
||||
/**
|
||||
* \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor()
|
||||
*/
|
||||
void
|
||||
add_callback_groups_from_nodes_associated_to_executor();
|
||||
|
||||
void
|
||||
fill_executable_list_from_map(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
|
||||
|
||||
/// Memory strategy: an interface for handling user-defined memory allocation strategies.
|
||||
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
|
||||
|
||||
// maps callback groups to nodes.
|
||||
WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_;
|
||||
// maps callback groups to nodes.
|
||||
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
|
||||
|
||||
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
|
||||
const rclcpp::GuardCondition *,
|
||||
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
|
||||
WeakNodesToGuardConditionsMap;
|
||||
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
|
||||
|
||||
/// List of weak nodes registered in the static executor
|
||||
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
|
||||
|
||||
// Mutex to protect vector of new nodes.
|
||||
std::mutex new_nodes_mutex_;
|
||||
std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> new_nodes_;
|
||||
|
||||
/// Wait set for managing entities that the rmw layer waits on.
|
||||
rcl_wait_set_t * p_wait_set_ = nullptr;
|
||||
|
||||
/// Executable list: timers, subscribers, clients, services and waitables
|
||||
rclcpp::experimental::ExecutableList exec_list_;
|
||||
|
||||
/// Bool to check if the entities collector has been initialized
|
||||
bool initialized_ = false;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
|
||||
@@ -15,13 +15,24 @@
|
||||
#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_
|
||||
#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cassert>
|
||||
#include <cstdlib>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
#include "rmw/rmw.h"
|
||||
|
||||
#include "rclcpp/executor.hpp"
|
||||
#include "rclcpp/executors/executor_entities_collection.hpp"
|
||||
#include "rclcpp/executors/single_threaded_executor.hpp"
|
||||
#include "rclcpp/executors/static_executor_entities_collector.hpp"
|
||||
#include "rclcpp/experimental/executable_list.hpp"
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/memory_strategies.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
#include "rclcpp/rate.hpp"
|
||||
#include "rclcpp/utilities.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
@@ -54,7 +65,7 @@ public:
|
||||
explicit StaticSingleThreadedExecutor(
|
||||
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());
|
||||
|
||||
/// Default destructor.
|
||||
/// Default destrcutor.
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~StaticSingleThreadedExecutor();
|
||||
|
||||
@@ -105,20 +116,92 @@ public:
|
||||
void
|
||||
spin_all(std::chrono::nanoseconds max_duration) override;
|
||||
|
||||
/// Add a callback group to an executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::add_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
/// Remove callback group from the executor
|
||||
/**
|
||||
* \sa rclcpp::Executor::remove_callback_group
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
/// Add a node to the executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::add_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
/// Convenience function which takes Node and forwards NodeBaseInterface.
|
||||
/**
|
||||
* \sa rclcpp::StaticSingleThreadedExecutor::add_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
|
||||
|
||||
/// Remove a node from the executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::remove_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify = true) override;
|
||||
|
||||
/// Convenience function which takes Node and forwards NodeBaseInterface.
|
||||
/**
|
||||
* \sa rclcpp::Executor::remove_node
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_all_callback_groups() override;
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::get_manually_added_callback_groups()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_manually_added_callback_groups() override;
|
||||
|
||||
/// Get callback groups that belong to executor.
|
||||
/**
|
||||
* \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
get_automatically_added_callback_groups_from_nodes() override;
|
||||
|
||||
protected:
|
||||
/**
|
||||
* @brief Executes ready executables from wait set.
|
||||
* @param collection entities to evaluate for ready executables.
|
||||
* @param wait_result result to check for ready executables.
|
||||
* @param spin_once if true executes only the first ready executable.
|
||||
* @return true if any executable was ready.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
execute_ready_executables(
|
||||
const rclcpp::executors::ExecutorEntitiesCollection & collection,
|
||||
rclcpp::WaitResult<rclcpp::WaitSet> & wait_result,
|
||||
bool spin_once);
|
||||
execute_ready_executables(bool spin_once = false);
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
@@ -130,6 +213,8 @@ protected:
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor)
|
||||
|
||||
StaticExecutorEntitiesCollector::SharedPtr entities_collector_;
|
||||
};
|
||||
|
||||
} // namespace executors
|
||||
|
||||
@@ -33,6 +33,7 @@ public:
|
||||
|
||||
virtual void clear() = 0;
|
||||
virtual bool has_data() const = 0;
|
||||
virtual size_t available_capacity() const = 0;
|
||||
};
|
||||
|
||||
} // namespace buffers
|
||||
|
||||
@@ -44,6 +44,7 @@ public:
|
||||
|
||||
virtual bool has_data() const = 0;
|
||||
virtual bool use_take_shared_method() const = 0;
|
||||
virtual size_t available_capacity() const = 0;
|
||||
};
|
||||
|
||||
template<
|
||||
@@ -143,6 +144,11 @@ public:
|
||||
return std::is_same<BufferT, MessageSharedPtr>::value;
|
||||
}
|
||||
|
||||
size_t available_capacity() const override
|
||||
{
|
||||
return buffer_->available_capacity();
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;
|
||||
|
||||
|
||||
@@ -148,6 +148,18 @@ public:
|
||||
return is_full_();
|
||||
}
|
||||
|
||||
/// Get the remaining capacity to store messages
|
||||
/**
|
||||
* This member function is thread-safe.
|
||||
*
|
||||
* \return the number of free capacity for new messages
|
||||
*/
|
||||
size_t available_capacity() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
return available_capacity_();
|
||||
}
|
||||
|
||||
void clear()
|
||||
{
|
||||
TRACEPOINT(rclcpp_ring_buffer_clear, static_cast<const void *>(this));
|
||||
@@ -189,6 +201,17 @@ private:
|
||||
return size_ == capacity_;
|
||||
}
|
||||
|
||||
/// Get the remaining capacity to store messages
|
||||
/**
|
||||
* This member function is not thread-safe.
|
||||
*
|
||||
* \return the number of free capacity for new messages
|
||||
*/
|
||||
inline size_t available_capacity_() const
|
||||
{
|
||||
return capacity_ - size_;
|
||||
}
|
||||
|
||||
size_t capacity_;
|
||||
|
||||
std::vector<BufferT> ring_buffer_;
|
||||
|
||||
@@ -243,6 +243,11 @@ private:
|
||||
std::function<void(size_t, int)>
|
||||
create_waitable_callback(const rclcpp::Waitable * waitable_id);
|
||||
|
||||
/// Utility to add the notify waitable to an entities collection
|
||||
void
|
||||
add_notify_waitable_to_collection(
|
||||
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection);
|
||||
|
||||
/// Searches for the provided entity_id in the collection and returns the entity if valid
|
||||
template<typename CollectionType>
|
||||
typename CollectionType::EntitySharedPtr
|
||||
@@ -269,9 +274,12 @@ private:
|
||||
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
|
||||
|
||||
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
|
||||
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
|
||||
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
|
||||
|
||||
/// Mutex to protect the current_entities_collection_
|
||||
std::recursive_mutex collection_mutex_;
|
||||
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
|
||||
|
||||
/// Flag used to reduce the number of unnecessary waitable events
|
||||
std::atomic<bool> notify_waitable_event_pushed_ {false};
|
||||
|
||||
|
||||
@@ -306,6 +306,11 @@ public:
|
||||
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
|
||||
get_subscription_intra_process(uint64_t intra_process_subscription_id);
|
||||
|
||||
/// Return the lowest available capacity for all subscription buffers for a publisher id.
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
lowest_available_capacity(const uint64_t intra_process_publisher_id) const;
|
||||
|
||||
private:
|
||||
struct SplittedSubscriptions
|
||||
{
|
||||
|
||||
@@ -62,6 +62,11 @@ public:
|
||||
void
|
||||
add_to_wait_set(rcl_wait_set_t * wait_set) override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
size_t
|
||||
available_capacity() const = 0;
|
||||
|
||||
bool
|
||||
is_ready(rcl_wait_set_t * wait_set) override = 0;
|
||||
|
||||
|
||||
@@ -169,6 +169,11 @@ public:
|
||||
return buffer_->use_take_shared_method();
|
||||
}
|
||||
|
||||
size_t available_capacity() const override
|
||||
{
|
||||
return buffer_->available_capacity();
|
||||
}
|
||||
|
||||
protected:
|
||||
void
|
||||
trigger_guard_condition() override
|
||||
|
||||
@@ -23,6 +23,9 @@
|
||||
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
#include "rcl_interfaces/srv/get_logger_levels.hpp"
|
||||
#include "rcl_interfaces/srv/set_logger_levels.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
namespace node_interfaces
|
||||
@@ -35,7 +38,7 @@ public:
|
||||
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeLoggingInterface)
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
explicit NodeLogging(rclcpp::node_interfaces::NodeBaseInterface * node_base);
|
||||
explicit NodeLogging(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base);
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
@@ -49,13 +52,21 @@ public:
|
||||
const char *
|
||||
get_logger_name() const override;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
create_logger_services(
|
||||
node_interfaces::NodeServicesInterface::SharedPtr node_services) override;
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(NodeLogging)
|
||||
|
||||
/// Handle to the NodeBaseInterface given in the constructor.
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base_;
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_;
|
||||
|
||||
rclcpp::Logger logger_;
|
||||
|
||||
rclcpp::Service<rcl_interfaces::srv::GetLoggerLevels>::SharedPtr get_loggers_service_;
|
||||
rclcpp::Service<rcl_interfaces::srv::SetLoggerLevels>::SharedPtr set_loggers_service_;
|
||||
};
|
||||
|
||||
} // namespace node_interfaces
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
#include "rclcpp/logger.hpp"
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/node_interfaces/node_services_interface.hpp"
|
||||
#include "rclcpp/node_interfaces/detail/node_interfaces_helpers.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
@@ -54,6 +55,13 @@ public:
|
||||
virtual
|
||||
const char *
|
||||
get_logger_name() const = 0;
|
||||
|
||||
/// create logger services
|
||||
RCLCPP_PUBLIC
|
||||
virtual
|
||||
void
|
||||
create_logger_services(
|
||||
node_interfaces::NodeServicesInterface::SharedPtr node_services) = 0;
|
||||
};
|
||||
|
||||
} // namespace node_interfaces
|
||||
|
||||
@@ -50,6 +50,7 @@ public:
|
||||
* - clock_type = RCL_ROS_TIME
|
||||
* - clock_qos = rclcpp::ClockQoS()
|
||||
* - use_clock_thread = true
|
||||
* - enable_logger_service = false
|
||||
* - rosout_qos = rclcpp::RosoutQoS()
|
||||
* - parameter_event_qos = rclcpp::ParameterEventQoS
|
||||
* - with history setting and depth from rmw_qos_profile_parameter_events
|
||||
@@ -232,6 +233,24 @@ public:
|
||||
NodeOptions &
|
||||
start_parameter_services(bool start_parameter_services);
|
||||
|
||||
/// Return the enable_logger_service flag.
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
enable_logger_service() const;
|
||||
|
||||
/// Set the enable_logger_service flag, return this for logger idiom.
|
||||
/**
|
||||
* If true, ROS services are created to allow external nodes to get
|
||||
* and set logger levels of this node.
|
||||
*
|
||||
* If false, loggers will still be configured and set logger levels locally,
|
||||
* but logger levels cannot be changed remotely .
|
||||
*
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
NodeOptions &
|
||||
enable_logger_service(bool enable_log_service);
|
||||
|
||||
/// Return the start_parameter_event_publisher flag.
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
@@ -421,6 +440,8 @@ private:
|
||||
|
||||
bool use_clock_thread_ {true};
|
||||
|
||||
bool enable_logger_service_ {false};
|
||||
|
||||
rclcpp::QoS parameter_event_qos_ = rclcpp::ParameterEventsQoS(
|
||||
rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_parameter_events)
|
||||
);
|
||||
|
||||
@@ -215,6 +215,17 @@ public:
|
||||
std::vector<rclcpp::NetworkFlowEndpoint>
|
||||
get_network_flow_endpoints() const;
|
||||
|
||||
/// Return the lowest available capacity for all subscription buffers.
|
||||
/**
|
||||
* For intraprocess communication return the lowest buffer capacity for all subscriptions.
|
||||
* If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t.
|
||||
* On failure return 0.
|
||||
* \return lowest buffer capacity for all subscriptions
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
size_t
|
||||
lowest_available_ipm_capacity() const;
|
||||
|
||||
/// Wait until all published messages are acknowledged or until the specified timeout elapses.
|
||||
/**
|
||||
* This method waits until all published messages are acknowledged by all matching
|
||||
|
||||
@@ -265,15 +265,19 @@ protected:
|
||||
|
||||
std::shared_ptr<rcl_node_t> node_handle_;
|
||||
|
||||
std::recursive_mutex callback_mutex_;
|
||||
// It is important to declare on_new_request_callback_ before
|
||||
// service_handle_, so on destruction the service is
|
||||
// destroyed first. Otherwise, the rmw service callback
|
||||
// would point briefly to a destroyed function.
|
||||
std::function<void(size_t)> on_new_request_callback_{nullptr};
|
||||
// Declare service_handle_ after callback
|
||||
std::shared_ptr<rcl_service_t> service_handle_;
|
||||
bool owns_rcl_handle_ = true;
|
||||
|
||||
rclcpp::Logger node_logger_;
|
||||
|
||||
std::atomic<bool> in_use_by_wait_set_{false};
|
||||
|
||||
std::recursive_mutex callback_mutex_;
|
||||
std::function<void(size_t)> on_new_request_callback_{nullptr};
|
||||
};
|
||||
|
||||
template<typename ServiceT>
|
||||
|
||||
@@ -260,13 +260,13 @@ public:
|
||||
bool
|
||||
is_serialized() const;
|
||||
|
||||
/// Return the type of the subscription.
|
||||
/// Return the delivered message kind.
|
||||
/**
|
||||
* \return `DeliveredMessageKind`, which adjusts how messages are received and delivered.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
DeliveredMessageKind
|
||||
get_subscription_type() const;
|
||||
get_delivered_message_kind() const;
|
||||
|
||||
/// Get matching publisher count.
|
||||
/** \return The number of publishers on this topic. */
|
||||
@@ -645,6 +645,14 @@ protected:
|
||||
rclcpp::node_interfaces::NodeBaseInterface * const node_base_;
|
||||
|
||||
std::shared_ptr<rcl_node_t> node_handle_;
|
||||
|
||||
std::recursive_mutex callback_mutex_;
|
||||
// It is important to declare on_new_message_callback_ before
|
||||
// subscription_handle_, so on destruction the subscription is
|
||||
// destroyed first. Otherwise, the rmw subscription callback
|
||||
// would point briefly to a destroyed function.
|
||||
std::function<void(size_t)> on_new_message_callback_{nullptr};
|
||||
// Declare subscription_handle_ after callback
|
||||
std::shared_ptr<rcl_subscription_t> subscription_handle_;
|
||||
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
|
||||
rclcpp::Logger node_logger_;
|
||||
@@ -663,15 +671,12 @@ private:
|
||||
RCLCPP_DISABLE_COPY(SubscriptionBase)
|
||||
|
||||
rosidl_message_type_support_t type_support_;
|
||||
DeliveredMessageKind delivered_message_type_;
|
||||
DeliveredMessageKind delivered_message_kind_;
|
||||
|
||||
std::atomic<bool> subscription_in_use_by_wait_set_{false};
|
||||
std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
|
||||
std::unordered_map<rclcpp::EventHandlerBase *,
|
||||
std::atomic<bool>> qos_events_in_use_by_wait_set_;
|
||||
|
||||
std::recursive_mutex callback_mutex_;
|
||||
std::function<void(size_t)> on_new_message_callback_{nullptr};
|
||||
};
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
@@ -104,7 +104,7 @@ protected:
|
||||
// TODO(wjwwood): support custom allocator, maybe restrict to polymorphic allocator
|
||||
rcl_get_default_allocator());
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to create wait set");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
|
||||
// (Re)build the wait set for the first time.
|
||||
@@ -192,7 +192,8 @@ protected:
|
||||
size_t services_from_waitables = 0;
|
||||
size_t events_from_waitables = 0;
|
||||
for (const auto & waitable_entry : waitables) {
|
||||
if (!waitable_entry.waitable) {
|
||||
auto waitable_ptr_pair = get_raw_pointer_from_smart_pointer(waitable_entry.waitable);
|
||||
if (nullptr == waitable_ptr_pair.second) {
|
||||
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
|
||||
if (HasStrongOwnership) {
|
||||
// This will not happen in fixed sized storage, as it holds
|
||||
@@ -203,13 +204,13 @@ protected:
|
||||
needs_pruning_ = true;
|
||||
continue;
|
||||
}
|
||||
const auto & waitable = waitable_entry.waitable;
|
||||
subscriptions_from_waitables += waitable->get_number_of_ready_subscriptions();
|
||||
guard_conditions_from_waitables += waitable->get_number_of_ready_guard_conditions();
|
||||
timers_from_waitables += waitable->get_number_of_ready_timers();
|
||||
clients_from_waitables += waitable->get_number_of_ready_clients();
|
||||
services_from_waitables += waitable->get_number_of_ready_services();
|
||||
events_from_waitables += waitable->get_number_of_ready_events();
|
||||
rclcpp::Waitable & waitable = *waitable_ptr_pair.second;
|
||||
subscriptions_from_waitables += waitable.get_number_of_ready_subscriptions();
|
||||
guard_conditions_from_waitables += waitable.get_number_of_ready_guard_conditions();
|
||||
timers_from_waitables += waitable.get_number_of_ready_timers();
|
||||
clients_from_waitables += waitable.get_number_of_ready_clients();
|
||||
services_from_waitables += waitable.get_number_of_ready_services();
|
||||
events_from_waitables += waitable.get_number_of_ready_events();
|
||||
}
|
||||
rcl_ret_t ret = rcl_wait_set_resize(
|
||||
&rcl_wait_set_,
|
||||
@@ -221,7 +222,7 @@ protected:
|
||||
events_from_waitables
|
||||
);
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
was_resized = true;
|
||||
// Assumption: the calling code ensures this function is not called
|
||||
@@ -237,13 +238,15 @@ protected:
|
||||
if (!was_resized) {
|
||||
rcl_ret_t ret = rcl_wait_set_clear(&rcl_wait_set_);
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear the wait set");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
}
|
||||
|
||||
// Add subscriptions.
|
||||
for (const auto & subscription_entry : subscriptions) {
|
||||
if (!subscription_entry.subscription) {
|
||||
auto subscription_ptr_pair =
|
||||
get_raw_pointer_from_smart_pointer(subscription_entry.subscription);
|
||||
if (nullptr == subscription_ptr_pair.second) {
|
||||
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
|
||||
if (HasStrongOwnership) {
|
||||
// This will not happen in fixed sized storage, as it holds
|
||||
@@ -254,13 +257,12 @@ protected:
|
||||
needs_pruning_ = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
rcl_ret_t ret = rcl_wait_set_add_subscription(
|
||||
&rcl_wait_set_,
|
||||
subscription_entry.subscription->get_subscription_handle().get(),
|
||||
subscription_ptr_pair.second->get_subscription_handle().get(),
|
||||
nullptr);
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,7 +271,8 @@ protected:
|
||||
[this](const auto & inner_guard_conditions)
|
||||
{
|
||||
for (const auto & guard_condition : inner_guard_conditions) {
|
||||
if (!guard_condition) {
|
||||
auto guard_condition_ptr_pair = get_raw_pointer_from_smart_pointer(guard_condition);
|
||||
if (nullptr == guard_condition_ptr_pair.second) {
|
||||
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
|
||||
if (HasStrongOwnership) {
|
||||
// This will not happen in fixed sized storage, as it holds
|
||||
@@ -282,10 +285,10 @@ protected:
|
||||
}
|
||||
rcl_ret_t ret = rcl_wait_set_add_guard_condition(
|
||||
&rcl_wait_set_,
|
||||
&guard_condition->get_rcl_guard_condition(),
|
||||
&guard_condition_ptr_pair.second->get_rcl_guard_condition(),
|
||||
nullptr);
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -298,7 +301,8 @@ protected:
|
||||
|
||||
// Add timers.
|
||||
for (const auto & timer : timers) {
|
||||
if (!timer) {
|
||||
auto timer_ptr_pair = get_raw_pointer_from_smart_pointer(timer);
|
||||
if (nullptr == timer_ptr_pair.second) {
|
||||
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
|
||||
if (HasStrongOwnership) {
|
||||
// This will not happen in fixed sized storage, as it holds
|
||||
@@ -311,16 +315,17 @@ protected:
|
||||
}
|
||||
rcl_ret_t ret = rcl_wait_set_add_timer(
|
||||
&rcl_wait_set_,
|
||||
timer->get_timer_handle().get(),
|
||||
timer_ptr_pair.second->get_timer_handle().get(),
|
||||
nullptr);
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
}
|
||||
|
||||
// Add clients.
|
||||
for (const auto & client : clients) {
|
||||
if (!client) {
|
||||
auto client_ptr_pair = get_raw_pointer_from_smart_pointer(client);
|
||||
if (nullptr == client_ptr_pair.second) {
|
||||
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
|
||||
if (HasStrongOwnership) {
|
||||
// This will not happen in fixed sized storage, as it holds
|
||||
@@ -333,17 +338,17 @@ protected:
|
||||
}
|
||||
rcl_ret_t ret = rcl_wait_set_add_client(
|
||||
&rcl_wait_set_,
|
||||
client->get_client_handle().get(),
|
||||
client_ptr_pair.second->get_client_handle().get(),
|
||||
nullptr);
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Add services.
|
||||
for (const auto & service : services) {
|
||||
if (!service) {
|
||||
auto service_ptr_pair = get_raw_pointer_from_smart_pointer(service);
|
||||
if (nullptr == service_ptr_pair.second) {
|
||||
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
|
||||
if (HasStrongOwnership) {
|
||||
// This will not happen in fixed sized storage, as it holds
|
||||
@@ -356,16 +361,17 @@ protected:
|
||||
}
|
||||
rcl_ret_t ret = rcl_wait_set_add_service(
|
||||
&rcl_wait_set_,
|
||||
service->get_service_handle().get(),
|
||||
service_ptr_pair.second->get_service_handle().get(),
|
||||
nullptr);
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
}
|
||||
|
||||
// Add waitables.
|
||||
for (auto & waitable_entry : waitables) {
|
||||
if (!waitable_entry.waitable) {
|
||||
auto waitable_ptr_pair = get_raw_pointer_from_smart_pointer(waitable_entry.waitable);
|
||||
if (nullptr == waitable_ptr_pair.second) {
|
||||
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
|
||||
if (HasStrongOwnership) {
|
||||
// This will not happen in fixed sized storage, as it holds
|
||||
@@ -376,7 +382,8 @@ protected:
|
||||
needs_pruning_ = true;
|
||||
continue;
|
||||
}
|
||||
waitable_entry.waitable->add_to_wait_set(&rcl_wait_set_);
|
||||
rclcpp::Waitable & waitable = *waitable_ptr_pair.second;
|
||||
waitable.add_to_wait_set(&rcl_wait_set_);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -204,19 +204,15 @@ public:
|
||||
void
|
||||
storage_rebuild_rcl_wait_set(const ArrayOfExtraGuardConditions & extra_guard_conditions)
|
||||
{
|
||||
this->storage_acquire_ownerships();
|
||||
|
||||
this->storage_rebuild_rcl_wait_set_with_sets(
|
||||
shared_subscriptions_,
|
||||
shared_guard_conditions_,
|
||||
subscriptions_,
|
||||
guard_conditions_,
|
||||
extra_guard_conditions,
|
||||
shared_timers_,
|
||||
shared_clients_,
|
||||
shared_services_,
|
||||
shared_waitables_
|
||||
timers_,
|
||||
clients_,
|
||||
services_,
|
||||
waitables_
|
||||
);
|
||||
|
||||
this->storage_release_ownerships();
|
||||
}
|
||||
|
||||
template<class EntityT, class SequenceOfEntitiesT>
|
||||
@@ -411,7 +407,6 @@ public:
|
||||
}
|
||||
};
|
||||
// Lock all the weak pointers and hold them until released.
|
||||
lock_all(subscriptions_, shared_subscriptions_);
|
||||
lock_all(guard_conditions_, shared_guard_conditions_);
|
||||
lock_all(timers_, shared_timers_);
|
||||
lock_all(clients_, shared_clients_);
|
||||
@@ -443,7 +438,6 @@ public:
|
||||
shared_ptr.reset();
|
||||
}
|
||||
};
|
||||
reset_all(shared_subscriptions_);
|
||||
reset_all(shared_guard_conditions_);
|
||||
reset_all(shared_timers_);
|
||||
reset_all(shared_clients_);
|
||||
|
||||
@@ -290,7 +290,7 @@ protected:
|
||||
return create_wait_result(WaitResultKind::Empty);
|
||||
} else {
|
||||
// Some other error case, throw.
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "rcl_wait() failed");
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
} while (should_loop());
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp</name>
|
||||
<version>20.0.0</version>
|
||||
<version>21.1.1</version>
|
||||
<description>The ROS client library in C++.</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
@@ -16,16 +16,13 @@
|
||||
|
||||
using rclcpp::AnyExecutable;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
AnyExecutable::AnyExecutable()
|
||||
: subscription(nullptr),
|
||||
timer(nullptr),
|
||||
service(nullptr),
|
||||
client(nullptr),
|
||||
waitable(nullptr),
|
||||
callback_group(nullptr),
|
||||
node_base(nullptr),
|
||||
data(nullptr)
|
||||
node_base(nullptr)
|
||||
{}
|
||||
|
||||
AnyExecutable::~AnyExecutable()
|
||||
|
||||
@@ -66,7 +66,6 @@ CallbackGroup::size() const
|
||||
timer_ptrs_.size() +
|
||||
waitable_ptrs_.size();
|
||||
}
|
||||
|
||||
void CallbackGroup::collect_all_ptrs(
|
||||
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
|
||||
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
|
||||
|
||||
@@ -13,8 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <map>
|
||||
#include <string>
|
||||
@@ -24,14 +22,13 @@
|
||||
|
||||
#include "rcl/allocator.h"
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rclcpp/executors/executor_notify_waitable.hpp"
|
||||
#include "rclcpp/subscription_wait_set_mask.hpp"
|
||||
#include "rcpputils/scope_exit.hpp"
|
||||
|
||||
#include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/executor.hpp"
|
||||
#include "rclcpp/guard_condition.hpp"
|
||||
#include "rclcpp/memory_strategy.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
#include "rclcpp/utilities.hpp"
|
||||
|
||||
@@ -41,24 +38,16 @@
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using rclcpp::exceptions::throw_from_rcl_error;
|
||||
using rclcpp::Executor;
|
||||
class rclcpp::ExecutorImplementation {};
|
||||
|
||||
/// Mask to indicate to the waitset to only add the subscription.
|
||||
/// The events and intraprocess waitable are already added via the callback group.
|
||||
static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false};
|
||||
class rclcpp::ExecutorImplementation {};
|
||||
|
||||
Executor::Executor(const rclcpp::ExecutorOptions & options)
|
||||
: spinning(false),
|
||||
interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
|
||||
shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
|
||||
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
|
||||
[this]() {
|
||||
this->entities_need_rebuild_.store(true);
|
||||
})),
|
||||
collector_(notify_waitable_),
|
||||
wait_set_({}, {}, {}, {}, {}, {}, options.context),
|
||||
current_notify_waitable_(notify_waitable_),
|
||||
memory_strategy_(options.memory_strategy),
|
||||
impl_(std::make_unique<rclcpp::ExecutorImplementation>())
|
||||
{
|
||||
// Store the context for later use.
|
||||
@@ -72,56 +61,74 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
|
||||
}
|
||||
});
|
||||
|
||||
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
|
||||
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
|
||||
// The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
|
||||
// and one for the executor's guard cond (interrupt_guard_condition_)
|
||||
memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get());
|
||||
|
||||
// Put the executor's guard condition in
|
||||
memory_strategy_->add_guard_condition(*interrupt_guard_condition_.get());
|
||||
rcl_allocator_t allocator = memory_strategy_->get_allocator();
|
||||
|
||||
rcl_ret_t ret = rcl_wait_set_init(
|
||||
&wait_set_,
|
||||
0, 2, 0, 0, 0, 0,
|
||||
context_->get_rcl_context().get(),
|
||||
allocator);
|
||||
if (RCL_RET_OK != ret) {
|
||||
RCUTILS_LOG_ERROR_NAMED(
|
||||
"rclcpp",
|
||||
"failed to create wait set: %s", rcl_get_error_string().str);
|
||||
rcl_reset_error();
|
||||
throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor");
|
||||
}
|
||||
}
|
||||
|
||||
Executor::~Executor()
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
|
||||
notify_waitable_->remove_guard_condition(interrupt_guard_condition_);
|
||||
notify_waitable_->remove_guard_condition(shutdown_guard_condition_);
|
||||
|
||||
std::cout << "Executor::~Executor" << std::endl;
|
||||
|
||||
current_collection_.timers.update(
|
||||
{}, {},
|
||||
[this](auto timer) {
|
||||
std::cout << "remove_timer(" << timer.get() << ")" << std::endl;
|
||||
wait_set_.remove_timer(timer);
|
||||
// Disassociate all callback groups.
|
||||
for (auto & pair : weak_groups_to_nodes_) {
|
||||
auto group = pair.first.lock();
|
||||
if (group) {
|
||||
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
}
|
||||
}
|
||||
// Disassociate all nodes.
|
||||
std::for_each(
|
||||
weak_nodes_.begin(), weak_nodes_.end(), []
|
||||
(rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) {
|
||||
auto shared_node_ptr = weak_node_ptr.lock();
|
||||
if (shared_node_ptr) {
|
||||
std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
}
|
||||
});
|
||||
weak_nodes_.clear();
|
||||
weak_groups_associated_with_executor_to_nodes_.clear();
|
||||
weak_groups_to_nodes_associated_with_executor_.clear();
|
||||
weak_groups_to_nodes_.clear();
|
||||
for (const auto & pair : weak_groups_to_guard_conditions_) {
|
||||
auto guard_condition = pair.second;
|
||||
memory_strategy_->remove_guard_condition(guard_condition);
|
||||
}
|
||||
weak_groups_to_guard_conditions_.clear();
|
||||
|
||||
current_collection_.subscriptions.update(
|
||||
{}, {},
|
||||
[this](auto subscription) {
|
||||
std::cout << "remove_subscription(" << subscription.get() << ")" << std::endl;
|
||||
wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
|
||||
});
|
||||
for (const auto & pair : weak_nodes_to_guard_conditions_) {
|
||||
auto guard_condition = pair.second;
|
||||
memory_strategy_->remove_guard_condition(guard_condition);
|
||||
}
|
||||
weak_nodes_to_guard_conditions_.clear();
|
||||
|
||||
current_collection_.clients.update(
|
||||
{}, {},
|
||||
[this](auto client) {
|
||||
std::cout << "remove_client(" << client.get() << ")" << std::endl;
|
||||
wait_set_.remove_client(client);});
|
||||
|
||||
current_collection_.services.update(
|
||||
{}, {},
|
||||
[this](auto service) {
|
||||
std::cout << "remove_service(" << service.get() << ")" << std::endl;
|
||||
wait_set_.remove_service(service);});
|
||||
|
||||
current_collection_.guard_conditions.update(
|
||||
{}, {},
|
||||
[this](auto guard_condition) {
|
||||
std::cout << "remove_guard_condition(" << guard_condition.get() << ")" << std::endl;
|
||||
wait_set_.remove_guard_condition(guard_condition);});
|
||||
|
||||
current_collection_.waitables.update(
|
||||
{}, {},
|
||||
[this](auto waitable) {
|
||||
std::cout << "remove_waitable(" << waitable.get() << ")" << std::endl;
|
||||
wait_set_.remove_waitable(waitable);});
|
||||
// Finalize the wait set.
|
||||
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
|
||||
RCUTILS_LOG_ERROR_NAMED(
|
||||
"rclcpp",
|
||||
"failed to destroy wait set: %s", rcl_get_error_string().str);
|
||||
rcl_reset_error();
|
||||
}
|
||||
// Remove and release the sigint guard condition
|
||||
memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get());
|
||||
memory_strategy_->remove_guard_condition(interrupt_guard_condition_.get());
|
||||
|
||||
// Remove shutdown callback handle registered to Context
|
||||
if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
|
||||
@@ -135,39 +142,95 @@ Executor::~Executor()
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
Executor::get_all_callback_groups()
|
||||
{
|
||||
this->collector_.update_collections();
|
||||
return this->collector_.get_all_callback_groups();
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
Executor::get_manually_added_callback_groups()
|
||||
{
|
||||
this->collector_.update_collections();
|
||||
return this->collector_.get_manually_added_callback_groups();
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
Executor::get_automatically_added_callback_groups_from_nodes()
|
||||
{
|
||||
this->collector_.update_collections();
|
||||
return this->collector_.get_automatically_added_callback_groups();
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
void
|
||||
Executor::add_callback_group(
|
||||
Executor::add_callback_groups_from_nodes_associated_to_executor()
|
||||
{
|
||||
for (auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
if (node) {
|
||||
node->for_each_callback_group(
|
||||
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
|
||||
{
|
||||
if (
|
||||
shared_group_ptr->automatically_add_to_executor_with_node() &&
|
||||
!shared_group_ptr->get_associated_with_executor_atomic().load())
|
||||
{
|
||||
this->add_callback_group_to_map(
|
||||
shared_group_ptr,
|
||||
node,
|
||||
weak_groups_to_nodes_associated_with_executor_,
|
||||
true);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Executor::add_callback_group_to_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
|
||||
bool notify)
|
||||
{
|
||||
(void) node_ptr;
|
||||
this->collector_.add_callback_group(group_ptr);
|
||||
// If the callback_group already has an executor
|
||||
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
|
||||
if (has_executor.exchange(true)) {
|
||||
throw std::runtime_error("Callback group has already been added to an executor.");
|
||||
}
|
||||
|
||||
if (!spinning.load()) {
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
this->collect_entities();
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
|
||||
auto insert_info =
|
||||
weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr));
|
||||
bool was_inserted = insert_info.second;
|
||||
if (!was_inserted) {
|
||||
throw std::runtime_error("Callback group was already added to executor.");
|
||||
}
|
||||
// Also add to the map that contains all callback groups
|
||||
weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr));
|
||||
|
||||
if (node_ptr->get_context()->is_valid()) {
|
||||
auto callback_group_guard_condition = group_ptr->get_notify_guard_condition();
|
||||
weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get();
|
||||
// Add the callback_group's notify condition to the guard condition handles
|
||||
memory_strategy_->add_guard_condition(*callback_group_guard_condition);
|
||||
}
|
||||
|
||||
if (notify) {
|
||||
// Interrupt waiting to handle new node
|
||||
try {
|
||||
interrupt_guard_condition_->trigger();
|
||||
} catch (const rclcpp::exceptions::RCLError & ex) {
|
||||
@@ -178,23 +241,91 @@ Executor::add_callback_group(
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Executor::add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
this->add_callback_group_to_map(
|
||||
group_ptr,
|
||||
node_ptr,
|
||||
weak_groups_associated_with_executor_to_nodes_,
|
||||
notify);
|
||||
}
|
||||
|
||||
void
|
||||
Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
|
||||
{
|
||||
this->collector_.add_node(node_ptr);
|
||||
|
||||
if (!spinning.load()) {
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
this->collect_entities();
|
||||
// If the node already has an executor
|
||||
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
|
||||
if (has_executor.exchange(true)) {
|
||||
throw std::runtime_error(
|
||||
std::string("Node '") + node_ptr->get_fully_qualified_name() +
|
||||
"' has already been added to an executor.");
|
||||
}
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
node_ptr->for_each_callback_group(
|
||||
[this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
|
||||
{
|
||||
if (!group_ptr->get_associated_with_executor_atomic().load() &&
|
||||
group_ptr->automatically_add_to_executor_with_node())
|
||||
{
|
||||
this->add_callback_group_to_map(
|
||||
group_ptr,
|
||||
node_ptr,
|
||||
weak_groups_to_nodes_associated_with_executor_,
|
||||
notify);
|
||||
}
|
||||
});
|
||||
|
||||
if (notify) {
|
||||
try {
|
||||
interrupt_guard_condition_->trigger();
|
||||
} catch (const rclcpp::exceptions::RCLError & ex) {
|
||||
throw std::runtime_error(
|
||||
std::string(
|
||||
"Failed to trigger guard condition on node add: ") + ex.what());
|
||||
const auto gc = node_ptr->get_shared_notify_guard_condition();
|
||||
weak_nodes_to_guard_conditions_[node_ptr] = gc.get();
|
||||
// Add the node's notify condition to the guard condition handles
|
||||
memory_strategy_->add_guard_condition(*gc);
|
||||
weak_nodes_.push_back(node_ptr);
|
||||
}
|
||||
|
||||
void
|
||||
Executor::remove_callback_group_from_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
|
||||
bool notify)
|
||||
{
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
|
||||
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
|
||||
if (iter != weak_groups_to_nodes.end()) {
|
||||
node_ptr = iter->second.lock();
|
||||
if (node_ptr == nullptr) {
|
||||
throw std::runtime_error("Node must not be deleted before its callback group(s).");
|
||||
}
|
||||
weak_groups_to_nodes.erase(iter);
|
||||
weak_groups_to_nodes_.erase(group_ptr);
|
||||
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
} else {
|
||||
throw std::runtime_error("Callback group needs to be associated with executor.");
|
||||
}
|
||||
// If the node was matched and removed, interrupt waiting.
|
||||
if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) &&
|
||||
!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_))
|
||||
{
|
||||
auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr);
|
||||
if (iter != weak_groups_to_guard_conditions_.end()) {
|
||||
memory_strategy_->remove_guard_condition(iter->second);
|
||||
}
|
||||
weak_groups_to_guard_conditions_.erase(weak_group_ptr);
|
||||
|
||||
if (notify) {
|
||||
try {
|
||||
interrupt_guard_condition_->trigger();
|
||||
} catch (const rclcpp::exceptions::RCLError & ex) {
|
||||
throw std::runtime_error(
|
||||
std::string(
|
||||
"Failed to trigger guard condition on callback group remove: ") + ex.what());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -204,21 +335,11 @@ Executor::remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
bool notify)
|
||||
{
|
||||
this->collector_.remove_callback_group(group_ptr);
|
||||
|
||||
if (!spinning.load()) {
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
this->collect_entities();
|
||||
}
|
||||
if (notify) {
|
||||
try {
|
||||
interrupt_guard_condition_->trigger();
|
||||
} catch (const rclcpp::exceptions::RCLError & ex) {
|
||||
throw std::runtime_error(
|
||||
std::string(
|
||||
"Failed to trigger guard condition on callback group remove: ") + ex.what());
|
||||
}
|
||||
}
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
this->remove_callback_group_from_map(
|
||||
group_ptr,
|
||||
weak_groups_associated_with_executor_to_nodes_,
|
||||
notify);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -230,22 +351,48 @@ Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
|
||||
void
|
||||
Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
|
||||
{
|
||||
this->collector_.remove_node(node_ptr);
|
||||
|
||||
if (!spinning.load()) {
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
this->collect_entities();
|
||||
if (!node_ptr->get_associated_with_executor_atomic().load()) {
|
||||
throw std::runtime_error("Node needs to be associated with an executor.");
|
||||
}
|
||||
|
||||
if (notify) {
|
||||
try {
|
||||
interrupt_guard_condition_->trigger();
|
||||
} catch (const rclcpp::exceptions::RCLError & ex) {
|
||||
throw std::runtime_error(
|
||||
std::string(
|
||||
"Failed to trigger guard condition on node remove: ") + ex.what());
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
bool found_node = false;
|
||||
auto node_it = weak_nodes_.begin();
|
||||
while (node_it != weak_nodes_.end()) {
|
||||
bool matched = (node_it->lock() == node_ptr);
|
||||
if (matched) {
|
||||
found_node = true;
|
||||
node_it = weak_nodes_.erase(node_it);
|
||||
} else {
|
||||
++node_it;
|
||||
}
|
||||
}
|
||||
if (!found_node) {
|
||||
throw std::runtime_error("Node needs to be associated with this executor.");
|
||||
}
|
||||
|
||||
for (auto it = weak_groups_to_nodes_associated_with_executor_.begin();
|
||||
it != weak_groups_to_nodes_associated_with_executor_.end(); )
|
||||
{
|
||||
auto weak_node_ptr = it->second;
|
||||
auto shared_node_ptr = weak_node_ptr.lock();
|
||||
auto group_ptr = it->first.lock();
|
||||
|
||||
// Increment iterator before removing in case it's invalidated
|
||||
it++;
|
||||
if (shared_node_ptr == node_ptr) {
|
||||
remove_callback_group_from_map(
|
||||
group_ptr,
|
||||
weak_groups_to_nodes_associated_with_executor_,
|
||||
notify);
|
||||
}
|
||||
}
|
||||
|
||||
memory_strategy_->remove_guard_condition(node_ptr->get_shared_notify_guard_condition().get());
|
||||
weak_nodes_to_guard_conditions_.erase(node_ptr);
|
||||
|
||||
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -312,44 +459,21 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
|
||||
throw std::runtime_error("spin_some() called while already spinning");
|
||||
}
|
||||
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
|
||||
|
||||
size_t work_in_queue = 0;
|
||||
bool has_waited = false;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
work_in_queue = ready_executables_.size();
|
||||
}
|
||||
// The logic below is to guarantee that we:
|
||||
// a) run all of the work in the queue before we spin the first time
|
||||
// b) spin at least once
|
||||
// c) run all of the work in the queue after we spin
|
||||
|
||||
bool work_available = false;
|
||||
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
|
||||
AnyExecutable any_exec;
|
||||
if (work_in_queue > 0) {
|
||||
// If there is work in the queue, then execute it
|
||||
// This covers the case that there are things left in the queue from a
|
||||
// previous spin.
|
||||
if (get_next_ready_executable(any_exec)) {
|
||||
execute_any_executable(any_exec);
|
||||
}
|
||||
} else if (!has_waited && !work_in_queue) {
|
||||
// Once the ready queue is empty, then we need to wait at least once.
|
||||
wait_for_work(std::chrono::milliseconds(0));
|
||||
has_waited = true;
|
||||
} else if (has_waited && !work_in_queue) {
|
||||
// Once we have emptied the ready queue, but have already waited:
|
||||
if (!exhaustive) {
|
||||
// In the case of spin some, then we can exit
|
||||
break;
|
||||
} else {
|
||||
// In the case of spin all, then we will allow ourselves to wait again.
|
||||
has_waited = false;
|
||||
}
|
||||
if (!work_available) {
|
||||
wait_for_work(std::chrono::milliseconds::zero());
|
||||
}
|
||||
if (get_next_ready_executable(any_exec)) {
|
||||
execute_any_executable(any_exec);
|
||||
work_available = true;
|
||||
} else {
|
||||
if (!work_available || !exhaustive) {
|
||||
break;
|
||||
}
|
||||
work_available = false;
|
||||
}
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
work_in_queue = ready_executables_.size();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -384,13 +508,22 @@ Executor::cancel()
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
|
||||
{
|
||||
if (memory_strategy == nullptr) {
|
||||
throw std::runtime_error("Received NULL memory strategy in executor.");
|
||||
}
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
memory_strategy_ = memory_strategy;
|
||||
}
|
||||
|
||||
void
|
||||
Executor::execute_any_executable(AnyExecutable & any_exec)
|
||||
{
|
||||
if (!spinning.load()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (any_exec.timer) {
|
||||
TRACEPOINT(
|
||||
rclcpp_executor_execute,
|
||||
@@ -412,10 +545,16 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
|
||||
if (any_exec.waitable) {
|
||||
any_exec.waitable->execute(any_exec.data);
|
||||
}
|
||||
|
||||
// Reset the callback_group, regardless of type
|
||||
if (any_exec.callback_group) {
|
||||
any_exec.callback_group->can_be_taken_from().store(true);
|
||||
any_exec.callback_group->can_be_taken_from().store(true);
|
||||
// Wake the wait, because it may need to be recalculated or work that
|
||||
// was previously blocked is now available.
|
||||
try {
|
||||
interrupt_guard_condition_->trigger();
|
||||
} catch (const rclcpp::exceptions::RCLError & ex) {
|
||||
throw std::runtime_error(
|
||||
std::string(
|
||||
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -464,7 +603,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
|
||||
rclcpp::MessageInfo message_info;
|
||||
message_info.get_rmw_message_info().from_intra_process = false;
|
||||
|
||||
switch (subscription->get_subscription_type()) {
|
||||
switch (subscription->get_delivered_message_kind()) {
|
||||
// Deliver ROS message
|
||||
case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
|
||||
{
|
||||
@@ -584,117 +723,228 @@ Executor::execute_client(
|
||||
[&]() {client->handle_response(request_header, response);});
|
||||
}
|
||||
|
||||
void
|
||||
Executor::collect_entities()
|
||||
{
|
||||
// Get the current list of available waitables from the collector.
|
||||
rclcpp::executors::ExecutorEntitiesCollection collection;
|
||||
this->collector_.update_collections();
|
||||
auto callback_groups = this->collector_.get_all_callback_groups();
|
||||
rclcpp::executors::build_entities_collection(callback_groups, collection);
|
||||
|
||||
// Make a copy of notify waitable so we can continue to mutate the original
|
||||
// one outside of the execute loop.
|
||||
// This prevents the collection of guard conditions in the waitable from changing
|
||||
// while we are waiting on it.
|
||||
if (notify_waitable_) {
|
||||
current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
|
||||
*notify_waitable_);
|
||||
auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
|
||||
collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
|
||||
}
|
||||
|
||||
std::cout << "current_collection.timers: " << current_collection_.timers.size() << std::endl;
|
||||
std::cout << "next_collection.timers: " << collection.timers.size() << std::endl;
|
||||
// Update each of the groups of entities in the current collection, adding or removing
|
||||
// from the wait set as necessary.
|
||||
current_collection_.timers.update(
|
||||
collection.timers,
|
||||
[this](auto timer) {
|
||||
std::cout << "add_timer(" << timer << ")" << std::endl;
|
||||
wait_set_.add_timer(timer);},
|
||||
[this](auto timer) {
|
||||
std::cout << "remove_timer(" << timer << ")" << std::endl;
|
||||
wait_set_.remove_timer(timer);});
|
||||
|
||||
std::cout << "current_collection.subscriptions: " << current_collection_.subscriptions.size() << std::endl;
|
||||
std::cout << "next_collection.subscriptions: " << collection.subscriptions.size() << std::endl;
|
||||
current_collection_.subscriptions.update(
|
||||
collection.subscriptions,
|
||||
[this](auto subscription) {
|
||||
std::cout << "add_subscription(" << subscription << ")" << std::endl;
|
||||
wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
|
||||
},
|
||||
[this](auto subscription) {
|
||||
std::cout << "remove_subscription(" << subscription << ")" << std::endl;
|
||||
wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
|
||||
});
|
||||
|
||||
current_collection_.clients.update(
|
||||
collection.clients,
|
||||
[this](auto client) {wait_set_.add_client(client);},
|
||||
[this](auto client) {wait_set_.remove_client(client);});
|
||||
|
||||
current_collection_.services.update(
|
||||
collection.services,
|
||||
[this](auto service) {wait_set_.add_service(service);},
|
||||
[this](auto service) {wait_set_.remove_service(service);});
|
||||
|
||||
current_collection_.guard_conditions.update(
|
||||
collection.guard_conditions,
|
||||
[this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
|
||||
[this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
|
||||
|
||||
current_collection_.waitables.update(
|
||||
collection.waitables,
|
||||
[this](auto waitable) {
|
||||
wait_set_.add_waitable(waitable);
|
||||
},
|
||||
[this](auto waitable) {wait_set_.remove_waitable(waitable);});
|
||||
|
||||
this->entities_need_rebuild_.store(false);
|
||||
}
|
||||
|
||||
void
|
||||
Executor::wait_for_work(std::chrono::nanoseconds timeout)
|
||||
{
|
||||
TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
|
||||
this->collect_entities();
|
||||
// Check weak_nodes_ to find any callback group that is not owned
|
||||
// by an executor and add it to the list of callbackgroups for
|
||||
// collect entities. Also exchange to false so it is not
|
||||
// allowed to add to another executor
|
||||
add_callback_groups_from_nodes_associated_to_executor();
|
||||
|
||||
// Collect the subscriptions and timers to be waited on
|
||||
memory_strategy_->clear_handles();
|
||||
bool has_invalid_weak_groups_or_nodes =
|
||||
memory_strategy_->collect_entities(weak_groups_to_nodes_);
|
||||
|
||||
if (has_invalid_weak_groups_or_nodes) {
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
|
||||
for (auto pair : weak_groups_to_nodes_) {
|
||||
auto weak_group_ptr = pair.first;
|
||||
auto weak_node_ptr = pair.second;
|
||||
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
|
||||
invalid_group_ptrs.push_back(weak_group_ptr);
|
||||
auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr);
|
||||
if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) {
|
||||
auto guard_condition = node_guard_pair->second;
|
||||
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
|
||||
memory_strategy_->remove_guard_condition(guard_condition);
|
||||
}
|
||||
}
|
||||
}
|
||||
std::for_each(
|
||||
invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
|
||||
[this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
|
||||
if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) !=
|
||||
weak_groups_to_nodes_associated_with_executor_.end())
|
||||
{
|
||||
weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
|
||||
}
|
||||
if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) !=
|
||||
weak_groups_associated_with_executor_to_nodes_.end())
|
||||
{
|
||||
weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
|
||||
}
|
||||
auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr);
|
||||
if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) {
|
||||
auto guard_condition = callback_guard_pair->second;
|
||||
weak_groups_to_guard_conditions_.erase(group_ptr);
|
||||
memory_strategy_->remove_guard_condition(guard_condition);
|
||||
}
|
||||
weak_groups_to_nodes_.erase(group_ptr);
|
||||
});
|
||||
}
|
||||
|
||||
// clear wait set
|
||||
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
|
||||
if (ret != RCL_RET_OK) {
|
||||
throw_from_rcl_error(ret, "Couldn't clear wait set");
|
||||
}
|
||||
|
||||
// The size of waitables are accounted for in size of the other entities
|
||||
ret = rcl_wait_set_resize(
|
||||
&wait_set_, memory_strategy_->number_of_ready_subscriptions(),
|
||||
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
|
||||
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
|
||||
memory_strategy_->number_of_ready_events());
|
||||
if (RCL_RET_OK != ret) {
|
||||
throw_from_rcl_error(ret, "Couldn't resize the wait set");
|
||||
}
|
||||
|
||||
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
|
||||
throw std::runtime_error("Couldn't fill wait set");
|
||||
}
|
||||
}
|
||||
|
||||
auto wait_result = wait_set_.wait(timeout);
|
||||
if (wait_result.kind() == WaitResultKind::Empty) {
|
||||
rcl_ret_t status =
|
||||
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
|
||||
if (status == RCL_RET_WAIT_SET_EMPTY) {
|
||||
RCUTILS_LOG_WARN_NAMED(
|
||||
"rclcpp",
|
||||
"empty wait set received in wait(). This should never happen.");
|
||||
"empty wait set received in rcl_wait(). This should never happen.");
|
||||
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
|
||||
using rclcpp::exceptions::throw_from_rcl_error;
|
||||
throw_from_rcl_error(status, "rcl_wait() failed");
|
||||
}
|
||||
rclcpp::executors::ready_executables(current_collection_, wait_result, ready_executables_);
|
||||
|
||||
// check the null handles in the wait set and remove them from the handles in memory strategy
|
||||
// for callback-based entities
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
memory_strategy_->remove_null_handles(&wait_set_);
|
||||
}
|
||||
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
|
||||
Executor::get_node_by_group(
|
||||
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
|
||||
weak_groups_to_nodes,
|
||||
rclcpp::CallbackGroup::SharedPtr group)
|
||||
{
|
||||
if (!group) {
|
||||
return nullptr;
|
||||
}
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group);
|
||||
const auto finder = weak_groups_to_nodes.find(weak_group_ptr);
|
||||
if (finder != weak_groups_to_nodes.end()) {
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock();
|
||||
return node_ptr;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
rclcpp::CallbackGroup::SharedPtr
|
||||
Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
|
||||
auto group = pair.first.lock();
|
||||
if (!group) {
|
||||
continue;
|
||||
}
|
||||
auto timer_ref = group->find_timer_ptrs_if(
|
||||
[timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
|
||||
return timer_ptr == timer;
|
||||
});
|
||||
if (timer_ref) {
|
||||
return group;
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
|
||||
auto group = pair.first.lock();
|
||||
if (!group) {
|
||||
continue;
|
||||
}
|
||||
auto timer_ref = group->find_timer_ptrs_if(
|
||||
[timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
|
||||
return timer_ptr == timer;
|
||||
});
|
||||
if (timer_ref) {
|
||||
return group;
|
||||
}
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
bool
|
||||
Executor::get_next_ready_executable(AnyExecutable & any_executable)
|
||||
{
|
||||
bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_);
|
||||
return success;
|
||||
}
|
||||
|
||||
bool
|
||||
Executor::get_next_ready_executable_from_map(
|
||||
AnyExecutable & any_executable,
|
||||
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
|
||||
weak_groups_to_nodes)
|
||||
{
|
||||
TRACEPOINT(rclcpp_executor_get_next_ready);
|
||||
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
if (ready_executables_.size() == 0) {
|
||||
return false;
|
||||
bool success = false;
|
||||
std::lock_guard<std::mutex> guard{mutex_};
|
||||
// Check the timers to see if there are any that are ready
|
||||
memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
|
||||
if (any_executable.timer) {
|
||||
success = true;
|
||||
}
|
||||
if (!success) {
|
||||
// Check the subscriptions to see if there are any that are ready
|
||||
memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes);
|
||||
if (any_executable.subscription) {
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
// Check the services to see if there are any that are ready
|
||||
memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes);
|
||||
if (any_executable.service) {
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
// Check the clients to see if there are any that are ready
|
||||
memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes);
|
||||
if (any_executable.client) {
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
// Check the waitables to see if there are any that are ready
|
||||
memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
|
||||
if (any_executable.waitable) {
|
||||
any_executable.data = any_executable.waitable->take_data();
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
// At this point any_executable should be valid with either a valid subscription
|
||||
// or a valid timer, or it should be a null shared_ptr
|
||||
if (success) {
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group;
|
||||
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
|
||||
if (iter == weak_groups_to_nodes.end()) {
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
|
||||
any_executable = ready_executables_.front();
|
||||
ready_executables_.pop_front();
|
||||
|
||||
if (any_executable.callback_group &&
|
||||
any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive)
|
||||
{
|
||||
assert(any_executable.callback_group->can_be_taken_from().load());
|
||||
any_executable.callback_group->can_be_taken_from().store(false);
|
||||
if (success) {
|
||||
// If it is valid, check to see if the group is mutually exclusive or
|
||||
// not, then mark it accordingly ..Check if the callback_group belongs to this executor
|
||||
if (any_executable.callback_group && any_executable.callback_group->type() == \
|
||||
CallbackGroupType::MutuallyExclusive)
|
||||
{
|
||||
// It should not have been taken otherwise
|
||||
assert(any_executable.callback_group->can_be_taken_from().load());
|
||||
// Set to false to indicate something is being run from this group
|
||||
// This is reset to true either when the any_executable is executed or when the
|
||||
// any_executable is destructued
|
||||
any_executable.callback_group->can_be_taken_from().store(false);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
// If there is no ready executable, return false
|
||||
return success;
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -717,6 +967,22 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos
|
||||
return success;
|
||||
}
|
||||
|
||||
// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry.
|
||||
bool
|
||||
Executor::has_node(
|
||||
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
|
||||
weak_groups_to_nodes) const
|
||||
{
|
||||
return std::find_if(
|
||||
weak_groups_to_nodes.begin(),
|
||||
weak_groups_to_nodes.end(),
|
||||
[&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool {
|
||||
auto other_ptr = other.second.lock();
|
||||
return other_ptr == node_ptr;
|
||||
}) != weak_groups_to_nodes.end();
|
||||
}
|
||||
|
||||
bool
|
||||
Executor::is_spinning()
|
||||
{
|
||||
|
||||
@@ -222,7 +222,9 @@ ready_executables(
|
||||
executables.push_back(exec);
|
||||
added++;
|
||||
}
|
||||
|
||||
return added;
|
||||
}
|
||||
|
||||
} // namespace executors
|
||||
} // namespace rclcpp
|
||||
|
||||
@@ -105,8 +105,7 @@ void
|
||||
ExecutorEntitiesCollector::remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
|
||||
{
|
||||
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
|
||||
if (!has_executor.exchange(false)) {
|
||||
if (!node_ptr->get_associated_with_executor_atomic().load()) {
|
||||
throw std::runtime_error(
|
||||
std::string("Node '") + node_ptr->get_fully_qualified_name() +
|
||||
"' needs to be associated with an executor.");
|
||||
@@ -162,6 +161,7 @@ ExecutorEntitiesCollector::remove_callback_group(rclcpp::CallbackGroup::SharedPt
|
||||
throw std::runtime_error("Node must not be deleted before its callback group(s).");
|
||||
}
|
||||
*/
|
||||
|
||||
auto weak_group_ptr = rclcpp::CallbackGroup::WeakPtr(group_ptr);
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
bool associated = manually_added_groups_.count(group_ptr) != 0;
|
||||
@@ -314,11 +314,7 @@ ExecutorEntitiesCollector::process_queues()
|
||||
if (node_it != weak_nodes_.end()) {
|
||||
remove_weak_node(node_it);
|
||||
} else {
|
||||
// The node may have been destroyed and removed from the colletion before
|
||||
// we processed the queues. Don't throw if the pointer is already expired.
|
||||
if (!weak_node_ptr.expired()) {
|
||||
throw std::runtime_error("Node needs to be associated with this executor.");
|
||||
}
|
||||
throw std::runtime_error("Node needs to be associated with this executor.");
|
||||
}
|
||||
|
||||
auto node_ptr = weak_node_ptr.lock();
|
||||
|
||||
@@ -46,18 +46,20 @@ void
|
||||
ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
|
||||
|
||||
for (auto weak_guard_condition : this->notify_guard_conditions_) {
|
||||
auto guard_condition = weak_guard_condition.lock();
|
||||
if (!guard_condition) {continue;}
|
||||
if (guard_condition) {
|
||||
auto rcl_guard_condition = &guard_condition->get_rcl_guard_condition();
|
||||
|
||||
rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
|
||||
rcl_ret_t ret = rcl_wait_set_add_guard_condition(
|
||||
wait_set,
|
||||
rcl_guard_condition, NULL);
|
||||
|
||||
rcl_ret_t ret = rcl_wait_set_add_guard_condition(
|
||||
wait_set, cond, NULL);
|
||||
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(
|
||||
ret, "failed to add guard condition to wait set");
|
||||
if (RCL_RET_OK != ret) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(
|
||||
ret, "failed to add guard condition to wait set");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,19 +99,6 @@ MultiThreadedExecutor::run(size_t this_thread_number)
|
||||
|
||||
execute_any_executable(any_exec);
|
||||
|
||||
if (any_exec.callback_group &&
|
||||
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive &&
|
||||
any_exec.callback_group->size() > 1)
|
||||
{
|
||||
try {
|
||||
interrupt_guard_condition_->trigger();
|
||||
} catch (const rclcpp::exceptions::RCLError & ex) {
|
||||
throw std::runtime_error(
|
||||
std::string(
|
||||
"Failed to trigger guard condition on callback group change: ") + ex.what());
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the callback_group to prevent the AnyExecutable destructor from
|
||||
// resetting the callback group `can_be_taken_from`
|
||||
any_exec.callback_group.reset();
|
||||
|
||||
@@ -0,0 +1,524 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/executors/static_executor_entities_collector.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/memory_strategy.hpp"
|
||||
#include "rclcpp/executors/static_single_threaded_executor.hpp"
|
||||
#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
|
||||
|
||||
using rclcpp::executors::StaticExecutorEntitiesCollector;
|
||||
|
||||
StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector()
|
||||
{
|
||||
// Disassociate all callback groups and thus nodes.
|
||||
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
|
||||
auto group = pair.first.lock();
|
||||
if (group) {
|
||||
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
}
|
||||
}
|
||||
for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
|
||||
auto group = pair.first.lock();
|
||||
if (group) {
|
||||
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
}
|
||||
}
|
||||
// Disassociate all nodes
|
||||
for (const auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
if (node) {
|
||||
std::atomic_bool & has_executor = node->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
}
|
||||
}
|
||||
weak_groups_associated_with_executor_to_nodes_.clear();
|
||||
weak_groups_to_nodes_associated_with_executor_.clear();
|
||||
exec_list_.clear();
|
||||
weak_nodes_.clear();
|
||||
weak_nodes_to_guard_conditions_.clear();
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::init(
|
||||
rcl_wait_set_t * p_wait_set,
|
||||
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
|
||||
{
|
||||
// Empty initialize executable list
|
||||
exec_list_ = rclcpp::experimental::ExecutableList();
|
||||
// Get executor's wait_set_ pointer
|
||||
p_wait_set_ = p_wait_set;
|
||||
// Get executor's memory strategy ptr
|
||||
if (memory_strategy == nullptr) {
|
||||
throw std::runtime_error("Received NULL memory strategy in executor waitable.");
|
||||
}
|
||||
memory_strategy_ = memory_strategy;
|
||||
|
||||
// Get memory strategy and executable list. Prepare wait_set_
|
||||
std::shared_ptr<void> shared_ptr;
|
||||
execute(shared_ptr);
|
||||
|
||||
// The entities collector is now initialized
|
||||
initialized_ = true;
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::fini()
|
||||
{
|
||||
memory_strategy_->clear_handles();
|
||||
exec_list_.clear();
|
||||
}
|
||||
|
||||
std::shared_ptr<void>
|
||||
StaticExecutorEntitiesCollector::take_data()
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::execute(std::shared_ptr<void> & data)
|
||||
{
|
||||
(void) data;
|
||||
// Fill memory strategy with entities coming from weak_nodes_
|
||||
fill_memory_strategy();
|
||||
// Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy)
|
||||
fill_executable_list();
|
||||
// Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize)
|
||||
prepare_wait_set();
|
||||
// Add new nodes guard conditions to map
|
||||
std::lock_guard<std::mutex> guard{new_nodes_mutex_};
|
||||
for (const auto & weak_node : new_nodes_) {
|
||||
if (auto node_ptr = weak_node.lock()) {
|
||||
weak_nodes_to_guard_conditions_[node_ptr] =
|
||||
node_ptr->get_shared_notify_guard_condition().get();
|
||||
}
|
||||
}
|
||||
new_nodes_.clear();
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::fill_memory_strategy()
|
||||
{
|
||||
memory_strategy_->clear_handles();
|
||||
bool has_invalid_weak_groups_or_nodes =
|
||||
memory_strategy_->collect_entities(weak_groups_to_nodes_associated_with_executor_);
|
||||
// Clean up any invalid nodes, if they were detected
|
||||
if (has_invalid_weak_groups_or_nodes) {
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
|
||||
for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
|
||||
auto & weak_group_ptr = pair.first;
|
||||
auto & weak_node_ptr = pair.second;
|
||||
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
|
||||
invalid_group_ptrs.push_back(weak_group_ptr);
|
||||
}
|
||||
}
|
||||
std::for_each(
|
||||
invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
|
||||
[this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
|
||||
weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
|
||||
});
|
||||
}
|
||||
has_invalid_weak_groups_or_nodes =
|
||||
memory_strategy_->collect_entities(weak_groups_associated_with_executor_to_nodes_);
|
||||
// Clean up any invalid nodes, if they were detected
|
||||
if (has_invalid_weak_groups_or_nodes) {
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
|
||||
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
|
||||
auto & weak_group_ptr = pair.first;
|
||||
const auto & weak_node_ptr = pair.second;
|
||||
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
|
||||
invalid_group_ptrs.push_back(weak_group_ptr);
|
||||
}
|
||||
}
|
||||
std::for_each(
|
||||
invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
|
||||
[this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
|
||||
weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
|
||||
});
|
||||
}
|
||||
|
||||
// Add the static executor waitable to the memory strategy
|
||||
memory_strategy_->add_waitable_handle(this->shared_from_this());
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::fill_executable_list()
|
||||
{
|
||||
exec_list_.clear();
|
||||
add_callback_groups_from_nodes_associated_to_executor();
|
||||
fill_executable_list_from_map(weak_groups_associated_with_executor_to_nodes_);
|
||||
fill_executable_list_from_map(weak_groups_to_nodes_associated_with_executor_);
|
||||
// Add the executor's waitable to the executable list
|
||||
exec_list_.add_waitable(shared_from_this());
|
||||
}
|
||||
void
|
||||
StaticExecutorEntitiesCollector::fill_executable_list_from_map(
|
||||
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
|
||||
weak_groups_to_nodes)
|
||||
{
|
||||
for (const auto & pair : weak_groups_to_nodes) {
|
||||
auto group = pair.first.lock();
|
||||
auto node = pair.second.lock();
|
||||
if (!node || !group || !group->can_be_taken_from().load()) {
|
||||
continue;
|
||||
}
|
||||
group->find_timer_ptrs_if(
|
||||
[this](const rclcpp::TimerBase::SharedPtr & timer) {
|
||||
if (timer) {
|
||||
exec_list_.add_timer(timer);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_subscription_ptrs_if(
|
||||
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
|
||||
if (subscription) {
|
||||
exec_list_.add_subscription(subscription);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_service_ptrs_if(
|
||||
[this](const rclcpp::ServiceBase::SharedPtr & service) {
|
||||
if (service) {
|
||||
exec_list_.add_service(service);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_client_ptrs_if(
|
||||
[this](const rclcpp::ClientBase::SharedPtr & client) {
|
||||
if (client) {
|
||||
exec_list_.add_client(client);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
group->find_waitable_ptrs_if(
|
||||
[this](const rclcpp::Waitable::SharedPtr & waitable) {
|
||||
if (waitable) {
|
||||
exec_list_.add_waitable(waitable);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::prepare_wait_set()
|
||||
{
|
||||
// clear wait set
|
||||
if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
|
||||
throw std::runtime_error("Couldn't clear wait set");
|
||||
}
|
||||
|
||||
// The size of waitables are accounted for in size of the other entities
|
||||
rcl_ret_t ret = rcl_wait_set_resize(
|
||||
p_wait_set_, memory_strategy_->number_of_ready_subscriptions(),
|
||||
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
|
||||
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
|
||||
memory_strategy_->number_of_ready_events());
|
||||
|
||||
if (RCL_RET_OK != ret) {
|
||||
throw std::runtime_error(
|
||||
std::string("Couldn't resize the wait set: ") + rcl_get_error_string().str);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout)
|
||||
{
|
||||
// clear wait set (memset to '0' all wait_set_ entities
|
||||
// but keeps the wait_set_ number of entities)
|
||||
if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
|
||||
throw std::runtime_error("Couldn't clear wait set");
|
||||
}
|
||||
|
||||
if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) {
|
||||
throw std::runtime_error("Couldn't fill wait set");
|
||||
}
|
||||
|
||||
rcl_ret_t status =
|
||||
rcl_wait(p_wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
|
||||
|
||||
if (status == RCL_RET_WAIT_SET_EMPTY) {
|
||||
RCUTILS_LOG_WARN_NAMED(
|
||||
"rclcpp",
|
||||
"empty wait set received in rcl_wait(). This should never happen.");
|
||||
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
|
||||
using rclcpp::exceptions::throw_from_rcl_error;
|
||||
throw_from_rcl_error(status, "rcl_wait() failed");
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set)
|
||||
{
|
||||
// Add waitable guard conditions (one for each registered node) into the wait set.
|
||||
for (const auto & pair : weak_nodes_to_guard_conditions_) {
|
||||
auto & gc = pair.second;
|
||||
detail::add_guard_condition_to_rcl_wait_set(*wait_set, *gc);
|
||||
}
|
||||
}
|
||||
|
||||
size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions()
|
||||
{
|
||||
std::lock_guard<std::mutex> guard{new_nodes_mutex_};
|
||||
return weak_nodes_to_guard_conditions_.size() + new_nodes_.size();
|
||||
}
|
||||
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
|
||||
{
|
||||
bool is_new_node = false;
|
||||
// If the node already has an executor
|
||||
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
|
||||
if (has_executor.exchange(true)) {
|
||||
throw std::runtime_error("Node has already been added to an executor.");
|
||||
}
|
||||
node_ptr->for_each_callback_group(
|
||||
[this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
|
||||
{
|
||||
if (
|
||||
!group_ptr->get_associated_with_executor_atomic().load() &&
|
||||
group_ptr->automatically_add_to_executor_with_node())
|
||||
{
|
||||
is_new_node = (add_callback_group(
|
||||
group_ptr,
|
||||
node_ptr,
|
||||
weak_groups_to_nodes_associated_with_executor_) ||
|
||||
is_new_node);
|
||||
}
|
||||
});
|
||||
weak_nodes_.push_back(node_ptr);
|
||||
return is_new_node;
|
||||
}
|
||||
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
|
||||
{
|
||||
// If the callback_group already has an executor
|
||||
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
|
||||
if (has_executor.exchange(true)) {
|
||||
throw std::runtime_error("Callback group has already been added to an executor.");
|
||||
}
|
||||
bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
|
||||
!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_);
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
|
||||
auto insert_info = weak_groups_to_nodes.insert(
|
||||
std::make_pair(weak_group_ptr, node_ptr));
|
||||
bool was_inserted = insert_info.second;
|
||||
if (!was_inserted) {
|
||||
throw std::runtime_error("Callback group was already added to executor.");
|
||||
}
|
||||
if (is_new_node) {
|
||||
std::lock_guard<std::mutex> guard{new_nodes_mutex_};
|
||||
new_nodes_.push_back(node_ptr);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
|
||||
{
|
||||
return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_);
|
||||
}
|
||||
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr)
|
||||
{
|
||||
return this->remove_callback_group_from_map(
|
||||
group_ptr,
|
||||
weak_groups_associated_with_executor_to_nodes_);
|
||||
}
|
||||
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::remove_callback_group_from_map(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
|
||||
{
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
|
||||
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
|
||||
if (iter != weak_groups_to_nodes.end()) {
|
||||
node_ptr = iter->second.lock();
|
||||
if (node_ptr == nullptr) {
|
||||
throw std::runtime_error("Node must not be deleted before its callback group(s).");
|
||||
}
|
||||
weak_groups_to_nodes.erase(iter);
|
||||
} else {
|
||||
throw std::runtime_error("Callback group needs to be associated with executor.");
|
||||
}
|
||||
// If the node was matched and removed, interrupt waiting.
|
||||
if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
|
||||
!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_))
|
||||
{
|
||||
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr);
|
||||
weak_nodes_to_guard_conditions_.erase(node_weak_ptr);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
|
||||
{
|
||||
if (!node_ptr->get_associated_with_executor_atomic().load()) {
|
||||
return false;
|
||||
}
|
||||
bool node_found = false;
|
||||
auto node_it = weak_nodes_.begin();
|
||||
while (node_it != weak_nodes_.end()) {
|
||||
bool matched = (node_it->lock() == node_ptr);
|
||||
if (matched) {
|
||||
weak_nodes_.erase(node_it);
|
||||
node_found = true;
|
||||
break;
|
||||
}
|
||||
++node_it;
|
||||
}
|
||||
if (!node_found) {
|
||||
return false;
|
||||
}
|
||||
std::vector<rclcpp::CallbackGroup::SharedPtr> found_group_ptrs;
|
||||
std::for_each(
|
||||
weak_groups_to_nodes_associated_with_executor_.begin(),
|
||||
weak_groups_to_nodes_associated_with_executor_.end(),
|
||||
[&found_group_ptrs, node_ptr](std::pair<rclcpp::CallbackGroup::WeakPtr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> key_value_pair) {
|
||||
auto & weak_node_ptr = key_value_pair.second;
|
||||
auto shared_node_ptr = weak_node_ptr.lock();
|
||||
auto group_ptr = key_value_pair.first.lock();
|
||||
if (shared_node_ptr == node_ptr) {
|
||||
found_group_ptrs.push_back(group_ptr);
|
||||
}
|
||||
});
|
||||
std::for_each(
|
||||
found_group_ptrs.begin(), found_group_ptrs.end(), [this]
|
||||
(rclcpp::CallbackGroup::SharedPtr group_ptr) {
|
||||
this->remove_callback_group_from_map(
|
||||
group_ptr,
|
||||
weak_groups_to_nodes_associated_with_executor_);
|
||||
});
|
||||
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
|
||||
has_executor.store(false);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set)
|
||||
{
|
||||
// Check wait_set guard_conditions for added/removed entities to/from a node
|
||||
for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) {
|
||||
if (p_wait_set->guard_conditions[i] != NULL) {
|
||||
auto found_guard_condition = std::find_if(
|
||||
weak_nodes_to_guard_conditions_.begin(), weak_nodes_to_guard_conditions_.end(),
|
||||
[&](std::pair<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
|
||||
const GuardCondition *> pair) -> bool {
|
||||
const rcl_guard_condition_t & rcl_gc = pair.second->get_rcl_guard_condition();
|
||||
return &rcl_gc == p_wait_set->guard_conditions[i];
|
||||
});
|
||||
if (found_guard_condition != weak_nodes_to_guard_conditions_.end()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// None of the guard conditions triggered belong to a registered node
|
||||
return false;
|
||||
}
|
||||
|
||||
// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry.
|
||||
bool
|
||||
StaticExecutorEntitiesCollector::has_node(
|
||||
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
|
||||
weak_groups_to_nodes) const
|
||||
{
|
||||
return std::find_if(
|
||||
weak_groups_to_nodes.begin(),
|
||||
weak_groups_to_nodes.end(),
|
||||
[&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool {
|
||||
auto other_ptr = other.second.lock();
|
||||
return other_ptr == node_ptr;
|
||||
}) != weak_groups_to_nodes.end();
|
||||
}
|
||||
|
||||
void
|
||||
StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor()
|
||||
{
|
||||
for (const auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
if (node) {
|
||||
node->for_each_callback_group(
|
||||
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
|
||||
{
|
||||
if (shared_group_ptr->automatically_add_to_executor_with_node() &&
|
||||
!shared_group_ptr->get_associated_with_executor_atomic().load())
|
||||
{
|
||||
add_callback_group(
|
||||
shared_group_ptr,
|
||||
node,
|
||||
weak_groups_to_nodes_associated_with_executor_);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
StaticExecutorEntitiesCollector::get_all_callback_groups()
|
||||
{
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
StaticExecutorEntitiesCollector::get_manually_added_callback_groups()
|
||||
{
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
StaticExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes()
|
||||
{
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
|
||||
for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
|
||||
groups.push_back(group_node_ptr.first);
|
||||
}
|
||||
return groups;
|
||||
}
|
||||
@@ -12,21 +12,31 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/executors/executor_entities_collection.hpp"
|
||||
#include "rclcpp/executors/executor_notify_waitable.hpp"
|
||||
#include "rclcpp/executors/static_single_threaded_executor.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rcpputils/scope_exit.hpp"
|
||||
|
||||
#include "rclcpp/executors/static_single_threaded_executor.hpp"
|
||||
#include "rclcpp/any_executable.hpp"
|
||||
|
||||
using rclcpp::executors::StaticSingleThreadedExecutor;
|
||||
using rclcpp::experimental::ExecutableList;
|
||||
|
||||
StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(const rclcpp::ExecutorOptions & options)
|
||||
StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(
|
||||
const rclcpp::ExecutorOptions & options)
|
||||
: rclcpp::Executor(options)
|
||||
{
|
||||
entities_collector_ = std::make_shared<StaticExecutorEntitiesCollector>();
|
||||
}
|
||||
|
||||
StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {}
|
||||
StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor()
|
||||
{
|
||||
if (entities_collector_->is_init()) {
|
||||
entities_collector_->fini();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StaticSingleThreadedExecutor::spin()
|
||||
@@ -36,25 +46,14 @@ StaticSingleThreadedExecutor::spin()
|
||||
}
|
||||
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
|
||||
|
||||
// This is essentially the contents of the rclcpp::Executor::wait_for_work method,
|
||||
// except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor
|
||||
// behavior.
|
||||
// Set memory_strategy_ and exec_list_ based on weak_nodes_
|
||||
// Prepare wait_set_ based on memory_strategy_
|
||||
entities_collector_->init(&wait_set_, memory_strategy_);
|
||||
|
||||
while (rclcpp::ok(this->context_) && spinning.load()) {
|
||||
std::deque<rclcpp::AnyExecutable> to_exec;
|
||||
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
|
||||
this->collect_entities();
|
||||
}
|
||||
|
||||
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(-1));
|
||||
if (wait_result.kind() == WaitResultKind::Empty) {
|
||||
RCUTILS_LOG_WARN_NAMED(
|
||||
"rclcpp",
|
||||
"empty wait set received in wait(). This should never happen.");
|
||||
continue;
|
||||
}
|
||||
execute_ready_executables(current_collection_, wait_result, false);
|
||||
// Refresh wait set and wait for work
|
||||
entities_collector_->refresh_wait_set();
|
||||
execute_ready_executables();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,6 +80,11 @@ StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration)
|
||||
void
|
||||
StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
|
||||
{
|
||||
// Make sure the entities collector has been initialized
|
||||
if (!entities_collector_->is_init()) {
|
||||
entities_collector_->init(&wait_set_, memory_strategy_);
|
||||
}
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
auto max_duration_not_elapsed = [max_duration, start]() {
|
||||
if (std::chrono::nanoseconds(0) == max_duration) {
|
||||
@@ -101,21 +105,9 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati
|
||||
|
||||
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
|
||||
// Get executables that are ready now
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
|
||||
this->collect_entities();
|
||||
}
|
||||
|
||||
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(0));
|
||||
if (wait_result.kind() == WaitResultKind::Empty) {
|
||||
RCUTILS_LOG_WARN_NAMED(
|
||||
"rclcpp",
|
||||
"empty wait set received in wait(). This should never happen.");
|
||||
continue;
|
||||
}
|
||||
|
||||
entities_collector_->refresh_wait_set(std::chrono::milliseconds::zero());
|
||||
// Execute ready executables
|
||||
bool work_available = execute_ready_executables(current_collection_, wait_result, false);
|
||||
bool work_available = execute_ready_executables();
|
||||
if (!work_available || !exhaustive) {
|
||||
break;
|
||||
}
|
||||
@@ -125,122 +117,164 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati
|
||||
void
|
||||
StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
|
||||
{
|
||||
// Make sure the entities collector has been initialized
|
||||
if (!entities_collector_->is_init()) {
|
||||
entities_collector_->init(&wait_set_, memory_strategy_);
|
||||
}
|
||||
|
||||
if (rclcpp::ok(context_) && spinning.load()) {
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
|
||||
this->collect_entities();
|
||||
}
|
||||
|
||||
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout));
|
||||
if (wait_result.kind() == WaitResultKind::Empty) {
|
||||
RCUTILS_LOG_WARN_NAMED(
|
||||
"rclcpp",
|
||||
"empty wait set received in wait(). This should never happen.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait until we have a ready entity or timeout expired
|
||||
entities_collector_->refresh_wait_set(timeout);
|
||||
// Execute ready executables
|
||||
execute_ready_executables(current_collection_, wait_result, true);
|
||||
execute_ready_executables(true);
|
||||
}
|
||||
}
|
||||
|
||||
// This preserves the "scheduling semantics" of the StaticSingleThreadedExecutor
|
||||
// from the original implementation.
|
||||
bool StaticSingleThreadedExecutor::execute_ready_executables(
|
||||
const rclcpp::executors::ExecutorEntitiesCollection & collection,
|
||||
rclcpp::WaitResult<rclcpp::WaitSet> & wait_result,
|
||||
bool spin_once)
|
||||
void
|
||||
StaticSingleThreadedExecutor::add_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr,
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
|
||||
bool notify)
|
||||
{
|
||||
bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr);
|
||||
if (is_new_node && notify) {
|
||||
// Interrupt waiting to handle new node
|
||||
interrupt_guard_condition_->trigger();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StaticSingleThreadedExecutor::add_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
|
||||
{
|
||||
bool is_new_node = entities_collector_->add_node(node_ptr);
|
||||
if (is_new_node && notify) {
|
||||
// Interrupt waiting to handle new node
|
||||
interrupt_guard_condition_->trigger();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StaticSingleThreadedExecutor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
|
||||
{
|
||||
this->add_node(node_ptr->get_node_base_interface(), notify);
|
||||
}
|
||||
|
||||
void
|
||||
StaticSingleThreadedExecutor::remove_callback_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify)
|
||||
{
|
||||
bool node_removed = entities_collector_->remove_callback_group(group_ptr);
|
||||
// If the node was matched and removed, interrupt waiting
|
||||
if (node_removed && notify) {
|
||||
interrupt_guard_condition_->trigger();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
StaticSingleThreadedExecutor::remove_node(
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
|
||||
{
|
||||
bool node_removed = entities_collector_->remove_node(node_ptr);
|
||||
if (!node_removed) {
|
||||
throw std::runtime_error("Node needs to be associated with this executor.");
|
||||
}
|
||||
// If the node was matched and removed, interrupt waiting
|
||||
if (notify) {
|
||||
interrupt_guard_condition_->trigger();
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
StaticSingleThreadedExecutor::get_all_callback_groups()
|
||||
{
|
||||
return entities_collector_->get_all_callback_groups();
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
StaticSingleThreadedExecutor::get_manually_added_callback_groups()
|
||||
{
|
||||
return entities_collector_->get_manually_added_callback_groups();
|
||||
}
|
||||
|
||||
std::vector<rclcpp::CallbackGroup::WeakPtr>
|
||||
StaticSingleThreadedExecutor::get_automatically_added_callback_groups_from_nodes()
|
||||
{
|
||||
return entities_collector_->get_automatically_added_callback_groups_from_nodes();
|
||||
}
|
||||
|
||||
void
|
||||
StaticSingleThreadedExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
|
||||
{
|
||||
this->remove_node(node_ptr->get_node_base_interface(), notify);
|
||||
}
|
||||
|
||||
bool
|
||||
StaticSingleThreadedExecutor::execute_ready_executables(bool spin_once)
|
||||
{
|
||||
bool any_ready_executable = false;
|
||||
|
||||
if (wait_result.kind() != rclcpp::WaitResultKind::Ready) {
|
||||
return any_ready_executable;
|
||||
// Execute all the ready subscriptions
|
||||
for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) {
|
||||
if (i < entities_collector_->get_number_of_subscriptions()) {
|
||||
if (wait_set_.subscriptions[i]) {
|
||||
execute_subscription(entities_collector_->get_subscription(i));
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set();
|
||||
|
||||
for (size_t ii = 0; ii < rcl_wait_set.size_of_timers; ++ii) {
|
||||
if (nullptr == rcl_wait_set.timers[ii]) {continue;}
|
||||
auto entity_iter = collection.timers.find(rcl_wait_set.timers[ii]);
|
||||
if (entity_iter != collection.timers.end()) {
|
||||
auto entity = entity_iter->second.entity.lock();
|
||||
if (!entity) {
|
||||
continue;
|
||||
// Execute all the ready timers
|
||||
for (size_t i = 0; i < wait_set_.size_of_timers; ++i) {
|
||||
if (i < entities_collector_->get_number_of_timers()) {
|
||||
if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) {
|
||||
auto timer = entities_collector_->get_timer(i);
|
||||
timer->call();
|
||||
execute_timer(std::move(timer));
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
if (!entity->call()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// Execute all the ready services
|
||||
for (size_t i = 0; i < wait_set_.size_of_services; ++i) {
|
||||
if (i < entities_collector_->get_number_of_services()) {
|
||||
if (wait_set_.services[i]) {
|
||||
execute_service(entities_collector_->get_service(i));
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
execute_timer(entity);
|
||||
}
|
||||
}
|
||||
// Execute all the ready clients
|
||||
for (size_t i = 0; i < wait_set_.size_of_clients; ++i) {
|
||||
if (i < entities_collector_->get_number_of_clients()) {
|
||||
if (wait_set_.clients[i]) {
|
||||
execute_client(entities_collector_->get_client(i));
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Execute all the ready waitables
|
||||
for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) {
|
||||
auto waitable = entities_collector_->get_waitable(i);
|
||||
if (waitable->is_ready(&wait_set_)) {
|
||||
auto data = waitable->take_data();
|
||||
waitable->execute(data);
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t ii = 0; ii < rcl_wait_set.size_of_subscriptions; ++ii) {
|
||||
if (nullptr == rcl_wait_set.subscriptions[ii]) {continue;}
|
||||
auto entity_iter = collection.subscriptions.find(rcl_wait_set.subscriptions[ii]);
|
||||
if (entity_iter != collection.subscriptions.end()) {
|
||||
auto entity = entity_iter->second.entity.lock();
|
||||
if (!entity) {
|
||||
continue;
|
||||
}
|
||||
execute_subscription(entity);
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t ii = 0; ii < rcl_wait_set.size_of_services; ++ii) {
|
||||
if (nullptr == rcl_wait_set.services[ii]) {continue;}
|
||||
auto entity_iter = collection.services.find(rcl_wait_set.services[ii]);
|
||||
if (entity_iter != collection.services.end()) {
|
||||
auto entity = entity_iter->second.entity.lock();
|
||||
if (!entity) {
|
||||
continue;
|
||||
}
|
||||
execute_service(entity);
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t ii = 0; ii < rcl_wait_set.size_of_clients; ++ii) {
|
||||
if (nullptr == rcl_wait_set.clients[ii]) {continue;}
|
||||
auto entity_iter = collection.clients.find(rcl_wait_set.clients[ii]);
|
||||
if (entity_iter != collection.clients.end()) {
|
||||
auto entity = entity_iter->second.entity.lock();
|
||||
if (!entity) {
|
||||
continue;
|
||||
}
|
||||
execute_client(entity);
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto & [handle, entry] : collection.waitables) {
|
||||
auto waitable = entry.entity.lock();
|
||||
if (!waitable) {
|
||||
continue;
|
||||
}
|
||||
if (!waitable->is_ready(&rcl_wait_set)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto data = waitable->take_data();
|
||||
waitable->execute(data);
|
||||
if (spin_once) {
|
||||
return true;
|
||||
}
|
||||
any_ready_executable = true;
|
||||
}
|
||||
return any_ready_executable;
|
||||
}
|
||||
|
||||
@@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
|
||||
timers_manager_ =
|
||||
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
|
||||
|
||||
this->current_entities_collection_ =
|
||||
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
|
||||
|
||||
notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
|
||||
[this]() {
|
||||
// This callback is invoked when:
|
||||
@@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
|
||||
this->refresh_current_collection_from_callback_groups();
|
||||
});
|
||||
|
||||
// Make sure that the notify waitable is immediately added to the collection
|
||||
// to avoid missing events
|
||||
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
|
||||
|
||||
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
|
||||
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
|
||||
|
||||
@@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(
|
||||
|
||||
this->entities_collector_ =
|
||||
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);
|
||||
|
||||
this->current_entities_collection_ =
|
||||
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
|
||||
}
|
||||
|
||||
EventsExecutor::~EventsExecutor()
|
||||
@@ -269,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
|
||||
switch (event.type) {
|
||||
case ExecutorEventType::CLIENT_EVENT:
|
||||
{
|
||||
auto client = this->retrieve_entity(
|
||||
static_cast<const rcl_client_t *>(event.entity_key),
|
||||
current_entities_collection_->clients);
|
||||
|
||||
rclcpp::ClientBase::SharedPtr client;
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
|
||||
client = this->retrieve_entity(
|
||||
static_cast<const rcl_client_t *>(event.entity_key),
|
||||
current_entities_collection_->clients);
|
||||
}
|
||||
if (client) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
execute_client(client);
|
||||
@@ -283,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
|
||||
}
|
||||
case ExecutorEventType::SUBSCRIPTION_EVENT:
|
||||
{
|
||||
auto subscription = this->retrieve_entity(
|
||||
static_cast<const rcl_subscription_t *>(event.entity_key),
|
||||
current_entities_collection_->subscriptions);
|
||||
rclcpp::SubscriptionBase::SharedPtr subscription;
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
|
||||
subscription = this->retrieve_entity(
|
||||
static_cast<const rcl_subscription_t *>(event.entity_key),
|
||||
current_entities_collection_->subscriptions);
|
||||
}
|
||||
if (subscription) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
execute_subscription(subscription);
|
||||
@@ -295,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
|
||||
}
|
||||
case ExecutorEventType::SERVICE_EVENT:
|
||||
{
|
||||
auto service = this->retrieve_entity(
|
||||
static_cast<const rcl_service_t *>(event.entity_key),
|
||||
current_entities_collection_->services);
|
||||
|
||||
rclcpp::ServiceBase::SharedPtr service;
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
|
||||
service = this->retrieve_entity(
|
||||
static_cast<const rcl_service_t *>(event.entity_key),
|
||||
current_entities_collection_->services);
|
||||
}
|
||||
if (service) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
execute_service(service);
|
||||
@@ -315,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
|
||||
}
|
||||
case ExecutorEventType::WAITABLE_EVENT:
|
||||
{
|
||||
auto waitable = this->retrieve_entity(
|
||||
static_cast<const rclcpp::Waitable *>(event.entity_key),
|
||||
current_entities_collection_->waitables);
|
||||
rclcpp::Waitable::SharedPtr waitable;
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
|
||||
waitable = this->retrieve_entity(
|
||||
static_cast<const rclcpp::Waitable *>(event.entity_key),
|
||||
current_entities_collection_->waitables);
|
||||
}
|
||||
if (waitable) {
|
||||
for (size_t i = 0; i < event.num_events; i++) {
|
||||
auto data = waitable->take_data_by_entity_id(event.waitable_data);
|
||||
@@ -382,6 +400,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
|
||||
void
|
||||
EventsExecutor::refresh_current_collection_from_callback_groups()
|
||||
{
|
||||
// Build the new collection
|
||||
this->entities_collector_->update_collections();
|
||||
auto callback_groups = this->entities_collector_->get_all_callback_groups();
|
||||
rclcpp::executors::ExecutorEntitiesCollection new_collection;
|
||||
@@ -395,18 +414,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
|
||||
// retrieved in the "standard" way.
|
||||
// To do it, we need to add the notify waitable as an entry in both the new and
|
||||
// current collections such that it's neither added or removed.
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
|
||||
new_collection.waitables.insert(
|
||||
{
|
||||
this->notify_waitable_.get(),
|
||||
{this->notify_waitable_, weak_group_ptr}
|
||||
});
|
||||
this->add_notify_waitable_to_collection(new_collection.waitables);
|
||||
|
||||
this->current_entities_collection_->waitables.insert(
|
||||
{
|
||||
this->notify_waitable_.get(),
|
||||
{this->notify_waitable_, weak_group_ptr}
|
||||
});
|
||||
// Acquire lock before modifying the current collection
|
||||
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
|
||||
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
|
||||
|
||||
this->refresh_current_collection(new_collection);
|
||||
}
|
||||
@@ -415,6 +427,9 @@ void
|
||||
EventsExecutor::refresh_current_collection(
|
||||
const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
|
||||
{
|
||||
// Acquire lock before modifying the current collection
|
||||
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
|
||||
|
||||
current_entities_collection_->timers.update(
|
||||
new_collection.timers,
|
||||
[this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
|
||||
@@ -486,3 +501,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
|
||||
};
|
||||
return callback;
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::add_notify_waitable_to_collection(
|
||||
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection)
|
||||
{
|
||||
// The notify waitable is not associated to any group, so use an invalid one
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
|
||||
collection.insert(
|
||||
{
|
||||
this->notify_waitable_.get(),
|
||||
{this->notify_waitable_, weak_group_ptr}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -225,5 +225,52 @@ IntraProcessManager::can_communicate(
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t
|
||||
IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const
|
||||
{
|
||||
size_t capacity = std::numeric_limits<size_t>::max();
|
||||
|
||||
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
|
||||
if (publisher_it == pub_to_subs_.end()) {
|
||||
// Publisher is either invalid or no longer exists.
|
||||
RCLCPP_WARN(
|
||||
rclcpp::get_logger("rclcpp"),
|
||||
"Calling lowest_available_capacity for invalid or no longer existing publisher id");
|
||||
return 0u;
|
||||
}
|
||||
|
||||
if (publisher_it->second.take_shared_subscriptions.empty() &&
|
||||
publisher_it->second.take_ownership_subscriptions.empty())
|
||||
{
|
||||
// no subscriptions available
|
||||
return 0u;
|
||||
}
|
||||
|
||||
auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id)
|
||||
{
|
||||
auto subscription_it = subscriptions_.find(intra_process_subscription_id);
|
||||
if (subscription_it != subscriptions_.end()) {
|
||||
auto subscription = subscription_it->second.lock();
|
||||
if (subscription) {
|
||||
capacity = std::min(capacity, subscription->available_capacity());
|
||||
}
|
||||
} else {
|
||||
// Subscription is either invalid or no longer exists.
|
||||
RCLCPP_WARN(
|
||||
rclcpp::get_logger("rclcpp"),
|
||||
"Calling available_capacity for invalid or no longer existing subscription id");
|
||||
}
|
||||
};
|
||||
|
||||
for (const auto sub_id : publisher_it->second.take_shared_subscriptions) {
|
||||
available_capacity(sub_id);
|
||||
}
|
||||
|
||||
for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
|
||||
available_capacity(sub_id);
|
||||
}
|
||||
|
||||
return capacity;
|
||||
}
|
||||
} // namespace experimental
|
||||
} // namespace rclcpp
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <stdexcept>
|
||||
|
||||
#include "rcutils/macros.h"
|
||||
|
||||
|
||||
@@ -167,7 +167,7 @@ Node::Node(
|
||||
options.use_intra_process_comms(),
|
||||
options.enable_topic_statistics())),
|
||||
node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())),
|
||||
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())),
|
||||
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_)),
|
||||
node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())),
|
||||
node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())),
|
||||
node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())),
|
||||
@@ -225,6 +225,10 @@ Node::Node(
|
||||
node_topics_->resolve_topic_name("/parameter_events"),
|
||||
options.parameter_event_qos(),
|
||||
rclcpp::detail::PublisherQosParametersTraits{});
|
||||
|
||||
if (options.enable_logger_service()) {
|
||||
node_logging_->create_logger_services(node_services_);
|
||||
}
|
||||
}
|
||||
|
||||
Node::Node(
|
||||
|
||||
@@ -12,11 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/node_impl.hpp"
|
||||
#include "rclcpp/node_interfaces/node_logging.hpp"
|
||||
#include "rclcpp/node_interfaces/node_services_interface.hpp"
|
||||
|
||||
using rclcpp::node_interfaces::NodeLogging;
|
||||
|
||||
NodeLogging::NodeLogging(rclcpp::node_interfaces::NodeBaseInterface * node_base)
|
||||
NodeLogging::NodeLogging(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base)
|
||||
: node_base_(node_base)
|
||||
{
|
||||
logger_ = rclcpp::get_logger(NodeLogging::get_logger_name());
|
||||
@@ -37,3 +39,55 @@ NodeLogging::get_logger_name() const
|
||||
{
|
||||
return rcl_node_get_logger_name(node_base_->get_rcl_node_handle());
|
||||
}
|
||||
|
||||
void NodeLogging::create_logger_services(
|
||||
node_interfaces::NodeServicesInterface::SharedPtr node_services)
|
||||
{
|
||||
rclcpp::ServicesQoS qos_profile;
|
||||
const std::string node_name = node_base_->get_name();
|
||||
auto callback_group = node_base_->get_default_callback_group();
|
||||
|
||||
get_loggers_service_ = rclcpp::create_service<rcl_interfaces::srv::GetLoggerLevels>(
|
||||
node_base_, node_services,
|
||||
node_name + "/get_logger_levels",
|
||||
[](
|
||||
const std::shared_ptr<rmw_request_id_t>,
|
||||
const std::shared_ptr<rcl_interfaces::srv::GetLoggerLevels::Request> request,
|
||||
std::shared_ptr<rcl_interfaces::srv::GetLoggerLevels::Response> response)
|
||||
{
|
||||
for (auto & name : request->names) {
|
||||
rcl_interfaces::msg::LoggerLevel logger_level;
|
||||
logger_level.name = name;
|
||||
auto ret = rcutils_logging_get_logger_level(name.c_str());
|
||||
if (ret < 0) {
|
||||
logger_level.level = 0;
|
||||
} else {
|
||||
logger_level.level = static_cast<uint8_t>(ret);
|
||||
}
|
||||
response->levels.push_back(std::move(logger_level));
|
||||
}
|
||||
},
|
||||
qos_profile, callback_group);
|
||||
|
||||
set_loggers_service_ = rclcpp::create_service<rcl_interfaces::srv::SetLoggerLevels>(
|
||||
node_base_, node_services,
|
||||
node_name + "/set_logger_levels",
|
||||
[](
|
||||
const std::shared_ptr<rmw_request_id_t>,
|
||||
const std::shared_ptr<rcl_interfaces::srv::SetLoggerLevels::Request> request,
|
||||
std::shared_ptr<rcl_interfaces::srv::SetLoggerLevels::Response> response)
|
||||
{
|
||||
rcl_interfaces::msg::SetLoggerLevelsResult result;
|
||||
for (auto & level : request->levels) {
|
||||
auto ret = rcutils_logging_set_logger_level(level.name.c_str(), level.level);
|
||||
if (ret != RCUTILS_RET_OK) {
|
||||
result.successful = false;
|
||||
result.reason = rcutils_get_error_string().str;
|
||||
} else {
|
||||
result.successful = true;
|
||||
}
|
||||
response->results.push_back(std::move(result));
|
||||
}
|
||||
},
|
||||
qos_profile, callback_group);
|
||||
}
|
||||
|
||||
@@ -248,6 +248,19 @@ NodeOptions::start_parameter_services(bool start_parameter_services)
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool
|
||||
NodeOptions::enable_logger_service() const
|
||||
{
|
||||
return this->enable_logger_service_;
|
||||
}
|
||||
|
||||
NodeOptions &
|
||||
NodeOptions::enable_logger_service(bool enable_logger_service)
|
||||
{
|
||||
this->enable_logger_service_ = enable_logger_service;
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool
|
||||
NodeOptions::start_parameter_event_publisher() const
|
||||
{
|
||||
|
||||
@@ -384,3 +384,22 @@ std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoin
|
||||
|
||||
return network_flow_endpoint_vector;
|
||||
}
|
||||
|
||||
size_t PublisherBase::lowest_available_ipm_capacity() const
|
||||
{
|
||||
if (!intra_process_is_enabled_) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
auto ipm = weak_ipm_.lock();
|
||||
|
||||
if (!ipm) {
|
||||
// TODO(ivanpauno): should this raise an error?
|
||||
RCLCPP_WARN(
|
||||
rclcpp::get_logger("rclcpp"),
|
||||
"Intra process manager died for a publisher.");
|
||||
return 0u;
|
||||
}
|
||||
|
||||
return ipm->lowest_available_capacity(intra_process_publisher_id_);
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ SubscriptionBase::SubscriptionBase(
|
||||
intra_process_subscription_id_(0),
|
||||
event_callbacks_(event_callbacks),
|
||||
type_support_(type_support_handle),
|
||||
delivered_message_type_(delivered_message_kind)
|
||||
delivered_message_kind_(delivered_message_kind)
|
||||
{
|
||||
auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs)
|
||||
{
|
||||
@@ -261,13 +261,13 @@ SubscriptionBase::get_message_type_support_handle() const
|
||||
bool
|
||||
SubscriptionBase::is_serialized() const
|
||||
{
|
||||
return delivered_message_type_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
|
||||
return delivered_message_kind_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
|
||||
}
|
||||
|
||||
rclcpp::DeliveredMessageKind
|
||||
SubscriptionBase::get_subscription_type() const
|
||||
SubscriptionBase::get_delivered_message_kind() const
|
||||
{
|
||||
return delivered_message_type_;
|
||||
return delivered_message_kind_;
|
||||
}
|
||||
|
||||
size_t
|
||||
|
||||
@@ -362,3 +362,42 @@ BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
BENCHMARK_F(
|
||||
PerformanceTestExecutorSimple,
|
||||
static_executor_entities_collector_execute)(benchmark::State & st)
|
||||
{
|
||||
rclcpp::executors::StaticExecutorEntitiesCollector::SharedPtr entities_collector_ =
|
||||
std::make_shared<rclcpp::executors::StaticExecutorEntitiesCollector>();
|
||||
entities_collector_->add_node(node->get_node_base_interface());
|
||||
|
||||
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
|
||||
rcl_allocator_t allocator = rcl_get_default_allocator();
|
||||
auto shared_context = node->get_node_base_interface()->get_context();
|
||||
rcl_context_t * context = shared_context->get_rcl_context().get();
|
||||
rcl_ret_t ret = rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator);
|
||||
if (ret != RCL_RET_OK) {
|
||||
st.SkipWithError(rcutils_get_error_string().str);
|
||||
}
|
||||
RCPPUTILS_SCOPE_EXIT(
|
||||
{
|
||||
rcl_ret_t ret = rcl_wait_set_fini(&wait_set);
|
||||
if (ret != RCL_RET_OK) {
|
||||
st.SkipWithError(rcutils_get_error_string().str);
|
||||
}
|
||||
});
|
||||
|
||||
auto memory_strategy = rclcpp::memory_strategies::create_default_strategy();
|
||||
rclcpp::GuardCondition guard_condition(shared_context);
|
||||
|
||||
entities_collector_->init(&wait_set, memory_strategy);
|
||||
RCPPUTILS_SCOPE_EXIT(entities_collector_->fini());
|
||||
|
||||
reset_heap_counters();
|
||||
|
||||
for (auto _ : st) {
|
||||
(void)_;
|
||||
std::shared_ptr<void> data = entities_collector_->take_data();
|
||||
entities_collector_->execute(data);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -649,6 +649,13 @@ if(TARGET test_wait_for_message)
|
||||
target_link_libraries(test_wait_for_message ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_logger_service test_logger_service.cpp)
|
||||
if(TARGET test_logger_service)
|
||||
ament_target_dependencies(test_logger_service
|
||||
"rcl_interfaces")
|
||||
target_link_libraries(test_logger_service ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_interface_traits test_interface_traits.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_interface_traits)
|
||||
@@ -663,7 +670,7 @@ ament_add_gtest(
|
||||
test_executors
|
||||
executors/test_executors.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}"
|
||||
TIMEOUT 180)
|
||||
TIMEOUT 600)
|
||||
if(TARGET test_executors)
|
||||
ament_target_dependencies(test_executors
|
||||
"rcl"
|
||||
@@ -671,6 +678,14 @@ if(TARGET test_executors)
|
||||
target_link_libraries(test_executors ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_static_single_threaded_executor)
|
||||
ament_target_dependencies(test_static_single_threaded_executor
|
||||
"test_msgs")
|
||||
target_link_libraries(test_static_single_threaded_executor ${PROJECT_NAME} mimick)
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_multi_threaded_executor executors/test_multi_threaded_executor.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_multi_threaded_executor)
|
||||
@@ -679,6 +694,15 @@ if(TARGET test_multi_threaded_executor)
|
||||
target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_static_executor_entities_collector executors/test_static_executor_entities_collector.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120)
|
||||
if(TARGET test_static_executor_entities_collector)
|
||||
ament_target_dependencies(test_static_executor_entities_collector
|
||||
"rcl"
|
||||
"test_msgs")
|
||||
target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick)
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_entities_collector executors/test_entities_collector.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120)
|
||||
if(TARGET test_entities_collector)
|
||||
|
||||
@@ -20,12 +20,14 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rcl/time.h"
|
||||
@@ -43,18 +45,10 @@ template<typename T>
|
||||
class TestExecutors : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
void SetUp()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
|
||||
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
|
||||
std::stringstream test_name;
|
||||
test_name << test_info->test_case_name() << "_" << test_info->name();
|
||||
@@ -75,6 +69,8 @@ public:
|
||||
publisher.reset();
|
||||
subscription.reset();
|
||||
node.reset();
|
||||
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
rclcpp::Node::SharedPtr node;
|
||||
@@ -83,6 +79,8 @@ public:
|
||||
int callback_count;
|
||||
};
|
||||
|
||||
// spin_all and spin_some are not implemented correctly in StaticSingleThreadedExecutor, see:
|
||||
// https://github.com/ros2/rclcpp/issues/1219 for tracking
|
||||
template<typename T>
|
||||
class TestExecutorsStable : public TestExecutors<T> {};
|
||||
|
||||
@@ -124,6 +122,15 @@ public:
|
||||
// is updated.
|
||||
TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames);
|
||||
|
||||
// StaticSingleThreadedExecutor is not included in these tests for now, due to:
|
||||
// https://github.com/ros2/rclcpp/issues/1219
|
||||
using StandardExecutors =
|
||||
::testing::Types<
|
||||
rclcpp::executors::SingleThreadedExecutor,
|
||||
rclcpp::executors::MultiThreadedExecutor,
|
||||
rclcpp::experimental::executors::EventsExecutor>;
|
||||
TYPED_TEST_SUITE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames);
|
||||
|
||||
// Make sure that executors detach from nodes when destructing
|
||||
TYPED_TEST(TestExecutors, detachOnDestruction)
|
||||
{
|
||||
@@ -147,7 +154,10 @@ TYPED_TEST(TestExecutors, detachOnDestruction)
|
||||
}
|
||||
|
||||
// Make sure that the executor can automatically remove expired nodes correctly
|
||||
TYPED_TEST(TestExecutors, addTemporaryNode) {
|
||||
// Currently fails for StaticSingleThreadedExecutor so it is being skipped, see:
|
||||
// https://github.com/ros2/rclcpp/issues/1231
|
||||
TYPED_TEST(TestExecutorsStable, addTemporaryNode)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
@@ -257,17 +267,14 @@ TYPED_TEST(TestExecutors, spinWhileAlreadySpinning)
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
std::atomic_bool timer_completed = false;
|
||||
auto timer = this->node->create_wall_timer(
|
||||
1ms, [&]() {
|
||||
timer_completed.store(true);
|
||||
});
|
||||
bool timer_completed = false;
|
||||
auto timer = this->node->create_wall_timer(1ms, [&]() {timer_completed = true;});
|
||||
|
||||
std::thread spinner([&]() {executor.spin();});
|
||||
// Sleep for a short time to verify executor.spin() is going, and didn't throw.
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
while (!timer_completed.load() && (std::chrono::steady_clock::now() - start) < 10s) {
|
||||
while (!timer_completed && (std::chrono::steady_clock::now() - start) < 10s) {
|
||||
std::this_thread::sleep_for(1ms);
|
||||
}
|
||||
|
||||
@@ -718,6 +725,131 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
|
||||
spinner.join();
|
||||
}
|
||||
|
||||
// This test verifies that the add_node operation is robust wrt race conditions.
|
||||
// It's mostly meant to prevent regressions in the events-executor, but the operation should be
|
||||
// thread-safe in all executor implementations.
|
||||
// The initial implementation of the events-executor contained a bug where the executor
|
||||
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
|
||||
// Manually adding a node to the executor results in a) producing a notify waitable event
|
||||
// and b) refreshing the executor collections.
|
||||
// The inconsistent state would happen if the event was processed before the collections were
|
||||
// finished to be refreshed: the executor would pick up the event but be unable to process it.
|
||||
// This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
|
||||
// notify waitable events to be pushed.
|
||||
// The behavior is observable only under heavy load, so this test spawns several worker
|
||||
// threads. Due to the nature of the bug, this test may still succeed even if the
|
||||
// bug is present. However repeated runs will show its flakiness nature and indicate
|
||||
// an eventual regression.
|
||||
TYPED_TEST(TestExecutors, testRaceConditionAddNode)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
// Spawn some threads to do some heavy work
|
||||
std::atomic<bool> should_cancel = false;
|
||||
std::vector<std::thread> stress_threads;
|
||||
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
|
||||
stress_threads.emplace_back(
|
||||
[&should_cancel, i]() {
|
||||
// This is just some arbitrary heavy work
|
||||
volatile size_t total = 0;
|
||||
for (size_t k = 0; k < 549528914167; k++) {
|
||||
if (should_cancel) {
|
||||
break;
|
||||
}
|
||||
total += k * (i + 42);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Create an executor
|
||||
auto executor = std::make_shared<ExecutorType>();
|
||||
// Start spinning
|
||||
auto executor_thread = std::thread(
|
||||
[executor]() {
|
||||
executor->spin();
|
||||
});
|
||||
// Add a node to the executor
|
||||
executor->add_node(this->node);
|
||||
|
||||
// Cancel the executor (make sure that it's already spinning first)
|
||||
while (!executor->is_spinning() && rclcpp::ok()) {
|
||||
continue;
|
||||
}
|
||||
executor->cancel();
|
||||
|
||||
// Try to join the thread after cancelling the executor
|
||||
// This is the "test". We want to make sure that we can still cancel the executor
|
||||
// regardless of the presence of race conditions
|
||||
executor_thread.join();
|
||||
|
||||
// The test is now completed: we can join the stress threads
|
||||
should_cancel = true;
|
||||
for (auto & t : stress_threads) {
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
|
||||
// This test verifies the thread-safety of adding and removing a node
|
||||
// while the executor is spinning and events are ready.
|
||||
// This test does not contain expectations, but rather it verifies that
|
||||
// we can run a "stressful routine" without crashing.
|
||||
TYPED_TEST(TestExecutors, stressAddRemoveNode)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
ExecutorType executor;
|
||||
|
||||
// A timer that is "always" ready (the timer callback doesn't do anything)
|
||||
auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});
|
||||
|
||||
// This thread spins the executor until it's cancelled
|
||||
std::thread spinner_thread([&]() {
|
||||
executor.spin();
|
||||
});
|
||||
|
||||
// This thread publishes data in a busy loop (the node has a subscription)
|
||||
std::thread publisher_thread1([&]() {
|
||||
for (size_t i = 0; i < 100000; i++) {
|
||||
this->publisher->publish(test_msgs::msg::Empty());
|
||||
}
|
||||
});
|
||||
std::thread publisher_thread2([&]() {
|
||||
for (size_t i = 0; i < 100000; i++) {
|
||||
this->publisher->publish(test_msgs::msg::Empty());
|
||||
}
|
||||
});
|
||||
|
||||
// This thread adds/remove the node that contains the entities in a busy loop
|
||||
std::thread add_remove_thread([&]() {
|
||||
for (size_t i = 0; i < 100000; i++) {
|
||||
executor.add_node(this->node);
|
||||
executor.remove_node(this->node);
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for the threads that do real work to finish
|
||||
publisher_thread1.join();
|
||||
publisher_thread2.join();
|
||||
add_remove_thread.join();
|
||||
|
||||
executor.cancel();
|
||||
spinner_thread.join();
|
||||
}
|
||||
|
||||
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
|
||||
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
|
||||
{
|
||||
|
||||
@@ -46,6 +46,23 @@ public:
|
||||
{
|
||||
spin_node_once_nanoseconds(node, std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
rclcpp::memory_strategy::MemoryStrategy * memory_strategy_ptr()
|
||||
{
|
||||
return memory_strategy_.get();
|
||||
}
|
||||
|
||||
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group(
|
||||
rclcpp::CallbackGroup::SharedPtr group)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard_{mutex_}; // only to make the TSA happy
|
||||
return get_node_by_group(weak_groups_to_nodes_, group);
|
||||
}
|
||||
|
||||
rclcpp::CallbackGroup::SharedPtr local_get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
|
||||
{
|
||||
return get_group_by_timer(timer);
|
||||
}
|
||||
};
|
||||
|
||||
class TestExecutor : public ::testing::Test
|
||||
@@ -113,7 +130,7 @@ TEST_F(TestExecutor, constructor_bad_wait_set_init) {
|
||||
auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_init, RCL_RET_ERROR);
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
static_cast<void>(std::make_unique<DummyExecutor>()),
|
||||
std::runtime_error("Failed to create wait set: error not set"));
|
||||
std::runtime_error("Failed to create wait set in Executor constructor: error not set"));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, add_callback_group_twice) {
|
||||
@@ -125,7 +142,7 @@ TEST_F(TestExecutor, add_callback_group_twice) {
|
||||
cb_group->get_associated_with_executor_atomic().exchange(false);
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
dummy.add_callback_group(cb_group, node->get_node_base_interface(), false),
|
||||
std::runtime_error("Callback group has already been added to this executor."));
|
||||
std::runtime_error("Callback group was already added to executor."));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, add_callback_group_failed_trigger_guard_condition) {
|
||||
@@ -151,15 +168,9 @@ TEST_F(TestExecutor, remove_callback_group_null_node) {
|
||||
|
||||
node.reset();
|
||||
|
||||
|
||||
/**
|
||||
* TODO(mjcarroll): Assert this when we are enforcing that nodes must be destroyed
|
||||
* after their created callback groups.
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
dummy.remove_callback_group(cb_group, false),
|
||||
std::runtime_error("Node must not be deleted before its callback group(s)."));
|
||||
*/
|
||||
EXPECT_NO_THROW(dummy.remove_callback_group(cb_group, false));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, remove_callback_group_failed_trigger_guard_condition) {
|
||||
@@ -186,7 +197,7 @@ TEST_F(TestExecutor, remove_node_not_associated) {
|
||||
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
dummy.remove_node(node->get_node_base_interface(), false),
|
||||
std::runtime_error("Node '/ns/node' needs to be associated with an executor."));
|
||||
std::runtime_error("Node needs to be associated with an executor."));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, remove_node_associated_with_different_executor) {
|
||||
@@ -200,7 +211,7 @@ TEST_F(TestExecutor, remove_node_associated_with_different_executor) {
|
||||
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
dummy2.remove_node(node1->get_node_base_interface(), false),
|
||||
std::runtime_error("Node '/ns/node1' needs to be associated with this executor."));
|
||||
std::runtime_error("Node needs to be associated with this executor."));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, spin_node_once_nanoseconds) {
|
||||
@@ -317,14 +328,42 @@ TEST_F(TestExecutor, cancel_failed_trigger_guard_condition) {
|
||||
std::runtime_error("Failed to trigger guard condition in cancel: error not set"));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, create_executor_fail_wait_set_clear) {
|
||||
auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_clear, RCL_RET_ERROR);
|
||||
TEST_F(TestExecutor, set_memory_strategy_nullptr) {
|
||||
DummyExecutor dummy;
|
||||
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
DummyExecutor dummy,
|
||||
std::runtime_error("Couldn't clear the wait set: error not set"));
|
||||
dummy.set_memory_strategy(nullptr),
|
||||
std::runtime_error("Received NULL memory strategy in executor."));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, spin_all_fail_wait_set_clear) {
|
||||
TEST_F(TestExecutor, set_memory_strategy) {
|
||||
DummyExecutor dummy;
|
||||
rclcpp::memory_strategy::MemoryStrategy::SharedPtr strategy =
|
||||
std::make_shared<
|
||||
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
|
||||
|
||||
dummy.set_memory_strategy(strategy);
|
||||
EXPECT_EQ(dummy.memory_strategy_ptr(), strategy.get());
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, spin_once_failed_trigger_guard_condition) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
auto timer =
|
||||
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {});
|
||||
|
||||
dummy.add_node(node);
|
||||
// Wait for the wall timer to have expired.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
auto mock = mocking_utils::patch_and_return(
|
||||
"lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR);
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
dummy.spin_once(std::chrono::milliseconds(1)),
|
||||
std::runtime_error(
|
||||
"Failed to trigger guard condition from execute_any_executable: error not set"));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, spin_some_fail_wait_set_clear) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
auto timer =
|
||||
@@ -332,10 +371,9 @@ TEST_F(TestExecutor, spin_all_fail_wait_set_clear) {
|
||||
|
||||
dummy.add_node(node);
|
||||
auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_clear, RCL_RET_ERROR);
|
||||
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
dummy.spin_all(std::chrono::milliseconds(1)),
|
||||
std::runtime_error("Couldn't clear the wait set: error not set"));
|
||||
dummy.spin_some(std::chrono::milliseconds(1)),
|
||||
std::runtime_error("Couldn't clear wait set: error not set"));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, spin_some_fail_wait_set_resize) {
|
||||
@@ -363,7 +401,7 @@ TEST_F(TestExecutor, spin_some_fail_add_handles_to_wait_set) {
|
||||
RCL_RET_ERROR);
|
||||
RCLCPP_EXPECT_THROW_EQ(
|
||||
dummy.spin_some(std::chrono::milliseconds(1)),
|
||||
std::runtime_error("Couldn't fill wait set: error not set"));
|
||||
std::runtime_error("Couldn't fill wait set"));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, spin_some_fail_wait) {
|
||||
@@ -379,6 +417,71 @@ TEST_F(TestExecutor, spin_some_fail_wait) {
|
||||
std::runtime_error("rcl_wait() failed: error not set"));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, get_node_by_group_null_group) {
|
||||
DummyExecutor dummy;
|
||||
ASSERT_EQ(nullptr, dummy.local_get_node_by_group(nullptr));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, get_node_by_group) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive);
|
||||
dummy.add_callback_group(cb_group, node->get_node_base_interface(), false);
|
||||
ASSERT_EQ(node->get_node_base_interface().get(), dummy.local_get_node_by_group(cb_group).get());
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, get_node_by_group_not_found) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive);
|
||||
ASSERT_EQ(nullptr, dummy.local_get_node_by_group(cb_group).get());
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, get_group_by_timer_nullptr) {
|
||||
DummyExecutor dummy;
|
||||
ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(nullptr));
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, get_group_by_timer) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive);
|
||||
auto timer =
|
||||
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group);
|
||||
dummy.add_node(node);
|
||||
|
||||
ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get());
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, get_group_by_timer_with_deleted_group) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive);
|
||||
auto timer =
|
||||
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group);
|
||||
dummy.add_node(node);
|
||||
|
||||
cb_group.reset();
|
||||
|
||||
ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(timer).get());
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, get_group_by_timer_add_callback_group) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
|
||||
rclcpp::CallbackGroupType::MutuallyExclusive);
|
||||
auto timer =
|
||||
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group);
|
||||
dummy.add_callback_group(cb_group, node->get_node_base_interface(), false);
|
||||
|
||||
ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get());
|
||||
}
|
||||
|
||||
TEST_F(TestExecutor, spin_until_future_complete_in_spin_until_future_complete) {
|
||||
DummyExecutor dummy;
|
||||
auto node = std::make_shared<rclcpp::Node>("node", "ns");
|
||||
|
||||
@@ -238,3 +238,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) {
|
||||
EXPECT_EQ(original_value, *popped_unique_msg);
|
||||
EXPECT_EQ(original_message_pointer, popped_message_pointer);
|
||||
}
|
||||
|
||||
/*
|
||||
Check the available buffer capacity while storing and consuming data from an intra-process
|
||||
buffer.
|
||||
The initial available buffer capacity should equal the buffer size.
|
||||
Inserting a message should decrease the available buffer capacity by 1.
|
||||
Consuming a message should increase the available buffer capacity by 1.
|
||||
*/
|
||||
TEST(TestIntraProcessBuffer, available_capacity) {
|
||||
using MessageT = char;
|
||||
using Alloc = std::allocator<void>;
|
||||
using Deleter = std::default_delete<MessageT>;
|
||||
using SharedMessageT = std::shared_ptr<const MessageT>;
|
||||
using UniqueMessageT = std::unique_ptr<MessageT, Deleter>;
|
||||
using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer<
|
||||
MessageT, Alloc, Deleter, UniqueMessageT>;
|
||||
|
||||
constexpr auto history_depth = 5u;
|
||||
|
||||
auto buffer_impl =
|
||||
std::make_unique<rclcpp::experimental::buffers::RingBufferImplementation<UniqueMessageT>>(
|
||||
history_depth);
|
||||
|
||||
UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl));
|
||||
|
||||
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
|
||||
|
||||
auto original_unique_msg = std::make_unique<char>('a');
|
||||
auto original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
|
||||
auto original_value = *original_unique_msg;
|
||||
|
||||
intra_process_buffer.add_unique(std::move(original_unique_msg));
|
||||
|
||||
EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
|
||||
|
||||
SharedMessageT popped_shared_msg;
|
||||
popped_shared_msg = intra_process_buffer.consume_shared();
|
||||
auto popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_shared_msg.get());
|
||||
|
||||
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
|
||||
EXPECT_EQ(original_value, *popped_shared_msg);
|
||||
EXPECT_EQ(original_message_pointer, popped_message_pointer);
|
||||
|
||||
original_unique_msg = std::make_unique<char>('b');
|
||||
original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
|
||||
original_value = *original_unique_msg;
|
||||
|
||||
intra_process_buffer.add_unique(std::move(original_unique_msg));
|
||||
|
||||
auto second_unique_msg = std::make_unique<char>('c');
|
||||
auto second_message_pointer = reinterpret_cast<std::uintptr_t>(second_unique_msg.get());
|
||||
auto second_value = *second_unique_msg;
|
||||
|
||||
intra_process_buffer.add_unique(std::move(second_unique_msg));
|
||||
|
||||
EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity());
|
||||
|
||||
UniqueMessageT popped_unique_msg;
|
||||
popped_unique_msg = intra_process_buffer.consume_unique();
|
||||
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());
|
||||
|
||||
EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
|
||||
EXPECT_EQ(original_value, *popped_unique_msg);
|
||||
EXPECT_EQ(original_message_pointer, popped_message_pointer);
|
||||
|
||||
popped_unique_msg = intra_process_buffer.consume_unique();
|
||||
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());
|
||||
|
||||
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
|
||||
EXPECT_EQ(second_value, *popped_unique_msg);
|
||||
EXPECT_EQ(second_message_pointer, popped_message_pointer);
|
||||
}
|
||||
|
||||
@@ -156,18 +156,26 @@ public:
|
||||
{
|
||||
message_ptr = reinterpret_cast<std::uintptr_t>(msg.get());
|
||||
shared_msg = msg;
|
||||
++num_msgs;
|
||||
}
|
||||
|
||||
void add(MessageUniquePtr msg)
|
||||
{
|
||||
message_ptr = reinterpret_cast<std::uintptr_t>(msg.get());
|
||||
unique_msg = std::move(msg);
|
||||
++num_msgs;
|
||||
}
|
||||
|
||||
void pop(std::uintptr_t & msg_ptr)
|
||||
{
|
||||
msg_ptr = message_ptr;
|
||||
message_ptr = 0;
|
||||
--num_msgs;
|
||||
}
|
||||
|
||||
size_t size() const
|
||||
{
|
||||
return num_msgs;
|
||||
}
|
||||
|
||||
// need to store the messages somewhere otherwise the memory address will be reused
|
||||
@@ -175,6 +183,8 @@ public:
|
||||
MessageUniquePtr unique_msg;
|
||||
|
||||
std::uintptr_t message_ptr;
|
||||
// count add and pop
|
||||
size_t num_msgs = 0u;
|
||||
};
|
||||
|
||||
} // namespace mock
|
||||
@@ -221,6 +231,10 @@ public:
|
||||
return topic_name.c_str();
|
||||
}
|
||||
|
||||
virtual
|
||||
size_t
|
||||
available_capacity() const = 0;
|
||||
|
||||
rclcpp::QoS qos_profile;
|
||||
std::string topic_name;
|
||||
};
|
||||
@@ -280,6 +294,12 @@ public:
|
||||
return take_shared_method;
|
||||
}
|
||||
|
||||
size_t
|
||||
available_capacity() const override
|
||||
{
|
||||
return qos_profile.depth() - buffer->size();
|
||||
}
|
||||
|
||||
bool take_shared_method;
|
||||
|
||||
typename rclcpp::experimental::buffers::mock::IntraProcessBuffer<MessageT>::UniquePtr buffer;
|
||||
@@ -712,3 +732,91 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) {
|
||||
EXPECT_EQ(original_message_pointer, received_message_pointer_10);
|
||||
EXPECT_NE(original_message_pointer, received_message_pointer_11);
|
||||
}
|
||||
|
||||
/*
|
||||
This tests the method "lowest_available_capacity":
|
||||
- Creates 1 publisher.
|
||||
- The available buffer capacity should be at least history size.
|
||||
- Add 2 subscribers.
|
||||
- Add everything to the intra-process manager.
|
||||
- All the entities are expected to have different ids.
|
||||
- Check the subscriptions count for the publisher.
|
||||
- The available buffer capacity should be the history size.
|
||||
- Publish one message (without receiving it).
|
||||
- The available buffer capacity should decrease by 1.
|
||||
- Publish another message (without receiving it).
|
||||
- The available buffer capacity should decrease by 1.
|
||||
- One subscriber receives one message.
|
||||
- The available buffer capacity should stay the same,
|
||||
as the other subscriber still has not freed its buffer.
|
||||
- The other subscriber receives one message.
|
||||
- The available buffer capacity should increase by 1.
|
||||
- One subscription goes out of scope.
|
||||
- The available buffer capacity should not change.
|
||||
*/
|
||||
TEST(TestIntraProcessManager, lowest_available_capacity) {
|
||||
using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager;
|
||||
using MessageT = rcl_interfaces::msg::Log;
|
||||
using PublisherT = rclcpp::mock::Publisher<MessageT>;
|
||||
using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess<MessageT>;
|
||||
|
||||
constexpr auto history_depth = 10u;
|
||||
|
||||
auto ipm = std::make_shared<IntraProcessManagerT>();
|
||||
|
||||
auto p1 = std::make_shared<PublisherT>(rclcpp::QoS(history_depth).best_effort());
|
||||
|
||||
auto s1 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS(history_depth).best_effort());
|
||||
auto s2 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS(history_depth).best_effort());
|
||||
|
||||
auto p1_id = ipm->add_publisher(p1);
|
||||
p1->set_intra_process_manager(p1_id, ipm);
|
||||
|
||||
auto c1 = ipm->lowest_available_capacity(p1_id);
|
||||
|
||||
ASSERT_LE(0u, c1);
|
||||
|
||||
auto s1_id = ipm->add_subscription(s1);
|
||||
auto s2_id = ipm->add_subscription(s2);
|
||||
|
||||
bool unique_ids = s1_id != s2_id && p1_id != s1_id;
|
||||
ASSERT_TRUE(unique_ids);
|
||||
|
||||
size_t p1_subs = ipm->get_subscription_count(p1_id);
|
||||
size_t non_existing_pub_subs = ipm->get_subscription_count(42);
|
||||
ASSERT_EQ(2u, p1_subs);
|
||||
ASSERT_EQ(0u, non_existing_pub_subs);
|
||||
|
||||
c1 = ipm->lowest_available_capacity(p1_id);
|
||||
auto non_existing_pub_c = ipm->lowest_available_capacity(42);
|
||||
|
||||
ASSERT_EQ(history_depth, c1);
|
||||
ASSERT_EQ(0u, non_existing_pub_c);
|
||||
|
||||
auto unique_msg = std::make_unique<MessageT>();
|
||||
p1->publish(std::move(unique_msg));
|
||||
|
||||
c1 = ipm->lowest_available_capacity(p1_id);
|
||||
ASSERT_EQ(history_depth - 1u, c1);
|
||||
|
||||
unique_msg = std::make_unique<MessageT>();
|
||||
p1->publish(std::move(unique_msg));
|
||||
|
||||
c1 = ipm->lowest_available_capacity(p1_id);
|
||||
ASSERT_EQ(history_depth - 2u, c1);
|
||||
|
||||
s1->pop();
|
||||
|
||||
c1 = ipm->lowest_available_capacity(p1_id);
|
||||
ASSERT_EQ(history_depth - 2u, c1);
|
||||
|
||||
s2->pop();
|
||||
|
||||
c1 = ipm->lowest_available_capacity(p1_id);
|
||||
ASSERT_EQ(history_depth - 1u, c1);
|
||||
|
||||
ipm->get_subscription_intra_process(s1_id).reset();
|
||||
|
||||
c1 = ipm->lowest_available_capacity(p1_id);
|
||||
ASSERT_EQ(history_depth - 1u, c1);
|
||||
}
|
||||
|
||||
214
rclcpp/test/rclcpp/test_logger_service.cpp
Normal file
214
rclcpp/test/rclcpp/test_logger_service.cpp
Normal file
@@ -0,0 +1,214 @@
|
||||
// Copyright 2023 Sony Group Corporation.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
#include "rcl_interfaces/srv/get_logger_levels.hpp"
|
||||
#include "rcl_interfaces/srv/set_logger_levels.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
class TestLoggerService : public ::testing::Test
|
||||
{
|
||||
protected:
|
||||
void SetUp()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
rclcpp::NodeOptions options = rclcpp::NodeOptions();
|
||||
options.enable_logger_service(true);
|
||||
node_ = std::make_shared<rclcpp::Node>("test_logger_service", "/test", options);
|
||||
}
|
||||
|
||||
void TearDown()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
rclcpp::Node::SharedPtr node_;
|
||||
std::thread thread_;
|
||||
};
|
||||
|
||||
TEST_F(TestLoggerService, check_connect_get_logger_service) {
|
||||
auto client = node_->create_client<rcl_interfaces::srv::GetLoggerLevels>(
|
||||
"/test/test_logger_service/get_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(2s));
|
||||
}
|
||||
|
||||
TEST_F(TestLoggerService, check_connect_set_logger_service) {
|
||||
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
|
||||
"/test/test_logger_service/set_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(2s));
|
||||
}
|
||||
|
||||
TEST_F(TestLoggerService, test_set_and_get_one_logging_level) {
|
||||
std::string test_logger_name = "rcl";
|
||||
uint8_t test_logger_level = 20;
|
||||
{
|
||||
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
|
||||
"/test/test_logger_service/set_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(1s));
|
||||
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
|
||||
auto level = rcl_interfaces::msg::LoggerLevel();
|
||||
level.name = test_logger_name;
|
||||
level.level = test_logger_level;
|
||||
request->levels.push_back(level);
|
||||
auto result = client->async_send_request(request);
|
||||
ASSERT_EQ(
|
||||
rclcpp::spin_until_future_complete(node_, result),
|
||||
rclcpp::FutureReturnCode::SUCCESS);
|
||||
auto result_get = result.get();
|
||||
ASSERT_EQ(result_get->results.size(), 1u);
|
||||
ASSERT_TRUE(result_get->results[0].successful);
|
||||
ASSERT_STREQ(result_get->results[0].reason.c_str(), "");
|
||||
}
|
||||
|
||||
{
|
||||
auto client = node_->create_client<rcl_interfaces::srv::GetLoggerLevels>(
|
||||
"/test/test_logger_service/get_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(1s));
|
||||
auto request = std::make_shared<rcl_interfaces::srv::GetLoggerLevels::Request>();
|
||||
request->names.emplace_back(test_logger_name);
|
||||
auto result = client->async_send_request(request);
|
||||
ASSERT_EQ(
|
||||
rclcpp::spin_until_future_complete(node_, result),
|
||||
rclcpp::FutureReturnCode::SUCCESS);
|
||||
auto result_get = result.get();
|
||||
ASSERT_EQ(result_get->levels.size(), 1u);
|
||||
ASSERT_STREQ(result_get->levels[0].name.c_str(), test_logger_name.c_str());
|
||||
ASSERT_EQ(result_get->levels[0].level, test_logger_level);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestLoggerService, test_set_and_get_multi_logging_level) {
|
||||
std::vector<std::pair<std::string, uint8_t>> test_data {
|
||||
{"rcl", 30},
|
||||
{"rclcpp", 40},
|
||||
{"/test/test_logger_service", 50}
|
||||
};
|
||||
|
||||
// Set multi log levels
|
||||
{
|
||||
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
|
||||
"/test/test_logger_service/set_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(1s));
|
||||
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
|
||||
for (auto & set_level : test_data) {
|
||||
auto level = rcl_interfaces::msg::LoggerLevel();
|
||||
level.name = std::get<0>(set_level);
|
||||
level.level = std::get<1>(set_level);
|
||||
request->levels.push_back(level);
|
||||
}
|
||||
auto result = client->async_send_request(request);
|
||||
ASSERT_EQ(
|
||||
rclcpp::spin_until_future_complete(node_, result),
|
||||
rclcpp::FutureReturnCode::SUCCESS);
|
||||
auto result_get = result.get();
|
||||
ASSERT_EQ(result_get->results.size(), test_data.size());
|
||||
for (uint32_t i = 0; i < test_data.size(); i++) {
|
||||
ASSERT_TRUE(result_get->results[0].successful);
|
||||
}
|
||||
}
|
||||
|
||||
// Get multi log levels
|
||||
{
|
||||
auto client = node_->create_client<rcl_interfaces::srv::GetLoggerLevels>(
|
||||
"/test/test_logger_service/get_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(1s));
|
||||
auto request = std::make_shared<rcl_interfaces::srv::GetLoggerLevels::Request>();
|
||||
for (auto & set_level : test_data) {
|
||||
request->names.emplace_back(std::get<0>(set_level));
|
||||
}
|
||||
auto result = client->async_send_request(request);
|
||||
ASSERT_EQ(
|
||||
rclcpp::spin_until_future_complete(node_, result),
|
||||
rclcpp::FutureReturnCode::SUCCESS);
|
||||
auto result_get = result.get();
|
||||
ASSERT_EQ(result_get->levels.size(), test_data.size());
|
||||
for (uint32_t i = 0; i < test_data.size(); i++) {
|
||||
ASSERT_STREQ(result_get->levels[i].name.c_str(), std::get<0>(test_data[i]).c_str());
|
||||
ASSERT_EQ(result_get->levels[i].level, std::get<1>(test_data[i]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestLoggerService, test_set_logging_level_with_invalid_param) {
|
||||
std::vector<std::pair<std::string, uint8_t>> test_data {
|
||||
{"rcl", 12},
|
||||
{"/test/test_logger_service", 22}
|
||||
};
|
||||
|
||||
// Set multi log levels
|
||||
{
|
||||
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
|
||||
"/test/test_logger_service/set_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(1s));
|
||||
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
|
||||
for (auto & set_level : test_data) {
|
||||
auto level = rcl_interfaces::msg::LoggerLevel();
|
||||
level.name = std::get<0>(set_level);
|
||||
level.level = std::get<1>(set_level);
|
||||
request->levels.push_back(level);
|
||||
}
|
||||
auto result = client->async_send_request(request);
|
||||
ASSERT_EQ(
|
||||
rclcpp::spin_until_future_complete(node_, result),
|
||||
rclcpp::FutureReturnCode::SUCCESS);
|
||||
auto result_get = result.get();
|
||||
ASSERT_EQ(result_get->results.size(), test_data.size());
|
||||
for (uint32_t i = 0; i < test_data.size(); i++) {
|
||||
ASSERT_FALSE(result_get->results[i].successful);
|
||||
// Check string starts with prefix
|
||||
ASSERT_EQ(
|
||||
result_get->results[i].reason.rfind("Unable to determine severity_string for severity", 0),
|
||||
0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestLoggerService, test_set_logging_level_with_partial_invalid_param) {
|
||||
std::vector<std::pair<std::string, uint8_t>> test_data {
|
||||
{"rcl", 20},
|
||||
{"rclcpp", 22},
|
||||
{"/test/test_logger_service", 30}
|
||||
};
|
||||
|
||||
// Set multi log levels
|
||||
{
|
||||
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
|
||||
"/test/test_logger_service/set_logger_levels");
|
||||
ASSERT_TRUE(client->wait_for_service(1s));
|
||||
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
|
||||
for (auto & set_level : test_data) {
|
||||
auto level = rcl_interfaces::msg::LoggerLevel();
|
||||
level.name = std::get<0>(set_level);
|
||||
level.level = std::get<1>(set_level);
|
||||
request->levels.push_back(level);
|
||||
}
|
||||
auto result = client->async_send_request(request);
|
||||
ASSERT_EQ(
|
||||
rclcpp::spin_until_future_complete(node_, result),
|
||||
rclcpp::FutureReturnCode::SUCCESS);
|
||||
auto result_get = result.get();
|
||||
ASSERT_EQ(result_get->results.size(), test_data.size());
|
||||
ASSERT_TRUE(result_get->results[0].successful);
|
||||
ASSERT_FALSE(result_get->results[1].successful);
|
||||
ASSERT_TRUE(result_get->results[2].successful);
|
||||
}
|
||||
}
|
||||
@@ -266,6 +266,11 @@ TEST(TestNodeOptions, bool_setters_and_getters) {
|
||||
EXPECT_FALSE(options.automatically_declare_parameters_from_overrides());
|
||||
options.automatically_declare_parameters_from_overrides(true);
|
||||
EXPECT_TRUE(options.automatically_declare_parameters_from_overrides());
|
||||
|
||||
options.enable_logger_service(false);
|
||||
EXPECT_FALSE(options.enable_logger_service());
|
||||
options.enable_logger_service(true);
|
||||
EXPECT_TRUE(options.enable_logger_service());
|
||||
}
|
||||
|
||||
TEST(TestNodeOptions, parameter_event_qos) {
|
||||
|
||||
@@ -629,6 +629,41 @@ TEST_P(TestPublisherWaitForAllAcked, check_wait_for_all_acked_with_QosPolicy) {
|
||||
EXPECT_TRUE(pub->wait_for_all_acked(std::chrono::milliseconds(6000)));
|
||||
}
|
||||
|
||||
TEST_F(TestPublisher, lowest_available_ipm_capacity) {
|
||||
constexpr auto history_depth = 10u;
|
||||
|
||||
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));
|
||||
|
||||
rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> options_ipm_disabled;
|
||||
options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
|
||||
|
||||
rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> options_ipm_enabled;
|
||||
options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
|
||||
|
||||
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Strings>) {};
|
||||
auto pub_ipm_disabled = node->create_publisher<test_msgs::msg::Strings>(
|
||||
"topic", history_depth,
|
||||
options_ipm_disabled);
|
||||
auto pub_ipm_enabled = node->create_publisher<test_msgs::msg::Strings>(
|
||||
"topic", history_depth,
|
||||
options_ipm_enabled);
|
||||
auto sub = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"topic",
|
||||
history_depth,
|
||||
do_nothing);
|
||||
|
||||
ASSERT_EQ(1, pub_ipm_enabled->get_intra_process_subscription_count());
|
||||
ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity());
|
||||
ASSERT_EQ(history_depth, pub_ipm_enabled->lowest_available_ipm_capacity());
|
||||
|
||||
auto msg = std::make_shared<test_msgs::msg::Strings>();
|
||||
ASSERT_NO_THROW(pub_ipm_disabled->publish(*msg));
|
||||
ASSERT_NO_THROW(pub_ipm_enabled->publish(*msg));
|
||||
|
||||
ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity());
|
||||
ASSERT_EQ(history_depth - 1u, pub_ipm_enabled->lowest_available_ipm_capacity());
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
TestWaitForAllAckedWithParm,
|
||||
TestPublisherWaitForAllAcked,
|
||||
|
||||
@@ -70,7 +70,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
|
||||
// before calling get_child of Logger
|
||||
{
|
||||
RCLCPP_INFO(
|
||||
rclcpp::get_logger(logger_name), this->rosout_msg_data.c_str());
|
||||
rclcpp::get_logger(logger_name), "%s", this->rosout_msg_data.c_str());
|
||||
auto future = received_msg_promise.get_future();
|
||||
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
|
||||
@@ -83,7 +83,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
|
||||
// after calling get_child of Logger
|
||||
// 1. use child_logger directly
|
||||
{
|
||||
RCLCPP_INFO(child_logger, this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(child_logger, "%s", this->rosout_msg_data.c_str());
|
||||
auto future = received_msg_promise.get_future();
|
||||
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
|
||||
@@ -93,7 +93,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
|
||||
|
||||
// 2. use rclcpp::get_logger
|
||||
{
|
||||
RCLCPP_INFO(rclcpp::get_logger(logger_name), this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(rclcpp::get_logger(logger_name), "%s", this->rosout_msg_data.c_str());
|
||||
auto future = received_msg_promise.get_future();
|
||||
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
|
||||
@@ -104,7 +104,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
|
||||
|
||||
// `child_logger` is end of life, there is no sublogger
|
||||
{
|
||||
RCLCPP_INFO(rclcpp::get_logger(logger_name), this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(rclcpp::get_logger(logger_name), "%s", this->rosout_msg_data.c_str());
|
||||
auto future = received_msg_promise.get_future();
|
||||
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
|
||||
@@ -119,7 +119,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_parent_log) {
|
||||
|
||||
rclcpp::Logger logger = this->node->get_logger();
|
||||
ASSERT_EQ(logger.get_name(), logger_name);
|
||||
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
|
||||
auto future = received_msg_promise.get_future();
|
||||
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
|
||||
@@ -133,14 +133,14 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_child_log) {
|
||||
this->rosout_msg_name = logger_name;
|
||||
|
||||
rclcpp::Logger logger = this->node->get_logger();
|
||||
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
|
||||
auto future = received_msg_promise.get_future();
|
||||
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
|
||||
received_msg_promise = {};
|
||||
|
||||
logger = this->node->get_logger().get_child("child1");
|
||||
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
|
||||
future = received_msg_promise.get_future();
|
||||
return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
|
||||
@@ -148,14 +148,14 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_child_log) {
|
||||
received_msg_promise = {};
|
||||
|
||||
logger = this->node->get_logger().get_child("child2");
|
||||
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
|
||||
future = received_msg_promise.get_future();
|
||||
return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
|
||||
received_msg_promise = {};
|
||||
|
||||
this->rosout_msg_name = "ns.test_rosout_subscription.child2";
|
||||
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
|
||||
future = received_msg_promise.get_future();
|
||||
return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
|
||||
@@ -171,7 +171,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild_hierarchy) {
|
||||
rclcpp::Logger grandchild_logger =
|
||||
this->node->get_logger().get_child("child").get_child("grandchild");
|
||||
ASSERT_EQ(grandchild_logger.get_name(), logger_name);
|
||||
RCLCPP_INFO(grandchild_logger, this->rosout_msg_data.c_str());
|
||||
RCLCPP_INFO(grandchild_logger, "%s", this->rosout_msg_data.c_str());
|
||||
auto future = received_msg_promise.get_future();
|
||||
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
|
||||
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
|
||||
|
||||
@@ -3,6 +3,15 @@ Changelog for package rclcpp_action
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
||||
21.1.1 (2023-05-11)
|
||||
-------------------
|
||||
|
||||
21.1.0 (2023-04-27)
|
||||
-------------------
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
|
||||
20.0.0 (2023-04-13)
|
||||
-------------------
|
||||
* extract the result response before the callback is issued. (`#2132 <https://github.com/ros2/rclcpp/issues/2132>`_)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp_action</name>
|
||||
<version>20.0.0</version>
|
||||
<version>21.1.1</version>
|
||||
<description>Adds action APIs for C++.</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
@@ -2,6 +2,15 @@
|
||||
Changelog for package rclcpp_components
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
21.1.1 (2023-05-11)
|
||||
-------------------
|
||||
|
||||
21.1.0 (2023-04-27)
|
||||
-------------------
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
|
||||
20.0.0 (2023-04-13)
|
||||
-------------------
|
||||
* Update all rclcpp packages to C++17. (`#2121 <https://github.com/ros2/rclcpp/issues/2121>`_)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp_components</name>
|
||||
<version>20.0.0</version>
|
||||
<version>21.1.1</version>
|
||||
<description>Package containing tools for dynamically loadable components</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
@@ -3,6 +3,18 @@ Changelog for package rclcpp_lifecycle
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
||||
21.1.1 (2023-05-11)
|
||||
-------------------
|
||||
|
||||
21.1.0 (2023-04-27)
|
||||
-------------------
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
* Add support for logging service. (`#2122 <https://github.com/ros2/rclcpp/issues/2122>`_)
|
||||
* Support publishing loaned messages in LifecyclePublisher (`#2159 <https://github.com/ros2/rclcpp/issues/2159>`_)
|
||||
* Contributors: Lei Liu, Michael Babenko
|
||||
|
||||
20.0.0 (2023-04-13)
|
||||
-------------------
|
||||
* Fixes to silence some clang warnings. (`#2127 <https://github.com/ros2/rclcpp/issues/2127>`_)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp_lifecycle</name>
|
||||
<version>20.0.0</version>
|
||||
<version>21.1.1</version>
|
||||
<description>Package containing a prototype for lifecycle implementation</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
@@ -76,7 +76,7 @@ LifecycleNode::LifecycleNode(
|
||||
options.use_intra_process_comms(),
|
||||
options.enable_topic_statistics())),
|
||||
node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())),
|
||||
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())),
|
||||
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_)),
|
||||
node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())),
|
||||
node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())),
|
||||
node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())),
|
||||
|
||||
Reference in New Issue
Block a user