Compare commits

..

32 Commits

Author SHA1 Message Date
Chris Lalancette
6e97990a32 22.0.0 2023-07-11 19:48:37 +00:00
Chris Lalancette
4ebc5f61d8 Changelog.
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-07-11 19:48:30 +00:00
Emerson Knapp
a7a9b78fee Implement get_node_type_descriptions_interface for lifecyclenode and add smoke test for it (#2237)
Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
2023-07-11 08:41:53 -04:00
Chris Lalancette
945d254e32 Switch lifecycle to use the RCLCPP macros. (#2233)
This ensures that they'll go out to /rosout and the disk.

Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-07-10 15:59:35 -04:00
Emerson Knapp
deebbc3ad6 Add new node interface TypeDescriptionsInterface to provide GetTypeDescription service (#2224)
* TypeDescriptions interface with readonly param configuration

* Add parameter descriptor, to make read only

* example of spinning in thread for get_type_description service

* Add a basic test for the new interface

* Fix tests with new parameter

* Add comments about builtin parameters

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
Signed-off-by: William Woodall <william@osrfoundation.org>
2023-07-07 13:10:27 -04:00
Nathan Wiebe Neufeldt
588dba7a70 Move always_false_v to detail namespace (#2232)
Since this is a common idiom, especially under this name, we should
define the `always_false_v` template within a namespace to avoid
conflict with other libraries and user code. This could either be
`rclcpp::detail` if it's intended only for internal use or just `rclcpp`
if it's intended as a public helper. In this PR, I've initially chosen
the former.

Signed-off-by: Nathan Wiebe Neufeldt <nwiebeneufeldt@clearpath.ai>
2023-07-05 16:55:11 -04:00
Chris Lalancette
2e355e4849 Revamp the test_subscription.cpp tests. (#2227)
The original motiviation to do this was a crash during
teardown when using a newer version of gtest.  But while
I was in here, I did a small overall cleanup, including:

1.  Moving code closer to where it is actually used.
2.  Getting rid of unused 'using' statements.
3.  Adding in missing includes.
4.  Properly tearing down and recreating the rclcpp
    context during test teardown (this fixed the actual
    bug).
5.  Making class members private where possible.
6.  Renaming class methods to our usual conventions.

Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-06-26 12:59:56 -04:00
Tomoya Fujita
fe2e0e4c64 warning: comparison of integer expressions of different signedness (#2219)
https://github.com/ros2/rclcpp/pull/2167#issuecomment-1597197552

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
2023-06-22 08:02:27 -07:00
Eloy Briceno
005f6aefe9 Modifies timers API to select autostart state (#2005)
* Modifies timers API to select autostart state

* Removes unnecessary variables

* Adds autostart documentation and expands some timer test

Signed-off-by: Voldivh <eloyabmfcv@gmail.com>
2023-06-21 10:47:14 -04:00
Christopher Wecht
3a64349aec Enable callback group tests for connextdds (#2182)
* Enable callback group tests for connextdds

* Enable executors and event executor tests for connextdds

* Enable qos events tests for connextdds

* Less flaky qos_event tests

Signed-off-by: Christopher Wecht <cwecht@mailbox.org>
2023-06-14 08:33:33 -04:00
Chris Lalancette
3530b0959c 21.3.0 2023-06-12 12:45:11 +00:00
Chris Lalancette
4d12bcbca0 Changelog.
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-06-12 12:45:00 +00:00
Chris Lalancette
1fff79089a Fix up misspellings of "receive". (#2208)
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-06-11 22:48:56 -07:00
Michael Carroll
b3518d12ca Remove flaky stressAddRemoveNode test (#2206)
Signed-off-by: Michael Carroll <mjcarroll@intrinsic.ai>
2023-06-09 11:18:04 -05:00
Christophe Bedard
4efc05266b Use TRACETOOLS_ prefix for tracepoint-related macros (#2162)
Signed-off-by: Christophe Bedard <bedard.christophe@gmail.com>
2023-06-08 12:38:49 -05:00
Chris Lalancette
dab9c8acdc 21.2.0 2023-06-07 13:28:18 +00:00
Chris Lalancette
867ad62da2 Changelog.
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-06-07 13:28:05 +00:00
Chen Lihui
f8072f2fa2 remove nolint since ament_cpplint updated for the c++17 header (#2198)
Signed-off-by: Chen Lihui <lihui.chen@sony.com>
2023-05-22 21:22:13 -07:00
DensoADAS
fce021b149 Feature/available capacity of ipm (#2173)
* added available_capacity to get the lowest number of free capacity for intra-process communication for a publisher

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* added unit tests for available_capacity

Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* fixed typos in comments

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* Updated warning

Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* returning 0 if ipm is disabled in lowest_available_ipm_capacity

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* return 0 if no subscribers are present in lowest_available_capacity

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* updated unit test

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* update unit test

Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com>

* moved available_capacity to a lambda function to be able to handle subscriptions which went out of scope

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

* updated unit test to check subscriptions which went out of scope

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>

---------

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@eu.denso.com>
Co-authored-by: Joshua Hampp <j.hampp@denso-adas.de>
Co-authored-by: Joshua Hampp <j.hampp@eu.denso.com>
Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com>
2023-05-19 19:31:59 +01:00
Alberto Soragna
c4f57a7998 add mutex to protect events_executor current entity collection (#2187)
* add mutex to protect events_executor current entity collection and unit-test

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* be more precise with mutex locks; make stress test less stressfull

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* fix uncrustify error

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

---------

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
2023-05-18 14:43:26 +01:00
mauropasse
d7fdb6184c Declare rclcpp callbacks before the rcl entities (#2024)
This is to ensure callbacks are destroyed last
on entities destruction, avoiding the gap in time
in which rmw entities hold a reference to a
destroyed function.

Signed-off-by: Mauro Passerino <mpasserino@irobot.com>
Co-authored-by: Mauro Passerino <mpasserino@irobot.com>
2023-05-15 15:55:08 -04:00
Yadunund
58bcd3b822 21.1.1 2023-05-11 18:59:17 +08:00
Yadunund
26426adda9 Changelog
Signed-off-by: Yadunund <yadunund@openrobotics.org>
2023-05-11 18:58:48 +08:00
Alberto Soragna
6e1fea14e1 Fix race condition in events-executor (#2177)
The initial implementation of the events-executor contained a bug where the executor
would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
Manually adding a node to the executor results in a) producing a notify waitable event
and b) refreshing the executor collections.
The inconsistent state would happen if the event was processed before the collections were
finished to be refreshed: the executor would pick up the event but be unable to process it.
This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
notify waitable events to be pushed.
The behavior is observable only under heavy load.

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
2023-05-05 09:04:35 -05:00
Øystein Sture
86c77143c9 Add missing stdexcept include (#2186)
Signed-off-by: Øystein Sture <os@skarvtech.com>
2023-05-04 15:48:20 -04:00
Chris Lalancette
b812790ee3 Fix a format-security warning when building with clang (#2171)
In particular, you should never have a "bare" string in a
printf-like call; that could potentially access uninitialized
memory. Instead, make sure to format the string with %s.

Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-05-01 17:06:55 -04:00
methylDragon
6ca1023ef7 Fix delivered message kind (#2175)
Signed-off-by: methylDragon <methylDragon@gmail.com>
2023-05-01 16:56:29 -04:00
Yadunund
77ede02251 21.1.0 2023-04-27 16:53:54 +08:00
Chris Lalancette
a431256383 21.0.0 2023-04-18 15:35:47 +00:00
Chris Lalancette
9d2849cb0a Changelog.
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
2023-04-18 15:35:41 +00:00
Lei Liu
3610b68348 Add support for logging service. (#2122)
* Add support for logging service.

* Update to not modify interfaces and not change time_source

* Use unique_ptr for NodeBuiltinExecutorImpl

* Use user thread to run logger service

* Update code for lifecycle_node

Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>
2023-04-18 11:30:00 -04:00
Michael Carroll
9c03a463c1 Picking ABI-incompatible executor changes (#2170)
* Picking ABI-incompatible executor changes

* Add PIMPL

Signed-off-by: Michael Carroll <mjcarroll@intrinsic.ai>
2023-04-18 11:29:30 -04:00
91 changed files with 3901 additions and 1241 deletions

View File

@@ -2,6 +2,52 @@
Changelog for package rclcpp
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
22.0.0 (2023-07-11)
-------------------
* Implement get_node_type_descriptions_interface for lifecyclenode and add smoke test for it (`#2237 <https://github.com/ros2/rclcpp/issues/2237>`_)
* Add new node interface TypeDescriptionsInterface to provide GetTypeDescription service (`#2224 <https://github.com/ros2/rclcpp/issues/2224>`_)
* Move always_false_v to detail namespace (`#2232 <https://github.com/ros2/rclcpp/issues/2232>`_)
* Revamp the test_subscription.cpp tests. (`#2227 <https://github.com/ros2/rclcpp/issues/2227>`_)
* warning: comparison of integer expressions of different signedness (`#2219 <https://github.com/ros2/rclcpp/issues/2219>`_)
* Modifies timers API to select autostart state (`#2005 <https://github.com/ros2/rclcpp/issues/2005>`_)
* Enable callback group tests for connextdds (`#2182 <https://github.com/ros2/rclcpp/issues/2182>`_)
* Contributors: Chris Lalancette, Christopher Wecht, Eloy Briceno, Emerson Knapp, Nathan Wiebe Neufeldt, Tomoya Fujita
21.3.0 (2023-06-12)
-------------------
* Fix up misspellings of "receive". (`#2208 <https://github.com/ros2/rclcpp/issues/2208>`_)
* Remove flaky stressAddRemoveNode test (`#2206 <https://github.com/ros2/rclcpp/issues/2206>`_)
* Use TRACETOOLS\_ prefix for tracepoint-related macros (`#2162 <https://github.com/ros2/rclcpp/issues/2162>`_)
* Contributors: Chris Lalancette, Christophe Bedard, Michael Carroll
21.2.0 (2023-06-07)
-------------------
* remove nolint since ament_cpplint updated for the c++17 header (`#2198 <https://github.com/ros2/rclcpp/issues/2198>`_)
* Feature/available capacity of ipm (`#2173 <https://github.com/ros2/rclcpp/issues/2173>`_)
* add mutex to protect events_executor current entity collection (`#2187 <https://github.com/ros2/rclcpp/issues/2187>`_)
* Declare rclcpp callbacks before the rcl entities (`#2024 <https://github.com/ros2/rclcpp/issues/2024>`_)
* Contributors: Alberto Soragna, Chen Lihui, DensoADAS, mauropasse
21.1.1 (2023-05-11)
-------------------
* Fix race condition in events-executor (`#2177 <https://github.com/ros2/rclcpp/issues/2177>`_)
* Add missing stdexcept include (`#2186 <https://github.com/ros2/rclcpp/issues/2186>`_)
* Fix a format-security warning when building with clang (`#2171 <https://github.com/ros2/rclcpp/issues/2171>`_)
* Fix delivered message kind (`#2175 <https://github.com/ros2/rclcpp/issues/2175>`_)
* Contributors: Alberto Soragna, Chris Lalancette, methylDragon, Øystein Sture
21.1.0 (2023-04-27)
-------------------
21.0.0 (2023-04-18)
-------------------
* Add support for logging service. (`#2122 <https://github.com/ros2/rclcpp/issues/2122>`_)
* Picking ABI-incompatible executor changes (`#2170 <https://github.com/ros2/rclcpp/issues/2170>`_)
* add events-executor and timers-manager in rclcpp (`#2155 <https://github.com/ros2/rclcpp/issues/2155>`_)
* Create common structures for executors to use (`#2143 <https://github.com/ros2/rclcpp/issues/2143>`_)
* Implement deliver message kind (`#2168 <https://github.com/ros2/rclcpp/issues/2168>`_)
* Contributors: Alberto Soragna, Lei Liu, Michael Carroll, methylDragon
20.0.0 (2023-04-13)
-------------------
* applied tracepoints for ring_buffer (`#2091 <https://github.com/ros2/rclcpp/issues/2091>`_)

View File

@@ -64,6 +64,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
src/rclcpp/executors/static_single_threaded_executor.cpp
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
@@ -91,6 +92,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/node_interfaces/node_time_source.cpp
src/rclcpp/node_interfaces/node_timers.cpp
src/rclcpp/node_interfaces/node_topics.cpp
src/rclcpp/node_interfaces/node_type_descriptions.cpp
src/rclcpp/node_interfaces/node_waitables.cpp
src/rclcpp/node_options.cpp
src/rclcpp/parameter.cpp

View File

@@ -156,7 +156,7 @@ public:
const std::shared_ptr<rmw_request_id_t> & request_header,
std::shared_ptr<typename ServiceT::Request> request)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), false);
TRACETOOLS_TRACEPOINT(callback_start, static_cast<const void *>(this), false);
if (std::holds_alternative<std::monostate>(callback_)) {
// TODO(ivanpauno): Remove the set method, and force the users of this class
// to pass a callback at construnciton.
@@ -182,7 +182,7 @@ public:
const auto & cb = std::get<SharedPtrWithRequestHeaderCallback>(callback_);
cb(request_header, std::move(request), response);
}
TRACEPOINT(callback_end, static_cast<const void *>(this));
TRACETOOLS_TRACEPOINT(callback_end, static_cast<const void *>(this));
return response;
}
@@ -191,9 +191,9 @@ public:
#ifndef TRACETOOLS_DISABLED
std::visit(
[this](auto && arg) {
if (TRACEPOINT_ENABLED(rclcpp_callback_register)) {
if (TRACETOOLS_TRACEPOINT_ENABLED(rclcpp_callback_register)) {
char * symbol = tracetools::get_symbol(arg);
DO_TRACEPOINT(
TRACETOOLS_DO_TRACEPOINT(
rclcpp_callback_register,
static_cast<const void *>(this),
symbol);

View File

@@ -20,7 +20,7 @@
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <variant> // NOLINT[build/include_order]
#include <variant>
#include "rosidl_runtime_cpp/traits.hpp"
#include "tracetools/tracetools.h"
@@ -34,15 +34,15 @@
#include "rclcpp/type_adapter.hpp"
template<class>
inline constexpr bool always_false_v = false;
namespace rclcpp
{
namespace detail
{
template<class>
inline constexpr bool always_false_v = false;
template<typename MessageT, typename AllocatorT>
struct MessageDeleterHelper
{
@@ -492,7 +492,7 @@ public:
std::shared_ptr<ROSMessageType> message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), false);
TRACETOOLS_TRACEPOINT(callback_start, static_cast<const void *>(this), false);
// Check if the variant is "unset", throw if it is.
if (callback_variant_.index() == 0) {
if (std::get<0>(callback_variant_) == nullptr) {
@@ -580,10 +580,10 @@ public:
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<T>, "unhandled callback type");
static_assert(detail::always_false_v<T>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));
TRACETOOLS_TRACEPOINT(callback_end, static_cast<const void *>(this));
}
// Dispatch when input is a serialized message and the output could be anything.
@@ -592,7 +592,7 @@ public:
std::shared_ptr<rclcpp::SerializedMessage> serialized_message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), false);
TRACETOOLS_TRACEPOINT(callback_start, static_cast<const void *>(this), false);
// Check if the variant is "unset", throw if it is.
if (callback_variant_.index() == 0) {
if (std::get<0>(callback_variant_) == nullptr) {
@@ -660,10 +660,10 @@ public:
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<T>, "unhandled callback type");
static_assert(detail::always_false_v<T>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));
TRACETOOLS_TRACEPOINT(callback_end, static_cast<const void *>(this));
}
void
@@ -671,7 +671,7 @@ public:
std::shared_ptr<const SubscribedType> message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
TRACETOOLS_TRACEPOINT(callback_start, static_cast<const void *>(this), true);
// Check if the variant is "unset", throw if it is.
if (callback_variant_.index() == 0) {
if (std::get<0>(callback_variant_) == nullptr) {
@@ -790,10 +790,10 @@ public:
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<T>, "unhandled callback type");
static_assert(detail::always_false_v<T>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));
TRACETOOLS_TRACEPOINT(callback_end, static_cast<const void *>(this));
}
void
@@ -801,7 +801,7 @@ public:
std::unique_ptr<SubscribedType, SubscribedTypeDeleter> message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
TRACETOOLS_TRACEPOINT(callback_start, static_cast<const void *>(this), true);
// Check if the variant is "unset", throw if it is.
if (callback_variant_.index() == 0) {
if (std::get<0>(callback_variant_) == nullptr) {
@@ -924,10 +924,10 @@ public:
}
// condition to catch unhandled callback types
else { // NOLINT[readability/braces]
static_assert(always_false_v<T>, "unhandled callback type");
static_assert(detail::always_false_v<T>, "unhandled callback type");
}
}, callback_variant_);
TRACEPOINT(callback_end, static_cast<const void *>(this));
TRACETOOLS_TRACEPOINT(callback_end, static_cast<const void *>(this));
}
constexpr
@@ -965,9 +965,9 @@ public:
#ifndef TRACETOOLS_DISABLED
std::visit(
[this](auto && callback) {
if (TRACEPOINT_ENABLED(rclcpp_callback_register)) {
if (TRACETOOLS_TRACEPOINT_ENABLED(rclcpp_callback_register)) {
char * symbol = tracetools::get_symbol(callback);
DO_TRACEPOINT(
TRACETOOLS_DO_TRACEPOINT(
rclcpp_callback_register,
static_cast<const void *>(this),
symbol);

View File

@@ -187,35 +187,14 @@ public:
RCLCPP_PUBLIC
size_t size() const;
/// Return a reference to the 'can be taken' atomic boolean.
/**
* The resulting bool will be true in the case that no executor is currently
* using an executable entity from this group.
* The resulting bool will be false in the case that an executor is currently
* using an executable entity from this group, and the group policy doesn't
* allow a second take (eg mutual exclusion)
* \return a reference to the flag
*/
RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();
/// Get the group type.
/**
* \return the group type
*/
RCLCPP_PUBLIC
const CallbackGroupType &
type() const;
/// Collect all of the entity pointers contained in this callback group.
/**
* \param[in] sub_func Function to execute for each subscription
* \param[in] service_func Function to execute for each service
* \param[in] client_func Function to execute for each client
* \param[in] timer_func Function to execute for each timer
* \param[in] waitable_fuinc Function to execute for each waitable
*/
RCLCPP_PUBLIC
void collect_all_ptrs(
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,

View File

@@ -20,13 +20,13 @@
#include <future>
#include <memory>
#include <mutex>
#include <optional> // NOLINT, cpplint doesn't think this is a cpp std header
#include <optional>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <variant> // NOLINT
#include <variant>
#include <vector>
#include "rcl/client.h"
@@ -363,12 +363,16 @@ protected:
std::shared_ptr<rclcpp::Context> context_;
rclcpp::Logger node_logger_;
std::recursive_mutex callback_mutex_;
// It is important to declare on_new_response_callback_ before
// client_handle_, so on destruction the client is
// destroyed first. Otherwise, the rmw client callback
// would point briefly to a destroyed function.
std::function<void(size_t)> on_new_response_callback_{nullptr};
// Declare client_handle_ after callback
std::shared_ptr<rcl_client_t> client_handle_;
std::atomic<bool> in_use_by_wait_set_{false};
std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_response_callback_{nullptr};
};
template<typename ServiceT>

View File

@@ -26,6 +26,7 @@
#include <unordered_set>
#include <utility>
#include <vector>
#include <stdexcept>
#include "rcl/context.h"
#include "rcl/guard_condition.h"

View File

@@ -90,7 +90,8 @@ create_timer(
rclcpp::Clock::SharedPtr clock,
rclcpp::Duration period,
CallbackT && callback,
rclcpp::CallbackGroup::SharedPtr group = nullptr)
rclcpp::CallbackGroup::SharedPtr group = nullptr,
bool autostart = true)
{
return create_timer(
clock,
@@ -98,7 +99,8 @@ create_timer(
std::forward<CallbackT>(callback),
group,
node_base.get(),
node_timers.get());
node_timers.get(),
autostart);
}
/// Create a timer with a given clock
@@ -109,7 +111,8 @@ create_timer(
rclcpp::Clock::SharedPtr clock,
rclcpp::Duration period,
CallbackT && callback,
rclcpp::CallbackGroup::SharedPtr group = nullptr)
rclcpp::CallbackGroup::SharedPtr group = nullptr,
bool autostart = true)
{
return create_timer(
clock,
@@ -117,7 +120,8 @@ create_timer(
std::forward<CallbackT>(callback),
group,
rclcpp::node_interfaces::get_node_base_interface(node).get(),
rclcpp::node_interfaces::get_node_timers_interface(node).get());
rclcpp::node_interfaces::get_node_timers_interface(node).get(),
autostart);
}
/// Convenience method to create a general timer with node resources.
@@ -132,6 +136,7 @@ create_timer(
* \param group callback group
* \param node_base node base interface
* \param node_timers node timer interface
* \param autostart defines if the timer should start it's countdown on initialization or not.
* \return shared pointer to a generic timer
* \throws std::invalid_argument if either clock, node_base or node_timers
* are nullptr, or period is negative or too large
@@ -144,7 +149,8 @@ create_timer(
CallbackT callback,
rclcpp::CallbackGroup::SharedPtr group,
node_interfaces::NodeBaseInterface * node_base,
node_interfaces::NodeTimersInterface * node_timers)
node_interfaces::NodeTimersInterface * node_timers,
bool autostart = true)
{
if (clock == nullptr) {
throw std::invalid_argument{"clock cannot be null"};
@@ -160,7 +166,7 @@ create_timer(
// Add a new generic timer.
auto timer = rclcpp::GenericTimer<CallbackT>::make_shared(
std::move(clock), period_ns, std::move(callback), node_base->get_context());
std::move(clock), period_ns, std::move(callback), node_base->get_context(), autostart);
node_timers->add_timer(timer, group);
return timer;
}
@@ -187,7 +193,8 @@ create_wall_timer(
CallbackT callback,
rclcpp::CallbackGroup::SharedPtr group,
node_interfaces::NodeBaseInterface * node_base,
node_interfaces::NodeTimersInterface * node_timers)
node_interfaces::NodeTimersInterface * node_timers,
bool autostart = true)
{
if (node_base == nullptr) {
throw std::invalid_argument{"input node_base cannot be null"};
@@ -201,7 +208,7 @@ create_wall_timer(
// Add a new wall timer.
auto timer = rclcpp::WallTimer<CallbackT>::make_shared(
period_ns, std::move(callback), node_base->get_context());
period_ns, std::move(callback), node_base->get_context(), autostart);
node_timers->add_timer(timer, group);
return timer;
}

View File

@@ -19,7 +19,6 @@
#include <cassert>
#include <chrono>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <map>
@@ -30,24 +29,26 @@
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rcpputils/scope_exit.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/executor_entities_collector.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/wait_set.hpp"
namespace rclcpp
{
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
// Forward declaration is used in convenience method signature.
class Node;
class ExecutorImplementation;
@@ -402,6 +403,17 @@ public:
void
cancel();
/// Support dynamic switching of the memory strategy.
/**
* Switching the memory strategy while the executor is spinning in another threading could have
* unintended consequences.
* \param[in] memory_strategy Shared pointer to the memory strategy to set.
* \throws std::runtime_error if memory_strategy is null
*/
RCLCPP_PUBLIC
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
/// Returns true if the executor is currently spinning.
/**
* This function can be called asynchronously from any thread.
@@ -486,11 +498,6 @@ protected:
static void
execute_client(rclcpp::ClientBase::SharedPtr client);
/// Gather all of the waitable entities from associated nodes and callback groups.
RCLCPP_PUBLIC
void
collect_entities();
/// Block until more work becomes avilable or timeout is reached.
/**
* Builds a set of waitable entities, which are passed to the middleware.
@@ -502,6 +509,62 @@ protected:
void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
/// Find node associated with a callback group
/**
* \param[in] weak_groups_to_nodes map of callback groups to nodes
* \param[in] group callback group to find assocatiated node
* \return Pointer to associated node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);
/// Return true if the node has been added to this executor.
/**
* \param[in] node_ptr a shared pointer that points to a node base interface
* \param[in] weak_groups_to_nodes map to nodes to lookup
* \return true if the node is associated with the executor, otherwise false
*/
RCLCPP_PUBLIC
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
/// Find the callback group associated with a timer
/**
* \param[in] timer Timer to find associated callback group
* \return Pointer to callback group node if found, else nullptr
*/
RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
/// Add a callback group to an executor
/**
* \see rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
virtual void
add_callback_group_to_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
/// Remove a callback group from the executor.
/**
* \see rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
virtual void
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
/// Check for executable in ready state and populate union structure.
/**
* \param[out] any_executable populated union structure of ready executable
@@ -512,6 +575,33 @@ protected:
bool
get_next_ready_executable(AnyExecutable & any_executable);
/// Check for executable in ready state and populate union structure.
/**
* This is the implementation of get_next_ready_executable that takes into
* account the current state of callback groups' association with nodes and
* executors.
*
* This checks in a particular order for available work:
* * Timers
* * Subscriptions
* * Services
* * Clients
* * Waitable
*
* If the next executable is not associated with this executor/node pair,
* then this method will return false.
*
* \param[out] any_executable populated union structure of ready executable
* \param[in] weak_groups_to_nodes mapping of callback groups to nodes
* \return true if an executable was ready and any_executable was populated,
* otherwise false
*/
RCLCPP_PUBLIC
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
/// Wait for executable in ready state and populate union structure.
/**
* If an executable is ready, it will return immediately, otherwise
@@ -529,6 +619,21 @@ protected:
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
/// Add all callback groups that can be automatically added from associated nodes.
/**
* The executor, before collecting entities, verifies if any callback group from
* nodes associated with the executor, which is not already associated to an executor,
* can be automatically added to this executor.
* This takes care of any callback group that has been added to a node but not explicitly added
* to the executor.
* It is important to note that in order for the callback groups to be automatically added to an
* executor through this function, the node of the callback groups needs to have been added
* through the `add_node` method.
*/
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
@@ -538,8 +643,16 @@ protected:
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
// Mutex to protect the subsequent memory_strategy_.
mutable std::mutex mutex_;
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;
@@ -549,33 +662,39 @@ protected:
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);
/// Waitable containing guard conditions controlling the executor flow.
/**
* This waitable contains the interrupt and shutdown guard condition, as well
* as the guard condition associated with each node and callback group.
* By default, if any change is detected in the monitored entities, the notify
* waitable will awake the executor and rebuild the collections.
*/
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;
std::atomic_bool entities_need_rebuild_;
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;
/// Collector used to associate executable entities from nodes and guard conditions
rclcpp::executors::ExecutorEntitiesCollector collector_;
/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Waitset to be waited on.
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// maps callback groups to guard conditions
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Hold the current state of the collection being waited on by the waitset
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);
/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Hold the current state of the notify waitable being waited on by the waitset
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> current_notify_waitable_
RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// maps callback groups to nodes associated with executor
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// Hold the list of executables currently available to be executed.
std::deque<rclcpp::AnyExecutable> ready_executables_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// maps all callback groups to nodes
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// shutdown callback handle registered to Context
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;

View File

@@ -0,0 +1,357 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_
#include <chrono>
#include <list>
#include <map>
#include <memory>
#include <vector>
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/experimental/executable_list.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp"
namespace rclcpp
{
namespace executors
{
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
class StaticExecutorEntitiesCollector final
: public rclcpp::Waitable,
public std::enable_shared_from_this<StaticExecutorEntitiesCollector>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector)
// Constructor
RCLCPP_PUBLIC
StaticExecutorEntitiesCollector() = default;
// Destructor
RCLCPP_PUBLIC
~StaticExecutorEntitiesCollector();
/// Initialize StaticExecutorEntitiesCollector
/**
* \param p_wait_set A reference to the wait set to be used in the executor
* \param memory_strategy Shared pointer to the memory strategy to set.
* \throws std::runtime_error if memory strategy is null
*/
RCLCPP_PUBLIC
void
init(
rcl_wait_set_t * p_wait_set,
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
/// Finalize StaticExecutorEntitiesCollector to clear resources
RCLCPP_PUBLIC
bool
is_init() {return initialized_;}
RCLCPP_PUBLIC
void
fini();
/// Execute the waitable.
RCLCPP_PUBLIC
void
execute(std::shared_ptr<void> & data) override;
/// Take the data so that it can be consumed with `execute`.
/**
* For `StaticExecutorEntitiesCollector`, this always return `nullptr`.
* \sa rclcpp::Waitable::take_data()
*/
RCLCPP_PUBLIC
std::shared_ptr<void>
take_data() override;
/// Function to add_handles_to_wait_set and wait for work and
/**
* block until the wait set is ready or until the timeout has been exceeded.
* \throws std::runtime_error if wait set couldn't be cleared or filled.
* \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error()
*/
RCLCPP_PUBLIC
void
refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
/**
* \throws std::runtime_error if it couldn't add guard condition to wait set
*/
RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;
RCLCPP_PUBLIC
size_t
get_number_of_ready_guard_conditions() override;
/// Add a callback group to an executor.
/**
* \see rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
bool
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
/// Add a callback group to an executor.
/**
* \see rclcpp::Executor::add_callback_group
* \return boolean whether the node from the callback group is new
*/
RCLCPP_PUBLIC
bool
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
/// Remove a callback group from the executor.
/**
* \see rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
bool
remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr);
/// Remove a callback group from the executor.
/**
* \see rclcpp::Executor::remove_callback_group_from_map
*/
RCLCPP_PUBLIC
bool
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
/**
* \see rclcpp::Executor::add_node()
* \throw std::runtime_error if node was already added
*/
RCLCPP_PUBLIC
bool
add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
/**
* \see rclcpp::Executor::remove_node()
* \throw std::runtime_error if no guard condition is associated with node.
*/
RCLCPP_PUBLIC
bool
remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_all_callback_groups();
/// Get callback groups that belong to executor.
/**
* \see rclcpp::Executor::get_manually_added_callback_groups()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_manually_added_callback_groups();
/// Get callback groups that belong to executor.
/**
* \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_automatically_added_callback_groups_from_nodes();
/// Complete all available queued work without blocking.
/**
* This function checks if after the guard condition was triggered
* (or a spurious wakeup happened) we are really ready to execute
* i.e. re-collect entities
*/
RCLCPP_PUBLIC
bool
is_ready(rcl_wait_set_t * wait_set) override;
/// Return number of timers
/**
* \return number of timers
*/
RCLCPP_PUBLIC
size_t
get_number_of_timers() {return exec_list_.number_of_timers;}
/// Return number of subscriptions
/**
* \return number of subscriptions
*/
RCLCPP_PUBLIC
size_t
get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;}
/// Return number of services
/**
* \return number of services
*/
RCLCPP_PUBLIC
size_t
get_number_of_services() {return exec_list_.number_of_services;}
/// Return number of clients
/**
* \return number of clients
*/
RCLCPP_PUBLIC
size_t
get_number_of_clients() {return exec_list_.number_of_clients;}
/// Return number of waitables
/**
* \return number of waitables
*/
RCLCPP_PUBLIC
size_t
get_number_of_waitables() {return exec_list_.number_of_waitables;}
/** Return a SubscritionBase Sharedptr by index.
* \param[in] i The index of the SubscritionBase
* \return a SubscritionBase shared pointer
* \throws std::out_of_range if the argument is higher than the size of the structrue.
*/
RCLCPP_PUBLIC
rclcpp::SubscriptionBase::SharedPtr
get_subscription(size_t i) {return exec_list_.subscription[i];}
/** Return a TimerBase Sharedptr by index.
* \param[in] i The index of the TimerBase
* \return a TimerBase shared pointer
* \throws std::out_of_range if the argument is higher than the size.
*/
RCLCPP_PUBLIC
rclcpp::TimerBase::SharedPtr
get_timer(size_t i) {return exec_list_.timer[i];}
/** Return a ServiceBase Sharedptr by index.
* \param[in] i The index of the ServiceBase
* \return a ServiceBase shared pointer
* \throws std::out_of_range if the argument is higher than the size.
*/
RCLCPP_PUBLIC
rclcpp::ServiceBase::SharedPtr
get_service(size_t i) {return exec_list_.service[i];}
/** Return a ClientBase Sharedptr by index
* \param[in] i The index of the ClientBase
* \return a ClientBase shared pointer
* \throws std::out_of_range if the argument is higher than the size.
*/
RCLCPP_PUBLIC
rclcpp::ClientBase::SharedPtr
get_client(size_t i) {return exec_list_.client[i];}
/** Return a Waitable Sharedptr by index
* \param[in] i The index of the Waitable
* \return a Waitable shared pointer
* \throws std::out_of_range if the argument is higher than the size.
*/
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_waitable(size_t i) {return exec_list_.waitable[i];}
private:
/// Function to reallocate space for entities in the wait set.
/**
* \throws std::runtime_error if wait set couldn't be cleared or resized.
*/
void
prepare_wait_set();
void
fill_executable_list();
void
fill_memory_strategy();
/// Return true if the node belongs to the collector
/**
* \param[in] node_ptr a node base interface shared pointer
* \param[in] weak_groups_to_nodes map to nodes to lookup
* \return boolean whether a node belongs the collector
*/
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
/// Add all callback groups that can be automatically added by any executor
/// and is not already associated with an executor from nodes
/// that are associated with executor
/**
* \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor()
*/
void
add_callback_groups_from_nodes_associated_to_executor();
void
fill_executable_list_from_map(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
/// Memory strategy: an interface for handling user-defined memory allocation strategies.
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
// maps callback groups to nodes.
WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_;
// maps callback groups to nodes.
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
/// List of weak nodes registered in the static executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
// Mutex to protect vector of new nodes.
std::mutex new_nodes_mutex_;
std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> new_nodes_;
/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t * p_wait_set_ = nullptr;
/// Executable list: timers, subscribers, clients, services and waitables
rclcpp::experimental::ExecutableList exec_list_;
/// Bool to check if the entities collector has been initialized
bool initialized_ = false;
};
} // namespace executors
} // namespace rclcpp
#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_

View File

@@ -15,13 +15,24 @@
#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_
#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_
#include <atomic>
#include <chrono>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <vector>
#include <string>
#include "rmw/rmw.h"
#include "rclcpp/executor.hpp"
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/executors/static_executor_entities_collector.hpp"
#include "rclcpp/experimental/executable_list.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
@@ -54,7 +65,7 @@ public:
explicit StaticSingleThreadedExecutor(
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());
/// Default destructor.
/// Default destrcutor.
RCLCPP_PUBLIC
virtual ~StaticSingleThreadedExecutor();
@@ -105,20 +116,92 @@ public:
void
spin_all(std::chrono::nanoseconds max_duration) override;
/// Add a callback group to an executor.
/**
* \sa rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
void
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;
/// Remove callback group from the executor
/**
* \sa rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
void
remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify = true) override;
/// Add a node to the executor.
/**
* \sa rclcpp::Executor::add_node
*/
RCLCPP_PUBLIC
void
add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;
/// Convenience function which takes Node and forwards NodeBaseInterface.
/**
* \sa rclcpp::StaticSingleThreadedExecutor::add_node
*/
RCLCPP_PUBLIC
void
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
/// Remove a node from the executor.
/**
* \sa rclcpp::Executor::remove_node
*/
RCLCPP_PUBLIC
void
remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;
/// Convenience function which takes Node and forwards NodeBaseInterface.
/**
* \sa rclcpp::Executor::remove_node
*/
RCLCPP_PUBLIC
void
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_all_callback_groups() override;
/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_manually_added_callback_groups()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_manually_added_callback_groups() override;
/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_automatically_added_callback_groups_from_nodes() override;
protected:
/**
* @brief Executes ready executables from wait set.
* @param collection entities to evaluate for ready executables.
* @param wait_result result to check for ready executables.
* @param spin_once if true executes only the first ready executable.
* @return true if any executable was ready.
*/
RCLCPP_PUBLIC
bool
execute_ready_executables(
const rclcpp::executors::ExecutorEntitiesCollection & collection,
rclcpp::WaitResult<rclcpp::WaitSet> & wait_result,
bool spin_once);
execute_ready_executables(bool spin_once = false);
RCLCPP_PUBLIC
void
@@ -130,6 +213,8 @@ protected:
private:
RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor)
StaticExecutorEntitiesCollector::SharedPtr entities_collector_;
};
} // namespace executors

View File

@@ -33,6 +33,7 @@ public:
virtual void clear() = 0;
virtual bool has_data() const = 0;
virtual size_t available_capacity() const = 0;
};
} // namespace buffers

View File

@@ -44,6 +44,7 @@ public:
virtual bool has_data() const = 0;
virtual bool use_take_shared_method() const = 0;
virtual size_t available_capacity() const = 0;
};
template<
@@ -95,7 +96,7 @@ public:
buffer_ = std::move(buffer_impl);
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_buffer_to_ipb,
static_cast<const void *>(buffer_.get()),
static_cast<const void *>(this));
@@ -143,6 +144,11 @@ public:
return std::is_same<BufferT, MessageSharedPtr>::value;
}
size_t available_capacity() const override
{
return buffer_->available_capacity();
}
private:
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;

View File

@@ -52,7 +52,10 @@ public:
if (capacity == 0) {
throw std::invalid_argument("capacity must be a positive, non-zero value");
}
TRACEPOINT(rclcpp_construct_ring_buffer, static_cast<const void *>(this), capacity_);
TRACETOOLS_TRACEPOINT(
rclcpp_construct_ring_buffer,
static_cast<const void *>(this),
capacity_);
}
virtual ~RingBufferImplementation() {}
@@ -69,7 +72,7 @@ public:
write_index_ = next_(write_index_);
ring_buffer_[write_index_] = std::move(request);
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_ring_buffer_enqueue,
static_cast<const void *>(this),
write_index_,
@@ -98,7 +101,7 @@ public:
}
auto request = std::move(ring_buffer_[read_index_]);
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_ring_buffer_dequeue,
static_cast<const void *>(this),
read_index_,
@@ -148,9 +151,21 @@ public:
return is_full_();
}
/// Get the remaining capacity to store messages
/**
* This member function is thread-safe.
*
* \return the number of free capacity for new messages
*/
size_t available_capacity() const
{
std::lock_guard<std::mutex> lock(mutex_);
return available_capacity_();
}
void clear()
{
TRACEPOINT(rclcpp_ring_buffer_clear, static_cast<const void *>(this));
TRACETOOLS_TRACEPOINT(rclcpp_ring_buffer_clear, static_cast<const void *>(this));
}
private:
@@ -189,6 +204,17 @@ private:
return size_ == capacity_;
}
/// Get the remaining capacity to store messages
/**
* This member function is not thread-safe.
*
* \return the number of free capacity for new messages
*/
inline size_t available_capacity_() const
{
return capacity_ - size_;
}
size_t capacity_;
std::vector<BufferT> ring_buffer_;

View File

@@ -243,6 +243,11 @@ private:
std::function<void(size_t, int)>
create_waitable_callback(const rclcpp::Waitable * waitable_id);
/// Utility to add the notify waitable to an entities collection
void
add_notify_waitable_to_collection(
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection);
/// Searches for the provided entity_id in the collection and returns the entity if valid
template<typename CollectionType>
typename CollectionType::EntitySharedPtr
@@ -269,9 +274,12 @@ private:
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;
/// Mutex to protect the current_entities_collection_
std::recursive_mutex collection_mutex_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
/// Flag used to reduce the number of unnecessary waitable events
std::atomic<bool> notify_waitable_event_pushed_ {false};

View File

@@ -306,6 +306,11 @@ public:
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
get_subscription_intra_process(uint64_t intra_process_subscription_id);
/// Return the lowest available capacity for all subscription buffers for a publisher id.
RCLCPP_PUBLIC
size_t
lowest_available_capacity(const uint64_t intra_process_publisher_id) const;
private:
struct SplittedSubscriptions
{

View File

@@ -87,7 +87,7 @@ public:
buffer_type),
any_callback_(callback)
{
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_callback_added,
static_cast<const void *>(this),
static_cast<const void *>(&any_callback_));

View File

@@ -62,6 +62,11 @@ public:
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;
RCLCPP_PUBLIC
virtual
size_t
available_capacity() const = 0;
bool
is_ready(rcl_wait_set_t * wait_set) override = 0;

View File

@@ -93,7 +93,7 @@ public:
buffer_type,
qos_profile,
std::make_shared<Alloc>(subscribed_type_allocator_));
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_ipb_to_subscription,
static_cast<const void *>(buffer_.get()),
static_cast<const void *>(this));
@@ -169,6 +169,11 @@ public:
return buffer_->use_take_shared_method();
}
size_t available_capacity() const override
{
return buffer_->available_capacity();
}
protected:
void
trigger_guard_condition() override

View File

@@ -56,6 +56,7 @@
#include "rclcpp/node_interfaces/node_time_source_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/node_interfaces/node_type_descriptions_interface.hpp"
#include "rclcpp/node_interfaces/node_waitables_interface.hpp"
#include "rclcpp/node_options.hpp"
#include "rclcpp/parameter.hpp"
@@ -232,13 +233,15 @@ public:
* \param[in] period Time interval between triggers of the callback.
* \param[in] callback User-defined callback function.
* \param[in] group Callback group to execute this timer's callback in.
* \param[in] autostart The state of the clock on initialization.
*/
template<typename DurationRepT = int64_t, typename DurationT = std::milli, typename CallbackT>
typename rclcpp::WallTimer<CallbackT>::SharedPtr
create_wall_timer(
std::chrono::duration<DurationRepT, DurationT> period,
CallbackT callback,
rclcpp::CallbackGroup::SharedPtr group = nullptr);
rclcpp::CallbackGroup::SharedPtr group = nullptr,
bool autostart = true);
/// Create a timer that uses the node clock to drive the callback.
/**
@@ -1454,6 +1457,11 @@ public:
rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr
get_node_time_source_interface();
/// Return the Node's internal NodeTypeDescriptionsInterface implementation.
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr
get_node_type_descriptions_interface();
/// Return the sub-namespace, if this is a sub-node, otherwise an empty string.
/**
* The returned sub-namespace is either the accumulated sub-namespaces which
@@ -1586,6 +1594,7 @@ private:
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_parameters_;
rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr node_time_source_;
rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr node_type_descriptions_;
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_;
const rclcpp::NodeOptions node_options_;

View File

@@ -110,14 +110,16 @@ typename rclcpp::WallTimer<CallbackT>::SharedPtr
Node::create_wall_timer(
std::chrono::duration<DurationRepT, DurationT> period,
CallbackT callback,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
bool autostart)
{
return rclcpp::create_wall_timer(
period,
std::move(callback),
group,
this->node_base_.get(),
this->node_timers_.get());
this->node_timers_.get(),
autostart);
}
template<typename DurationRepT, typename DurationT, typename CallbackT>

View File

@@ -30,6 +30,7 @@
rclcpp::node_interfaces::NodeTimeSourceInterface, \
rclcpp::node_interfaces::NodeTimersInterface, \
rclcpp::node_interfaces::NodeTopicsInterface, \
rclcpp::node_interfaces::NodeTypeDescriptionsInterface, \
rclcpp::node_interfaces::NodeWaitablesInterface
@@ -118,6 +119,7 @@ public:
* - rclcpp::node_interfaces::NodeTimeSourceInterface
* - rclcpp::node_interfaces::NodeTimersInterface
* - rclcpp::node_interfaces::NodeTopicsInterface
* - rclcpp::node_interfaces::NodeTypeDescriptionsInterface
* - rclcpp::node_interfaces::NodeWaitablesInterface
*
* Or you use custom interfaces as long as you make a template specialization

View File

@@ -23,6 +23,9 @@
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcl_interfaces/srv/get_logger_levels.hpp"
#include "rcl_interfaces/srv/set_logger_levels.hpp"
namespace rclcpp
{
namespace node_interfaces
@@ -35,7 +38,7 @@ public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeLoggingInterface)
RCLCPP_PUBLIC
explicit NodeLogging(rclcpp::node_interfaces::NodeBaseInterface * node_base);
explicit NodeLogging(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base);
RCLCPP_PUBLIC
virtual
@@ -49,13 +52,21 @@ public:
const char *
get_logger_name() const override;
RCLCPP_PUBLIC
void
create_logger_services(
node_interfaces::NodeServicesInterface::SharedPtr node_services) override;
private:
RCLCPP_DISABLE_COPY(NodeLogging)
/// Handle to the NodeBaseInterface given in the constructor.
rclcpp::node_interfaces::NodeBaseInterface * node_base_;
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_;
rclcpp::Logger logger_;
rclcpp::Service<rcl_interfaces::srv::GetLoggerLevels>::SharedPtr get_loggers_service_;
rclcpp::Service<rcl_interfaces::srv::SetLoggerLevels>::SharedPtr set_loggers_service_;
};
} // namespace node_interfaces

View File

@@ -19,6 +19,7 @@
#include "rclcpp/logger.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_services_interface.hpp"
#include "rclcpp/node_interfaces/detail/node_interfaces_helpers.hpp"
#include "rclcpp/visibility_control.hpp"
@@ -54,6 +55,13 @@ public:
virtual
const char *
get_logger_name() const = 0;
/// create logger services
RCLCPP_PUBLIC
virtual
void
create_logger_services(
node_interfaces::NodeServicesInterface::SharedPtr node_services) = 0;
};
} // namespace node_interfaces

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__NODE_INTERFACES__NODE_TYPE_DESCRIPTIONS_HPP_
#define RCLCPP__NODE_INTERFACES__NODE_TYPE_DESCRIPTIONS_HPP_
#include <memory>
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
#include "rclcpp/node_interfaces/node_parameters_interface.hpp"
#include "rclcpp/node_interfaces/node_services_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/node_interfaces/node_type_descriptions_interface.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace node_interfaces
{
/// Implementation of the NodeTypeDescriptions part of the Node API.
class NodeTypeDescriptions : public NodeTypeDescriptionsInterface
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTypeDescriptions)
RCLCPP_PUBLIC
explicit NodeTypeDescriptions(
NodeBaseInterface::SharedPtr node_base,
NodeLoggingInterface::SharedPtr node_logging,
NodeParametersInterface::SharedPtr node_parameters,
NodeServicesInterface::SharedPtr node_services);
RCLCPP_PUBLIC
virtual
~NodeTypeDescriptions();
private:
RCLCPP_DISABLE_COPY(NodeTypeDescriptions)
// Pimpl hides helper types and functions used for wrapping a C service, which would be
// awkward to expose in this header.
class NodeTypeDescriptionsImpl;
std::unique_ptr<NodeTypeDescriptionsImpl> impl_;
};
} // namespace node_interfaces
} // namespace rclcpp
#endif // RCLCPP__NODE_INTERFACES__NODE_TYPE_DESCRIPTIONS_HPP_

View File

@@ -0,0 +1,44 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__NODE_INTERFACES__NODE_TYPE_DESCRIPTIONS_INTERFACE_HPP_
#define RCLCPP__NODE_INTERFACES__NODE_TYPE_DESCRIPTIONS_INTERFACE_HPP_
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/detail/node_interfaces_helpers.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace node_interfaces
{
/// Pure virtual interface class for the NodeTypeDescriptions part of the Node API.
class NodeTypeDescriptionsInterface
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTypeDescriptionsInterface)
RCLCPP_PUBLIC
virtual
~NodeTypeDescriptionsInterface() = default;
};
} // namespace node_interfaces
} // namespace rclcpp
RCLCPP_NODE_INTERFACE_HELPERS_SUPPORT(
rclcpp::node_interfaces::NodeTypeDescriptionsInterface, type_descriptions)
#endif // RCLCPP__NODE_INTERFACES__NODE_TYPE_DESCRIPTIONS_INTERFACE_HPP_

View File

@@ -50,6 +50,7 @@ public:
* - clock_type = RCL_ROS_TIME
* - clock_qos = rclcpp::ClockQoS()
* - use_clock_thread = true
* - enable_logger_service = false
* - rosout_qos = rclcpp::RosoutQoS()
* - parameter_event_qos = rclcpp::ParameterEventQoS
* - with history setting and depth from rmw_qos_profile_parameter_events
@@ -232,6 +233,24 @@ public:
NodeOptions &
start_parameter_services(bool start_parameter_services);
/// Return the enable_logger_service flag.
RCLCPP_PUBLIC
bool
enable_logger_service() const;
/// Set the enable_logger_service flag, return this for logger idiom.
/**
* If true, ROS services are created to allow external nodes to get
* and set logger levels of this node.
*
* If false, loggers will still be configured and set logger levels locally,
* but logger levels cannot be changed remotely .
*
*/
RCLCPP_PUBLIC
NodeOptions &
enable_logger_service(bool enable_log_service);
/// Return the start_parameter_event_publisher flag.
RCLCPP_PUBLIC
bool
@@ -421,6 +440,8 @@ private:
bool use_clock_thread_ {true};
bool enable_logger_service_ {false};
rclcpp::QoS parameter_event_qos_ = rclcpp::ParameterEventsQoS(
rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_parameter_events)
);

