Compare commits
12 Commits
mjcarroll/
...
clalancett
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
295849dce3 | ||
|
|
d3c0db1d59 | ||
|
|
2730af14ad | ||
|
|
da5786933a | ||
|
|
30f8ebd940 | ||
|
|
df1ba66fdd | ||
|
|
149a8238fc | ||
|
|
6d68132567 | ||
|
|
f55d29dada | ||
|
|
2803e17931 | ||
|
|
222820eb80 | ||
|
|
1b790ddf4a |
@@ -476,7 +476,22 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch when input is a ros message and the output could be anything.
|
||||
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
|
||||
convert_custom_type_message_to_ros_message_unique_ptr(const SubscribedType & msg)
|
||||
{
|
||||
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
|
||||
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
|
||||
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr);
|
||||
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ptr);
|
||||
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
|
||||
} else {
|
||||
throw std::runtime_error(
|
||||
"convert_custom_type_message_to_ros_message_unique_ptr "
|
||||
"unexpectedly called without TypeAdapter");
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch when input is a ROS message and the output could be anything.
|
||||
void
|
||||
dispatch(
|
||||
std::shared_ptr<ROSMessageType> message,
|
||||
@@ -529,7 +544,7 @@ public:
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
|
||||
}
|
||||
// conditions for output is ros message
|
||||
// conditions for output is ROS message
|
||||
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
|
||||
callback(*message);
|
||||
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
|
||||
@@ -656,7 +671,12 @@ public:
|
||||
TRACEPOINT(callback_end, static_cast<const void *>(this));
|
||||
}
|
||||
|
||||
void
|
||||
template<typename T>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rosidl_generator_traits::is_message<T>::value &&
|
||||
std::is_same<T, ROSMessageType>::value
|
||||
>
|
||||
dispatch_intra_process(
|
||||
std::shared_ptr<const ROSMessageType> message,
|
||||
const rclcpp::MessageInfo & message_info)
|
||||
@@ -672,83 +692,83 @@ public:
|
||||
// Dispatch.
|
||||
std::visit(
|
||||
[&message, &message_info, this](auto && callback) {
|
||||
using T = std::decay_t<decltype(callback)>;
|
||||
using CallbackT = std::decay_t<decltype(callback)>;
|
||||
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
|
||||
|
||||
// conditions for custom type
|
||||
if constexpr (is_ta && std::is_same_v<T, ConstRefCallback>) {
|
||||
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
|
||||
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
|
||||
callback(*local_message);
|
||||
} else if constexpr (is_ta && std::is_same_v<T, ConstRefWithInfoCallback>) { // NOLINT
|
||||
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
|
||||
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
|
||||
callback(*local_message, message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, UniquePtrCallback>||
|
||||
std::is_same_v<T, SharedPtrCallback>
|
||||
std::is_same_v<CallbackT, UniquePtrCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, UniquePtrWithInfoCallback>||
|
||||
std::is_same_v<T, SharedPtrWithInfoCallback>
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, SharedConstPtrCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrCallback>
|
||||
std::is_same_v<CallbackT, SharedConstPtrCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, SharedConstPtrWithInfoCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoCallback>
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
|
||||
}
|
||||
// conditions for ros message type
|
||||
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
|
||||
// conditions for ROS message type
|
||||
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
|
||||
callback(*message);
|
||||
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
|
||||
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
|
||||
callback(*message, message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, UniquePtrROSMessageCallback>||
|
||||
std::is_same_v<T, SharedPtrROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
|
||||
{
|
||||
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, UniquePtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<T, SharedPtrWithInfoROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, SharedConstPtrROSMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
|
||||
{
|
||||
callback(message);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, SharedConstPtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(message, message_info);
|
||||
}
|
||||
// condition to catch SerializedMessage types
|
||||
else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, ConstRefSerializedMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, UniquePtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, UniquePtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, SharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, SharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, SharedPtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, SharedPtrSerializedMessageWithInfoCallback>)
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
"Cannot dispatch std::shared_ptr<const ROSMessageType> message "
|
||||
@@ -756,13 +776,18 @@ public:
|
||||
}
|
||||
// condition to catch unhandled callback types
|
||||
else { // NOLINT[readability/braces]
|
||||
static_assert(always_false_v<T>, "unhandled callback type");
|
||||
static_assert(always_false_v<CallbackT>, "unhandled callback type");
|
||||
}
|
||||
}, callback_variant_);
|
||||
TRACEPOINT(callback_end, static_cast<const void *>(this));
|
||||
}
|
||||
|
||||
void
|
||||
template<typename T>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rosidl_generator_traits::is_message<T>::value &&
|
||||
std::is_same<T, ROSMessageType>::value
|
||||
>
|
||||
dispatch_intra_process(
|
||||
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> message,
|
||||
const rclcpp::MessageInfo & message_info)
|
||||
@@ -778,83 +803,83 @@ public:
|
||||
// Dispatch.
|
||||
std::visit(
|
||||
[&message, &message_info, this](auto && callback) {
|
||||
using T = std::decay_t<decltype(callback)>;
|
||||
using CallbackT = std::decay_t<decltype(callback)>;
|
||||
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
|
||||
|
||||
// conditions for custom type
|
||||
if constexpr (is_ta && std::is_same_v<T, ConstRefCallback>) {
|
||||
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
|
||||
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
|
||||
callback(*local_message);
|
||||
} else if constexpr (is_ta && std::is_same_v<T, ConstRefWithInfoCallback>) { // NOLINT
|
||||
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
|
||||
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
|
||||
callback(*local_message, message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, UniquePtrCallback>||
|
||||
std::is_same_v<T, SharedPtrCallback>
|
||||
std::is_same_v<CallbackT, UniquePtrCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, UniquePtrWithInfoCallback>||
|
||||
std::is_same_v<T, SharedPtrWithInfoCallback>
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, SharedConstPtrCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrCallback>
|
||||
std::is_same_v<CallbackT, SharedConstPtrCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<T, SharedConstPtrWithInfoCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoCallback>
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
|
||||
}
|
||||
// conditions for ros message type
|
||||
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
|
||||
// conditions for ROS message type
|
||||
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
|
||||
callback(*message);
|
||||
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
|
||||
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
|
||||
callback(*message, message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, UniquePtrROSMessageCallback>||
|
||||
std::is_same_v<T, SharedPtrROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
|
||||
{
|
||||
callback(std::move(message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, UniquePtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<T, SharedPtrWithInfoROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(std::move(message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, SharedConstPtrROSMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
|
||||
{
|
||||
callback(std::move(message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, SharedConstPtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(std::move(message), message_info);
|
||||
}
|
||||
// condition to catch SerializedMessage types
|
||||
else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<T, ConstRefSerializedMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, UniquePtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, UniquePtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, SharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, SharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<T, SharedPtrSerializedMessageCallback>||
|
||||
std::is_same_v<T, SharedPtrSerializedMessageWithInfoCallback>)
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
"Cannot dispatch std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> message "
|
||||
@@ -862,7 +887,227 @@ public:
|
||||
}
|
||||
// condition to catch unhandled callback types
|
||||
else { // NOLINT[readability/braces]
|
||||
static_assert(always_false_v<T>, "unhandled callback type");
|
||||
static_assert(always_false_v<CallbackT>, "unhandled callback type");
|
||||
}
|
||||
}, callback_variant_);
|
||||
TRACEPOINT(callback_end, static_cast<const void *>(this));
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
typename std::enable_if_t<
|
||||
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
|
||||
std::is_same<T, SubscribedType>::value
|
||||
>
|
||||
dispatch_intra_process(
|
||||
std::shared_ptr<const T> message,
|
||||
const rclcpp::MessageInfo & message_info)
|
||||
{
|
||||
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
|
||||
// Check if the variant is "unset", throw if it is.
|
||||
if (callback_variant_.index() == 0) {
|
||||
if (std::get<0>(callback_variant_) == nullptr) {
|
||||
// This can happen if it is default initialized, or if it is assigned nullptr.
|
||||
throw std::runtime_error("dispatch called on an unset AnySubscriptionCallback");
|
||||
}
|
||||
}
|
||||
// Dispatch.
|
||||
std::visit(
|
||||
[&message, &message_info, this](auto && callback) {
|
||||
using CallbackT = std::decay_t<decltype(callback)>;
|
||||
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
|
||||
|
||||
// conditions for custom type
|
||||
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
|
||||
callback(*message);
|
||||
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
|
||||
callback(*message);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, UniquePtrCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(create_custom_unique_ptr_from_custom_shared_ptr_message(message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(create_custom_unique_ptr_from_custom_shared_ptr_message(message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, SharedConstPtrCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(std::unique_ptr<SubscribedType, SubscribedTypeDeleter>(message.get(), subscribed_type_deleter_));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(std::unique_ptr<SubscribedType, SubscribedTypeDeleter>(message.get(), subscribed_type_deleter_), message_info);
|
||||
}
|
||||
// conditions for ROS message type
|
||||
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
|
||||
auto local_message = convert_custom_type_message_to_ros_message_unique_ptr(*message);
|
||||
callback(*local_message);
|
||||
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
|
||||
auto local_message = convert_custom_type_message_to_ros_message_unique_ptr(*message);
|
||||
callback(*local_message, message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_message_to_ros_message_unique_ptr(*message), message_info);
|
||||
}
|
||||
// condition to catch SerializedMessage types
|
||||
else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
"Cannot dispatch std::shared_ptr<const ROSMessageType> message "
|
||||
"to rclcpp::SerializedMessage");
|
||||
}
|
||||
// condition to catch unhandled callback types
|
||||
else { // NOLINT[readability/braces]
|
||||
static_assert(always_false_v<CallbackT>, "unhandled callback type");
|
||||
}
|
||||
}, callback_variant_);
|
||||
TRACEPOINT(callback_end, static_cast<const void *>(this));
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
typename std::enable_if_t<
|
||||
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
|
||||
std::is_same<T, SubscribedType>::value
|
||||
>
|
||||
dispatch_intra_process(
|
||||
std::unique_ptr<T, SubscribedTypeDeleter> message,
|
||||
const rclcpp::MessageInfo & message_info)
|
||||
{
|
||||
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
|
||||
// Check if the variant is "unset", throw if it is.
|
||||
if (callback_variant_.index() == 0) {
|
||||
if (std::get<0>(callback_variant_) == nullptr) {
|
||||
// This can happen if it is default initialized, or if it is assigned nullptr.
|
||||
throw std::runtime_error("dispatch called on an unset AnySubscriptionCallback");
|
||||
}
|
||||
}
|
||||
// Dispatch.
|
||||
std::visit(
|
||||
[&message, &message_info, this](auto && callback) {
|
||||
using CallbackT = std::decay_t<decltype(callback)>;
|
||||
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;
|
||||
|
||||
// conditions for custom type
|
||||
if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefCallback>) {
|
||||
callback(*message);
|
||||
} else if constexpr (is_ta && std::is_same_v<CallbackT, ConstRefWithInfoCallback>) { // NOLINT
|
||||
callback(*message);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, UniquePtrCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(std::move(message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(std::move(message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, SharedConstPtrCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrCallback>
|
||||
))
|
||||
{
|
||||
callback(std::move(message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
is_ta && (
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoCallback>
|
||||
))
|
||||
{
|
||||
callback(std::move(message), message_info);
|
||||
}
|
||||
// conditions for ROS message type
|
||||
else if constexpr (std::is_same_v<CallbackT, ConstRefROSMessageCallback>) { // NOLINT
|
||||
auto local_message = convert_custom_type_to_ros_message_unique_ptr(*message);
|
||||
callback(*local_message);
|
||||
} else if constexpr (std::is_same_v<CallbackT, ConstRefWithInfoROSMessageCallback>) {
|
||||
auto local_message = convert_custom_type_to_ros_message_unique_ptr(*message);
|
||||
callback(*local_message, message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, UniquePtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, UniquePtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, SharedConstPtrROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
|
||||
} else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, SharedConstPtrWithInfoROSMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
|
||||
{
|
||||
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
|
||||
}
|
||||
// condition to catch SerializedMessage types
|
||||
else if constexpr ( // NOLINT[readability/braces]
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, UniquePtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, ConstRefSharedConstPtrSerializedMessageWithInfoCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageCallback>||
|
||||
std::is_same_v<CallbackT, SharedPtrSerializedMessageWithInfoCallback>)
|
||||
{
|
||||
throw std::runtime_error(
|
||||
"Cannot dispatch std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> message "
|
||||
"to rclcpp::SerializedMessage");
|
||||
}
|
||||
// condition to catch unhandled callback types
|
||||
else { // NOLINT[readability/braces]
|
||||
static_assert(always_false_v<CallbackT>, "unhandled callback type");
|
||||
}
|
||||
}, callback_variant_);
|
||||
TRACEPOINT(callback_end, static_cast<const void *>(this));
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#define RCLCPP__EXPERIMENTAL__BUFFERS__INTRA_PROCESS_BUFFER_HPP_
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
|
||||
@@ -15,10 +15,6 @@
|
||||
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <stdexcept>
|
||||
#include <utility>
|
||||
|
||||
@@ -16,11 +16,9 @@
|
||||
#define RCLCPP__EXPERIMENTAL__CREATE_INTRA_PROCESS_BUFFER_HPP_
|
||||
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <stdexcept>
|
||||
#include <utility>
|
||||
|
||||
#include "rcl/subscription.h"
|
||||
|
||||
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp"
|
||||
#include "rclcpp/intra_process_buffer_type.hpp"
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
#ifndef RCLCPP__EXPERIMENTAL__EXECUTABLE_LIST_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__EXECUTABLE_LIST_HPP_
|
||||
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/client.hpp"
|
||||
|
||||
@@ -19,17 +19,16 @@
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <exception>
|
||||
#include <map>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <stdexcept>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rosidl_runtime_cpp/traits.hpp"
|
||||
|
||||
#include "rclcpp/allocator/allocator_deleter.hpp"
|
||||
#include "rclcpp/experimental/subscription_intra_process.hpp"
|
||||
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
|
||||
@@ -38,6 +37,7 @@
|
||||
#include "rclcpp/logging.hpp"
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/publisher_base.hpp"
|
||||
#include "rclcpp/type_adapter.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
@@ -175,17 +175,32 @@ public:
|
||||
* \param message the message that is being stored.
|
||||
*/
|
||||
template<
|
||||
typename T,
|
||||
typename DeleterT,
|
||||
typename MessageT,
|
||||
typename Alloc = std::allocator<void>,
|
||||
typename Deleter = std::default_delete<MessageT>>
|
||||
void
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
|
||||
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
|
||||
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
|
||||
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
|
||||
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
|
||||
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
|
||||
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
|
||||
>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rosidl_generator_traits::is_message<T>::value &&
|
||||
std::is_same<T, ROSMessageType>::value
|
||||
>
|
||||
do_intra_process_publish(
|
||||
uint64_t intra_process_publisher_id,
|
||||
std::unique_ptr<MessageT, Deleter> message,
|
||||
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
|
||||
std::unique_ptr<T, DeleterT> message,
|
||||
ROSMessageTypeAllocator & ros_message_type_allocator,
|
||||
ROSMessageTypeDeleter & ros_message_type_deleter,
|
||||
PublishedTypeAllocator & published_type_allocator)
|
||||
{
|
||||
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
|
||||
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
|
||||
(void)ros_message_type_deleter;
|
||||
(void)published_type_allocator;
|
||||
|
||||
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
@@ -201,53 +216,152 @@ public:
|
||||
|
||||
if (sub_ids.take_ownership_subscriptions.empty()) {
|
||||
// None of the buffers require ownership, so we promote the pointer
|
||||
std::shared_ptr<MessageT> msg = std::move(message);
|
||||
std::shared_ptr<ROSMessageType> msg = std::move(message);
|
||||
|
||||
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
|
||||
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
|
||||
msg, sub_ids.take_shared_subscriptions);
|
||||
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
|
||||
sub_ids.take_shared_subscriptions.size() <= 1)
|
||||
{
|
||||
// There is at maximum 1 buffer that does not require ownership.
|
||||
// So this case is equivalent to all the buffers requiring ownership
|
||||
} else {
|
||||
if (sub_ids.take_shared_subscriptions.size() <= 1) {
|
||||
// There is at maximum 1 buffer that does not require ownership.
|
||||
// So this case is equivalent to all the buffers requiring ownership
|
||||
|
||||
// Merge the two vector of ids into a unique one
|
||||
std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
|
||||
concatenated_vector.insert(
|
||||
concatenated_vector.end(),
|
||||
sub_ids.take_ownership_subscriptions.begin(),
|
||||
sub_ids.take_ownership_subscriptions.end());
|
||||
// Merge the two vector of ids into a unique one
|
||||
std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
|
||||
concatenated_vector.insert(
|
||||
concatenated_vector.end(),
|
||||
sub_ids.take_ownership_subscriptions.begin(),
|
||||
sub_ids.take_ownership_subscriptions.end());
|
||||
|
||||
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
|
||||
std::move(message),
|
||||
concatenated_vector,
|
||||
allocator);
|
||||
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
|
||||
sub_ids.take_shared_subscriptions.size() > 1)
|
||||
{
|
||||
// Construct a new shared pointer from the message
|
||||
// for the buffers that do not require ownership
|
||||
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
|
||||
this->template add_owned_msg_to_buffers<ROSMessageType, AllocatorT>(
|
||||
std::move(message),
|
||||
concatenated_vector,
|
||||
ros_message_type_allocator);
|
||||
} else {
|
||||
// Construct a new shared pointer from the message
|
||||
// for the buffers that do not require ownership
|
||||
auto shared_msg = std::allocate_shared<ROSMessageType, ROSMessageTypeAllocator>(
|
||||
ros_message_type_allocator, *message);
|
||||
|
||||
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
|
||||
shared_msg, sub_ids.take_shared_subscriptions);
|
||||
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
|
||||
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
|
||||
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
|
||||
shared_msg, sub_ids.take_shared_subscriptions);
|
||||
this->template add_owned_msg_to_buffers<ROSMessageType, AllocatorT>(
|
||||
std::move(message), sub_ids.take_ownership_subscriptions, ros_message_type_allocator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<
|
||||
typename T,
|
||||
typename DeleterT,
|
||||
typename MessageT,
|
||||
typename Alloc = std::allocator<void>,
|
||||
typename Deleter = std::default_delete<MessageT>>
|
||||
std::shared_ptr<const MessageT>
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
|
||||
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
|
||||
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
|
||||
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
|
||||
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
|
||||
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
|
||||
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
|
||||
>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
|
||||
std::is_same<T, PublishedType>::value
|
||||
>
|
||||
do_intra_process_publish(
|
||||
uint64_t intra_process_publisher_id,
|
||||
std::unique_ptr<T, DeleterT> message,
|
||||
ROSMessageTypeAllocator & ros_message_type_allocator,
|
||||
ROSMessageTypeDeleter & ros_message_type_deleter,
|
||||
PublishedTypeAllocator & published_type_allocator)
|
||||
{
|
||||
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
|
||||
if (publisher_it == pub_to_subs_.end()) {
|
||||
// Publisher is either invalid or no longer exists.
|
||||
RCLCPP_WARN(
|
||||
rclcpp::get_logger("rclcpp"),
|
||||
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
|
||||
return;
|
||||
}
|
||||
const auto & sub_ids = publisher_it->second;
|
||||
|
||||
if (sub_ids.take_ownership_subscriptions.empty()) {
|
||||
// None of the buffers require ownership, so we promote the pointer
|
||||
std::shared_ptr<T> msg = std::move(message);
|
||||
|
||||
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
|
||||
msg,
|
||||
sub_ids.take_shared_subscriptions,
|
||||
ros_message_type_allocator,
|
||||
ros_message_type_deleter);
|
||||
} else {
|
||||
if (sub_ids.take_shared_subscriptions.size() <= 1) {
|
||||
// There is at maximum 1 buffer that does not require ownership.
|
||||
// So this case is equivalent to all the buffers requiring ownership
|
||||
|
||||
// Merge the two vector of ids into a unique one
|
||||
std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
|
||||
concatenated_vector.insert(
|
||||
concatenated_vector.end(),
|
||||
sub_ids.take_ownership_subscriptions.begin(),
|
||||
sub_ids.take_ownership_subscriptions.end());
|
||||
|
||||
this->template add_blah_owned_msg_to_buffers<MessageT, T, AllocatorT>(
|
||||
std::move(message),
|
||||
concatenated_vector,
|
||||
ros_message_type_allocator,
|
||||
ros_message_type_deleter);
|
||||
} else {
|
||||
// Construct a new shared pointer from the message
|
||||
// for the buffers that do not require ownership
|
||||
auto shared_msg = std::allocate_shared<T, PublishedTypeAllocator>(
|
||||
published_type_allocator,
|
||||
*message);
|
||||
|
||||
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
|
||||
shared_msg,
|
||||
sub_ids.take_shared_subscriptions,
|
||||
ros_message_type_allocator,
|
||||
ros_message_type_deleter);
|
||||
this->template add_blah_owned_msg_to_buffers<MessageT, T, AllocatorT>(
|
||||
std::move(message),
|
||||
sub_ids.take_ownership_subscriptions,
|
||||
ros_message_type_allocator,
|
||||
ros_message_type_deleter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<
|
||||
typename T,
|
||||
typename DeleterT,
|
||||
typename MessageT,
|
||||
typename AllocatorT,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
|
||||
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
|
||||
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
|
||||
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
|
||||
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
|
||||
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
|
||||
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
|
||||
>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rosidl_generator_traits::is_message<T>::value &&
|
||||
std::is_same<T, ROSMessageType>::value,
|
||||
std::shared_ptr<const ROSMessageType>
|
||||
>
|
||||
do_intra_process_publish_and_return_shared(
|
||||
uint64_t intra_process_publisher_id,
|
||||
std::unique_ptr<MessageT, Deleter> message,
|
||||
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
|
||||
std::unique_ptr<T, DeleterT> message,
|
||||
ROSMessageTypeAllocator & ros_message_type_allocator,
|
||||
ROSMessageTypeDeleter & ros_message_type_deleter,
|
||||
PublishedTypeAllocator & published_type_allocator)
|
||||
{
|
||||
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
|
||||
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
|
||||
(void)ros_message_type_deleter;
|
||||
(void)published_type_allocator;
|
||||
|
||||
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
@@ -263,33 +377,111 @@ public:
|
||||
|
||||
if (sub_ids.take_ownership_subscriptions.empty()) {
|
||||
// If there are no owning, just convert to shared.
|
||||
std::shared_ptr<MessageT> shared_msg = std::move(message);
|
||||
std::shared_ptr<ROSMessageType> shared_msg = std::move(message);
|
||||
if (!sub_ids.take_shared_subscriptions.empty()) {
|
||||
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
|
||||
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
|
||||
shared_msg, sub_ids.take_shared_subscriptions);
|
||||
}
|
||||
return shared_msg;
|
||||
} else {
|
||||
// Construct a new shared pointer from the message for the buffers that
|
||||
// do not require ownership and to return.
|
||||
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
|
||||
auto shared_msg = std::allocate_shared<ROSMessageType, ROSMessageTypeAllocator>(
|
||||
ros_message_type_allocator, *message);
|
||||
|
||||
if (!sub_ids.take_shared_subscriptions.empty()) {
|
||||
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
|
||||
this->template add_shared_msg_to_buffers<ROSMessageType, AllocatorT>(
|
||||
shared_msg,
|
||||
sub_ids.take_shared_subscriptions);
|
||||
}
|
||||
if (!sub_ids.take_ownership_subscriptions.empty()) {
|
||||
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
|
||||
std::move(message),
|
||||
sub_ids.take_ownership_subscriptions,
|
||||
allocator);
|
||||
}
|
||||
|
||||
this->template add_owned_msg_to_buffers<ROSMessageType, AllocatorT>(
|
||||
std::move(message),
|
||||
sub_ids.take_ownership_subscriptions,
|
||||
ros_message_type_allocator);
|
||||
|
||||
return shared_msg;
|
||||
}
|
||||
}
|
||||
|
||||
template<
|
||||
typename T,
|
||||
typename DeleterT,
|
||||
typename MessageT,
|
||||
typename AllocatorT,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
|
||||
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
|
||||
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
|
||||
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
|
||||
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>,
|
||||
typename PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>,
|
||||
typename PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type
|
||||
>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
|
||||
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
|
||||
>
|
||||
do_intra_process_publish_and_return_shared(
|
||||
uint64_t intra_process_publisher_id,
|
||||
std::unique_ptr<T, DeleterT> message,
|
||||
ROSMessageTypeAllocator & ros_message_type_allocator,
|
||||
ROSMessageTypeDeleter & ros_message_type_deleter,
|
||||
PublishedTypeAllocator & published_type_allocator)
|
||||
{
|
||||
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1);
|
||||
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr);
|
||||
auto shared_ros_msg = std::shared_ptr<ROSMessageType>(ptr, ros_message_type_deleter);
|
||||
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *shared_ros_msg);
|
||||
|
||||
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
|
||||
if (publisher_it == pub_to_subs_.end()) {
|
||||
// Publisher is either invalid or no longer exists.
|
||||
RCLCPP_WARN(
|
||||
rclcpp::get_logger("rclcpp"),
|
||||
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
|
||||
return nullptr;
|
||||
}
|
||||
const auto & sub_ids = publisher_it->second;
|
||||
|
||||
if (sub_ids.take_ownership_subscriptions.empty()) {
|
||||
// If there are no owning, just convert to shared.
|
||||
std::shared_ptr<T> shared_msg = std::move(message);
|
||||
if (!sub_ids.take_shared_subscriptions.empty()) {
|
||||
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
|
||||
shared_msg,
|
||||
sub_ids.take_shared_subscriptions,
|
||||
ros_message_type_allocator,
|
||||
ros_message_type_deleter);
|
||||
}
|
||||
return shared_ros_msg;
|
||||
} else {
|
||||
// Construct a new shared pointer from the message for the buffers that
|
||||
// do not require ownership and to return.
|
||||
auto shared_msg = std::allocate_shared<T, PublishedTypeAllocator>(
|
||||
published_type_allocator,
|
||||
*message);
|
||||
|
||||
if (!sub_ids.take_shared_subscriptions.empty()) {
|
||||
this->template add_blah_shared_msg_to_buffers<MessageT, T, AllocatorT>(
|
||||
shared_msg,
|
||||
sub_ids.take_shared_subscriptions,
|
||||
ros_message_type_allocator,
|
||||
ros_message_type_deleter);
|
||||
}
|
||||
|
||||
this->template add_blah_owned_msg_to_buffers<MessageT, T, AllocatorT>(
|
||||
std::move(message),
|
||||
sub_ids.take_ownership_subscriptions,
|
||||
ros_message_type_allocator,
|
||||
ros_message_type_deleter);
|
||||
|
||||
return shared_ros_msg;
|
||||
}
|
||||
}
|
||||
|
||||
/// Return true if the given rmw_gid_t matches any stored Publishers.
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
@@ -337,9 +529,14 @@ private:
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename Alloc,
|
||||
typename Deleter>
|
||||
void
|
||||
typename Alloc = std::allocator<void>,
|
||||
typename Deleter = std::default_delete<MessageT>,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rosidl_generator_traits::is_message<MessageT>::value &&
|
||||
std::is_same<MessageT, ROSMessageType>::value
|
||||
>
|
||||
add_shared_msg_to_buffers(
|
||||
std::shared_ptr<const MessageT> message,
|
||||
std::vector<uint64_t> subscription_ids)
|
||||
@@ -369,11 +566,69 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename T,
|
||||
typename AllocatorT,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
|
||||
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
|
||||
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
|
||||
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
|
||||
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>
|
||||
>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
|
||||
std::is_same<T, PublishedType>::value
|
||||
>
|
||||
add_blah_shared_msg_to_buffers(
|
||||
std::shared_ptr<const T> message,
|
||||
std::vector<uint64_t> subscription_ids,
|
||||
ROSMessageTypeAllocator & allocator,
|
||||
ROSMessageTypeDeleter & deleter)
|
||||
{
|
||||
// TODO(clalancette): This goes away once the subscription buffer can take PublishedTypes
|
||||
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
|
||||
ROSMessageTypeAllocatorTraits::construct(allocator, ptr);
|
||||
auto shared_ros_msg = std::shared_ptr<ROSMessageType>(ptr, deleter);
|
||||
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *shared_ros_msg);
|
||||
|
||||
for (auto id : subscription_ids) {
|
||||
auto subscription_it = subscriptions_.find(id);
|
||||
if (subscription_it == subscriptions_.end()) {
|
||||
throw std::runtime_error("subscription has unexpectedly gone out of scope");
|
||||
}
|
||||
auto subscription_base = subscription_it->second.lock();
|
||||
if (subscription_base) {
|
||||
auto subscription = std::dynamic_pointer_cast<
|
||||
rclcpp::experimental::SubscriptionIntraProcessBuffer<ROSMessageType,
|
||||
AllocatorT, ROSMessageTypeDeleter>
|
||||
>(subscription_base);
|
||||
if (nullptr == subscription) {
|
||||
throw std::runtime_error(
|
||||
"failed to dynamic cast SubscriptionIntraProcessBase to "
|
||||
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
|
||||
"can happen when the publisher and subscription use different "
|
||||
"allocator types, which is not supported");
|
||||
}
|
||||
|
||||
subscription->provide_intra_process_message(shared_ros_msg);
|
||||
} else {
|
||||
subscriptions_.erase(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename Alloc = std::allocator<void>,
|
||||
typename Deleter = std::default_delete<MessageT>>
|
||||
void
|
||||
typename Deleter = std::default_delete<MessageT>,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rosidl_generator_traits::is_message<MessageT>::value &&
|
||||
std::is_same<MessageT, ROSMessageType>::value
|
||||
>
|
||||
add_owned_msg_to_buffers(
|
||||
std::unique_ptr<MessageT, Deleter> message,
|
||||
std::vector<uint64_t> subscription_ids,
|
||||
@@ -419,6 +674,74 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename T,
|
||||
typename AllocatorT,
|
||||
typename ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
|
||||
typename PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type,
|
||||
typename ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>,
|
||||
typename ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type,
|
||||
typename ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>
|
||||
>
|
||||
typename
|
||||
std::enable_if_t<
|
||||
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
|
||||
std::is_same<T, PublishedType>::value
|
||||
>
|
||||
add_blah_owned_msg_to_buffers(
|
||||
std::unique_ptr<T> message,
|
||||
std::vector<uint64_t> subscription_ids,
|
||||
ROSMessageTypeAllocator & allocator,
|
||||
ROSMessageTypeDeleter & deleter)
|
||||
{
|
||||
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> unique_ros_msg{nullptr};
|
||||
|
||||
for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
|
||||
auto subscription_it = subscriptions_.find(*it);
|
||||
if (subscription_it == subscriptions_.end()) {
|
||||
throw std::runtime_error("subscription has unexpectedly gone out of scope");
|
||||
}
|
||||
auto subscription_base = subscription_it->second.lock();
|
||||
if (subscription_base) {
|
||||
auto subscription = std::dynamic_pointer_cast<
|
||||
rclcpp::experimental::SubscriptionIntraProcessBuffer<ROSMessageType,
|
||||
AllocatorT, ROSMessageTypeDeleter>
|
||||
>(subscription_base);
|
||||
if (nullptr == subscription) {
|
||||
throw std::runtime_error(
|
||||
"failed to dynamic cast SubscriptionIntraProcessBase to "
|
||||
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
|
||||
"can happen when the publisher and subscription use different "
|
||||
"allocator types, which is not supported");
|
||||
}
|
||||
|
||||
if (unique_ros_msg == nullptr) {
|
||||
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
|
||||
ROSMessageTypeAllocatorTraits::construct(allocator, ptr);
|
||||
unique_ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
|
||||
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *unique_ros_msg);
|
||||
}
|
||||
|
||||
if (std::next(it) == subscription_ids.end()) {
|
||||
// If this is the last subscription, give up ownership
|
||||
subscription->provide_intra_process_message(std::move(unique_ros_msg));
|
||||
} else {
|
||||
// Copy the message since we have additional subscriptions to serve
|
||||
using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
|
||||
MessageUniquePtr copy_message;
|
||||
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
|
||||
ROSMessageTypeAllocatorTraits::construct(allocator, ptr, *unique_ros_msg);
|
||||
copy_message = MessageUniquePtr(ptr, deleter);
|
||||
|
||||
subscription->provide_intra_process_message(std::move(copy_message));
|
||||
}
|
||||
} else {
|
||||
subscriptions_.erase(subscription_it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PublisherToSubscriptionIdsMap pub_to_subs_;
|
||||
SubscriptionMap subscriptions_;
|
||||
PublisherMap publishers_;
|
||||
|
||||
@@ -15,25 +15,22 @@
|
||||
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
|
||||
|
||||
#include <rmw/rmw.h>
|
||||
#include <rmw/types.h>
|
||||
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rcl/types.h"
|
||||
|
||||
#include "rclcpp/any_subscription_callback.hpp"
|
||||
#include "rclcpp/context.hpp"
|
||||
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
|
||||
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
|
||||
#include "rclcpp/qos.hpp"
|
||||
#include "rclcpp/type_support_decl.hpp"
|
||||
#include "rclcpp/waitable.hpp"
|
||||
#include "tracetools/tracetools.h"
|
||||
|
||||
namespace rclcpp
|
||||
@@ -146,10 +143,10 @@ protected:
|
||||
|
||||
if (any_callback_.use_take_shared_method()) {
|
||||
ConstMessageSharedPtr shared_msg = shared_ptr->first;
|
||||
any_callback_.dispatch_intra_process(shared_msg, msg_info);
|
||||
any_callback_.template dispatch_intra_process<T>(shared_msg, msg_info);
|
||||
} else {
|
||||
MessageUniquePtr unique_msg = std::move(shared_ptr->second);
|
||||
any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
|
||||
any_callback_.template dispatch_intra_process<T>(std::move(unique_msg), msg_info);
|
||||
}
|
||||
shared_ptr.reset();
|
||||
}
|
||||
|
||||
@@ -15,15 +15,12 @@
|
||||
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
|
||||
|
||||
#include <rmw/rmw.h>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rcl/guard_condition.h"
|
||||
#include "rcl/wait.h"
|
||||
|
||||
#include "rclcpp/qos.hpp"
|
||||
#include "rclcpp/type_support_decl.hpp"
|
||||
|
||||
@@ -15,25 +15,20 @@
|
||||
#ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
|
||||
#define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
|
||||
|
||||
#include <rmw/rmw.h>
|
||||
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rcl/guard_condition.h"
|
||||
#include "rcl/wait.h"
|
||||
|
||||
#include "rclcpp/any_subscription_callback.hpp"
|
||||
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
|
||||
#include "rclcpp/qos.hpp"
|
||||
#include "rclcpp/type_support_decl.hpp"
|
||||
#include "rclcpp/waitable.hpp"
|
||||
#include "tracetools/tracetools.h"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
@@ -131,7 +126,11 @@ protected:
|
||||
trigger_guard_condition()
|
||||
{
|
||||
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
|
||||
(void)ret;
|
||||
if (ret != RCL_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(
|
||||
ret,
|
||||
"Failed to trigger guard condition in intra-process buffer");
|
||||
}
|
||||
}
|
||||
|
||||
BufferUniquePtr buffer_;
|
||||
|
||||
@@ -20,12 +20,16 @@
|
||||
#include "rcl/publisher.h"
|
||||
#include "rcl/subscription.h"
|
||||
|
||||
#include "rclcpp/callback_group.hpp"
|
||||
#include "rclcpp/macros.hpp"
|
||||
#include "rclcpp/node_interfaces/node_base_interface.hpp"
|
||||
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
|
||||
#include "rclcpp/node_interfaces/node_topics_interface.hpp"
|
||||
#include "rclcpp/publisher.hpp"
|
||||
#include "rclcpp/publisher_base.hpp"
|
||||
#include "rclcpp/publisher_factory.hpp"
|
||||
#include "rclcpp/qos.hpp"
|
||||
#include "rclcpp/subscription_base.hpp"
|
||||
#include "rclcpp/subscription_factory.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
|
||||
@@ -20,12 +20,14 @@
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rcl/publisher.h"
|
||||
#include "rmw/error_handling.h"
|
||||
#include "rmw/rmw.h"
|
||||
#include "rosidl_runtime_cpp/traits.hpp"
|
||||
|
||||
#include "rclcpp/allocator/allocator_common.hpp"
|
||||
#include "rclcpp/allocator/allocator_deleter.hpp"
|
||||
@@ -253,7 +255,7 @@ public:
|
||||
this->do_inter_process_publish(*msg);
|
||||
return;
|
||||
}
|
||||
// If an interprocess subscription exist, then the unique_ptr is promoted
|
||||
// If an interprocess subscription exists, then the unique_ptr is promoted
|
||||
// to a shared_ptr and published.
|
||||
// This allows doing the intraprocess publish first and then doing the
|
||||
// interprocess publish, resulting in lower publish-to-subscribe latency.
|
||||
@@ -263,10 +265,11 @@ public:
|
||||
get_subscription_count() > get_intra_process_subscription_count();
|
||||
|
||||
if (inter_process_publish_needed) {
|
||||
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
|
||||
auto shared_msg = this->do_intra_process_publish_and_return_shared<T, ROSMessageTypeDeleter>(
|
||||
std::move(msg));
|
||||
this->do_inter_process_publish(*shared_msg);
|
||||
} else {
|
||||
this->do_intra_process_publish(std::move(msg));
|
||||
this->do_intra_process_publish<T, ROSMessageTypeDeleter>(std::move(msg));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -319,13 +322,30 @@ public:
|
||||
>
|
||||
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
|
||||
{
|
||||
// TODO(wjwwood): later update this to give the unique_ptr to the intra
|
||||
// process manager and let it decide if it needs to be converted or not.
|
||||
// For now, convert it unconditionally and pass it the ROSMessageType
|
||||
// publish function specialization.
|
||||
auto unique_ros_msg = this->create_ros_message_unique_ptr();
|
||||
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
|
||||
this->publish(std::move(unique_ros_msg));
|
||||
if (!intra_process_is_enabled_) {
|
||||
// If we aren't using intraprocess at all, do the type conversion
|
||||
// immediately and publish interprocess.
|
||||
auto unique_ros_msg = this->create_ros_message_unique_ptr();
|
||||
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
|
||||
this->do_inter_process_publish(*unique_ros_msg);
|
||||
return;
|
||||
}
|
||||
// If an interprocess subscription exists, then the unique_ptr is promoted
|
||||
// to a shared_ptr and published.
|
||||
// This allows doing the intraprocess publish first and then doing the
|
||||
// interprocess publish, resulting in lower publish-to-subscribe latency.
|
||||
// It's not possible to do that with an unique_ptr,
|
||||
// as do_intra_process_publish takes the ownership of the message.
|
||||
bool inter_process_publish_needed =
|
||||
get_subscription_count() > get_intra_process_subscription_count();
|
||||
|
||||
if (inter_process_publish_needed) {
|
||||
auto shared_msg = this->do_intra_process_publish_and_return_shared<T, PublishedTypeDeleter>(
|
||||
std::move(msg));
|
||||
this->do_inter_process_publish(*shared_msg);
|
||||
} else {
|
||||
this->do_intra_process_publish<T, PublishedTypeDeleter>(std::move(msg));
|
||||
}
|
||||
}
|
||||
|
||||
/// Publish a message on the topic.
|
||||
@@ -346,12 +366,7 @@ public:
|
||||
>
|
||||
publish(const T & msg)
|
||||
{
|
||||
// TODO(wjwwood): later update this to give the unique_ptr to the intra
|
||||
// process manager and let it decide if it needs to be converted or not.
|
||||
// For now, convert it unconditionally and pass it the ROSMessageType
|
||||
// publish function specialization.
|
||||
|
||||
// Avoid allocating when not using intra process.
|
||||
// Avoid double allocation when not using intra process.
|
||||
if (!intra_process_is_enabled_) {
|
||||
// Convert to the ROS message equivalent and publish it.
|
||||
ROSMessageType ros_msg;
|
||||
@@ -359,13 +374,11 @@ public:
|
||||
// In this case we're not using intra process.
|
||||
return this->do_inter_process_publish(ros_msg);
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr, convert it,
|
||||
// and pass it along.
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
// As the message is not const, a copy should be made.
|
||||
// A shared_ptr<const MessageT> could also be constructed here.
|
||||
auto unique_ros_msg = this->create_ros_message_unique_ptr();
|
||||
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *unique_ros_msg);
|
||||
this->publish(std::move(unique_ros_msg));
|
||||
auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg);
|
||||
this->publish(std::move(unique_msg));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -490,8 +503,9 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
template<typename T, typename DeleterT>
|
||||
void
|
||||
do_intra_process_publish(std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
|
||||
do_intra_process_publish(std::unique_ptr<T, DeleterT> msg)
|
||||
{
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
@@ -502,15 +516,17 @@ protected:
|
||||
throw std::runtime_error("cannot publish msg which is a null pointer");
|
||||
}
|
||||
|
||||
ipm->template do_intra_process_publish<ROSMessageType, AllocatorT>(
|
||||
ipm->template do_intra_process_publish<T, DeleterT, MessageT, AllocatorT>(
|
||||
intra_process_publisher_id_,
|
||||
std::move(msg),
|
||||
ros_message_type_allocator_);
|
||||
ros_message_type_allocator_,
|
||||
ros_message_type_deleter_,
|
||||
published_type_allocator_);
|
||||
}
|
||||
|
||||
template<typename T, typename DeleterT>
|
||||
std::shared_ptr<const ROSMessageType>
|
||||
do_intra_process_publish_and_return_shared(
|
||||
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
|
||||
do_intra_process_publish_and_return_shared(std::unique_ptr<T, DeleterT> msg)
|
||||
{
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
@@ -521,11 +537,13 @@ protected:
|
||||
throw std::runtime_error("cannot publish msg which is a null pointer");
|
||||
}
|
||||
|
||||
return ipm->template do_intra_process_publish_and_return_shared<ROSMessageType,
|
||||
return ipm->template do_intra_process_publish_and_return_shared<T, DeleterT, MessageT,
|
||||
AllocatorT>(
|
||||
intra_process_publisher_id_,
|
||||
std::move(msg),
|
||||
ros_message_type_allocator_);
|
||||
ros_message_type_allocator_,
|
||||
ros_message_type_deleter_,
|
||||
published_type_allocator_);
|
||||
}
|
||||
|
||||
/// Return a new unique_ptr using the ROSMessageType of the publisher.
|
||||
@@ -537,7 +555,7 @@ protected:
|
||||
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
|
||||
}
|
||||
|
||||
/// Duplicate a given ros message as a unique_ptr.
|
||||
/// Duplicate a given ROS message as a unique_ptr.
|
||||
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
|
||||
duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg)
|
||||
{
|
||||
@@ -546,6 +564,15 @@ protected:
|
||||
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
|
||||
}
|
||||
|
||||
/// Duplicate a given type adapted message as a unique_ptr.
|
||||
std::unique_ptr<PublishedType, PublishedTypeDeleter>
|
||||
duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg)
|
||||
{
|
||||
auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1);
|
||||
PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg);
|
||||
return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_);
|
||||
}
|
||||
|
||||
/// Copy of original options passed during construction.
|
||||
/**
|
||||
* It is important to save a copy of this so that the rmw payload which it
|
||||
|
||||
@@ -26,14 +26,13 @@
|
||||
|
||||
#include "rclcpp/any_subscription_callback.hpp"
|
||||
#include "rclcpp/get_message_type_support_handle.hpp"
|
||||
#include "rclcpp/intra_process_buffer_type.hpp"
|
||||
#include "rclcpp/node_interfaces/node_base_interface.hpp"
|
||||
#include "rclcpp/qos.hpp"
|
||||
#include "rclcpp/subscription.hpp"
|
||||
#include "rclcpp/subscription_options.hpp"
|
||||
#include "rclcpp/subscription_traits.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
|
||||
#include "rclcpp/visibility_control.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
@@ -14,9 +14,18 @@
|
||||
|
||||
#include "rclcpp/node_interfaces/node_topics.hpp"
|
||||
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
|
||||
#include "rclcpp/callback_group.hpp"
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/node_interfaces/node_base_interface.hpp"
|
||||
#include "rclcpp/node_interfaces/node_timers_interface.hpp"
|
||||
#include "rclcpp/publisher_base.hpp"
|
||||
#include "rclcpp/publisher_factory.hpp"
|
||||
#include "rclcpp/subscription_base.hpp"
|
||||
#include "rclcpp/subscription_factory.hpp"
|
||||
#include "rclcpp/qos.hpp"
|
||||
|
||||
using rclcpp::exceptions::throw_from_rcl_error;
|
||||
|
||||
|
||||
@@ -86,10 +86,12 @@ TEST_F(TestAnySubscriptionCallback, unset_dispatch_throw) {
|
||||
any_subscription_callback_.dispatch(msg_shared_ptr_, message_info_),
|
||||
std::runtime_error);
|
||||
EXPECT_THROW(
|
||||
any_subscription_callback_.dispatch_intra_process(msg_shared_ptr_, message_info_),
|
||||
any_subscription_callback_.template dispatch_intra_process<test_msgs::msg::Empty>(
|
||||
msg_shared_ptr_, message_info_),
|
||||
std::runtime_error);
|
||||
EXPECT_THROW(
|
||||
any_subscription_callback_.dispatch_intra_process(get_unique_ptr_msg(), message_info_),
|
||||
any_subscription_callback_.template dispatch_intra_process<test_msgs::msg::Empty>(
|
||||
get_unique_ptr_msg(), message_info_),
|
||||
std::runtime_error);
|
||||
}
|
||||
|
||||
@@ -203,13 +205,15 @@ format_parameter_with_ta(const ::testing::TestParamInfo<DispatchTestsWithTA::Par
|
||||
/* Testing dispatch with shared_ptr<const MessageT> as input */ \
|
||||
TEST_P(DispatchTests_name, test_intra_shared_dispatch) { \
|
||||
auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \
|
||||
any_subscription_callback_to_test.dispatch_intra_process(msg_shared_ptr_, message_info_); \
|
||||
any_subscription_callback_to_test.template dispatch_intra_process<test_msgs::msg::Empty>( \
|
||||
msg_shared_ptr_, message_info_); \
|
||||
} \
|
||||
\
|
||||
/* Testing dispatch with unique_ptr<MessageT> as input */ \
|
||||
TEST_P(DispatchTests_name, test_intra_unique_dispatch) { \
|
||||
auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \
|
||||
any_subscription_callback_to_test.dispatch_intra_process(get_unique_ptr_msg(), message_info_); \
|
||||
any_subscription_callback_to_test.template dispatch_intra_process<test_msgs::msg::Empty>( \
|
||||
get_unique_ptr_msg(), message_info_); \
|
||||
}
|
||||
|
||||
PARAMETERIZED_TESTS(DispatchTests)
|
||||
|
||||
@@ -116,6 +116,7 @@ public:
|
||||
{
|
||||
auto allocator = std::make_shared<Alloc>();
|
||||
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
|
||||
message_deleter_ = std::make_shared<MessageDeleter>();
|
||||
}
|
||||
|
||||
// The following functions use the IntraProcessManager
|
||||
@@ -123,6 +124,7 @@ public:
|
||||
void publish(MessageUniquePtr msg);
|
||||
|
||||
std::shared_ptr<MessageAlloc> message_allocator_;
|
||||
std::shared_ptr<MessageDeleter> message_deleter_;
|
||||
};
|
||||
|
||||
} // namespace mock
|
||||
@@ -328,9 +330,15 @@ void Publisher<T, Alloc>::publish(MessageUniquePtr msg)
|
||||
throw std::runtime_error("cannot publish msg which is a null pointer");
|
||||
}
|
||||
|
||||
ipm->template do_intra_process_publish<T, Alloc>(
|
||||
using MessageAllocTraits = allocator::AllocRebind<T, Alloc>;
|
||||
using MessageAlloc = typename MessageAllocTraits::allocator_type;
|
||||
using MessageDeleter = allocator::Deleter<MessageAlloc, T>;
|
||||
|
||||
ipm->template do_intra_process_publish<T, MessageDeleter, T, Alloc>(
|
||||
intra_process_publisher_id_,
|
||||
std::move(msg),
|
||||
*message_allocator_,
|
||||
*message_deleter_,
|
||||
*message_allocator_);
|
||||
}
|
||||
|
||||
|
||||
@@ -34,14 +34,6 @@
|
||||
#include "rclcpp/msg/string.hpp"
|
||||
|
||||
|
||||
#ifdef RMW_IMPLEMENTATION
|
||||
# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX
|
||||
# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX)
|
||||
#else
|
||||
# define CLASSNAME(NAME, SUFFIX) NAME
|
||||
#endif
|
||||
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
static const int g_max_loops = 200;
|
||||
@@ -50,15 +42,17 @@ static const std::chrono::milliseconds g_sleep_per_loop(10);
|
||||
|
||||
class TestPublisher : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
protected:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
if (!rclcpp::ok()) {
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
protected:
|
||||
void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
|
||||
{
|
||||
node = std::make_shared<rclcpp::Node>("my_node", "/ns", node_options);
|
||||
@@ -72,20 +66,6 @@ protected:
|
||||
rclcpp::Node::SharedPtr node;
|
||||
};
|
||||
|
||||
class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
};
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
@@ -146,44 +126,88 @@ struct TypeAdapter<int, rclcpp::msg::String>
|
||||
/*
|
||||
* Testing publisher creation signatures with a type adapter.
|
||||
*/
|
||||
TEST_F(TestPublisher, various_creation_signatures) {
|
||||
for (auto is_intra_process : {true, false}) {
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(is_intra_process);
|
||||
initialize(options);
|
||||
{
|
||||
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
|
||||
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
|
||||
(void)publisher;
|
||||
}
|
||||
{
|
||||
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
|
||||
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
|
||||
(void)publisher;
|
||||
}
|
||||
TEST_F(TestPublisher, creation_signatures_with_intra) {
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(true);
|
||||
initialize(options);
|
||||
{
|
||||
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
|
||||
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
|
||||
(void)publisher;
|
||||
ASSERT_EQ(node->count_publishers("topic"), 1u);
|
||||
}
|
||||
{
|
||||
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
|
||||
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
|
||||
(void)publisher;
|
||||
ASSERT_EQ(node->count_publishers("topic"), 1u);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Testing publisher creation signatures with a type adapter.
|
||||
*/
|
||||
TEST_F(TestPublisher, creation_signatures_without_intra) {
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(false);
|
||||
initialize(options);
|
||||
{
|
||||
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
|
||||
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
|
||||
(void)publisher;
|
||||
ASSERT_EQ(node->count_publishers("topic"), 1u);
|
||||
}
|
||||
{
|
||||
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
|
||||
auto publisher = node->create_publisher<StringTypeAdapter>("topic", 42);
|
||||
(void)publisher;
|
||||
ASSERT_EQ(node->count_publishers("topic"), 1u);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Testing that conversion errors are passed up.
|
||||
*/
|
||||
TEST_F(TestPublisher, conversion_exception_is_passed_up) {
|
||||
TEST_F(TestPublisher, conversion_exception_is_passed_up_with_intra) {
|
||||
using BadStringTypeAdapter = rclcpp::TypeAdapter<int, rclcpp::msg::String>;
|
||||
for (auto is_intra_process : {true, false}) {
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(is_intra_process);
|
||||
initialize(options);
|
||||
auto pub = node->create_publisher<BadStringTypeAdapter>("topic_name", 1);
|
||||
EXPECT_THROW(pub->publish(1), std::runtime_error);
|
||||
}
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(true);
|
||||
initialize(options);
|
||||
auto pub = node->create_publisher<BadStringTypeAdapter>("topic_name", 1);
|
||||
|
||||
auto callback =
|
||||
[](
|
||||
const rclcpp::msg::String::ConstSharedPtr msg,
|
||||
const rclcpp::MessageInfo & message_info
|
||||
) -> void
|
||||
{
|
||||
(void)msg;
|
||||
(void)message_info;
|
||||
};
|
||||
|
||||
// When we are doing type adaptation with intra-process comms, we have to have a subscriber
|
||||
// for the 'convert_to_ros_msg' method in the TypeAdapter to be called. That's because the
|
||||
// intra-process manager delays doing the type conversion until it actually adds the message
|
||||
// to the subscription's internal buffers. No subscriptions -> no conversions. We add a
|
||||
// dummy subscription here just to trigger the conversion.
|
||||
auto sub = node->create_subscription<rclcpp::msg::String>("topic_name", 1, callback);
|
||||
|
||||
EXPECT_THROW(pub->publish(1), std::runtime_error);
|
||||
}
|
||||
|
||||
TEST_F(TestPublisher, conversion_exception_is_passed_up_without_intra) {
|
||||
using BadStringTypeAdapter = rclcpp::TypeAdapter<int, rclcpp::msg::String>;
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(false);
|
||||
initialize(options);
|
||||
auto pub = node->create_publisher<BadStringTypeAdapter>("topic_name", 1);
|
||||
EXPECT_THROW(pub->publish(1), std::runtime_error);
|
||||
}
|
||||
|
||||
/*
|
||||
* Testing that publisher sends type adapted types and ROS message types with intra proccess communications.
|
||||
*/
|
||||
TEST_F(
|
||||
CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION),
|
||||
check_type_adapted_message_is_sent_and_received_intra_process) {
|
||||
TEST_F(TestPublisher, check_type_adapted_message_is_sent_and_received_intra_process) {
|
||||
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
|
||||
const std::string message_data = "Message Data";
|
||||
const std::string topic_name = "topic_name";
|
||||
@@ -200,13 +224,14 @@ TEST_F(
|
||||
ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process);
|
||||
};
|
||||
|
||||
auto node = rclcpp::Node::make_shared(
|
||||
"test_intra_process",
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true));
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(true);
|
||||
initialize(options);
|
||||
|
||||
auto pub = node->create_publisher<StringTypeAdapter>(topic_name, 10);
|
||||
auto sub = node->create_subscription<rclcpp::msg::String>(topic_name, 1, callback);
|
||||
|
||||
auto wait_for_message_to_be_received = [&is_received, &node]() {
|
||||
auto wait_for_message_to_be_received = [this, &is_received]() {
|
||||
rclcpp::executors::SingleThreadedExecutor executor;
|
||||
int i = 0;
|
||||
executor.add_node(node);
|
||||
|
||||
@@ -50,15 +50,17 @@ static const std::chrono::milliseconds g_sleep_per_loop(10);
|
||||
|
||||
class TestSubscription : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
protected:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
if (!rclcpp::ok()) {
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
protected:
|
||||
void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
|
||||
{
|
||||
node = std::make_shared<rclcpp::Node>("my_node", "/ns", node_options);
|
||||
@@ -72,22 +74,6 @@ protected:
|
||||
rclcpp::Node::SharedPtr node;
|
||||
};
|
||||
|
||||
class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
if (!rclcpp::ok()) {
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
};
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
@@ -154,28 +140,29 @@ TEST_F(TestSubscription, various_creation_signatures) {
|
||||
auto sub =
|
||||
node->create_subscription<StringTypeAdapter>("topic", 42, [](const std::string &) {});
|
||||
(void)sub;
|
||||
ASSERT_EQ(node->count_subscribers("topic"), 1u);
|
||||
}
|
||||
{
|
||||
using StringTypeAdapter = rclcpp::adapt_type<std::string>::as<rclcpp::msg::String>;
|
||||
auto sub =
|
||||
node->create_subscription<StringTypeAdapter>("topic", 42, [](const std::string &) {});
|
||||
(void)sub;
|
||||
ASSERT_EQ(node->count_subscribers("topic"), 1u);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Testing that subscriber receives type adapted types and ROS message types with intra proccess communications.
|
||||
*/
|
||||
TEST_F(
|
||||
CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION),
|
||||
check_type_adapted_messages_are_received_by_intra_process_subscription) {
|
||||
TEST_F(TestSubscription, check_type_adapted_messages_are_received_by_intra_process_subscription) {
|
||||
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
|
||||
const std::string message_data = "Message Data";
|
||||
const std::string topic_name = "topic_name";
|
||||
|
||||
auto node = rclcpp::Node::make_shared(
|
||||
"test_intra_process",
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true));
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(true);
|
||||
initialize(options);
|
||||
|
||||
auto pub = node->create_publisher<rclcpp::msg::String>(topic_name, 1);
|
||||
|
||||
rclcpp::msg::String msg;
|
||||
@@ -385,16 +372,15 @@ TEST_F(
|
||||
/*
|
||||
* Testing that subscriber receives type adapted types and ROS message types with inter proccess communications.
|
||||
*/
|
||||
TEST_F(
|
||||
CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION),
|
||||
check_type_adapted_messages_are_received_by_inter_process_subscription) {
|
||||
TEST_F(TestSubscription, check_type_adapted_messages_are_received_by_inter_process_subscription) {
|
||||
using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::String>;
|
||||
const std::string message_data = "Message Data";
|
||||
const std::string topic_name = "topic_name";
|
||||
|
||||
auto node = rclcpp::Node::make_shared(
|
||||
"test_intra_process",
|
||||
rclcpp::NodeOptions().use_intra_process_comms(false));
|
||||
rclcpp::NodeOptions options;
|
||||
options.use_intra_process_comms(false);
|
||||
initialize(options);
|
||||
|
||||
auto pub = node->create_publisher<rclcpp::msg::String>(topic_name, 1);
|
||||
|
||||
rclcpp::msg::String msg;
|
||||
|
||||
Reference in New Issue
Block a user