Compare commits
7 Commits
native_buf
...
21.0.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
052c075052 | ||
|
|
193c252036 | ||
|
|
efbb9b6c89 | ||
|
|
3506dd1227 | ||
|
|
7a837496bd | ||
|
|
cd784f6612 | ||
|
|
7aa390d5b1 |
@@ -2,6 +2,17 @@
|
||||
Changelog for package rclcpp
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
21.0.2 (2023-07-14)
|
||||
-------------------
|
||||
* Fix warnings related to comparison of integer expressions of different signedness (`#2222 <https://github.com/ros2/rclcpp/issues/2222>`_)
|
||||
* Fix race condition in events-executor (`#2191 <https://github.com/ros2/rclcpp/issues/2191>`_)
|
||||
* Contributors: Alberto Soragna, Tomoya Fujita
|
||||
|
||||
21.0.1 (2023-05-11)
|
||||
-------------------
|
||||
* Fix delivered message kind (`#2175 <https://github.com/ros2/rclcpp/issues/2175>`_) (`#2178 <https://github.com/ros2/rclcpp/issues/2178>`_)
|
||||
* Contributors: mergify[bot]
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
* Add support for logging service. (`#2122 <https://github.com/ros2/rclcpp/issues/2122>`_)
|
||||
|
||||
@@ -243,6 +243,11 @@ private:
|
||||
std::function<void(size_t, int)>
|
||||
create_waitable_callback(const rclcpp::Waitable * waitable_id);
|
||||
|
||||
/// Utility to add the notify waitable to an entities collection
|
||||
void
|
||||
add_notify_waitable_to_collection(
|
||||
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection);
|
||||
|
||||
/// Searches for the provided entity_id in the collection and returns the entity if valid
|
||||
template<typename CollectionType>
|
||||
typename CollectionType::EntitySharedPtr
|
||||
|
||||
@@ -260,13 +260,13 @@ public:
|
||||
bool
|
||||
is_serialized() const;
|
||||
|
||||
/// Return the type of the subscription.
|
||||
/// Return the delivered message kind.
|
||||
/**
|
||||
* \return `DeliveredMessageKind`, which adjusts how messages are received and delivered.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
DeliveredMessageKind
|
||||
get_subscription_type() const;
|
||||
get_delivered_message_kind() const;
|
||||
|
||||
/// Get matching publisher count.
|
||||
/** \return The number of publishers on this topic. */
|
||||
@@ -663,7 +663,7 @@ private:
|
||||
RCLCPP_DISABLE_COPY(SubscriptionBase)
|
||||
|
||||
rosidl_message_type_support_t type_support_;
|
||||
DeliveredMessageKind delivered_message_type_;
|
||||
DeliveredMessageKind delivered_message_kind_;
|
||||
|
||||
std::atomic<bool> subscription_in_use_by_wait_set_{false};
|
||||
std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp</name>
|
||||
<version>21.0.0</version>
|
||||
<version>21.0.2</version>
|
||||
<description>The ROS client library in C++.</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
@@ -603,7 +603,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
|
||||
rclcpp::MessageInfo message_info;
|
||||
message_info.get_rmw_message_info().from_intra_process = false;
|
||||
|
||||
switch (subscription->get_subscription_type()) {
|
||||
switch (subscription->get_delivered_message_kind()) {
|
||||
// Deliver ROS message
|
||||
case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
|
||||
{
|
||||
|
||||
@@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
|
||||
timers_manager_ =
|
||||
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
|
||||
|
||||
this->current_entities_collection_ =
|
||||
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
|
||||
|
||||
notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
|
||||
[this]() {
|
||||
// This callback is invoked when:
|
||||
@@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
|
||||
this->refresh_current_collection_from_callback_groups();
|
||||
});
|
||||
|
||||
// Make sure that the notify waitable is immediately added to the collection
|
||||
// to avoid missing events
|
||||
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
|
||||
|
||||
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
|
||||
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
|
||||
|
||||
@@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(
|
||||
|
||||
this->entities_collector_ =
|
||||
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);
|
||||
|
||||
this->current_entities_collection_ =
|
||||
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
|
||||
}
|
||||
|
||||
EventsExecutor::~EventsExecutor()
|
||||
@@ -395,18 +399,8 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
|
||||
// retrieved in the "standard" way.
|
||||
// To do it, we need to add the notify waitable as an entry in both the new and
|
||||
// current collections such that it's neither added or removed.
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
|
||||
new_collection.waitables.insert(
|
||||
{
|
||||
this->notify_waitable_.get(),
|
||||
{this->notify_waitable_, weak_group_ptr}
|
||||
});
|
||||
|
||||
this->current_entities_collection_->waitables.insert(
|
||||
{
|
||||
this->notify_waitable_.get(),
|
||||
{this->notify_waitable_, weak_group_ptr}
|
||||
});
|
||||
this->add_notify_waitable_to_collection(new_collection.waitables);
|
||||
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
|
||||
|
||||
this->refresh_current_collection(new_collection);
|
||||
}
|
||||
@@ -486,3 +480,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
|
||||
};
|
||||
return callback;
|
||||
}
|
||||
|
||||
void
|
||||
EventsExecutor::add_notify_waitable_to_collection(
|
||||
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection)
|
||||
{
|
||||
// The notify waitable is not associated to any group, so use an invalid one
|
||||
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
|
||||
collection.insert(
|
||||
{
|
||||
this->notify_waitable_.get(),
|
||||
{this->notify_waitable_, weak_group_ptr}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ SubscriptionBase::SubscriptionBase(
|
||||
intra_process_subscription_id_(0),
|
||||
event_callbacks_(event_callbacks),
|
||||
type_support_(type_support_handle),
|
||||
delivered_message_type_(delivered_message_kind)
|
||||
delivered_message_kind_(delivered_message_kind)
|
||||
{
|
||||
auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs)
|
||||
{
|
||||
@@ -261,13 +261,13 @@ SubscriptionBase::get_message_type_support_handle() const
|
||||
bool
|
||||
SubscriptionBase::is_serialized() const
|
||||
{
|
||||
return delivered_message_type_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
|
||||
return delivered_message_kind_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
|
||||
}
|
||||
|
||||
rclcpp::DeliveredMessageKind
|
||||
SubscriptionBase::get_subscription_type() const
|
||||
SubscriptionBase::get_delivered_message_kind() const
|
||||
{
|
||||
return delivered_message_type_;
|
||||
return delivered_message_kind_;
|
||||
}
|
||||
|
||||
size_t
|
||||
|
||||
@@ -20,12 +20,14 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rcl/time.h"
|
||||
@@ -43,18 +45,10 @@ template<typename T>
|
||||
class TestExecutors : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
void SetUp()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
|
||||
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
|
||||
std::stringstream test_name;
|
||||
test_name << test_info->test_case_name() << "_" << test_info->name();
|
||||
@@ -75,6 +69,8 @@ public:
|
||||
publisher.reset();
|
||||
subscription.reset();
|
||||
node.reset();
|
||||
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
rclcpp::Node::SharedPtr node;
|
||||
@@ -729,6 +725,77 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
|
||||
spinner.join();
|
||||
}
|
||||
|
||||
// This test verifies that the add_node operation is robust wrt race conditions.
|
||||
// It's mostly meant to prevent regressions in the events-executor, but the operation should be
|
||||
// thread-safe in all executor implementations.
|
||||
// The initial implementation of the events-executor contained a bug where the executor
|
||||
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
|
||||
// Manually adding a node to the executor results in a) producing a notify waitable event
|
||||
// and b) refreshing the executor collections.
|
||||
// The inconsistent state would happen if the event was processed before the collections were
|
||||
// finished to be refreshed: the executor would pick up the event but be unable to process it.
|
||||
// This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
|
||||
// notify waitable events to be pushed.
|
||||
// The behavior is observable only under heavy load, so this test spawns several worker
|
||||
// threads. Due to the nature of the bug, this test may still succeed even if the
|
||||
// bug is present. However repeated runs will show its flakiness nature and indicate
|
||||
// an eventual regression.
|
||||
TYPED_TEST(TestExecutors, testRaceConditionAddNode)
|
||||
{
|
||||
using ExecutorType = TypeParam;
|
||||
// rmw_connextdds doesn't support events-executor
|
||||
if (
|
||||
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
|
||||
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
|
||||
{
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
// Spawn some threads to do some heavy work
|
||||
std::atomic<bool> should_cancel = false;
|
||||
std::vector<std::thread> stress_threads;
|
||||
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
|
||||
stress_threads.emplace_back(
|
||||
[&should_cancel, i]() {
|
||||
// This is just some arbitrary heavy work
|
||||
volatile size_t total = 0;
|
||||
for (size_t k = 0; k < 549528914167; k++) {
|
||||
if (should_cancel) {
|
||||
break;
|
||||
}
|
||||
total += k * (i + 42);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Create an executor
|
||||
auto executor = std::make_shared<ExecutorType>();
|
||||
// Start spinning
|
||||
auto executor_thread = std::thread(
|
||||
[executor]() {
|
||||
executor->spin();
|
||||
});
|
||||
// Add a node to the executor
|
||||
executor->add_node(this->node);
|
||||
|
||||
// Cancel the executor (make sure that it's already spinning first)
|
||||
while (!executor->is_spinning() && rclcpp::ok()) {
|
||||
continue;
|
||||
}
|
||||
executor->cancel();
|
||||
|
||||
// Try to join the thread after cancelling the executor
|
||||
// This is the "test". We want to make sure that we can still cancel the executor
|
||||
// regardless of the presence of race conditions
|
||||
executor_thread.join();
|
||||
|
||||
// The test is now completed: we can join the stress threads
|
||||
should_cancel = true;
|
||||
for (auto & t : stress_threads) {
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
|
||||
// Check spin_until_future_complete with node base pointer (instantiates its own executor)
|
||||
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
|
||||
{
|
||||
@@ -791,7 +858,7 @@ public:
|
||||
test_name << test_info->test_case_name() << "_" << test_info->name();
|
||||
node = std::make_shared<rclcpp::Node>("node", test_name.str());
|
||||
|
||||
callback_count = 0;
|
||||
callback_count = 0u;
|
||||
|
||||
const std::string topic_name = std::string("topic_") + test_name.str();
|
||||
|
||||
@@ -800,7 +867,7 @@ public:
|
||||
publisher = node->create_publisher<test_msgs::msg::Empty>(topic_name, rclcpp::QoS(1), po);
|
||||
|
||||
auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {
|
||||
this->callback_count.fetch_add(1);
|
||||
this->callback_count.fetch_add(1u);
|
||||
};
|
||||
|
||||
rclcpp::SubscriptionOptions so;
|
||||
@@ -822,7 +889,7 @@ public:
|
||||
rclcpp::Node::SharedPtr node;
|
||||
rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr publisher;
|
||||
rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr subscription;
|
||||
std::atomic_int callback_count;
|
||||
std::atomic_size_t callback_count;
|
||||
};
|
||||
|
||||
TYPED_TEST_SUITE(TestIntraprocessExecutors, ExecutorTypes, ExecutorTypeNames);
|
||||
@@ -838,7 +905,7 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) {
|
||||
ExecutorType executor;
|
||||
executor.add_node(this->node);
|
||||
|
||||
EXPECT_EQ(0, this->callback_count.load());
|
||||
EXPECT_EQ(0u, this->callback_count.load());
|
||||
this->publisher->publish(test_msgs::msg::Empty());
|
||||
|
||||
// Wait for up to 5 seconds for the first message to come available.
|
||||
@@ -852,7 +919,7 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) {
|
||||
EXPECT_EQ(1u, this->callback_count.load());
|
||||
|
||||
// reset counter
|
||||
this->callback_count.store(0);
|
||||
this->callback_count.store(0u);
|
||||
|
||||
for (size_t ii = 0; ii < kNumMessages; ++ii) {
|
||||
this->publisher->publish(test_msgs::msg::Empty());
|
||||
|
||||
@@ -3,6 +3,12 @@ Changelog for package rclcpp_action
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
||||
21.0.2 (2023-07-14)
|
||||
-------------------
|
||||
|
||||
21.0.1 (2023-05-11)
|
||||
-------------------
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp_action</name>
|
||||
<version>21.0.0</version>
|
||||
<version>21.0.2</version>
|
||||
<description>Adds action APIs for C++.</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
@@ -2,6 +2,12 @@
|
||||
Changelog for package rclcpp_components
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
21.0.2 (2023-07-14)
|
||||
-------------------
|
||||
|
||||
21.0.1 (2023-05-11)
|
||||
-------------------
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp_components</name>
|
||||
<version>21.0.0</version>
|
||||
<version>21.0.2</version>
|
||||
<description>Package containing tools for dynamically loadable components</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
@@ -3,6 +3,12 @@ Changelog for package rclcpp_lifecycle
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
||||
21.0.2 (2023-07-14)
|
||||
-------------------
|
||||
|
||||
21.0.1 (2023-05-11)
|
||||
-------------------
|
||||
|
||||
21.0.0 (2023-04-18)
|
||||
-------------------
|
||||
* Add support for logging service. (`#2122 <https://github.com/ros2/rclcpp/issues/2122>`_)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="2">
|
||||
<name>rclcpp_lifecycle</name>
|
||||
<version>21.0.0</version>
|
||||
<version>21.0.2</version>
|
||||
<description>Package containing a prototype for lifecycle implementation</description>
|
||||
|
||||
<maintainer email="ivanpauno@ekumenlabs.com">Ivan Paunovic</maintainer>
|
||||
|
||||
Reference in New Issue
Block a user