Compare commits
8 Commits
release-al
...
release-al
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69f7bca85d | ||
|
|
4a04fe8b4a | ||
|
|
6fbf3f8c5f | ||
|
|
9f84273467 | ||
|
|
9d754a70a2 | ||
|
|
4b0ad21b3d | ||
|
|
6ec5e8e974 | ||
|
|
be0be759ec |
@@ -65,6 +65,7 @@ endmacro()
|
||||
|
||||
call_for_each_rmw_implementation(target GENERATE_DEFAULT)
|
||||
|
||||
ament_export_dependencies(ament_cmake)
|
||||
ament_export_dependencies(rcl_interfaces)
|
||||
ament_export_dependencies(rmw)
|
||||
ament_export_dependencies(rmw_implementation)
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
#
|
||||
# Get all information abut rclcpp for a specific RMW implementation.
|
||||
# Get all information about rclcpp for a specific RMW implementation.
|
||||
#
|
||||
# It sets the common variables _DEFINITIONS, _INCLUDE_DIRS and _LIBRARIES
|
||||
# with the given prefix.
|
||||
@@ -29,7 +29,7 @@ macro(get_rclcpp_information rmw_implementation var_prefix)
|
||||
set(${var_prefix}_FOUND TRUE)
|
||||
|
||||
# include directories
|
||||
set(${var_prefix}_INCLUDE_DIRS
|
||||
normalize_path(${var_prefix}_INCLUDE_DIRS
|
||||
"${rclcpp_DIR}/../../../include")
|
||||
|
||||
# libraries
|
||||
@@ -49,7 +49,7 @@ macro(get_rclcpp_information rmw_implementation var_prefix)
|
||||
)
|
||||
if(NOT _lib)
|
||||
# warn about not existing library and ignore it
|
||||
message(FATAL_ERROR "Package 'rclcpp' doesn't contain the library '${_library_target}'")
|
||||
message(WARNING "Package 'rclcpp' doesn't contain the library '${_library_target}'")
|
||||
elseif(NOT IS_ABSOLUTE "${_lib}")
|
||||
# the found library must be an absolute path
|
||||
message(FATAL_ERROR "Package 'rclcpp' found the library '${_library_target}' at '${_lib}' which is not an absolute path")
|
||||
|
||||
@@ -66,7 +66,7 @@ public:
|
||||
get_service_ptrs() const;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
const std::vector<rclcpp::client::ClientBase::SharedPtr> &
|
||||
const std::vector<rclcpp::client::ClientBase::WeakPtr> &
|
||||
get_client_ptrs() const;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
@@ -100,7 +100,7 @@ private:
|
||||
std::vector<rclcpp::subscription::SubscriptionBase::WeakPtr> subscription_ptrs_;
|
||||
std::vector<rclcpp::timer::TimerBase::WeakPtr> timer_ptrs_;
|
||||
std::vector<rclcpp::service::ServiceBase::SharedPtr> service_ptrs_;
|
||||
std::vector<rclcpp::client::ClientBase::SharedPtr> client_ptrs_;
|
||||
std::vector<rclcpp::client::ClientBase::WeakPtr> client_ptrs_;
|
||||
std::atomic_bool can_be_taken_from_;
|
||||
};
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ public:
|
||||
virtual std::shared_ptr<void> create_response() = 0;
|
||||
virtual std::shared_ptr<void> create_request_header() = 0;
|
||||
virtual void handle_response(
|
||||
std::shared_ptr<void> & request_header, std::shared_ptr<void> & response) = 0;
|
||||
std::shared_ptr<void> request_header, std::shared_ptr<void> response) = 0;
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(ClientBase);
|
||||
@@ -111,13 +111,17 @@ public:
|
||||
return std::shared_ptr<void>(new rmw_request_id_t);
|
||||
}
|
||||
|
||||
void handle_response(std::shared_ptr<void> & request_header, std::shared_ptr<void> & response)
|
||||
void handle_response(std::shared_ptr<void> request_header, std::shared_ptr<void> response)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
|
||||
auto typed_request_header = std::static_pointer_cast<rmw_request_id_t>(request_header);
|
||||
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
|
||||
int64_t sequence_number = typed_request_header->sequence_number;
|
||||
// TODO(esteve) this must check if the sequence_number is valid otherwise the
|
||||
// call_promise will be null
|
||||
// TODO(esteve) this should throw instead since it is not expected to happen in the first place
|
||||
if (this->pending_requests_.count(sequence_number) == 0) {
|
||||
fprintf(stderr, "Received invalid sequence number. Ignoring...\n");
|
||||
return;
|
||||
}
|
||||
auto tuple = this->pending_requests_[sequence_number];
|
||||
auto call_promise = std::get<0>(tuple);
|
||||
auto callback = std::get<1>(tuple);
|
||||
@@ -143,6 +147,7 @@ public:
|
||||
>
|
||||
SharedFuture async_send_request(SharedRequest request, CallbackT && cb)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
|
||||
int64_t sequence_number;
|
||||
if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) {
|
||||
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
|
||||
@@ -187,6 +192,7 @@ private:
|
||||
RCLCPP_DISABLE_COPY(Client);
|
||||
|
||||
std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
|
||||
std::mutex pending_requests_mutex_;
|
||||
};
|
||||
|
||||
} // namespace client
|
||||
|
||||
@@ -44,6 +44,24 @@ namespace executor
|
||||
*/
|
||||
enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
|
||||
|
||||
///
|
||||
/**
|
||||
* Options to be passed to the executor constructor.
|
||||
*/
|
||||
struct ExecutorArgs
|
||||
{
|
||||
memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
|
||||
size_t max_conditions = 0;
|
||||
};
|
||||
|
||||
static inline ExecutorArgs create_default_executor_arguments()
|
||||
{
|
||||
ExecutorArgs args;
|
||||
args.memory_strategy = memory_strategies::create_default_strategy();
|
||||
args.max_conditions = 0;
|
||||
return args;
|
||||
}
|
||||
|
||||
/// Coordinate the order and timing of available communication tasks.
|
||||
/**
|
||||
* Executor provides spin functions (including spin_node_once and spin_some).
|
||||
@@ -62,8 +80,7 @@ public:
|
||||
/// Default constructor.
|
||||
// \param[in] ms The memory strategy to be used with this executor.
|
||||
RCLCPP_PUBLIC
|
||||
explicit Executor(
|
||||
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
|
||||
explicit Executor(const ExecutorArgs & args = create_default_executor_arguments());
|
||||
|
||||
/// Default destructor.
|
||||
RCLCPP_PUBLIC
|
||||
@@ -262,9 +279,14 @@ protected:
|
||||
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
|
||||
std::atomic_bool spinning;
|
||||
|
||||
rmw_guard_conditions_t fixed_guard_conditions_;
|
||||
|
||||
/// Guard condition for signaling the rmw layer to wake up for special events.
|
||||
rmw_guard_condition_t * interrupt_guard_condition_;
|
||||
|
||||
/// Waitset for managing entities that the rmw layer waits on.
|
||||
rmw_waitset_t * waitset_;
|
||||
|
||||
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
|
||||
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ public:
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
MultiThreadedExecutor(
|
||||
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
|
||||
const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~MultiThreadedExecutor();
|
||||
|
||||
@@ -47,7 +47,7 @@ public:
|
||||
/// Default constructor. See the default constructor for Executor.
|
||||
RCLCPP_PUBLIC
|
||||
SingleThreadedExecutor(
|
||||
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
|
||||
const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
|
||||
|
||||
/// Default destrcutor.
|
||||
RCLCPP_PUBLIC
|
||||
|
||||
@@ -252,6 +252,8 @@ public:
|
||||
const CallbackGroupWeakPtrList &
|
||||
get_callback_groups() const;
|
||||
|
||||
std::atomic_bool has_executor;
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(Node);
|
||||
|
||||
|
||||
@@ -151,7 +151,8 @@ public:
|
||||
services_.push_back(service);
|
||||
}
|
||||
}
|
||||
for (auto & client : group->get_client_ptrs()) {
|
||||
for (auto & weak_client : group->get_client_ptrs()) {
|
||||
auto client = weak_client.lock();
|
||||
if (client) {
|
||||
clients_.push_back(client);
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
<depend>rmw_implementation</depend>
|
||||
|
||||
<exec_depend>ament_cmake</exec_depend>
|
||||
|
||||
<test_depend>ament_cmake_gtest</test_depend>
|
||||
<test_depend>ament_lint_auto</test_depend>
|
||||
<test_depend>ament_lint_common</test_depend>
|
||||
|
||||
@@ -41,7 +41,7 @@ CallbackGroup::get_service_ptrs() const
|
||||
return service_ptrs_;
|
||||
}
|
||||
|
||||
const std::vector<rclcpp::client::ClientBase::SharedPtr> &
|
||||
const std::vector<rclcpp::client::ClientBase::WeakPtr> &
|
||||
CallbackGroup::get_client_ptrs() const
|
||||
{
|
||||
return client_ptrs_;
|
||||
|
||||
@@ -19,15 +19,57 @@
|
||||
|
||||
using rclcpp::executor::AnyExecutable;
|
||||
using rclcpp::executor::Executor;
|
||||
using rclcpp::executor::ExecutorArgs;
|
||||
|
||||
Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
|
||||
: spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()),
|
||||
memory_strategy_(ms)
|
||||
Executor::Executor(const ExecutorArgs & args)
|
||||
: spinning(false),
|
||||
memory_strategy_(args.memory_strategy)
|
||||
{
|
||||
interrupt_guard_condition_ = rmw_create_guard_condition();
|
||||
if (!interrupt_guard_condition_) {
|
||||
throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor");
|
||||
}
|
||||
|
||||
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
|
||||
// and one for the executor's guard cond (interrupt_guard_condition_)
|
||||
// These guard conditions are permanently attached to the waitset.
|
||||
const size_t number_of_guard_conds = 2;
|
||||
fixed_guard_conditions_.guard_condition_count = number_of_guard_conds;
|
||||
fixed_guard_conditions_.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
|
||||
|
||||
// Put the global ctrl-c guard condition in
|
||||
assert(fixed_guard_conditions_.guard_condition_count > 1);
|
||||
fixed_guard_conditions_.guard_conditions[0] = \
|
||||
rclcpp::utilities::get_global_sigint_guard_condition()->data;
|
||||
// Put the executor's guard condition in
|
||||
fixed_guard_conditions_.guard_conditions[1] = interrupt_guard_condition_->data;
|
||||
|
||||
// The waitset adds the fixed guard conditions to the middleware waitset on initialization,
|
||||
// and removes the guard conditions in rmw_destroy_waitset.
|
||||
waitset_ = rmw_create_waitset(&fixed_guard_conditions_, args.max_conditions);
|
||||
|
||||
if (!waitset_) {
|
||||
fprintf(stderr,
|
||||
"[rclcpp::error] failed to create waitset: %s\n", rmw_get_error_string_safe());
|
||||
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
|
||||
if (status != RMW_RET_OK) {
|
||||
fprintf(stderr,
|
||||
"[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
|
||||
}
|
||||
throw std::runtime_error("Failed to create waitset in Executor constructor");
|
||||
}
|
||||
}
|
||||
|
||||
Executor::~Executor()
|
||||
{
|
||||
// Try to deallocate the waitset.
|
||||
if (waitset_) {
|
||||
rmw_ret_t status = rmw_destroy_waitset(waitset_);
|
||||
if (status != RMW_RET_OK) {
|
||||
fprintf(stderr,
|
||||
"[rclcpp::error] failed to destroy waitset: %s\n", rmw_get_error_string_safe());
|
||||
}
|
||||
}
|
||||
// Try to deallocate the interrupt guard condition.
|
||||
if (interrupt_guard_condition_ != nullptr) {
|
||||
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
|
||||
@@ -41,6 +83,10 @@ Executor::~Executor()
|
||||
void
|
||||
Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
|
||||
{
|
||||
// If the node already has an executor
|
||||
if (node_ptr->has_executor.exchange(true)) {
|
||||
throw std::runtime_error("Node has already been added to an executor.");
|
||||
}
|
||||
// Check to ensure node not already added
|
||||
for (auto & weak_node : weak_nodes_) {
|
||||
auto node = weak_node.lock();
|
||||
@@ -76,6 +122,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
|
||||
// *INDENT-ON*
|
||||
)
|
||||
);
|
||||
node_ptr->has_executor.store(false);
|
||||
if (notify) {
|
||||
// If the node was matched and removed, interrupt waiting
|
||||
if (node_removed) {
|
||||
@@ -318,25 +365,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
|
||||
client_handles.client_count =
|
||||
memory_strategy_->fill_client_handles(client_handles.clients);
|
||||
|
||||
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
|
||||
// and one for the executor's guard cond (interrupt_guard_condition_)
|
||||
size_t number_of_guard_conds = 2;
|
||||
rmw_guard_conditions_t guard_condition_handles;
|
||||
guard_condition_handles.guard_condition_count = number_of_guard_conds;
|
||||
guard_condition_handles.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
|
||||
if (guard_condition_handles.guard_conditions == NULL &&
|
||||
number_of_guard_conds > 0)
|
||||
{
|
||||
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
|
||||
throw std::runtime_error("Could not malloc for guard condition pointers.");
|
||||
}
|
||||
// Put the global ctrl-c guard condition in
|
||||
assert(guard_condition_handles.guard_condition_count > 1);
|
||||
guard_condition_handles.guard_conditions[0] = \
|
||||
rclcpp::utilities::get_global_sigint_guard_condition()->data;
|
||||
// Put the executor's guard condition in
|
||||
guard_condition_handles.guard_conditions[1] = \
|
||||
interrupt_guard_condition_->data;
|
||||
// Don't pass guard conditions to rmw_wait; they are permanent fixtures of the waitset
|
||||
|
||||
rmw_time_t * wait_timeout = NULL;
|
||||
rmw_time_t rmw_timeout;
|
||||
@@ -364,19 +393,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
|
||||
// Now wait on the waitable subscriptions and timers
|
||||
rmw_ret_t status = rmw_wait(
|
||||
&subscriber_handles,
|
||||
&guard_condition_handles,
|
||||
nullptr,
|
||||
&service_handles,
|
||||
&client_handles,
|
||||
waitset_,
|
||||
wait_timeout);
|
||||
if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) {
|
||||
throw std::runtime_error(rmw_get_error_string_safe());
|
||||
}
|
||||
// If ctrl-c guard condition, return directly
|
||||
if (guard_condition_handles.guard_conditions[0] != 0) {
|
||||
// Make sure to free or clean memory
|
||||
memory_strategy_->clear_handles();
|
||||
return;
|
||||
}
|
||||
|
||||
memory_strategy_->remove_null_handles();
|
||||
}
|
||||
|
||||
@@ -27,4 +27,5 @@ rclcpp::spin(node::Node::SharedPtr node_ptr)
|
||||
rclcpp::executors::SingleThreadedExecutor exec;
|
||||
exec.add_node(node_ptr);
|
||||
exec.spin();
|
||||
exec.remove_node(node_ptr);
|
||||
}
|
||||
|
||||
@@ -23,8 +23,8 @@
|
||||
|
||||
using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor;
|
||||
|
||||
MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
|
||||
: executor::Executor(ms)
|
||||
MultiThreadedExecutor::MultiThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
|
||||
: executor::Executor(args)
|
||||
{
|
||||
number_of_threads_ = std::thread::hardware_concurrency();
|
||||
if (number_of_threads_ == 0) {
|
||||
|
||||
@@ -17,10 +17,8 @@
|
||||
|
||||
using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor;
|
||||
|
||||
SingleThreadedExecutor::SingleThreadedExecutor(
|
||||
rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
|
||||
: executor::Executor(ms) {}
|
||||
|
||||
SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
|
||||
: executor::Executor(args) {}
|
||||
|
||||
SingleThreadedExecutor::~SingleThreadedExecutor() {}
|
||||
|
||||
|
||||
@@ -84,8 +84,9 @@ MemoryStrategy::get_client_by_handle(void * client_handle, const WeakNodeVector
|
||||
if (!group) {
|
||||
continue;
|
||||
}
|
||||
for (auto & client : group->get_client_ptrs()) {
|
||||
if (client->get_client_handle()->data == client_handle) {
|
||||
for (auto & weak_client : group->get_client_ptrs()) {
|
||||
auto client = weak_client.lock();
|
||||
if (client && client->get_client_handle()->data == client_handle) {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
@@ -182,8 +183,9 @@ MemoryStrategy::get_group_by_client(
|
||||
if (!group) {
|
||||
continue;
|
||||
}
|
||||
for (auto & cli : group->get_client_ptrs()) {
|
||||
if (cli == client) {
|
||||
for (auto & weak_client : group->get_client_ptrs()) {
|
||||
auto cli = weak_client.lock();
|
||||
if (cli && cli == client) {
|
||||
return group;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ Node::Node(
|
||||
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
|
||||
use_intra_process_comms_(use_intra_process_comms)
|
||||
{
|
||||
has_executor.store(false);
|
||||
size_t domain_id = 0;
|
||||
char * ros_domain_id = nullptr;
|
||||
const char * env_var = "ROS_DOMAIN_ID";
|
||||
|
||||
@@ -39,9 +39,9 @@ TEST(TestRate, rate_basics) {
|
||||
rclcpp::utilities::sleep_for(offset);
|
||||
ASSERT_TRUE(r.sleep());
|
||||
auto two = std::chrono::system_clock::now();
|
||||
delta = two - one;
|
||||
ASSERT_TRUE(period < delta + epsilon);
|
||||
ASSERT_TRUE(period * overrun_ratio > delta);
|
||||
delta = two - start;
|
||||
ASSERT_TRUE(2 * period < delta);
|
||||
ASSERT_TRUE(2 * period * overrun_ratio > delta);
|
||||
|
||||
rclcpp::utilities::sleep_for(offset);
|
||||
auto two_offset = std::chrono::system_clock::now();
|
||||
@@ -66,35 +66,35 @@ TEST(TestRate, wall_rate_basics) {
|
||||
auto epsilon = std::chrono::milliseconds(1);
|
||||
double overrun_ratio = 1.5;
|
||||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
rclcpp::rate::WallRate r(period);
|
||||
ASSERT_TRUE(r.is_steady());
|
||||
ASSERT_TRUE(r.sleep());
|
||||
auto one = std::chrono::system_clock::now();
|
||||
auto one = std::chrono::steady_clock::now();
|
||||
auto delta = one - start;
|
||||
ASSERT_TRUE(period < delta);
|
||||
ASSERT_TRUE(period * overrun_ratio > delta);
|
||||
|
||||
rclcpp::utilities::sleep_for(offset);
|
||||
ASSERT_TRUE(r.sleep());
|
||||
auto two = std::chrono::system_clock::now();
|
||||
delta = two - one;
|
||||
ASSERT_TRUE(period < delta + epsilon);
|
||||
ASSERT_TRUE(period * overrun_ratio > delta);
|
||||
auto two = std::chrono::steady_clock::now();
|
||||
delta = two - start;
|
||||
ASSERT_TRUE(2 * period < delta + epsilon);
|
||||
ASSERT_TRUE(2 * period * overrun_ratio > delta);
|
||||
|
||||
rclcpp::utilities::sleep_for(offset);
|
||||
auto two_offset = std::chrono::system_clock::now();
|
||||
auto two_offset = std::chrono::steady_clock::now();
|
||||
r.reset();
|
||||
ASSERT_TRUE(r.sleep());
|
||||
auto three = std::chrono::system_clock::now();
|
||||
auto three = std::chrono::steady_clock::now();
|
||||
delta = three - two_offset;
|
||||
ASSERT_TRUE(period < delta);
|
||||
ASSERT_TRUE(period * overrun_ratio > delta);
|
||||
|
||||
rclcpp::utilities::sleep_for(offset + period);
|
||||
auto four = std::chrono::system_clock::now();
|
||||
auto four = std::chrono::steady_clock::now();
|
||||
ASSERT_FALSE(r.sleep());
|
||||
auto five = std::chrono::system_clock::now();
|
||||
auto five = std::chrono::steady_clock::now();
|
||||
delta = five - four;
|
||||
ASSERT_TRUE(epsilon > delta);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user