Skip to content

Commit 99b8eb0

Browse files
Filter interested readers on PDP writer (#5604) (#5672)
* Refs #22506. Regression test. Signed-off-by: Eugenio Collado <[email protected]> * Refs #22506. Filter interested readers on PDP writer Signed-off-by: Eugenio Collado <[email protected]> * Fix IncompatibleQosGetters Signed-off-by: Eugenio Collado <[email protected]> * Fix PubSubParticipant rtps deprecated Signed-off-by: Eugenio Collado <[email protected]> --------- Manual conflicts resolution from commit b56b56a Signed-off-by: Eugenio Collado <[email protected]>
1 parent 37230ac commit 99b8eb0

File tree

14 files changed

+608
-40
lines changed

14 files changed

+608
-40
lines changed

include/fastdds/rtps/writer/StatelessWriter.h

+8-2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ class StatelessWriter : public RTPSWriter
7676
WriterHistory* hist,
7777
WriterListener* listen = nullptr);
7878

79+
mutable LocatorList_t fixed_locators_;
80+
81+
virtual bool send_to_fixed_locators(
82+
CDRMessage_t* message,
83+
std::chrono::steady_clock::time_point& max_blocking_time_point) const;
84+
7985
public:
8086

8187
virtual ~StatelessWriter();
@@ -157,10 +163,11 @@ class StatelessWriter : public RTPSWriter
157163
//FOR NOW THERE IS NOTHING TO UPDATE.
158164
}
159165

166+
//! Deprecated in favor of PDP simple writer
160167
bool set_fixed_locators(
161168
const LocatorList_t& locator_list);
162169

163-
//!Reset the unsent changes.
170+
//! Deprecated in favor of PDP simple writer
164171
void unsent_changes_reset();
165172

