Compare commits
53 Commits
runtime_in
...
karsten/se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
82fa5bb5ed | ||
|
|
4dfcc5f7a7 | ||
|
|
e0fb0a4f5c | ||
|
|
dd6564518d | ||
|
|
d89ef5098b | ||
|
|
17f2d32250 | ||
|
|
7f04a19cb8 | ||
|
|
d6bf9833b3 | ||
|
|
1cd811a412 | ||
|
|
241164204d | ||
|
|
1f8226a6f3 | ||
|
|
511476c445 | ||
|
|
408343fb96 | ||
|
|
ff7a81f5dd | ||
|
|
4fa5f090ed | ||
|
|
ab73a62297 | ||
|
|
5472d363b1 | ||
|
|
35fd4908f8 | ||
|
|
93dc82699b | ||
|
|
b482164b97 | ||
|
|
a2fc8bd867 | ||
|
|
9b436fa777 | ||
|
|
2147026fbf | ||
|
|
2cefdca297 | ||
|
|
4c7506f5aa | ||
|
|
18181f5d8d | ||
|
|
9b65b7cefd | ||
|
|
657d9a0be4 | ||
|
|
7cff1aa85b | ||
|
|
d4536b3036 | ||
|
|
d74c4edac1 | ||
|
|
d8f1da9c88 | ||
|
|
4e1e744624 | ||
|
|
2ddbe32469 | ||
|
|
4c80594a6f | ||
|
|
bc228ee032 | ||
|
|
0c83e4981d | ||
|
|
af14ee76be | ||
|
|
d7a84eaa66 | ||
|
|
a3db0069b6 | ||
|
|
5faf8f40e7 | ||
|
|
02921427f7 | ||
|
|
56b520eb95 | ||
|
|
f9ef2c2fd5 | ||
|
|
29fea926af | ||
|
|
c8522b3cd6 | ||
|
|
9d7ef52885 | ||
|
|
86bd3a3c80 | ||
|
|
a300c2f4ae | ||
|
|
9f1c10f1c4 | ||
|
|
f3aab4f6c5 | ||
|
|
c8162ba861 | ||
|
|
9b73845f89 |
@@ -77,6 +77,8 @@ set(${PROJECT_NAME}_SRCS
|
||||
src/rclcpp/publisher_base.cpp
|
||||
src/rclcpp/qos.cpp
|
||||
src/rclcpp/qos_event.cpp
|
||||
src/rclcpp/serialization.cpp
|
||||
src/rclcpp/serialized_message.cpp
|
||||
src/rclcpp/service.cpp
|
||||
src/rclcpp/signal_handler.cpp
|
||||
src/rclcpp/subscription_base.cpp
|
||||
@@ -245,6 +247,18 @@ if(BUILD_TESTING)
|
||||
"test_msgs"
|
||||
)
|
||||
target_link_libraries(test_loaned_message ${PROJECT_NAME})
|
||||
ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp
|
||||
TIMEOUT 120)
|
||||
if(TARGET test_intra_process_communication)
|
||||
ament_target_dependencies(test_intra_process_communication
|
||||
"rcl"
|
||||
"rcl_interfaces"
|
||||
"rmw"
|
||||
"rosidl_typesupport_cpp"
|
||||
"test_msgs"
|
||||
)
|
||||
endif()
|
||||
target_link_libraries(test_intra_process_communication ${PROJECT_NAME})
|
||||
|
||||
ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240)
|
||||
if(TARGET test_node)
|
||||
@@ -393,6 +407,15 @@ if(BUILD_TESTING)
|
||||
${PROJECT_NAME}
|
||||
)
|
||||
endif()
|
||||
ament_add_gtest(test_serialized_message test/test_serialized_message.cpp)
|
||||
if(TARGET test_serialized_message)
|
||||
ament_target_dependencies(test_serialized_message
|
||||
test_msgs
|
||||
)
|
||||
target_link_libraries(test_serialized_message
|
||||
${PROJECT_NAME}
|
||||
)
|
||||
endif()
|
||||
ament_add_gtest(test_service test/test_service.cpp)
|
||||
if(TARGET test_service)
|
||||
ament_target_dependencies(test_service
|
||||
|
||||
@@ -43,6 +43,7 @@ std::shared_ptr<PublisherT>
|
||||
create_publisher(
|
||||
NodeT & node,
|
||||
const std::string & topic_name,
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
|
||||
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
|
||||
@@ -56,7 +57,7 @@ create_publisher(
|
||||
// Create the publisher.
|
||||
auto pub = node_topics->create_publisher(
|
||||
topic_name,
|
||||
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
|
||||
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options, type_support),
|
||||
qos
|
||||
);
|
||||
|
||||
@@ -66,6 +67,27 @@ create_publisher(
|
||||
return std::dynamic_pointer_cast<PublisherT>(pub);
|
||||
}
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
|
||||
typename NodeT>
|
||||
std::shared_ptr<PublisherT>
|
||||
create_publisher(
|
||||
NodeT & node,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
|
||||
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
|
||||
)
|
||||
)
|
||||
{
|
||||
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
|
||||
|
||||
return create_publisher<MessageT, AllocatorT, PublisherT, NodeT>(
|
||||
node, topic_name, type_support, qos, options);
|
||||
}
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__CREATE_PUBLISHER_HPP_
|
||||
|
||||
@@ -51,14 +51,13 @@ typename std::shared_ptr<SubscriptionT>
|
||||
create_subscription(
|
||||
NodeT && node,
|
||||
const std::string & topic_name,
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rclcpp::QoS & qos,
|
||||
CallbackT && callback,
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
|
||||
),
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
|
||||
MessageMemoryStrategyT::create_default()
|
||||
)
|
||||
)
|
||||
{
|
||||
using rclcpp::node_interfaces::get_node_topics_interface;
|
||||
@@ -67,7 +66,8 @@ create_subscription(
|
||||
auto factory = rclcpp::create_subscription_factory<MessageT>(
|
||||
std::forward<CallbackT>(callback),
|
||||
options,
|
||||
msg_mem_strat
|
||||
msg_mem_strat,
|
||||
type_support
|
||||
);
|
||||
|
||||
auto sub = node_topics->create_subscription(topic_name, factory, qos);
|
||||
@@ -76,6 +76,39 @@ create_subscription(
|
||||
return std::dynamic_pointer_cast<SubscriptionT>(sub);
|
||||
}
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename CallbackT,
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
typename CallbackMessageT =
|
||||
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
|
||||
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
|
||||
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
|
||||
CallbackMessageT,
|
||||
AllocatorT
|
||||
>,
|
||||
typename NodeT>
|
||||
typename std::shared_ptr<SubscriptionT>
|
||||
create_subscription(
|
||||
NodeT && node,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos,
|
||||
CallbackT && callback,
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options =
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>(),
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat =
|
||||
MessageMemoryStrategyT::create_default()
|
||||
)
|
||||
{
|
||||
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
|
||||
|
||||
return create_subscription<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT,
|
||||
MessageMemoryStrategyT>(
|
||||
std::forward<NodeT>(
|
||||
node), topic_name, type_support, qos, std::forward<CallbackT>(
|
||||
callback), options, msg_mem_strat);
|
||||
}
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_
|
||||
|
||||
@@ -138,7 +138,7 @@ public:
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
uint64_t
|
||||
add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
|
||||
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized = false);
|
||||
|
||||
/// Unregister a publisher using the publisher's unique id.
|
||||
/**
|
||||
@@ -311,6 +311,7 @@ private:
|
||||
rmw_qos_profile_t qos;
|
||||
const char * topic_name;
|
||||
bool use_take_shared_method;
|
||||
bool is_serialized;
|
||||
};
|
||||
|
||||
struct PublisherInfo
|
||||
@@ -320,6 +321,7 @@ private:
|
||||
rclcpp::PublisherBase::WeakPtr publisher;
|
||||
rmw_qos_profile_t qos;
|
||||
const char * topic_name;
|
||||
bool is_serialized;
|
||||
};
|
||||
|
||||
struct SplittedSubscriptions
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
|
||||
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
|
||||
#include "rclcpp/serialization.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
#include "rclcpp/type_support_decl.hpp"
|
||||
#include "rclcpp/waitable.hpp"
|
||||
#include "tracetools/tracetools.h"
|
||||
@@ -37,6 +39,8 @@ namespace rclcpp
|
||||
namespace experimental
|
||||
{
|
||||
|
||||
class SerializedMessage;
|
||||
|
||||
template<
|
||||
typename MessageT,
|
||||
typename Alloc = std::allocator<void>,
|
||||
@@ -51,6 +55,10 @@ public:
|
||||
using MessageAlloc = typename MessageAllocTraits::allocator_type;
|
||||
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
|
||||
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
|
||||
using CallbackMessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
|
||||
using CallbackMessageAlloc = typename CallbackMessageAllocTraits::allocator_type;
|
||||
using CallbackMessageUniquePtr = std::unique_ptr<CallbackMessageT>;
|
||||
using CallbackMessageSharedPtr = std::shared_ptr<CallbackMessageT>;
|
||||
|
||||
using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
|
||||
MessageT,
|
||||
@@ -64,11 +72,15 @@ public:
|
||||
rclcpp::Context::SharedPtr context,
|
||||
const std::string & topic_name,
|
||||
rmw_qos_profile_t qos_profile,
|
||||
rclcpp::IntraProcessBufferType buffer_type)
|
||||
: SubscriptionIntraProcessBase(topic_name, qos_profile),
|
||||
any_callback_(callback)
|
||||
rclcpp::IntraProcessBufferType buffer_type,
|
||||
std::shared_ptr<SerializationBase> serializer,
|
||||
bool is_serialized = false)
|
||||
: SubscriptionIntraProcessBase(topic_name, qos_profile, is_serialized),
|
||||
any_callback_(callback), serializer_(serializer)
|
||||
{
|
||||
if (!std::is_same<MessageT, CallbackMessageT>::value) {
|
||||
if (!std::is_same<MessageT, CallbackMessageT>::value &&
|
||||
!std::is_base_of<rcl_serialized_message_t, MessageT>::value)
|
||||
{
|
||||
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
|
||||
}
|
||||
|
||||
@@ -142,18 +154,49 @@ private:
|
||||
(void)ret;
|
||||
}
|
||||
|
||||
// convert from ROS2 message to rcl_serialized_message_t (serilizatino needed)
|
||||
template<typename T>
|
||||
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
|
||||
typename std::enable_if<
|
||||
!std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx not a SerializedMessage
|
||||
std::is_base_of<rcl_serialized_message_t, T>::value, // rx a SerializedMessage
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
|
||||
fprintf(stderr, "tx ROS2, rx serialized\n");
|
||||
if (nullptr == serializer_) {
|
||||
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
|
||||
}
|
||||
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
ConstMessageSharedPtr msg = buffer_->consume_shared();
|
||||
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
|
||||
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()), serialized_msg.get());
|
||||
|
||||
if (0u == serialized_msg->buffer_length) {
|
||||
throw std::runtime_error("Subscription intra-process could not serialize message");
|
||||
}
|
||||
|
||||
if (any_callback_.use_take_shared_method()) {
|
||||
any_callback_.dispatch_intra_process(std::static_pointer_cast<rcl_serialized_message_t>(serialized_msg), msg_info);
|
||||
} else {
|
||||
throw std::runtime_error(
|
||||
"Subscription intra-process for serialized "
|
||||
"messages does not support unique pointers.");
|
||||
}
|
||||
}
|
||||
|
||||
// forward from ROS2 message to ROS2 message (same type)
|
||||
template<class T>
|
||||
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
|
||||
typename std::enable_if<
|
||||
!std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx not a SerializedMessage
|
||||
!std::is_base_of<rcl_serialized_message_t, T>::value, // rx not a SerializedMessage
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
rmw_message_info_t msg_info;
|
||||
fprintf(stderr, "tx ROS2, rx ROS2\n");
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.publisher_gid = {0, {0}};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
@@ -166,8 +209,83 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
// forward from rcl_serialized_message_t to rcl_serialized_message_t (no conversion needed)
|
||||
template<typename T>
|
||||
typename std::enable_if<
|
||||
std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx a SerializedMessage
|
||||
std::is_base_of<rcl_serialized_message_t, T>::value, // rx a SerializedMessage
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
fprintf(stderr, "tx serialize, rx serialize\n");
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
if (any_callback_.use_take_shared_method()) {
|
||||
ConstMessageSharedPtr msg = buffer_->consume_shared();
|
||||
if (msg == nullptr) {
|
||||
throw std::runtime_error("Subscription intra-process could not get serialized message");
|
||||
}
|
||||
any_callback_.dispatch_intra_process(msg, msg_info);
|
||||
} else {
|
||||
throw std::runtime_error(
|
||||
"Subscription intra-process for serialized "
|
||||
"messages does not support unique pointers.");
|
||||
}
|
||||
}
|
||||
|
||||
// convert from rcl_serialized_message_t to ROS2 message (deserialization needed)
|
||||
template<class T>
|
||||
typename std::enable_if<
|
||||
std::is_base_of<rcl_serialized_message_t, MessageT>::value && // tx a SerializedMessage
|
||||
!std::is_base_of<rcl_serialized_message_t, T>::value, // rx not a SerializedMessage
|
||||
void>::type
|
||||
execute_impl()
|
||||
{
|
||||
fprintf(stderr, "tx rcl, rx ROS2\n");
|
||||
if (nullptr == serializer_) {
|
||||
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
|
||||
}
|
||||
|
||||
ConstMessageSharedPtr serialized_message = buffer_->consume_shared();
|
||||
if (nullptr == serialized_message) {
|
||||
throw std::runtime_error("Subscription intra-process could not get serialized message");
|
||||
}
|
||||
|
||||
rmw_message_info_t msg_info = {};
|
||||
msg_info.from_intra_process = true;
|
||||
|
||||
if (any_callback_.use_take_shared_method()) {
|
||||
CallbackMessageSharedPtr msg = construct_unique();
|
||||
serializer_->deserialize_message(
|
||||
serialized_message.get(),
|
||||
reinterpret_cast<void *>(msg.get()));
|
||||
any_callback_.dispatch_intra_process(msg, msg_info);
|
||||
} else {
|
||||
// construct unique might not initialize the ros message?
|
||||
CallbackMessageUniquePtr msg = construct_unique();
|
||||
serializer_->deserialize_message(
|
||||
serialized_message.get(),
|
||||
reinterpret_cast<void *>(msg.get()));
|
||||
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
|
||||
}
|
||||
}
|
||||
|
||||
CallbackMessageUniquePtr construct_unique()
|
||||
{
|
||||
CallbackMessageUniquePtr unique_msg;
|
||||
auto ptr = CallbackMessageAllocTraits::allocate(*message_allocator_.get(), 1);
|
||||
CallbackMessageAllocTraits::construct(*message_allocator_.get(), ptr);
|
||||
unique_msg = CallbackMessageUniquePtr(ptr);
|
||||
|
||||
return unique_msg;
|
||||
}
|
||||
|
||||
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
|
||||
BufferUniquePtr buffer_;
|
||||
std::shared_ptr<SerializationBase> serializer_;
|
||||
std::shared_ptr<CallbackMessageAlloc> message_allocator_ =
|
||||
std::make_shared<CallbackMessageAlloc>();
|
||||
};
|
||||
|
||||
} // namespace experimental
|
||||
|
||||
@@ -39,8 +39,11 @@ public:
|
||||
RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile)
|
||||
: topic_name_(topic_name), qos_profile_(qos_profile)
|
||||
SubscriptionIntraProcessBase(
|
||||
const std::string & topic_name,
|
||||
rmw_qos_profile_t qos_profile,
|
||||
bool is_serialized)
|
||||
: topic_name_(topic_name), qos_profile_(qos_profile), is_serialized_(is_serialized)
|
||||
{}
|
||||
|
||||
virtual ~SubscriptionIntraProcessBase() = default;
|
||||
@@ -70,6 +73,10 @@ public:
|
||||
rmw_qos_profile_t
|
||||
get_actual_qos() const;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
is_serialized() const;
|
||||
|
||||
protected:
|
||||
std::recursive_mutex reentrant_mutex_;
|
||||
rcl_guard_condition_t gc_;
|
||||
@@ -80,6 +87,7 @@ private:
|
||||
|
||||
std::string topic_name_;
|
||||
rmw_qos_profile_t qos_profile_;
|
||||
bool is_serialized_;
|
||||
};
|
||||
|
||||
} // namespace experimental
|
||||
|
||||
@@ -56,6 +56,12 @@ public:
|
||||
using MessageDeleter = allocator::Deleter<MessageAllocator, MessageT>;
|
||||
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
|
||||
using MessageSharedPtr = std::shared_ptr<const MessageT>;
|
||||
using SerializedMessageAllocatorTraits =
|
||||
allocator::AllocRebind<rclcpp::SerializedMessage,
|
||||
AllocatorT>;
|
||||
using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type;
|
||||
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
|
||||
rclcpp::SerializedMessage>;
|
||||
|
||||
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, AllocatorT>)
|
||||
|
||||
@@ -63,44 +69,39 @@ public:
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
|
||||
bool is_serialized = false)
|
||||
: PublisherBase(
|
||||
node_base,
|
||||
topic,
|
||||
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
|
||||
options.template to_rcl_publisher_options<MessageT>(qos)),
|
||||
options.template to_rcl_publisher_options<MessageT>(qos),
|
||||
is_serialized),
|
||||
options_(options),
|
||||
message_allocator_(new MessageAllocator(*options.get_allocator().get()))
|
||||
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
|
||||
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
|
||||
{
|
||||
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
|
||||
init_setup();
|
||||
}
|
||||
|
||||
if (options_.event_callbacks.deadline_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.deadline_callback,
|
||||
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
|
||||
}
|
||||
if (options_.event_callbacks.liveliness_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.liveliness_callback,
|
||||
RCL_PUBLISHER_LIVELINESS_LOST);
|
||||
}
|
||||
if (options_.event_callbacks.incompatible_qos_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.incompatible_qos_callback,
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} else if (options_.use_default_callbacks) {
|
||||
// Register default callback when not specified
|
||||
try {
|
||||
this->add_event_handler(
|
||||
[this](QOSOfferedIncompatibleQoSInfo & info) {
|
||||
this->default_incompatible_qos_callback(info);
|
||||
},
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} catch (UnsupportedEventTypeException & /*exc*/) {
|
||||
// pass
|
||||
}
|
||||
}
|
||||
// Setup continues in the post construction method, post_init_setup().
|
||||
Publisher(
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
bool is_serialized = false)
|
||||
: PublisherBase(
|
||||
node_base,
|
||||
topic,
|
||||
type_support,
|
||||
options.template to_rcl_publisher_options<MessageT>(qos),
|
||||
is_serialized),
|
||||
options_(options),
|
||||
message_allocator_(new MessageAllocator(*options.get_allocator().get())),
|
||||
message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get()))
|
||||
{
|
||||
init_setup();
|
||||
}
|
||||
|
||||
/// Called post construction, so that construction may continue after shared_from_this() works.
|
||||
@@ -134,10 +135,9 @@ public:
|
||||
throw std::invalid_argument(
|
||||
"intraprocess communication allowed only with volatile durability");
|
||||
}
|
||||
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
|
||||
this->setup_intra_process(
|
||||
intra_process_publisher_id,
|
||||
ipm);
|
||||
uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), false);
|
||||
uint64_t intra_process_publisher_id_serialized = ipm->add_publisher(this->shared_from_this(), true);
|
||||
this->setup_intra_process(intra_process_publisher_id, intra_process_publisher_id_serialized, ipm);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,6 +173,11 @@ public:
|
||||
virtual void
|
||||
publish(std::unique_ptr<MessageT, MessageDeleter> msg)
|
||||
{
|
||||
//if (std::is_same<MessageT, rcl_serialized_message_t>::value) {
|
||||
// this->template publish<MessageDeleter>(std::move(msg));
|
||||
// return;
|
||||
//}
|
||||
|
||||
if (!intra_process_is_enabled_) {
|
||||
this->do_inter_process_publish(*msg);
|
||||
return;
|
||||
@@ -187,34 +192,48 @@ public:
|
||||
get_subscription_count() > get_intra_process_subscription_count();
|
||||
|
||||
if (inter_process_publish_needed) {
|
||||
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
|
||||
auto shared_msg = this->do_intra_process_publish_and_return_shared(
|
||||
std::move(msg), message_allocator_);
|
||||
this->do_inter_process_publish(*shared_msg);
|
||||
} else {
|
||||
this->do_intra_process_publish(std::move(msg));
|
||||
this->do_intra_process_publish(std::move(msg), message_allocator_);
|
||||
}
|
||||
}
|
||||
|
||||
virtual void
|
||||
publish(const MessageT & msg)
|
||||
{
|
||||
// Avoid allocating when not using intra process.
|
||||
if (!intra_process_is_enabled_) {
|
||||
// In this case we're not using intra process.
|
||||
return this->do_inter_process_publish(msg);
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
// As the message is not const, a copy should be made.
|
||||
// A shared_ptr<const MessageT> could also be constructed here.
|
||||
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
|
||||
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
|
||||
MessageUniquePtr unique_msg(ptr, message_deleter_);
|
||||
this->publish(std::move(unique_msg));
|
||||
this->do_publish_message(msg);
|
||||
}
|
||||
|
||||
void
|
||||
template<class T = MessageT>
|
||||
typename std::enable_if<!std::is_base_of<rcl_serialized_message_t, T>::value, void>::type
|
||||
publish(const rcl_serialized_message_t & serialized_msg)
|
||||
{
|
||||
return this->do_serialized_publish(&serialized_msg);
|
||||
this->do_publish_message<rcl_serialized_message_t>(serialized_msg);
|
||||
}
|
||||
|
||||
/// Publish a serialized message. Non specialized version to prevent compiling errors.
|
||||
template<typename TDeleter, typename T>
|
||||
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
|
||||
{
|
||||
(void)serialized_msg;
|
||||
throw std::runtime_error(
|
||||
"publishing unique_ptr with custom deleter only supported for serialized messages");
|
||||
}
|
||||
|
||||
/// Publish a serialized message.
|
||||
//template<typename TDeleter>
|
||||
//void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
|
||||
//{
|
||||
// this->do_serialized_publish(*serialized_msg);
|
||||
//}
|
||||
|
||||
/// Publish a serialized message.
|
||||
template<typename TDeleter>
|
||||
void publish(std::unique_ptr<SerializedMessage, TDeleter> serialized_msg)
|
||||
{
|
||||
this->do_serialized_publish(*serialized_msg);
|
||||
}
|
||||
|
||||
/// Publish an instance of a LoanedMessage.
|
||||
@@ -259,6 +278,69 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
void init_setup()
|
||||
{
|
||||
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
|
||||
|
||||
if (options_.event_callbacks.deadline_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.deadline_callback,
|
||||
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
|
||||
}
|
||||
if (options_.event_callbacks.liveliness_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.liveliness_callback,
|
||||
RCL_PUBLISHER_LIVELINESS_LOST);
|
||||
}
|
||||
if (options_.event_callbacks.incompatible_qos_callback) {
|
||||
this->add_event_handler(
|
||||
options_.event_callbacks.incompatible_qos_callback,
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} else if (options_.use_default_callbacks) {
|
||||
// Register default callback when not specified
|
||||
try {
|
||||
this->add_event_handler(
|
||||
[this](QOSOfferedIncompatibleQoSInfo & info) {
|
||||
this->default_incompatible_qos_callback(info);
|
||||
},
|
||||
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
|
||||
} catch (UnsupportedEventTypeException & /*exc*/) {
|
||||
RCLCPP_WARN_ONCE(
|
||||
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
|
||||
"This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS "
|
||||
"events, you will not be notified when Publishers offer an incompatible "
|
||||
"QoS profile to Subscriptions on the same topic.");
|
||||
}
|
||||
}
|
||||
// Setup continues in the post construction method, post_init_setup().
|
||||
}
|
||||
|
||||
template<class T = MessageT>
|
||||
typename std::enable_if<!std::is_base_of<rcl_serialized_message_t, T>::value>::type
|
||||
do_publish_message(const T & msg)
|
||||
{
|
||||
// Avoid allocating when not using intra process.
|
||||
if (!intra_process_is_enabled_) {
|
||||
// In this case we're not using intra process.
|
||||
return this->do_inter_process_publish(msg);
|
||||
}
|
||||
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
|
||||
// As the message is not const, a copy should be made.
|
||||
// A shared_ptr<const MessageT> could also be constructed here.
|
||||
auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
|
||||
MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
|
||||
MessageUniquePtr unique_msg(ptr, message_deleter_);
|
||||
this->publish(std::move(unique_msg));
|
||||
}
|
||||
|
||||
template<class T = MessageT>
|
||||
typename std::enable_if<std::is_base_of<rcl_serialized_message_t, T>::value>::type
|
||||
do_publish_message(const T & msg)
|
||||
{
|
||||
// Kept for backwards compatibility. Copies compelete memory!
|
||||
this->publish(std::make_unique<rclcpp::SerializedMessage>(msg));
|
||||
}
|
||||
|
||||
void
|
||||
do_inter_process_publish(const MessageT & msg)
|
||||
{
|
||||
@@ -280,15 +362,24 @@ protected:
|
||||
}
|
||||
|
||||
void
|
||||
do_serialized_publish(const rcl_serialized_message_t * serialized_msg)
|
||||
do_serialized_publish(const SerializedMessage & serialized_msg)
|
||||
{
|
||||
if (intra_process_is_enabled_) {
|
||||
// TODO(Karsten1987): support serialized message passed by intraprocess
|
||||
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
|
||||
bool inter_process_publish_needed =
|
||||
get_subscription_count() > get_intra_process_subscription_count();
|
||||
|
||||
if (inter_process_publish_needed) {
|
||||
// declare here to avoid deletion before returning method
|
||||
auto status = rcl_publish_serialized_message(&publisher_handle_, &serialized_msg, nullptr);
|
||||
if (RCL_RET_OK != status) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
|
||||
}
|
||||
}
|
||||
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr);
|
||||
if (RCL_RET_OK != status) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
|
||||
|
||||
if (intra_process_is_enabled_) {
|
||||
auto msg = std::make_unique<rclcpp::SerializedMessage>();
|
||||
*msg = serialized_msg;
|
||||
|
||||
do_intra_process_publish(std::move(msg), message_allocator_serialized_);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,8 +403,11 @@ protected:
|
||||
}
|
||||
}
|
||||
|
||||
template<typename T, typename Deleter, typename Allocator>
|
||||
void
|
||||
do_intra_process_publish(std::unique_ptr<MessageT, MessageDeleter> msg)
|
||||
do_intra_process_publish(
|
||||
std::unique_ptr<T, Deleter> msg,
|
||||
std::shared_ptr<Allocator> & message_allocator)
|
||||
{
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
@@ -324,14 +418,17 @@ protected:
|
||||
throw std::runtime_error("cannot publish msg which is a null pointer");
|
||||
}
|
||||
|
||||
ipm->template do_intra_process_publish<MessageT, AllocatorT>(
|
||||
ipm->template do_intra_process_publish<T, AllocatorT>(
|
||||
intra_process_publisher_id_,
|
||||
std::move(msg),
|
||||
message_allocator_);
|
||||
message_allocator);
|
||||
}
|
||||
|
||||
std::shared_ptr<const MessageT>
|
||||
do_intra_process_publish_and_return_shared(std::unique_ptr<MessageT, MessageDeleter> msg)
|
||||
template<typename T, typename Deleter, typename Allocator>
|
||||
std::shared_ptr<const T>
|
||||
do_intra_process_publish_and_return_shared(
|
||||
std::unique_ptr<T, Deleter> msg,
|
||||
std::shared_ptr<Allocator> & message_allocator)
|
||||
{
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
@@ -342,10 +439,10 @@ protected:
|
||||
throw std::runtime_error("cannot publish msg which is a null pointer");
|
||||
}
|
||||
|
||||
return ipm->template do_intra_process_publish_and_return_shared<MessageT, AllocatorT>(
|
||||
return ipm->template do_intra_process_publish_and_return_shared<T, AllocatorT>(
|
||||
intra_process_publisher_id_,
|
||||
std::move(msg),
|
||||
message_allocator_);
|
||||
message_allocator);
|
||||
}
|
||||
|
||||
/// Copy of original options passed during construction.
|
||||
@@ -356,6 +453,7 @@ protected:
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> options_;
|
||||
|
||||
std::shared_ptr<MessageAllocator> message_allocator_;
|
||||
std::shared_ptr<SerializedMessageAllocator> message_allocator_serialized_;
|
||||
|
||||
MessageDeleter message_deleter_;
|
||||
};
|
||||
|
||||
@@ -73,7 +73,8 @@ public:
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic,
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rcl_publisher_options_t & publisher_options);
|
||||
const rcl_publisher_options_t & publisher_options,
|
||||
bool is_serializd = false);
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~PublisherBase();
|
||||
@@ -191,8 +192,13 @@ public:
|
||||
void
|
||||
setup_intra_process(
|
||||
uint64_t intra_process_publisher_id,
|
||||
uint64_t intra_process_publisher_id_serialized,
|
||||
IntraProcessManagerSharedPtr ipm);
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
is_serialized() const;
|
||||
|
||||
protected:
|
||||
template<typename EventCallbackT>
|
||||
void
|
||||
@@ -222,8 +228,12 @@ protected:
|
||||
bool intra_process_is_enabled_;
|
||||
IntraProcessManagerWeakPtr weak_ipm_;
|
||||
uint64_t intra_process_publisher_id_;
|
||||
uint64_t intra_process_publisher_id_serialized_;
|
||||
|
||||
rmw_gid_t rmw_gid_;
|
||||
|
||||
private:
|
||||
bool is_serialized_;
|
||||
};
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
@@ -63,17 +63,21 @@ struct PublisherFactory
|
||||
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, AllocatorT>.
|
||||
template<typename MessageT, typename AllocatorT, typename PublisherT>
|
||||
PublisherFactory
|
||||
create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
|
||||
create_publisher_factory(
|
||||
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options,
|
||||
const rosidl_message_type_support_t & type_support)
|
||||
{
|
||||
PublisherFactory factory {
|
||||
// factory function that creates a MessageT specific PublisherT
|
||||
[options](
|
||||
[options, type_support](
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos
|
||||
) -> std::shared_ptr<PublisherT>
|
||||
{
|
||||
auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
|
||||
auto is_serialized = std::is_base_of<rcl_serialized_message_t, MessageT>::value;
|
||||
auto publisher = std::make_shared<PublisherT>(
|
||||
node_base, topic_name, qos, options, type_support, is_serialized);
|
||||
// This is used for setting up things like intra process comms which
|
||||
// require this->shared_from_this() which cannot be called from
|
||||
// the constructor.
|
||||
|
||||
68
rclcpp/include/rclcpp/serialization.hpp
Normal file
68
rclcpp/include/rclcpp/serialization.hpp
Normal file
@@ -0,0 +1,68 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__SERIALIZATION_HPP_
|
||||
#define RCLCPP__SERIALIZATION_HPP_
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "rosidl_runtime_c/message_type_support_struct.h"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
class SerializedMessage;
|
||||
|
||||
/// Interface to (de)serialize a message
|
||||
class SerializationBase
|
||||
{
|
||||
public:
|
||||
virtual ~SerializationBase() = default;
|
||||
|
||||
/// Serialize a ROS2 message to a serialized stream
|
||||
/**
|
||||
* \param[in] message The ROS2 message which is read and serialized by rmw.
|
||||
*/
|
||||
virtual void serialize_message(
|
||||
const void * ros_message, SerializedMessage * serialized_message) const = 0;
|
||||
|
||||
/// Deserialize a serialized stream to a ROS message
|
||||
/**
|
||||
* \param[in] serialized_message The serialized message to be converted to ROS2 by rmw.
|
||||
* \param[out] message The deserialized ROS2 message.
|
||||
*/
|
||||
virtual void deserialize_message(
|
||||
const SerializedMessage * serialized_message, void * ros_message) const = 0;
|
||||
};
|
||||
|
||||
/// Default implementation to (de)serialize a message by using rmw_(de)serialize
|
||||
class Serialization : public SerializationBase
|
||||
{
|
||||
public:
|
||||
Serialization(const rosidl_message_type_support_t & type_support);
|
||||
|
||||
void serialize_message(
|
||||
const void * ros_message, SerializedMessage * serialized_message) const override;
|
||||
|
||||
void deserialize_message(
|
||||
const SerializedMessage * serialized_message, void * ros_message) const override;
|
||||
|
||||
private:
|
||||
rosidl_message_type_support_t type_support_;
|
||||
};
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__SERIALIZATION_HPP_
|
||||
76
rclcpp/include/rclcpp/serialized_message.hpp
Normal file
76
rclcpp/include/rclcpp/serialized_message.hpp
Normal file
@@ -0,0 +1,76 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_
|
||||
#define RCLCPP__SERIALIZED_MESSAGE_HPP_
|
||||
|
||||
#include "rcl/allocator.h"
|
||||
#include "rcl/types.h"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks
|
||||
class SerializedMessage : public rcl_serialized_message_t
|
||||
{
|
||||
public:
|
||||
/// Default constructor for a SerializedMessage
|
||||
/**
|
||||
* Default constructs a serialized message and initalizes its
|
||||
* capacity with 0.
|
||||
*
|
||||
* \param[in] allocator The allocator to be used for the initialzation.
|
||||
*/
|
||||
explicit SerializedMessage(
|
||||
const rcl_allocator_t & allocator = rcl_get_default_allocator());
|
||||
|
||||
/// Default constructor for a SerializedMessage
|
||||
/**
|
||||
* Default constructs a serialized message and initalizes its
|
||||
* capacity with 0.
|
||||
*
|
||||
* \param[in] initial_capacity The amount of memory to be allocated.
|
||||
* \param[in] allocator The allocator to be used for the initialzation.
|
||||
*/
|
||||
SerializedMessage(
|
||||
size_t initial_capacity,
|
||||
const rcl_allocator_t & allocator = rcl_get_default_allocator());
|
||||
|
||||
/// Copy Constructor for a SerializedMessage
|
||||
SerializedMessage(const SerializedMessage & serialized_message);
|
||||
|
||||
/// Copy Constructor for a SerializedMessage from a rcl_serialized_message_t
|
||||
SerializedMessage(const rcl_serialized_message_t & serialized_message);
|
||||
|
||||
/// Move Constructor for a SerializedMessage
|
||||
SerializedMessage(SerializedMessage && serialized_message);
|
||||
|
||||
/// Move Constructor for a SerializedMessage from a rcl_serialized_message_t
|
||||
SerializedMessage(rcl_serialized_message_t && serialized_message);
|
||||
|
||||
SerializedMessage & operator=(const SerializedMessage & other);
|
||||
|
||||
SerializedMessage & operator=(const rcl_serialized_message_t & other);
|
||||
|
||||
SerializedMessage & operator=(SerializedMessage && other);
|
||||
|
||||
SerializedMessage & operator=(rcl_serialized_message_t && other);
|
||||
|
||||
/// Destructor for a SerializedMessage
|
||||
~SerializedMessage();
|
||||
};
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#endif // RCLCPP__SERIALIZED_MESSAGE_HPP_
|
||||
@@ -25,7 +25,6 @@
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
|
||||
#include "rcl/error_handling.h"
|
||||
#include "rcl/subscription.h"
|
||||
|
||||
@@ -41,6 +40,7 @@
|
||||
#include "rclcpp/message_info.hpp"
|
||||
#include "rclcpp/message_memory_strategy.hpp"
|
||||
#include "rclcpp/node_interfaces/node_base_interface.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
#include "rclcpp/subscription_base.hpp"
|
||||
#include "rclcpp/subscription_options.hpp"
|
||||
#include "rclcpp/subscription_traits.hpp"
|
||||
@@ -155,29 +155,80 @@ public:
|
||||
"intraprocess communication allowed only with volatile durability");
|
||||
}
|
||||
|
||||
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
|
||||
auto context = node_base->get_context();
|
||||
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
|
||||
CallbackMessageT,
|
||||
AllocatorT,
|
||||
typename MessageUniquePtr::deleter_type>;
|
||||
auto subscription_intra_process = std::make_shared<SubscriptionIntraProcessT>(
|
||||
callback,
|
||||
options.get_allocator(),
|
||||
context,
|
||||
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
|
||||
qos_profile,
|
||||
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback));
|
||||
TRACEPOINT(
|
||||
rclcpp_subscription_init,
|
||||
(const void *)get_subscription_handle().get(),
|
||||
(const void *)subscription_intra_process.get());
|
||||
|
||||
// Add it to the intra process manager.
|
||||
using rclcpp::experimental::IntraProcessManager;
|
||||
uint64_t intra_process_subscription_id;
|
||||
uint64_t intra_process_subscription_id_serialized;
|
||||
|
||||
auto context = node_base->get_context();
|
||||
auto ipm = context->get_sub_context<IntraProcessManager>();
|
||||
uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
|
||||
this->setup_intra_process(intra_process_subscription_id, ipm);
|
||||
|
||||
{
|
||||
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
|
||||
auto subscription_intra_process = std::make_shared<
|
||||
rclcpp::experimental::SubscriptionIntraProcess<
|
||||
CallbackMessageT,
|
||||
AllocatorT,
|
||||
typename MessageUniquePtr::deleter_type
|
||||
>>(
|
||||
callback,
|
||||
options.get_allocator(),
|
||||
context,
|
||||
this->get_topic_name(), // important to get it by the fully-qualified name
|
||||
qos.get_rmw_qos_profile(),
|
||||
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
|
||||
std::make_shared<rclcpp::Serialization>(type_support_handle)
|
||||
);
|
||||
TRACEPOINT(
|
||||
rclcpp_subscription_init,
|
||||
(const void *)get_subscription_handle().get(),
|
||||
(const void *)subscription_intra_process.get());
|
||||
|
||||
// Add it to the intra process manager.
|
||||
intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
|
||||
}
|
||||
|
||||
{
|
||||
using SerializedMessageAllocatorTraits =
|
||||
allocator::AllocRebind<rclcpp::SerializedMessage,
|
||||
AllocatorT>;
|
||||
using SerializedMessageAllocator =
|
||||
typename SerializedMessageAllocatorTraits::allocator_type;
|
||||
using SerializedMessageDeleter = allocator::Deleter<SerializedMessageAllocator,
|
||||
rclcpp::SerializedMessage>;
|
||||
using SerializedMessageUniquePtr =
|
||||
std::unique_ptr<rclcpp::SerializedMessage,
|
||||
SerializedMessageDeleter>;
|
||||
|
||||
// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
|
||||
auto subscription_intra_process = std::make_shared<
|
||||
rclcpp::experimental::SubscriptionIntraProcess<
|
||||
rclcpp::SerializedMessage,
|
||||
AllocatorT,
|
||||
typename SerializedMessageUniquePtr::deleter_type,
|
||||
CallbackMessageT
|
||||
>>(
|
||||
callback,
|
||||
options.get_allocator(),
|
||||
context,
|
||||
this->get_topic_name(), // important to get it by the fully-qualified name
|
||||
qos.get_rmw_qos_profile(),
|
||||
resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback),
|
||||
std::make_shared<rclcpp::Serialization>(type_support_handle),
|
||||
is_serialized()
|
||||
);
|
||||
TRACEPOINT(
|
||||
rclcpp_subscription_init,
|
||||
(const void *)get_subscription_handle().get(),
|
||||
(const void *)subscription_intra_process.get());
|
||||
|
||||
// Add it to the intra process manager.
|
||||
intra_process_subscription_id_serialized = ipm->add_subscription(
|
||||
subscription_intra_process);
|
||||
}
|
||||
|
||||
this->setup_intra_process(
|
||||
{intra_process_subscription_id,
|
||||
intra_process_subscription_id_serialized}, ipm);
|
||||
}
|
||||
|
||||
TRACEPOINT(
|
||||
|
||||
@@ -228,13 +228,13 @@ public:
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
setup_intra_process(
|
||||
uint64_t intra_process_subscription_id,
|
||||
const std::vector<uint64_t> & intra_process_subscription_ids,
|
||||
IntraProcessManagerWeakPtr weak_ipm);
|
||||
|
||||
/// Return the waitable for intra-process, or nullptr if intra-process is not setup.
|
||||
RCLCPP_PUBLIC
|
||||
rclcpp::Waitable::SharedPtr
|
||||
get_intra_process_waitable() const;
|
||||
std::vector<rclcpp::Waitable::SharedPtr>
|
||||
get_intra_process_waitables() const;
|
||||
|
||||
/// Exchange state of whether or not a part of the subscription is used by a wait set.
|
||||
/**
|
||||
@@ -286,7 +286,7 @@ protected:
|
||||
|
||||
bool use_intra_process_;
|
||||
IntraProcessManagerWeakPtr weak_ipm_;
|
||||
uint64_t intra_process_subscription_id_;
|
||||
std::vector<uint64_t> intra_process_subscription_ids_;
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(SubscriptionBase)
|
||||
|
||||
@@ -78,7 +78,8 @@ SubscriptionFactory
|
||||
create_subscription_factory(
|
||||
CallbackT && callback,
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
|
||||
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat,
|
||||
const rosidl_message_type_support_t & type_support)
|
||||
{
|
||||
auto allocator = options.get_allocator();
|
||||
|
||||
@@ -88,7 +89,7 @@ create_subscription_factory(
|
||||
|
||||
SubscriptionFactory factory {
|
||||
// factory function that creates a MessageT specific SubscriptionT
|
||||
[options, msg_mem_strat, any_subscription_callback](
|
||||
[options, msg_mem_strat, any_subscription_callback, type_support](
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic_name,
|
||||
const rclcpp::QoS & qos
|
||||
@@ -99,7 +100,7 @@ create_subscription_factory(
|
||||
|
||||
auto sub = Subscription<CallbackMessageT, AllocatorT>::make_shared(
|
||||
node_base,
|
||||
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
|
||||
type_support,
|
||||
topic_name,
|
||||
qos,
|
||||
any_subscription_callback,
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
#include <memory>
|
||||
|
||||
#include "rclcpp/function_traits.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
#include "rclcpp/subscription_options.hpp"
|
||||
#include "rcl/types.h"
|
||||
|
||||
namespace rclcpp
|
||||
@@ -49,6 +51,15 @@ struct is_serialized_subscription_argument<std::shared_ptr<rcl_serialized_messag
|
||||
: std::true_type
|
||||
{};
|
||||
|
||||
template<>
|
||||
struct is_serialized_subscription_argument<rclcpp::SerializedMessage>: std::true_type
|
||||
{};
|
||||
|
||||
template<>
|
||||
struct is_serialized_subscription_argument<std::shared_ptr<rclcpp::SerializedMessage>>
|
||||
: std::true_type
|
||||
{};
|
||||
|
||||
template<typename T>
|
||||
struct is_serialized_subscription : is_serialized_subscription_argument<T>
|
||||
{};
|
||||
@@ -75,6 +86,7 @@ struct extract_message_type<std::unique_ptr<MessageT, Deleter>>: extract_message
|
||||
|
||||
template<
|
||||
typename CallbackT,
|
||||
typename AllocatorT = std::allocator<void>,
|
||||
// Do not attempt if CallbackT is an integer (mistaken for depth)
|
||||
typename = std::enable_if_t<!std::is_integral<
|
||||
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
|
||||
@@ -85,6 +97,10 @@ template<
|
||||
// Do not attempt if CallbackT is a rmw_qos_profile_t (mistaken for qos profile)
|
||||
typename = std::enable_if_t<!std::is_same<
|
||||
rmw_qos_profile_t,
|
||||
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>,
|
||||
// Do not attempt if CallbackT is a rclcpp::SubscriptionOptionsWithAllocator
|
||||
typename = std::enable_if_t<!std::is_same<
|
||||
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>,
|
||||
std::remove_cv_t<std::remove_reference_t<CallbackT>>>::value>
|
||||
>
|
||||
struct has_message_type : extract_message_type<
|
||||
|
||||
@@ -165,18 +165,19 @@ public:
|
||||
}
|
||||
if (mask.include_intra_process_waitable) {
|
||||
auto local_subscription = inner_subscription;
|
||||
auto waitable = inner_subscription->get_intra_process_waitable();
|
||||
if (nullptr != waitable) {
|
||||
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
|
||||
waitable.get(),
|
||||
true);
|
||||
if (already_in_use) {
|
||||
throw std::runtime_error(
|
||||
"subscription intra-process waitable already associated with a wait set");
|
||||
for (auto waitable : inner_subscription->get_intra_process_waitables()) {
|
||||
if (nullptr != waitable) {
|
||||
bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
|
||||
waitable.get(),
|
||||
true);
|
||||
if (already_in_use) {
|
||||
throw std::runtime_error(
|
||||
"subscription intra-process waitable already associated with a wait set");
|
||||
}
|
||||
this->storage_add_waitable(
|
||||
std::move(waitable),
|
||||
std::move(local_subscription));
|
||||
}
|
||||
this->storage_add_waitable(
|
||||
std::move(inner_subscription->get_intra_process_waitable()),
|
||||
std::move(local_subscription));
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -230,11 +231,12 @@ public:
|
||||
}
|
||||
}
|
||||
if (mask.include_intra_process_waitable) {
|
||||
auto local_waitable = inner_subscription->get_intra_process_waitable();
|
||||
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
|
||||
if (nullptr != local_waitable) {
|
||||
// This is the case when intra process is disabled for the subscription.
|
||||
this->storage_remove_waitable(std::move(local_waitable));
|
||||
for (auto local_waitable : inner_subscription->get_intra_process_waitables()) {
|
||||
inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
|
||||
if (nullptr != local_waitable) {
|
||||
// This is the case when intra process is disabled for the subscription.
|
||||
this->storage_remove_waitable(std::move(local_waitable));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -32,7 +32,8 @@ IntraProcessManager::~IntraProcessManager()
|
||||
{}
|
||||
|
||||
uint64_t
|
||||
IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
|
||||
IntraProcessManager::add_publisher(
|
||||
rclcpp::PublisherBase::SharedPtr publisher, bool is_serialized)
|
||||
{
|
||||
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
@@ -41,6 +42,7 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
|
||||
publishers_[id].publisher = publisher;
|
||||
publishers_[id].topic_name = publisher->get_topic_name();
|
||||
publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();
|
||||
publishers_[id].is_serialized = is_serialized;
|
||||
|
||||
// Initialize the subscriptions storage for this publisher.
|
||||
pub_to_subs_[id] = SplittedSubscriptions();
|
||||
@@ -66,6 +68,7 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su
|
||||
subscriptions_[id].topic_name = subscription->get_topic_name();
|
||||
subscriptions_[id].qos = subscription->get_actual_qos();
|
||||
subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method();
|
||||
subscriptions_[id].is_serialized = subscription->is_serialized();
|
||||
|
||||
// adds the subscription id to all the matchable publishers
|
||||
for (auto & pair : publishers_) {
|
||||
@@ -220,6 +223,13 @@ IntraProcessManager::can_communicate(
|
||||
return false;
|
||||
}
|
||||
|
||||
// a publisher and a subscription with different content type can't communicate
|
||||
// if is_serialized is true, the expected message typ is rcl_serialized_message_t
|
||||
// otherwise the templated ROS2 message type
|
||||
if (sub_info.is_serialized != pub_info.is_serialized) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -99,10 +99,12 @@ NodeTopics::add_subscription(
|
||||
callback_group->add_waitable(subscription_event);
|
||||
}
|
||||
|
||||
auto intra_process_waitable = subscription->get_intra_process_waitable();
|
||||
if (nullptr != intra_process_waitable) {
|
||||
// Add to the callback group to be notified about intra-process msgs.
|
||||
callback_group->add_waitable(intra_process_waitable);
|
||||
const auto intra_process_waitables = subscription->get_intra_process_waitables();
|
||||
for (auto & intra_process_waitable : intra_process_waitables) {
|
||||
if (nullptr != intra_process_waitable) {
|
||||
// Add to the callback group to be notified about intra-process msgs.
|
||||
callback_group->add_waitable(intra_process_waitable);
|
||||
}
|
||||
}
|
||||
|
||||
// Notify the executor that a new subscription was created using the parent Node.
|
||||
|
||||
@@ -42,9 +42,12 @@ PublisherBase::PublisherBase(
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic,
|
||||
const rosidl_message_type_support_t & type_support,
|
||||
const rcl_publisher_options_t & publisher_options)
|
||||
const rcl_publisher_options_t & publisher_options,
|
||||
bool is_serialized)
|
||||
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
|
||||
intra_process_is_enabled_(false), intra_process_publisher_id_(0)
|
||||
intra_process_is_enabled_(false),
|
||||
intra_process_publisher_id_(0),
|
||||
is_serialized_(is_serialized)
|
||||
{
|
||||
rcl_ret_t ret = rcl_publisher_init(
|
||||
&publisher_handle_,
|
||||
@@ -238,13 +241,21 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
|
||||
void
|
||||
PublisherBase::setup_intra_process(
|
||||
uint64_t intra_process_publisher_id,
|
||||
uint64_t intra_process_publisher_id_serialized,
|
||||
IntraProcessManagerSharedPtr ipm)
|
||||
{
|
||||
intra_process_publisher_id_ = intra_process_publisher_id;
|
||||
intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized;
|
||||
weak_ipm_ = ipm;
|
||||
intra_process_is_enabled_ = true;
|
||||
}
|
||||
|
||||
bool
|
||||
PublisherBase::is_serialized() const
|
||||
{
|
||||
return is_serialized_;
|
||||
}
|
||||
|
||||
void
|
||||
PublisherBase::default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & event) const
|
||||
{
|
||||
|
||||
66
rclcpp/src/rclcpp/serialization.cpp
Normal file
66
rclcpp/src/rclcpp/serialization.cpp
Normal file
@@ -0,0 +1,66 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/serialization.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
|
||||
#include "rcpputils/asserts.hpp"
|
||||
|
||||
#include "rmw/rmw.h"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
Serialization::Serialization(const rosidl_message_type_support_t & type_support)
|
||||
: type_support_(type_support)
|
||||
{}
|
||||
|
||||
void Serialization::serialize_message(
|
||||
const void * ros_message, SerializedMessage * serialized_message) const
|
||||
{
|
||||
rcpputils::check_true(ros_message != nullptr, "ROS message is nullpointer.");
|
||||
rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer.");
|
||||
|
||||
const auto ret = rmw_serialize(
|
||||
ros_message,
|
||||
&type_support_,
|
||||
serialized_message);
|
||||
if (ret != RMW_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to serialize ROS message.");
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Serialization::deserialize_message(
|
||||
const SerializedMessage * serialized_message, void * ros_message) const
|
||||
{
|
||||
rcpputils::check_true(serialized_message != nullptr, "Serialized message is nullpointer.");
|
||||
rcpputils::check_true(
|
||||
serialized_message->buffer_capacity != 0 &&
|
||||
serialized_message->buffer_length != 0 &&
|
||||
serialized_message->buffer != nullptr, "Serialized message is wrongly initialized.");
|
||||
rcpputils::check_true(ros_message != nullptr, "ROS message is a nullpointer.");
|
||||
|
||||
const auto ret = rmw_deserialize(serialized_message, &type_support_, ros_message);
|
||||
if (ret != RMW_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to deserialize ROS message.");
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rclcpp
|
||||
135
rclcpp/src/rclcpp/serialized_message.cpp
Normal file
135
rclcpp/src/rclcpp/serialized_message.cpp
Normal file
@@ -0,0 +1,135 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
|
||||
#include <cstring>
|
||||
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/logging.hpp"
|
||||
|
||||
#include "rmw/types.h"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks
|
||||
SerializedMessage::SerializedMessage(const rcl_allocator_t & allocator)
|
||||
: SerializedMessage(0u, allocator)
|
||||
{}
|
||||
|
||||
SerializedMessage::SerializedMessage(
|
||||
size_t initial_capacity, const rcl_allocator_t & allocator)
|
||||
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
|
||||
{
|
||||
const auto ret = rmw_serialized_message_init(
|
||||
this, initial_capacity, &allocator);
|
||||
if (ret != RCL_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
}
|
||||
|
||||
SerializedMessage::SerializedMessage(const SerializedMessage & serialized_message)
|
||||
: SerializedMessage(static_cast<const rcl_serialized_message_t &>(serialized_message))
|
||||
{}
|
||||
|
||||
SerializedMessage::SerializedMessage(const rcl_serialized_message_t & serialized_message)
|
||||
: rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message())
|
||||
{
|
||||
fprintf(stderr, "copy constructor called\n");
|
||||
const auto ret = rmw_serialized_message_init(
|
||||
this, serialized_message.buffer_capacity, &serialized_message.allocator);
|
||||
if (ret != RCL_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
|
||||
// do not call memcpy if the pointer is "static"
|
||||
if (buffer != serialized_message.buffer) {
|
||||
std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length);
|
||||
}
|
||||
buffer_length = serialized_message.buffer_length;
|
||||
}
|
||||
|
||||
SerializedMessage::SerializedMessage(SerializedMessage && serialized_message)
|
||||
: SerializedMessage(
|
||||
std::forward<rcl_serialized_message_t>(
|
||||
static_cast<rcl_serialized_message_t &&>(serialized_message)))
|
||||
{}
|
||||
|
||||
SerializedMessage::SerializedMessage(rcl_serialized_message_t && serialized_message)
|
||||
: rcl_serialized_message_t(serialized_message)
|
||||
{
|
||||
// reset buffer to prevent double free
|
||||
serialized_message = rmw_get_zero_initialized_serialized_message();
|
||||
}
|
||||
|
||||
SerializedMessage & SerializedMessage::operator=(const SerializedMessage & other)
|
||||
{
|
||||
*this = static_cast<const rcl_serialized_message_t &>(other);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
SerializedMessage & SerializedMessage::operator=(const rcl_serialized_message_t & other)
|
||||
{
|
||||
*this = static_cast<SerializedMessage>(rmw_get_zero_initialized_serialized_message());
|
||||
|
||||
const auto ret = rmw_serialized_message_init(
|
||||
this, other.buffer_capacity, &other.allocator);
|
||||
if (ret != RCL_RET_OK) {
|
||||
rclcpp::exceptions::throw_from_rcl_error(ret);
|
||||
}
|
||||
|
||||
// do not call memcpy if the pointer is "static"
|
||||
if (buffer != other.buffer) {
|
||||
std::memcpy(buffer, other.buffer, other.buffer_length);
|
||||
}
|
||||
buffer_length = other.buffer_length;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
SerializedMessage & SerializedMessage::operator=(SerializedMessage && other)
|
||||
{
|
||||
*this = static_cast<rcl_serialized_message_t &&>(other);
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
SerializedMessage & SerializedMessage::operator=(rcl_serialized_message_t && other)
|
||||
{
|
||||
this->buffer = other.buffer;
|
||||
this->buffer_capacity = other.buffer_length;
|
||||
this->buffer_length = other.buffer_length;
|
||||
this->allocator = other.allocator;
|
||||
|
||||
// reset original to prevent double free
|
||||
other = rmw_get_zero_initialized_serialized_message();
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
SerializedMessage::~SerializedMessage()
|
||||
{
|
||||
if (nullptr != buffer) {
|
||||
const auto fini_ret = rmw_serialized_message_fini(this);
|
||||
if (fini_ret != RCL_RET_OK) {
|
||||
RCLCPP_ERROR(
|
||||
get_logger("rclcpp"),
|
||||
"Failed to destroy serialized message: %s", rcl_get_error_string().str);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rclcpp
|
||||
@@ -39,7 +39,6 @@ SubscriptionBase::SubscriptionBase(
|
||||
: node_base_(node_base),
|
||||
node_handle_(node_base_->get_shared_rcl_node_handle()),
|
||||
use_intra_process_(false),
|
||||
intra_process_subscription_id_(0),
|
||||
type_support_(type_support_handle),
|
||||
is_serialized_(is_serialized)
|
||||
{
|
||||
@@ -93,7 +92,9 @@ SubscriptionBase::~SubscriptionBase()
|
||||
"Intra process manager died before than a subscription.");
|
||||
return;
|
||||
}
|
||||
ipm->remove_subscription(intra_process_subscription_id_);
|
||||
for (auto intra_process_subscription_id : intra_process_subscription_ids_) {
|
||||
ipm->remove_subscription(intra_process_subscription_id);
|
||||
}
|
||||
}
|
||||
|
||||
const char *
|
||||
@@ -204,10 +205,10 @@ SubscriptionBase::get_publisher_count() const
|
||||
|
||||
void
|
||||
SubscriptionBase::setup_intra_process(
|
||||
uint64_t intra_process_subscription_id,
|
||||
const std::vector<uint64_t> & intra_process_subscription_ids,
|
||||
IntraProcessManagerWeakPtr weak_ipm)
|
||||
{
|
||||
intra_process_subscription_id_ = intra_process_subscription_id;
|
||||
intra_process_subscription_ids_ = intra_process_subscription_ids;
|
||||
weak_ipm_ = weak_ipm;
|
||||
use_intra_process_ = true;
|
||||
}
|
||||
@@ -218,23 +219,29 @@ SubscriptionBase::can_loan_messages() const
|
||||
return rcl_subscription_can_loan_messages(subscription_handle_.get());
|
||||
}
|
||||
|
||||
rclcpp::Waitable::SharedPtr
|
||||
SubscriptionBase::get_intra_process_waitable() const
|
||||
std::vector<rclcpp::Waitable::SharedPtr>
|
||||
SubscriptionBase::get_intra_process_waitables() const
|
||||
{
|
||||
// If not using intra process, shortcut to nullptr.
|
||||
if (!use_intra_process_) {
|
||||
return nullptr;
|
||||
return std::vector<rclcpp::Waitable::SharedPtr>();
|
||||
}
|
||||
// Get the intra process manager.
|
||||
auto ipm = weak_ipm_.lock();
|
||||
if (!ipm) {
|
||||
throw std::runtime_error(
|
||||
"SubscriptionBase::get_intra_process_waitable() called "
|
||||
"SubscriptionBase::get_intra_process_waitables() called "
|
||||
"after destruction of intra process manager");
|
||||
}
|
||||
|
||||
// Use the id to retrieve the subscription intra-process from the intra-process manager.
|
||||
return ipm->get_subscription_intra_process(intra_process_subscription_id_);
|
||||
std::vector<rclcpp::Waitable::SharedPtr> waitables(intra_process_subscription_ids_.size());
|
||||
|
||||
for (auto i = 0u; i < intra_process_subscription_ids_.size(); ++i) {
|
||||
// Use the id to retrieve the subscription intra-process from the intra-process manager.
|
||||
waitables[i] = ipm->get_subscription_intra_process(intra_process_subscription_ids_[i]);
|
||||
}
|
||||
|
||||
return waitables;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -274,8 +281,10 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
|
||||
if (this == pointer_to_subscription_part) {
|
||||
return subscription_in_use_by_wait_set_.exchange(in_use_state);
|
||||
}
|
||||
if (get_intra_process_waitable().get() == pointer_to_subscription_part) {
|
||||
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
|
||||
for (auto & waitable : get_intra_process_waitables()) {
|
||||
if (waitable.get() == pointer_to_subscription_part) {
|
||||
return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
|
||||
}
|
||||
}
|
||||
for (const auto & qos_event : event_handlers_) {
|
||||
if (qos_event.get() == pointer_to_subscription_part) {
|
||||
|
||||
@@ -36,3 +36,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const
|
||||
{
|
||||
return qos_profile_;
|
||||
}
|
||||
|
||||
bool
|
||||
SubscriptionIntraProcessBase::is_serialized() const
|
||||
{
|
||||
return is_serialized_;
|
||||
}
|
||||
|
||||
388
rclcpp/test/test_intra_process_communication.cpp
Normal file
388
rclcpp/test/test_intra_process_communication.cpp
Normal file
@@ -0,0 +1,388 @@
|
||||
// Copyright 2020 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/publisher.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
|
||||
#include "rclcpp/serialization.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
|
||||
#include "test_msgs/message_fixtures.hpp"
|
||||
|
||||
int32_t & get_test_allocation_counter()
|
||||
{
|
||||
static int32_t counter = 0;
|
||||
return counter;
|
||||
}
|
||||
|
||||
void * custom_allocate(size_t size, void * state)
|
||||
{
|
||||
fprintf(stderr, "calling custom allocate\n");
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
++get_test_allocation_counter();
|
||||
auto r = m_allocator.allocate(size, state);
|
||||
return r;
|
||||
}
|
||||
|
||||
void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state)
|
||||
{
|
||||
fprintf(stderr, "calling custom zero allocate\n");
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
++get_test_allocation_counter();
|
||||
auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state);
|
||||
return r;
|
||||
}
|
||||
|
||||
void * custom_reallocate(void * pointer, size_t size, void * state)
|
||||
{
|
||||
fprintf(stderr, "calling custom reallocate\n");
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
if (pointer == nullptr) {
|
||||
++get_test_allocation_counter();
|
||||
}
|
||||
|
||||
auto r = m_allocator.reallocate(pointer, size, state);
|
||||
return r;
|
||||
}
|
||||
|
||||
void custom_deallocate(void * pointer, void * state)
|
||||
{
|
||||
fprintf(stderr, "calling custom deallocate\n");
|
||||
static auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
--get_test_allocation_counter();
|
||||
m_allocator.deallocate(pointer, state);
|
||||
}
|
||||
|
||||
rclcpp::SerializedMessage make_serialized_string_msg(
|
||||
const std::shared_ptr<test_msgs::msg::Strings> & string_msg)
|
||||
{
|
||||
auto m_allocator = rcutils_get_default_allocator();
|
||||
|
||||
// add custom (de)allocator to count the references to the object
|
||||
m_allocator.allocate = &custom_allocate;
|
||||
m_allocator.deallocate = &custom_deallocate;
|
||||
m_allocator.reallocate = &custom_reallocate;
|
||||
m_allocator.zero_allocate = &custom_zero_allocate;
|
||||
|
||||
rclcpp::SerializedMessage msg(m_allocator);
|
||||
|
||||
static auto type_support =
|
||||
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>();
|
||||
|
||||
rclcpp::Serialization serializer(*type_support);
|
||||
EXPECT_NO_THROW(serializer.serialize_message(string_msg.get(), &msg));
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parameterized test.
|
||||
* The first param are the NodeOptions used to create the nodes.
|
||||
* The second param are the expected intraprocess count results.
|
||||
*/
|
||||
struct TestParameters
|
||||
{
|
||||
rclcpp::NodeOptions node_options[2];
|
||||
uint64_t intraprocess_count_results[2];
|
||||
size_t runs;
|
||||
std::string description;
|
||||
};
|
||||
|
||||
std::ostream & operator<<(std::ostream & out, const TestParameters & params)
|
||||
{
|
||||
out << params.description;
|
||||
return out;
|
||||
}
|
||||
|
||||
auto get_new_context()
|
||||
{
|
||||
auto context = rclcpp::Context::make_shared();
|
||||
context->init(0, nullptr);
|
||||
return context;
|
||||
}
|
||||
|
||||
class TestPublisherSubscriptionSerialized : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
static void SetUpTestCase()
|
||||
{
|
||||
rclcpp::init(0, nullptr);
|
||||
}
|
||||
|
||||
static void TearDownTestCase()
|
||||
{
|
||||
rclcpp::shutdown();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::chrono::milliseconds offset_ = std::chrono::milliseconds(250);
|
||||
|
||||
std::vector<TestParameters> parameters_ = {
|
||||
/*
|
||||
Testing publisher subscription count api and internal process subscription count.
|
||||
Two subscriptions in the same topic, both using intraprocess comm.
|
||||
*/
|
||||
{
|
||||
{
|
||||
rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false),
|
||||
rclcpp::NodeOptions().use_intra_process_comms(false).start_parameter_services(false).enable_rosout(false)
|
||||
},
|
||||
{1u, 2u},
|
||||
1,
|
||||
"two_subscriptions_intraprocess_comm"
|
||||
},
|
||||
{
|
||||
{
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false),
|
||||
rclcpp::NodeOptions().use_intra_process_comms(true).start_parameter_services(false).enable_rosout(false)
|
||||
},
|
||||
{1u, 2u},
|
||||
1,
|
||||
"two_subscriptions_intraprocess_comm"
|
||||
},
|
||||
///*
|
||||
// Testing publisher subscription count api and internal process subscription count.
|
||||
// Two subscriptions, one using intra-process comm and the other not using it.
|
||||
// */
|
||||
//{
|
||||
// {
|
||||
// rclcpp::NodeOptions().use_intra_process_comms(true),
|
||||
// rclcpp::NodeOptions().use_intra_process_comms(false)
|
||||
// },
|
||||
// {1u, 1u},
|
||||
// 1,
|
||||
// "two_subscriptions_one_intraprocess_one_not"
|
||||
//},
|
||||
///*
|
||||
// Testing publisher subscription count api and internal process subscription count.
|
||||
// Two contexts, both using intra-process.
|
||||
// */
|
||||
//{
|
||||
// {
|
||||
// rclcpp::NodeOptions().use_intra_process_comms(true),
|
||||
// rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true)
|
||||
// },
|
||||
// {1u, 1u},
|
||||
// 1,
|
||||
// "two_subscriptions_in_two_contexts_with_intraprocess_comm"
|
||||
//},
|
||||
///*
|
||||
// Testing publisher subscription count api and internal process subscription count.
|
||||
// Two contexts, both of them not using intra-process comm.
|
||||
// */
|
||||
//{
|
||||
// {
|
||||
// rclcpp::NodeOptions().use_intra_process_comms(false),
|
||||
// rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false)
|
||||
// },
|
||||
// {0u, 0u},
|
||||
// 1,
|
||||
// "two_subscriptions_in_two_contexts_without_intraprocess_comm"
|
||||
//}
|
||||
};
|
||||
};
|
||||
|
||||
std::array<uint32_t, 2> counts;
|
||||
|
||||
void OnMessageSerialized(const std::shared_ptr<const rcl_serialized_message_t> msg)
|
||||
{
|
||||
EXPECT_NE(msg->buffer, nullptr);
|
||||
EXPECT_GT(msg->buffer_capacity, 0u);
|
||||
|
||||
static auto type_support =
|
||||
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>();
|
||||
|
||||
test_msgs::msg::Strings ros_msg;
|
||||
rclcpp::Serialization serializer(*type_support);
|
||||
EXPECT_NO_THROW(
|
||||
serializer.deserialize_message(
|
||||
static_cast<const rclcpp::SerializedMessage *>(msg.get()), &ros_msg));
|
||||
EXPECT_EQ(ros_msg.string_value.front(), '0');
|
||||
EXPECT_EQ(ros_msg.string_value[6], '6');
|
||||
EXPECT_EQ(ros_msg.string_value.back(), '9');
|
||||
++counts[0];
|
||||
}
|
||||
|
||||
void OnMessageConst(std::shared_ptr<const test_msgs::msg::Strings> msg)
|
||||
{
|
||||
EXPECT_EQ(msg->string_value[6], '6');
|
||||
|
||||
++counts[1];
|
||||
}
|
||||
|
||||
void OnMessageUniquePtr(std::unique_ptr<test_msgs::msg::Strings> msg)
|
||||
{
|
||||
EXPECT_EQ(msg->string_value.back(), '9');
|
||||
|
||||
++counts[1];
|
||||
}
|
||||
|
||||
void OnMessage(std::shared_ptr<test_msgs::msg::Strings> msg)
|
||||
{
|
||||
EXPECT_EQ(msg->string_value.front(), '0');
|
||||
EXPECT_EQ(msg->string_value[6], '6');
|
||||
EXPECT_EQ(msg->string_value.back(), '\0');
|
||||
|
||||
++counts[1];
|
||||
}
|
||||
|
||||
TEST_F(TestPublisherSubscriptionSerialized, publish_serialized)
|
||||
{
|
||||
get_test_allocation_counter() = 0;
|
||||
|
||||
for (const auto & parameter : parameters_) {
|
||||
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
|
||||
"my_node",
|
||||
"/ns",
|
||||
parameter.node_options[0]);
|
||||
auto publisher = node->create_publisher<test_msgs::msg::Strings>("/topic", 10);
|
||||
|
||||
auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessage);
|
||||
//auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
|
||||
// "/topic", 10,
|
||||
// &OnMessageUniquePtr);
|
||||
//auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
// "/topic", 10,
|
||||
// &OnMessageConst);
|
||||
auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
|
||||
"/topic", 10,
|
||||
&OnMessageSerialized);
|
||||
|
||||
rclcpp::sleep_for(offset_);
|
||||
|
||||
counts.fill(0);
|
||||
std::shared_ptr<test_msgs::msg::Strings> string_msg = get_messages_strings()[3];
|
||||
|
||||
for (size_t i = 0; i < parameter.runs; i++) {
|
||||
auto msg0 = make_serialized_string_msg(string_msg);
|
||||
|
||||
auto unique_string_msg = std::make_unique<test_msgs::msg::Strings>(*string_msg);
|
||||
|
||||
{
|
||||
auto unique_serialized_msg = std::make_unique<rclcpp::SerializedMessage>(std::move(msg0));
|
||||
publisher->publish(std::move(unique_serialized_msg));
|
||||
}
|
||||
//publisher->publish(*string_msg);
|
||||
//publisher->publish(std::move(unique_string_msg));
|
||||
}
|
||||
for (uint32_t i = 0; i < 3; ++i) {
|
||||
rclcpp::spin_some(node);
|
||||
rclcpp::sleep_for(offset_);
|
||||
}
|
||||
|
||||
rclcpp::spin_some(node);
|
||||
|
||||
if (parameter.runs == 1) {
|
||||
EXPECT_EQ(counts[0], 3u); // count serialized message callbacks
|
||||
EXPECT_EQ(counts[1], 9u); // count unique + shared message callbacks
|
||||
}
|
||||
}
|
||||
EXPECT_EQ(get_test_allocation_counter(), 0);
|
||||
}
|
||||
|
||||
//TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic)
|
||||
//{
|
||||
// get_test_allocation_counter() = 0;
|
||||
//
|
||||
// TestParameters parameters = GetParam();
|
||||
// {
|
||||
// rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>(
|
||||
// "my_node",
|
||||
// "/ns",
|
||||
// parameters.node_options[0]);
|
||||
// auto publisher = rclcpp::create_publisher<rcl_serialized_message_t>(
|
||||
// node,
|
||||
// "/topic",
|
||||
// *rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
|
||||
// rclcpp::QoS(10));
|
||||
//
|
||||
// auto sub_gen_serialized = rclcpp::create_subscription<rcl_serialized_message_t>(
|
||||
// node,
|
||||
// "/topic",
|
||||
// *rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(),
|
||||
// rclcpp::QoS(10),
|
||||
// &OnMessageSerialized);
|
||||
//
|
||||
// auto sub_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
// "/topic", 10,
|
||||
// &OnMessage);
|
||||
// auto sub_unique = node->create_subscription<test_msgs::msg::Strings>(
|
||||
// "/topic", 10,
|
||||
// &OnMessageUniquePtr);
|
||||
// auto sub_const_shared = node->create_subscription<test_msgs::msg::Strings>(
|
||||
// "/topic", 10,
|
||||
// &OnMessageConst);
|
||||
// auto sub_serialized = node->create_subscription<test_msgs::msg::Strings>(
|
||||
// "/topic", 10,
|
||||
// &OnMessageSerialized);
|
||||
//
|
||||
// rclcpp::sleep_for(offset);
|
||||
//
|
||||
// counts.fill(0);
|
||||
// auto stringMsg = get_messages_strings()[3];
|
||||
//
|
||||
// for (size_t i = 0; i < parameters.runs; i++) {
|
||||
// auto msg0 = make_serialized_string_msg(stringMsg);
|
||||
//
|
||||
// publisher->publish(std::make_unique<rcl_serialized_message_t>(msg0));
|
||||
// }
|
||||
// rclcpp::spin_some(node);
|
||||
// rclcpp::sleep_for(offset);
|
||||
//
|
||||
// rclcpp::spin_some(node);
|
||||
// }
|
||||
//
|
||||
// if (parameters.runs == 1) {
|
||||
// EXPECT_EQ(counts[0], 2u);
|
||||
// EXPECT_EQ(counts[1], 3u);
|
||||
// }
|
||||
//
|
||||
// EXPECT_EQ(get_test_allocation_counter(), 0);
|
||||
//}
|
||||
|
||||
|
||||
|
||||
std::vector<TestParameters> setRuns(const std::vector<TestParameters> & in, const size_t runs)
|
||||
{
|
||||
std::vector<TestParameters> out = in;
|
||||
for (auto & p : out) {
|
||||
p.runs = runs;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
//INSTANTIATE_TEST_CASE_P(
|
||||
// TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized,
|
||||
// ::testing::ValuesIn(parameters),
|
||||
// ::testing::PrintToStringParamName());
|
||||
|
||||
//INSTANTIATE_TEST_CASE_P(
|
||||
// TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized,
|
||||
// ::testing::ValuesIn(setRuns(parameters, 1000)),
|
||||
// ::testing::PrintToStringParamName());
|
||||
@@ -93,6 +93,12 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
is_serialized() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
rclcpp::QoS qos;
|
||||
std::string topic_name;
|
||||
uint64_t intra_process_publisher_id_;
|
||||
@@ -212,6 +218,12 @@ public:
|
||||
return topic_name;
|
||||
}
|
||||
|
||||
bool
|
||||
is_serialized() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
rmw_qos_profile_t qos_profile;
|
||||
const char * topic_name;
|
||||
};
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
|
||||
#include "rclcpp/exceptions.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
|
||||
#include "test_msgs/msg/empty.hpp"
|
||||
|
||||
@@ -143,6 +144,34 @@ TEST_F(TestPublisher, various_creation_signatures) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Testing for serialized publishers.
|
||||
*/
|
||||
TEST_F(TestPublisher, test_is_serialized) {
|
||||
initialize();
|
||||
using test_msgs::msg::Empty;
|
||||
{
|
||||
auto publisher = node->create_publisher<Empty>("topic", 42);
|
||||
EXPECT_FALSE(publisher->is_serialized());
|
||||
}
|
||||
{
|
||||
auto publisher = rclcpp::create_publisher<Empty>(node, "topic", 42, rclcpp::PublisherOptions());
|
||||
EXPECT_FALSE(publisher->is_serialized());
|
||||
}
|
||||
{
|
||||
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
|
||||
auto publisher = rclcpp::create_publisher<rcl_serialized_message_t>(
|
||||
node, "topic", ts, 42, rclcpp::PublisherOptions());
|
||||
EXPECT_TRUE(publisher->is_serialized());
|
||||
}
|
||||
{
|
||||
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
|
||||
auto publisher = rclcpp::create_publisher<rclcpp::SerializedMessage>(
|
||||
node, "topic", ts, 42, rclcpp::PublisherOptions());
|
||||
EXPECT_TRUE(publisher->is_serialized());
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Testing publisher with intraprocess enabled and invalid QoS
|
||||
*/
|
||||
|
||||
137
rclcpp/test/test_serialized_message.cpp
Normal file
137
rclcpp/test/test_serialized_message.cpp
Normal file
@@ -0,0 +1,137 @@
|
||||
// Copyright 2019 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <memory>
|
||||
|
||||
#include "rclcpp/serialization.hpp"
|
||||
#include "rclcpp/serialized_message.hpp"
|
||||
#include "rclcpp/rclcpp.hpp"
|
||||
|
||||
#include "rcpputils/asserts.hpp"
|
||||
|
||||
#include "test_msgs/message_fixtures.hpp"
|
||||
#include "test_msgs/msg/basic_types.hpp"
|
||||
|
||||
TEST(TestSerializedMessage, empty_initialize) {
|
||||
rclcpp::SerializedMessage serialized_message;
|
||||
EXPECT_TRUE(serialized_message.buffer == nullptr);
|
||||
EXPECT_EQ(0u, serialized_message.buffer_length);
|
||||
EXPECT_EQ(0u, serialized_message.buffer_capacity);
|
||||
}
|
||||
|
||||
TEST(TestSerializedMessage, initialize_with_capacity) {
|
||||
rclcpp::SerializedMessage serialized_message(13);
|
||||
EXPECT_TRUE(serialized_message.buffer != nullptr);
|
||||
EXPECT_EQ(0u, serialized_message.buffer_length);
|
||||
EXPECT_EQ(13u, serialized_message.buffer_capacity);
|
||||
}
|
||||
|
||||
TEST(TestSerializedMessage, various_constructors) {
|
||||
std::string content = "Hello World";
|
||||
auto content_size = content.size() + 1; // accounting for null terminator
|
||||
|
||||
rclcpp::SerializedMessage serialized_message(content_size);
|
||||
// manually copy some content
|
||||
std::memcpy(serialized_message.buffer, content.c_str(), content.size());
|
||||
serialized_message.buffer[content.size()] = '\0';
|
||||
serialized_message.buffer_length = content_size;
|
||||
EXPECT_STREQ(content.c_str(), reinterpret_cast<char *>(serialized_message.buffer));
|
||||
EXPECT_EQ(content_size, serialized_message.buffer_capacity);
|
||||
|
||||
// Copy Constructor
|
||||
rclcpp::SerializedMessage other_serialized_message(serialized_message);
|
||||
EXPECT_EQ(content_size, other_serialized_message.buffer_capacity);
|
||||
EXPECT_EQ(content_size, other_serialized_message.buffer_length);
|
||||
EXPECT_STREQ(
|
||||
reinterpret_cast<char *>(serialized_message.buffer),
|
||||
reinterpret_cast<char *>(other_serialized_message.buffer));
|
||||
|
||||
// Move Constructor
|
||||
rclcpp::SerializedMessage yet_another_serialized_message(std::move(other_serialized_message));
|
||||
EXPECT_TRUE(other_serialized_message.buffer == nullptr);
|
||||
EXPECT_EQ(0u, other_serialized_message.buffer_capacity);
|
||||
EXPECT_EQ(0u, other_serialized_message.buffer_length);
|
||||
|
||||
auto default_allocator = rcl_get_default_allocator();
|
||||
auto rcl_serialized_msg = rmw_get_zero_initialized_serialized_message();
|
||||
auto ret = rmw_serialized_message_init(&rcl_serialized_msg, 13, &default_allocator);
|
||||
ASSERT_EQ(RCL_RET_OK, ret);
|
||||
|
||||
// manually copy some content
|
||||
std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size());
|
||||
rcl_serialized_msg.buffer[content.size()] = '\0';
|
||||
rcl_serialized_msg.buffer_length = content_size;
|
||||
EXPECT_EQ(13u, rcl_serialized_msg.buffer_capacity);
|
||||
|
||||
// Copy Constructor from rcl_serialized_message_t
|
||||
rclcpp::SerializedMessage from_rcl_msg(rcl_serialized_msg);
|
||||
EXPECT_EQ(13u, from_rcl_msg.buffer_capacity);
|
||||
EXPECT_EQ(content_size, from_rcl_msg.buffer_length);
|
||||
|
||||
// Verify that despite being fini'd, the copy is real
|
||||
ret = rmw_serialized_message_fini(&rcl_serialized_msg);
|
||||
ASSERT_EQ(RCL_RET_OK, ret);
|
||||
EXPECT_EQ(nullptr, rcl_serialized_msg.buffer);
|
||||
EXPECT_EQ(0u, rcl_serialized_msg.buffer_capacity);
|
||||
EXPECT_EQ(0u, rcl_serialized_msg.buffer_length);
|
||||
EXPECT_TRUE(nullptr != from_rcl_msg.buffer);
|
||||
EXPECT_EQ(13u, from_rcl_msg.buffer_capacity);
|
||||
EXPECT_EQ(content_size, from_rcl_msg.buffer_length);
|
||||
}
|
||||
|
||||
TEST(TestSerializedMessage, serialization) {
|
||||
auto type_support =
|
||||
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::BasicTypes>();
|
||||
rclcpp::Serialization serializer(*type_support);
|
||||
|
||||
auto basic_type_ros_msgs = get_messages_basic_types();
|
||||
for (const auto & ros_msg : basic_type_ros_msgs) {
|
||||
// convert ros msg to serialized msg
|
||||
rclcpp::SerializedMessage serialized_msg;
|
||||
serializer.serialize_message(ros_msg.get(), &serialized_msg);
|
||||
|
||||
// convert serialized msg back to ros msg
|
||||
test_msgs::msg::BasicTypes deserialized_ros_msg;
|
||||
serializer.deserialize_message(&serialized_msg, &deserialized_ros_msg);
|
||||
|
||||
EXPECT_EQ(*ros_msg, deserialized_ros_msg);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(TestSerializedMessage, serialization_into_nullptr) {
|
||||
auto type_support =
|
||||
rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::BasicTypes>();
|
||||
rclcpp::Serialization serializer(*type_support);
|
||||
|
||||
auto basic_type_ros_msgs = get_messages_basic_types();
|
||||
for (const auto & ros_msg : basic_type_ros_msgs) {
|
||||
rclcpp::SerializedMessage serialized_msg;
|
||||
test_msgs::msg::BasicTypes deserialized_ros_msg;
|
||||
|
||||
EXPECT_THROW(
|
||||
serializer.serialize_message(ros_msg.get(), nullptr),
|
||||
rcpputils::IllegalStateException);
|
||||
EXPECT_THROW(
|
||||
serializer.serialize_message(nullptr, &serialized_msg),
|
||||
rcpputils::IllegalStateException);
|
||||
|
||||
EXPECT_THROW(
|
||||
serializer.deserialize_message(&serialized_msg, nullptr),
|
||||
rcpputils::IllegalStateException);
|
||||
EXPECT_THROW(
|
||||
serializer.deserialize_message(nullptr, &deserialized_ros_msg),
|
||||
rcpputils::IllegalStateException);
|
||||
}
|
||||
}
|
||||
@@ -231,6 +231,40 @@ TEST_F(TestSubscription, various_creation_signatures) {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Testing for serialized subscriptions
|
||||
*/
|
||||
TEST_F(TestSubscription, test_is_serialized) {
|
||||
initialize();
|
||||
using test_msgs::msg::Empty;
|
||||
auto cb = [](test_msgs::msg::Empty::SharedPtr) {};
|
||||
auto cb_rcl_serialized = [](std::shared_ptr<rcl_serialized_message_t>) {};
|
||||
auto cb_rclcpp_serialized = [](std::shared_ptr<rclcpp::SerializedMessage>) {};
|
||||
{
|
||||
auto sub = node->create_subscription<Empty>("topic", 1, cb);
|
||||
EXPECT_FALSE(sub->is_serialized());
|
||||
}
|
||||
{
|
||||
auto sub = rclcpp::create_subscription<Empty>(node, "topic", rclcpp::QoS(1), cb);
|
||||
EXPECT_FALSE(sub->is_serialized());
|
||||
}
|
||||
{
|
||||
auto sub = rclcpp::create_subscription<Empty>(node, "topic", rclcpp::QoS(1), cb_rcl_serialized);
|
||||
EXPECT_TRUE(sub->is_serialized());
|
||||
}
|
||||
{
|
||||
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
|
||||
auto sub = rclcpp::create_subscription<rcl_serialized_message_t>(
|
||||
node, "topic", ts, rclcpp::QoS(1), cb_rcl_serialized);
|
||||
EXPECT_TRUE(sub->is_serialized());
|
||||
}
|
||||
{
|
||||
auto ts = *rosidl_typesupport_cpp::get_message_type_support_handle<Empty>();
|
||||
auto sub = rclcpp::create_subscription<rclcpp::SerializedMessage>(
|
||||
node, "topic", ts, rclcpp::QoS(1), cb_rclcpp_serialized);
|
||||
EXPECT_TRUE(sub->is_serialized());
|
||||
}
|
||||
}
|
||||
/*
|
||||
Testing subscriptions using std::bind.
|
||||
*/
|
||||
|
||||
@@ -52,6 +52,16 @@ void not_serialized_unique_ptr_callback(
|
||||
(void) unused;
|
||||
}
|
||||
|
||||
void rclcpp_serialized_callback_copy(rclcpp::SerializedMessage unused)
|
||||
{
|
||||
(void) unused;
|
||||
}
|
||||
|
||||
void rclcpp_serialized_callback_shared_ptr(std::shared_ptr<rclcpp::SerializedMessage> unused)
|
||||
{
|
||||
(void) unused;
|
||||
}
|
||||
|
||||
TEST(TestSubscriptionTraits, is_serialized_callback) {
|
||||
// Test regular functions
|
||||
auto cb1 = &serialized_callback_copy;
|
||||
@@ -97,6 +107,16 @@ TEST(TestSubscriptionTraits, is_serialized_callback) {
|
||||
static_assert(
|
||||
rclcpp::subscription_traits::is_serialized_callback<decltype(cb7)>::value == false,
|
||||
"passing a fancy unique_ptr of test_msgs::msg::Empty is not a serialized callback");
|
||||
|
||||
auto cb8 = &rclcpp_serialized_callback_copy;
|
||||
static_assert(
|
||||
rclcpp::subscription_traits::is_serialized_callback<decltype(cb8)>::value == true,
|
||||
"rclcpp::SerializedMessage in a first argument callback makes it a serialized callback");
|
||||
|
||||
auto cb9 = &rclcpp_serialized_callback_shared_ptr;
|
||||
static_assert(
|
||||
rclcpp::subscription_traits::is_serialized_callback<decltype(cb9)>::value == true,
|
||||
"std::shared_ptr<rclcpp::SerializedMessage> in a callback makes it a serialized callback");
|
||||
}
|
||||
|
||||
TEST(TestSubscriptionTraits, callback_messages) {
|
||||
|
||||
@@ -68,6 +68,18 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
LifecyclePublisher(
|
||||
rclcpp::node_interfaces::NodeBaseInterface * node_base,
|
||||
const std::string & topic,
|
||||
const rclcpp::QoS & qos,
|
||||
const rclcpp::PublisherOptionsWithAllocator<Alloc> & options,
|
||||
const rosidl_message_type_support_t & type_support)
|
||||
: rclcpp::Publisher<MessageT, Alloc>(node_base, topic, qos, options, type_support),
|
||||
enabled_(false),
|
||||
logger_(rclcpp::get_logger("LifecyclePublisher"))
|
||||
{
|
||||
}
|
||||
|
||||
~LifecyclePublisher() {}
|
||||
|
||||
/// LifecyclePublisher publish function
|
||||
@@ -110,6 +122,30 @@ public:
|
||||
rclcpp::Publisher<MessageT, Alloc>::publish(msg);
|
||||
}
|
||||
|
||||
/// Publish a serialized message. Non specialized version to prevent compiling errors.
|
||||
template<typename TDeleter, typename T>
|
||||
void publish(std::unique_ptr<T, TDeleter> serialized_msg)
|
||||
{
|
||||
(void)serialized_msg;
|
||||
throw std::runtime_error(
|
||||
"publishing unique_ptr with custom deleter only supported for serialized messages");
|
||||
}
|
||||
|
||||
/// Publish a serialized message.
|
||||
template<typename TDeleter>
|
||||
void publish(std::unique_ptr<rcl_serialized_message_t, TDeleter> serialized_msg)
|
||||
{
|
||||
if (!enabled_) {
|
||||
RCLCPP_WARN(
|
||||
logger_,
|
||||
"Trying to publish message on the topic '%s', but the publisher is not activated",
|
||||
this->get_topic_name());
|
||||
|
||||
return;
|
||||
}
|
||||
this->do_serialized_publish(*serialized_msg);
|
||||
}
|
||||
|
||||
virtual void
|
||||
on_activate()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user