Compare commits
3 Commits
native_buf
...
issues/306
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6bf1fb0af8 | ||
|
|
90c1e1f3e9 | ||
|
|
3ff6029c6e |
@@ -19,6 +19,7 @@
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
#include <algorithm>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
@@ -386,6 +387,39 @@ private:
|
||||
std::vector<uint64_t> take_ownership_subscriptions;
|
||||
};
|
||||
|
||||
/// Hash function for rmw_gid_t to enable use in unordered_map
|
||||
struct rmw_gid_hash
|
||||
{
|
||||
std::size_t operator()(const rmw_gid_t & gid) const noexcept
|
||||
{
|
||||
// Using the FNV-1a hash algorithm on the gid data
|
||||
constexpr std::size_t FNV_prime = 1099511628211u;
|
||||
std::size_t result = 14695981039346656037u;
|
||||
|
||||
for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
|
||||
result ^= gid.data[i];
|
||||
result *= FNV_prime;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
/// Equality comparison for rmw_gid_t to enable use in unordered_map
|
||||
struct rmw_gid_equal
|
||||
{
|
||||
bool operator()(const rmw_gid_t & lhs, const rmw_gid_t & rhs) const noexcept
|
||||
{
|
||||
// Compare the data bytes only.
|
||||
// implementation_identifier pointer comparison is not used here because
|
||||
// intra-process communication is always within the same process and RMW,
|
||||
// and pointer comparison is fragile across dynamically loaded components.
|
||||
return std::equal(
|
||||
std::begin(lhs.data),
|
||||
std::end(lhs.data),
|
||||
std::begin(rhs.data));
|
||||
}
|
||||
};
|
||||
|
||||
using SubscriptionMap =
|
||||
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
|
||||
|
||||
@@ -398,6 +432,16 @@ private:
|
||||
using PublisherToSubscriptionIdsMap =
|
||||
std::unordered_map<uint64_t, SplittedSubscriptions>;
|
||||
|
||||
/// Structure to store publisher information in GID lookup map
|
||||
struct PublisherInfo
|
||||
{
|
||||
uint64_t pub_id;
|
||||
rclcpp::PublisherBase::WeakPtr publisher;
|
||||
};
|
||||
|
||||
using GidToPublisherInfoMap =
|
||||
std::unordered_map<rmw_gid_t, PublisherInfo, rmw_gid_hash, rmw_gid_equal>;
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
static
|
||||
uint64_t
|
||||
@@ -642,6 +686,8 @@ private:
|
||||
PublisherBufferMap publisher_buffers_;
|
||||
|
||||
mutable std::shared_timed_mutex mutex_;
|
||||
|
||||
GidToPublisherInfoMap gid_to_publisher_info_;
|
||||
};
|
||||
|
||||
} // namespace experimental
|
||||
|
||||
@@ -51,6 +51,9 @@ IntraProcessManager::add_publisher(
|
||||
}
|
||||
}
|
||||
|
||||
// Add GID to publisher info mapping for fast lookups (stores both ID and weak_ptr)
|
||||
gid_to_publisher_info_[publisher->get_gid()] = {pub_id, publisher};
|
||||
|
||||
// Initialize the subscriptions storage for this publisher.
|
||||
pub_to_subs_[pub_id] = SplittedSubscriptions();
|
||||
|
||||
@@ -98,6 +101,24 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
|
||||
{
|
||||
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
// Remove GID to publisher info mapping.
|
||||
// First try via the publisher's own GID (fast path).
|
||||
auto pub_it = publishers_.find(intra_process_publisher_id);
|
||||
if (pub_it != publishers_.end()) {
|
||||
auto publisher = pub_it->second.lock();
|
||||
if (publisher) {
|
||||
gid_to_publisher_info_.erase(publisher->get_gid());
|
||||
} else {
|
||||
// Publisher weak_ptr already expired, fall back to linear scan by pub_id.
|
||||
for (auto git = gid_to_publisher_info_.begin(); git != gid_to_publisher_info_.end(); ++git) {
|
||||
if (git->second.pub_id == intra_process_publisher_id) {
|
||||
gid_to_publisher_info_.erase(git);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
publishers_.erase(intra_process_publisher_id);
|
||||
publisher_buffers_.erase(intra_process_publisher_id);
|
||||
pub_to_subs_.erase(intra_process_publisher_id);
|
||||
@@ -108,16 +129,15 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
|
||||
{
|
||||
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
|
||||
|
||||
for (auto & publisher_pair : publishers_) {
|
||||
auto publisher = publisher_pair.second.lock();
|
||||
if (!publisher) {
|
||||
continue;
|
||||
}
|
||||
if (*publisher.get() == id) {
|
||||
return true;
|
||||
}
|
||||
// Single O(1) hash map lookup - struct contains both ID and weak_ptr
|
||||
auto it = gid_to_publisher_info_.find(*id);
|
||||
if (it == gid_to_publisher_info_.end()) {
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
|
||||
// Verify the publisher still exists by checking the weak_ptr
|
||||
auto publisher = it->second.publisher.lock();
|
||||
return publisher != nullptr;
|
||||
}
|
||||
|
||||
size_t
|
||||
|
||||
@@ -162,7 +162,14 @@ public:
|
||||
explicit PublisherBase(const std::string & topic, const rclcpp::QoS & qos)
|
||||
: topic_name(topic),
|
||||
qos_profile(qos)
|
||||
{}
|
||||
{
|
||||
// Initialize a mock GID with unique data based on this pointer
|
||||
gid_.implementation_identifier = "mock_rmw";
|
||||
auto ptr_value = reinterpret_cast<std::uintptr_t>(this);
|
||||
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
|
||||
gid_.data[i] = static_cast<uint8_t>((ptr_value >> (i * 8)) & 0xFF);
|
||||
}
|
||||
}
|
||||
|
||||
virtual ~PublisherBase()
|
||||
{}
|
||||
@@ -192,6 +199,12 @@ public:
|
||||
return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal;
|
||||
}
|
||||
|
||||
const rmw_gid_t &
|
||||
get_gid() const
|
||||
{
|
||||
return gid_;
|
||||
}
|
||||
|
||||
bool
|
||||
operator==([[maybe_unused]] const rmw_gid_t & gid) const
|
||||
{
|
||||
@@ -210,6 +223,7 @@ public:
|
||||
private:
|
||||
std::string topic_name;
|
||||
rclcpp::QoS qos_profile;
|
||||
rmw_gid_t gid_;
|
||||
};
|
||||
|
||||
template<typename T, typename Alloc = std::allocator<void>>
|
||||
|
||||
Reference in New Issue
Block a user