166173
/**
@@ -265,7 +272,6 @@ class StatelessWriter : public RTPSWriter
265272

266273

267274
bool is_inline_qos_expected_ = false;
268-
LocatorList_t fixed_locators_;
269275
ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_;
270276

271277
std::condition_variable_any unsent_changes_cond_;

src/cpp/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ set(${PROJECT_NAME}_source_files
185185
rtps/builtin/discovery/participant/PDP.cpp
186186
rtps/builtin/discovery/participant/ServerAttributes.cpp
187187
rtps/builtin/discovery/participant/PDPSimple.cpp
188+
rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp
188189
rtps/builtin/discovery/participant/PDPListener.cpp
189190
rtps/builtin/discovery/endpoint/EDP.cpp
190191
rtps/builtin/discovery/endpoint/EDPSimple.cpp

src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp

+3-8
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ void PDPSimple::announceParticipantState(
284284

285285
if (!(dispose || new_change))
286286
{
287-
endpoints->writer.writer_->unsent_changes_reset();
287+
endpoints->writer.writer_->send_periodic_announcement();
288288
}
289289
}
290290
}
@@ -393,7 +393,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
393393
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.payload_pool_, writer.history_.get(),
394394
nullptr, writer_entity_id, true))
395395
{
396-
writer.writer_ = dynamic_cast<StatelessWriter*>(rtps_writer);
396+
writer.writer_ = dynamic_cast<PDPStatelessWriter*>(rtps_writer);
397397
assert(nullptr != writer.writer_);
398398

399399
#if HAVE_SECURITY
@@ -410,7 +410,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
410410
fixed_locators.push_back(local_locator);
411411
}
412412
}
413-
writer.writer_->set_fixed_locators(fixed_locators);
413+
writer.writer_->set_initial_peers(fixed_locators);
414414
}
415415
else
416416
{
@@ -680,11 +680,6 @@ void PDPSimple::match_pdp_remote_endpoints(
680680
{
681681
writer->matched_reader_add(*temp_reader_data);
682682
}
683-
684-
if (!writer_only && (BEST_EFFORT_RELIABILITY_QOS == reliability_kind))
685-
{
686-
endpoints->writer.writer_->unsent_changes_reset();
687-
}
688683
}
689684
}
690685

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* @file PDPStatelessWriter.cpp
17+
*/
18+
19+
#include <rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp>
20+
21+
#include <algorithm>
22+
#include <cassert>
23+
#include <chrono>
24+
#include <cstdint>
25+
#include <mutex>
26+
#include <set>
27+
#include <vector>
28+
29+
#include <fastrtps/rtps/history/WriterHistory.h>
30+
#include <rtps/participant/RTPSParticipantImpl.h>
31+
32+
namespace eprosima {
33+
namespace fastrtps {
34+
namespace rtps {
35+
36+
PDPStatelessWriter::PDPStatelessWriter(
37+
RTPSParticipantImpl* participant,
38+
const GUID_t& guid,
39+
const WriterAttributes& attributes,
40+
fastdds::rtps::FlowController* flow_controller,
41+
WriterHistory* history,
42+
WriterListener* listener)
43+
: StatelessWriter(participant, guid, attributes, flow_controller, history, listener)
44+
, interested_readers_(participant->getRTPSParticipantAttributes().allocation.participants)
45+
{
46+
}
47+
48+
bool PDPStatelessWriter::matched_reader_add(
49+
const ReaderProxyData& data)
50+
{
51+
bool ret = StatelessWriter::matched_reader_add(data);
52+
if (ret)
53+
{
54+
// Mark new reader as interested
55+
add_interested_reader(data.guid());
56+
// Send announcement to new reader
57+
reschedule_all_samples();
58+
}
59+
return ret;
60+
}
61+
62+
bool PDPStatelessWriter::matched_reader_remove(
63+
const GUID_t& reader_guid)
64+
{
65+
bool ret = StatelessWriter::matched_reader_remove(reader_guid);
66+
if (ret)
67+
{
68+
// Mark reader as not interested
69+
remove_interested_reader(reader_guid);
70+
}
71+
return ret;
72+
}
73+
74+
void PDPStatelessWriter::unsent_change_added_to_history(
75+
CacheChange_t* change,
76+
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
77+
{
78+
mark_all_readers_interested();
79+
StatelessWriter::unsent_change_added_to_history(change, max_blocking_time);
80+
}
81+
82+
void PDPStatelessWriter::set_initial_peers(
83+
const fastdds::rtps::LocatorList& locator_list)
84+
{
85+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
86+
87+
initial_peers_.push_back(locator_list);
88+
mp_RTPSParticipant->createSenderResources(initial_peers_);
89+
}
90+
91+
void PDPStatelessWriter::send_periodic_announcement()
92+
{
93+
mark_all_readers_interested();
94+
reschedule_all_samples();
95+
}
96+
97+
bool PDPStatelessWriter::send_to_fixed_locators(
98+
CDRMessage_t* message,
99+
std::chrono::steady_clock::time_point& max_blocking_time_point) const
100+
{
101+
bool ret = true;
102+
103+
if (should_reach_all_destinations_)
104+
{
105+
ret = initial_peers_.empty() ||
106+
mp_RTPSParticipant->sendSync(message, m_guid,
107+
Locators(initial_peers_.begin()), Locators(initial_peers_.end()),
108+
max_blocking_time_point);
109+
110+
if (ret)
111+
{
112+
fixed_locators_.clear();
113+
should_reach_all_destinations_ = false;
114+
}
115+
}
116+
else
117+
{
118+
interested_readers_.clear();
119+
}
120+
121+
return ret;
122+
}
123+
124+
bool PDPStatelessWriter::is_relevant(
125+
const CacheChange_t& /* change */,
126+
const GUID_t& reader_guid) const
127+
{
128+
return interested_readers_.end() !=
129+
std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
130+
}
131+
132+
void PDPStatelessWriter::mark_all_readers_interested()
133+
{
134+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
135+
should_reach_all_destinations_ = true;
136+
interested_readers_.clear();
137+
fixed_locators_.clear();
138+
fixed_locators_.push_back(initial_peers_);
139+
reader_data_filter(nullptr);
140+
}
141+
142+
void PDPStatelessWriter::add_interested_reader(
143+
const GUID_t& reader_guid)
144+
{
145+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
146+
if (!should_reach_all_destinations_)
147+
{
148+
auto it = std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
149+
if (it == interested_readers_.end())
150+
{
151+
interested_readers_.emplace_back(reader_guid);
152+
reader_data_filter(this);
153+
}
154+
}
155+
}
156+
157+
void PDPStatelessWriter::remove_interested_reader(
158+
const GUID_t& reader_guid)
159+
{
160+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
161+
interested_readers_.remove(reader_guid);
162+
}
163+
164+
void PDPStatelessWriter::reschedule_all_samples()
165+
{
166+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
167+
size_t n_samples = mp_history->getHistorySize();
168+
if (0 < n_samples)
169+
{
170+
assert(1 == n_samples);
171+
auto it = mp_history->changesBegin();
172+
CacheChange_t* change = *it;
173+
flow_controller_->add_new_sample(this, change, std::chrono::steady_clock::now() + std::chrono::hours(24));
174+
}
175+
}
176+
177+
} // namespace rtps
178+
} // namespace fastdds
179+
} // namespace eprosima

0 commit comments

Comments
 (0)