Compare commits
45 Commits
mjcarroll/
...
karsten/se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1cd811a412 | ||
|
|
241164204d | ||
|
|
1f8226a6f3 | ||
|
|
511476c445 | ||
|
|
408343fb96 | ||
|
|
ff7a81f5dd | ||
|
|
4fa5f090ed | ||
|
|
ab73a62297 | ||
|
|
5472d363b1 | ||
|
|
35fd4908f8 | ||
|
|
93dc82699b | ||
|
|
b482164b97 | ||
|
|
a2fc8bd867 | ||
|
|
9b436fa777 | ||
|
|
2147026fbf | ||
|
|
2cefdca297 | ||
|
|
4c7506f5aa | ||
|
|
18181f5d8d | ||
|
|
9b65b7cefd | ||
|
|
657d9a0be4 | ||
|
|
7cff1aa85b | ||
|
|
d4536b3036 | ||
|
|
d74c4edac1 | ||
|
|
d8f1da9c88 | ||
|
|
4e1e744624 | ||
|
|
2ddbe32469 | ||
|
|
4c80594a6f | ||
|
|
bc228ee032 | ||
|
|
0c83e4981d | ||
|
|
af14ee76be | ||
|
|
d7a84eaa66 | ||
|
|
a3db0069b6 | ||
|
|
5faf8f40e7 | ||
|
|
02921427f7 | ||
|
|
56b520eb95 | ||
|
|
f9ef2c2fd5 | ||
|
|
29fea926af | ||
|
|
c8522b3cd6 | ||
|
|
9d7ef52885 | ||
|
|
86bd3a3c80 | ||
|
|
a300c2f4ae | ||
|
|
9f1c10f1c4 | ||
|
|
f3aab4f6c5 | ||
|
|
c8162ba861 | ||
|
|
9b73845f89 |
@@ -245,6 +245,18 @@ if(BUILD_TESTING)
|
||||
"test_msgs"
|
||||
)
|
||||
target_link_libraries(test_loaned_message ${PROJECT_NAME})
|
||||
ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp
|
||||
TIMEOUT 120)
|
||||
if(TARGET test_intra_process_communication)
|
||||
ament_target_dependencies(test_intra_process_communication
|
||||
"rcl"
|
||||
"rcl_interfaces"
|
||||
"rmw"
|
||||
"rosidl_typesupport_cpp"
|
||||
"test_msgs"
|
||||
)
|
||||
endif()
|
||||
target_link_libraries(test_intra_process_communication ${PROJECT_NAME})
|
||||
|
||||
ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240)
|
||||
if(TARGET test_node)
|
||||
|
||||
@@ -43,6 +43,7 @@ std::shared_ptr<PublisherT>
|
||||
create_publisher(
|
||||
NodeT & node,
|
||||
const std::string & topic_name,
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
|
||||
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
|
||||
@@ -56,7 +57,7 @@ create_publisher(
|
||||
// Create the publisher.
|
||||
auto pub = node_topics->create_publisher(
|
||||
topic_name,
|
||||
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
|
||||
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options, type_support),
|
||||
qos
|
||||
);
|
||||
|
||||
@@ -66,6 +67,28 @@ create_publisher(
|
||||
return std::dynamic_pointer_cast<PublisherT>(pub);
|
||||
}
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
|
||||
typename NodeT>
|
||||
std::shared_ptr<PublisherT>
|
||||
create_publisher(
|
||||
NodeT & node,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
|
||||
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
|
||||
)
|
||||
)
|
||||
{
|
||||
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
|
||||
|
||||
return create_publisher<MessageT, AllocatorT, PublisherT, NodeT>(
|
||||
node, topic_name, type_support,
|
||||
qos, options);
|
||||
}
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__CREATE_PUBLISHER_HPP_
|
||||
|
||||
@@ -51,14 +51,13 @@ typename std::shared_ptr<SubscriptionT>
|
||||
create_subscription(
|
||||
NodeT && node,
|
||||
const std::string & topic_name,
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rclcpp::QoS & qos,
|
||||
CallbackT && callback,
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
|
||||
),
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
|
||||
MessageMemoryStrategyT::create_default()
|
||||
)
|
||||
)
|
||||
{
|
||||
using rclcpp::node_interfaces::get_node_topics_interface;
|
||||
@@ -67,7 +66,8 @@ create_subscription(
|
||||
auto factory = rclcpp::create_subscription_factory<MessageT>(
|
||||
std::forward<CallbackT>(callback),
|
||||
options,
|
||||
msg_mem_strat
|
||||
msg_mem_strat,
|
||||
type_support
|
||||
);
|
||||
|
||||
auto sub = node_topics->create_subscription(topic_name, factory, qos);
|
||||
@@ -76,6 +76,39 @@ create_subscription(
|
||||
return std::dynamic_pointer_cast<SubscriptionT>(sub);
|
||||
}
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename CallbackT,
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
typename CallbackMessageT =
|
||||
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
|
||||
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
|
||||
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
|
||||
CallbackMessageT,
|
||||
AllocatorT
|
||||
>,
|
||||
typename NodeT>
|
||||
typename std::shared_ptr<SubscriptionT>
|
||||
create_subscription(
|
||||
NodeT && node,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos,
|
||||
CallbackT && callback,
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
|
||||
MessageMemoryStrategyT::create_default()
|
||||
)
|
||||
{
|
||||
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
|
||||
|
||||
return create_subscription<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT,
|
||||
MessageMemoryStrategyT>(
|
||||
std::forward<NodeT>(
|
||||
node), topic_name, type_support, qos, std::forward<CallbackT>(
|
||||
callback), options, msg_mem_strat);
|
||||
}
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_
|
||||
|
||||
@@ -110,11 +110,14 @@ public:
|
||||
* In addition this generates a unique intra process id for the subscription.
|
||||
*
|
||||
* \param subscription the SubscriptionIntraProcess to register.
|
||||
* \param is_serialized true if the buffer expects serialized messages
|
||||
* \return an unsigned 64-bit integer which is the subscription's unique id.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
uint64_t
|
||||
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
|
||||
add_subscription(
|
||||
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription,
|
||||
const bool is_serialized = false);
|
||||
|
||||
/// Unregister a subscription using the subscription's unique id.
|
||||
/**
|
||||
@@ -134,11 +137,12 @@ public:
|
||||
* In addition this generates a unique intra process id for the publisher.
|
||||
*
|
||||
* \param publisher publisher to be registered with the manager.
|
||||
* \param is_serialized true if the buffer expects serialized messages
|
||||
* \return an unsigned 64-bit integer which is the publisher's unique id.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
uint64_t
|
||||
add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
|
||||
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false);
|
||||
|
||||
/// Unregister a publisher using the publisher's unique id.
|
||||
/**
|
||||
@@ -311,6 +315,7 @@ private:
|
||||
rmw_qos_profile_t qos;
|
||||
const char * topic_name;
|
||||
bool use_take_shared_method;
|
||||
bool is_serialized;
|
||||
};
|
||||
|
||||
struct PublisherInfo
|
||||
@@ -320,6 +325,7 @@ private:
|
||||
rclcpp::PublisherBase::WeakPtr publisher;
|
||||
rmw_qos_profile_t qos;
|
||||
const char * topic_name;
|
||||
bool is_serialized;
|
||||
};
|
||||
|
||||
struct SplittedSubscriptions
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
|
||||
#include "rclcpp/serialization.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
#include "rclcpp/type_support_decl.hpp"
|
||||
#include "rclcpp/waitable.hpp"
|
||||
#include "tracetools/tracetools.h"
|
||||
@@ -37,6 +39,8 @@ namespace rclcpp
|
||||
namespace experimental
|
||||
{
|
||||
|
||||
class SerializedMessage;
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename Alloc = std::allocator<void>,
|
||||
@@ -51,6 +55,10 @@ public:
|
||||
using MessageAlloc = typename MessageAllocTraits::allocator_type;
|
||||
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
|
||||
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
|
||||
using CallbackMessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
|
||||
using CallbackMessageAlloc = typename CallbackMessageAllocTraits::allocator_type;
|
||||
using CallbackMessageUniquePtr = std::unique_ptr<CallbackMessageT>;
|
||||
using CallbackMessageSharedPtr = std::shared_ptr<CallbackMessageT>;
|
||||
|
||||
using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
|
||||
MessageT,
|
||||
@@ -64,11 +72,15 @@ public:
|
||||
rclcpp::Context::SharedPtr context,
|
||||
const std::string & topic_name,
|
||||
rmw_qos_profile_t qos_profile,
|
||||
rclcpp::IntraProcessBufferType buffer_type)
|
||||
rclcpp::IntraProcessBufferType buffer_type,
|
||||
std::shared_ptr<SerializationBase> serializer)
|
||||
: SubscriptionIntraProcessBase(topic_name, qos_profile),
|
||||
any_callback_(callback)
|
||||
any_callback_(callback), serializer_(serializer)
|
||||
{
|
||||
if (!std::is_same<MessageT, CallbackMessageT>::value) {
|
||||
if (!std::is_same<MessageT, CallbackMessageT>::value &&
|
||||
!std::is_same<MessageT, rclcpp::SerializedMessage>::value &&
|
||||
!std::is_same<CallbackMessageT, rcl_serialized_message_t>::value)
|
||||
{
|
||||
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
|
||||
}
|
||||
|
||||
@@ -142,18 +154,47 @@ private:
|
||||
(void)ret;
|
||||
}
|
||||
|
||||
// convert from ROS2 message to rcl_serialized_message_t (serilizatino needed)
|
||||
template<typename T>
|
||||
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
|
||||
typename std::enable_if<
|
||||
std::is_same<T, rcl_serialized_message_t>::value &&
|
||||
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
|
||||
if (nullptr == serializer_) {
|
||||
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
|
||||
}
|
||||
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
ConstMessageSharedPtr msg = buffer_->consume_shared();
|
||||
auto serialized_msg =
|
||||
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()));
|
||||
|
||||
if (nullptr == serialized_msg) {
|
||||
throw std::runtime_error("Subscription intra-process could not serialize message");
|
||||
}
|
||||
|
||||
if (any_callback_.use_take_shared_method()) {
|
||||
any_callback_.dispatch_intra_process(serialized_msg, msg_info);
|
||||
} else {
|
||||
throw std::runtime_error(
|
||||
"Subscription intra-process for serialized "
|
||||
"messages does not support unique pointers.");
|
||||
}
|
||||
}
|
||||
|
||||
// forward from ROS2 message to ROS2 message (same type)
|
||||
template<class T>
|
||||
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
|
||||
typename std::enable_if<
|
||||
!std::is_same<T, rcl_serialized_message_t>::value &&
|
||||
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
rmw_message_info_t msg_info;
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.publisher_gid = {0, {0}};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
@@ -166,8 +207,80 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
// forward from rcl_serialized_message_t to SerializationMessage (no conversion needed)
|
||||
template<typename T>
|
||||
typename std::enable_if<
|
||||
std::is_same<T, rcl_serialized_message_t>::value &&
|
||||
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
if (any_callback_.use_take_shared_method()) {
|
||||
ConstMessageSharedPtr msg = buffer_->consume_shared();
|
||||
if (msg == nullptr) {
|
||||
throw std::runtime_error("Subscription intra-process could not get serialized message");
|
||||
}
|
||||
any_callback_.dispatch_intra_process(msg, msg_info);
|
||||
} else {
|
||||
throw std::runtime_error(
|
||||
"Subscription intra-process for serialized "
|
||||
"messages does not support unique pointers.");
|
||||
}
|
||||
}
|
||||
|
||||
// convert from rcl_serialized_message_t to ROS2 message (deserialization needed)
|
||||
template<class T>
|
||||
typename std::enable_if<
|
||||
!std::is_same<T, rcl_serialized_message_t>::value &&
|
||||
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
if (nullptr == serializer_) {
|
||||
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
|
||||
}
|
||||
|
||||
ConstMessageSharedPtr serialized_container = buffer_->consume_shared();
|
||||
if (nullptr == serialized_container) {
|
||||
throw std::runtime_error("Subscription intra-process could not get serialized message");
|
||||
}
|
||||
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
if (any_callback_.use_take_shared_method()) {
|
||||
CallbackMessageSharedPtr msg = construct_unique();
|
||||
serializer_->deserialize_message(
|
||||
serialized_container.get(),
|
||||
reinterpret_cast<void *>(msg.get()));
|
||||
any_callback_.dispatch_intra_process(msg, msg_info);
|
||||
} else {
|
||||
CallbackMessageUniquePtr msg = construct_unique();
|
||||
serializer_->deserialize_message(
|
||||
serialized_container.get(),
|
||||
reinterpret_cast<void *>(msg.get()));
|
||||
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
|
||||
}
|
||||
}
|
||||
|
||||
CallbackMessageUniquePtr construct_unique()
|
||||
{
|
||||
CallbackMessageUniquePtr unique_msg;
|
||||
auto ptr = CallbackMessageAllocTraits::allocate(*message_allocator_.get(), 1);
|
||||
CallbackMessageAllocTraits::construct(*message_allocator_.get(), ptr);
|
||||
unique_msg = CallbackMessageUniquePtr(ptr);
|
||||
|
||||
return unique_msg;
|
||||
}
|
||||
|
||||
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
|
||||
BufferUniquePtr buffer_;
|
||||
std::shared_ptr<SerializationBase> serializer_;
|
||||
std::shared_ptr<CallbackMessageAlloc> message_allocator_ =
|
||||
std::make_shared<CallbackMessageAlloc>();
|
||||
};
|
||||
|
||||
} // namespace experimental
|
||||
|
||||
@@ -56,6 +56,12 @@ public:
|
||||
using MessageDeleter = allocator::Deleter<MessageAllocator, MessageT>;
|
||||
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
|
||||
using MessageSharedPtr = std::shared_ptr<const MessageT>;
|
||||
using SerializedMessageAllocatorTraits =
|
||||
allocator::AllocRebind<rclcpp::SerializedMessage,
|
||||
AllocatorT>;
|
||||
using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type;
|
||||
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
|
||||
rclcpp::SerializedMessage>;
|
||||
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)
|
||||
|
||||
@@ -70,37 +76,28 @@ public:
|
||||
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
|
||||
options.template to_rcl_publisher_options<MessageT>(qos)),
|
||||
options_(options),
|
||||
message_allocator_(new MessageAllocator(*options.get_allocator().get()))
|
||||
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
|
||||
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
|
||||
{
|
||||
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
|
||||
init_setup();
|
||||
}
|
||||
|
||||
if (options_.event_callbacks.deadline_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.deadline_callback,
|
||||
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
|
||||
}
|
||||
if (options_.event_callbacks.liveliness_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.liveliness_callback,
|
||||
RCL_PUBLISHER_LIVELINESS_LOST);
|
||||
}
|
||||
if (options_.event_callbacks.incompatible_qos_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.incompatible_qos_callback,
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} else if (options_.use_default_callbacks) {
|
||||
// Register default callback when not specified
|
||||
try {
|
||||
this->add_event_handler(
|
||||
[this](QOSOfferedIncompatibleQoSInfo & info) {
|
||||
this->default_incompatible_qos_callback(info);
|
||||
},
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} catch (UnsupportedEventTypeException & /*exc*/) {
|
||||
// pass
|
||||
}
|
||||
}
|
||||
// Setup continues in the post construction method, post_init_setup().
|
||||
Publisher(
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
|
||||
const rosidl_message_type_support_t & type_support)
|
||||
: PublisherBase(
|
||||
node_base,
|
||||
topic,
|
||||
type_support,
|
||||
options.template to_rcl_publisher_options<MessageT>(qos)),
|
||||
options_(options),
|
||||
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
|
||||
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
|
||||
{
|
||||
init_setup();
|
||||
}
|
||||
|
||||
/// Called post construction, so that construction may continue after shared_from_this() works.
|
||||
@@ -135,8 +132,11 @@ public:
|
||||
"intraprocess communication allowed only with volatile durability");
|
||||
}
|
||||
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
|
||||
uint64_t intra_process_publisher_id_serialized = ipm->add_publisher(
|
||||
this->shared_from_this(), true);
|
||||
this->setup_intra_process(
|
||||
intra_process_publisher_id,
|
||||
intra_process_publisher_id_serialized,
|
||||
ipm);
|
||||
}
|
||||
}
|
||||
@@ -173,6 +173,11 @@ public:
|
||||
virtual void
|
||||
publish(std::unique_ptr<MessageT, MessageDeleter> msg)
|
||||
{
|
||||
if (std::is_same<MessageT, rcl_serialized_message_t>::value) {
|
||||
this->template publish<MessageDeleter>(std::move(msg));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!intra_process_is_enabled_) {
|
||||
this->do_inter_process_publish(*msg);
|
||||
return;
|
||||
@@ -187,34 +192,41 @@ public:
|
||||
get_subscription_count() > get_intra_process_subscription_count();
|
||||
|
||||
if (inter_process_publish_needed) {
|
||||
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
|
||||
auto shared_msg = this->do_intra_process_publish_and_return_shared(
|
||||
std::move(msg), message_allocator_);
|
||||
this->do_inter_process_publish(*shared_msg);
|
||||
} else {
|
||||
this->do_intra_process_publish(std::move(msg));
|
||||
this->do_intra_process_publish(std::move(msg), message_allocator_);
|
||||
}
|
||||
}
|
||||
|
||||
virtual void
|
||||
publish(const MessageT & msg)
|
||||
{
|
||||
// Avoid allocating when not using intra process.
|
||||
if (!intra_process_is_enabled_) {
|
||||
// In this case we're not using intra process.
|
||||
return this->do_inter_process_publish(msg);
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
// As the message is not const, a copy should be made.
|
||||
// A shared_ptr<const MessageT> could also be constructed here.
|
||||
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
|
||||
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
|
||||
MessageUniquePtr unique_msg(ptr, message_deleter_);
|
||||
this->publish(std::move(unique_msg));
|
||||
this->do_publish_message(msg);
|
||||
}
|
||||
|
||||
void
|
||||
template<class T = MessageT>
|
||||
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value>::type
|
||||
publish(const rcl_serialized_message_t & serialized_msg)
|
||||
{
|
||||
return this->do_serialized_publish(&serialized_msg);
|
||||
this->do_publish_message<rcl_serialized_message_t>(serialized_msg);
|
||||
}
|
||||
|
||||
/// Publish a serialized message. Non specialized version to prevent compiling errors.
|
||||
template<typename TDeleter, typename T>
|
||||
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
|
||||
{
|
||||
(void)serialized_msg;
|
||||
throw std::runtime_error(
|
||||
"publishing unique_ptr with custom deleter only supported for serialized messages");
|
||||
}
|
||||
|
||||
/// Publish a serialized message.
|
||||
template<typename TDeleter>
|
||||
void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
|
||||
{
|
||||
this->do_serialized_publish(*serialized_msg);
|
||||
}
|
||||
|
||||
/// Publish an instance of a LoanedMessage.
|
||||
@@ -259,6 +271,69 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
void init_setup()
|
||||
{
|
||||
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
|
||||
|
||||
if (options_.event_callbacks.deadline_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.deadline_callback,
|
||||
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
|
||||
}
|
||||
if (options_.event_callbacks.liveliness_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.liveliness_callback,
|
||||
RCL_PUBLISHER_LIVELINESS_LOST);
|
||||
}
|
||||
if (options_.event_callbacks.incompatible_qos_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.incompatible_qos_callback,
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} else if (options_.use_default_callbacks) {
|
||||
// Register default callback when not specified
|
||||
try {
|
||||
this->add_event_handler(
|
||||
[this](QOSOfferedIncompatibleQoSInfo & info) {
|
||||
this->default_incompatible_qos_callback(info);
|
||||
},
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} catch (UnsupportedEventTypeException & /*exc*/) {
|
||||
RCLCPP_WARN_ONCE(
|
||||
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
|
||||
"This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS "
|
||||
"events, you will not be notified when Publishers offer an incompatible "
|
||||
"QoS profile to Subscriptions on the same topic.");
|
||||
}
|
||||
}
|
||||
// Setup continues in the post construction method, post_init_setup().
|
||||
}
|
||||
|
||||
template<class T = MessageT>
|
||||
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value>::type
|
||||
do_publish_message(const T & msg)
|
||||
{
|
||||
// Avoid allocating when not using intra process.
|
||||
if (!intra_process_is_enabled_) {
|
||||
// In this case we're not using intra process.
|
||||
return this->do_inter_process_publish(msg);
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
// As the message is not const, a copy should be made.
|
||||
// A shared_ptr<const MessageT> could also be constructed here.
|
||||
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
|
||||
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
|
||||
MessageUniquePtr unique_msg(ptr, message_deleter_);
|
||||
this->publish(std::move(unique_msg));
|
||||
}
|
||||
|
||||
template<class T = MessageT>
|
||||
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value>::type
|
||||
do_publish_message(const T & msg)
|
||||
{
|
||||
// Kept for backwards compatibility. Copies compelete memory!
|
||||
this->publish(std::make_unique<rclcpp::SerializedMessage>(msg));
|
||||
}
|
||||
|
||||
void
|
||||
do_inter_process_publish(const MessageT & msg)
|
||||
{
|
||||
@@ -280,15 +355,24 @@ protected:
|
||||
}
|
||||
|
||||
void
|
||||
do_serialized_publish(const rcl_serialized_message_t * serialized_msg)
|
||||
do_serialized_publish(rcl_serialized_message_t serialized_msg)
|
||||
{
|
||||
if (intra_process_is_enabled_) {
|
||||
// TODO(Karsten1987): support serialized message passed by intraprocess
|
||||
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
|
||||
bool inter_process_publish_needed =
|
||||
get_subscription_count() > get_intra_process_subscription_count();
|
||||
|
||||
if (inter_process_publish_needed) {
|
||||
// declare here to avoid deletion before returning method
|
||||
auto status = rcl_publish_serialized_message(&publisher_handle_, &serialized_msg, nullptr);
|
||||
if (RCL_RET_OK != status) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
|
||||
}
|
||||
}
|
||||
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr);
|
||||
if (RCL_RET_OK != status) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
|
||||
|
||||
auto msg = std::make_unique<rclcpp::SerializedMessage>(
|
||||
std::move(serialized_msg));
|
||||
|
||||
if (intra_process_is_enabled_) {
|
||||
do_intra_process_publish(std::move(msg), message_allocator_serialized_);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,8 +396,11 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
template<typename T, typename Deleter, typename Allocator>
|
||||
void
|
||||
do_intra_process_publish(std::unique_ptr<MessageT, MessageDeleter> msg)
|
||||
do_intra_process_publish(
|
||||
std::unique_ptr<T, Deleter> msg,
|
||||
std::shared_ptr<Allocator> & message_allocator)
|
||||
{
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
@@ -324,14 +411,21 @@ protected:
|
||||
throw std::runtime_error("cannot publish msg which is a null pointer");
|
||||
}
|
||||
|
||||
ipm->template do_intra_process_publish<MessageT, AllocatorT>(
|
||||
intra_process_publisher_id_,
|
||||
const uint64_t intra_process_publisher_id = std::is_same<T,
|
||||
rclcpp::SerializedMessage>::value ?
|
||||
intra_process_publisher_id_serialized_ : intra_process_publisher_id_;
|
||||
|
||||
ipm->template do_intra_process_publish<T, AllocatorT>(
|
||||
intra_process_publisher_id,
|
||||
std::move(msg),
|
||||
message_allocator_);
|
||||
message_allocator);
|
||||
}
|
||||
|
||||
std::shared_ptr<const MessageT>
|
||||
do_intra_process_publish_and_return_shared(std::unique_ptr<MessageT, MessageDeleter> msg)
|
||||
template<typename T, typename Deleter, typename Allocator>
|
||||
std::shared_ptr<const T>
|
||||
do_intra_process_publish_and_return_shared(
|
||||
std::unique_ptr<T, Deleter> msg,
|
||||
std::shared_ptr<Allocator> & message_allocator)
|
||||
{
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
@@ -342,10 +436,14 @@ protected:
|
||||
throw std::runtime_error("cannot publish msg which is a null pointer");
|
||||
}
|
||||
|
||||
return ipm->template do_intra_process_publish_and_return_shared<MessageT, AllocatorT>(
|
||||
intra_process_publisher_id_,
|
||||
const uint64_t intra_process_publisher_id = std::is_same<T,
|
||||
rclcpp::SerializedMessage>::value ?
|
||||
intra_process_publisher_id_serialized_ : intra_process_publisher_id_;
|
||||
|
||||
return ipm->template do_intra_process_publish_and_return_shared<T, AllocatorT>(
|
||||
intra_process_publisher_id,
|
||||
std::move(msg),
|
||||
message_allocator_);
|
||||
message_allocator);
|
||||
}
|
||||
|
||||
/// Copy of original options passed during construction.
|
||||
@@ -356,6 +454,7 @@ protected:
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;
|
||||
|
||||
std::shared_ptr<MessageAllocator> message_allocator_;
|
||||
std::shared_ptr<SerializedMessageAllocator> message_allocator_serialized_;
|
||||
|
||||
MessageDeleter message_deleter_;
|
||||
};
|
||||
|
||||
@@ -191,6 +191,7 @@ public:
|
||||
void
|
||||
setup_intra_process(
|
||||
uint64_t intra_process_publisher_id,
|
||||
uint64_t intra_process_publisher_id_serialized,
|
||||
IntraProcessManagerSharedPtr ipm);
|
||||
|
||||
protected:
|
||||
@@ -222,6 +223,7 @@ protected:
|
||||
bool intra_process_is_enabled_;
|
||||
IntraProcessManagerWeakPtr weak_ipm_;
|
||||
uint64_t intra_process_publisher_id_;
|
||||
uint64_t intra_process_publisher_id_serialized_;
|
||||
|
||||
rmw_gid_t rmw_gid_;
|
||||
};
|
||||
|
||||
@@ -63,17 +63,21 @@ struct PublisherFactory
|
||||
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, AllocatorT>.
|
||||
template<typename MessageT, typename AllocatorT, typename PublisherT>
|
||||
PublisherFactory
|
||||
create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
|
||||
create_publisher_factory(
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
|
||||
const rosidl_message_type_support_t & type_support)
|
||||
{
|
||||
PublisherFactory factory {
|
||||
// factory function that creates a MessageT specific PublisherT
|
||||
[options](
|
||||
[options, type_support](
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos
|
||||
) -> std::shared_ptr<PublisherT>
|
||||
{
|
||||
auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
|
||||
auto publisher = std::make_shared<PublisherT>(
|
||||
node_base, topic_name, qos, options,
|
||||
type_support);
|
||||
// This is used for setting up things like intra process comms which
|
||||
// require this->shared_from_this() which cannot be called from
|
||||
// the constructor.
|
||||
|
||||
123
rclcpp/include/rclcpp/serialization.hpp
Normal file
123
rclcpp/include/rclcpp/serialization.hpp
Normal file
@@ -0,0 +1,123 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__SERIALIZATION_HPP_
|
||||
#define RCLCPP__SERIALIZATION_HPP_
|
||||
|
||||
#include <rmw/rmw.h>
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
/// Interface to (de)serialize a message
|
||||
class SerializationBase
|
||||
{
|
||||
public:
|
||||
virtual ~SerializationBase() = default;
|
||||
|
||||
/// Serialize a ROS2 message to a serialized stream
|
||||
/**
|
||||
* \param[in] message The ROS2 message which is read and serialized by rmw.
|
||||
*/
|
||||
virtual std::shared_ptr<rcl_serialized_message_t> serialize_message(const void * message) = 0;
|
||||
|
||||
/// Deserialize a serialized stream to a ROS message
|
||||
/**
|
||||
* \param[in] serialized_message The serialized message to be converted to ROS2 by rmw.
|
||||
* \param[out] message The deserialized ROS2 message.
|
||||
*/
|
||||
virtual void deserialize_message(
|
||||
const rcl_serialized_message_t * serialized_message,
|
||||
void * msg) = 0;
|
||||
};
|
||||
|
||||
/// Default implementation to (de)serialize a message by using rmw_(de)serialize
|
||||
class Serialization : public SerializationBase
|
||||
{
|
||||
public:
|
||||
Serialization(
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rcutils_allocator_t allocator = rcutils_get_default_allocator())
|
||||
: type_support_(type_support), rcutils_allocator_(allocator)
|
||||
{}
|
||||
|
||||
std::shared_ptr<rcl_serialized_message_t> serialize_message(const void * message) override
|
||||
{
|
||||
auto serialized_message = new rcl_serialized_message_t;
|
||||
*serialized_message = rmw_get_zero_initialized_serialized_message();
|
||||
const auto ret = rmw_serialized_message_init(serialized_message, 0, &rcutils_allocator_);
|
||||
if (ret != RCUTILS_RET_OK) {
|
||||
throw std::runtime_error(
|
||||
"Error allocating resources for serialized message: " +
|
||||
std::string(rcutils_get_error_string().str));
|
||||
}
|
||||
|
||||
if (nullptr == message) {
|
||||
delete serialized_message;
|
||||
throw std::runtime_error("Message is nullpointer while serialization.");
|
||||
}
|
||||
|
||||
const auto error = rmw_serialize(
|
||||
message,
|
||||
&type_support_,
|
||||
serialized_message);
|
||||
if (error != RCL_RET_OK) {
|
||||
delete serialized_message;
|
||||
throw std::runtime_error("Failed to serialize.");
|
||||
}
|
||||
|
||||
auto shared_serialized_msg = std::shared_ptr<rcl_serialized_message_t>(
|
||||
serialized_message,
|
||||
[](rcl_serialized_message_t * msg) {
|
||||
auto fini_ret = rmw_serialized_message_fini(msg);
|
||||
delete msg;
|
||||
if (fini_ret != RCL_RET_OK) {
|
||||
RCUTILS_LOG_ERROR_NAMED(
|
||||
"rclcpp",
|
||||
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
|
||||
}
|
||||
});
|
||||
|
||||
return shared_serialized_msg;
|
||||
}
|
||||
|
||||
void deserialize_message(const rcl_serialized_message_t * serialized_message, void * msg) override
|
||||
{
|
||||
if (nullptr == serialized_message ||
|
||||
serialized_message->buffer_capacity == 0 ||
|
||||
serialized_message->buffer_length == 0 ||
|
||||
!serialized_message->buffer)
|
||||
{
|
||||
throw std::runtime_error("Failed to deserialize nullptr serialized message.");
|
||||
}
|
||||
|
||||
const auto ret = rmw_deserialize(serialized_message, &type_support_, msg);
|
||||
if (ret != RMW_RET_OK) {
|
||||
throw std::runtime_error("Failed to deserialize serialized message.");
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
rosidl_message_type_support_t type_support_;
|
||||
rcutils_allocator_t rcutils_allocator_;
|
||||
};
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__SERIALIZATION_HPP_
|
||||
80
rclcpp/include/rclcpp/serialized_message.hpp
Normal file
80
rclcpp/include/rclcpp/serialized_message.hpp
Normal file
@@ -0,0 +1,80 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_
|
||||
#define RCLCPP__SERIALIZED_MESSAGE_HPP_
|
||||
|
||||
#include <rclcpp/exceptions.hpp>
|
||||
|
||||
#include <cstring>
|
||||
|
||||
#include "rcutils/logging_macros.h"
|
||||
|
||||
#include "rmw/serialized_message.h"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks
|
||||
class SerializedMessage : public rcl_serialized_message_t
|
||||
{
|
||||
public:
|
||||
SerializedMessage()
|
||||
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
|
||||
{}
|
||||
|
||||
explicit SerializedMessage(const SerializedMessage & serialized_message)
|
||||
: SerializedMessage(static_cast<const rcl_serialized_message_t>(serialized_message))
|
||||
{}
|
||||
|
||||
explicit SerializedMessage(const rcl_serialized_message_t & serialized_message)
|
||||
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
|
||||
{
|
||||
const auto ret = rmw_serialized_message_init(
|
||||
this, serialized_message.buffer_length,
|
||||
&serialized_message.allocator);
|
||||
if (ret != RCL_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
|
||||
// do not call memcpy if the pointer is "static"
|
||||
if (buffer != serialized_message.buffer) {
|
||||
std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length);
|
||||
}
|
||||
buffer_length = serialized_message.buffer_length;
|
||||
}
|
||||
|
||||
explicit SerializedMessage(rcl_serialized_message_t && msg)
|
||||
: rcl_serialized_message_t(msg)
|
||||
{
|
||||
// reset buffer to prevent double free
|
||||
msg = rmw_get_zero_initialized_serialized_message();
|
||||
}
|
||||
|
||||
~SerializedMessage()
|
||||
{
|
||||
if (nullptr != buffer) {
|
||||
const auto fini_ret = rmw_serialized_message_fini(this);
|
||||
if (fini_ret != RCL_RET_OK) {
|
||||
RCUTILS_LOG_ERROR_NAMED(
|
||||
"rclcpp",
|
||||
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__SERIALIZED_MESSAGE_HPP_
|
||||
@@ -41,6 +41,7 @@
|
||||
#include "rclcpp/message_info.hpp"
|
||||
#include "rclcpp/message_memory_strategy.hpp"
|
||||
#include "rclcpp/node_interfaces/node_base_interface.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
#include "rclcpp/subscription_base.hpp"
|
||||
#include "rclcpp/subscription_options.hpp"
|
||||
#include "rclcpp/subscription_traits.hpp"
|
||||
@@ -155,29 +156,84 @@ public:
|
||||
"intraprocess communication allowed only with volatile durability");
|
||||
}
|
||||
|
||||
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
|
||||
auto context = node_base->get_context();
|
||||
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
|
||||
CallbackMessageT,
|
||||
AllocatorT,
|
||||
typename MessageUniquePtr::deleter_type>;
|
||||
auto subscription_intra_process = std::make_shared<SubscriptionIntraProcessT>(
|
||||
callback,
|
||||
options.get_allocator(),
|
||||
context,
|
||||
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
|
||||
qos_profile,
|
||||
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback));
|
||||
TRACEPOINT(
|
||||
rclcpp_subscription_init,
|
||||
(const void *)get_subscription_handle().get(),
|
||||
(const void *)subscription_intra_process.get());
|
||||
|
||||
// Add it to the intra process manager.
|
||||
using rclcpp::experimental::IntraProcessManager;
|
||||
uint64_t intra_process_subscription_id;
|
||||
uint64_t intra_process_subscription_id_serialized;
|
||||
|
||||
auto context = node_base->get_context();
|
||||
auto ipm = context->get_sub_context<IntraProcessManager>();
|
||||
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
|
||||
this->setup_intra_process(intra_process_subscription_id, ipm);
|
||||
|
||||
{
|
||||
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
|
||||
auto subscription_intra_process = std::make_shared<
|
||||
rclcpp::experimental::SubscriptionIntraProcess<
|
||||
CallbackMessageT,
|
||||
AllocatorT,
|
||||
typename MessageUniquePtr::deleter_type
|
||||
>>(
|
||||
callback,
|
||||
options.get_allocator(),
|
||||
context,
|
||||
this->get_topic_name(), // important to get it by the fully-qualified name
|
||||
qos.get_rmw_qos_profile(),
|
||||
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
|
||||
std::make_shared<rclcpp::Serialization>(
|
||||
type_support_handle,
|
||||
options.template to_rcl_subscription_options<CallbackMessageT>(qos).allocator)
|
||||
);
|
||||
TRACEPOINT(
|
||||
rclcpp_subscription_init,
|
||||
(const void *)get_subscription_handle().get(),
|
||||
(const void *)subscription_intra_process.get());
|
||||
|
||||
// Add it to the intra process manager.
|
||||
intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
|
||||
}
|
||||
|
||||
{
|
||||
using SerializedMessageAllocatorTraits =
|
||||
allocator::AllocRebind<rclcpp::SerializedMessage,
|
||||
AllocatorT>;
|
||||
using SerializedMessageAllocator =
|
||||
typename SerializedMessageAllocatorTraits::allocator_type;
|
||||
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
|
||||
rclcpp::SerializedMessage>;
|
||||
using SerializedMessageUniquePtr =
|
||||
std::unique_ptr<rclcpp::SerializedMessage,
|
||||
SerializedMessageDeleter>;
|
||||
|
||||
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
|
||||
auto subscription_intra_process = std::make_shared<
|
||||
rclcpp::experimental::SubscriptionIntraProcess<
|
||||
rclcpp::SerializedMessage,
|
||||
AllocatorT,
|
||||
typename SerializedMessageUniquePtr::deleter_type,
|
||||
CallbackMessageT
|
||||
>>(
|
||||
callback,
|
||||
options.get_allocator(),
|
||||
context,
|
||||
this->get_topic_name(), // important to get it by the fully-qualified name
|
||||
qos.get_rmw_qos_profile(),
|
||||
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
|
||||
std::make_shared<rclcpp::Serialization>(
|
||||
type_support_handle,
|
||||
options.template to_rcl_subscription_options<CallbackMessageT>(qos).allocator)
|
||||
);
|
||||
TRACEPOINT(
|
||||
rclcpp_subscription_init,
|
||||
(const void *)get_subscription_handle().get(),
|
||||
(const void *)subscription_intra_process.get());
|
||||
|
||||
// Add it to the intra process manager.
|
||||
intra_process_subscription_id_serialized = ipm->add_subscription(
|
||||
subscription_intra_process,
|
||||
true);
|
||||
}
|
||||
|
||||
this->setup_intra_process(
|
||||
{intra_process_subscription_id,
|
||||
intra_process_subscription_id_serialized}, ipm);
|
||||
}
|
||||
|
||||
TRACEPOINT(
|
||||
|
||||
@@ -228,13 +228,13 @@ public:
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
setup_intra_process(
|
||||
uint64_t intra_process_subscription_id,
|
||||
const std::vector<uint64_t> & intra_process_subscription_ids,
|
||||
IntraProcessManagerWeakPtr weak_ipm);
|
||||
|
||||
/// Return the waitable for intra-process, or nullptr if intra-process is not setup.
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::Waitable::SharedPtr
|
||||
get_intra_process_waitable() const;
|
||||
std::vector<rclcpp::Waitable::SharedPtr>
|
||||
get_intra_process_waitables() const;
|
||||
|
||||
/// Exchange state of whether or not a part of the subscription is used by a wait set.
|
||||
/**
|
||||
@@ -286,7 +286,7 @@ protected:
|
||||
|
||||
bool use_intra_process_;
|
||||
IntraProcessManagerWeakPtr weak_ipm_;
|
||||
uint64_t intra_process_subscription_id_;
|
||||
std::vector<uint64_t> intra_process_subscription_ids_;
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(SubscriptionBase)
|
||||
|
||||
@@ -78,7 +78,8 @@ SubscriptionFactory
|
||||
create_subscription_factory(
|
||||
CallbackT && callback,
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat,
|
||||
const rosidl_message_type_support_t & type_support)
|
||||
{
|
||||
auto allocator = options.get_allocator();
|
||||
|
||||
@@ -88,7 +89,7 @@ create_subscription_factory(
|
||||
|
||||
SubscriptionFactory factory {
|
||||
// factory function that creates a MessageT specific SubscriptionT
|
||||
[options, msg_mem_strat, any_subscription_callback](
|
||||
[options, msg_mem_strat, any_subscription_callback, type_support](
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos
|
||||
@@ -99,7 +100,7 @@ create_subscription_factory(
|
||||
|
||||
auto sub = Subscription<CallbackMessageT, AllocatorT>::make_shared(
|
||||
node_base,
|
||||
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
|
||||
type_support,
|
||||
topic_name,
|
||||
qos,
|
||||
any_subscription_callback,
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include <memory>
|
||||
|
||||
#include "rclcpp/function_traits.hpp"
|
||||
#include "rclcpp/subscription_options.hpp"
|
||||
#include "rcl/types.h"
|
||||
|
||||
namespace rclcpp
|
||||
@@ -75,6 +76,7 @@ struct extract_message_type<std::unique_ptr<MessageT, Deleter>>: extract_message
|
||||
|
||||
template<
|
||||
typename CallbackT,
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
// Do not attempt if CallbackT is an integer (mistaken for depth)
|
||||
typename = std::enable_if_t<!std::is_integral<
|
||||
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
|
||||
@@ -85,6 +87,10 @@ template<
|
||||
// Do not attempt if CallbackT is a rmw_qos_profile_t (mistaken for qos profile)
|
||||
typename = std::enable_if_t<!std::is_same<
|
||||
rmw_qos_profile_t,
|
||||
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
|
||||
// Do not attempt if CallbackT is a rclcpp::SubscriptionOptionsWithAllocator
|
||||
typename = std::enable_if_t<!std::is_same<
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>,
|
||||
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>
|
||||
>
|
||||
struct has_message_type : extract_message_type<
|
||||
|
||||
@@ -165,18 +165,19 @@ public:
|
||||
}
|
||||
if (mask.include_intra_process_waitable) {
|
||||
auto local_subscription = inner_subscription;
|
||||
auto waitable = inner_subscription->get_intra_process_waitable();
|
||||
if (nullptr != waitable) {
|
||||
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
|
||||
waitable.get(),
|
||||
true);
|
||||
if (already_in_use) {
|
||||
throw std::runtime_error(
|
||||
"subscription intra-process waitable already associated with a wait set");
|
||||
for (auto waitable : inner_subscription->get_intra_process_waitables()) {
|
||||
if (nullptr != waitable) {
|
||||
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
|
||||
waitable.get(),
|
||||
true);
|
||||
if (already_in_use) {
|
||||
throw std::runtime_error(
|
||||
"subscription intra-process waitable already associated with a wait set");
|
||||
}
|
||||
this->storage_add_waitable(
|
||||
std::move(waitable),
|
||||
std::move(local_subscription));
|
||||
}
|
||||
this->storage_add_waitable(
|
||||
std::move(inner_subscription->get_intra_process_waitable()),
|
||||
std::move(local_subscription));
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -230,11 +231,12 @@ public:
|
||||
}
|
||||
}
|
||||
if (mask.include_intra_process_waitable) {
|
||||
auto local_waitable = inner_subscription->get_intra_process_waitable();
|
||||
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
|
||||
if (nullptr != local_waitable) {
|
||||
// This is the case when intra process is disabled for the subscription.
|
||||
this->storage_remove_waitable(std::move(local_waitable));
|
||||
for (auto local_waitable : inner_subscription->get_intra_process_waitables()) {
|
||||
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
|
||||
if (nullptr != local_waitable) {
|
||||
// This is the case when intra process is disabled for the subscription.
|
||||
this->storage_remove_waitable(std::move(local_waitable));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -32,7 +32,9 @@ IntraProcessManager::~IntraProcessManager()
|
||||
{}
|
||||
|
||||
uint64_t
|
||||
IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
|
||||
IntraProcessManager::add_publisher(
|
||||
rclcpp::PublisherBase::SharedPtr publisher,
|
||||
const bool is_serialized)
|
||||
{
|
||||
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
@@ -41,6 +43,7 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
|
||||
publishers_[id].publisher = publisher;
|
||||
publishers_[id].topic_name = publisher->get_topic_name();
|
||||
publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();
|
||||
publishers_[id].is_serialized = is_serialized;
|
||||
|
||||
// Initialize the subscriptions storage for this publisher.
|
||||
pub_to_subs_[id] = SplittedSubscriptions();
|
||||
@@ -56,7 +59,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
|
||||
}
|
||||
|
||||
uint64_t
|
||||
IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
|
||||
IntraProcessManager::add_subscription(
|
||||
SubscriptionIntraProcessBase::SharedPtr subscription,
|
||||
const bool is_serialized)
|
||||
{
|
||||
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
@@ -66,6 +71,7 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su
|
||||
subscriptions_[id].topic_name = subscription->get_topic_name();
|
||||
subscriptions_[id].qos = subscription->get_actual_qos();
|
||||
subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method();
|
||||
subscriptions_[id].is_serialized = is_serialized;
|
||||
|
||||
// adds the subscription id to all the matchable publishers
|
||||
for (auto & pair : publishers_) {
|
||||
@@ -220,6 +226,13 @@ IntraProcessManager::can_communicate(
|
||||
return false;
|
||||
}
|
||||
|
||||
// a publisher and a subscription with different content type can't communicate
|
||||
// if is_serialized is true, the expected message typ is rcl_serialized_message_t
|
||||
// otherwise the templated ROS2 message type
|
||||
if (sub_info.is_serialized != pub_info.is_serialized) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -99,10 +99,12 @@ NodeTopics::add_subscription(
|
||||
callback_group->add_waitable(subscription_event);
|
||||
}
|
||||
|
||||
auto intra_process_waitable = subscription->get_intra_process_waitable();
|
||||
if (nullptr != intra_process_waitable) {
|
||||
// Add to the callback group to be notified about intra-process msgs.
|
||||
callback_group->add_waitable(intra_process_waitable);
|
||||
const auto intra_process_waitables = subscription->get_intra_process_waitables();
|
||||
for (auto & intra_process_waitable : intra_process_waitables) {
|
||||
if (nullptr != intra_process_waitable) {
|
||||
// Add to the callback group to be notified about intra-process msgs.
|
||||
callback_group->add_waitable(intra_process_waitable);
|
||||
}
|
||||
}
|
||||
|
||||
// Notify the executor that a new subscription was created using the parent Node.
|
||||
|
||||
@@ -44,7 +44,8 @@ PublisherBase::PublisherBase(
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rcl_publisher_options_t & publisher_options)
|
||||
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
|
||||
intra_process_is_enabled_(false), intra_process_publisher_id_(0)
|
||||
intra_process_is_enabled_(false), intra_process_publisher_id_(0),
|
||||
intra_process_publisher_id_serialized_(0)
|
||||
{
|
||||
rcl_ret_t ret = rcl_publisher_init(
|
||||
&publisher_handle_,
|
||||
@@ -105,6 +106,7 @@ PublisherBase::~PublisherBase()
|
||||
return;
|
||||
}
|
||||
ipm->remove_publisher(intra_process_publisher_id_);
|
||||
ipm->remove_publisher(intra_process_publisher_id_serialized_);
|
||||
}
|
||||
|
||||
const char *
|
||||
@@ -238,9 +240,11 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
|
||||
void
|
||||
PublisherBase::setup_intra_process(
|
||||
uint64_t intra_process_publisher_id,
|
||||
uint64_t intra_process_publisher_id_serialized,
|
||||
IntraProcessManagerSharedPtr ipm)
|
||||
{
|
||||
intra_process_publisher_id_ = intra_process_publisher_id;
|
||||
intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized;
|
||||
weak_ipm_ = ipm;
|
||||
intra_process_is_enabled_ = true;
|
||||
}
|
||||
|
||||
@@ -39,7 +39,6 @@ SubscriptionBase::SubscriptionBase(
|
||||
: node_base_(node_base),
|
||||
node_handle_(node_base_->get_shared_rcl_node_handle()),
|
||||
use_intra_process_(false),
|
||||
intra_process_subscription_id_(0),
|
||||
type_support_(type_support_handle),
|
||||
is_serialized_(is_serialized)
|
||||
{
|
||||
@@ -93,7 +92,9 @@ SubscriptionBase::~SubscriptionBase()
|
||||
"Intra process manager died before than a subscription.");
|
||||
return;
|
||||
}
|
||||
ipm->remove_subscription(intra_process_subscription_id_);
|
||||
for (auto intra_process_subscription_id : intra_process_subscription_ids_) {
|
||||
ipm->remove_subscription(intra_process_subscription_id);
|
||||
}
|
||||
}
|
||||
|
||||
const char *
|
||||
@@ -204,10 +205,10 @@ SubscriptionBase::get_publisher_count() const
|
||||
|
||||
void
|
||||
SubscriptionBase::setup_intra_process(
|
||||
uint64_t intra_process_subscription_id,
|
||||
const std::vector<uint64_t> & intra_process_subscription_ids,
|
||||
IntraProcessManagerWeakPtr weak_ipm)
|
||||
{
|
||||
intra_process_subscription_id_ = intra_process_subscription_id;
|
||||
intra_process_subscription_ids_ = intra_process_subscription_ids;
|
||||
weak_ipm_ = weak_ipm;
|
||||
use_intra_process_ = true;
|
||||
}
|
||||
@@ -218,23 +219,29 @@ SubscriptionBase::can_loan_messages() const
|
||||
return rcl_subscription_can_loan_messages(subscription_handle_.get());
|
||||
}
|
||||
|
||||
rclcpp::Waitable::SharedPtr
|
||||
SubscriptionBase::get_intra_process_waitable() const
|
||||
std::vector<rclcpp::Waitable::SharedPtr>
|
||||
SubscriptionBase::get_intra_process_waitables() const
|
||||
{
|
||||
// If not using intra process, shortcut to nullptr.
|
||||
if (!use_intra_process_) {
|
||||
return nullptr;
|
||||
return std::vector<rclcpp::Waitable::SharedPtr>();
|
||||
}
|
||||
// Get the intra process manager.
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
throw std::runtime_error(
|
||||
"SubscriptionBase::get_intra_process_waitable() called "
|
||||
"SubscriptionBase::get_intra_process_waitables() called "
|
||||
"after destruction of intra process manager");
|
||||
}
|
||||
|
||||
// Use the id to retrieve the subscription intra-process from the intra-process manager.
|
||||
return ipm->get_subscription_intra_process(intra_process_subscription_id_);
|
||||
std::vector<rclcpp::Waitable::SharedPtr> waitables(intra_process_subscription_ids_.size());
|
||||
|
||||
for (auto i = 0u; i < intra_process_subscription_ids_.size(); ++i) {
|
||||
// Use the id to retrieve the subscription intra-process from the intra-process manager.
|
||||
waitables[i] = ipm->get_subscription_intra_process(intra_process_subscription_ids_[i]);
|
||||
}
|
||||
|
||||
return waitables;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -274,8 +281,10 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
|
||||
if (this == pointer_to_subscription_part) {
|
||||
return subscription_in_use_by_wait_set_.exchange(in_use_state);
|
||||
}
|
||||
if (get_intra_process_waitable().get() == pointer_to_subscription_part) {
|
||||
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
|
||||
for (auto & waitable : get_intra_process_waitables()) {
|
||||
if (waitable.get() == pointer_to_subscription_part) {
|
||||
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
|
||||
}
|
||||
}
|
||||
for (const auto & qos_event : event_handlers_) {
|
||||
if (qos_event.get() == pointer_to_subscription_part) {
|
||||
|
||||
367
rclcpp/test/test_intra_process_communication.cpp
Normal file
367
rclcpp/test/test_intra_process_communication.cpp
Normal file
@@ -0,0 +1,367 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <test_msgs/message_fixtures.hpp>
|
||||
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/publisher.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
|
||||
int32_t & get_test_allocation_counter()
|
||||
{
|
||||
static int32_t counter = 0;
|
||||
return counter;
|
||||
}
|
||||
|
||||
void * custom_allocate(size_t size, void * state)
|
||||
{
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
++get_test_allocation_counter();
|
||||
auto r = m_allocator.allocate(size, state);
|
||||
return r;
|
||||
}
|
||||
|
||||
void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state)
|
||||
{
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
++get_test_allocation_counter();
|
||||
auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state);
|
||||
return r;
|
||||
}
|
||||
|
||||
void * custom_reallocate(void * pointer, size_t size, void * state)
|
||||
{
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
if (pointer == nullptr) {
|
||||
++get_test_allocation_counter();
|
||||
}
|
||||
|
||||
auto r = m_allocator.reallocate(pointer, size, state);
|
||||
return r;
|
||||
}
|
||||
|
||||
void custom_deallocate(void * pointer, void * state)
|
||||
{
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
--get_test_allocation_counter();
|
||||
m_allocator.deallocate(pointer, state);
|
||||
}
|
||||
|
||||
rcl_serialized_message_t make_serialized_string_msg(
|
||||
const std::shared_ptr<test_msgs::msg::Strings> & stringMsg)
|
||||
{
|
||||
auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
// add custom (de)allocator to count the references to the object
|
||||
m_allocator.allocate = &custom_allocate;
|
||||
m_allocator.deallocate = &custom_deallocate;
|
||||
m_allocator.reallocate = &custom_reallocate;
|
||||
m_allocator.zero_allocate = &custom_zero_allocate;
|
||||
|
||||
rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message();
|
||||
auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator);
|
||||
if (ret != RCL_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
|
||||
static auto type =
|
||||
rosidl_typesupport_cpp::get_message_type_support_handle
|
||||
<test_msgs::msg::Strings>();
|
||||
auto error = rmw_serialize(stringMsg.get(), type, &msg);
|
||||
if (error != RMW_RET_OK) {
|
||||
RCUTILS_LOG_ERROR_NAMED(
|
||||
"test_intra_process_communication",
|
||||
"Something went wrong preparing the serialized message");
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parameterized test.
|
||||
* The first param are the NodeOptions used to create the nodes.
|
||||
* The second param are the expected intraprocess count results.
|
||||
*/
|
||||
struct TestParameters
|
||||
{
|
||||
rclcpp::NodeOptions node_options[2];
|
||||
uint64_t intraprocess_count_results[2];
|
||||
size_t runs;
|
||||
std::string description;
|
||||
};
|
||||
|
||||
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
|
||||
{
|
||||
out << params.description;
|
||||
return out;
|
||||
}
|
||||
|
||||
class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam<TestParameters>
|
||||
{
|
||||
public:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
if (!rclcpp::ok()) {
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
static std::chrono::milliseconds offset;
|
||||
};
|
||||
|
||||
std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds(
|
||||
250);
|
||||
std::array<uint32_t, 2> counts;
|
||||
|
||||
void OnMessageSerialized(const std::shared_ptr<const rcl_serialized_message_t> msg)
|
||||
{
|
||||
EXPECT_NE(msg->buffer, nullptr);
|
||||
EXPECT_GT(msg->buffer_capacity, 0u);
|
||||
|
||||
++counts[0];
|
||||
}
|
||||
|
||||
void OnMessageConst(std::shared_ptr<const test_msgs::msg::Strings> msg)
|
||||
{
|
||||
EXPECT_EQ(msg->string_value.back(), '9');
|
||||
|
||||
++counts[1];
|
||||
}
|
||||
|
||||
void OnMessageUniquePtr(std::unique_ptr<test_msgs::msg::Strings> msg)
|
||||
{
|
||||
EXPECT_EQ(msg->string_value.back(), '9');
|
||||
|
||||
++counts[1];
|
||||
}
|
||||
|
||||
void OnMessage(std::shared_ptr<test_msgs::msg::Strings> msg)
|
||||
{
|
||||
EXPECT_EQ(msg->string_value.back(), '9');
|
||||
|
||||
++counts[1];
|
||||
}
|
||||
|
||||
TEST_P(TestPublisherSubscriptionSerialized, publish_serialized)
|
||||
{
|
||||
get_test_allocation_counter() = 0;
|
||||
|
||||
TestParameters parameters = GetParam();
|
||||
{
|
||||
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
|
||||
"my_node",
|
||||
"/ns",
|
||||
parameters.node_options[0]);
|
||||
auto publisher = node->create_publisher<test_msgs::msg::Strings>("/topic", 10);
|
||||
|
||||
auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessage);
|
||||
auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessageUniquePtr);
|
||||
auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessageConst);
|
||||
auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessageSerialized);
|
||||
|
||||
rclcpp::sleep_for(offset);
|
||||
|
||||
counts.fill(0);
|
||||
auto stringMsg = get_messages_strings()[3];
|
||||
|
||||
for (size_t i = 0; i < parameters.runs; i++) {
|
||||
auto msg0 = make_serialized_string_msg(stringMsg);
|
||||
|
||||
std::unique_ptr<test_msgs::msg::Strings> stringMsgU(
|
||||
new test_msgs::msg::Strings(
|
||||
*stringMsg));
|
||||
|
||||
publisher->publish(std::make_unique<rcl_serialized_message_t>(msg0));
|
||||
publisher->publish(*stringMsg);
|
||||
publisher->publish(std::move(stringMsgU));
|
||||
}
|
||||
for (uint32_t i = 0; i < 3; ++i) {
|
||||
rclcpp::spin_some(node);
|
||||
rclcpp::sleep_for(offset);
|
||||
}
|
||||
|
||||
rclcpp::spin_some(node);
|
||||
}
|
||||
|
||||
if (parameters.runs == 1) {
|
||||
EXPECT_EQ(counts[0], 3u);
|
||||
EXPECT_EQ(counts[1], 9u);
|
||||
}
|
||||
|
||||
EXPECT_EQ(get_test_allocation_counter(), 0);
|
||||
}
|
||||
|
||||
TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic)
|
||||
{
|
||||
get_test_allocation_counter() = 0;
|
||||
|
||||
TestParameters parameters = GetParam();
|
||||
{
|
||||
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
|
||||
"my_node",
|
||||
"/ns",
|
||||
parameters.node_options[0]);
|
||||
auto publisher = rclcpp::create_publisher<rcl_serialized_message_t>(
|
||||
node,
|
||||
"/topic",
|
||||
*rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
|
||||
rclcpp::QoS(10));
|
||||
|
||||
auto sub_gen_serialized = rclcpp::create_subscription<rcl_serialized_message_t>(
|
||||
node,
|
||||
"/topic",
|
||||
*rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
|
||||
rclcpp::QoS(10),
|
||||
&OnMessageSerialized);
|
||||
|
||||
auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessage);
|
||||
auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessageUniquePtr);
|
||||
auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessageConst);
|
||||
auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessageSerialized);
|
||||
|
||||
rclcpp::sleep_for(offset);
|
||||
|
||||
counts.fill(0);
|
||||
auto stringMsg = get_messages_strings()[3];
|
||||
|
||||
for (size_t i = 0; i < parameters.runs; i++) {
|
||||
auto msg0 = make_serialized_string_msg(stringMsg);
|
||||
|
||||
publisher->publish(std::make_unique<rcl_serialized_message_t>(msg0));
|
||||
}
|
||||
rclcpp::spin_some(node);
|
||||
rclcpp::sleep_for(offset);
|
||||
|
||||
rclcpp::spin_some(node);
|
||||
}
|
||||
|
||||
if (parameters.runs == 1) {
|
||||
EXPECT_EQ(counts[0], 2u);
|
||||
EXPECT_EQ(counts[1], 3u);
|
||||
}
|
||||
|
||||
EXPECT_EQ(get_test_allocation_counter(), 0);
|
||||
}
|
||||
|
||||
auto get_new_context()
|
||||
{
|
||||
auto context = rclcpp::Context::make_shared();
|
||||
context->init(0, nullptr);
|
||||
return context;
|
||||
}
|
||||
|
||||
std::vector<TestParameters> parameters = {
|
||||
/*
|
||||
Testing publisher subscription count api and internal process subscription count.
|
||||
Two subscriptions in the same topic, both using intraprocess comm.
|
||||
*/
|
||||
{
|
||||
{
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true),
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true)
|
||||
},
|
||||
{1u, 2u},
|
||||
1,
|
||||
"two_subscriptions_intraprocess_comm"
|
||||
},
|
||||
/*
|
||||
Testing publisher subscription count api and internal process subscription count.
|
||||
Two subscriptions, one using intra-process comm and the other not using it.
|
||||
*/
|
||||
{
|
||||
{
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true),
|
||||
rclcpp::NodeOptions().use_intra_process_comms(false)
|
||||
},
|
||||
{1u, 1u},
|
||||
1,
|
||||
"two_subscriptions_one_intraprocess_one_not"
|
||||
},
|
||||
/*
|
||||
Testing publisher subscription count api and internal process subscription count.
|
||||
Two contexts, both using intra-process.
|
||||
*/
|
||||
{
|
||||
{
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true),
|
||||
rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true)
|
||||
},
|
||||
{1u, 1u},
|
||||
1,
|
||||
"two_subscriptions_in_two_contexts_with_intraprocess_comm"
|
||||
},
|
||||
/*
|
||||
Testing publisher subscription count api and internal process subscription count.
|
||||
Two contexts, both of them not using intra-process comm.
|
||||
*/
|
||||
{
|
||||
{
|
||||
rclcpp::NodeOptions().use_intra_process_comms(false),
|
||||
rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false)
|
||||
},
|
||||
{0u, 0u},
|
||||
1,
|
||||
"two_subscriptions_in_two_contexts_without_intraprocess_comm"
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<TestParameters> setRuns(const std::vector<TestParameters> & in, const size_t runs)
|
||||
{
|
||||
std::vector<TestParameters> out = in;
|
||||
for (auto & p : out) {
|
||||
p.runs = runs;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized,
|
||||
::testing::ValuesIn(parameters),
|
||||
::testing::PrintToStringParamName());
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized,
|
||||
::testing::ValuesIn(setRuns(parameters, 1000)),
|
||||
::testing::PrintToStringParamName());
|
||||
@@ -68,6 +68,18 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
LifecyclePublisher(
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<Alloc> & options,
|
||||
const rosidl_message_type_support_t & type_support)
|
||||
: rclcpp::Publisher<MessageT, Alloc>(node_base, topic, qos, options, type_support),
|
||||
enabled_(false),
|
||||
logger_(rclcpp::get_logger("LifecyclePublisher"))
|
||||
{
|
||||
}
|
||||
|
||||
~LifecyclePublisher() {}
|
||||
|
||||
/// LifecyclePublisher publish function
|
||||
@@ -110,6 +122,30 @@ public:
|
||||
rclcpp::Publisher<MessageT, Alloc>::publish(msg);
|
||||
}
|
||||
|
||||
/// Publish a serialized message. Non specialized version to prevent compiling errors.
|
||||
template<typename TDeleter, typename T>
|
||||
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
|
||||
{
|
||||
(void)serialized_msg;
|
||||
throw std::runtime_error(
|
||||
"publishing unique_ptr with custom deleter only supported for serialized messages");
|
||||
}
|
||||
|
||||
/// Publish a serialized message.
|
||||
template<typename TDeleter>
|
||||
void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
|
||||
{
|
||||
if (!enabled_) {
|
||||
RCLCPP_WARN(
|
||||
logger_,
|
||||
"Trying to publish message on the topic '%s', but the publisher is not activated",
|
||||
this->get_topic_name());
|
||||
|
||||
return;
|
||||
}
|
||||
this->do_serialized_publish(*serialized_msg);
|
||||
}
|
||||
|
||||
virtual void
|
||||
on_activate()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user