View File

@@ -48,7 +48,7 @@ public:
*
* Example Usage:
*
* If you have recieved a parameter event and are only interested in parameters foo and
* If you have received a parameter event and are only interested in parameters foo and
* bar being added or changed but don't care about deletion.
*
* ```cpp

View File

@@ -421,7 +421,7 @@ protected:
void
do_inter_process_publish(const ROSMessageType & msg)
{
TRACEPOINT(rclcpp_publish, nullptr, static_cast<const void *>(&msg));
TRACETOOLS_TRACEPOINT(rclcpp_publish, nullptr, static_cast<const void *>(&msg));
auto status = rcl_publish(publisher_handle_.get(), &msg, nullptr);
if (RCL_RET_PUBLISHER_INVALID == status) {
@@ -484,7 +484,7 @@ protected:
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_intra_publish,
static_cast<const void *>(publisher_handle_.get()),
msg.get());
@@ -506,7 +506,7 @@ protected:
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_intra_publish,
static_cast<const void *>(publisher_handle_.get()),
msg.get());
@@ -529,7 +529,7 @@ protected:
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_intra_publish,
static_cast<const void *>(publisher_handle_.get()),
msg.get());

View File

@@ -215,6 +215,17 @@ public:
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;
/// Return the lowest available capacity for all subscription buffers.
/**
* For intraprocess communication return the lowest buffer capacity for all subscriptions.
* If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t.
* On failure return 0.
* \return lowest buffer capacity for all subscriptions
*/
RCLCPP_PUBLIC
size_t
lowest_available_ipm_capacity() const;
/// Wait until all published messages are acknowledged or until the specified timeout elapses.
/**
* This method waits until all published messages are acknowledged by all matching

View File

@@ -265,15 +265,19 @@ protected:
std::shared_ptr<rcl_node_t> node_handle_;
std::recursive_mutex callback_mutex_;
// It is important to declare on_new_request_callback_ before
// service_handle_, so on destruction the service is
// destroyed first. Otherwise, the rmw service callback
// would point briefly to a destroyed function.
std::function<void(size_t)> on_new_request_callback_{nullptr};
// Declare service_handle_ after callback
std::shared_ptr<rcl_service_t> service_handle_;
bool owns_rcl_handle_ = true;
rclcpp::Logger node_logger_;
std::atomic<bool> in_use_by_wait_set_{false};
std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_request_callback_{nullptr};
};
template<typename ServiceT>
@@ -348,7 +352,7 @@ public:
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create service");
}
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_service_callback_added,
static_cast<const void *>(get_service_handle().get()),
static_cast<const void *>(&any_callback_));
@@ -383,7 +387,7 @@ public:
}
service_handle_ = service_handle;
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_service_callback_added,
static_cast<const void *>(get_service_handle().get()),
static_cast<const void *>(&any_callback_));
@@ -420,7 +424,7 @@ public:
// In this case, rcl owns the service handle memory
service_handle_ = std::shared_ptr<rcl_service_t>(new rcl_service_t);
service_handle_->impl = service_handle->impl;
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_service_callback_added,
static_cast<const void *>(get_service_handle().get()),
static_cast<const void *>(&any_callback_));

View File

@@ -185,7 +185,7 @@ public:
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
qos_profile,
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_init,
static_cast<const void *>(get_subscription_handle().get()),
static_cast<const void *>(subscription_intra_process_.get()));
@@ -201,11 +201,11 @@ public:
this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
}
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_init,
static_cast<const void *>(get_subscription_handle().get()),
static_cast<const void *>(this));
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_callback_added,
static_cast<const void *>(this),
static_cast<const void *>(&any_callback_));

View File

@@ -260,13 +260,13 @@ public:
bool
is_serialized() const;
/// Return the type of the subscription.
/// Return the delivered message kind.
/**
* \return `DeliveredMessageKind`, which adjusts how messages are received and delivered.
*/
RCLCPP_PUBLIC
DeliveredMessageKind
get_subscription_type() const;
get_delivered_message_kind() const;
/// Get matching publisher count.
/** \return The number of publishers on this topic. */
@@ -645,6 +645,14 @@ protected:
rclcpp::node_interfaces::NodeBaseInterface * const node_base_;
std::shared_ptr<rcl_node_t> node_handle_;
std::recursive_mutex callback_mutex_;
// It is important to declare on_new_message_callback_ before
// subscription_handle_, so on destruction the subscription is
// destroyed first. Otherwise, the rmw subscription callback
// would point briefly to a destroyed function.
std::function<void(size_t)> on_new_message_callback_{nullptr};
// Declare subscription_handle_ after callback
std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
rclcpp::Logger node_logger_;
@@ -663,15 +671,12 @@ private:
RCLCPP_DISABLE_COPY(SubscriptionBase)
rosidl_message_type_support_t type_support_;
DeliveredMessageKind delivered_message_type_;
DeliveredMessageKind delivered_message_kind_;
std::atomic<bool> subscription_in_use_by_wait_set_{false};
std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
std::unordered_map<rclcpp::EventHandlerBase *,
std::atomic<bool>> qos_events_in_use_by_wait_set_;
std::recursive_mutex callback_mutex_;
std::function<void(size_t)> on_new_message_callback_{nullptr};
};
} // namespace rclcpp

View File

@@ -53,12 +53,17 @@ public:
* \param clock A clock to use for time and sleeping
* \param period The interval at which the timer fires
* \param context node context
* \param autostart timer state on initialization
*
* In order to activate a timer that is not started on initialization,
* user should call the reset() method.
*/
RCLCPP_PUBLIC
explicit TimerBase(
Clock::SharedPtr clock,
std::chrono::nanoseconds period,
rclcpp::Context::SharedPtr context);
rclcpp::Context::SharedPtr context,
bool autostart = true);
/// TimerBase destructor
RCLCPP_PUBLIC
@@ -216,21 +221,22 @@ public:
* \param[in] period The interval at which the timer fires.
* \param[in] callback User-specified callback function.
* \param[in] context custom context to be used.
* \param autostart timer state on initialization
*/
explicit GenericTimer(
Clock::SharedPtr clock, std::chrono::nanoseconds period, FunctorT && callback,
rclcpp::Context::SharedPtr context
rclcpp::Context::SharedPtr context, bool autostart = true
)
: TimerBase(clock, period, context), callback_(std::forward<FunctorT>(callback))
: TimerBase(clock, period, context, autostart), callback_(std::forward<FunctorT>(callback))
{
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_timer_callback_added,
static_cast<const void *>(get_timer_handle().get()),
reinterpret_cast<const void *>(&callback_));
#ifndef TRACETOOLS_DISABLED
if (TRACEPOINT_ENABLED(rclcpp_callback_register)) {
if (TRACETOOLS_TRACEPOINT_ENABLED(rclcpp_callback_register)) {
char * symbol = tracetools::get_symbol(callback_);
DO_TRACEPOINT(
TRACETOOLS_DO_TRACEPOINT(
rclcpp_callback_register,
reinterpret_cast<const void *>(&callback_),
symbol);
@@ -269,9 +275,9 @@ public:
void
execute_callback() override
{
TRACEPOINT(callback_start, reinterpret_cast<const void *>(&callback_), false);
TRACETOOLS_TRACEPOINT(callback_start, reinterpret_cast<const void *>(&callback_), false);
execute_callback_delegate<>();
TRACEPOINT(callback_end, reinterpret_cast<const void *>(&callback_));
TRACETOOLS_TRACEPOINT(callback_end, reinterpret_cast<const void *>(&callback_));
}
// void specialization
@@ -330,13 +336,15 @@ public:
* \param period The interval at which the timer fires
* \param callback The callback function to execute every interval
* \param context node context
* \param autostart timer state on initialization
*/
WallTimer(
std::chrono::nanoseconds period,
FunctorT && callback,
rclcpp::Context::SharedPtr context)
rclcpp::Context::SharedPtr context,
bool autostart = true)
: GenericTimer<FunctorT>(
std::make_shared<Clock>(RCL_STEADY_TIME), period, std::move(callback), context)
std::make_shared<Clock>(RCL_STEADY_TIME), period, std::move(callback), context, autostart)
{}
protected:

View File

@@ -104,7 +104,7 @@ protected:
// TODO(wjwwood): support custom allocator, maybe restrict to polymorphic allocator
rcl_get_default_allocator());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to create wait set");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
// (Re)build the wait set for the first time.
@@ -192,7 +192,8 @@ protected:
size_t services_from_waitables = 0;
size_t events_from_waitables = 0;
for (const auto & waitable_entry : waitables) {
if (!waitable_entry.waitable) {
auto waitable_ptr_pair = get_raw_pointer_from_smart_pointer(waitable_entry.waitable);
if (nullptr == waitable_ptr_pair.second) {
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
if (HasStrongOwnership) {
// This will not happen in fixed sized storage, as it holds
@@ -203,13 +204,13 @@ protected:
needs_pruning_ = true;
continue;
}
const auto & waitable = waitable_entry.waitable;
subscriptions_from_waitables += waitable->get_number_of_ready_subscriptions();
guard_conditions_from_waitables += waitable->get_number_of_ready_guard_conditions();
timers_from_waitables += waitable->get_number_of_ready_timers();
clients_from_waitables += waitable->get_number_of_ready_clients();
services_from_waitables += waitable->get_number_of_ready_services();
events_from_waitables += waitable->get_number_of_ready_events();
rclcpp::Waitable & waitable = *waitable_ptr_pair.second;
subscriptions_from_waitables += waitable.get_number_of_ready_subscriptions();
guard_conditions_from_waitables += waitable.get_number_of_ready_guard_conditions();
timers_from_waitables += waitable.get_number_of_ready_timers();
clients_from_waitables += waitable.get_number_of_ready_clients();
services_from_waitables += waitable.get_number_of_ready_services();
events_from_waitables += waitable.get_number_of_ready_events();
}
rcl_ret_t ret = rcl_wait_set_resize(
&rcl_wait_set_,
@@ -221,7 +222,7 @@ protected:
events_from_waitables
);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
was_resized = true;
// Assumption: the calling code ensures this function is not called
@@ -237,13 +238,15 @@ protected:
if (!was_resized) {
rcl_ret_t ret = rcl_wait_set_clear(&rcl_wait_set_);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't clear the wait set");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
// Add subscriptions.
for (const auto & subscription_entry : subscriptions) {
if (!subscription_entry.subscription) {
auto subscription_ptr_pair =
get_raw_pointer_from_smart_pointer(subscription_entry.subscription);
if (nullptr == subscription_ptr_pair.second) {
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
if (HasStrongOwnership) {
// This will not happen in fixed sized storage, as it holds
@@ -254,13 +257,12 @@ protected:
needs_pruning_ = true;
continue;
}
rcl_ret_t ret = rcl_wait_set_add_subscription(
&rcl_wait_set_,
subscription_entry.subscription->get_subscription_handle().get(),
subscription_ptr_pair.second->get_subscription_handle().get(),
nullptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
@@ -269,7 +271,8 @@ protected:
[this](const auto & inner_guard_conditions)
{
for (const auto & guard_condition : inner_guard_conditions) {
if (!guard_condition) {
auto guard_condition_ptr_pair = get_raw_pointer_from_smart_pointer(guard_condition);
if (nullptr == guard_condition_ptr_pair.second) {
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
if (HasStrongOwnership) {
// This will not happen in fixed sized storage, as it holds
@@ -282,10 +285,10 @@ protected:
}
rcl_ret_t ret = rcl_wait_set_add_guard_condition(
&rcl_wait_set_,
&guard_condition->get_rcl_guard_condition(),
&guard_condition_ptr_pair.second->get_rcl_guard_condition(),
nullptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
};
@@ -298,7 +301,8 @@ protected:
// Add timers.
for (const auto & timer : timers) {
if (!timer) {
auto timer_ptr_pair = get_raw_pointer_from_smart_pointer(timer);
if (nullptr == timer_ptr_pair.second) {
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
if (HasStrongOwnership) {
// This will not happen in fixed sized storage, as it holds
@@ -311,16 +315,17 @@ protected:
}
rcl_ret_t ret = rcl_wait_set_add_timer(
&rcl_wait_set_,
timer->get_timer_handle().get(),
timer_ptr_pair.second->get_timer_handle().get(),
nullptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
// Add clients.
for (const auto & client : clients) {
if (!client) {
auto client_ptr_pair = get_raw_pointer_from_smart_pointer(client);
if (nullptr == client_ptr_pair.second) {
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
if (HasStrongOwnership) {
// This will not happen in fixed sized storage, as it holds
@@ -333,17 +338,17 @@ protected:
}
rcl_ret_t ret = rcl_wait_set_add_client(
&rcl_wait_set_,
client->get_client_handle().get(),
client_ptr_pair.second->get_client_handle().get(),
nullptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
// Add services.
for (const auto & service : services) {
if (!service) {
auto service_ptr_pair = get_raw_pointer_from_smart_pointer(service);
if (nullptr == service_ptr_pair.second) {
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
if (HasStrongOwnership) {
// This will not happen in fixed sized storage, as it holds
@@ -356,16 +361,17 @@ protected:
}
rcl_ret_t ret = rcl_wait_set_add_service(
&rcl_wait_set_,
service->get_service_handle().get(),
service_ptr_pair.second->get_service_handle().get(),
nullptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't fill wait set");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
// Add waitables.
for (auto & waitable_entry : waitables) {
if (!waitable_entry.waitable) {
auto waitable_ptr_pair = get_raw_pointer_from_smart_pointer(waitable_entry.waitable);
if (nullptr == waitable_ptr_pair.second) {
// In this case it was probably stored as a weak_ptr, but is now locking to nullptr.
if (HasStrongOwnership) {
// This will not happen in fixed sized storage, as it holds
@@ -376,7 +382,8 @@ protected:
needs_pruning_ = true;
continue;
}
waitable_entry.waitable->add_to_wait_set(&rcl_wait_set_);
rclcpp::Waitable & waitable = *waitable_ptr_pair.second;
waitable.add_to_wait_set(&rcl_wait_set_);
}
}

View File

@@ -204,19 +204,15 @@ public:
void
storage_rebuild_rcl_wait_set(const ArrayOfExtraGuardConditions & extra_guard_conditions)
{
this->storage_acquire_ownerships();
this->storage_rebuild_rcl_wait_set_with_sets(
shared_subscriptions_,
shared_guard_conditions_,
subscriptions_,
guard_conditions_,
extra_guard_conditions,
shared_timers_,
shared_clients_,
shared_services_,
shared_waitables_
timers_,
clients_,
services_,
waitables_
);
this->storage_release_ownerships();
}
template<class EntityT, class SequenceOfEntitiesT>
@@ -411,7 +407,6 @@ public:
}
};
// Lock all the weak pointers and hold them until released.
lock_all(subscriptions_, shared_subscriptions_);
lock_all(guard_conditions_, shared_guard_conditions_);
lock_all(timers_, shared_timers_);
lock_all(clients_, shared_clients_);
@@ -443,7 +438,6 @@ public:
shared_ptr.reset();
}
};
reset_all(shared_subscriptions_);
reset_all(shared_guard_conditions_);
reset_all(shared_timers_);
reset_all(shared_clients_);

View File

@@ -290,7 +290,7 @@ protected:
return create_wait_result(WaitResultKind::Empty);
} else {
// Some other error case, throw.
rclcpp::exceptions::throw_from_rcl_error(ret, "rcl_wait() failed");
rclcpp::exceptions::throw_from_rcl_error(ret);
}
} while (should_loop());

View File

@@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>rclcpp</name>
<version>20.0.0</version>
<version>22.0.0</version>
<description>The ROS client library in C++.</description>
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>

View File

@@ -16,16 +16,13 @@
using rclcpp::AnyExecutable;
RCLCPP_PUBLIC
AnyExecutable::AnyExecutable()
: subscription(nullptr),
timer(nullptr),
service(nullptr),
client(nullptr),
waitable(nullptr),
callback_group(nullptr),
node_base(nullptr),
data(nullptr)
node_base(nullptr)
{}
AnyExecutable::~AnyExecutable()

View File

@@ -66,7 +66,6 @@ CallbackGroup::size() const
timer_ptrs_.size() +
waitable_ptrs_.size();
}
void CallbackGroup::collect_all_ptrs(
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,

View File

@@ -13,8 +13,6 @@
// limitations under the License.
#include <algorithm>
#include <chrono>
#include <iterator>
#include <memory>
#include <map>
#include <string>
@@ -24,14 +22,13 @@
#include "rcl/allocator.h"
#include "rcl/error_handling.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rclcpp/subscription_wait_set_mask.hpp"
#include "rcpputils/scope_exit.hpp"
#include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
@@ -41,24 +38,16 @@
using namespace std::chrono_literals;
using rclcpp::exceptions::throw_from_rcl_error;
using rclcpp::Executor;
class rclcpp::ExecutorImplementation {};
/// Mask to indicate to the waitset to only add the subscription.
/// The events and intraprocess waitable are already added via the callback group.
static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false};
class rclcpp::ExecutorImplementation {};
Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
this->entities_need_rebuild_.store(true);
})),
collector_(notify_waitable_),
wait_set_({}, {}, {}, {}, {}, {}, options.context),
current_notify_waitable_(notify_waitable_),
memory_strategy_(options.memory_strategy),
impl_(std::make_unique<rclcpp::ExecutorImplementation>())
{
// Store the context for later use.
@@ -72,56 +61,74 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
}
});
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
// The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get());
// Put the executor's guard condition in
memory_strategy_->add_guard_condition(*interrupt_guard_condition_.get());
rcl_allocator_t allocator = memory_strategy_->get_allocator();
rcl_ret_t ret = rcl_wait_set_init(
&wait_set_,
0, 2, 0, 0, 0, 0,
context_->get_rcl_context().get(),
allocator);
if (RCL_RET_OK != ret) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to create wait set: %s", rcl_get_error_string().str);
rcl_reset_error();
throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor");
}
}
Executor::~Executor()
{
std::lock_guard<std::mutex> guard(mutex_);
notify_waitable_->remove_guard_condition(interrupt_guard_condition_);
notify_waitable_->remove_guard_condition(shutdown_guard_condition_);
std::cout << "Executor::~Executor" << std::endl;
current_collection_.timers.update(
{}, {},
[this](auto timer) {
std::cout << "remove_timer(" << timer.get() << ")" << std::endl;
wait_set_.remove_timer(timer);
// Disassociate all callback groups.
for (auto & pair : weak_groups_to_nodes_) {
auto group = pair.first.lock();
if (group) {
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
has_executor.store(false);
}
}
// Disassociate all nodes.
std::for_each(
weak_nodes_.begin(), weak_nodes_.end(), []
(rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) {
auto shared_node_ptr = weak_node_ptr.lock();
if (shared_node_ptr) {
std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
}
});
weak_nodes_.clear();
weak_groups_associated_with_executor_to_nodes_.clear();
weak_groups_to_nodes_associated_with_executor_.clear();
weak_groups_to_nodes_.clear();
for (const auto & pair : weak_groups_to_guard_conditions_) {
auto guard_condition = pair.second;
memory_strategy_->remove_guard_condition(guard_condition);
}
weak_groups_to_guard_conditions_.clear();
current_collection_.subscriptions.update(
{}, {},
[this](auto subscription) {
std::cout << "remove_subscription(" << subscription.get() << ")" << std::endl;
wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
});
for (const auto & pair : weak_nodes_to_guard_conditions_) {
auto guard_condition = pair.second;
memory_strategy_->remove_guard_condition(guard_condition);
}
weak_nodes_to_guard_conditions_.clear();
current_collection_.clients.update(
{}, {},
[this](auto client) {
std::cout << "remove_client(" << client.get() << ")" << std::endl;
wait_set_.remove_client(client);});
current_collection_.services.update(
{}, {},
[this](auto service) {
std::cout << "remove_service(" << service.get() << ")" << std::endl;
wait_set_.remove_service(service);});
current_collection_.guard_conditions.update(
{}, {},
[this](auto guard_condition) {
std::cout << "remove_guard_condition(" << guard_condition.get() << ")" << std::endl;
wait_set_.remove_guard_condition(guard_condition);});
current_collection_.waitables.update(
{}, {},
[this](auto waitable) {
std::cout << "remove_waitable(" << waitable.get() << ")" << std::endl;
wait_set_.remove_waitable(waitable);});
// Finalize the wait set.
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to destroy wait set: %s", rcl_get_error_string().str);
rcl_reset_error();
}
// Remove and release the sigint guard condition
memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get());
memory_strategy_->remove_guard_condition(interrupt_guard_condition_.get());
// Remove shutdown callback handle registered to Context
if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
@@ -135,39 +142,95 @@ Executor::~Executor()
std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_all_callback_groups()
{
this->collector_.update_collections();
return this->collector_.get_all_callback_groups();
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
groups.push_back(group_node_ptr.first);
}
return groups;
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_manually_added_callback_groups()
{
this->collector_.update_collections();
return this->collector_.get_manually_added_callback_groups();
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
return groups;
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_automatically_added_callback_groups_from_nodes()
{
this->collector_.update_collections();
return this->collector_.get_automatically_added_callback_groups();
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
groups.push_back(group_node_ptr.first);
}
return groups;
}
void
Executor::add_callback_group(
Executor::add_callback_groups_from_nodes_associated_to_executor()
{
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
node->for_each_callback_group(
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
if (
shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
this->add_callback_group_to_map(
shared_group_ptr,
node,
weak_groups_to_nodes_associated_with_executor_,
true);
}
});
}
}
}
void
Executor::add_callback_group_to_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify)
{
(void) node_ptr;
this->collector_.add_callback_group(group_ptr);
// If the callback_group already has an executor
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
if (has_executor.exchange(true)) {
throw std::runtime_error("Callback group has already been added to an executor.");
}
if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
auto insert_info =
weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr));
bool was_inserted = insert_info.second;
if (!was_inserted) {
throw std::runtime_error("Callback group was already added to executor.");
}
// Also add to the map that contains all callback groups
weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr));
if (node_ptr->get_context()->is_valid()) {
auto callback_group_guard_condition = group_ptr->get_notify_guard_condition();
weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get();
// Add the callback_group's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(*callback_group_guard_condition);
}
if (notify) {
// Interrupt waiting to handle new node
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
@@ -178,23 +241,91 @@ Executor::add_callback_group(
}
}
void
Executor::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify)
{
std::lock_guard<std::mutex> guard{mutex_};
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_associated_with_executor_to_nodes_,
notify);
}
void
Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
this->collector_.add_node(node_ptr);
if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
// If the node already has an executor
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
if (has_executor.exchange(true)) {
throw std::runtime_error(
std::string("Node '") + node_ptr->get_fully_qualified_name() +
"' has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
node_ptr->for_each_callback_group(
[this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
if (!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
this->add_callback_group_to_map(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
});
if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
const auto gc = node_ptr->get_shared_notify_guard_condition();
weak_nodes_to_guard_conditions_[node_ptr] = gc.get();
// Add the node's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(*gc);
weak_nodes_.push_back(node_ptr);
}
void
Executor::remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify)
{
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
if (iter != weak_groups_to_nodes.end()) {
node_ptr = iter->second.lock();
if (node_ptr == nullptr) {
throw std::runtime_error("Node must not be deleted before its callback group(s).");
}
weak_groups_to_nodes.erase(iter);
weak_groups_to_nodes_.erase(group_ptr);
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
} else {
throw std::runtime_error("Callback group needs to be associated with executor.");
}
// If the node was matched and removed, interrupt waiting.
if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) &&
!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_))
{
auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr);
if (iter != weak_groups_to_guard_conditions_.end()) {
memory_strategy_->remove_guard_condition(iter->second);
}
weak_groups_to_guard_conditions_.erase(weak_group_ptr);
if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}
}
}
@@ -204,21 +335,11 @@ Executor::remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify)
{
this->collector_.remove_callback_group(group_ptr);
if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}
if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}
std::lock_guard<std::mutex> guard{mutex_};
this->remove_callback_group_from_map(
group_ptr,
weak_groups_associated_with_executor_to_nodes_,
notify);
}
void
@@ -230,22 +351,48 @@ Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
void
Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
this->collector_.remove_node(node_ptr);
if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
if (!node_ptr->get_associated_with_executor_atomic().load()) {
throw std::runtime_error("Node needs to be associated with an executor.");
}
if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node remove: ") + ex.what());
std::lock_guard<std::mutex> guard{mutex_};
bool found_node = false;
auto node_it = weak_nodes_.begin();
while (node_it != weak_nodes_.end()) {
bool matched = (node_it->lock() == node_ptr);
if (matched) {
found_node = true;
node_it = weak_nodes_.erase(node_it);
} else {
++node_it;
}
}
if (!found_node) {
throw std::runtime_error("Node needs to be associated with this executor.");
}
for (auto it = weak_groups_to_nodes_associated_with_executor_.begin();
it != weak_groups_to_nodes_associated_with_executor_.end(); )
{
auto weak_node_ptr = it->second;
auto shared_node_ptr = weak_node_ptr.lock();
auto group_ptr = it->first.lock();
// Increment iterator before removing in case it's invalidated
it++;
if (shared_node_ptr == node_ptr) {
remove_callback_group_from_map(
group_ptr,
weak_groups_to_nodes_associated_with_executor_,
notify);
}
}
memory_strategy_->remove_guard_condition(node_ptr->get_shared_notify_guard_condition().get());
weak_nodes_to_guard_conditions_.erase(node_ptr);
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
}
void
@@ -312,44 +459,21 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
throw std::runtime_error("spin_some() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
size_t work_in_queue = 0;
bool has_waited = false;
{
std::lock_guard<std::mutex> lock(mutex_);
work_in_queue = ready_executables_.size();
}
// The logic below is to guarantee that we:
// a) run all of the work in the queue before we spin the first time
// b) spin at least once
// c) run all of the work in the queue after we spin
bool work_available = false;
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
AnyExecutable any_exec;
if (work_in_queue > 0) {
// If there is work in the queue, then execute it
// This covers the case that there are things left in the queue from a
// previous spin.
if (get_next_ready_executable(any_exec)) {
execute_any_executable(any_exec);
}
} else if (!has_waited && !work_in_queue) {
// Once the ready queue is empty, then we need to wait at least once.
wait_for_work(std::chrono::milliseconds(0));
has_waited = true;
} else if (has_waited && !work_in_queue) {
// Once we have emptied the ready queue, but have already waited:
if (!exhaustive) {
// In the case of spin some, then we can exit
break;
} else {
// In the case of spin all, then we will allow ourselves to wait again.
has_waited = false;
}
if (!work_available) {
wait_for_work(std::chrono::milliseconds::zero());
}
if (get_next_ready_executable(any_exec)) {
execute_any_executable(any_exec);
work_available = true;
} else {
if (!work_available || !exhaustive) {
break;
}
work_available = false;
}
std::lock_guard<std::mutex> lock(mutex_);
work_in_queue = ready_executables_.size();
}
}
@@ -384,21 +508,30 @@ Executor::cancel()
}
}
void
Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
{
if (memory_strategy == nullptr) {
throw std::runtime_error("Received NULL memory strategy in executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
memory_strategy_ = memory_strategy;
}
void
Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (!spinning.load()) {
return;
}
if (any_exec.timer) {
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_timer(any_exec.timer);
}
if (any_exec.subscription) {
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
execute_subscription(any_exec.subscription);
@@ -412,10 +545,16 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
if (any_exec.waitable) {
any_exec.waitable->execute(any_exec.data);
}
// Reset the callback_group, regardless of type
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
any_exec.callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
}
}
@@ -464,7 +603,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
rclcpp::MessageInfo message_info;
message_info.get_rmw_message_info().from_intra_process = false;
switch (subscription->get_subscription_type()) {
switch (subscription->get_delivered_message_kind()) {
// Deliver ROS message
case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
{
@@ -584,117 +723,228 @@ Executor::execute_client(
[&]() {client->handle_response(request_header, response);});
}
void
Executor::collect_entities()
{
// Get the current list of available waitables from the collector.
rclcpp::executors::ExecutorEntitiesCollection collection;
this->collector_.update_collections();
auto callback_groups = this->collector_.get_all_callback_groups();
rclcpp::executors::build_entities_collection(callback_groups, collection);
// Make a copy of notify waitable so we can continue to mutate the original
// one outside of the execute loop.
// This prevents the collection of guard conditions in the waitable from changing
// while we are waiting on it.
if (notify_waitable_) {
current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
*notify_waitable_);
auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
}
std::cout << "current_collection.timers: " << current_collection_.timers.size() << std::endl;
std::cout << "next_collection.timers: " << collection.timers.size() << std::endl;
// Update each of the groups of entities in the current collection, adding or removing
// from the wait set as necessary.
current_collection_.timers.update(
collection.timers,
[this](auto timer) {
std::cout << "add_timer(" << timer << ")" << std::endl;
wait_set_.add_timer(timer);},
[this](auto timer) {
std::cout << "remove_timer(" << timer << ")" << std::endl;
wait_set_.remove_timer(timer);});
std::cout << "current_collection.subscriptions: " << current_collection_.subscriptions.size() << std::endl;
std::cout << "next_collection.subscriptions: " << collection.subscriptions.size() << std::endl;
current_collection_.subscriptions.update(
collection.subscriptions,
[this](auto subscription) {
std::cout << "add_subscription(" << subscription << ")" << std::endl;
wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
},
[this](auto subscription) {
std::cout << "remove_subscription(" << subscription << ")" << std::endl;
wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
});
current_collection_.clients.update(
collection.clients,
[this](auto client) {wait_set_.add_client(client);},
[this](auto client) {wait_set_.remove_client(client);});
current_collection_.services.update(
collection.services,
[this](auto service) {wait_set_.add_service(service);},
[this](auto service) {wait_set_.remove_service(service);});
current_collection_.guard_conditions.update(
collection.guard_conditions,
[this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
[this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
current_collection_.waitables.update(
collection.waitables,
[this](auto waitable) {
wait_set_.add_waitable(waitable);
},
[this](auto waitable) {wait_set_.remove_waitable(waitable);});
this->entities_need_rebuild_.store(false);
}
void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
{
std::lock_guard<std::mutex> guard(mutex_);
std::lock_guard<std::mutex> guard(mutex_);
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
this->collect_entities();
// Check weak_nodes_ to find any callback group that is not owned
// by an executor and add it to the list of callbackgroups for
// collect entities. Also exchange to false so it is not
// allowed to add to another executor
add_callback_groups_from_nodes_associated_to_executor();
// Collect the subscriptions and timers to be waited on
memory_strategy_->clear_handles();
bool has_invalid_weak_groups_or_nodes =
memory_strategy_->collect_entities(weak_groups_to_nodes_);
if (has_invalid_weak_groups_or_nodes) {
std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
for (auto pair : weak_groups_to_nodes_) {
auto weak_group_ptr = pair.first;
auto weak_node_ptr = pair.second;
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
invalid_group_ptrs.push_back(weak_group_ptr);
auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr);
if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) {
auto guard_condition = node_guard_pair->second;
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
memory_strategy_->remove_guard_condition(guard_condition);
}
}
}
std::for_each(
invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
[this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) !=
weak_groups_to_nodes_associated_with_executor_.end())
{
weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
}
if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) !=
weak_groups_associated_with_executor_to_nodes_.end())
{
weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
}
auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr);
if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) {
auto guard_condition = callback_guard_pair->second;
weak_groups_to_guard_conditions_.erase(group_ptr);
memory_strategy_->remove_guard_condition(guard_condition);
}
weak_groups_to_nodes_.erase(group_ptr);
});
}
// clear wait set
rcl_ret_t ret = rcl_wait_set_clear(&wait_set_);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "Couldn't clear wait set");
}
// The size of waitables are accounted for in size of the other entities
ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "Couldn't resize the wait set");
}
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
throw std::runtime_error("Couldn't fill wait set");
}
}
auto wait_result = wait_set_.wait(timeout);
if (wait_result.kind() == WaitResultKind::Empty) {
rcl_ret_t status =
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
"empty wait set received in rcl_wait(). This should never happen.");
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed");
}
rclcpp::executors::ready_executables(current_collection_, wait_result, ready_executables_);
// check the null handles in the wait set and remove them from the handles in memory strategy
// for callback-based entities
std::lock_guard<std::mutex> guard(mutex_);
memory_strategy_->remove_null_handles(&wait_set_);
}
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
Executor::get_node_by_group(
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group)
{
if (!group) {
return nullptr;
}
rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group);
const auto finder = weak_groups_to_nodes.find(weak_group_ptr);
if (finder != weak_groups_to_nodes.end()) {
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock();
return node_ptr;
}
return nullptr;
}
rclcpp::CallbackGroup::SharedPtr
Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
{
std::lock_guard<std::mutex> guard{mutex_};
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
auto group = pair.first.lock();
if (!group) {
continue;
}
auto timer_ref = group->find_timer_ptrs_if(
[timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
return timer_ptr == timer;
});
if (timer_ref) {
return group;
}
}
for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
auto group = pair.first.lock();
if (!group) {
continue;
}
auto timer_ref = group->find_timer_ptrs_if(
[timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
return timer_ptr == timer;
});
if (timer_ref) {
return group;
}
}
return nullptr;
}
bool
Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
TRACEPOINT(rclcpp_executor_get_next_ready);
bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_);
return success;
}
std::lock_guard<std::mutex> guard(mutex_);
if (ready_executables_.size() == 0) {
return false;
bool
Executor::get_next_ready_executable_from_map(
AnyExecutable & any_executable,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes)
{
TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);
bool success = false;
std::lock_guard<std::mutex> guard{mutex_};
// Check the timers to see if there are any that are ready
memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
if (any_executable.timer) {
success = true;
}
if (!success) {
// Check the subscriptions to see if there are any that are ready
memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes);
if (any_executable.subscription) {
success = true;
}
}
if (!success) {
// Check the services to see if there are any that are ready
memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes);
if (any_executable.service) {
success = true;
}
}
if (!success) {
// Check the clients to see if there are any that are ready
memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes);
if (any_executable.client) {
success = true;
}
}
if (!success) {
// Check the waitables to see if there are any that are ready
memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
if (any_executable.waitable) {
any_executable.data = any_executable.waitable->take_data();
success = true;
}
}
// At this point any_executable should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr
if (success) {
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group;
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
if (iter == weak_groups_to_nodes.end()) {
success = false;
}
}
any_executable = ready_executables_.front();
ready_executables_.pop_front();
if (any_executable.callback_group &&
any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
assert(any_executable.callback_group->can_be_taken_from().load());
any_executable.callback_group->can_be_taken_from().store(false);
if (success) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly ..Check if the callback_group belongs to this executor
if (any_executable.callback_group && any_executable.callback_group->type() == \
CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
assert(any_executable.callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group
// This is reset to true either when the any_executable is executed or when the
// any_executable is destructued
any_executable.callback_group->can_be_taken_from().store(false);
}
}
return true;
// If there is no ready executable, return false
return success;
}
bool
@@ -717,6 +967,22 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos
return success;
}
// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry.
bool
Executor::has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes) const
{
return std::find_if(
weak_groups_to_nodes.begin(),
weak_groups_to_nodes.end(),
[&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool {
auto other_ptr = other.second.lock();
return other_ptr == node_ptr;
}) != weak_groups_to_nodes.end();
}
bool
Executor::is_spinning()
{

View File

@@ -222,7 +222,9 @@ ready_executables(
executables.push_back(exec);
added++;
}
return added;
}
} // namespace executors
} // namespace rclcpp

View File

@@ -105,8 +105,7 @@ void
ExecutorEntitiesCollector::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
if (!has_executor.exchange(false)) {
if (!node_ptr->get_associated_with_executor_atomic().load()) {
throw std::runtime_error(
std::string("Node '") + node_ptr->get_fully_qualified_name() +
"' needs to be associated with an executor.");
@@ -162,6 +161,7 @@ ExecutorEntitiesCollector::remove_callback_group(rclcpp::CallbackGroup::SharedPt
throw std::runtime_error("Node must not be deleted before its callback group(s).");
}
*/
auto weak_group_ptr = rclcpp::CallbackGroup::WeakPtr(group_ptr);
std::lock_guard<std::mutex> lock(mutex_);
bool associated = manually_added_groups_.count(group_ptr) != 0;
@@ -314,11 +314,7 @@ ExecutorEntitiesCollector::process_queues()
if (node_it != weak_nodes_.end()) {
remove_weak_node(node_it);
} else {
// The node may have been destroyed and removed from the colletion before
// we processed the queues. Don't throw if the pointer is already expired.
if (!weak_node_ptr.expired()) {
throw std::runtime_error("Node needs to be associated with this executor.");
}
throw std::runtime_error("Node needs to be associated with this executor.");
}
auto node_ptr = weak_node_ptr.lock();

View File

@@ -46,18 +46,20 @@ void
ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (!guard_condition) {continue;}
if (guard_condition) {
auto rcl_guard_condition = &guard_condition->get_rcl_guard_condition();
rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
rcl_ret_t ret = rcl_wait_set_add_guard_condition(
wait_set,
rcl_guard_condition, NULL);
rcl_ret_t ret = rcl_wait_set_add_guard_condition(
wait_set, cond, NULL);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}
}
}
}

View File

@@ -99,19 +99,6 @@ MultiThreadedExecutor::run(size_t this_thread_number)
execute_any_executable(any_exec);
if (any_exec.callback_group &&
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive &&
any_exec.callback_group->size() > 1)
{
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group change: ") + ex.what());
}
}
// Clear the callback_group to prevent the AnyExecutable destructor from
// resetting the callback group `can_be_taken_from`
any_exec.callback_group.reset();

View File

@@ -0,0 +1,524 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/executors/static_executor_entities_collector.hpp"
#include <algorithm>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
using rclcpp::executors::StaticExecutorEntitiesCollector;
StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector()
{
// Disassociate all callback groups and thus nodes.
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
auto group = pair.first.lock();
if (group) {
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
has_executor.store(false);
}
}
for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
auto group = pair.first.lock();
if (group) {
std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
has_executor.store(false);
}
}
// Disassociate all nodes
for (const auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
std::atomic_bool & has_executor = node->get_associated_with_executor_atomic();
has_executor.store(false);
}
}
weak_groups_associated_with_executor_to_nodes_.clear();
weak_groups_to_nodes_associated_with_executor_.clear();
exec_list_.clear();
weak_nodes_.clear();
weak_nodes_to_guard_conditions_.clear();
}
void
StaticExecutorEntitiesCollector::init(
rcl_wait_set_t * p_wait_set,
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
{
// Empty initialize executable list
exec_list_ = rclcpp::experimental::ExecutableList();
// Get executor's wait_set_ pointer
p_wait_set_ = p_wait_set;
// Get executor's memory strategy ptr
if (memory_strategy == nullptr) {
throw std::runtime_error("Received NULL memory strategy in executor waitable.");
}
memory_strategy_ = memory_strategy;
// Get memory strategy and executable list. Prepare wait_set_
std::shared_ptr<void> shared_ptr;
execute(shared_ptr);
// The entities collector is now initialized
initialized_ = true;
}
void
StaticExecutorEntitiesCollector::fini()
{
memory_strategy_->clear_handles();
exec_list_.clear();
}
std::shared_ptr<void>
StaticExecutorEntitiesCollector::take_data()
{
return nullptr;
}
void
StaticExecutorEntitiesCollector::execute(std::shared_ptr<void> & data)
{
(void) data;
// Fill memory strategy with entities coming from weak_nodes_
fill_memory_strategy();
// Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy)
fill_executable_list();
// Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize)
prepare_wait_set();
// Add new nodes guard conditions to map
std::lock_guard<std::mutex> guard{new_nodes_mutex_};
for (const auto & weak_node : new_nodes_) {
if (auto node_ptr = weak_node.lock()) {
weak_nodes_to_guard_conditions_[node_ptr] =
node_ptr->get_shared_notify_guard_condition().get();
}
}
new_nodes_.clear();
}
void
StaticExecutorEntitiesCollector::fill_memory_strategy()
{
memory_strategy_->clear_handles();
bool has_invalid_weak_groups_or_nodes =
memory_strategy_->collect_entities(weak_groups_to_nodes_associated_with_executor_);
// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_groups_or_nodes) {
std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
auto & weak_group_ptr = pair.first;
auto & weak_node_ptr = pair.second;
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
invalid_group_ptrs.push_back(weak_group_ptr);
}
}
std::for_each(
invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
[this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
});
}
has_invalid_weak_groups_or_nodes =
memory_strategy_->collect_entities(weak_groups_associated_with_executor_to_nodes_);
// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_groups_or_nodes) {
std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
auto & weak_group_ptr = pair.first;
const auto & weak_node_ptr = pair.second;
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
invalid_group_ptrs.push_back(weak_group_ptr);
}
}
std::for_each(
invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
[this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
});
}
// Add the static executor waitable to the memory strategy
memory_strategy_->add_waitable_handle(this->shared_from_this());
}
void
StaticExecutorEntitiesCollector::fill_executable_list()
{
exec_list_.clear();
add_callback_groups_from_nodes_associated_to_executor();
fill_executable_list_from_map(weak_groups_associated_with_executor_to_nodes_);
fill_executable_list_from_map(weak_groups_to_nodes_associated_with_executor_);
// Add the executor's waitable to the executable list
exec_list_.add_waitable(shared_from_this());
}
void
StaticExecutorEntitiesCollector::fill_executable_list_from_map(
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes)
{
for (const auto & pair : weak_groups_to_nodes) {
auto group = pair.first.lock();
auto node = pair.second.lock();
if (!node || !group || !group->can_be_taken_from().load()) {
continue;
}
group->find_timer_ptrs_if(
[this](const rclcpp::TimerBase::SharedPtr & timer) {
if (timer) {
exec_list_.add_timer(timer);
}
return false;
});
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
if (subscription) {
exec_list_.add_subscription(subscription);
}
return false;
});
group->find_service_ptrs_if(
[this](const rclcpp::ServiceBase::SharedPtr & service) {
if (service) {
exec_list_.add_service(service);
}
return false;
});
group->find_client_ptrs_if(
[this](const rclcpp::ClientBase::SharedPtr & client) {
if (client) {
exec_list_.add_client(client);
}
return false;
});
group->find_waitable_ptrs_if(
[this](const rclcpp::Waitable::SharedPtr & waitable) {
if (waitable) {
exec_list_.add_waitable(waitable);
}
return false;
});
}
}
void
StaticExecutorEntitiesCollector::prepare_wait_set()
{
// clear wait set
if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear wait set");
}
// The size of waitables are accounted for in size of the other entities
rcl_ret_t ret = rcl_wait_set_resize(
p_wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) {
throw std::runtime_error(
std::string("Couldn't resize the wait set: ") + rcl_get_error_string().str);
}
}
void
StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout)
{
// clear wait set (memset to '0' all wait_set_ entities
// but keeps the wait_set_ number of entities)
if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear wait set");
}
if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) {
throw std::runtime_error("Couldn't fill wait set");
}
rcl_ret_t status =
rcl_wait(p_wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in rcl_wait(). This should never happen.");
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
using rclcpp::exceptions::throw_from_rcl_error;
throw_from_rcl_error(status, "rcl_wait() failed");
}
}
void
StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set)
{
// Add waitable guard conditions (one for each registered node) into the wait set.
for (const auto & pair : weak_nodes_to_guard_conditions_) {
auto & gc = pair.second;
detail::add_guard_condition_to_rcl_wait_set(*wait_set, *gc);
}
}
size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions()
{
std::lock_guard<std::mutex> guard{new_nodes_mutex_};
return weak_nodes_to_guard_conditions_.size() + new_nodes_.size();
}
bool
StaticExecutorEntitiesCollector::add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
bool is_new_node = false;
// If the node already has an executor
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
node_ptr->for_each_callback_group(
[this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
if (
!group_ptr->get_associated_with_executor_atomic().load() &&
group_ptr->automatically_add_to_executor_with_node())
{
is_new_node = (add_callback_group(
group_ptr,
node_ptr,
weak_groups_to_nodes_associated_with_executor_) ||
is_new_node);
}
});
weak_nodes_.push_back(node_ptr);
return is_new_node;
}
bool
StaticExecutorEntitiesCollector::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
{
// If the callback_group already has an executor
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
if (has_executor.exchange(true)) {
throw std::runtime_error("Callback group has already been added to an executor.");
}
bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_);
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
auto insert_info = weak_groups_to_nodes.insert(
std::make_pair(weak_group_ptr, node_ptr));
bool was_inserted = insert_info.second;
if (!was_inserted) {
throw std::runtime_error("Callback group was already added to executor.");
}
if (is_new_node) {
std::lock_guard<std::mutex> guard{new_nodes_mutex_};
new_nodes_.push_back(node_ptr);
return true;
}
return false;
}
bool
StaticExecutorEntitiesCollector::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_);
}
bool
StaticExecutorEntitiesCollector::remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr)
{
return this->remove_callback_group_from_map(
group_ptr,
weak_groups_associated_with_executor_to_nodes_);
}
bool
StaticExecutorEntitiesCollector::remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
{
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
auto iter = weak_groups_to_nodes.find(weak_group_ptr);
if (iter != weak_groups_to_nodes.end()) {
node_ptr = iter->second.lock();
if (node_ptr == nullptr) {
throw std::runtime_error("Node must not be deleted before its callback group(s).");
}
weak_groups_to_nodes.erase(iter);
} else {
throw std::runtime_error("Callback group needs to be associated with executor.");
}
// If the node was matched and removed, interrupt waiting.
if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_))
{
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr);
weak_nodes_to_guard_conditions_.erase(node_weak_ptr);
return true;
}
return false;
}
bool
StaticExecutorEntitiesCollector::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
if (!node_ptr->get_associated_with_executor_atomic().load()) {
return false;
}
bool node_found = false;
auto node_it = weak_nodes_.begin();
while (node_it != weak_nodes_.end()) {
bool matched = (node_it->lock() == node_ptr);
if (matched) {
weak_nodes_.erase(node_it);
node_found = true;
break;
}
++node_it;
}
if (!node_found) {
return false;
}
std::vector<rclcpp::CallbackGroup::SharedPtr> found_group_ptrs;
std::for_each(
weak_groups_to_nodes_associated_with_executor_.begin(),
weak_groups_to_nodes_associated_with_executor_.end(),
[&found_group_ptrs, node_ptr](std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> key_value_pair) {
auto & weak_node_ptr = key_value_pair.second;
auto shared_node_ptr = weak_node_ptr.lock();
auto group_ptr = key_value_pair.first.lock();
if (shared_node_ptr == node_ptr) {
found_group_ptrs.push_back(group_ptr);
}
});
std::for_each(
found_group_ptrs.begin(), found_group_ptrs.end(), [this]
(rclcpp::CallbackGroup::SharedPtr group_ptr) {
this->remove_callback_group_from_map(
group_ptr,
weak_groups_to_nodes_associated_with_executor_);
});
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
return true;
}
bool
StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set)
{
// Check wait_set guard_conditions for added/removed entities to/from a node
for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) {
if (p_wait_set->guard_conditions[i] != NULL) {
auto found_guard_condition = std::find_if(
weak_nodes_to_guard_conditions_.begin(), weak_nodes_to_guard_conditions_.end(),
[&](std::pair<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const GuardCondition *> pair) -> bool {
const rcl_guard_condition_t & rcl_gc = pair.second->get_rcl_guard_condition();
return &rcl_gc == p_wait_set->guard_conditions[i];
});
if (found_guard_condition != weak_nodes_to_guard_conditions_.end()) {
return true;
}
}
}
// None of the guard conditions triggered belong to a registered node
return false;
}
// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry.
bool
StaticExecutorEntitiesCollector::has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes) const
{
return std::find_if(
weak_groups_to_nodes.begin(),
weak_groups_to_nodes.end(),
[&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool {
auto other_ptr = other.second.lock();
return other_ptr == node_ptr;
}) != weak_groups_to_nodes.end();
}
void
StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor()
{
for (const auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (node) {
node->for_each_callback_group(
[this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
{
if (shared_group_ptr->automatically_add_to_executor_with_node() &&
!shared_group_ptr->get_associated_with_executor_atomic().load())
{
add_callback_group(
shared_group_ptr,
node,
weak_groups_to_nodes_associated_with_executor_);
}
});
}
}
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
StaticExecutorEntitiesCollector::get_all_callback_groups()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
groups.push_back(group_node_ptr.first);
}
return groups;
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
StaticExecutorEntitiesCollector::get_manually_added_callback_groups()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
return groups;
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
StaticExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
groups.push_back(group_node_ptr.first);
}
return groups;
}

View File

@@ -12,21 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include <chrono>
#include <memory>
#include <utility>
#include <vector>
#include "rcpputils/scope_exit.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/any_executable.hpp"
using rclcpp::executors::StaticSingleThreadedExecutor;
using rclcpp::experimental::ExecutableList;
StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(const rclcpp::ExecutorOptions & options)
StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(
const rclcpp::ExecutorOptions & options)
: rclcpp::Executor(options)
{
entities_collector_ = std::make_shared<StaticExecutorEntitiesCollector>();
}
StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {}
StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor()
{
if (entities_collector_->is_init()) {
entities_collector_->fini();
}
}
void
StaticSingleThreadedExecutor::spin()
@@ -36,25 +46,14 @@ StaticSingleThreadedExecutor::spin()
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
// This is essentially the contents of the rclcpp::Executor::wait_for_work method,
// except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor
// behavior.
// Set memory_strategy_ and exec_list_ based on weak_nodes_
// Prepare wait_set_ based on memory_strategy_
entities_collector_->init(&wait_set_, memory_strategy_);
while (rclcpp::ok(this->context_) && spinning.load()) {
std::deque<rclcpp::AnyExecutable> to_exec;
std::lock_guard<std::mutex> guard(mutex_);
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
this->collect_entities();
}
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(-1));
if (wait_result.kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
continue;
}
execute_ready_executables(current_collection_, wait_result, false);
// Refresh wait set and wait for work
entities_collector_->refresh_wait_set();
execute_ready_executables();
}
}
@@ -81,6 +80,11 @@ StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration)
void
StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
{
// Make sure the entities collector has been initialized
if (!entities_collector_->is_init()) {
entities_collector_->init(&wait_set_, memory_strategy_);
}
auto start = std::chrono::steady_clock::now();
auto max_duration_not_elapsed = [max_duration, start]() {
if (std::chrono::nanoseconds(0) == max_duration) {
@@ -101,21 +105,9 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
// Get executables that are ready now
std::lock_guard<std::mutex> guard(mutex_);
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
this->collect_entities();
}
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(0));
if (wait_result.kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
continue;
}
entities_collector_->refresh_wait_set(std::chrono::milliseconds::zero());
// Execute ready executables
bool work_available = execute_ready_executables(current_collection_, wait_result, false);
bool work_available = execute_ready_executables();
if (!work_available || !exhaustive) {
break;
}
@@ -125,122 +117,164 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati
void
StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
{
// Make sure the entities collector has been initialized
if (!entities_collector_->is_init()) {
entities_collector_->init(&wait_set_, memory_strategy_);
}
if (rclcpp::ok(context_) && spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
this->collect_entities();
}
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout));
if (wait_result.kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
return;
}
// Wait until we have a ready entity or timeout expired
entities_collector_->refresh_wait_set(timeout);
// Execute ready executables
execute_ready_executables(current_collection_, wait_result, true);
execute_ready_executables(true);
}
}
// This preserves the "scheduling semantics" of the StaticSingleThreadedExecutor
// from the original implementation.
bool StaticSingleThreadedExecutor::execute_ready_executables(
const rclcpp::executors::ExecutorEntitiesCollection & collection,
rclcpp::WaitResult<rclcpp::WaitSet> & wait_result,
bool spin_once)
void
StaticSingleThreadedExecutor::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify)
{
bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr);
if (is_new_node && notify) {
// Interrupt waiting to handle new node
interrupt_guard_condition_->trigger();
}
}
void
StaticSingleThreadedExecutor::add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
bool is_new_node = entities_collector_->add_node(node_ptr);
if (is_new_node && notify) {
// Interrupt waiting to handle new node
interrupt_guard_condition_->trigger();
}
}
void
StaticSingleThreadedExecutor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
{
this->add_node(node_ptr->get_node_base_interface(), notify);
}
void
StaticSingleThreadedExecutor::remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify)
{
bool node_removed = entities_collector_->remove_callback_group(group_ptr);
// If the node was matched and removed, interrupt waiting
if (node_removed && notify) {
interrupt_guard_condition_->trigger();
}
}
void
StaticSingleThreadedExecutor::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
bool node_removed = entities_collector_->remove_node(node_ptr);
if (!node_removed) {
throw std::runtime_error("Node needs to be associated with this executor.");
}
// If the node was matched and removed, interrupt waiting
if (notify) {
interrupt_guard_condition_->trigger();
}
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
StaticSingleThreadedExecutor::get_all_callback_groups()
{
return entities_collector_->get_all_callback_groups();
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
StaticSingleThreadedExecutor::get_manually_added_callback_groups()
{
return entities_collector_->get_manually_added_callback_groups();
}
std::vector<rclcpp::CallbackGroup::WeakPtr>
StaticSingleThreadedExecutor::get_automatically_added_callback_groups_from_nodes()
{
return entities_collector_->get_automatically_added_callback_groups_from_nodes();
}
void
StaticSingleThreadedExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
{
this->remove_node(node_ptr->get_node_base_interface(), notify);
}
bool
StaticSingleThreadedExecutor::execute_ready_executables(bool spin_once)
{
bool any_ready_executable = false;
if (wait_result.kind() != rclcpp::WaitResultKind::Ready) {
return any_ready_executable;
// Execute all the ready subscriptions
for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) {
if (i < entities_collector_->get_number_of_subscriptions()) {
if (wait_set_.subscriptions[i]) {
execute_subscription(entities_collector_->get_subscription(i));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
}
auto rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set();
for (size_t ii = 0; ii < rcl_wait_set.size_of_timers; ++ii) {
if (nullptr == rcl_wait_set.timers[ii]) {continue;}
auto entity_iter = collection.timers.find(rcl_wait_set.timers[ii]);
if (entity_iter != collection.timers.end()) {
auto entity = entity_iter->second.entity.lock();
if (!entity) {
continue;
// Execute all the ready timers
for (size_t i = 0; i < wait_set_.size_of_timers; ++i) {
if (i < entities_collector_->get_number_of_timers()) {
if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) {
auto timer = entities_collector_->get_timer(i);
timer->call();
execute_timer(std::move(timer));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
if (!entity->call()) {
continue;
}
}
// Execute all the ready services
for (size_t i = 0; i < wait_set_.size_of_services; ++i) {
if (i < entities_collector_->get_number_of_services()) {
if (wait_set_.services[i]) {
execute_service(entities_collector_->get_service(i));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
execute_timer(entity);
}
}
// Execute all the ready clients
for (size_t i = 0; i < wait_set_.size_of_clients; ++i) {
if (i < entities_collector_->get_number_of_clients()) {
if (wait_set_.clients[i]) {
execute_client(entities_collector_->get_client(i));
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
}
// Execute all the ready waitables
for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) {
auto waitable = entities_collector_->get_waitable(i);
if (waitable->is_ready(&wait_set_)) {
auto data = waitable->take_data();
waitable->execute(data);
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
for (size_t ii = 0; ii < rcl_wait_set.size_of_subscriptions; ++ii) {
if (nullptr == rcl_wait_set.subscriptions[ii]) {continue;}
auto entity_iter = collection.subscriptions.find(rcl_wait_set.subscriptions[ii]);
if (entity_iter != collection.subscriptions.end()) {
auto entity = entity_iter->second.entity.lock();
if (!entity) {
continue;
}
execute_subscription(entity);
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
for (size_t ii = 0; ii < rcl_wait_set.size_of_services; ++ii) {
if (nullptr == rcl_wait_set.services[ii]) {continue;}
auto entity_iter = collection.services.find(rcl_wait_set.services[ii]);
if (entity_iter != collection.services.end()) {
auto entity = entity_iter->second.entity.lock();
if (!entity) {
continue;
}
execute_service(entity);
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
for (size_t ii = 0; ii < rcl_wait_set.size_of_clients; ++ii) {
if (nullptr == rcl_wait_set.clients[ii]) {continue;}
auto entity_iter = collection.clients.find(rcl_wait_set.clients[ii]);
if (entity_iter != collection.clients.end()) {
auto entity = entity_iter->second.entity.lock();
if (!entity) {
continue;
}
execute_client(entity);
if (spin_once) {
return true;
}
any_ready_executable = true;
}
}
for (auto & [handle, entry] : collection.waitables) {
auto waitable = entry.entity.lock();
if (!waitable) {
continue;
}
if (!waitable->is_ready(&rcl_wait_set)) {
continue;
}
auto data = waitable->take_data();
waitable->execute(data);
if (spin_once) {
return true;
}
any_ready_executable = true;
}
return any_ready_executable;
}

View File

@@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
timers_manager_ =
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
// This callback is invoked when:
@@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
this->refresh_current_collection_from_callback_groups();
});
// Make sure that the notify waitable is immediately added to the collection
// to avoid missing events
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
@@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(
this->entities_collector_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);
this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
}
EventsExecutor::~EventsExecutor()
@@ -269,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
switch (event.type) {
case ExecutorEventType::CLIENT_EVENT:
{
auto client = this->retrieve_entity(
static_cast<const rcl_client_t *>(event.entity_key),
current_entities_collection_->clients);
rclcpp::ClientBase::SharedPtr client;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
client = this->retrieve_entity(
static_cast<const rcl_client_t *>(event.entity_key),
current_entities_collection_->clients);
}
if (client) {
for (size_t i = 0; i < event.num_events; i++) {
execute_client(client);
@@ -283,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
}
case ExecutorEventType::SUBSCRIPTION_EVENT:
{
auto subscription = this->retrieve_entity(
static_cast<const rcl_subscription_t *>(event.entity_key),
current_entities_collection_->subscriptions);
rclcpp::SubscriptionBase::SharedPtr subscription;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
subscription = this->retrieve_entity(
static_cast<const rcl_subscription_t *>(event.entity_key),
current_entities_collection_->subscriptions);
}
if (subscription) {
for (size_t i = 0; i < event.num_events; i++) {
execute_subscription(subscription);
@@ -295,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
}
case ExecutorEventType::SERVICE_EVENT:
{
auto service = this->retrieve_entity(
static_cast<const rcl_service_t *>(event.entity_key),
current_entities_collection_->services);
rclcpp::ServiceBase::SharedPtr service;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
service = this->retrieve_entity(
static_cast<const rcl_service_t *>(event.entity_key),
current_entities_collection_->services);
}
if (service) {
for (size_t i = 0; i < event.num_events; i++) {
execute_service(service);
@@ -315,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
}
case ExecutorEventType::WAITABLE_EVENT:
{
auto waitable = this->retrieve_entity(
static_cast<const rclcpp::Waitable *>(event.entity_key),
current_entities_collection_->waitables);
rclcpp::Waitable::SharedPtr waitable;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
waitable = this->retrieve_entity(
static_cast<const rclcpp::Waitable *>(event.entity_key),
current_entities_collection_->waitables);
}
if (waitable) {
for (size_t i = 0; i < event.num_events; i++) {
auto data = waitable->take_data_by_entity_id(event.waitable_data);
@@ -382,6 +400,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
void
EventsExecutor::refresh_current_collection_from_callback_groups()
{
// Build the new collection
this->entities_collector_->update_collections();
auto callback_groups = this->entities_collector_->get_all_callback_groups();
rclcpp::executors::ExecutorEntitiesCollection new_collection;
@@ -395,18 +414,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
// retrieved in the "standard" way.
// To do it, we need to add the notify waitable as an entry in both the new and
// current collections such that it's neither added or removed.
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
new_collection.waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
this->add_notify_waitable_to_collection(new_collection.waitables);
this->current_entities_collection_->waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
// Acquire lock before modifying the current collection
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
this->refresh_current_collection(new_collection);
}
@@ -415,6 +427,9 @@ void
EventsExecutor::refresh_current_collection(
const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
{
// Acquire lock before modifying the current collection
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
current_entities_collection_->timers.update(
new_collection.timers,
[this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
@@ -486,3 +501,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
};
return callback;
}
void
EventsExecutor::add_notify_waitable_to_collection(
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection)
{
// The notify waitable is not associated to any group, so use an invalid one
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
collection.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
}

View File

@@ -225,5 +225,52 @@ IntraProcessManager::can_communicate(
return true;
}
size_t
IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const
{
size_t capacity = std::numeric_limits<size_t>::max();
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling lowest_available_capacity for invalid or no longer existing publisher id");
return 0u;
}
if (publisher_it->second.take_shared_subscriptions.empty() &&
publisher_it->second.take_ownership_subscriptions.empty())
{
// no subscriptions available
return 0u;
}
auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id)
{
auto subscription_it = subscriptions_.find(intra_process_subscription_id);
if (subscription_it != subscriptions_.end()) {
auto subscription = subscription_it->second.lock();
if (subscription) {
capacity = std::min(capacity, subscription->available_capacity());
}
} else {
// Subscription is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling available_capacity for invalid or no longer existing subscription id");
}
};
for (const auto sub_id : publisher_it->second.take_shared_subscriptions) {
available_capacity(sub_id);
}
for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
available_capacity(sub_id);
}
return capacity;
}
} // namespace experimental
} // namespace rclcpp

View File

@@ -14,6 +14,7 @@
#include <memory>
#include <mutex>
#include <stdexcept>
#include "rcutils/macros.h"

View File

@@ -36,6 +36,7 @@
#include "rclcpp/node_interfaces/node_time_source.hpp"
#include "rclcpp/node_interfaces/node_timers.hpp"
#include "rclcpp/node_interfaces/node_topics.hpp"
#include "rclcpp/node_interfaces/node_type_descriptions.hpp"
#include "rclcpp/node_interfaces/node_waitables.hpp"
#include "rclcpp/qos_overriding_options.hpp"
@@ -167,7 +168,7 @@ Node::Node(
options.use_intra_process_comms(),
options.enable_topic_statistics())),
node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())),
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())),
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_)),
node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())),
node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())),
node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())),
@@ -206,6 +207,12 @@ Node::Node(
options.clock_qos(),
options.use_clock_thread()
)),
node_type_descriptions_(new rclcpp::node_interfaces::NodeTypeDescriptions(
node_base_,
node_logging_,
node_parameters_,
node_services_
)),
node_waitables_(new rclcpp::node_interfaces::NodeWaitables(node_base_.get())),
node_options_(options),
sub_namespace_(""),
@@ -225,6 +232,10 @@ Node::Node(
node_topics_->resolve_topic_name("/parameter_events"),
options.parameter_event_qos(),
rclcpp::detail::PublisherQosParametersTraits{});
if (options.enable_logger_service()) {
node_logging_->create_logger_services(node_services_);
}
}
Node::Node(
@@ -587,6 +598,12 @@ Node::get_node_topics_interface()
return node_topics_;
}
rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr
Node::get_node_type_descriptions_interface()
{
return node_type_descriptions_;
}
rclcpp::node_interfaces::NodeServicesInterface::SharedPtr
Node::get_node_services_interface()
{

View File

@@ -20,6 +20,7 @@
#include "rclcpp/node_interfaces/node_base.hpp"
#include "rcl/arguments.h"
#include "rcl/node_type_cache.h"
#include "rclcpp/exceptions.hpp"
#include "rcutils/logging_macros.h"
#include "rmw/validate_namespace.h"

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/node_impl.hpp"
#include "rclcpp/node_interfaces/node_logging.hpp"
#include "rclcpp/node_interfaces/node_services_interface.hpp"
using rclcpp::node_interfaces::NodeLogging;
NodeLogging::NodeLogging(rclcpp::node_interfaces::NodeBaseInterface * node_base)
NodeLogging::NodeLogging(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base)
: node_base_(node_base)
{
logger_ = rclcpp::get_logger(NodeLogging::get_logger_name());
@@ -37,3 +39,55 @@ NodeLogging::get_logger_name() const
{
return rcl_node_get_logger_name(node_base_->get_rcl_node_handle());
}
void NodeLogging::create_logger_services(
node_interfaces::NodeServicesInterface::SharedPtr node_services)
{
rclcpp::ServicesQoS qos_profile;
const std::string node_name = node_base_->get_name();
auto callback_group = node_base_->get_default_callback_group();
get_loggers_service_ = rclcpp::create_service<rcl_interfaces::srv::GetLoggerLevels>(
node_base_, node_services,
node_name + "/get_logger_levels",
[](
const std::shared_ptr<rmw_request_id_t>,
const std::shared_ptr<rcl_interfaces::srv::GetLoggerLevels::Request> request,
std::shared_ptr<rcl_interfaces::srv::GetLoggerLevels::Response> response)
{
for (auto & name : request->names) {
rcl_interfaces::msg::LoggerLevel logger_level;
logger_level.name = name;
auto ret = rcutils_logging_get_logger_level(name.c_str());
if (ret < 0) {
logger_level.level = 0;
} else {
logger_level.level = static_cast<uint8_t>(ret);
}
response->levels.push_back(std::move(logger_level));
}
},
qos_profile, callback_group);
set_loggers_service_ = rclcpp::create_service<rcl_interfaces::srv::SetLoggerLevels>(
node_base_, node_services,
node_name + "/set_logger_levels",
[](
const std::shared_ptr<rmw_request_id_t>,
const std::shared_ptr<rcl_interfaces::srv::SetLoggerLevels::Request> request,
std::shared_ptr<rcl_interfaces::srv::SetLoggerLevels::Response> response)
{
rcl_interfaces::msg::SetLoggerLevelsResult result;
for (auto & level : request->levels) {
auto ret = rcutils_logging_set_logger_level(level.name.c_str(), level.level);
if (ret != RCUTILS_RET_OK) {
result.successful = false;
result.reason = rcutils_get_error_string().str;
} else {
result.successful = true;
}
response->results.push_back(std::move(result));
}
},
qos_profile, callback_group);
}

View File

@@ -50,7 +50,7 @@ NodeTimers::add_timer(
std::string("failed to notify wait set on timer creation: ") + ex.what());
}
TRACEPOINT(
TRACETOOLS_TRACEPOINT(
rclcpp_timer_link_node,
static_cast<const void *>(timer->get_timer_handle().get()),
static_cast<const void *>(node_base_->get_rcl_node_handle()));

View File

@@ -0,0 +1,157 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <string>
#include <thread>
#include "rclcpp/node_interfaces/node_type_descriptions.hpp"
#include "rclcpp/parameter_client.hpp"
#include "type_description_interfaces/srv/get_type_description.h"
namespace
{
// Helper wrapper for rclcpp::Service to access ::Request and ::Response types for allocation.
struct GetTypeDescription__C
{
using Request = type_description_interfaces__srv__GetTypeDescription_Request;
using Response = type_description_interfaces__srv__GetTypeDescription_Response;
using Event = type_description_interfaces__srv__GetTypeDescription_Event;
};
} // namespace
// Helper function for C typesupport.
namespace rosidl_typesupport_cpp
{
template<>
rosidl_service_type_support_t const *
get_service_type_support_handle<GetTypeDescription__C>()
{
return ROSIDL_GET_SRV_TYPE_SUPPORT(type_description_interfaces, srv, GetTypeDescription);
}
} // namespace rosidl_typesupport_cpp
namespace rclcpp
{
namespace node_interfaces
{
class NodeTypeDescriptions::NodeTypeDescriptionsImpl
{
public:
using ServiceT = GetTypeDescription__C;
rclcpp::Logger logger_;
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_;
rclcpp::Service<ServiceT>::SharedPtr type_description_srv_;
NodeTypeDescriptionsImpl(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_parameters,
rclcpp::node_interfaces::NodeServicesInterface::SharedPtr node_services)
: logger_(node_logging->get_logger()),
node_base_(node_base)
{
const std::string enable_param_name = "start_type_description_service";
bool enabled = false;
try {
auto enable_param = node_parameters->declare_parameter(
enable_param_name,
rclcpp::ParameterValue(true),
rcl_interfaces::msg::ParameterDescriptor()
.set__name(enable_param_name)
.set__type(rclcpp::PARAMETER_BOOL)
.set__description("Start the ~/get_type_description service for this node.")
.set__read_only(true));
enabled = enable_param.get<bool>();
} catch (const rclcpp::exceptions::InvalidParameterTypeException & exc) {
RCLCPP_ERROR(logger_, "%s", exc.what());
throw;
}
if (enabled) {
auto rcl_node = node_base->get_rcl_node_handle();
rcl_ret_t rcl_ret = rcl_node_type_description_service_init(rcl_node);
if (rcl_ret != RCL_RET_OK) {
RCLCPP_ERROR(
logger_, "Failed to initialize ~/get_type_description_service: %s",
rcl_get_error_string().str);
throw std::runtime_error(
"Failed to initialize ~/get_type_description service.");
}
rcl_service_t * rcl_srv = nullptr;
rcl_ret = rcl_node_get_type_description_service(rcl_node, &rcl_srv);
if (rcl_ret != RCL_RET_OK) {
throw std::runtime_error(
"Failed to get initialized ~/get_type_description service from rcl.");
}
rclcpp::AnyServiceCallback<ServiceT> cb;
cb.set(
[this](
std::shared_ptr<rmw_request_id_t> header,
std::shared_ptr<ServiceT::Request> request,
std::shared_ptr<ServiceT::Response> response
) {
rcl_node_type_description_service_handle_request(
node_base_->get_rcl_node_handle(),
header.get(),
request.get(),
response.get());
});
type_description_srv_ = std::make_shared<Service<ServiceT>>(
node_base_->get_shared_rcl_node_handle(),
rcl_srv,
cb);
node_services->add_service(
std::dynamic_pointer_cast<ServiceBase>(type_description_srv_),
nullptr);
}
}
~NodeTypeDescriptionsImpl()
{
if (
type_description_srv_ &&
RCL_RET_OK != rcl_node_type_description_service_fini(node_base_->get_rcl_node_handle()))
{
RCLCPP_ERROR(
logger_,
"Error in shutdown of get_type_description service: %s", rcl_get_error_string().str);
}
}
};
NodeTypeDescriptions::NodeTypeDescriptions(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging,
rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_parameters,
rclcpp::node_interfaces::NodeServicesInterface::SharedPtr node_services)
: impl_(new NodeTypeDescriptionsImpl(
node_base,
node_logging,
node_parameters,
node_services))
{}
NodeTypeDescriptions::~NodeTypeDescriptions()
{}
} // namespace node_interfaces
} // namespace rclcpp

View File

@@ -248,6 +248,19 @@ NodeOptions::start_parameter_services(bool start_parameter_services)
return *this;
}
bool
NodeOptions::enable_logger_service() const
{
return this->enable_logger_service_;
}
NodeOptions &
NodeOptions::enable_logger_service(bool enable_logger_service)
{
this->enable_logger_service_ = enable_logger_service;
return *this;
}
bool
NodeOptions::start_parameter_event_publisher() const
{

View File

@@ -384,3 +384,22 @@ std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoin
return network_flow_endpoint_vector;
}
size_t PublisherBase::lowest_available_ipm_capacity() const
{
if (!intra_process_is_enabled_) {
return 0u;
}
auto ipm = weak_ipm_.lock();
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died for a publisher.");
return 0u;
}
return ipm->lowest_available_capacity(intra_process_publisher_id_);
}

View File

@@ -52,7 +52,7 @@ SubscriptionBase::SubscriptionBase(
intra_process_subscription_id_(0),
event_callbacks_(event_callbacks),
type_support_(type_support_handle),
delivered_message_type_(delivered_message_kind)
delivered_message_kind_(delivered_message_kind)
{
auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs)
{
@@ -218,7 +218,7 @@ SubscriptionBase::take_type_erased(void * message_out, rclcpp::MessageInfo & mes
&message_info_out.get_rmw_message_info(),
nullptr // rmw_subscription_allocation_t is unused here
);
TRACEPOINT(rclcpp_take, static_cast<const void *>(message_out));
TRACETOOLS_TRACEPOINT(rclcpp_take, static_cast<const void *>(message_out));
if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
return false;
} else if (RCL_RET_OK != ret) {
@@ -261,13 +261,13 @@ SubscriptionBase::get_message_type_support_handle() const
bool
SubscriptionBase::is_serialized() const
{
return delivered_message_type_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
return delivered_message_kind_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
}
rclcpp::DeliveredMessageKind
SubscriptionBase::get_subscription_type() const
SubscriptionBase::get_delivered_message_kind() const
{
return delivered_message_type_;
return delivered_message_kind_;
}
size_t

View File

@@ -32,7 +32,8 @@ using rclcpp::TimerBase;
TimerBase::TimerBase(
rclcpp::Clock::SharedPtr clock,
std::chrono::nanoseconds period,
rclcpp::Context::SharedPtr context)
rclcpp::Context::SharedPtr context,
bool autostart)
: clock_(clock), timer_handle_(nullptr)
{
if (nullptr == context) {
@@ -64,9 +65,9 @@ TimerBase::TimerBase(
rcl_clock_t * clock_handle = clock_->get_clock_handle();
{
std::lock_guard<std::mutex> clock_guard(clock_->get_clock_mutex());
rcl_ret_t ret = rcl_timer_init(
timer_handle_.get(), clock_handle, rcl_context.get(), period.count(), nullptr,
rcl_get_default_allocator());
rcl_ret_t ret = rcl_timer_init2(
timer_handle_.get(), clock_handle, rcl_context.get(), period.count(),
nullptr, rcl_get_default_allocator(), autostart);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't initialize rcl timer handle");
}

View File

@@ -362,3 +362,42 @@ BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark
}
}
}
BENCHMARK_F(
PerformanceTestExecutorSimple,
static_executor_entities_collector_execute)(benchmark::State & st)
{
rclcpp::executors::StaticExecutorEntitiesCollector::SharedPtr entities_collector_ =
std::make_shared<rclcpp::executors::StaticExecutorEntitiesCollector>();
entities_collector_->add_node(node->get_node_base_interface());
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
rcl_allocator_t allocator = rcl_get_default_allocator();
auto shared_context = node->get_node_base_interface()->get_context();
rcl_context_t * context = shared_context->get_rcl_context().get();
rcl_ret_t ret = rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator);
if (ret != RCL_RET_OK) {
st.SkipWithError(rcutils_get_error_string().str);
}
RCPPUTILS_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_wait_set_fini(&wait_set);
if (ret != RCL_RET_OK) {
st.SkipWithError(rcutils_get_error_string().str);
}
});
auto memory_strategy = rclcpp::memory_strategies::create_default_strategy();
rclcpp::GuardCondition guard_condition(shared_context);
entities_collector_->init(&wait_set, memory_strategy);
RCPPUTILS_SCOPE_EXIT(entities_collector_->fini());
reset_heap_counters();
for (auto _ : st) {
(void)_;
std::shared_ptr<void> data = entities_collector_->take_data();
entities_collector_->execute(data);
}
}

View File

@@ -262,6 +262,11 @@ if(TARGET test_node_interfaces__node_topics)
"test_msgs")
target_link_libraries(test_node_interfaces__node_topics ${PROJECT_NAME} mimick)
endif()
ament_add_gtest(test_node_interfaces__node_type_descriptions
node_interfaces/test_node_type_descriptions.cpp)
if(TARGET test_node_interfaces__node_type_descriptions)
target_link_libraries(test_node_interfaces__node_type_descriptions ${PROJECT_NAME} mimick)
endif()
ament_add_gtest(test_node_interfaces__node_waitables
node_interfaces/test_node_waitables.cpp)
if(TARGET test_node_interfaces__node_waitables)
@@ -649,6 +654,13 @@ if(TARGET test_wait_for_message)
target_link_libraries(test_wait_for_message ${PROJECT_NAME})
endif()
ament_add_gtest(test_logger_service test_logger_service.cpp)
if(TARGET test_logger_service)
ament_target_dependencies(test_logger_service
"rcl_interfaces")
target_link_libraries(test_logger_service ${PROJECT_NAME})
endif()
ament_add_gtest(test_interface_traits test_interface_traits.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_interface_traits)
@@ -671,6 +683,14 @@ if(TARGET test_executors)
target_link_libraries(test_executors ${PROJECT_NAME})
endif()
ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_static_single_threaded_executor)
ament_target_dependencies(test_static_single_threaded_executor
"test_msgs")
target_link_libraries(test_static_single_threaded_executor ${PROJECT_NAME} mimick)
endif()
ament_add_gtest(test_multi_threaded_executor executors/test_multi_threaded_executor.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_multi_threaded_executor)
@@ -679,6 +699,15 @@ if(TARGET test_multi_threaded_executor)
target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME})
endif()
ament_add_gtest(test_static_executor_entities_collector executors/test_static_executor_entities_collector.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120)
if(TARGET test_static_executor_entities_collector)
ament_target_dependencies(test_static_executor_entities_collector
"rcl"
"test_msgs")
target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick)
endif()
ament_add_gtest(test_entities_collector executors/test_entities_collector.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120)
if(TARGET test_entities_collector)

View File

@@ -43,11 +43,6 @@ public:
TEST_F(TestEventsExecutor, run_pub_sub)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
bool msg_received = false;
@@ -95,11 +90,6 @@ TEST_F(TestEventsExecutor, run_pub_sub)
TEST_F(TestEventsExecutor, run_clients_servers)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
bool request_received = false;
@@ -153,11 +143,6 @@ TEST_F(TestEventsExecutor, run_clients_servers)
TEST_F(TestEventsExecutor, spin_once_max_duration_timeout)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
EventsExecutor executor;
@@ -190,11 +175,6 @@ TEST_F(TestEventsExecutor, spin_once_max_duration_timeout)
TEST_F(TestEventsExecutor, spin_once_max_duration_timer)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
EventsExecutor executor;
@@ -226,11 +206,6 @@ TEST_F(TestEventsExecutor, spin_once_max_duration_timer)
TEST_F(TestEventsExecutor, spin_some_max_duration)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
{
auto node = std::make_shared<rclcpp::Node>("node");
@@ -277,11 +252,6 @@ TEST_F(TestEventsExecutor, spin_some_max_duration)
TEST_F(TestEventsExecutor, spin_some_zero_duration)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
size_t t_runs = 0;
@@ -303,11 +273,6 @@ TEST_F(TestEventsExecutor, spin_some_zero_duration)
TEST_F(TestEventsExecutor, spin_all_max_duration)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
{
auto node = std::make_shared<rclcpp::Node>("node");
@@ -358,11 +323,6 @@ TEST_F(TestEventsExecutor, spin_all_max_duration)
TEST_F(TestEventsExecutor, cancel_while_timers_running)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
EventsExecutor executor;
@@ -402,11 +362,6 @@ TEST_F(TestEventsExecutor, cancel_while_timers_running)
TEST_F(TestEventsExecutor, cancel_while_timers_waiting)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
size_t t1_runs = 0;
@@ -435,11 +390,6 @@ TEST_F(TestEventsExecutor, destroy_entities)
// This test fails on Windows! We skip it for now
GTEST_SKIP();
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
// Create a publisher node and start publishing messages
auto node_pub = std::make_shared<rclcpp::Node>("node_pub");
auto publisher = node_pub->create_publisher<test_msgs::msg::Empty>("topic", rclcpp::QoS(10));
@@ -485,11 +435,6 @@ std::string * g_sub_log_msg;
std::promise<void> * g_log_msgs_promise;
TEST_F(TestEventsExecutor, test_default_incompatible_qos_callbacks)
{
// rmw_connextdds doesn't support events-executor
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("node");
rcutils_logging_output_handler_t original_output_handler = rcutils_logging_get_output_handler();

View File

@@ -20,12 +20,14 @@
#include <gtest/gtest.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "rcl/error_handling.h"
#include "rcl/time.h"
@@ -43,18 +45,10 @@ template<typename T>
class TestExecutors : public ::testing::Test
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
void SetUp()
{
rclcpp::init(0, nullptr);
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
@@ -75,6 +69,8 @@ public:
publisher.reset();
subscription.reset();
node.reset();
rclcpp::shutdown();
}
rclcpp::Node::SharedPtr node;
@@ -83,6 +79,8 @@ public:
int callback_count;
};
// spin_all and spin_some are not implemented correctly in StaticSingleThreadedExecutor, see:
// https://github.com/ros2/rclcpp/issues/1219 for tracking
template<typename T>
class TestExecutorsStable : public TestExecutors<T> {};
@@ -124,18 +122,19 @@ public:
// is updated.
TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames);
// StaticSingleThreadedExecutor is not included in these tests for now, due to:
// https://github.com/ros2/rclcpp/issues/1219
using StandardExecutors =
::testing::Types<
rclcpp::executors::SingleThreadedExecutor,
rclcpp::executors::MultiThreadedExecutor,
rclcpp::experimental::executors::EventsExecutor>;
TYPED_TEST_SUITE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames);
// Make sure that executors detach from nodes when destructing
TYPED_TEST(TestExecutors, detachOnDestruction)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
{
ExecutorType executor;
executor.add_node(this->node);
@@ -147,16 +146,11 @@ TYPED_TEST(TestExecutors, detachOnDestruction)
}
// Make sure that the executor can automatically remove expired nodes correctly
TYPED_TEST(TestExecutors, addTemporaryNode) {
// Currently fails for StaticSingleThreadedExecutor so it is being skipped, see:
// https://github.com/ros2/rclcpp/issues/1231
TYPED_TEST(TestExecutorsStable, addTemporaryNode)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
{
@@ -177,14 +171,6 @@ TYPED_TEST(TestExecutors, addTemporaryNode) {
TYPED_TEST(TestExecutors, emptyExecutor)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
std::thread spinner([&]() {EXPECT_NO_THROW(executor.spin());});
std::this_thread::sleep_for(50ms);
@@ -196,14 +182,6 @@ TYPED_TEST(TestExecutors, emptyExecutor)
TYPED_TEST(TestExecutors, addNodeTwoExecutors)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor1;
ExecutorType executor2;
EXPECT_NO_THROW(executor1.add_node(this->node));
@@ -215,14 +193,6 @@ TYPED_TEST(TestExecutors, addNodeTwoExecutors)
TYPED_TEST(TestExecutors, spinWithTimer)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
bool timer_completed = false;
@@ -246,28 +216,17 @@ TYPED_TEST(TestExecutors, spinWithTimer)
TYPED_TEST(TestExecutors, spinWhileAlreadySpinning)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
executor.add_node(this->node);
std::atomic_bool timer_completed = false;
auto timer = this->node->create_wall_timer(
1ms, [&]() {
timer_completed.store(true);
});
bool timer_completed = false;
auto timer = this->node->create_wall_timer(1ms, [&]() {timer_completed = true;});
std::thread spinner([&]() {executor.spin();});
// Sleep for a short time to verify executor.spin() is going, and didn't throw.
auto start = std::chrono::steady_clock::now();
while (!timer_completed.load() && (std::chrono::steady_clock::now() - start) < 10s) {
while (!timer_completed && (std::chrono::steady_clock::now() - start) < 10s) {
std::this_thread::sleep_for(1ms);
}
@@ -284,14 +243,6 @@ TYPED_TEST(TestExecutors, spinWhileAlreadySpinning)
TYPED_TEST(TestExecutors, testSpinUntilFutureComplete)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
executor.add_node(this->node);
@@ -315,14 +266,6 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureComplete)
TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
executor.add_node(this->node);
@@ -347,14 +290,6 @@ TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete)
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
executor.add_node(this->node);
@@ -402,14 +337,6 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout)
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteWithTimeout)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
executor.add_node(this->node);
@@ -522,14 +449,6 @@ private:
TYPED_TEST(TestExecutors, spinAll)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
auto waitable_interfaces = this->node->get_node_waitables_interface();
auto my_waitable = std::make_shared<TestWaitable>();
@@ -572,14 +491,6 @@ TYPED_TEST(TestExecutors, spinAll)
TYPED_TEST(TestExecutors, spinSome)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
auto waitable_interfaces = this->node->get_node_waitables_interface();
auto my_waitable = std::make_shared<TestWaitable>();
@@ -622,14 +533,6 @@ TYPED_TEST(TestExecutors, spinSome)
TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
std::promise<bool> promise;
@@ -646,14 +549,6 @@ TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr)
TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
std::promise<bool> promise;
@@ -670,14 +565,6 @@ TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr)
TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
executor.add_node(this->node);
@@ -718,6 +605,77 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
spinner.join();
}
// This test verifies that the add_node operation is robust wrt race conditions.
// It's mostly meant to prevent regressions in the events-executor, but the operation should be
// thread-safe in all executor implementations.
// The initial implementation of the events-executor contained a bug where the executor
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
// Manually adding a node to the executor results in a) producing a notify waitable event
// and b) refreshing the executor collections.
// The inconsistent state would happen if the event was processed before the collections were
// finished to be refreshed: the executor would pick up the event but be unable to process it.
// This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
// notify waitable events to be pushed.
// The behavior is observable only under heavy load, so this test spawns several worker
// threads. Due to the nature of the bug, this test may still succeed even if the
// bug is present. However repeated runs will show its flakiness nature and indicate
// an eventual regression.
TYPED_TEST(TestExecutors, testRaceConditionAddNode)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
// Spawn some threads to do some heavy work
std::atomic<bool> should_cancel = false;
std::vector<std::thread> stress_threads;
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
stress_threads.emplace_back(
[&should_cancel, i]() {
// This is just some arbitrary heavy work
volatile size_t total = 0;
for (size_t k = 0; k < 549528914167; k++) {
if (should_cancel) {
break;
}
total += k * (i + 42);
}
});
}
// Create an executor
auto executor = std::make_shared<ExecutorType>();
// Start spinning
auto executor_thread = std::thread(
[executor]() {
executor->spin();
});
// Add a node to the executor
executor->add_node(this->node);
// Cancel the executor (make sure that it's already spinning first)
while (!executor->is_spinning() && rclcpp::ok()) {
continue;
}
executor->cancel();
// Try to join the thread after cancelling the executor
// This is the "test". We want to make sure that we can still cancel the executor
// regardless of the presence of race conditions
executor_thread.join();
// The test is now completed: we can join the stress threads
should_cancel = true;
for (auto & t : stress_threads) {
t.join();
}
}
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
{
@@ -780,7 +738,7 @@ public:
test_name << test_info->test_case_name() << "_" << test_info->name();
node = std::make_shared<rclcpp::Node>("node", test_name.str());
callback_count = 0;
callback_count = 0u;
const std::string topic_name = std::string("topic_") + test_name.str();
@@ -789,7 +747,7 @@ public:
publisher = node->create_publisher<test_msgs::msg::Empty>(topic_name, rclcpp::QoS(1), po);
auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {
this->callback_count.fetch_add(1);
this->callback_count.fetch_add(1u);
};
rclcpp::SubscriptionOptions so;
@@ -811,7 +769,7 @@ public:
rclcpp::Node::SharedPtr node;
rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr publisher;
rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr subscription;
std::atomic_int callback_count;
std::atomic_size_t callback_count;
};
TYPED_TEST_SUITE(TestIntraprocessExecutors, ExecutorTypes, ExecutorTypeNames);
@@ -827,7 +785,7 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) {
ExecutorType executor;
executor.add_node(this->node);
EXPECT_EQ(0, this->callback_count.load());
EXPECT_EQ(0u, this->callback_count.load());
this->publisher->publish(test_msgs::msg::Empty());
// Wait for up to 5 seconds for the first message to come available.
@@ -841,7 +799,7 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) {
EXPECT_EQ(1u, this->callback_count.load());
// reset counter
this->callback_count.store(0);
this->callback_count.store(0u);
for (size_t ii = 0; ii < kNumMessages; ++ii) {
this->publisher->publish(test_msgs::msg::Empty());

View File

@@ -77,9 +77,9 @@ TEST_F(TestNodeParameters, list_parameters)
std::vector<std::string> prefixes;
const auto list_result = node_parameters->list_parameters(prefixes, 1u);
// Currently the only default parameter is 'use_sim_time', but that may change.
// Currently the default parameters are 'use_sim_time' and 'start_type_description_service'
size_t number_of_parameters = list_result.names.size();
EXPECT_GE(1u, number_of_parameters);
EXPECT_GE(2u, number_of_parameters);
const std::string parameter_name = "new_parameter";
const rclcpp::ParameterValue value(true);

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include "rclcpp/node.hpp"
#include "rclcpp/node_interfaces/node_type_descriptions.hpp"
class TestNodeTypeDescriptions : public ::testing::Test
{
public:
void SetUp()
{
rclcpp::init(0, nullptr);
}
void TearDown()
{
rclcpp::shutdown();
}
};
TEST_F(TestNodeTypeDescriptions, interface_created)
{
rclcpp::Node node{"node", "ns"};
ASSERT_NE(nullptr, node.get_node_type_descriptions_interface());
}
TEST_F(TestNodeTypeDescriptions, disabled_no_service)
{
rclcpp::NodeOptions node_options;
node_options.append_parameter_override("start_type_description_service", false);
rclcpp::Node node{"node", "ns", node_options};
rcl_node_t * rcl_node = node.get_node_base_interface()->get_rcl_node_handle();
rcl_service_t * srv = nullptr;
rcl_ret_t ret = rcl_node_get_type_description_service(rcl_node, &srv);
ASSERT_EQ(RCL_RET_NOT_INIT, ret);
}
TEST_F(TestNodeTypeDescriptions, enabled_creates_service)
{
rclcpp::NodeOptions node_options;
node_options.append_parameter_override("start_type_description_service", true);
rclcpp::Node node{"node", "ns", node_options};
rcl_node_t * rcl_node = node.get_node_base_interface()->get_rcl_node_handle();
rcl_service_t * srv = nullptr;
rcl_ret_t ret = rcl_node_get_type_description_service(rcl_node, &srv);
ASSERT_EQ(RCL_RET_OK, ret);
ASSERT_NE(nullptr, srv);
}

View File

@@ -99,13 +99,6 @@ TYPED_TEST_SUITE(TestAddCallbackGroupsToExecutorStable, StandardExecutors, Execu
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
auto timer_callback = []() {};
@@ -155,13 +148,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups)
TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
auto timer_callback = []() {};
@@ -193,13 +179,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups)
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
@@ -220,13 +199,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups)
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_to_executor)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
@@ -263,13 +235,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_t
TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
@@ -307,13 +272,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups)
TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_executors)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType timer_executor;
ExecutorType sub_executor;
@@ -355,13 +313,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_e
TYPED_TEST(TestAddCallbackGroupsToExecutorStable, subscriber_triggered_to_receive_message)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
@@ -428,13 +379,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutorStable, subscriber_triggered_to_receiv
TYPED_TEST(TestAddCallbackGroupsToExecutorStable, callback_group_create_after_spin)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");
@@ -481,13 +425,6 @@ TYPED_TEST(TestAddCallbackGroupsToExecutorStable, callback_group_create_after_sp
TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_group)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}
ExecutorType executor;
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");

View File

@@ -193,3 +193,29 @@ TEST(TestCreateTimer, timer_function_pointer)
rclcpp::shutdown();
}
TEST(TestCreateTimer, timer_without_autostart)
{
rclcpp::init(0, nullptr);
auto node = std::make_shared<rclcpp::Node>("test_create_timer_node");
rclcpp::TimerBase::SharedPtr timer;
timer = rclcpp::create_timer(
node,
node->get_clock(),
rclcpp::Duration(0ms),
[]() {},
nullptr,
false);
EXPECT_TRUE(timer->is_canceled());
EXPECT_EQ(timer->time_until_trigger().count(), std::chrono::nanoseconds::max().count());
timer->reset();
EXPECT_LE(timer->time_until_trigger().count(), std::chrono::nanoseconds::max().count());
EXPECT_FALSE(timer->is_canceled());
timer->cancel();
rclcpp::shutdown();
}

View File

@@ -46,6 +46,23 @@ public:
{
spin_node_once_nanoseconds(node, std::chrono::milliseconds(100));
}
rclcpp::memory_strategy::MemoryStrategy * memory_strategy_ptr()
{
return memory_strategy_.get();
}
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group(
rclcpp::CallbackGroup::SharedPtr group)
{
std::lock_guard<std::mutex> guard_{mutex_}; // only to make the TSA happy
return get_node_by_group(weak_groups_to_nodes_, group);
}
rclcpp::CallbackGroup::SharedPtr local_get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
{
return get_group_by_timer(timer);
}
};
class TestExecutor : public ::testing::Test
@@ -113,7 +130,7 @@ TEST_F(TestExecutor, constructor_bad_wait_set_init) {
auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_init, RCL_RET_ERROR);
RCLCPP_EXPECT_THROW_EQ(
static_cast<void>(std::make_unique<DummyExecutor>()),
std::runtime_error("Failed to create wait set: error not set"));
std::runtime_error("Failed to create wait set in Executor constructor: error not set"));
}
TEST_F(TestExecutor, add_callback_group_twice) {
@@ -125,7 +142,7 @@ TEST_F(TestExecutor, add_callback_group_twice) {
cb_group->get_associated_with_executor_atomic().exchange(false);
RCLCPP_EXPECT_THROW_EQ(
dummy.add_callback_group(cb_group, node->get_node_base_interface(), false),
std::runtime_error("Callback group has already been added to this executor."));
std::runtime_error("Callback group was already added to executor."));
}
TEST_F(TestExecutor, add_callback_group_failed_trigger_guard_condition) {
@@ -151,15 +168,9 @@ TEST_F(TestExecutor, remove_callback_group_null_node) {
node.reset();
/**
* TODO(mjcarroll): Assert this when we are enforcing that nodes must be destroyed
* after their created callback groups.
RCLCPP_EXPECT_THROW_EQ(
dummy.remove_callback_group(cb_group, false),
std::runtime_error("Node must not be deleted before its callback group(s)."));
*/
EXPECT_NO_THROW(dummy.remove_callback_group(cb_group, false));
}
TEST_F(TestExecutor, remove_callback_group_failed_trigger_guard_condition) {
@@ -186,7 +197,7 @@ TEST_F(TestExecutor, remove_node_not_associated) {
RCLCPP_EXPECT_THROW_EQ(
dummy.remove_node(node->get_node_base_interface(), false),
std::runtime_error("Node '/ns/node' needs to be associated with an executor."));
std::runtime_error("Node needs to be associated with an executor."));
}
TEST_F(TestExecutor, remove_node_associated_with_different_executor) {
@@ -200,7 +211,7 @@ TEST_F(TestExecutor, remove_node_associated_with_different_executor) {
RCLCPP_EXPECT_THROW_EQ(
dummy2.remove_node(node1->get_node_base_interface(), false),
std::runtime_error("Node '/ns/node1' needs to be associated with this executor."));
std::runtime_error("Node needs to be associated with this executor."));
}
TEST_F(TestExecutor, spin_node_once_nanoseconds) {
@@ -317,14 +328,42 @@ TEST_F(TestExecutor, cancel_failed_trigger_guard_condition) {
std::runtime_error("Failed to trigger guard condition in cancel: error not set"));
}
TEST_F(TestExecutor, create_executor_fail_wait_set_clear) {
auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_clear, RCL_RET_ERROR);
TEST_F(TestExecutor, set_memory_strategy_nullptr) {
DummyExecutor dummy;
RCLCPP_EXPECT_THROW_EQ(
DummyExecutor dummy,
std::runtime_error("Couldn't clear the wait set: error not set"));
dummy.set_memory_strategy(nullptr),
std::runtime_error("Received NULL memory strategy in executor."));
}
TEST_F(TestExecutor, spin_all_fail_wait_set_clear) {
TEST_F(TestExecutor, set_memory_strategy) {
DummyExecutor dummy;
rclcpp::memory_strategy::MemoryStrategy::SharedPtr strategy =
std::make_shared<
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
dummy.set_memory_strategy(strategy);
EXPECT_EQ(dummy.memory_strategy_ptr(), strategy.get());
}
TEST_F(TestExecutor, spin_once_failed_trigger_guard_condition) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
auto timer =
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {});
dummy.add_node(node);
// Wait for the wall timer to have expired.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR);
RCLCPP_EXPECT_THROW_EQ(
dummy.spin_once(std::chrono::milliseconds(1)),
std::runtime_error(
"Failed to trigger guard condition from execute_any_executable: error not set"));
}
TEST_F(TestExecutor, spin_some_fail_wait_set_clear) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
auto timer =
@@ -332,10 +371,9 @@ TEST_F(TestExecutor, spin_all_fail_wait_set_clear) {
dummy.add_node(node);
auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_clear, RCL_RET_ERROR);
RCLCPP_EXPECT_THROW_EQ(
dummy.spin_all(std::chrono::milliseconds(1)),
std::runtime_error("Couldn't clear the wait set: error not set"));
dummy.spin_some(std::chrono::milliseconds(1)),
std::runtime_error("Couldn't clear wait set: error not set"));
}
TEST_F(TestExecutor, spin_some_fail_wait_set_resize) {
@@ -363,7 +401,7 @@ TEST_F(TestExecutor, spin_some_fail_add_handles_to_wait_set) {
RCL_RET_ERROR);
RCLCPP_EXPECT_THROW_EQ(
dummy.spin_some(std::chrono::milliseconds(1)),
std::runtime_error("Couldn't fill wait set: error not set"));
std::runtime_error("Couldn't fill wait set"));
}
TEST_F(TestExecutor, spin_some_fail_wait) {
@@ -379,6 +417,71 @@ TEST_F(TestExecutor, spin_some_fail_wait) {
std::runtime_error("rcl_wait() failed: error not set"));
}
TEST_F(TestExecutor, get_node_by_group_null_group) {
DummyExecutor dummy;
ASSERT_EQ(nullptr, dummy.local_get_node_by_group(nullptr));
}
TEST_F(TestExecutor, get_node_by_group) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
dummy.add_callback_group(cb_group, node->get_node_base_interface(), false);
ASSERT_EQ(node->get_node_base_interface().get(), dummy.local_get_node_by_group(cb_group).get());
}
TEST_F(TestExecutor, get_node_by_group_not_found) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
ASSERT_EQ(nullptr, dummy.local_get_node_by_group(cb_group).get());
}
TEST_F(TestExecutor, get_group_by_timer_nullptr) {
DummyExecutor dummy;
ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(nullptr));
}
TEST_F(TestExecutor, get_group_by_timer) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
auto timer =
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group);
dummy.add_node(node);
ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get());
}
TEST_F(TestExecutor, get_group_by_timer_with_deleted_group) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
auto timer =
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group);
dummy.add_node(node);
cb_group.reset();
ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(timer).get());
}
TEST_F(TestExecutor, get_group_by_timer_add_callback_group) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");
rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);
auto timer =
node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group);
dummy.add_callback_group(cb_group, node->get_node_base_interface(), false);
ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get());
}
TEST_F(TestExecutor, spin_until_future_complete_in_spin_until_future_complete) {
DummyExecutor dummy;
auto node = std::make_shared<rclcpp::Node>("node", "ns");

View File

@@ -238,3 +238,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) {
EXPECT_EQ(original_value, *popped_unique_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);
}
/*
Check the available buffer capacity while storing and consuming data from an intra-process
buffer.
The initial available buffer capacity should equal the buffer size.
Inserting a message should decrease the available buffer capacity by 1.
Consuming a message should increase the available buffer capacity by 1.
*/
TEST(TestIntraProcessBuffer, available_capacity) {
using MessageT = char;
using Alloc = std::allocator<void>;
using Deleter = std::default_delete<MessageT>;
using SharedMessageT = std::shared_ptr<const MessageT>;
using UniqueMessageT = std::unique_ptr<MessageT, Deleter>;
using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer<
MessageT, Alloc, Deleter, UniqueMessageT>;
constexpr auto history_depth = 5u;
auto buffer_impl =
std::make_unique<rclcpp::experimental::buffers::RingBufferImplementation<UniqueMessageT>>(
history_depth);
UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl));
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
auto original_unique_msg = std::make_unique<char>('a');
auto original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
auto original_value = *original_unique_msg;
intra_process_buffer.add_unique(std::move(original_unique_msg));
EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
SharedMessageT popped_shared_msg;
popped_shared_msg = intra_process_buffer.consume_shared();
auto popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_shared_msg.get());
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_shared_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);
original_unique_msg = std::make_unique<char>('b');
original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
original_value = *original_unique_msg;
intra_process_buffer.add_unique(std::move(original_unique_msg));
auto second_unique_msg = std::make_unique<char>('c');
auto second_message_pointer = reinterpret_cast<std::uintptr_t>(second_unique_msg.get());
auto second_value = *second_unique_msg;
intra_process_buffer.add_unique(std::move(second_unique_msg));
EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity());
UniqueMessageT popped_unique_msg;
popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());
EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_unique_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);
popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());
EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(second_value, *popped_unique_msg);
EXPECT_EQ(second_message_pointer, popped_message_pointer);
}

View File

@@ -156,18 +156,26 @@ public:
{
message_ptr = reinterpret_cast<std::uintptr_t>(msg.get());
shared_msg = msg;
++num_msgs;
}
void add(MessageUniquePtr msg)
{
message_ptr = reinterpret_cast<std::uintptr_t>(msg.get());
unique_msg = std::move(msg);
++num_msgs;
}
void pop(std::uintptr_t & msg_ptr)
{
msg_ptr = message_ptr;
message_ptr = 0;
--num_msgs;
}
size_t size() const
{
return num_msgs;
}
// need to store the messages somewhere otherwise the memory address will be reused
@@ -175,6 +183,8 @@ public:
MessageUniquePtr unique_msg;
std::uintptr_t message_ptr;
// count add and pop
size_t num_msgs = 0u;
};
} // namespace mock
@@ -221,6 +231,10 @@ public:
return topic_name.c_str();
}
virtual
size_t
available_capacity() const = 0;
rclcpp::QoS qos_profile;
std::string topic_name;
};
@@ -280,6 +294,12 @@ public:
return take_shared_method;
}
size_t
available_capacity() const override
{
return qos_profile.depth() - buffer->size();
}
bool take_shared_method;
typename rclcpp::experimental::buffers::mock::IntraProcessBuffer<MessageT>::UniquePtr buffer;
@@ -712,3 +732,91 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) {
EXPECT_EQ(original_message_pointer, received_message_pointer_10);
EXPECT_NE(original_message_pointer, received_message_pointer_11);
}
/*
This tests the method "lowest_available_capacity":
- Creates 1 publisher.
- The available buffer capacity should be at least history size.
- Add 2 subscribers.
- Add everything to the intra-process manager.
- All the entities are expected to have different ids.
- Check the subscriptions count for the publisher.
- The available buffer capacity should be the history size.
- Publish one message (without receiving it).
- The available buffer capacity should decrease by 1.
- Publish another message (without receiving it).
- The available buffer capacity should decrease by 1.
- One subscriber receives one message.
- The available buffer capacity should stay the same,
as the other subscriber still has not freed its buffer.
- The other subscriber receives one message.
- The available buffer capacity should increase by 1.
- One subscription goes out of scope.
- The available buffer capacity should not change.
*/
TEST(TestIntraProcessManager, lowest_available_capacity) {
using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager;
using MessageT = rcl_interfaces::msg::Log;
using PublisherT = rclcpp::mock::Publisher<MessageT>;
using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess<MessageT>;
constexpr auto history_depth = 10u;
auto ipm = std::make_shared<IntraProcessManagerT>();
auto p1 = std::make_shared<PublisherT>(rclcpp::QoS(history_depth).best_effort());
auto s1 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS(history_depth).best_effort());
auto s2 = std::make_shared<SubscriptionIntraProcessT>(rclcpp::QoS(history_depth).best_effort());
auto p1_id = ipm->add_publisher(p1);
p1->set_intra_process_manager(p1_id, ipm);
auto c1 = ipm->lowest_available_capacity(p1_id);
ASSERT_LE(0u, c1);
auto s1_id = ipm->add_subscription(s1);
auto s2_id = ipm->add_subscription(s2);
bool unique_ids = s1_id != s2_id && p1_id != s1_id;
ASSERT_TRUE(unique_ids);
size_t p1_subs = ipm->get_subscription_count(p1_id);
size_t non_existing_pub_subs = ipm->get_subscription_count(42);
ASSERT_EQ(2u, p1_subs);
ASSERT_EQ(0u, non_existing_pub_subs);
c1 = ipm->lowest_available_capacity(p1_id);
auto non_existing_pub_c = ipm->lowest_available_capacity(42);
ASSERT_EQ(history_depth, c1);
ASSERT_EQ(0u, non_existing_pub_c);
auto unique_msg = std::make_unique<MessageT>();
p1->publish(std::move(unique_msg));
c1 = ipm->lowest_available_capacity(p1_id);
ASSERT_EQ(history_depth - 1u, c1);
unique_msg = std::make_unique<MessageT>();
p1->publish(std::move(unique_msg));
c1 = ipm->lowest_available_capacity(p1_id);
ASSERT_EQ(history_depth - 2u, c1);
s1->pop();
c1 = ipm->lowest_available_capacity(p1_id);
ASSERT_EQ(history_depth - 2u, c1);
s2->pop();
c1 = ipm->lowest_available_capacity(p1_id);
ASSERT_EQ(history_depth - 1u, c1);
ipm->get_subscription_intra_process(s1_id).reset();
c1 = ipm->lowest_available_capacity(p1_id);
ASSERT_EQ(history_depth - 1u, c1);
}

View File

@@ -0,0 +1,214 @@
// Copyright 2023 Sony Group Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include <vector>
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/node.hpp"
#include "rcl_interfaces/srv/get_logger_levels.hpp"
#include "rcl_interfaces/srv/set_logger_levels.hpp"
using namespace std::chrono_literals;
class TestLoggerService : public ::testing::Test
{
protected:
void SetUp()
{
rclcpp::init(0, nullptr);
rclcpp::NodeOptions options = rclcpp::NodeOptions();
options.enable_logger_service(true);
node_ = std::make_shared<rclcpp::Node>("test_logger_service", "/test", options);
}
void TearDown()
{
rclcpp::shutdown();
}
rclcpp::Node::SharedPtr node_;
std::thread thread_;
};
TEST_F(TestLoggerService, check_connect_get_logger_service) {
auto client = node_->create_client<rcl_interfaces::srv::GetLoggerLevels>(
"/test/test_logger_service/get_logger_levels");
ASSERT_TRUE(client->wait_for_service(2s));
}
TEST_F(TestLoggerService, check_connect_set_logger_service) {
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
"/test/test_logger_service/set_logger_levels");
ASSERT_TRUE(client->wait_for_service(2s));
}
TEST_F(TestLoggerService, test_set_and_get_one_logging_level) {
std::string test_logger_name = "rcl";
uint8_t test_logger_level = 20;
{
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
"/test/test_logger_service/set_logger_levels");
ASSERT_TRUE(client->wait_for_service(1s));
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
auto level = rcl_interfaces::msg::LoggerLevel();
level.name = test_logger_name;
level.level = test_logger_level;
request->levels.push_back(level);
auto result = client->async_send_request(request);
ASSERT_EQ(
rclcpp::spin_until_future_complete(node_, result),
rclcpp::FutureReturnCode::SUCCESS);
auto result_get = result.get();
ASSERT_EQ(result_get->results.size(), 1u);
ASSERT_TRUE(result_get->results[0].successful);
ASSERT_STREQ(result_get->results[0].reason.c_str(), "");
}
{
auto client = node_->create_client<rcl_interfaces::srv::GetLoggerLevels>(
"/test/test_logger_service/get_logger_levels");
ASSERT_TRUE(client->wait_for_service(1s));
auto request = std::make_shared<rcl_interfaces::srv::GetLoggerLevels::Request>();
request->names.emplace_back(test_logger_name);
auto result = client->async_send_request(request);
ASSERT_EQ(
rclcpp::spin_until_future_complete(node_, result),
rclcpp::FutureReturnCode::SUCCESS);
auto result_get = result.get();
ASSERT_EQ(result_get->levels.size(), 1u);
ASSERT_STREQ(result_get->levels[0].name.c_str(), test_logger_name.c_str());
ASSERT_EQ(result_get->levels[0].level, test_logger_level);
}
}
TEST_F(TestLoggerService, test_set_and_get_multi_logging_level) {
std::vector<std::pair<std::string, uint8_t>> test_data {
{"rcl", 30},
{"rclcpp", 40},
{"/test/test_logger_service", 50}
};
// Set multi log levels
{
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
"/test/test_logger_service/set_logger_levels");
ASSERT_TRUE(client->wait_for_service(1s));
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
for (auto & set_level : test_data) {
auto level = rcl_interfaces::msg::LoggerLevel();
level.name = std::get<0>(set_level);
level.level = std::get<1>(set_level);
request->levels.push_back(level);
}
auto result = client->async_send_request(request);
ASSERT_EQ(
rclcpp::spin_until_future_complete(node_, result),
rclcpp::FutureReturnCode::SUCCESS);
auto result_get = result.get();
ASSERT_EQ(result_get->results.size(), test_data.size());
for (uint32_t i = 0; i < test_data.size(); i++) {
ASSERT_TRUE(result_get->results[0].successful);
}
}
// Get multi log levels
{
auto client = node_->create_client<rcl_interfaces::srv::GetLoggerLevels>(
"/test/test_logger_service/get_logger_levels");
ASSERT_TRUE(client->wait_for_service(1s));
auto request = std::make_shared<rcl_interfaces::srv::GetLoggerLevels::Request>();
for (auto & set_level : test_data) {
request->names.emplace_back(std::get<0>(set_level));
}
auto result = client->async_send_request(request);
ASSERT_EQ(
rclcpp::spin_until_future_complete(node_, result),
rclcpp::FutureReturnCode::SUCCESS);
auto result_get = result.get();
ASSERT_EQ(result_get->levels.size(), test_data.size());
for (uint32_t i = 0; i < test_data.size(); i++) {
ASSERT_STREQ(result_get->levels[i].name.c_str(), std::get<0>(test_data[i]).c_str());
ASSERT_EQ(result_get->levels[i].level, std::get<1>(test_data[i]));
}
}
}
TEST_F(TestLoggerService, test_set_logging_level_with_invalid_param) {
std::vector<std::pair<std::string, uint8_t>> test_data {
{"rcl", 12},
{"/test/test_logger_service", 22}
};
// Set multi log levels
{
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
"/test/test_logger_service/set_logger_levels");
ASSERT_TRUE(client->wait_for_service(1s));
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
for (auto & set_level : test_data) {
auto level = rcl_interfaces::msg::LoggerLevel();
level.name = std::get<0>(set_level);
level.level = std::get<1>(set_level);
request->levels.push_back(level);
}
auto result = client->async_send_request(request);
ASSERT_EQ(
rclcpp::spin_until_future_complete(node_, result),
rclcpp::FutureReturnCode::SUCCESS);
auto result_get = result.get();
ASSERT_EQ(result_get->results.size(), test_data.size());
for (uint32_t i = 0; i < test_data.size(); i++) {
ASSERT_FALSE(result_get->results[i].successful);
// Check string starts with prefix
ASSERT_EQ(
result_get->results[i].reason.rfind("Unable to determine severity_string for severity", 0),
0);
}
}
}
TEST_F(TestLoggerService, test_set_logging_level_with_partial_invalid_param) {
std::vector<std::pair<std::string, uint8_t>> test_data {
{"rcl", 20},
{"rclcpp", 22},
{"/test/test_logger_service", 30}
};
// Set multi log levels
{
auto client = node_->create_client<rcl_interfaces::srv::SetLoggerLevels>(
"/test/test_logger_service/set_logger_levels");
ASSERT_TRUE(client->wait_for_service(1s));
auto request = std::make_shared<rcl_interfaces::srv::SetLoggerLevels::Request>();
for (auto & set_level : test_data) {
auto level = rcl_interfaces::msg::LoggerLevel();
level.name = std::get<0>(set_level);
level.level = std::get<1>(set_level);
request->levels.push_back(level);
}
auto result = client->async_send_request(request);
ASSERT_EQ(
rclcpp::spin_until_future_complete(node_, result),
rclcpp::FutureReturnCode::SUCCESS);
auto result_get = result.get();
ASSERT_EQ(result_get->results.size(), test_data.size());
ASSERT_TRUE(result_get->results[0].successful);
ASSERT_FALSE(result_get->results[1].successful);
ASSERT_TRUE(result_get->results[2].successful);
}
}

View File

@@ -78,6 +78,7 @@ TEST_F(TestNode, construction_and_destruction) {
EXPECT_NE(nullptr, node->get_node_options().get_rcl_node_options());
EXPECT_NE(nullptr, node->get_graph_event());
EXPECT_NE(nullptr, node->get_clock());
EXPECT_NE(nullptr, node->get_node_type_descriptions_interface());
}
{

View File

@@ -266,6 +266,11 @@ TEST(TestNodeOptions, bool_setters_and_getters) {
EXPECT_FALSE(options.automatically_declare_parameters_from_overrides());
options.automatically_declare_parameters_from_overrides(true);
EXPECT_TRUE(options.automatically_declare_parameters_from_overrides());
options.enable_logger_service(false);
EXPECT_FALSE(options.enable_logger_service());
options.enable_logger_service(true);
EXPECT_TRUE(options.enable_logger_service());
}
TEST(TestNodeOptions, parameter_event_qos) {

View File

@@ -59,6 +59,8 @@ protected:
node_with_option.reset();
}
// "start_type_description_service" and "use_sim_time"
const uint64_t builtin_param_count = 2;
rclcpp::Node::SharedPtr node;
rclcpp::Node::SharedPtr node_with_option;
};
@@ -925,6 +927,7 @@ TEST_F(TestParameterClient, sync_parameter_delete_parameters) {
Coverage for async load_parameters
*/
TEST_F(TestParameterClient, async_parameter_load_parameters) {
const uint64_t expected_param_count = 4 + builtin_param_count;
auto load_node = std::make_shared<rclcpp::Node>(
"load_node",
"namespace",
@@ -944,12 +947,13 @@ TEST_F(TestParameterClient, async_parameter_load_parameters) {
auto list_parameters = asynchronous_client->list_parameters({}, 3);
rclcpp::spin_until_future_complete(
load_node, list_parameters, std::chrono::milliseconds(100));
ASSERT_EQ(list_parameters.get().names.size(), static_cast<uint64_t>(5));
ASSERT_EQ(list_parameters.get().names.size(), expected_param_count);
}
/*
Coverage for sync load_parameters
*/
TEST_F(TestParameterClient, sync_parameter_load_parameters) {
const uint64_t expected_param_count = 4 + builtin_param_count;
auto load_node = std::make_shared<rclcpp::Node>(
"load_node",
"namespace",
@@ -964,13 +968,14 @@ TEST_F(TestParameterClient, sync_parameter_load_parameters) {
ASSERT_EQ(load_future[0].successful, true);
// list parameters
auto list_parameters = synchronous_client->list_parameters({}, 3);
ASSERT_EQ(list_parameters.names.size(), static_cast<uint64_t>(5));
ASSERT_EQ(list_parameters.names.size(), static_cast<uint64_t>(expected_param_count));
}
/*
Coverage for async load_parameters with complicated regex expression
*/
TEST_F(TestParameterClient, async_parameter_load_parameters_complicated_regex) {
const uint64_t expected_param_count = 5 + builtin_param_count;
auto load_node = std::make_shared<rclcpp::Node>(
"load_node",
"namespace",
@@ -990,7 +995,7 @@ TEST_F(TestParameterClient, async_parameter_load_parameters_complicated_regex) {
auto list_parameters = asynchronous_client->list_parameters({}, 3);
rclcpp::spin_until_future_complete(
load_node, list_parameters, std::chrono::milliseconds(100));
ASSERT_EQ(list_parameters.get().names.size(), static_cast<uint64_t>(6));
ASSERT_EQ(list_parameters.get().names.size(), expected_param_count);
// to check the parameter "a_value"
std::string param_name = "a_value";
auto param = load_node->get_parameter(param_name);
@@ -1020,6 +1025,7 @@ TEST_F(TestParameterClient, async_parameter_load_no_valid_parameter) {
Coverage for async load_parameters from maps with complicated regex expression
*/
TEST_F(TestParameterClient, async_parameter_load_parameters_from_map) {
const uint64_t expected_param_count = 5 + builtin_param_count;
auto load_node = std::make_shared<rclcpp::Node>(
"load_node",
"namespace",
@@ -1068,7 +1074,7 @@ TEST_F(TestParameterClient, async_parameter_load_parameters_from_map) {
auto list_parameters = asynchronous_client->list_parameters({}, 3);
rclcpp::spin_until_future_complete(
load_node, list_parameters, std::chrono::milliseconds(100));
ASSERT_EQ(list_parameters.get().names.size(), static_cast<uint64_t>(6));
ASSERT_EQ(list_parameters.get().names.size(), expected_param_count);
// to check the parameter "a_value"
std::string param_name = "a_value";
auto param = load_node->get_parameter(param_name);

View File

@@ -629,6 +629,41 @@ TEST_P(TestPublisherWaitForAllAcked, check_wait_for_all_acked_with_QosPolicy) {
EXPECT_TRUE(pub->wait_for_all_acked(std::chrono::milliseconds(6000)));
}
TEST_F(TestPublisher, lowest_available_ipm_capacity) {
constexpr auto history_depth = 10u;
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));
rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> options_ipm_disabled;
options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> options_ipm_enabled;
options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Strings>) {};
auto pub_ipm_disabled = node->create_publisher<test_msgs::msg::Strings>(
"topic", history_depth,
options_ipm_disabled);
auto pub_ipm_enabled = node->create_publisher<test_msgs::msg::Strings>(
"topic", history_depth,
options_ipm_enabled);
auto sub = node->create_subscription<test_msgs::msg::Strings>(
"topic",
history_depth,
do_nothing);
ASSERT_EQ(1, pub_ipm_enabled->get_intra_process_subscription_count());
ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity());
ASSERT_EQ(history_depth, pub_ipm_enabled->lowest_available_ipm_capacity());
auto msg = std::make_shared<test_msgs::msg::Strings>();
ASSERT_NO_THROW(pub_ipm_disabled->publish(*msg));
ASSERT_NO_THROW(pub_ipm_enabled->publish(*msg));
ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity());
ASSERT_EQ(history_depth - 1u, pub_ipm_enabled->lowest_available_ipm_capacity());
}
INSTANTIATE_TEST_SUITE_P(
TestWaitForAllAckedWithParm,
TestPublisherWaitForAllAcked,

View File

@@ -314,11 +314,6 @@ TEST_F(TestQosEvent, add_to_wait_set) {
TEST_F(TestQosEvent, test_on_new_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto offered_deadline = rclcpp::Duration(std::chrono::milliseconds(1));
auto requested_deadline = rclcpp::Duration(std::chrono::milliseconds(2));
@@ -360,11 +355,6 @@ TEST_F(TestQosEvent, test_on_new_event_callback)
TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
auto pub = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
auto dummy_cb = [](size_t count_events) {(void)count_events;};
@@ -439,11 +429,6 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
TEST_F(TestQosEvent, test_pub_matched_event_by_set_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
std::atomic_size_t matched_count = 0;
rclcpp::PublisherOptions pub_options;
@@ -451,8 +436,10 @@ TEST_F(TestQosEvent, test_pub_matched_event_by_set_event_callback)
auto pub = node->create_publisher<test_msgs::msg::Empty>(
topic_name, 10, pub_options);
auto matched_event_callback = [&matched_count](size_t count) {
std::promise<void> prom;
auto matched_event_callback = [&matched_count, &prom](size_t count) {
matched_count += count;
prom.set_value();
};
pub->set_on_new_qos_event_callback(matched_event_callback, RCL_PUBLISHER_MATCHED);
@@ -460,34 +447,32 @@ TEST_F(TestQosEvent, test_pub_matched_event_by_set_event_callback)
rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());
const auto timeout = std::chrono::milliseconds(200);
const auto timeout = std::chrono::seconds(10);
{
auto sub1 = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
EXPECT_EQ(matched_count, static_cast<size_t>(1));
{
auto sub2 = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback);
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
EXPECT_EQ(matched_count, static_cast<size_t>(2));
}
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
EXPECT_EQ(matched_count, static_cast<size_t>(3));
}
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(4));
}
TEST_F(TestQosEvent, test_sub_matched_event_by_set_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}
std::atomic_size_t matched_count = 0;
rclcpp::SubscriptionOptions sub_options;
@@ -495,8 +480,10 @@ TEST_F(TestQosEvent, test_sub_matched_event_by_set_event_callback)
auto sub = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, sub_options);
auto matched_event_callback = [&matched_count](size_t count) {
std::promise<void> prom;
auto matched_event_callback = [&matched_count, &prom](size_t count) {
matched_count += count;
prom.set_value();
};
sub->set_on_new_qos_event_callback(matched_event_callback, RCL_SUBSCRIPTION_MATCHED);
@@ -504,39 +491,44 @@ TEST_F(TestQosEvent, test_sub_matched_event_by_set_event_callback)
rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());
const auto timeout = std::chrono::milliseconds(200);
const auto timeout = std::chrono::seconds(10000);
{
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
EXPECT_EQ(matched_count, static_cast<size_t>(1));
{
auto pub2 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
EXPECT_EQ(matched_count, static_cast<size_t>(2));
}
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
EXPECT_EQ(matched_count, static_cast<size_t>(3));
}
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(4));
}
TEST_F(TestQosEvent, test_pub_matched_event_by_option_event_callback)
{
rmw_matched_status_t matched_expected_result;
std::promise<void> prom;
rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[&matched_expected_result](rmw_matched_status_t & s) {
[&matched_expected_result, &prom](rmw_matched_status_t & s) {
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
prom.set_value();
};
auto pub = node->create_publisher<test_msgs::msg::Empty>(
@@ -551,11 +543,12 @@ TEST_F(TestQosEvent, test_pub_matched_event_by_option_event_callback)
matched_expected_result.current_count = 1;
matched_expected_result.current_count_change = 1;
const auto timeout = std::chrono::milliseconds(200);
const auto timeout = std::chrono::seconds(10);
{
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
// destroy a connected subscription
matched_expected_result.total_count = 1;
@@ -563,20 +556,22 @@ TEST_F(TestQosEvent, test_pub_matched_event_by_option_event_callback)
matched_expected_result.current_count = 0;
matched_expected_result.current_count_change = -1;
}
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
}
TEST_F(TestQosEvent, test_sub_matched_event_by_option_event_callback)
{
rmw_matched_status_t matched_expected_result;
std::promise<void> prom;
rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.matched_callback =
[&matched_expected_result](rmw_matched_status_t & s) {
[&matched_expected_result, &prom](rmw_matched_status_t & s) {
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
prom.set_value();
};
auto sub = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, sub_options);
@@ -590,10 +585,11 @@ TEST_F(TestQosEvent, test_sub_matched_event_by_option_event_callback)
matched_expected_result.current_count = 1;
matched_expected_result.current_count_change = 1;
const auto timeout = std::chrono::milliseconds(200);
const auto timeout = std::chrono::seconds(10);
{
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
prom = {};
// destroy a connected publisher
matched_expected_result.total_count = 1;
@@ -601,5 +597,5 @@ TEST_F(TestQosEvent, test_sub_matched_event_by_option_event_callback)
matched_expected_result.current_count = 0;
matched_expected_result.current_count_change = -1;
}
ex.spin_some(timeout);
ex.spin_until_future_complete(prom.get_future(), timeout);
}

View File

@@ -70,7 +70,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
// before calling get_child of Logger
{
RCLCPP_INFO(
rclcpp::get_logger(logger_name), this->rosout_msg_data.c_str());
rclcpp::get_logger(logger_name), "%s", this->rosout_msg_data.c_str());
auto future = received_msg_promise.get_future();
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
@@ -83,7 +83,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
// after calling get_child of Logger
// 1. use child_logger directly
{
RCLCPP_INFO(child_logger, this->rosout_msg_data.c_str());
RCLCPP_INFO(child_logger, "%s", this->rosout_msg_data.c_str());
auto future = received_msg_promise.get_future();
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
@@ -93,7 +93,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
// 2. use rclcpp::get_logger
{
RCLCPP_INFO(rclcpp::get_logger(logger_name), this->rosout_msg_data.c_str());
RCLCPP_INFO(rclcpp::get_logger(logger_name), "%s", this->rosout_msg_data.c_str());
auto future = received_msg_promise.get_future();
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
@@ -104,7 +104,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild) {
// `child_logger` is end of life, there is no sublogger
{
RCLCPP_INFO(rclcpp::get_logger(logger_name), this->rosout_msg_data.c_str());
RCLCPP_INFO(rclcpp::get_logger(logger_name), "%s", this->rosout_msg_data.c_str());
auto future = received_msg_promise.get_future();
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
@@ -119,7 +119,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_parent_log) {
rclcpp::Logger logger = this->node->get_logger();
ASSERT_EQ(logger.get_name(), logger_name);
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
auto future = received_msg_promise.get_future();
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
@@ -133,14 +133,14 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_child_log) {
this->rosout_msg_name = logger_name;
rclcpp::Logger logger = this->node->get_logger();
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
auto future = received_msg_promise.get_future();
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
received_msg_promise = {};
logger = this->node->get_logger().get_child("child1");
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
future = received_msg_promise.get_future();
return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
@@ -148,14 +148,14 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_child_log) {
received_msg_promise = {};
logger = this->node->get_logger().get_child("child2");
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
future = received_msg_promise.get_future();
return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::TIMEOUT, return_code);
received_msg_promise = {};
this->rosout_msg_name = "ns.test_rosout_subscription.child2";
RCLCPP_INFO(logger, this->rosout_msg_data.c_str());
RCLCPP_INFO(logger, "%s", this->rosout_msg_data.c_str());
future = received_msg_promise.get_future();
return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
@@ -171,7 +171,7 @@ TEST_F(TestRosoutSubscription, test_rosoutsubscription_getchild_hierarchy) {
rclcpp::Logger grandchild_logger =
this->node->get_logger().get_child("child").get_child("grandchild");
ASSERT_EQ(grandchild_logger.get_name(), logger_name);
RCLCPP_INFO(grandchild_logger, this->rosout_msg_data.c_str());
RCLCPP_INFO(grandchild_logger, "%s", this->rosout_msg_data.c_str());
auto future = received_msg_promise.get_future();
auto return_code = rclcpp::spin_until_future_complete(this->node, future, 3s);
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);

View File

@@ -17,6 +17,8 @@
#include <chrono>
#include <functional>
#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
@@ -34,114 +36,28 @@ using namespace std::chrono_literals;
class TestSubscription : public ::testing::Test
{
public:
void OnMessage(test_msgs::msg::Empty::ConstSharedPtr msg)
void on_message(test_msgs::msg::Empty::ConstSharedPtr msg)
{
(void)msg;
}
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
protected:
static void TearDownTestCase()
{
rclcpp::shutdown();
}
void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
{
node = std::make_shared<rclcpp::Node>("test_subscription", "/ns", node_options);
node_ = std::make_shared<rclcpp::Node>("test_subscription", "/ns", node_options);
}
void TearDown()
{
node.reset();
}
rclcpp::Node::SharedPtr node;
};
struct TestParameters
{
TestParameters(rclcpp::QoS qos, std::string description)
: qos(qos), description(description) {}
rclcpp::QoS qos;
std::string description;
};
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
{
out << params.description;
return out;
}
class TestSubscriptionInvalidIntraprocessQos
: public TestSubscription,
public ::testing::WithParamInterface<TestParameters>
{};
class TestSubscriptionSub : public ::testing::Test
{
public:
void OnMessage(test_msgs::msg::Empty::ConstSharedPtr msg)
{
(void)msg;
}
protected:
static void SetUpTestCase()
{
}
void SetUp()
{
node = std::make_shared<rclcpp::Node>("test_subscription", "/ns");
subnode = node->create_sub_node("sub_ns");
}
void TearDown()
{
node.reset();
}
rclcpp::Node::SharedPtr node;
rclcpp::Node::SharedPtr subnode;
};
class SubscriptionClassNodeInheritance : public rclcpp::Node
{
public:
SubscriptionClassNodeInheritance()
: Node("subscription_class_node_inheritance")
{
}
void OnMessage(test_msgs::msg::Empty::ConstSharedPtr msg)
{
(void)msg;
}
void CreateSubscription()
{
auto callback = std::bind(
&SubscriptionClassNodeInheritance::OnMessage, this, std::placeholders::_1);
using test_msgs::msg::Empty;
auto sub = this->create_subscription<Empty>("topic", 10, callback);
}
};
class SubscriptionClass
{
public:
void OnMessage(test_msgs::msg::Empty::ConstSharedPtr msg)
{
(void)msg;
}
void CreateSubscription()
{
auto node = std::make_shared<rclcpp::Node>("test_subscription_member_callback", "/ns");
auto callback = std::bind(&SubscriptionClass::OnMessage, this, std::placeholders::_1);
using test_msgs::msg::Empty;
auto sub = node->create_subscription<Empty>("topic", 10, callback);
}
rclcpp::Node::SharedPtr node_;
};
/*
@@ -155,7 +71,7 @@ TEST_F(TestSubscription, construction_and_destruction) {
};
{
constexpr size_t depth = 10u;
auto sub = node->create_subscription<Empty>("topic", depth, callback);
auto sub = node_->create_subscription<Empty>("topic", depth, callback);
EXPECT_NE(nullptr, sub->get_subscription_handle());
// Converting to base class was necessary for the compiler to choose the const version of
@@ -172,40 +88,7 @@ TEST_F(TestSubscription, construction_and_destruction) {
{
ASSERT_THROW(
{
auto sub = node->create_subscription<Empty>("invalid_topic?", 10, callback);
}, rclcpp::exceptions::InvalidTopicNameError);
}
}
/*
Testing subscription construction and destruction for subnodes.
*/
TEST_F(TestSubscriptionSub, construction_and_destruction) {
using test_msgs::msg::Empty;
auto callback = [](Empty::ConstSharedPtr msg) {
(void)msg;
};
{
auto sub = subnode->create_subscription<Empty>("topic", 1, callback);
EXPECT_STREQ(sub->get_topic_name(), "/ns/sub_ns/topic");
}
{
auto sub = subnode->create_subscription<Empty>("/topic", 1, callback);
EXPECT_STREQ(sub->get_topic_name(), "/topic");
}
{
auto sub = subnode->create_subscription<Empty>("~/topic", 1, callback);
std::string expected_topic_name =
std::string(node->get_namespace()) + "/" + node->get_name() + "/topic";
EXPECT_STREQ(sub->get_topic_name(), expected_topic_name.c_str());
}
{
ASSERT_THROW(
{
auto sub = node->create_subscription<Empty>("invalid_topic?", 1, callback);
auto sub = node_->create_subscription<Empty>("invalid_topic?", 10, callback);
}, rclcpp::exceptions::InvalidTopicNameError);
}
}
@@ -218,31 +101,31 @@ TEST_F(TestSubscription, various_creation_signatures) {
using test_msgs::msg::Empty;
auto cb = [](test_msgs::msg::Empty::ConstSharedPtr) {};
{
auto sub = node->create_subscription<Empty>("topic", 1, cb);
auto sub = node_->create_subscription<Empty>("topic", 1, cb);
(void)sub;
}
{
auto sub = node->create_subscription<Empty>("topic", rclcpp::QoS(1), cb);
auto sub = node_->create_subscription<Empty>("topic", rclcpp::QoS(1), cb);
(void)sub;
}
{
auto sub =
node->create_subscription<Empty>("topic", rclcpp::QoS(rclcpp::KeepLast(1)), cb);
node_->create_subscription<Empty>("topic", rclcpp::QoS(rclcpp::KeepLast(1)), cb);
(void)sub;
}
{
auto sub =
node->create_subscription<Empty>("topic", rclcpp::QoS(rclcpp::KeepAll()), cb);
node_->create_subscription<Empty>("topic", rclcpp::QoS(rclcpp::KeepAll()), cb);
(void)sub;
}
{
auto sub = node->create_subscription<Empty>(
auto sub = node_->create_subscription<Empty>(
"topic", 42, cb, rclcpp::SubscriptionOptions());
(void)sub;
}
{
auto sub = rclcpp::create_subscription<Empty>(
node, "topic", 42, cb, rclcpp::SubscriptionOptions());
node_, "topic", 42, cb, rclcpp::SubscriptionOptions());
(void)sub;
}
{
@@ -250,40 +133,78 @@ TEST_F(TestSubscription, various_creation_signatures) {
options.allocator = std::make_shared<std::allocator<void>>();
EXPECT_NE(nullptr, options.get_allocator());
auto sub = rclcpp::create_subscription<Empty>(
node, "topic", 42, cb, options);
node_, "topic", 42, cb, options);
(void)sub;
}
{
rclcpp::SubscriptionOptionsBase options_base;
rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void>> options(options_base);
auto sub = rclcpp::create_subscription<Empty>(
node, "topic", 42, cb, options);
node_, "topic", 42, cb, options);
(void)sub;
}
}
class SubscriptionClass final
{
public:
void custom_create_subscription()
{
auto node = std::make_shared<rclcpp::Node>("test_subscription_member_callback", "/ns");
auto callback = std::bind(&SubscriptionClass::on_message, this, std::placeholders::_1);
auto sub = node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
}
private:
void on_message(test_msgs::msg::Empty::ConstSharedPtr msg)
{
(void)msg;
}
};
class SubscriptionClassNodeInheritance final : public rclcpp::Node
{
public:
SubscriptionClassNodeInheritance()
: Node("subscription_class_node_inheritance")
{
}
void custom_create_subscription()
{
auto callback = std::bind(
&SubscriptionClassNodeInheritance::on_message, this, std::placeholders::_1);
auto sub = this->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
}
private:
void on_message(test_msgs::msg::Empty::ConstSharedPtr msg)
{
(void)msg;
}
};
/*
Testing subscriptions using std::bind.
*/
TEST_F(TestSubscription, callback_bind) {
initialize();
using test_msgs::msg::Empty;
{
// Member callback for plain class
SubscriptionClass subscription_object;
subscription_object.CreateSubscription();
subscription_object.custom_create_subscription();
}
{
// Member callback for class inheriting from rclcpp::Node
SubscriptionClassNodeInheritance subscription_object;
subscription_object.CreateSubscription();
subscription_object.custom_create_subscription();
}
{
// Member callback for class inheriting from testing::Test
// Regression test for https://github.com/ros2/rclcpp/issues/479 where the TEST_F GTest macro
// was interfering with rclcpp's `function_traits`.
auto callback = std::bind(&TestSubscription::OnMessage, this, std::placeholders::_1);
auto sub = node->create_subscription<Empty>("topic", 1, callback);
auto callback = std::bind(&TestSubscription::on_message, this, std::placeholders::_1);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("topic", 1, callback);
}
}
@@ -292,10 +213,9 @@ TEST_F(TestSubscription, callback_bind) {
*/
TEST_F(TestSubscription, take) {
initialize();
using test_msgs::msg::Empty;
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {FAIL();};
{
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
test_msgs::msg::Empty msg;
rclcpp::MessageInfo msg_info;
EXPECT_FALSE(sub->take(msg, msg_info));
@@ -303,23 +223,23 @@ TEST_F(TestSubscription, take) {
{
rclcpp::SubscriptionOptions so;
so.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing, so);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing, so);
rclcpp::PublisherOptions po;
po.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
auto pub = node->create_publisher<test_msgs::msg::Empty>("~/test_take", 1, po);
auto pub = node_->create_publisher<test_msgs::msg::Empty>("~/test_take", 1, po);
{
test_msgs::msg::Empty msg;
pub->publish(msg);
}
test_msgs::msg::Empty msg;
rclcpp::MessageInfo msg_info;
bool message_recieved = false;
bool message_received = false;
auto start = std::chrono::steady_clock::now();
do {
message_recieved = sub->take(msg, msg_info);
message_received = sub->take(msg, msg_info);
std::this_thread::sleep_for(100ms);
} while (!message_recieved && std::chrono::steady_clock::now() - start < 10s);
EXPECT_TRUE(message_recieved);
} while (!message_received && std::chrono::steady_clock::now() - start < 10s);
EXPECT_TRUE(message_received);
}
// TODO(wjwwood): figure out a good way to test the intra-process exclusion behavior.
}
@@ -329,10 +249,9 @@ TEST_F(TestSubscription, take) {
*/
TEST_F(TestSubscription, take_serialized) {
initialize();
using test_msgs::msg::Empty;
auto do_nothing = [](std::shared_ptr<const rclcpp::SerializedMessage>) {FAIL();};
{
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
EXPECT_FALSE(sub->take_serialized(*msg, msg_info));
@@ -340,23 +259,23 @@ TEST_F(TestSubscription, take_serialized) {
{
rclcpp::SubscriptionOptions so;
so.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing, so);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing, so);
rclcpp::PublisherOptions po;
po.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
auto pub = node->create_publisher<test_msgs::msg::Empty>("~/test_take", 1, po);
auto pub = node_->create_publisher<test_msgs::msg::Empty>("~/test_take", 1, po);
{
test_msgs::msg::Empty msg;
pub->publish(msg);
}
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
bool message_recieved = false;
bool message_received = false;
auto start = std::chrono::steady_clock::now();
do {
message_recieved = sub->take_serialized(*msg, msg_info);
message_received = sub->take_serialized(*msg, msg_info);
std::this_thread::sleep_for(100ms);
} while (!message_recieved && std::chrono::steady_clock::now() - start < 10s);
EXPECT_TRUE(message_recieved);
} while (!message_received && std::chrono::steady_clock::now() - start < 10s);
EXPECT_TRUE(message_received);
}
}
@@ -368,7 +287,7 @@ TEST_F(TestSubscription, rcl_subscription_init_error) {
// reset() is not needed for triggering exception, just to avoid an unused return value warning
EXPECT_THROW(
node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback).reset(),
node_->create_subscription<test_msgs::msg::Empty>("topic", 10, callback).reset(),
rclcpp::exceptions::RCLError);
}
@@ -380,7 +299,7 @@ TEST_F(TestSubscription, rcl_subscription_fini_error) {
// Cleanup just fails, no exception expected
EXPECT_NO_THROW(
node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback).reset());
node_->create_subscription<test_msgs::msg::Empty>("topic", 10, callback).reset());
}
TEST_F(TestSubscription, rcl_subscription_get_actual_qos_error) {
@@ -389,7 +308,7 @@ TEST_F(TestSubscription, rcl_subscription_get_actual_qos_error) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_get_actual_qos, nullptr);
auto sub = node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
RCLCPP_EXPECT_THROW_EQ(
sub->get_actual_qos(), std::runtime_error("failed to get qos settings: error not set"));
}
@@ -400,7 +319,7 @@ TEST_F(TestSubscription, rcl_take_type_erased_error) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_take, RCL_RET_ERROR);
auto sub = node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
test_msgs::msg::Empty msg;
rclcpp::MessageInfo message_info;
@@ -413,7 +332,7 @@ TEST_F(TestSubscription, rcl_take_serialized_message_error) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_take_serialized_message, RCL_RET_ERROR);
auto sub = node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
rclcpp::SerializedMessage msg;
rclcpp::MessageInfo message_info;
@@ -426,14 +345,14 @@ TEST_F(TestSubscription, rcl_subscription_get_publisher_count_error) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_get_publisher_count, RCL_RET_ERROR);
auto sub = node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
EXPECT_THROW(sub->get_publisher_count(), rclcpp::exceptions::RCLError);
}
TEST_F(TestSubscription, handle_loaned_message) {
initialize();
auto callback = [](std::shared_ptr<const test_msgs::msg::Empty>) {};
auto sub = node->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("topic", 10, callback);
test_msgs::msg::Empty msg;
rclcpp::MessageInfo message_info;
@@ -448,13 +367,13 @@ TEST_F(TestSubscription, on_new_message_callback) {
using test_msgs::msg::Empty;
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {FAIL();};
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 10, do_nothing);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("~/test_take", 10, do_nothing);
std::atomic<size_t> c1 {0};
auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;};
sub->set_on_new_message_callback(increase_c1_cb);
auto pub = node->create_publisher<test_msgs::msg::Empty>("~/test_take", 3);
auto pub = node_->create_publisher<test_msgs::msg::Empty>("~/test_take", 3);
{
test_msgs::msg::Empty msg;
pub->publish(msg);
@@ -518,13 +437,13 @@ TEST_F(TestSubscription, on_new_intra_process_message_callback) {
using test_msgs::msg::Empty;
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {FAIL();};
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 10, do_nothing);
auto sub = node_->create_subscription<test_msgs::msg::Empty>("~/test_take", 10, do_nothing);
std::atomic<size_t> c1 {0};
auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;};
sub->set_on_new_intra_process_message_callback(increase_c1_cb);
auto pub = node->create_publisher<test_msgs::msg::Empty>("~/test_take", 1);
auto pub = node_->create_publisher<test_msgs::msg::Empty>("~/test_take", 1);
{
test_msgs::msg::Empty msg;
pub->publish(msg);
@@ -580,80 +499,13 @@ TEST_F(TestSubscription, on_new_intra_process_message_callback) {
EXPECT_THROW(sub->set_on_new_intra_process_message_callback(invalid_cb), std::invalid_argument);
}
/*
Testing subscription with intraprocess enabled and invalid QoS
*/
TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws) {
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));
rclcpp::QoS qos = GetParam().qos;
using test_msgs::msg::Empty;
{
auto callback = std::bind(
&TestSubscriptionInvalidIntraprocessQos::OnMessage,
this,
std::placeholders::_1);
ASSERT_THROW(
{auto subscription = node->create_subscription<Empty>(
"topic",
qos,
callback);},
std::invalid_argument);
}
}
/*
Testing subscription with invalid use_intra_process_comm
*/
TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws_intraprocess) {
rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void>> options;
options.use_intra_process_comm = static_cast<rclcpp::IntraProcessSetting>(5);
initialize();
rclcpp::QoS qos = GetParam().qos;
auto callback = std::bind(
&TestSubscriptionInvalidIntraprocessQos::OnMessage,
this,
std::placeholders::_1);
RCLCPP_EXPECT_THROW_EQ(
{auto subscription = node->create_subscription<test_msgs::msg::Empty>(
"topic",
qos,
callback,
options);},
std::runtime_error("Unrecognized IntraProcessSetting value"));
}
static std::vector<TestParameters> invalid_qos_profiles()
{
std::vector<TestParameters> parameters;
parameters.reserve(3);
parameters.push_back(
TestParameters(
rclcpp::QoS(rclcpp::KeepLast(10)).transient_local(),
"transient_local_qos"));
parameters.push_back(
TestParameters(
rclcpp::QoS(rclcpp::KeepAll()),
"keep_all_qos"));
return parameters;
}
INSTANTIATE_TEST_SUITE_P(
TestSubscriptionThrows, TestSubscriptionInvalidIntraprocessQos,
::testing::ValuesIn(invalid_qos_profiles()),
::testing::PrintToStringParamName());
TEST_F(TestSubscription, get_network_flow_endpoints_errors) {
initialize();
const rclcpp::QoS subscription_qos(1);
auto subscription_callback = [](test_msgs::msg::Empty::ConstSharedPtr msg) {
(void)msg;
};
auto subscription = node->create_subscription<test_msgs::msg::Empty>(
auto subscription = node_->create_subscription<test_msgs::msg::Empty>(
"topic", subscription_qos, subscription_callback);
{
@@ -680,3 +532,143 @@ TEST_F(TestSubscription, get_network_flow_endpoints_errors) {
EXPECT_NO_THROW(subscription->get_network_flow_endpoints());
}
}
class TestSubscriptionSub : public ::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
void SetUp()
{
node_ = std::make_shared<rclcpp::Node>("test_subscription", "/ns");
subnode_ = node_->create_sub_node("sub_ns");
}
rclcpp::Node::SharedPtr node_;
rclcpp::Node::SharedPtr subnode_;
};
/*
Testing subscription construction and destruction for subnodes.
*/
TEST_F(TestSubscriptionSub, construction_and_destruction) {
auto callback = [](test_msgs::msg::Empty::ConstSharedPtr msg) {
(void)msg;
};
{
auto sub = subnode_->create_subscription<test_msgs::msg::Empty>("topic", 1, callback);
EXPECT_STREQ(sub->get_topic_name(), "/ns/sub_ns/topic");
}
{
auto sub = subnode_->create_subscription<test_msgs::msg::Empty>("/topic", 1, callback);
EXPECT_STREQ(sub->get_topic_name(), "/topic");
}
{
auto sub = subnode_->create_subscription<test_msgs::msg::Empty>("~/topic", 1, callback);
std::string expected_topic_name =
std::string(node_->get_namespace()) + "/" + node_->get_name() + "/topic";
EXPECT_STREQ(sub->get_topic_name(), expected_topic_name.c_str());
}
{
ASSERT_THROW(
{
auto sub = node_->create_subscription<test_msgs::msg::Empty>("invalid_topic?", 1, callback);
}, rclcpp::exceptions::InvalidTopicNameError);
}
}
struct TestParameters final
{
TestParameters(rclcpp::QoS qos, std::string description)
: qos(qos), description(description) {}
rclcpp::QoS qos;
std::string description;
};
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
{
out << params.description;
return out;
}
class TestSubscriptionInvalidIntraprocessQos
: public TestSubscription,
public ::testing::WithParamInterface<TestParameters>
{};
static std::vector<TestParameters> invalid_qos_profiles()
{
std::vector<TestParameters> parameters;
parameters.reserve(3);
parameters.push_back(
TestParameters(
rclcpp::QoS(rclcpp::KeepLast(10)).transient_local(),
"transient_local_qos"));
parameters.push_back(
TestParameters(
rclcpp::QoS(rclcpp::KeepAll()),
"keep_all_qos"));
return parameters;
}
INSTANTIATE_TEST_SUITE_P(
TestSubscriptionThrows, TestSubscriptionInvalidIntraprocessQos,
::testing::ValuesIn(invalid_qos_profiles()),
::testing::PrintToStringParamName());
/*
Testing subscription with intraprocess enabled and invalid QoS
*/
TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws) {
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));
rclcpp::QoS qos = GetParam().qos;
{
auto callback = std::bind(
&TestSubscriptionInvalidIntraprocessQos::on_message,
this,
std::placeholders::_1);
ASSERT_THROW(
{auto subscription = node_->create_subscription<test_msgs::msg::Empty>(
"topic",
qos,
callback);},
std::invalid_argument);
}
}
/*
Testing subscription with invalid use_intra_process_comm
*/
TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws_intraprocess) {
rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void>> options;
options.use_intra_process_comm = static_cast<rclcpp::IntraProcessSetting>(5);
initialize();
rclcpp::QoS qos = GetParam().qos;
auto callback = std::bind(
&TestSubscriptionInvalidIntraprocessQos::on_message,
this,
std::placeholders::_1);
RCLCPP_EXPECT_THROW_EQ(
{auto subscription = node_->create_subscription<test_msgs::msg::Empty>(
"topic",
qos,
callback,
options);},
std::runtime_error("Unrecognized IntraProcessSetting value"));
}

View File

@@ -305,7 +305,7 @@ TEST_F(TestTimeSource, clock) {
trigger_clock_changes(node, ros_clock, false);
// Even now that we've recieved a message, ROS time should still not be active since the
// Even now that we've received a message, ROS time should still not be active since the
// parameter has not been explicitly set.
EXPECT_FALSE(ros_clock->ros_time_is_active());

View File

@@ -73,6 +73,20 @@ protected:
EXPECT_FALSE(timer->is_steady());
break;
}
timer_without_autostart = test_node->create_wall_timer(
100ms,
[this]() -> void
{
this->has_timer_run.store(true);
if (this->cancel_timer.load()) {
this->timer->cancel();
}
// prevent any tests running timer from blocking
this->executor->cancel();
}, nullptr, false);
EXPECT_TRUE(timer_without_autostart->is_steady());
executor->add_node(test_node);
// don't start spinning, let the test dictate when
}
@@ -93,6 +107,7 @@ protected:
std::atomic<bool> cancel_timer;
rclcpp::Node::SharedPtr test_node;
std::shared_ptr<rclcpp::TimerBase> timer;
std::shared_ptr<rclcpp::TimerBase> timer_without_autostart;
std::shared_ptr<rclcpp::executors::SingleThreadedExecutor> executor;
};
@@ -334,3 +349,18 @@ INSTANTIATE_TEST_SUITE_P(
return std::string("unknown");
}
);
/// Simple test of a timer without autostart
TEST_P(TestTimer, test_timer_without_autostart)
{
EXPECT_TRUE(timer_without_autostart->is_canceled());
EXPECT_EQ(
timer_without_autostart->time_until_trigger().count(),
std::chrono::nanoseconds::max().count());
// Reset to change start timer
timer_without_autostart->reset();
EXPECT_LE(
timer_without_autostart->time_until_trigger().count(),
std::chrono::nanoseconds::max().count());
EXPECT_FALSE(timer_without_autostart->is_canceled());
}

View File

@@ -3,6 +3,24 @@ Changelog for package rclcpp_action
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
22.0.0 (2023-07-11)
-------------------
21.3.0 (2023-06-12)
-------------------
21.2.0 (2023-06-07)
-------------------
21.1.1 (2023-05-11)
-------------------
21.1.0 (2023-04-27)
-------------------
21.0.0 (2023-04-18)
-------------------
20.0.0 (2023-04-13)
-------------------
* extract the result response before the callback is issued. (`#2132 <https://github.com/ros2/rclcpp/issues/2132>`_)

