Compare commits
1 Commits
runtime_in
...
peterpena/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60ea49cadb |
@@ -94,6 +94,20 @@ public:
|
||||
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns size of callback group which includes
|
||||
* subscriptions, timers, clients, services, and
|
||||
* waitable
|
||||
*
|
||||
* \return size of callback group
|
||||
*/
|
||||
size_t
|
||||
size() const
|
||||
{
|
||||
return subscription_ptrs_.size() + timer_ptrs_.size() +
|
||||
client_ptrs_.size() + service_ptrs_.size() + waitable_ptrs_.size();
|
||||
}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
std::atomic_bool &
|
||||
can_be_taken_from();
|
||||
|
||||
@@ -263,6 +263,15 @@ public:
|
||||
void
|
||||
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
|
||||
|
||||
/**
|
||||
* Returns wake_after_execute_ flag
|
||||
*
|
||||
* \return wake_after_execute_ flag
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
get_wake_after_executor_flag() {return wake_after_execute_.load();}
|
||||
|
||||
protected:
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
@@ -280,6 +289,17 @@ protected:
|
||||
void
|
||||
execute_any_executable(AnyExecutable & any_exec);
|
||||
|
||||
/**
|
||||
* After executing an executable, this function determines
|
||||
* if it should wake the wait in rcl_wait so that the executor
|
||||
* can process any pending executable
|
||||
*
|
||||
* \return wake_after_execute_ flag
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
virtual bool
|
||||
set_wake_after_execute_flag() {return wake_after_execute_.load();}
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
static void
|
||||
execute_subscription(
|
||||
@@ -325,6 +345,9 @@ protected:
|
||||
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
|
||||
std::atomic_bool spinning;
|
||||
|
||||
/// boolean to control whether guard condition is triggered after executing
|
||||
std::atomic_bool wake_after_execute_;
|
||||
|
||||
/// Guard condition for signaling the rmw layer to wake up for special events.
|
||||
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
|
||||
|
||||
|
||||
@@ -78,6 +78,17 @@ protected:
|
||||
void
|
||||
run(size_t this_thread_number);
|
||||
|
||||
/**
|
||||
* After executing an executable, this function determines
|
||||
* if it should wake the wait in rcl_wait so that the executor
|
||||
* can process any pending executable
|
||||
*
|
||||
* \return wake_after_execute_ flag
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
set_wake_after_execute_flag();
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ using rclcpp::FutureReturnCode;
|
||||
|
||||
Executor::Executor(const rclcpp::ExecutorOptions & options)
|
||||
: spinning(false),
|
||||
wake_after_execute_(false),
|
||||
memory_strategy_(options.memory_strategy)
|
||||
{
|
||||
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
|
||||
@@ -304,9 +305,13 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
|
||||
}
|
||||
// Reset the callback_group, regardless of type
|
||||
any_exec.callback_group->can_be_taken_from().store(true);
|
||||
|
||||
// Check whether triggering a guard condition is necessary
|
||||
// (will depend on the type of executor and callback groups)
|
||||
// Wake the wait, because it may need to be recalculated or work that
|
||||
// was previously blocked is now available.
|
||||
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
|
||||
if (set_wake_after_execute_flag() &&
|
||||
rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
|
||||
throw std::runtime_error(rcl_get_error_string().str);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,6 +70,43 @@ MultiThreadedExecutor::get_number_of_threads()
|
||||
return number_of_threads_;
|
||||
}
|
||||
|
||||
// Iterate through all the callback groups of
|
||||
// all nodes in the executor and verify
|
||||
// there is a non-empty callback group
|
||||
// that is "Mutually Exclusive". If there is,
|
||||
// then set trigger guard condition to true
|
||||
// so that it avoids a thread getting stuck
|
||||
// in rcl_wait.
|
||||
bool
|
||||
MultiThreadedExecutor::set_wake_after_execute_flag()
|
||||
{
|
||||
// Initialize as false in the case
|
||||
// it is true and the executor has
|
||||
// zero nodes, nodes with empty
|
||||
// callback groups, or nodes with
|
||||
// all reentrant callback groups
|
||||
wake_after_execute_.store(false);
|
||||
for (auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
if (!node) {
|
||||
continue;
|
||||
}
|
||||
for (auto & weak_group : node->get_callback_groups()) {
|
||||
auto callback_group = weak_group.lock();
|
||||
|
||||
// Skip over callback groups that are empty
|
||||
if(!callback_group || callback_group->size() == 0) {
|
||||
continue;
|
||||
}
|
||||
if (callback_group->type() == rclcpp::callback_group::CallbackGroupType::MutuallyExclusive) {
|
||||
wake_after_execute_.store(true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return wake_after_execute_.load();
|
||||
}
|
||||
|
||||
void
|
||||
MultiThreadedExecutor::run(size_t)
|
||||
{
|
||||
|
||||
@@ -391,6 +391,12 @@ if(TARGET test_multi_threaded_executor)
|
||||
target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_wake_after_execute_flag rclcpp/test_wake_after_execute_flag.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_wake_after_execute_flag)
|
||||
target_link_libraries(test_wake_after_execute_flag ${PROJECT_NAME})
|
||||
endif()
|
||||
|
||||
ament_add_gtest(test_guard_condition rclcpp/test_guard_condition.cpp
|
||||
APPEND_LIBRARY_DIRS "${append_library_dirs}")
|
||||
if(TARGET test_guard_condition)
|
||||
|
||||
105
rclcpp/test/rclcpp/test_wake_after_execute_flag.cpp
Normal file
105
rclcpp/test/rclcpp/test_wake_after_execute_flag.cpp
Normal file
@@ -0,0 +1,105 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/node.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "rclcpp/executors.hpp"
|
||||
#include "rclcpp/executor.hpp"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
class TestWakeAfterExecuteFlag : public ::testing::Test
|
||||
{
|
||||
protected:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
constexpr int EXECUTION_COUNT = 5;
|
||||
|
||||
/*
|
||||
Test guard condition trigger is set when a node is added and resetted when it is removed
|
||||
*/
|
||||
TEST_F(TestWakeAfterExecuteFlag, set_wake_after_execute_flag_multi_threaded) {
|
||||
|
||||
rclcpp::executors::MultiThreadedExecutor executor;
|
||||
|
||||
std::shared_ptr<rclcpp::Node> node =
|
||||
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_multi_threaded");
|
||||
|
||||
auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
|
||||
|
||||
std::atomic_int timer_count {0};
|
||||
|
||||
auto timer_callback = [&executor, &timer_count]() {
|
||||
printf("Timer executed!");
|
||||
|
||||
if(timer_count > 0){
|
||||
ASSERT_EQ(executor.get_wake_after_executor_flag(), true);
|
||||
}
|
||||
|
||||
timer_count++;
|
||||
|
||||
if(timer_count > EXECUTION_COUNT){
|
||||
executor.cancel();
|
||||
}
|
||||
};
|
||||
|
||||
auto timer_ = node->create_wall_timer(
|
||||
2s, timer_callback, cbg);
|
||||
|
||||
executor.add_node(node);
|
||||
executor.spin();
|
||||
}
|
||||
|
||||
TEST_F(TestWakeAfterExecuteFlag, set_wake_after_execute_flag_single_threaded) {
|
||||
|
||||
rclcpp::executors::SingleThreadedExecutor executor;
|
||||
|
||||
std::shared_ptr<rclcpp::Node> node =
|
||||
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_single_threaded");
|
||||
|
||||
std::atomic_int timer_count {0};
|
||||
|
||||
auto timer_callback = [&executor, &timer_count]() {
|
||||
printf("Timer executed!");
|
||||
|
||||
if(timer_count > 0){
|
||||
ASSERT_EQ(executor.get_wake_after_executor_flag(), false);
|
||||
}
|
||||
|
||||
timer_count++;
|
||||
|
||||
if(timer_count > EXECUTION_COUNT){
|
||||
executor.cancel();
|
||||
}
|
||||
};
|
||||
|
||||
auto timer_ = node->create_wall_timer(
|
||||
2s, timer_callback);
|
||||
|
||||
executor.add_node(node);
|
||||
executor.spin();
|
||||
}
|
||||
Reference in New Issue
Block a user