Compare commits
1 Commits
rolling
...
native_buf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b4d3d01c6 |
@@ -80,7 +80,7 @@ public:
|
||||
node_base,
|
||||
*rclcpp::get_message_typesupport_handle(topic_type, "rosidl_typesupport_cpp", *ts_lib),
|
||||
topic_name,
|
||||
options.to_rcl_subscription_options(qos),
|
||||
force_cpu_buffer_backend_(options).to_rcl_subscription_options(qos),
|
||||
options.event_callbacks,
|
||||
options.use_default_callbacks,
|
||||
DeliveredMessageKind::SERIALIZED_MESSAGE),
|
||||
@@ -182,6 +182,17 @@ public:
|
||||
|
||||
private:
|
||||
RCLCPP_DISABLE_COPY(GenericSubscription)
|
||||
|
||||
template<typename AllocatorT>
|
||||
static rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>
|
||||
force_cpu_buffer_backend_(
|
||||
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
|
||||
{
|
||||
auto opts = options;
|
||||
opts.acceptable_buffer_backends = "cpu";
|
||||
return opts;
|
||||
}
|
||||
|
||||
AnySubscriptionCallback<rclcpp::SerializedMessage, std::allocator<void>> any_callback_;
|
||||
// The type support library should stay loaded, so it is stored in the GenericSubscription
|
||||
std::shared_ptr<rcpputils::SharedLibrary> ts_lib_;
|
||||
|
||||
@@ -551,15 +551,6 @@ public:
|
||||
event_handlers_[event_type]->clear_on_ready_callback();
|
||||
}
|
||||
|
||||
/// Check if content filtered topic feature of the subscription instance is supported.
|
||||
/**
|
||||
* \return boolean flag indicating if the content filtered topic of this subscription is
|
||||
* supported.
|
||||
*/
|
||||
RCLCPP_PUBLIC
|
||||
bool
|
||||
is_cft_supported() const;
|
||||
|
||||
/// Check if content filtered topic feature of the subscription instance is enabled.
|
||||
/**
|
||||
* \return boolean flag indicating if the content filtered topic of this subscription is enabled.
|
||||
|
||||
@@ -89,6 +89,15 @@ struct SubscriptionOptionsBase
|
||||
QosOverridingOptions qos_overriding_options;
|
||||
|
||||
ContentFilterOptions content_filter_options;
|
||||
|
||||
/// Acceptable buffer backend names for this subscription.
|
||||
/**
|
||||
* Empty string or "cpu" means CPU-only (default for backward compatibility).
|
||||
* "any" means all installed backends are acceptable.
|
||||
* Comma-separated for specific backends, e.g. "cuda,demo".
|
||||
* CPU is always implicitly acceptable regardless of this value.
|
||||
*/
|
||||
std::string acceptable_buffer_backends{"cpu"};
|
||||
};
|
||||
|
||||
/// Structure containing optional configuration for Subscriptions.
|
||||
@@ -145,6 +154,11 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
|
||||
}
|
||||
}
|
||||
|
||||
if (!acceptable_buffer_backends.empty()) {
|
||||
result.rmw_subscription_options.acceptable_buffer_backends =
|
||||
acceptable_buffer_backends.c_str();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@@ -77,10 +77,6 @@ ParameterEventHandler::add_parameter_callback(
|
||||
bool
|
||||
ParameterEventHandler::configure_nodes_filter(const std::vector<std::string> & node_names)
|
||||
{
|
||||
if (!event_subscription_->is_cft_supported()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (node_names.empty()) {
|
||||
// Clear content filter
|
||||
event_subscription_->set_content_filter("");
|
||||
|
||||
@@ -480,12 +480,6 @@ SubscriptionBase::set_on_new_message_callback(
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
SubscriptionBase::is_cft_supported() const
|
||||
{
|
||||
return rcl_subscription_is_cft_supported(subscription_handle_.get());
|
||||
}
|
||||
|
||||
bool
|
||||
SubscriptionBase::is_cft_enabled() const
|
||||
{
|
||||
|
||||
@@ -108,21 +108,6 @@ bool operator==(const test_msgs::msg::BasicTypes & m1, const test_msgs::msg::Bas
|
||||
m1.uint64_value == m2.uint64_value;
|
||||
}
|
||||
|
||||
TEST_F(TestContentFilterSubscription, is_cft_supported)
|
||||
{
|
||||
{
|
||||
auto mock = mocking_utils::patch_and_return(
|
||||
"lib:rclcpp", rcl_subscription_is_cft_supported, false);
|
||||
EXPECT_FALSE(sub->is_cft_supported());
|
||||
}
|
||||
|
||||
{
|
||||
auto mock = mocking_utils::patch_and_return(
|
||||
"lib:rclcpp", rcl_subscription_is_cft_supported, true);
|
||||
EXPECT_TRUE(sub->is_cft_supported());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestContentFilterSubscription, is_cft_enabled)
|
||||
{
|
||||
{
|
||||
@@ -180,7 +165,7 @@ TEST_F(TestContentFilterSubscription, get_content_filter)
|
||||
|
||||
TEST_F(TestContentFilterSubscription, set_content_filter)
|
||||
{
|
||||
if (sub->is_cft_supported()) {
|
||||
if (sub->is_cft_enabled()) {
|
||||
EXPECT_NO_THROW(
|
||||
sub->set_content_filter(filter_expression_init, expression_parameters_2));
|
||||
} else {
|
||||
@@ -193,10 +178,8 @@ TEST_F(TestContentFilterSubscription, set_content_filter)
|
||||
TEST_F(TestContentFilterSubscription, content_filter_get_begin)
|
||||
{
|
||||
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
|
||||
|
||||
// Run test only if rmw implementation supports content filter, otherwise skip it.
|
||||
if (!sub->is_cft_supported()) {
|
||||
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
|
||||
if (rmw_implementation_str == "rmw_zenoh_cpp") {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -223,27 +206,28 @@ TEST_F(TestContentFilterSubscription, content_filter_get_begin)
|
||||
EXPECT_TRUE(receive);
|
||||
EXPECT_EQ(original_message, output_message);
|
||||
|
||||
EXPECT_NO_THROW(
|
||||
sub->set_content_filter(filter_expression_init, expression_parameters_2));
|
||||
// waiting to allow for filter propagation
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
if (sub->is_cft_enabled()) {
|
||||
EXPECT_NO_THROW(
|
||||
sub->set_content_filter(filter_expression_init, expression_parameters_2));
|
||||
// waiting to allow for filter propagation
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
|
||||
test_msgs::msg::BasicTypes message_to_be_filtered;
|
||||
message_to_be_filtered.int32_value = original_message.int32_value;
|
||||
pub->publish(message_to_be_filtered);
|
||||
test_msgs::msg::BasicTypes original_message;
|
||||
original_message.int32_value = 3;
|
||||
pub->publish(original_message);
|
||||
|
||||
test_msgs::msg::BasicTypes output_message2;
|
||||
receive = wait_for_message(output_message2, sub, context, 10s);
|
||||
EXPECT_FALSE(receive);
|
||||
test_msgs::msg::BasicTypes output_message;
|
||||
bool receive = wait_for_message(output_message, sub, context, 10s);
|
||||
EXPECT_FALSE(receive);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestContentFilterSubscription, content_filter_get_later)
|
||||
{
|
||||
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
|
||||
// Run test only if rmw implementation supports content filter, otherwise skip it.
|
||||
if (!sub->is_cft_supported()) {
|
||||
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
|
||||
if (rmw_implementation_str == "rmw_zenoh_cpp") {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -267,30 +251,36 @@ TEST_F(TestContentFilterSubscription, content_filter_get_later)
|
||||
|
||||
test_msgs::msg::BasicTypes output_message;
|
||||
bool receive = wait_for_message(output_message, sub, context, 10s);
|
||||
EXPECT_FALSE(receive);
|
||||
if (sub->is_cft_enabled()) {
|
||||
EXPECT_FALSE(receive);
|
||||
} else {
|
||||
EXPECT_TRUE(receive);
|
||||
EXPECT_EQ(original_message, output_message);
|
||||
}
|
||||
|
||||
EXPECT_NO_THROW(
|
||||
sub->set_content_filter(filter_expression_init, expression_parameters_2));
|
||||
// waiting to allow for filter propagation
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
if (sub->is_cft_enabled()) {
|
||||
EXPECT_NO_THROW(
|
||||
sub->set_content_filter(filter_expression_init, expression_parameters_2));
|
||||
// waiting to allow for filter propagation
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
|
||||
test_msgs::msg::BasicTypes matching_message;
|
||||
matching_message.int32_value = original_message.int32_value;
|
||||
pub->publish(matching_message);
|
||||
test_msgs::msg::BasicTypes original_message;
|
||||
original_message.int32_value = 4;
|
||||
pub->publish(original_message);
|
||||
|
||||
test_msgs::msg::BasicTypes output_message2;
|
||||
receive = wait_for_message(output_message2, sub, context, 10s);
|
||||
EXPECT_TRUE(receive);
|
||||
EXPECT_EQ(matching_message, output_message2);
|
||||
test_msgs::msg::BasicTypes output_message;
|
||||
bool receive = wait_for_message(output_message, sub, context, 10s);
|
||||
EXPECT_TRUE(receive);
|
||||
EXPECT_EQ(original_message, output_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestContentFilterSubscription, content_filter_reset)
|
||||
{
|
||||
std::string rmw_implementation_str = std::string(rmw_get_implementation_identifier());
|
||||
// Run test only if rmw implementation supports content filter, otherwise skip it.
|
||||
if (!sub->is_cft_supported()) {
|
||||
GTEST_SKIP() << rmw_implementation_str << " doesn't support content filter";
|
||||
if (rmw_implementation_str == "rmw_zenoh_cpp") {
|
||||
GTEST_SKIP();
|
||||
}
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -314,20 +304,28 @@ TEST_F(TestContentFilterSubscription, content_filter_reset)
|
||||
|
||||
test_msgs::msg::BasicTypes output_message;
|
||||
bool receive = wait_for_message(output_message, sub, context, 10s);
|
||||
EXPECT_FALSE(receive);
|
||||
if (sub->is_cft_enabled()) {
|
||||
EXPECT_FALSE(receive);
|
||||
} else {
|
||||
EXPECT_TRUE(receive);
|
||||
EXPECT_EQ(original_message, output_message);
|
||||
}
|
||||
|
||||
EXPECT_NO_THROW(sub->set_content_filter(""));
|
||||
// waiting to allow for filter propagation
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
if (sub->is_cft_enabled()) {
|
||||
EXPECT_NO_THROW(
|
||||
sub->set_content_filter(""));
|
||||
// waiting to allow for filter propagation
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
|
||||
test_msgs::msg::BasicTypes unfiltered_message;
|
||||
unfiltered_message.int32_value = original_message.int32_value;
|
||||
pub->publish(unfiltered_message);
|
||||
test_msgs::msg::BasicTypes original_message;
|
||||
original_message.int32_value = 4;
|
||||
pub->publish(original_message);
|
||||
|
||||
test_msgs::msg::BasicTypes output_message2;
|
||||
receive = wait_for_message(output_message2, sub, context, 10s);
|
||||
EXPECT_TRUE(receive);
|
||||
EXPECT_EQ(unfiltered_message, output_message2);
|
||||
test_msgs::msg::BasicTypes output_message;
|
||||
bool receive = wait_for_message(output_message, sub, context, 10s);
|
||||
EXPECT_TRUE(receive);
|
||||
EXPECT_EQ(original_message, output_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user