View File

@@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>rclcpp_action</name>
<version>20.0.0</version>
<version>22.0.0</version>
<description>Adds action APIs for C++.</description>
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>

View File

@@ -2,6 +2,24 @@
Changelog for package rclcpp_components
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
22.0.0 (2023-07-11)
-------------------
21.3.0 (2023-06-12)
-------------------
21.2.0 (2023-06-07)
-------------------
21.1.1 (2023-05-11)
-------------------
21.1.0 (2023-04-27)
-------------------
21.0.0 (2023-04-18)
-------------------
20.0.0 (2023-04-13)
-------------------
* Update all rclcpp packages to C++17. (`#2121 <https://github.com/ros2/rclcpp/issues/2121>`_)

View File

@@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>rclcpp_components</name>
<version>20.0.0</version>
<version>22.0.0</version>
<description>Package containing tools for dynamically loadable components</description>
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>

View File

@@ -3,6 +3,31 @@ Changelog for package rclcpp_lifecycle
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
22.0.0 (2023-07-11)
-------------------
* Implement get_node_type_descriptions_interface for lifecyclenode and add smoke test for it (`#2237 <https://github.com/ros2/rclcpp/issues/2237>`_)
* Switch lifecycle to use the RCLCPP macros. (`#2233 <https://github.com/ros2/rclcpp/issues/2233>`_)
* Add new node interface TypeDescriptionsInterface to provide GetTypeDescription service (`#2224 <https://github.com/ros2/rclcpp/issues/2224>`_)
* Contributors: Chris Lalancette, Emerson Knapp
21.3.0 (2023-06-12)
-------------------
21.2.0 (2023-06-07)
-------------------
21.1.1 (2023-05-11)
-------------------
21.1.0 (2023-04-27)
-------------------
21.0.0 (2023-04-18)
-------------------
* Add support for logging service. (`#2122 <https://github.com/ros2/rclcpp/issues/2122>`_)
* Support publishing loaned messages in LifecyclePublisher (`#2159 <https://github.com/ros2/rclcpp/issues/2159>`_)
* Contributors: Lei Liu, Michael Babenko
20.0.0 (2023-04-13)
-------------------
* Fixes to silence some clang warnings. (`#2127 <https://github.com/ros2/rclcpp/issues/2127>`_)

