diff --git a/Framework/Core/include/Framework/MessageSet.h b/Framework/Core/include/Framework/MessageSet.h index 166934238d647..323c0ad4608af 100644 --- a/Framework/Core/include/Framework/MessageSet.h +++ b/Framework/Core/include/Framework/MessageSet.h @@ -136,30 +136,6 @@ struct MessageSet { } } - fair::mq::MessagePtr& header(size_t partIndex) - { - return messages[messageMap[partIndex].position]; - } - - fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0) - { - assert(partIndex < messageMap.size()); - assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size()); - return messages[messageMap[partIndex].position + payloadIndex + 1]; - } - - fair::mq::MessagePtr const& header(size_t partIndex) const - { - return messages[messageMap[partIndex].position]; - } - - fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const - { - assert(partIndex < messageMap.size()); - assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size()); - return messages[messageMap[partIndex].position + payloadIndex + 1]; - } - fair::mq::MessagePtr const& associatedHeader(size_t pos) const { return messages[messageMap[pairMap[pos].partIndex].position]; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 0fa70947bf18c..bb6502758a95a 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -2140,8 +2140,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v // sequence is the header message // - each part has one or more payload messages // - InputRecord provides all payloads as header-payload pair - auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex); - auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex); + auto const indices = currentSetOfInputs[i].messages | get_pair{partindex}; + auto const& headerMsg = currentSetOfInputs[i].messages[indices.headerIdx]; + auto const& payloadMsg = currentSetOfInputs[i].messages[indices.payloadIdx]; headerptr = static_cast(headerMsg->GetData()); payloadptr = payloadMsg ? static_cast(payloadMsg->GetData()) : nullptr; payloadSize = payloadMsg ? payloadMsg->GetSize() : 0; diff --git a/Framework/Core/test/test_MessageSet.cxx b/Framework/Core/test/test_MessageSet.cxx index aa7b49c1d1d3c..8c9ed4a7cbf1c 100644 --- a/Framework/Core/test/test_MessageSet.cxx +++ b/Framework/Core/test/test_MessageSet.cxx @@ -45,12 +45,16 @@ TEST_CASE("MessageSet") REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); CHECK_THROWS((msgSet.messages | get_pair{1})); - // Validate pipe operators match old API - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0)); - REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | count_parts{}) == 1); + // messages: [hdr, pl] — one pair + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("MessageSetWithFunction") @@ -76,11 +80,13 @@ TEST_CASE("MessageSetWithFunction") REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); CHECK_THROWS((msgSet.messages | get_pair{1})); - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0)); - REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | count_parts{}) == 1); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("MessageSetWithMultipart") @@ -112,13 +118,18 @@ TEST_CASE("MessageSetWithMultipart") REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 2); CHECK_THROWS((msgSet.messages | get_pair{2})); - // Validate pipe operators match old API for multi-payload - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0)); - REQUIRE(&(msgSet.messages | get_payload{0, 1}) == &msgSet.payload(0, 1)); - REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 2); + REQUIRE((msgSet.messages | count_parts{}) == 1); + // messages: [hdr, pl0, pl1] — one header, two payloads + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 2); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); + } } TEST_CASE("MessageSetAddPartRef") @@ -190,18 +201,16 @@ TEST_CASE("MessageSetAddMultiple") REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 5); REQUIRE((msgSet.messages | get_pair{3}).headerIdx == 4); REQUIRE((msgSet.messages | get_pair{3}).payloadIdx == 6); - // Validate pipe operators match old API for mixed modes - for (size_t i = 0; i < 3; ++i) { - REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); - REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0)); - } - // Part 2 has a second payload (multi-payload with splitPayloadParts=2, splitPayloadIndex=2) - REQUIRE(&(msgSet.messages | get_payload{2, 1}) == &msgSet.payload(2, 1)); - for (size_t i = 0; i < 3; ++i) { - REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | get_num_payloads{1}) == 1); + REQUIRE((msgSet.messages | get_num_payloads{2}) == 2); + REQUIRE((msgSet.messages | count_parts{}) == 3); + REQUIRE((msgSet.messages | count_payloads{}) == 4); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("GetHeaderPayloadOperators") @@ -251,13 +260,18 @@ TEST_CASE("GetHeaderPayloadOperators") REQUIRE(pl1.get() != nullptr); REQUIRE(pl1->GetSize() == 200); - // Validate pipe operators match old API - for (size_t i = 0; i < 2; ++i) { - REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); - REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0)); + REQUIRE((msgSet.messages | count_parts{}) == 2); + REQUIRE((msgSet.messages | count_payloads{}) == 2); + // messages: [hdr0, pl0, hdr1, pl1] — two standard pairs + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("GetHeaderPayloadMultiPayload") @@ -343,18 +357,24 @@ TEST_CASE("GetHeaderPayloadMultiPayload") // get_num_payloads for part 1 should be 3 REQUIRE((msgSet.messages | get_num_payloads{1}) == 3); - // Validate pipe operators match old API for multi-payload (header, pl, pl, pl) - REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); - REQUIRE(&(msgSet.messages | get_header{1}) == &msgSet.header(1)); - REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0)); - REQUIRE(&(msgSet.messages | get_payload{1, 0}) == &msgSet.payload(1, 0)); - REQUIRE(&(msgSet.messages | get_payload{1, 1}) == &msgSet.payload(1, 1)); - REQUIRE(&(msgSet.messages | get_payload{1, 2}) == &msgSet.payload(1, 2)); - for (size_t i = 0; i < 2; ++i) { - REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); + REQUIRE((msgSet.messages | get_num_payloads{0}) == 1); + REQUIRE((msgSet.messages | get_num_payloads{1}) == 3); + REQUIRE((msgSet.messages | count_parts{}) == 2); + REQUIRE((msgSet.messages | count_payloads{}) == 4); + // messages: [hdr0, pl0, hdr1, pl1_0, pl1_1, pl1_2] + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3); + REQUIRE((msgSet.messages | get_pair{2}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 4); + REQUIRE((msgSet.messages | get_pair{3}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{3}).payloadIdx == 5); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("TraditionalSplitParts") @@ -418,14 +438,20 @@ TEST_CASE("TraditionalSplitParts") // get_num_payloads: each traditional split pair has 1 payload for (size_t i = 0; i < 3; ++i) { - REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); + REQUIRE((msgSet.messages | get_num_payloads{i}) == 1); } - - // Validate pipe operators match old MessageSet::header()/payload() API - for (size_t i = 0; i < 3; ++i) { - REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); - REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i)); + REQUIRE((msgSet.messages | count_parts{}) == 3); + REQUIRE((msgSet.messages | count_payloads{}) == 3); + // messages: [hdr0, pl0, hdr1, pl1, hdr2, pl2] — three traditional split pairs + REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); + REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); + REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 2); + REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 3); + REQUIRE((msgSet.messages | get_pair{2}).headerIdx == 4); + REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 5); + for (size_t i = 0; i < msgSet.pairMap.size(); ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(&msgSet.messages[indices.headerIdx] == &msgSet.associatedHeader(i)); + REQUIRE(&msgSet.messages[indices.payloadIdx] == &msgSet.associatedPayload(i)); } - REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); - REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); }