Compare commits

...

3 Commits

Author SHA1 Message Date
Tomoya Fujita
6bf1fb0af8 add missing include.
Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
2026-02-25 08:50:24 +09:00
Tomoya Fujita
90c1e1f3e9 a few extra fixes to harden the hash map lookup.
Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
2026-02-24 15:18:42 +09:00
Tomoya Fujita
3ff6029c6e Reapply "improve lookup time for matches_any_publishers(). (#3068)" (#3077)
This reverts commit 1bf4e6a810.

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
2026-02-24 14:54:09 +09:00
3 changed files with 90 additions and 10 deletions

View File

@@ -19,6 +19,7 @@
#include <shared_mutex>
#include <algorithm>
#include <iterator>
#include <memory>
#include <stdexcept>
@@ -386,6 +387,39 @@ private:
std::vector<uint64_t> take_ownership_subscriptions;
};
/// Hash function for rmw_gid_t to enable use in unordered_map
struct rmw_gid_hash
{
std::size_t operator()(const rmw_gid_t & gid) const noexcept
{
// Using the FNV-1a hash algorithm on the gid data
constexpr std::size_t FNV_prime = 1099511628211u;
std::size_t result = 14695981039346656037u;
for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
result ^= gid.data[i];
result *= FNV_prime;
}
return result;
}
};
/// Equality comparison for rmw_gid_t to enable use in unordered_map
struct rmw_gid_equal
{
bool operator()(const rmw_gid_t & lhs, const rmw_gid_t & rhs) const noexcept
{
// Compare the data bytes only.
// implementation_identifier pointer comparison is not used here because
// intra-process communication is always within the same process and RMW,
// and pointer comparison is fragile across dynamically loaded components.
return std::equal(
std::begin(lhs.data),
std::end(lhs.data),
std::begin(rhs.data));
}
};
using SubscriptionMap =
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
@@ -398,6 +432,16 @@ private:
using PublisherToSubscriptionIdsMap =
std::unordered_map<uint64_t, SplittedSubscriptions>;
/// Structure to store publisher information in GID lookup map
struct PublisherInfo
{
uint64_t pub_id;
rclcpp::PublisherBase::WeakPtr publisher;
};
using GidToPublisherInfoMap =
std::unordered_map<rmw_gid_t, PublisherInfo, rmw_gid_hash, rmw_gid_equal>;
RCLCPP_PUBLIC
static
uint64_t
@@ -642,6 +686,8 @@ private:
PublisherBufferMap publisher_buffers_;
mutable std::shared_timed_mutex mutex_;
GidToPublisherInfoMap gid_to_publisher_info_;
};
} // namespace experimental

View File

@@ -51,6 +51,9 @@ IntraProcessManager::add_publisher(
}
}
// Add GID to publisher info mapping for fast lookups (stores both ID and weak_ptr)
gid_to_publisher_info_[publisher->get_gid()] = {pub_id, publisher};
// Initialize the subscriptions storage for this publisher.
pub_to_subs_[pub_id] = SplittedSubscriptions();
@@ -98,6 +101,24 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
// Remove GID to publisher info mapping.
// First try via the publisher's own GID (fast path).
auto pub_it = publishers_.find(intra_process_publisher_id);
if (pub_it != publishers_.end()) {
auto publisher = pub_it->second.lock();
if (publisher) {
gid_to_publisher_info_.erase(publisher->get_gid());
} else {
// Publisher weak_ptr already expired, fall back to linear scan by pub_id.
for (auto git = gid_to_publisher_info_.begin(); git != gid_to_publisher_info_.end(); ++git) {
if (git->second.pub_id == intra_process_publisher_id) {
gid_to_publisher_info_.erase(git);
break;
}
}
}
}
publishers_.erase(intra_process_publisher_id);
publisher_buffers_.erase(intra_process_publisher_id);
pub_to_subs_.erase(intra_process_publisher_id);
@@ -108,16 +129,15 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.lock();
if (!publisher) {
continue;
}
if (*publisher.get() == id) {
return true;
}
// Single O(1) hash map lookup - struct contains both ID and weak_ptr
auto it = gid_to_publisher_info_.find(*id);
if (it == gid_to_publisher_info_.end()) {
return false;
}
return false;
// Verify the publisher still exists by checking the weak_ptr
auto publisher = it->second.publisher.lock();
return publisher != nullptr;
}
size_t

View File

@@ -162,7 +162,14 @@ public:
explicit PublisherBase(const std::string & topic, const rclcpp::QoS & qos)
: topic_name(topic),
qos_profile(qos)
{}
{
// Initialize a mock GID with unique data based on this pointer
gid_.implementation_identifier = "mock_rmw";
auto ptr_value = reinterpret_cast<std::uintptr_t>(this);
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
gid_.data[i] = static_cast<uint8_t>((ptr_value >> (i * 8)) & 0xFF);
}
}
virtual ~PublisherBase()
{}
@@ -192,6 +199,12 @@ public:
return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal;
}
const rmw_gid_t &
get_gid() const
{
return gid_;
}
bool
operator==([[maybe_unused]] const rmw_gid_t & gid) const
{
@@ -210,6 +223,7 @@ public:
private:
std::string topic_name;
rclcpp::QoS qos_profile;
rmw_gid_t gid_;
};
template<typename T, typename Alloc = std::allocator<void>>