View File

@@ -72,6 +72,7 @@
#include "rclcpp/node_interfaces/node_time_source_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/node_interfaces/node_type_descriptions_interface.hpp"
#include "rclcpp/node_interfaces/node_waitables_interface.hpp"
#include "rclcpp/parameter.hpp"
#include "rclcpp/publisher.hpp"
@@ -823,6 +824,14 @@ public:
rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr
get_node_time_source_interface();
/// Return the Node's internal NodeTypeDescriptionsInterface implementation.
/**
* \sa rclcpp::Node::get_node_type_descriptions_interface
*/
RCLCPP_LIFECYCLE_PUBLIC
rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr
get_node_type_descriptions_interface();
/// Return the Node's internal NodeWaitablesInterface implementation.
/**
* \sa rclcpp::Node::get_node_waitables_interface
@@ -1085,6 +1094,7 @@ private:
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_parameters_;
rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr node_time_source_;
rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr node_type_descriptions_;
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_;
const rclcpp::NodeOptions node_options_;

View File

@@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>rclcpp_lifecycle</name>
<version>20.0.0</version>
<version>22.0.0</version>
<description>Package containing a prototype for lifecycle implementation</description>
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>

View File

@@ -43,6 +43,7 @@
#include "rclcpp/node_interfaces/node_time_source.hpp"
#include "rclcpp/node_interfaces/node_timers.hpp"
#include "rclcpp/node_interfaces/node_topics.hpp"
#include "rclcpp/node_interfaces/node_type_descriptions.hpp"
#include "rclcpp/node_interfaces/node_waitables.hpp"
#include "rclcpp/parameter_service.hpp"
#include "rclcpp/qos.hpp"
@@ -76,7 +77,7 @@ LifecycleNode::LifecycleNode(
options.use_intra_process_comms(),
options.enable_topic_statistics())),
node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())),
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())),
node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_)),
node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())),
node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())),
node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())),
@@ -113,9 +114,15 @@ LifecycleNode::LifecycleNode(
options.clock_qos(),
options.use_clock_thread()
)),
node_type_descriptions_(new rclcpp::node_interfaces::NodeTypeDescriptions(
node_base_,
node_logging_,
node_parameters_,
node_services_
)),
node_waitables_(new rclcpp::node_interfaces::NodeWaitables(node_base_.get())),
node_options_(options),
impl_(new LifecycleNodeInterfaceImpl(node_base_, node_services_))
impl_(new LifecycleNodeInterfaceImpl(node_base_, node_services_, node_logging_))
{
impl_->init(enable_communication_interface);
@@ -468,6 +475,12 @@ LifecycleNode::get_node_topics_interface()
return node_topics_;
}
rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr
LifecycleNode::get_node_type_descriptions_interface()
{
return node_type_descriptions_;
}
rclcpp::node_interfaces::NodeServicesInterface::SharedPtr
LifecycleNode::get_node_services_interface()
{

View File

@@ -29,6 +29,7 @@
#include "lifecycle_msgs/srv/get_available_transitions.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
#include "rclcpp/node_interfaces/node_services_interface.hpp"
#include "rclcpp_lifecycle/node_interfaces/lifecycle_node_interface.hpp"
@@ -50,9 +51,11 @@ namespace rclcpp_lifecycle
LifecycleNode::LifecycleNodeInterfaceImpl::LifecycleNodeInterfaceImpl(
std::shared_ptr<rclcpp::node_interfaces::NodeBaseInterface> node_base_interface,
std::shared_ptr<rclcpp::node_interfaces::NodeServicesInterface> node_services_interface)
std::shared_ptr<rclcpp::node_interfaces::NodeServicesInterface> node_services_interface,
std::shared_ptr<rclcpp::node_interfaces::NodeLoggingInterface> node_logging_interface)
: node_base_interface_(node_base_interface),
node_services_interface_(node_services_interface)
node_services_interface_(node_services_interface),
node_logging_interface_(node_logging_interface)
{
}
@@ -65,8 +68,8 @@ LifecycleNode::LifecycleNodeInterfaceImpl::~LifecycleNodeInterfaceImpl()
ret = rcl_lifecycle_state_machine_fini(&state_machine_, node_handle);
}
if (ret != RCL_RET_OK) {
RCUTILS_LOG_FATAL_NAMED(
"rclcpp_lifecycle",
RCLCPP_FATAL(
node_logging_interface_->get_logger(),
"failed to destroy rcl_state_machine");
}
}
@@ -398,7 +401,8 @@ LifecycleNode::LifecycleNodeInterfaceImpl::change_state(
{
std::lock_guard<std::recursive_mutex> lock(state_machine_mutex_);
if (rcl_lifecycle_state_machine_is_initialized(&state_machine_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR(
RCLCPP_ERROR(
node_logging_interface_->get_logger(),
"Unable to change state for state machine for %s: %s",
node_base_interface_->get_name(), rcl_get_error_string().str);
return RCL_RET_ERROR;
@@ -411,7 +415,8 @@ LifecycleNode::LifecycleNodeInterfaceImpl::change_state(
rcl_lifecycle_trigger_transition_by_id(
&state_machine_, transition_id, publish_update) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR(
RCLCPP_ERROR(
node_logging_interface_->get_logger(),
"Unable to start transition %u from current state %s: %s",
transition_id, state_machine_.current_state->label, rcl_get_error_string().str);
rcutils_reset_error();
@@ -443,7 +448,8 @@ LifecycleNode::LifecycleNodeInterfaceImpl::change_state(
rcl_lifecycle_trigger_transition_by_label(
&state_machine_, transition_label, publish_update) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR(
RCLCPP_ERROR(
node_logging_interface_->get_logger(),
"Failed to finish transition %u. Current state is now: %s (%s)",
transition_id, state_machine_.current_state->label, rcl_get_error_string().str);
rcutils_reset_error();
@@ -458,7 +464,9 @@ LifecycleNode::LifecycleNodeInterfaceImpl::change_state(
// error handling ?!
// TODO(karsten1987): iterate over possible ret value
if (cb_return_code == node_interfaces::LifecycleNodeInterface::CallbackReturn::ERROR) {
RCUTILS_LOG_WARN("Error occurred while doing error handling.");
RCLCPP_WARN(
node_logging_interface_->get_logger(),
"Error occurred while doing error handling.");
auto error_cb_code = execute_callback(current_state_id, initial_state);
auto error_cb_label = get_label_for_return_code(error_cb_code);
@@ -467,7 +475,9 @@ LifecycleNode::LifecycleNodeInterfaceImpl::change_state(
rcl_lifecycle_trigger_transition_by_label(
&state_machine_, error_cb_label, publish_update) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR("Failed to call cleanup on error state: %s", rcl_get_error_string().str);
RCLCPP_ERROR(
node_logging_interface_->get_logger(),
"Failed to call cleanup on error state: %s", rcl_get_error_string().str);
rcutils_reset_error();
return RCL_RET_ERROR;
}
@@ -495,8 +505,12 @@ LifecycleNode::LifecycleNodeInterfaceImpl::execute_callback(
try {
cb_success = callback(State(previous_state));
} catch (const std::exception & e) {
RCUTILS_LOG_ERROR("Caught exception in callback for transition %d", it->first);
RCUTILS_LOG_ERROR("Original error: %s", e.what());
RCLCPP_ERROR(
node_logging_interface_->get_logger(),
"Caught exception in callback for transition %d", it->first);
RCLCPP_ERROR(
node_logging_interface_->get_logger(),
"Original error: %s", e.what());
cb_success = node_interfaces::LifecycleNodeInterface::CallbackReturn::ERROR;
}
}

View File

@@ -32,6 +32,7 @@
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_logging_interface.hpp"
#include "rclcpp/node_interfaces/node_services_interface.hpp"
#include "rclcpp_lifecycle/node_interfaces/lifecycle_node_interface.hpp"
@@ -52,7 +53,8 @@ class LifecycleNode::LifecycleNodeInterfaceImpl final
public:
LifecycleNodeInterfaceImpl(
std::shared_ptr<rclcpp::node_interfaces::NodeBaseInterface> node_base_interface,
std::shared_ptr<rclcpp::node_interfaces::NodeServicesInterface> node_services_interface);
std::shared_ptr<rclcpp::node_interfaces::NodeServicesInterface> node_services_interface,
std::shared_ptr<rclcpp::node_interfaces::NodeLoggingInterface> node_logging_interface);
~LifecycleNodeInterfaceImpl();
@@ -152,6 +154,7 @@ private:
using NodeBasePtr = std::shared_ptr<rclcpp::node_interfaces::NodeBaseInterface>;
using NodeServicesPtr = std::shared_ptr<rclcpp::node_interfaces::NodeServicesInterface>;
using NodeLoggingPtr = std::shared_ptr<rclcpp::node_interfaces::NodeLoggingInterface>;
using ChangeStateSrvPtr = std::shared_ptr<rclcpp::Service<ChangeStateSrv>>;
using GetStateSrvPtr = std::shared_ptr<rclcpp::Service<GetStateSrv>>;
using GetAvailableStatesSrvPtr =
@@ -163,6 +166,7 @@ private:
NodeBasePtr node_base_interface_;
NodeServicesPtr node_services_interface_;
NodeLoggingPtr node_logging_interface_;
ChangeStateSrvPtr srv_change_state_;
GetStateSrvPtr srv_get_state_;
GetAvailableStatesSrvPtr srv_get_available_states_;

View File

@@ -427,11 +427,15 @@ TEST_F(TestDefaultStateMachine, lifecycle_subscriber) {
// Parameters are tested more thoroughly in rclcpp's test_node.cpp
// These are provided for coverage of lifecycle node's API
TEST_F(TestDefaultStateMachine, declare_parameters) {
// "start_type_description_service" and "use_sim_time"
const uint64_t builtin_param_count = 2;
const uint64_t expected_param_count = 6 + builtin_param_count;
auto test_node = std::make_shared<EmptyLifecycleNode>("testnode");
auto list_result = test_node->list_parameters({}, 0u);
EXPECT_EQ(list_result.names.size(), 1u);
EXPECT_STREQ(list_result.names[0].c_str(), "use_sim_time");
EXPECT_EQ(list_result.names.size(), builtin_param_count);
EXPECT_STREQ(list_result.names[0].c_str(), "start_type_description_service");
EXPECT_STREQ(list_result.names[1].c_str(), "use_sim_time");
const std::string bool_name = "test_boolean";
const std::string int_name = "test_int";
@@ -469,10 +473,11 @@ TEST_F(TestDefaultStateMachine, declare_parameters) {
test_node->declare_parameters("test_double", double_parameters);
list_result = test_node->list_parameters({}, 0u);
EXPECT_EQ(list_result.names.size(), 7u);
EXPECT_EQ(list_result.names.size(), expected_param_count);
// The order of these names is not controlled by lifecycle_node, doing set equality
std::set<std::string> expected_names = {
"start_type_description_service",
"test_boolean",
"test_double.double_one",
"test_double.double_two",
@@ -487,11 +492,13 @@ TEST_F(TestDefaultStateMachine, declare_parameters) {
}
TEST_F(TestDefaultStateMachine, check_parameters) {
const uint64_t builtin_param_count = 2;
auto test_node = std::make_shared<EmptyLifecycleNode>("testnode");
auto list_result = test_node->list_parameters({}, 0u);
EXPECT_EQ(list_result.names.size(), 1u);
EXPECT_STREQ(list_result.names[0].c_str(), "use_sim_time");
EXPECT_EQ(list_result.names.size(), builtin_param_count);
EXPECT_STREQ(list_result.names[0].c_str(), "start_type_description_service");
EXPECT_STREQ(list_result.names[1].c_str(), "use_sim_time");
const std::string bool_name = "test_boolean";
const std::string int_name = "test_int";
@@ -549,8 +556,7 @@ TEST_F(TestDefaultStateMachine, check_parameters) {
std::map<std::string, rclcpp::ParameterValue> parameter_map;
EXPECT_TRUE(test_node->get_parameters({}, parameter_map));
// int param, bool param, and use_sim_time
EXPECT_EQ(parameter_map.size(), 3u);
EXPECT_EQ(parameter_map.size(), parameter_names.size() + builtin_param_count);
// Check parameter types
auto parameter_types = test_node->get_parameter_types(parameter_names);
@@ -585,10 +591,12 @@ TEST_F(TestDefaultStateMachine, check_parameters) {
// List parameters
list_result = test_node->list_parameters({}, 0u);
EXPECT_EQ(list_result.names.size(), 3u);
EXPECT_STREQ(list_result.names[0].c_str(), parameter_names[0].c_str());
EXPECT_STREQ(list_result.names[1].c_str(), parameter_names[1].c_str());
EXPECT_STREQ(list_result.names[2].c_str(), "use_sim_time");
EXPECT_EQ(list_result.names.size(), parameter_names.size() + builtin_param_count);
size_t index = 0;
EXPECT_STREQ(list_result.names[index++].c_str(), "start_type_description_service");
EXPECT_STREQ(list_result.names[index++].c_str(), parameter_names[0].c_str());
EXPECT_STREQ(list_result.names[index++].c_str(), parameter_names[1].c_str());
EXPECT_STREQ(list_result.names[index++].c_str(), "use_sim_time");
// Undeclare parameter
test_node->undeclare_parameter(bool_name);
@@ -633,6 +641,7 @@ TEST_F(TestDefaultStateMachine, test_getters) {
EXPECT_LT(0u, test_node->now().nanoseconds());
EXPECT_STREQ("testnode", test_node->get_logger().get_name());
EXPECT_NE(nullptr, const_cast<const EmptyLifecycleNode *>(test_node.get())->get_clock());
EXPECT_NE(nullptr, test_node->get_node_type_descriptions_interface());
}
TEST_F(TestDefaultStateMachine, test_graph_topics) {