Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Framework/Core/include/Framework/DataModelViews.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include "DomainInfoHeader.h"
#include "SourceInfoHeader.h"
#include "Headers/DataHeader.h"
#include "Framework/TimesliceSlot.h"
#include <ranges>
#include <span>

namespace o2::framework
{
Expand Down Expand Up @@ -213,13 +215,11 @@ struct get_num_payloads {
}
};

struct MessageSet;

struct inputs_for_slot {
TimesliceSlot slot;
template <typename R>
requires requires(R r) { requires std::ranges::random_access_range<decltype(r.sets)>; }
friend std::span<o2::framework::MessageSet> operator|(R&& r, inputs_for_slot self)
friend auto operator|(R&& r, inputs_for_slot self)
{
return std::span(r.sets[self.slot.index * r.inputsPerSlot]);
}
Expand All @@ -231,7 +231,7 @@ struct messages_for_input {
requires std::ranges::random_access_range<R>
friend std::span<fair::mq::MessagePtr> operator|(R&& r, messages_for_input self)
{
return r[self.inputIdx].messages;
return std::span(r[self.inputIdx]);
}
};

Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/include/Framework/DataProcessingHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Framework/TimesliceSlot.h"
#include "Framework/TimesliceIndex.h"
#include <fairmq/FwdDecls.h>
#include <fairmq/Message.h>
#include <vector>
#include <span>

Expand All @@ -29,7 +30,6 @@ struct OutputChannelState;
struct ProcessingPolicies;
struct DeviceSpec;
struct FairMQDeviceProxy;
struct MessageSet;
struct ChannelIndex;
enum struct StreamingState;
enum struct TransitionHandlingState;
Expand All @@ -54,7 +54,7 @@ struct DataProcessingHelpers {
/// starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
/// Helper to route messages for forwarding
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<MessageSet>& currentSetOfInputs,
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
bool copy, bool consume);
/// Helper to route messages for forwarding
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,
Expand Down
10 changes: 5 additions & 5 deletions Framework/Core/include/Framework/DataRelayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include "Framework/DataDescriptorMatcher.h"
#include "Framework/ForwardRoute.h"
#include "Framework/CompletionPolicy.h"
#include "Framework/MessageSet.h"
#include <fairmq/Message.h>
#include "Framework/TimesliceIndex.h"
#include "Framework/Tracing.h"
#include "Framework/TimesliceSlot.h"
Expand Down Expand Up @@ -113,7 +113,7 @@ class DataRelayer
ActivityStats processDanglingInputs(std::vector<ExpirationHandler> const&,
ServiceRegistryRef context, bool createNew);

using OnDropCallback = std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo info)>;
using OnDropCallback = std::function<void(TimesliceSlot, std::vector<std::vector<fair::mq::MessagePtr>>&, TimesliceIndex::OldestOutputInfo info)>;

// Callback for when some messages are about to be owned by the DataRelayer
using OnInsertionCallback = std::function<void(ServiceRegistryRef&, std::span<fair::mq::MessagePtr>&)>;
Expand Down Expand Up @@ -156,8 +156,8 @@ class DataRelayer
/// Returns an input registry associated to the given timeslice and gives
/// ownership to the caller. This is because once the inputs are out of the
/// DataRelayer they need to be deleted once the processing is concluded.
std::vector<MessageSet> consumeAllInputsForTimeslice(TimesliceSlot id);
std::vector<MessageSet> consumeExistingInputsForTimeslice(TimesliceSlot id);
std::vector<std::vector<fair::mq::MessagePtr>> consumeAllInputsForTimeslice(TimesliceSlot id);
std::vector<std::vector<fair::mq::MessagePtr>> consumeExistingInputsForTimeslice(TimesliceSlot id);

/// Returns how many timeslices we can handle in parallel
[[nodiscard]] size_t getParallelTimeslices() const;
Expand Down Expand Up @@ -203,7 +203,7 @@ class DataRelayer
/// Notice that we store them as an NxM sized vector, where
/// N is the maximum number of inflight timeslices, while
/// M is the number of inputs which are requested.
std::vector<MessageSet> mCache;
std::vector<std::vector<fair::mq::MessagePtr>> mCache;

/// This is the index which maps a given timestamp to the associated
/// cacheline.
Expand Down
178 changes: 0 additions & 178 deletions Framework/Core/include/Framework/MessageSet.h

This file was deleted.

24 changes: 13 additions & 11 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

#include "DecongestionService.h"
#include "Framework/DataProcessingHelpers.h"
#include "Framework/DataModelViews.h"
#include "DataRelayerHelpers.h"
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"
Expand Down Expand Up @@ -585,7 +586,7 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
// the inputs which are shared between this device and others
// to the next one in the daisy chain.
// FIXME: do it in a smarter way than O(N^2)
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
auto& proxy = registry.get<FairMQDeviceProxy>();

Expand Down Expand Up @@ -617,7 +618,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
};

static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
auto& proxy = registry.get<FairMQDeviceProxy>();

Expand All @@ -627,7 +628,7 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
// Always copy them, because we do not want to actually send them.
// We merely need the side effect of the consume, if applicable.
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
DataProcessingHelpers::cleanForwardedMessages(span, consume);
}

Expand Down Expand Up @@ -1278,7 +1279,7 @@ void DataProcessingDevice::Run()
// - we can trigger further events from the queue
// - we can guarantee this is the last thing we do in the loop (
// assuming no one else is adding to the queue before this point).
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
ServiceRegistryRef ref{registry};
ref.get<AsyncQueue>();
Expand Down Expand Up @@ -1942,7 +1943,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
nPayloadsPerHeader = 1;
ii += (nMessages / 2) - 1;
}
auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
auto onDrop = [ref](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
O2_SIGNPOST_ID_GENERATE(cid, async_queue);
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
slot.index, oldestOutputInfo.timeslice.value);
Expand Down Expand Up @@ -2120,7 +2121,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
// want to support multithreaded dispatching of operations, I can simply
// move these to some thread local store and the rest of the lambdas
// should work just fine.
std::vector<MessageSet> currentSetOfInputs;
std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;

//
auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
Expand All @@ -2131,7 +2132,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
}
auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
if ((currentSetOfInputs[i] | count_payloads{}) > partindex) {
const char* headerptr = nullptr;
const char* payloadptr = nullptr;
size_t payloadSize = 0;
Expand All @@ -2140,8 +2141,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
// sequence is the header message
// - each part has one or more payload messages
// - InputRecord provides all payloads as header-payload pair
auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
auto const indices = currentSetOfInputs[i] | get_pair{partindex};
auto const& headerMsg = currentSetOfInputs[i][indices.headerIdx];
auto const& payloadMsg = currentSetOfInputs[i][indices.payloadIdx];
headerptr = static_cast<char const*>(headerMsg->GetData());
payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
Expand All @@ -2150,10 +2152,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
return DataRef{};
};
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
return currentSetOfInputs[i].getNumberOfPairs();
return (currentSetOfInputs[i] | count_payloads{});
};
auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
auto& header = static_cast<const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx].messages | get_header{0}));
auto& header = static_cast<const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx] | get_header{0}));
return header.GetRefCount();
};
return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,14 @@ void DataProcessingHelpers::cleanForwardedMessages(std::span<fair::mq::MessagePt
}

auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
std::vector<MessageSet>& currentSetOfInputs,
std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
{
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
}
return forwardedParts;
Expand Down
Loading
Loading