Compare commits

...

1 Commits

Author SHA1 Message Date
Pedro Pena
60ea49cadb wake after execute feature 2020-06-15 17:36:18 -04:00
7 changed files with 202 additions and 1 deletions

View File

@@ -94,6 +94,20 @@ public:
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
}
/**
* Returns size of callback group which includes
* subscriptions, timers, clients, services, and
* waitable
*
* \return size of callback group
*/
size_t
size() const
{
return subscription_ptrs_.size() + timer_ptrs_.size() +
client_ptrs_.size() + service_ptrs_.size() + waitable_ptrs_.size();
}
RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();

View File

@@ -263,6 +263,15 @@ public:
void
set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
/**
* Returns wake_after_execute_ flag
*
* \return wake_after_execute_ flag
*/
RCLCPP_PUBLIC
bool
get_wake_after_executor_flag() {return wake_after_execute_.load();}
protected:
RCLCPP_PUBLIC
void
@@ -280,6 +289,17 @@ protected:
void
execute_any_executable(AnyExecutable & any_exec);
/**
* After executing an executable, this function determines
* if it should wake the wait in rcl_wait so that the executor
* can process any pending executable
*
* \return wake_after_execute_ flag
*/
RCLCPP_PUBLIC
virtual bool
set_wake_after_execute_flag() {return wake_after_execute_.load();}
RCLCPP_PUBLIC
static void
execute_subscription(
@@ -325,6 +345,9 @@ protected:
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
/// boolean to control whether guard condition is triggered after executing
std::atomic_bool wake_after_execute_;
/// Guard condition for signaling the rmw layer to wake up for special events.
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();

View File

@@ -78,6 +78,17 @@ protected:
void
run(size_t this_thread_number);
/**
* After executing an executable, this function determines
* if it should wake the wait in rcl_wait so that the executor
* can process any pending executable
*
* \return wake_after_execute_ flag
*/
RCLCPP_PUBLIC
bool
set_wake_after_execute_flag();
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

View File

@@ -36,6 +36,7 @@ using rclcpp::FutureReturnCode;
Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
wake_after_execute_(false),
memory_strategy_(options.memory_strategy)
{
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
@@ -304,9 +305,13 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
// Check whether triggering a guard condition is necessary
// (will depend on the type of executor and callback groups)
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
if (set_wake_after_execute_flag() &&
rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string().str);
}
}

View File

@@ -70,6 +70,43 @@ MultiThreadedExecutor::get_number_of_threads()
return number_of_threads_;
}
// Iterate through all the callback groups of
// all nodes in the executor and verify
// there is a non-empty callback group
// that is "Mutually Exclusive". If there is,
// then set trigger guard condition to true
// so that it avoids a thread getting stuck
// in rcl_wait.
bool
MultiThreadedExecutor::set_wake_after_execute_flag()
{
// Initialize as false in the case
// it is true and the executor has
// zero nodes, nodes with empty
// callback groups, or nodes with
// all reentrant callback groups
wake_after_execute_.store(false);
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto callback_group = weak_group.lock();
// Skip over callback groups that are empty
if(!callback_group || callback_group->size() == 0) {
continue;
}
if (callback_group->type() == rclcpp::callback_group::CallbackGroupType::MutuallyExclusive) {
wake_after_execute_.store(true);
break;
}
}
}
return wake_after_execute_.load();
}
void
MultiThreadedExecutor::run(size_t)
{

View File

@@ -391,6 +391,12 @@ if(TARGET test_multi_threaded_executor)
target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME})
endif()
ament_add_gtest(test_wake_after_execute_flag rclcpp/test_wake_after_execute_flag.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_wake_after_execute_flag)
target_link_libraries(test_wake_after_execute_flag ${PROJECT_NAME})
endif()
ament_add_gtest(test_guard_condition rclcpp/test_guard_condition.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_guard_condition)

View File

@@ -0,0 +1,105 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <chrono>
#include <string>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/executors.hpp"
#include "rclcpp/executor.hpp"
using namespace std::chrono_literals;
class TestWakeAfterExecuteFlag : public ::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
};
constexpr int EXECUTION_COUNT = 5;
/*
Test guard condition trigger is set when a node is added and resetted when it is removed
*/
TEST_F(TestWakeAfterExecuteFlag, set_wake_after_execute_flag_multi_threaded) {
rclcpp::executors::MultiThreadedExecutor executor;
std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_multi_threaded");
auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
std::atomic_int timer_count {0};
auto timer_callback = [&executor, &timer_count]() {
printf("Timer executed!");
if(timer_count > 0){
ASSERT_EQ(executor.get_wake_after_executor_flag(), true);
}
timer_count++;
if(timer_count > EXECUTION_COUNT){
executor.cancel();
}
};
auto timer_ = node->create_wall_timer(
2s, timer_callback, cbg);
executor.add_node(node);
executor.spin();
}
TEST_F(TestWakeAfterExecuteFlag, set_wake_after_execute_flag_single_threaded) {
rclcpp::executors::SingleThreadedExecutor executor;
std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_wake_after_execute_flag_single_threaded");
std::atomic_int timer_count {0};
auto timer_callback = [&executor, &timer_count]() {
printf("Timer executed!");
if(timer_count > 0){
ASSERT_EQ(executor.get_wake_after_executor_flag(), false);
}
timer_count++;
if(timer_count > EXECUTION_COUNT){
executor.cancel();
}
};
auto timer_ = node->create_wall_timer(
2s, timer_callback);
executor.add_node(node);
executor.spin();
}