From ed42a5817ebe93c195c01c9386feac3fcc2c5f95 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:57:21 +0100 Subject: [PATCH 1/2] DPL: more validation of MessageSet::header / payload replacement --- Framework/Core/test/test_MessageSet.cxx | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Framework/Core/test/test_MessageSet.cxx b/Framework/Core/test/test_MessageSet.cxx index 290e55220d6cb..aa7b49c1d1d3c 100644 --- a/Framework/Core/test/test_MessageSet.cxx +++ b/Framework/Core/test/test_MessageSet.cxx @@ -250,6 +250,14 @@ TEST_CASE("GetHeaderPayloadOperators") auto& pl1 = msgSet.messages | get_payload{1, 0}; REQUIRE(pl1.get() != nullptr); REQUIRE(pl1->GetSize() == 200); + + // Validate pipe operators match old API + for (size_t i = 0; i < 2; ++i) { + REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); + REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0)); + } + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("GetHeaderPayloadMultiPayload") From f6070948e0c8bd0f4114291bdf1853a26147175d Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:04:08 +0100 Subject: [PATCH 2/2] DPL: move away from MessageSet::header / payload Rest of the usecases removed. Abstract header / payload retrieval, with the idea that get_header / get_payload will work on any range of fair::mq::MessagePtrs. --- Framework/Core/src/DataRelayer.cxx | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 7eb851e2aadd8..4cda75ed001b0 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -213,9 +213,9 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector(header->GetData()), reinterpret_cast(payload ? payload->GetData() : nullptr), @@ -786,9 +786,9 @@ void DataRelayer::getReadyToProcess(std::vector& comp auto partial = getPartialRecord(li); // TODO: get the data ref from message model auto getter = [&partial](size_t idx, size_t part) { - if (!partial[idx].messages.empty() && partial[idx].header(part).get()) { - auto header = partial[idx].header(part).get(); - auto payload = partial[idx].payload(part).get(); + if (!partial[idx].messages.empty() && (partial[idx].messages | get_header{part}).get()) { + auto header = (partial[idx].messages | get_header{part}).get(); + auto payload = (partial[idx].messages | get_payload{part, 0}).get(); return DataRef{nullptr, reinterpret_cast(header->GetData()), reinterpret_cast(payload ? payload->GetData() : nullptr), @@ -952,10 +952,10 @@ std::vector DataRelayer::consumeExistingInputsForTime // TODO: in the original implementation of the cache, there have been only two messages per entry, // check if the 2 above corresponds to the number of messages. for (size_t pi = 0; pi < (cache[cacheId].messages | count_parts{}); pi++) { - auto& header = cache[cacheId].header(pi); + auto& header = cache[cacheId].messages | get_header{pi}; auto&& newHeader = header->GetTransport()->CreateMessage(); newHeader->Copy(*header); - messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))}); + messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].messages | get_payload{pi, 0})}); } };