diff --git a/Utilities/CMakeLists.txt b/Utilities/CMakeLists.txt index 440514c381c6a..82db492436f86 100644 --- a/Utilities/CMakeLists.txt +++ b/Utilities/CMakeLists.txt @@ -15,7 +15,6 @@ endif(ALIROOT) add_subdirectory(aliceHLTwrapper) add_subdirectory(O2MessageMonitor) -add_subdirectory(DataFlow) add_subdirectory(DataSampling) add_subdirectory(Publishers) add_subdirectory(DataCompression) diff --git a/Utilities/DataFlow/CMakeLists.txt b/Utilities/DataFlow/CMakeLists.txt deleted file mode 100644 index 05e9e26bd0c31..0000000000000 --- a/Utilities/DataFlow/CMakeLists.txt +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright CERN and copyright holders of ALICE O2. This software is distributed -# under the terms of the GNU General Public License v3 (GPL Version 3), copied -# verbatim in the file "COPYING". -# -# See http://alice-o2.web.cern.ch/license for full licensing information. -# -# In applying this license CERN does not waive the privileges and immunities -# granted to it by virtue of its status as an Intergovernmental Organization or -# submit itself to any jurisdiction. - -# the bucket contains the following dependencies - common_boost_bucket - Base - -# Headers - O2device - dl the 'dl' dependency is needed as the device -# boilerplate code in runSimpleMQStateMachine.h uses dlopen etc. Probably this -# hidden dependency can be avoided by including the to some compiled FairMQ -# library - -o2_add_library(DataFlow - SOURCES src/FakeTimeframeBuilder.cxx - src/FakeTimeframeGeneratorDevice.cxx - src/HeartbeatSampler.cxx - src/SubframeBuilderDevice.cxx - src/TimeframeParser.cxx - src/TimeframeReaderDevice.cxx - src/TimeframeValidatorDevice.cxx - src/TimeframeWriterDevice.cxx - src/EPNReceiverDevice.cxx - src/FLPSenderDevice.cxx - PUBLIC_LINK_LIBRARIES O2::Headers O2::TimeFrame FairMQ::FairMQ - O2::Device) - -o2_target_man_page(DataFlow NAME o2-timeframe-reader-device) -o2_target_man_page(DataFlow NAME o2-timeframe-writer-device) -o2_target_man_page(DataFlow NAME o2-subframebuilder-device) - -o2_add_executable(fake-timeframegenerator-device - SOURCES src/runFakeTimeframeGeneratorDevice - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(heartbeat-sampler - SOURCES src/runHeartbeatSampler - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(subframebuilder-device - SOURCES src/runSubframeBuilderDevice - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(timeframe-reader-device - SOURCES src/runTimeframeReaderDevice - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(timeframe-validator-device - SOURCES src/runTimeframeValidatorDevice - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(timeframe-writer-device - SOURCES src/runTimeframeWriterDevice - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(epn-receiver-device - SOURCES src/runEPNReceiver - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(flp-sender-device - SOURCES src/runFLPSender - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_executable(timeframe-validation-tool - SOURCES src/TimeframeValidationTool - PUBLIC_LINK_LIBRARIES O2::DataFlow) - -o2_add_test(TimeframeParser - SOURCES test/test_TimeframeParser - PUBLIC_LINK_LIBRARIES O2::DataFlow - COMPONENT_NAME dataflow - LABELS utils) - -o2_add_test(SubframeUtils01 - SOURCES test/test_SubframeUtils01 - PUBLIC_LINK_LIBRARIES O2::DataFlow - COMPONENT_NAME dataflow - LABELS utils) - -o2_add_test(PayloadMerger01 - SOURCES test/test_PayloadMerger01 - PUBLIC_LINK_LIBRARIES O2::DataFlow - COMPONENT_NAME dataflow - LABELS utils) diff --git a/Utilities/DataFlow/doc/o2-subframebuilder-device.1.in b/Utilities/DataFlow/doc/o2-subframebuilder-device.1.in deleted file mode 100644 index 653053997d842..0000000000000 --- a/Utilities/DataFlow/doc/o2-subframebuilder-device.1.in +++ /dev/null @@ -1,48 +0,0 @@ -.\" Manpage for o2-subframebuilder-device. -.TH AliceO2 1 "12 May 2017" "1.0" "o2-subframebuilder-device man page" - -.SH NAME - -o2-subframebuilder-device - aggregate HBF in input as a single STF - -.SH SYNOPSIS - -o2-subframebuilder-device [options] - -.SH DESCRIPTION - -o2-subframebuilder-device will take in input a number of HeartBeat Frames -(HBF) and merge them in a single STF, with a policy defined by the -passed options. - -.SH OPTIONS - ---self-triggered Time frame duration - -.TP 5 - ---in-chan-name [NAME] Name of the input channel - -.TP 5 - ---out-chan-name [NAME] Name of the output channel - -.TP 5 - ---detector-name [NAME] Name of detector as data source - -.TP 5 - ---flp-id arg [NAME] ID of the FLP used as data source - -.TP 5 - ---strip-hbf Strip HeartBeatHeader (HBH) & HeartBeatTrailer (HBT) from each HBF - -.SH SEE ALSO - -FLPSenderDEvice(1), o2-epn-receiver-device(1), o2-heartbeat-sampler(1), TimeframeValidator(1) - -.SH BUGS - -Lots of bugs diff --git a/Utilities/DataFlow/doc/o2-timeframe-reader-device.1.in b/Utilities/DataFlow/doc/o2-timeframe-reader-device.1.in deleted file mode 100644 index bdf460cadc976..0000000000000 --- a/Utilities/DataFlow/doc/o2-timeframe-reader-device.1.in +++ /dev/null @@ -1,29 +0,0 @@ -.\" Manpage for o2-timeframe-reader-device. -.TH man 1 "12 May 2017" "1.0" "o2-timeframe-reader-device man page" - -.SH NAME - -o2-timeframe-reader-device - read a timeframe from disk - -.SH SYNOPSIS - -o2-timeframe-reader-device --input-file [FILE] - -.SH DESCRIPTION - -o2-timeframe-reader-device will read a Timeframe from the FILE on disk and streams it -via FairMQ - -.SH OPTIONS - -.TP 5 - ---input-file [FILE] the file to be streamed - -.SH SEE ALSO - -o2-timeframe-writer-device(1) - -.SH BUGS - -Lots of bugs diff --git a/Utilities/DataFlow/doc/o2-timeframe-writer-device.1.in b/Utilities/DataFlow/doc/o2-timeframe-writer-device.1.in deleted file mode 100644 index a4828d9f12d41..0000000000000 --- a/Utilities/DataFlow/doc/o2-timeframe-writer-device.1.in +++ /dev/null @@ -1,29 +0,0 @@ -.\" Manpage for o2-timeframe-writer-device. -.TH man 1 "12 May 2017" "1.0" "o2-timeframe-writer-device man page" - -.SH NAME - -o2-timeframe-writer-device - writes a timeframe to disk - -.SH SYNOPSIS - -o2-timeframe-writer-device --input-file [FILE] - -.SH DESCRIPTION - -o2-timeframe-writer-device will receive a Timeframe from FairMQ transport and stream -it via FairMQ. - -.SH OPTIONS - -.TP 5 - ---output-file [FILE] the file where to stream results - -.SH SEE ALSO - -o2-timeframe-reader-device(1) - -.SH BUGS - -Lots of bugs diff --git a/Utilities/DataFlow/include/DataFlow/EPNReceiverDevice.h b/Utilities/DataFlow/include/DataFlow/EPNReceiverDevice.h deleted file mode 100644 index f13fc8cd88585..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/EPNReceiverDevice.h +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef ALICEO2_DEVICES_EPNRECEIVER_H_ -#define ALICEO2_DEVICES_EPNRECEIVER_H_ - -#include -#include -#include -#include - -#include - -namespace o2 -{ -namespace devices -{ - -/// Container for (sub-)timeframes - -struct TFBuffer { - FairMQParts parts; - std::chrono::steady_clock::time_point start; - std::chrono::steady_clock::time_point end; -}; - -/// Receives sub-timeframes from the flpSenders and merges these into full timeframes. - -class EPNReceiverDevice final : public FairMQDevice -{ - public: - EPNReceiverDevice() = default; - ~EPNReceiverDevice() final = default; - void InitTask() final; - - /// Prints the contents of the timeframe container - void PrintBuffer(const std::unordered_map& buffer) const; - - /// Discared incomplete timeframes after \p fBufferTimeoutInMs. - void DiscardIncompleteTimeframes(); - - protected: - /// Overloads the Run() method of FairMQDevice - void Run() override; - - std::unordered_map mTimeframeBuffer; ///< Stores (sub-)timeframes - std::unordered_set mDiscardedSet; ///< Set containing IDs of dropped timeframes - - int mNumFLPs = 0; ///< Number of flpSenders - int mBufferTimeoutInMs = 5000; ///< Time after which incomplete timeframes are dropped - int mTestMode = 0; ///< Run the device in test mode (only syncSampler+flpSender+epnReceiver) - - std::string mInChannelName = ""; - std::string mOutChannelName = ""; - std::string mAckChannelName = ""; -}; - -} // namespace devices -} // namespace o2 - -#endif diff --git a/Utilities/DataFlow/include/DataFlow/FLPSenderDevice.h b/Utilities/DataFlow/include/DataFlow/FLPSenderDevice.h deleted file mode 100644 index 08cda323677b9..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/FLPSenderDevice.h +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef ALICEO2_DEVICES_FLPSENDER_H_ -#define ALICEO2_DEVICES_FLPSENDER_H_ - -#include -#include -#include -#include - -#include - -namespace o2 -{ -namespace devices -{ - -/// Sends sub-timframes to epnReceivers -/// -/// Sub-timeframes are received from the previous step (or generated in test-mode) -/// and are sent to epnReceivers. Target epnReceiver is determined from the timeframe ID: -/// targetEpnReceiver = timeframeId % numEPNs (numEPNs is same for every flpSender, although some may be inactive). - -class FLPSenderDevice final : public FairMQDevice -{ - public: - /// Default constructor - FLPSenderDevice() = default; - - /// Default destructor - ~FLPSenderDevice() final = default; - - protected: - /// Overloads the InitTask() method of FairMQDevice - void InitTask() final; - - /// Overloads the Run() method of FairMQDevice - void Run() final; - - private: - /// Sends the "oldest" element from the sub-timeframe container - void sendFrontData(); - - std::queue mSTFBuffer; ///< Buffer for sub-timeframes - std::queue mArrivalTime; ///< Stores arrival times of sub-timeframes - - int mNumEPNs = 0; ///< Number of epnReceivers - unsigned int mIndex = 0; ///< Index of the flpSender among other flpSenders - unsigned int mSendOffset = 0; ///< Offset for staggering output - unsigned int mSendDelay = 8; ///< Delay for staggering output - - int mEventSize = 10000; ///< Size of the sub-timeframe body (only for test mode) - int mTestMode = false; ///< Run the device in test mode (only syncSampler+flpSender+epnReceiver) - uint16_t mTimeFrameId; - - std::string mInChannelName = ""; - std::string mOutChannelName = ""; - int mLastTimeframeId = -1; -}; - -} // namespace devices -} // namespace o2 - -#endif diff --git a/Utilities/DataFlow/include/DataFlow/FakeTimeframeBuilder.h b/Utilities/DataFlow/include/DataFlow/FakeTimeframeBuilder.h deleted file mode 100644 index 87e72cc0df998..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/FakeTimeframeBuilder.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef DATAFLOW_FAKETIMEFRAMEBUILDER_H_ -#define DATAFLOW_FAKETIMEFRAMEBUILDER_H_ - -#include "Headers/DataHeader.h" -#include -#include -#include - -namespace o2 -{ -namespace data_flow -{ - -struct FakeTimeframeSpec { - const char* origin; - const char* dataDescription; - std::function bufferFiller; - size_t bufferSize; -}; - -/** Generate a timeframe from the provided specification - */ -std::unique_ptr fakeTimeframeGenerator(std::vector& specs, std::size_t& totalSize); - -} // namespace data_flow -} // namespace o2 -#endif /* DATAFLOW_FAKETIMEFRAMEBUILDER_H_ */ diff --git a/Utilities/DataFlow/include/DataFlow/FakeTimeframeGeneratorDevice.h b/Utilities/DataFlow/include/DataFlow/FakeTimeframeGeneratorDevice.h deleted file mode 100644 index 5c25ec45f7394..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/FakeTimeframeGeneratorDevice.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef ALICEO2_FAKE_TIMEFRAME_GENERATOR_H_ -#define ALICEO2_FAKE_TIMEFRAME_GENERATOR_H_ - -#include "O2Device/O2Device.h" - -namespace o2 -{ -namespace data_flow -{ - -/// A device which writes to file the timeframes. -class FakeTimeframeGeneratorDevice : public base::O2Device -{ - public: - static constexpr const char* OptionKeyOutputChannelName = "output-channel-name"; - static constexpr const char* OptionKeyMaxTimeframes = "max-timeframes"; - - /// Default constructor - FakeTimeframeGeneratorDevice(); - - /// Default destructor - ~FakeTimeframeGeneratorDevice() override = default; - - void InitTask() final; - - protected: - /// Overloads the ConditionalRun() method of FairMQDevice - bool ConditionalRun() final; - - std::string mOutChannelName; - size_t mMaxTimeframes; - size_t mTimeframeCount; -}; - -} // namespace data_flow -} // namespace o2 - -#endif diff --git a/Utilities/DataFlow/include/DataFlow/HeartbeatSampler.h b/Utilities/DataFlow/include/DataFlow/HeartbeatSampler.h deleted file mode 100644 index a6bf6dce79ff6..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/HeartbeatSampler.h +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -//-*- Mode: C++ -*- - -#ifndef HEARTBEATSAMPLER_H -#define HEARTBEATSAMPLER_H - -// @file HeartbeatSampler.h -// @author Matthias Richter -// @since 2017-02-03 -// @brief Heartbeat sampler device - -#include "Headers/DataHeader.h" -#include "Headers/HeartbeatFrame.h" -#include "O2Device/O2Device.h" -#include - -namespace o2 -{ -namespace data_flow -{ - -/// @class HeartbeatSampler -/// @brief A sampler for heartbeat triggers -/// -/// The device is going to be used in an emulation of the reader -/// processes on the FLP -/// The heartbeat triggers are sent out with constant frequency, the -/// period in nano seconds can be configured by option --period -/// -/// TODO: the class can evolve to a general clock sampler device with -/// configurable period, even randomly distributed -class HeartbeatSampler final : public base::O2Device -{ - public: - typedef o2::base::O2Message O2Message; - - static constexpr const char* OptionKeyOutputChannelName = "out-chan-name"; - static constexpr const char* OptionKeyPeriod = "period"; - - HeartbeatSampler() = default; - ~HeartbeatSampler() final = default; - - protected: - /// overloading the InitTask() method of FairMQDevice - void InitTask() final; - - /// overloading ConditionalRun method of FairMQDevice - bool ConditionalRun() final; - - private: - /// publishing period (configurable) - uint32_t mPeriod = 1000000000; - /// name of the (configurable) - std::string mOutputChannelName = "output"; - /// number of elapsed periods - int mCount = 0; -}; - -} // namespace data_flow -}; // namespace o2 -#endif diff --git a/Utilities/DataFlow/include/DataFlow/PayloadMerger.h b/Utilities/DataFlow/include/DataFlow/PayloadMerger.h deleted file mode 100644 index 81c4544a5f717..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/PayloadMerger.h +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. -#ifndef PAYLOAD_MERGER_H -#define PAYLOAD_MERGER_H - -#include -#include -#include -#include -#include - -#include - -namespace o2 -{ -namespace dataflow -{ -/// Helper class that given a set of FairMQMessage, merges (part of) their -/// payload into a separate memory area. -/// -/// - Append multiple messages via the aggregate method -/// - Finalise buffer creation with the finalise call. -template -class PayloadMerger -{ - public: - using MergeableId = ID; - using MessageMap = std::multimap>; - using PayloadExtractor = std::function; - using IdExtractor = std::function&)>; - using MergeCompletionCheker = std::function; - - /// Helper class to merge FairMQMessages sharing a user defined class of equivalence, - /// specified by @makeId. Completeness of the class of equivalence can be asserted by - /// the @checkIfComplete policy. It's also possible to specify a user defined way of - /// extracting the parts of the payload to be merged via the extractPayload method. - PayloadMerger(IdExtractor makeId, - MergeCompletionCheker checkIfComplete, - PayloadExtractor extractPayload = fullPayloadExtractor) - : mMakeId{makeId}, - mCheckIfComplete{checkIfComplete}, - mExtractPayload{extractPayload} - { - } - - /// Aggregates @payload to all the ones with the same id. - /// @return the id extracted from the payload via the constructor - /// specified id policy (mMakeId callback). - MergeableId aggregate(std::unique_ptr& payload) - { - auto id = mMakeId(payload); - mPartsMap.emplace(std::make_pair(id, std::move(payload))); - return id; - } - - /// This merges a set of messages sharing the same id @id to a unique buffer - /// @out, so that it can be either consumed or sent as a message itself. - /// The decision on whether the merge must happen is done by the constructor - /// specified policy mCheckIfComplete which can, for example, decide - /// to merge when a certain number of subparts are reached. - /// Merging at the moment requires an extra copy, but in principle this could - /// be easily extended to support scatter - gather. - size_t finalise(char** out, MergeableId& id) - { - *out = nullptr; - if (mCheckIfComplete(id, mPartsMap) == false) { - return 0; - } - // If we are here, it means we can send the messages that belong - // to some predefined class of equivalence, identified by the MERGEABLE_ID, - // to the receiver. This is done by the following process: - // - // - Extract what we actually want to send (this might be data embedded inside the message itself) - // - Calculate the aggregate size of all the payloads. - // - Copy all the parts into a final payload - // - Create the header part - // - Create the payload part - // - Send - std::vector> parts; - - size_t sum = 0; - auto range = mPartsMap.equal_range(id); - for (auto hi = range.first, he = range.second; hi != he; ++hi) { - std::unique_ptr& payload = hi->second; - std::pair part; - part.second = mExtractPayload(&part.first, reinterpret_cast(payload->GetData()), payload->GetSize()); - parts.push_back(part); - sum += part.second; - } - - auto* payload = new char[sum](); - size_t offset = 0; - for (auto& part : parts) { - // Right now this does a copy. In principle this could be done with some sort of - // vectorized I/O - memcpy(payload + offset, part.first, part.second); - offset += part.second; - } - - mPartsMap.erase(id); - *out = payload; - return sum; - } - - // Helper method which leaves the payload untouched - static int64_t fullPayloadExtractor(char** payload, - char* buffer, - size_t bufferSize) - { - *payload = buffer; - return bufferSize; - } - - private: - IdExtractor mMakeId; - MergeCompletionCheker mCheckIfComplete; - PayloadExtractor mExtractPayload; - - MessageMap mPartsMap; -}; -} // namespace dataflow -} // namespace o2 - -#endif // PAYLOAD_MERGER_H diff --git a/Utilities/DataFlow/include/DataFlow/SubframeBuilderDevice.h b/Utilities/DataFlow/include/DataFlow/SubframeBuilderDevice.h deleted file mode 100644 index 0eda40bf51d0f..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/SubframeBuilderDevice.h +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -//-*- Mode: C++ -*- - -#ifndef SUBFRAMEBUILDERDEVICE_H -#define SUBFRAMEBUILDERDEVICE_H - -/// @file SubframeBuilderDevice.h -/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel -/// @since 2017-02-07 -/// @brief Demonstrator device for a subframe builder - -#include "Headers/DataHeader.h" -#include "Headers/HeartbeatFrame.h" -#include "O2Device/O2Device.h" -#include "DataFlow/PayloadMerger.h" -#include "DataFlow/SubframeUtils.h" -#include -#include - -class FairMQParts; - -namespace o2 -{ -namespace data_flow -{ - -/// @class SubframeBuilderDevice -/// A demonstrator device for building of sub timeframes -/// -/// The scheme in the demonstrator chain assumes multiple data publishers -/// for different input chains of the CRU. The output of these publishers -/// is assembled to a sub timeframe by this device. -/// -/// The device implements the high-level API of the FairMQDevice -/// run loop. The registered method is called once per message -/// which is in itself a multipart message. It;s parts are passed -/// with a FairMQParts object. Not yet clear if this approach is -/// suited for the frame builder as it needs the data from multiple -/// channels with the same channel name. Depends on the validity of -/// the message data. Very likely the message parts are no longer -/// valid after leaving the handler method. But one could apply a -/// scheme with unique pointers and move semantics -/// -/// The duration should be with respect to a time constant, which in -/// itself needs to be configurable, now the time constant is -/// hard-coded microseconds -class SubframeBuilderDevice final : public base::O2Device -{ - public: - using O2Message = o2::base::O2Message; - using SubframeId = o2::dataflow::SubframeId; - using Merger = dataflow::PayloadMerger; - - static constexpr const char* OptionKeyInputChannelName = "in-chan-name"; - static constexpr const char* OptionKeyOutputChannelName = "out-chan-name"; - static constexpr const char* OptionKeyOrbitDuration = "orbit-duration"; - static constexpr const char* OptionKeyOrbitsPerTimeframe = "orbits-per-timeframe"; - static constexpr const char* OptionKeyInDataFile = "indatafile-name"; - static constexpr const char* OptionKeyDetector = "detector-name"; - static constexpr const char* OptionKeyFLPId = "flp-id"; - static constexpr const char* OptionKeyStripHBF = "strip-hbf"; - - // TODO: this is just a first mockup, remove it - // Default start time for all the producers is 8/4/1977 - // Timeframe start time will be ((N * duration) + start time) where - // N is the incremental number of timeframes being sent out. - // TODO: replace this with a unique Heartbeat from a common device. - static constexpr uint32_t DefaultOrbitDuration = 88924; - static constexpr uint32_t DefaultOrbitsPerTimeframe = 256; - static constexpr uint64_t DefaultHeartbeatStart = 229314600000000000LL; - - /// Default constructor - SubframeBuilderDevice(); - - /// Default destructor - ~SubframeBuilderDevice() final; - - protected: - /// overloading the InitTask() method of FairMQDevice - void InitTask() final; - - /// data handling method to be registered as handler in the - /// FairMQDevice API method OnData - /// The device base class handles the state loop in the RUNNING - /// state and calls the handler when receiving a message on one channel - /// The multiple parts included in one message are provided in the - /// FairMQParts object. - bool HandleData(FairMQParts& msgParts, int /*index*/); - - /// Build the frame and send it - /// For the moment a simple mockup composing a DataHeader and adding it - /// to the multipart message together with the SubframeMetadata as payload - bool BuildAndSendFrame(FairMQParts& parts); - - private: - uint32_t mOrbitsPerTimeframe; - // FIXME: lookup the actual value - uint32_t mOrbitDuration; - std::string mInputChannelName = ""; - std::string mOutputChannelName = ""; - size_t mFLPId = 0; - bool mStripHBF = false; - std::unique_ptr mMerger; - - uint64_t mHeartbeatStart = DefaultHeartbeatStart; - - template - size_t fakeHBHPayloadHBT(char** buffer, std::function filler, int numOfElements) - { - // LOG(INFO) << "SENDING TPC PAYLOAD\n"; - auto payloadSize = sizeof(header::HeartbeatHeader) + sizeof(T) * numOfElements + sizeof(header::HeartbeatTrailer); - *buffer = new char[payloadSize]; - auto* hbh = reinterpret_cast(*buffer); - assert(payloadSize > 0); - assert(payloadSize - sizeof(header::HeartbeatTrailer) > 0); - auto* hbt = reinterpret_cast(payloadSize - sizeof(header::HeartbeatTrailer)); - - T* payload = reinterpret_cast(*buffer + sizeof(header::HeartbeatHeader)); - for (int i = 0; i < numOfElements; ++i) { - new (payload + i) T; - // put some random toy time stamp to each cluster - filler(payload[i], i); - } - return payloadSize; - } -}; - -} // namespace data_flow -}; // namespace o2 -#endif diff --git a/Utilities/DataFlow/include/DataFlow/SubframeUtils.h b/Utilities/DataFlow/include/DataFlow/SubframeUtils.h deleted file mode 100644 index 69b5f3d4a6c24..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/SubframeUtils.h +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. -#ifndef DATAFLOW_SUBFRAMEUTILS_H -#define DATAFLOW_SUBFRAMEUTILS_H - -#include -#include -#include "Headers/HeartbeatFrame.h" - -namespace o2 -{ -namespace dataflow -{ - -int64_t extractDetectorPayloadStrip(char** payload, char* buffer, size_t bufferSize) -{ - *payload = buffer + sizeof(o2::header::HeartbeatHeader); - return bufferSize - sizeof(o2::header::HeartbeatHeader) - sizeof(o2::header::HeartbeatTrailer); -} - -struct SubframeId { - size_t timeframeId; - size_t socketId; - - // operator needed for the equal_range algorithm/ multimap method - bool operator<(const SubframeId& rhs) const - { - return std::tie(timeframeId, socketId) < std::tie(rhs.timeframeId, rhs.socketId); - } -}; - -SubframeId makeIdFromHeartbeatHeader(const header::HeartbeatHeader& header, size_t socketId, size_t orbitsPerTimeframe) -{ - SubframeId id = { - .timeframeId = header.orbit / orbitsPerTimeframe, - .socketId = socketId}; - return id; -} - -} /* namespace dataflow */ -} /* namespace o2 */ - -#endif // DATAFLOW_SUBFRAMEUTILS_H diff --git a/Utilities/DataFlow/include/DataFlow/TimeframeParser.h b/Utilities/DataFlow/include/DataFlow/TimeframeParser.h deleted file mode 100644 index 590982c1dfb04..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/TimeframeParser.h +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef TIMEFRAME_PARSER_H_ -#define TIMEFRAME_PARSER_H_ - -#include -#include - -class FairMQParts; - -namespace o2 -{ -namespace data_flow -{ - -/// An helper function which takes a std::istream pointing -/// to a naively persisted timeframe and pumps its parts to -/// FairMQParts, ready to be shipped via FairMQ. -void streamTimeframe(std::istream& stream, - std::function onAddPart, - std::function onSend); - -void streamTimeframe(std::ostream& stream, FairMQParts& parts); - -} // namespace data_flow -} // namespace o2 - -#endif // TIMEFRAME_PARSER_H diff --git a/Utilities/DataFlow/include/DataFlow/TimeframeReaderDevice.h b/Utilities/DataFlow/include/DataFlow/TimeframeReaderDevice.h deleted file mode 100644 index f4359b6f21142..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/TimeframeReaderDevice.h +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef ALICEO2_TIMEFRAME_READER_H_ -#define ALICEO2_TIMEFRAME_READER_H_ - -#include "O2Device/O2Device.h" -#include - -namespace o2 -{ -namespace data_flow -{ - -/// A device which writes to file the timeframes. -class TimeframeReaderDevice : public base::O2Device -{ - public: - static constexpr const char* OptionKeyOutputChannelName = "output-channel-name"; - static constexpr const char* OptionKeyInputFileName = "input-file"; - - /// Default constructor - TimeframeReaderDevice(); - - /// Default destructor - ~TimeframeReaderDevice() override = default; - - void InitTask() final; - - protected: - /// Overloads the ConditionalRun() method of FairMQDevice - bool ConditionalRun() final; - - std::string mOutChannelName; - std::string mInFileName; - std::fstream mFile; - std::vector mSeen; -}; - -} // namespace data_flow -} // namespace o2 - -#endif diff --git a/Utilities/DataFlow/include/DataFlow/TimeframeValidatorDevice.h b/Utilities/DataFlow/include/DataFlow/TimeframeValidatorDevice.h deleted file mode 100644 index 453b0ea669d5b..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/TimeframeValidatorDevice.h +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef ALICEO2_TIMEFRAMEVALIDATOR_H_ -#define ALICEO2_TIMEFRAMEVALIDATOR_H_ - -#include "O2Device/O2Device.h" - -namespace o2 -{ -namespace data_flow -{ - -/// A validating device for time frame data (coming from EPN) -class TimeframeValidatorDevice : public base::O2Device -{ - public: - static constexpr const char* OptionKeyInputChannelName = "input-channel-name"; - - /// Default constructor - TimeframeValidatorDevice(); - - /// Default destructor - ~TimeframeValidatorDevice() override = default; - - void InitTask() final; - - protected: - /// Overloads the Run() method of FairMQDevice - void Run() final; - - std::string mInChannelName; -}; - -} // namespace data_flow -} // namespace o2 - -#endif diff --git a/Utilities/DataFlow/include/DataFlow/TimeframeWriterDevice.h b/Utilities/DataFlow/include/DataFlow/TimeframeWriterDevice.h deleted file mode 100644 index 73fceed644d48..0000000000000 --- a/Utilities/DataFlow/include/DataFlow/TimeframeWriterDevice.h +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifndef ALICEO2_TIMEFRAME_WRITER_DEVICE_H_ -#define ALICEO2_TIMEFRAME_WRITER_DEVICE_H_ - -#include "O2Device/O2Device.h" -#include - -namespace o2 -{ -namespace data_flow -{ - -/// A device which writes to file the timeframes. -class TimeframeWriterDevice : public base::O2Device -{ - public: - static constexpr const char* OptionKeyInputChannelName = "input-channel-name"; - static constexpr const char* OptionKeyOutputFileName = "output-file"; - static constexpr const char* OptionKeyMaxTimeframesPerFile = "max-timeframes-per-file"; - static constexpr const char* OptionKeyMaxFileSize = "max-file-size"; - static constexpr const char* OptionKeyMaxFiles = "max-files"; - - /// Default constructor - TimeframeWriterDevice(); - - /// Default destructor - ~TimeframeWriterDevice() override = default; - - void InitTask() final; - - /// The PostRun will trigger saving the file to disk - void PostRun() final; - - protected: - /// Overloads the Run() method of FairMQDevice - void Run() final; - - std::string mInChannelName; - std::string mOutFileName; - std::fstream mFile; - size_t mMaxTimeframes; - size_t mMaxFileSize; - size_t mMaxFiles; - size_t mFileCount; -}; - -} // namespace data_flow -} // namespace o2 - -#endif diff --git a/Utilities/DataFlow/run/CreateSetup.py b/Utilities/DataFlow/run/CreateSetup.py deleted file mode 100755 index 0ebe8f8f38a82..0000000000000 --- a/Utilities/DataFlow/run/CreateSetup.py +++ /dev/null @@ -1,200 +0,0 @@ -#!/usr/bin/python -# a very simple python script to -# create and scale a topology/conf + runfile for a variable list of -# HeartbeatSampler + DataPublisher + ... + FLP + EPN -# author: S. Wenzel -#FIXME: decide whether to config for xterm or tmux - -import json - -NumFLP = 2 -NumDP = NumFLP # in principle we could have different number of data publishers -NumEPN = 4 -EPNStartSocket = 6000 -HeartbeatSocket = 5000 -HeartbeatBaseName = "o2-heartbeat-sampler" -EPNBaseName = "epnReceiver" -SFBStartSocket = 5500 -FLPStartSocket = 4000 -ValidatorStartSocket = 7000 -FLPBaseName = "flpSender" -DPBaseName = "o2-datapublisher-device" -SFBBaseName = "subframeBuilder" -tcpbase = "tcp://127.0.0.1:" -detectors = ["TPC","ITS"] -datadescription = ["TPCCLUSTER", "ITSRAW"] - -configfilename = "customconfig.json" -runscriptfilename = "customrunscript.sh" - -# some potentially reusable API to construct devices -def initDevice(name): - device = {} - device["id"] = name - device["channels"] = [] # empty list for device - return device - -# creates one channel; empty sockets -def initChannel(name, type, connect): - channel = {} - channel["name"] = name - channel["type"] = type - channel["method"] = connect - channel["sockets"] = [] - return channel - -def initAddress(url): - return {"address":url} - -def addChannel(device, channel): - device["channels"].append(channel) - -def addAddress(channel, address): - channel["sockets"].append(address) - -# adds a custom key:value pair to a dictionary -def addPair(element, key, value): - element[key]=value - -# code describing our devices follows - -def writeOneDP(id): - device = initDevice(DPBaseName+str(id)) - inchannel = initChannel("input","sub","connect") - addAddress(inchannel, initAddress(tcpbase + str(HeartbeatSocket))) - outchannel = initChannel("output","pub","bind") - addAddress(outchannel, initAddress(tcpbase + str(SFBStartSocket + id))) - addPair(outchannel, "sndBufSize", "10") - addChannel(device, inchannel) - addChannel(device, outchannel) - return device - -def writeOneSFB(id): - device = initDevice(SFBBaseName + str(id)) - # the input channel - inchannel = initChannel("input", "sub", "connect") - addAddress(inchannel, initAddress(tcpbase + str(SFBStartSocket+id))) - addPair(inchannel, "sndBufSize", "10") - # the output channel - outchannel = initChannel("output", "pub", "bind") - addAddress(outchannel, initAddress(tcpbase + str(FLPStartSocket+id))) - addPair(outchannel, "sndBufSize", "10") - addChannel(device,inchannel) - addChannel(device,outchannel) - return device - -def writeHBSamplerDevice(): - device = initDevice(HeartbeatBaseName) - outchannel = initChannel("output", "pub", "bind") - addAddress(outchannel, initAddress(tcpbase + str(HeartbeatSocket))) - addChannel(device, outchannel) - return device - -# write timeframeValidator -def writeTFV(): - device = initDevice("timeframeValidator") - channel = initChannel("input", "sub", "connect") - addAddress(channel, initAddress(tcpbase + str(ValidatorStartSocket))) - addChannel(device, channel) - return device - -# write single flp device -def writeOneFLP(id): - device = initDevice(FLPBaseName + str(id)) - inchannel = initChannel("input","sub","connect") - addAddress(inchannel, initAddress(tcpbase + str(FLPStartSocket+id))) - outchannel = initChannel("output", "push", "connect") - # write all EPN addresses - for i in range(0,NumEPN): - addAddress(outchannel, initAddress(tcpbase + str(EPNStartSocket+i))) - addChannel(device, inchannel) - addChannel(device, outchannel) - return device - -def writeOneEPN(id): - # write a single EPN device - device = initDevice(EPNBaseName + str(id)) - inchannel = initChannel("input", "pull", "bind") - addAddress(inchannel, initAddress(tcpbase + str(EPNStartSocket+id))) - # the ack channel - ackchannel = initChannel("ack", "push", "connect") - addAddress(ackchannel, initAddress("tcp://127.0.0.1:5990")) - addPair(ackchannel, "ratelogging", "0") - # the output channel (time frame validator) - outchannel = initChannel("output", "pub", "bind") - addAddress(outchannel, initAddress(tcpbase + str(ValidatorStartSocket))) - addChannel(device, inchannel) - addChannel(device, ackchannel) - addChannel(device, outchannel) - return device - -def addFLPDevices(devicelist): - for i in range(0,NumFLP): - devicelist.append(writeOneDP(i)) - devicelist.append(writeOneSFB(i)) - devicelist.append(writeOneFLP(i)) - -def addEPNDevices(devicelist): - for i in range(0,NumEPN): - devicelist.append(writeOneEPN(i)) - -def getConf(): - conf = {} - conf["fairMQOptions"] = {"devices" : []} - devicelist = conf["fairMQOptions"]["devices"] - # append all the the devices to this list - addFLPDevices(devicelist) - addEPNDevices(devicelist) - devicelist.append(writeTFV()) - devicelist.append(writeHBSamplerDevice()) - return conf - -def writeJSONConf(): - print "creating JOSN config " + configfilename - with open(configfilename, 'w') as f: - f.write(json.dumps(getConf())) - -def writeRunScript(): - xtermcommand = "xterm -hold -e " - dumpstring = [] - # treat data publishers - for i in range(0, NumDP): - # we might need to give information about detector to DP as well - command = "o2-datapublisher-device --id DataPublisherDevice" + str(i) - command += " --data-description " + datadescription[i%2] - command += " --mq-config " + configfilename + " --in-chan-name input --out-chan-name output &" - dumpstring.append(xtermcommand + command) - - # treat sfbdevices - for i in range(0, NumFLP): - command = "o2-subframebuilder-device --id subframeBuilder" + str(i) - command += " --mq-config " + configfilename + " --detector " + detectors[i%2] + " &" - dumpstring.append(xtermcommand + command) - # treat flpSender - for i in range(0, NumFLP): - flpCommand = "flpSender --id flpSender" + str(i) - flpCommand += " --mq-config " + configfilename + " --in-chan-name input" - flpCommand += " --out-chan-name output --num-epns " + str(NumEPN) + " --flp-index " + str(i) - flpCommand += "&" - dumpstring.append(xtermcommand + flpCommand) - # treat EPN receiver - for i in range(0, NumEPN): - epnCommand = "epnReceiver --id epnReceiver" + str(i) - epnCommand += " --mq-config " + configfilename + " --in-chan-name input --out-chan-name output" - epnCommand += " --num-flps " + str(NumFLP) + "&" - dumpstring.append(xtermcommand + epnCommand) - # treat timeFrameValidator - command = "o2-timeframe-validator-device --id timeframeValidator --mq-config " + configfilename + " customconfig.json --input-channel-name input &" - dumpstring.append(xtermcommand + command) - - # treat heartbeatsampler - command = "o2-heartbeat-sampler --id heartbeatSampler --mq-config " + configfilename + " --out-chan-name output &" - dumpstring.append(xtermcommand + command) - - print "creating runscript " + runscriptfilename - with open(runscriptfilename, 'w') as f: - f.write('\n'.join(dumpstring)) - -if __name__ == '__main__': - writeJSONConf() - writeRunScript() diff --git a/Utilities/DataFlow/run/confBasicSetup.json b/Utilities/DataFlow/run/confBasicSetup.json deleted file mode 100644 index 4fe1a1c633d61..0000000000000 --- a/Utilities/DataFlow/run/confBasicSetup.json +++ /dev/null @@ -1,97 +0,0 @@ -{ - "fairMQOptions": - { - "devices": - [ - { - "id": "subframeBuilder", - "channels": - [ - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5550" } - ], - "sndBufSize": "10" - }] - }, - - { - "id": "flpSender", - "channels": - [{ - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5550" } - ], - "rcvBufSize": "10" - }, - { - "name": "output", - "type": "push", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5570" } - ], - "sndBufSize": "10" - }] - }, - - { - "id": "epnReceiver", - "channels": - [ - { - "name": "input", - "type": "pull", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5570"} - ], - "sndBufSize": "10" - }, - { - "name": "ack", - "type": "push", - "method": "connect", - "address": "tcp://127.0.0.1:5990", - "rateLogging": "0" - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "address": "tcp://127.0.0.1:5580", - "sndBufSize": "10" - } - ] - }, - - { - "id": "timeframeValidator", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5580"} - ], - "sndBufSize": "10" - } - ] - } - - ] - } -} diff --git a/Utilities/DataFlow/run/confComplexSetup.json b/Utilities/DataFlow/run/confComplexSetup.json deleted file mode 100644 index cbb69fb7434ce..0000000000000 --- a/Utilities/DataFlow/run/confComplexSetup.json +++ /dev/null @@ -1,132 +0,0 @@ -{ - "fairMQOptions": - { - "devices": - [ - { - "id": "subframeBuilderTPC", - "channels": - [ - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5550" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "flpSenderTPC", - "channels": - [{ - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5550" } - ], - "rcvBufSize": "10" - }, - { - "name": "output", - "type": "push", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5570" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "subframeBuilderITS", - "channels": - [ - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5551" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "flpSenderITS", - "channels": - [{ - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5551" } - ], - "rcvBufSize": "10" - }, - { - "name": "output", - "type": "push", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5570" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "epnReceiver", - "channels": - [ - { - "name": "input", - "type": "pull", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5570"} - ], - "sndBufSize": "10" - }, - { - "name": "ack", - "type": "push", - "method": "connect", - "address": "tcp://127.0.0.1:5990", - "rateLogging": "0" - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "address": "tcp://127.0.0.1:5580", - "sndBufSize": "10" - } - ] - }, - { - "id": "timeframeValidator", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5580"} - ], - "sndBufSize": "10" - } - ] - } - ] - } -} diff --git a/Utilities/DataFlow/run/confComplexSetup2.json b/Utilities/DataFlow/run/confComplexSetup2.json deleted file mode 100644 index 3c527d7616713..0000000000000 --- a/Utilities/DataFlow/run/confComplexSetup2.json +++ /dev/null @@ -1,345 +0,0 @@ -{ - "fairMQOptions": - { - "devices": - [ - { - "id": "heartbeatSampler", - "channels": - [ - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5450" } - ], - "sndBufSize": "10" - } - ] - }, - { - "id": "DataPublisherDeviceTPC", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets":[ - {"address": "tcp://127.0.0.1:5450"} - ] - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:6550" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "DataPublisherDeviceITS", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets":[ - {"address": "tcp://127.0.0.1:5450"} - ] - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:7550" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "subframeBuilderTPC", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets":[ - {"address": "tcp://127.0.0.1:6550"} - ] - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5550" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "flpSenderTPC", - "channels": - [ - { - "name": "input-channel-name", - "type": "sub", - "method": "connect", - "sockets":[ - {"address": "tcp://127.0.0.1:5450"} - ] - }, - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5550" } - ], - "rcvBufSize": "10" - }, - { - "name": "output", - "type": "push", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5570" }, - { "address": "tcp://127.0.0.1:5571" }, - { "address": "tcp://127.0.0.1:5572" }, - { "address": "tcp://127.0.0.1:5573" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "subframeBuilderITS", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets":[ - {"address": "tcp://127.0.0.1:7550"} - ] - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5551" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "flpSenderITS", - "channels": - [{ - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5551" } - ], - "rcvBufSize": "10" - }, - { - "name": "output", - "type": "push", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5570" }, - { "address": "tcp://127.0.0.1:5571" }, - { "address": "tcp://127.0.0.1:5572" }, - { "address": "tcp://127.0.0.1:5573" } - ], - "sndBufSize": "10" - }] - }, - { - "id": "epnReceiver1", - "channels": - [ - { - "name": "input", - "type": "pull", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5570"} - ], - "sndBufSize": "10" - }, - { - "name": "ack", - "type": "push", - "method": "connect", - "address": "tcp://127.0.0.1:5990", - "rateLogging": "0" - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "address": "tcp://127.0.0.1:5580", - "sndBufSize": "10" - } - ] - }, -{ - "id": "epnReceiver1", - "channels": - [ - { - "name": "input", - "type": "pull", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5570"} - ], - "sndBufSize": "10" - }, - { - "name": "ack", - "type": "push", - "method": "connect", - "address": "tcp://127.0.0.1:5990", - "rateLogging": "0" - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "address": "tcp://127.0.0.1:5580", - "sndBufSize": "10" - } - ] - }, - { - "id": "epnReceiver2", - "channels": - [ - { - "name": "input", - "type": "pull", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5571"} - ], - "sndBufSize": "10" - }, - { - "name": "ack", - "type": "push", - "method": "connect", - "address": "tcp://127.0.0.1:5990", - "rateLogging": "0" - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "address": "tcp://127.0.0.1:5580", - "sndBufSize": "10" - } - ] - }, - { - "id": "epnReceiver3", - "channels": - [ - { - "name": "input", - "type": "pull", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5572"} - ], - "sndBufSize": "10" - }, - { - "name": "ack", - "type": "push", - "method": "connect", - "address": "tcp://127.0.0.1:5990", - "rateLogging": "0" - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "address": "tcp://127.0.0.1:5580", - "sndBufSize": "10" - } - ] - }, -{ - "id": "epnReceiver4", - "channels": - [ - { - "name": "input", - "type": "pull", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5573"} - ], - "sndBufSize": "10" - }, - { - "name": "ack", - "type": "push", - "method": "connect", - "address": "tcp://127.0.0.1:5990", - "rateLogging": "0" - }, - { - "name": "output", - "type": "pub", - "method": "bind", - "address": "tcp://127.0.0.1:5580", - "sndBufSize": "10" - } - ] - }, - { - "id": "timeframeValidator", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5580"} - ], - "sndBufSize": "10" - } - ] - } - ] - } -} diff --git a/Utilities/DataFlow/run/confFakeTimeframe.json b/Utilities/DataFlow/run/confFakeTimeframe.json deleted file mode 100644 index ae1f686087f25..0000000000000 --- a/Utilities/DataFlow/run/confFakeTimeframe.json +++ /dev/null @@ -1,72 +0,0 @@ -{ - "fairMQOptions": - { - "devices": - [ - { - "id": "FakeTimeframeGeneratorDevice", - "channels": - [ - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://*:5550" } - ], - "sndBufSize": "10" - } - ] - }, - { - "id": "TimeframeWriterDevice", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5550"} - ], - "sndBufSize": "10" - } - ] - }, - { - "id": "TimeframeReaderDevice", - "channels": - [ - { - "name": "output", - "type": "pub", - "method": "bind", - "sockets": - [ - { "address": "tcp://127.0.0.1:5551"} - ], - "sndBufSize": "10" - } - ] - }, - { - "id": "TimeframeValidatorDevice", - "channels": - [ - { - "name": "input", - "type": "sub", - "method": "connect", - "sockets": - [ - { "address": "tcp://127.0.0.1:5551"} - ], - "sndBufSize": "10" - } - ] - } - ] - } -} diff --git a/Utilities/DataFlow/run/startBasicSetup.sh b/Utilities/DataFlow/run/startBasicSetup.sh deleted file mode 100755 index e69106eab5dc5..0000000000000 --- a/Utilities/DataFlow/run/startBasicSetup.sh +++ /dev/null @@ -1,13 +0,0 @@ -# simple start script to lauch a basic setup -# Prerequisites: -# - expects the configuration file to be in the working directory -# - O2 bin and lib set n the shell environment - - -xterm -geometry 80x25+0+0 -hold -e epnReceiver --id epnReceiver --mq-config confBasicSetup.json --in-chan-name input --out-chan-name output --num-flps 1 & - -xterm -geometry 80x25+500+0 -hold -e flpSender --id flpSender --mq-config confBasicSetup.json --in-chan-name input --out-chan-name output --num-epns 1 & - -xterm -geometry 80x25+1000+0 -hold -e o2-subframebuilder-device --id subframeBuilder --mq-config confBasicSetup.json --self-triggered & - -xterm -geometry 80x25+1500+0 -hold -e o2-timeframe-validator-device --id timeframeValidator --mq-config confBasicSetup.json --input-channel-name input & diff --git a/Utilities/DataFlow/run/startComplexSetup.sh b/Utilities/DataFlow/run/startComplexSetup.sh deleted file mode 100755 index 6280269be5d16..0000000000000 --- a/Utilities/DataFlow/run/startComplexSetup.sh +++ /dev/null @@ -1,24 +0,0 @@ -# simple start script to launch a more complex setup -# with 2 data publishers (inside subframebuilder) + 2 attached flpSenders -# Prerequisites: -# - expects the configuration file to be in the working directory -# - O2 bin and lib set n the shell environment - - -# we have just one epn and 2 flps -xterm -geometry 80x25+0+0 -hold -e epnReceiver --id epnReceiver --mq-config confComplexSetup.json --in-chan-name input --out-chan-name output --num-flps 2 & - -# this is the flp for TPC -xterm -geometry 80x25+500+0 -hold -e flpSender --id flpSenderTPC --mq-config confComplexSetup.json --in-chan-name input --out-chan-name output --num-epns 1 --flp-index 0 & - -# this is the flp for ITS -xterm -geometry 80x25+800+0 -hold -e flpSender --id flpSenderITS --mq-config confComplexSetup.json --in-chan-name input --out-chan-name output --num-epns 1 --flp-index 1 & - -# this is the subtimeframe publisher for TPC -xterm -geometry 80x25+0+500 -hold -e o2-subframebuilder-device --id subframeBuilderTPC --mq-config confComplexSetup.json --self-triggered --detector TPC & - -# this is the subtimeframe publisher for ITS -xterm -geometry 80x25+500+500 -hold -e o2-subframebuilder-device --id subframeBuilderITS --mq-config confComplexSetup.json --self-triggered --detector ITS & - -# consumer and validator of the full EPN time frame -xterm -geometry 80x25+800+500 -hold -e o2-timeframe-validator-device --id timeframeValidator --mq-config confComplexSetup.json --input-channel-name input & diff --git a/Utilities/DataFlow/run/startComplexSetup2.sh b/Utilities/DataFlow/run/startComplexSetup2.sh deleted file mode 100755 index a0c343658ad8a..0000000000000 --- a/Utilities/DataFlow/run/startComplexSetup2.sh +++ /dev/null @@ -1,36 +0,0 @@ -# simple start script to launch a more complex setup -# with 2 data publishers (inside subframebuilder) + 2 attached flpSenders + 4 EPNS -# Prerequisites: -# - expects the configuration file to be in the working directory -# - O2 bin and lib set n the shell environment - -# it would be nice having a script that generates the configuration file for N FLP and M EPNS - -# Start one HBSampler device -xterm -geometry 80x25+0+0 -hold -e o2-heartbeat-sampler --id heartbeatSampler --mq-config confComplexSetup2.json --out-chan-name output & - -# Data publishers -xterm -geometry 80x25+500+0 -hold -e o2-datapublisher-device --id DataPublisherDeviceTPC --mq-config confComplexSetup2.json --in-chan-name input --out-chan-name output --data-description TPCCLUSTER & -xterm -geometry 80x25+500+400 -hold -e o2-datapublisher-device --id DataPublisherDeviceITS --mq-config confComplexSetup2.json --in-chan-name input --out-chan-name output --data-description ITSRAW & - - -# this is the subtimeframe publisher for TPC -xterm -geometry 80x25+1000+0 -hold -e o2-subframebuilder-device --id subframeBuilderTPC --mq-config confComplexSetup2.json --detector TPC & - -# this is the subtimeframe publisher for ITS -xterm -geometry 80x25+1000+400 -hold -e o2-subframebuilder-device --id subframeBuilderITS --mq-config confComplexSetup2.json --detector ITS & - -# this is the flp for TPC -xterm -geometry 80x25+1500+0 -hold -e o2-flp-sender-device --id flpSenderTPC --mq-config confComplexSetup2.json --in-chan-name input --out-chan-name output --num-epns 4 --flp-index 0 & - -# this is the flp for ITS -xterm -geometry 80x25+1500+400 -hold -e o2-flp-sender-device --id flpSenderITS --mq-config confComplexSetup2.json --in-chan-name input --out-chan-name output --num-epns 4 --flp-index 1 & - -# we have 4 epn and 2 flps -xterm -geometry 80x25+2000+0 -hold -e o2-epn-receiver-device --id epnReceiver1 --mq-config confComplexSetup2.json --buffer-timeout 10000 --in-chan-name input --out-chan-name output --num-flps 2 & -xterm -geometry 80x25+2000+400 -hold -e o2-epn-receiver-device --id epnReceiver2 --mq-config confComplexSetup2.json --buffer-timeout 10000 --in-chan-name input --out-chan-name output --num-flps 2 & -xterm -geometry 80x25+2000+800 -hold -e o2-epn-receiver-device --id epnReceiver3 --mq-config confComplexSetup2.json --buffer-timeout 10000 --in-chan-name input --out-chan-name output --num-flps 2 & -xterm -geometry 80x25+2000+1200 -hold -e o2-epn-receiver-device --id epnReceiver4 --mq-config confComplexSetup2.json --buffer-timeout 10000 --in-chan-name input --out-chan-name output --num-flps 2 & - -# consumer and validator of the full EPN time frame -xterm -geometry 80x25+2000+500 -hold -e o2-timeframe-validator-device --id timeframeValidator --mq-config confComplexSetup2.json --input-channel-name input & diff --git a/Utilities/DataFlow/run/startTimeframeExample.sh b/Utilities/DataFlow/run/startTimeframeExample.sh deleted file mode 100755 index f9c108825cc3a..0000000000000 --- a/Utilities/DataFlow/run/startTimeframeExample.sh +++ /dev/null @@ -1,4 +0,0 @@ -xterm -geometry 80x25+0+0 -hold -e FakeTimeframeGeneratorDevice --id FakeTimeframeGeneratorDevice --mq-config confFakeTimeframe.json --output-channel-name output & -xterm -geometry 80x25+0+0 -hold -e o2-timeframe-writer-device --id TimeframeWriterDevice --mq-config confFakeTimeframe.json --input-channel-name input --max-timeframes 1 --output-file data.o2tf & -#xterm -geometry 80x25+0+0 -hold -e o2-timeframe-reader-device --id TimeframeReaderDevice --mq-config confFakeTimeframe.json --input-file data.o2tf --output-channel-name output & -#xterm -geometry 80x25+0+0 -hold -e o2-timeframe-validator-device --id TimeframeValidatorDevice --mq-config confFakeTimeframe.json --input-channel-name input & diff --git a/Utilities/DataFlow/src/EPNReceiverDevice.cxx b/Utilities/DataFlow/src/EPNReceiverDevice.cxx deleted file mode 100644 index 9424bd58d9f25..0000000000000 --- a/Utilities/DataFlow/src/EPNReceiverDevice.cxx +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include // size_t -#include // writing to file (DEBUG) -#include -#include - -#include -#include - -#include "DataFlow/EPNReceiverDevice.h" -#include "Headers/DataHeader.h" -#include "Headers/SubframeMetadata.h" -#include "O2Device/Compatibility.h" -#include "TimeFrame/TimeFrame.h" - -#include - -using namespace std; -using namespace std::chrono; -using namespace o2::devices; -using SubframeMetadata = o2::data_flow::SubframeMetadata; -using TPCTestPayload = o2::data_flow::TPCTestPayload; -using TPCTestCluster = o2::data_flow::TPCTestCluster; -using IndexElement = o2::dataformats::IndexElement; - -void EPNReceiverDevice::InitTask() -{ - mNumFLPs = GetConfig()->GetValue("num-flps"); - mBufferTimeoutInMs = GetConfig()->GetValue("buffer-timeout"); - mTestMode = GetConfig()->GetValue("test-mode"); - mInChannelName = GetConfig()->GetValue("in-chan-name"); - mOutChannelName = GetConfig()->GetValue("out-chan-name"); - mAckChannelName = GetConfig()->GetValue("ack-chan-name"); -} - -void EPNReceiverDevice::PrintBuffer(const unordered_map& buffer) const -{ - string header = "===== "; - - for (int i = 1; i <= mNumFLPs; ++i) { - stringstream out; - out << i % 10; - header += out.str(); - //i > 9 ? header += " " : header += " "; - } - LOG(INFO) << header; - - for (auto& it : buffer) { - string stars = ""; - for (unsigned int j = 1; j <= (it.second).parts.Size(); ++j) { - stars += "*"; - } - LOG(INFO) << setw(4) << it.first << ": " << stars; - } -} - -void EPNReceiverDevice::DiscardIncompleteTimeframes() -{ - auto it = mTimeframeBuffer.begin(); - - while (it != mTimeframeBuffer.end()) { - if (duration_cast(steady_clock::now() - (it->second).start).count() > mBufferTimeoutInMs) { - LOG(WARN) << "Timeframe #" << it->first << " incomplete after " << mBufferTimeoutInMs << " milliseconds, discarding"; - mDiscardedSet.insert(it->first); - mTimeframeBuffer.erase(it++); - LOG(WARN) << "Number of discarded timeframes: " << mDiscardedSet.size(); - } else { - // LOG(INFO) << "Timeframe #" << it->first << " within timeout, buffering..."; - ++it; - } - } -} - -void EPNReceiverDevice::Run() -{ - uint16_t id = 0; // holds the timeframe id of the currently arrived sub-timeframe. - - FairMQChannel& ackOutChannel = fChannels.at(mAckChannelName).at(0); - - // Simple multi timeframe index - using TimeframeId = int; - using FlpId = int; - std::multimap index; - std::multimap flpIds; - - while (compatibility::FairMQ13::IsRunning(this)) { - FairMQParts subtimeframeParts; - if (Receive(subtimeframeParts, mInChannelName, 0, 100) <= 0) { - continue; - } - - assert(subtimeframeParts.Size() >= 2); - - const auto* dh = o2::header::get(subtimeframeParts.At(0)->GetData()); - assert(strncmp(dh->dataDescription.str, "SUBTIMEFRAMEMD", 16) == 0); - SubframeMetadata* sfm = reinterpret_cast(subtimeframeParts.At(1)->GetData()); - id = o2::data_flow::timeframeIdFromTimestamp(sfm->startTime, sfm->duration); - auto flpId = sfm->flpIndex; - - if (mDiscardedSet.find(id) == mDiscardedSet.end()) { - if (mTimeframeBuffer.find(id) == mTimeframeBuffer.end()) { - // if this is the first part with this ID, save the receive time. - mTimeframeBuffer[id].start = steady_clock::now(); - } - flpIds.insert(std::make_pair(id, flpId)); - LOG(INFO) << "Timeframe ID " << id << " for startTime " << sfm->startTime << "\n"; - // If the received ID has not previously been discarded, store - // the data part in the buffer For the moment we just concatenate - // the subtimeframes and add an index for their description at - // the end. Given every second part is a data header we skip - // every two parts to populate the index. Moreover we know that - // the SubframeMetadata is always in the second part, so we can - // extract the flpId from there. - for (size_t i = 0; i < subtimeframeParts.Size(); ++i) { - if (i % 2 == 0) { - const auto* adh = o2::header::get(subtimeframeParts.At(i)->GetData()); - auto ie = std::make_pair(*adh, index.count(id) * 2); - index.insert(std::make_pair(id, ie)); - } - mTimeframeBuffer[id].parts.AddPart(move(subtimeframeParts.At(i))); - } - } else { - // if received ID has been previously discarded. - LOG(WARN) << "Received part from an already discarded timeframe with id " << id; - } - - if (flpIds.count(id) == mNumFLPs) { - LOG(INFO) << "Timeframe " << id << " complete. Publishing.\n"; - o2::header::DataHeader tih; - std::vector flattenedIndex; - - tih.dataDescription = o2::header::DataDescription("TIMEFRAMEINDEX"); - tih.dataOrigin = o2::header::DataOrigin("EPN"); - tih.subSpecification = 0; - tih.payloadSize = index.count(id) * sizeof(flattenedIndex.front()); - void* indexData = malloc(tih.payloadSize); - auto indexRange = index.equal_range(id); - for (auto ie = indexRange.first; ie != indexRange.second; ++ie) { - flattenedIndex.push_back(ie->second); - } - memcpy(indexData, flattenedIndex.data(), tih.payloadSize); - - mTimeframeBuffer[id].parts.AddPart(NewSimpleMessage(tih)); - mTimeframeBuffer[id].parts.AddPart(NewMessage( - indexData, tih.payloadSize, - [](void* data, void* hint) { free(data); }, nullptr)); - // LOG(INFO) << "Collected all parts for timeframe #" << id; - // when all parts are collected send then to the output channel - Send(mTimeframeBuffer[id].parts, mOutChannelName); - LOG(INFO) << "Index count for " << id << " " << index.count(id) << "\n"; - index.erase(id); - LOG(INFO) << "Index count for " << id << " " << index.count(id) << "\n"; - flpIds.erase(id); - - if (mTestMode > 0) { - // Send an acknowledgement back to the sampler to measure the round trip time - unique_ptr ack(NewMessage(sizeof(uint16_t))); - memcpy(ack->GetData(), &id, sizeof(uint16_t)); - - if (ackOutChannel.Send(ack, 0) <= 0) { - LOG(ERROR) << "Could not send acknowledgement without blocking"; - } - } - - mTimeframeBuffer.erase(id); - } - - // LOG(WARN) << "Buffer size: " << fTimeframeBuffer.size(); - - // Check if any incomplete timeframes in the buffer are older than - // timeout period, and discard them if they are - // QUESTION: is this really what we want to do? - DiscardIncompleteTimeframes(); - } -} diff --git a/Utilities/DataFlow/src/FLPSenderDevice.cxx b/Utilities/DataFlow/src/FLPSenderDevice.cxx deleted file mode 100644 index cc37be574bb6d..0000000000000 --- a/Utilities/DataFlow/src/FLPSenderDevice.cxx +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include -#include - -#include -#include -#include - -#include "Headers/DataHeader.h" -#include "Headers/SubframeMetadata.h" -#include "DataFlow/FLPSenderDevice.h" -#include "O2Device/Compatibility.h" - -using namespace std; -using namespace std::chrono; -using namespace o2::devices; -using SubframeMetadata = o2::data_flow::SubframeMetadata; - -void FLPSenderDevice::InitTask() -{ - mIndex = GetConfig()->GetValue("flp-index"); - mEventSize = GetConfig()->GetValue("event-size"); - mNumEPNs = GetConfig()->GetValue("num-epns"); - mTestMode = GetConfig()->GetValue("test-mode"); - mSendOffset = GetConfig()->GetValue("send-offset"); - mSendDelay = GetConfig()->GetValue("send-delay"); - mInChannelName = GetConfig()->GetValue("in-chan-name"); - mOutChannelName = GetConfig()->GetValue("out-chan-name"); -} - -void FLPSenderDevice::Run() -{ - // base buffer, to be copied from for every timeframe body (zero-copy) - FairMQMessagePtr baseMsg(NewMessage(mEventSize)); - - // store the channel reference to avoid traversing the map on every loop iteration - //FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); - - while (compatibility::FairMQ13::IsRunning(this)) { - // - Get the SubtimeframeMetadata - // - Add the current FLP id to the SubtimeframeMetadata - // - Forward to the EPN the whole subtimeframe - FairMQParts subtimeframeParts; - if (Receive(subtimeframeParts, mInChannelName, 0, 100) <= 0) { - continue; - } - - assert(subtimeframeParts.Size() != 0); - assert(subtimeframeParts.Size() >= 2); - const auto* dh = o2::header::get(subtimeframeParts.At(0)->GetData()); - assert(strncmp(dh->dataDescription.str, "SUBTIMEFRAMEMD", 16) == 0); - - SubframeMetadata* sfm = reinterpret_cast(subtimeframeParts.At(1)->GetData()); - sfm->flpIndex = mIndex; - - mArrivalTime.push(steady_clock::now()); - mSTFBuffer.push(move(subtimeframeParts)); - - // if offset is 0 - send data out without staggering. - assert(mSTFBuffer.size() > 0); - - if (mSendOffset == 0 && mSTFBuffer.size() > 0) { - sendFrontData(); - } else if (mSTFBuffer.size() > 0) { - if (duration_cast(steady_clock::now() - mArrivalTime.front()).count() >= (mSendDelay * mSendOffset)) { - sendFrontData(); - } else { - // LOG(INFO) << "buffering..."; - } - } - } -} - -inline void FLPSenderDevice::sendFrontData() -{ - SubframeMetadata* sfm = static_cast(mSTFBuffer.front().At(1)->GetData()); - uint16_t currentTimeframeId = o2::data_flow::timeframeIdFromTimestamp(sfm->startTime, sfm->duration); - if (mLastTimeframeId != -1) { - if (currentTimeframeId == mLastTimeframeId) { - LOG(ERROR) << "Sent same consecutive timeframe ids\n"; - } - } - mLastTimeframeId = currentTimeframeId; - - // for which EPN is the message? - int direction = currentTimeframeId % mNumEPNs; - if (Send(mSTFBuffer.front(), mOutChannelName, direction, 0) < 0) { - LOG(ERROR) << "Failed to queue sub-timeframe #" << currentTimeframeId << " to EPN[" << direction << "]"; - } - mSTFBuffer.pop(); - mArrivalTime.pop(); -} diff --git a/Utilities/DataFlow/src/FakeTimeframeBuilder.cxx b/Utilities/DataFlow/src/FakeTimeframeBuilder.cxx deleted file mode 100644 index 66bcc4fbcea08..0000000000000 --- a/Utilities/DataFlow/src/FakeTimeframeBuilder.cxx +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "DataFlow/FakeTimeframeBuilder.h" -#include "TimeFrame/TimeFrame.h" -#include "Headers/DataHeader.h" -#include -#include -#include -#include - -using DataHeader = o2::header::DataHeader; -using DataDescription = o2::header::DataDescription; -using DataOrigin = o2::header::DataOrigin; -using IndexElement = o2::dataformats::IndexElement; - -namespace -{ -o2::header::DataDescription lookupDataDescription(const char* key) -{ - if (strcmp(key, "RAWDATA") == 0) { - return o2::header::gDataDescriptionRawData; - } else if (strcmp(key, "CLUSTERS") == 0) { - return o2::header::gDataDescriptionClusters; - } else if (strcmp(key, "TRACKS") == 0) { - return o2::header::gDataDescriptionTracks; - } else if (strcmp(key, "CONFIG") == 0) { - return o2::header::gDataDescriptionConfig; - } else if (strcmp(key, "INFO") == 0) { - return o2::header::gDataDescriptionInfo; - } - return o2::header::gDataDescriptionInvalid; -} - -o2::header::DataOrigin lookupDataOrigin(const char* key) -{ - if (strcmp(key, "TPC") == 0) { - return o2::header::gDataOriginTPC; - } - if (strcmp(key, "TRD") == 0) { - return o2::header::gDataOriginTRD; - } - if (strcmp(key, "TOF") == 0) { - return o2::header::gDataOriginTOF; - } - if (strcmp(key, "ITS") == 0) { - return o2::header::gDataOriginITS; - } - return o2::header::gDataOriginInvalid; -} - -} // namespace - -namespace o2 -{ -namespace data_flow -{ - -std::unique_ptr fakeTimeframeGenerator(std::vector& specs, std::size_t& totalSize) -{ - // Calculate the total size of your timeframe. This is - // given by: - // - N*The size of the data header (this should actually depend on the - // kind of data as different dataDescriptions will probably have - // different headers). - // - Sum_N(The size of the buffer_i) - // - The size of the index header - // - N*sizeof(dataheader) - // Assuming all the data header - size_t sizeOfHeaders = specs.size() * sizeof(DataHeader); - size_t sizeOfBuffers = 0; - for (auto&& spec : specs) { - sizeOfBuffers += spec.bufferSize; - } - size_t sizeOfIndexHeader = sizeof(DataHeader); - size_t sizeOfIndex = sizeof(IndexElement) * specs.size(); - totalSize = sizeOfHeaders + sizeOfBuffers + sizeOfIndexHeader + sizeOfIndex; - - // Add the actual - data - auto buffer = std::make_unique(totalSize); - char* bi = buffer.get(); - std::vector headers; - int count = 0; - for (auto&& spec : specs) { - IndexElement el; - el.first.dataDescription = lookupDataDescription(spec.dataDescription); - el.first.dataOrigin = lookupDataOrigin(spec.origin); - el.first.payloadSize = spec.bufferSize; - el.first.headerSize = sizeof(el.first); - el.second = count++; - // Let's zero at least the header... - memset(bi, 0, sizeof(el.first)); - memcpy(bi, &el, sizeof(el.first)); - headers.push_back(el); - bi += sizeof(el.first); - spec.bufferFiller(bi, spec.bufferSize); - bi += spec.bufferSize; - } - - // Add the index - DataHeader index; - index.dataDescription = DataDescription("TIMEFRAMEINDEX"); - index.dataOrigin = DataOrigin("FKE"); - index.headerSize = sizeOfIndexHeader; - index.payloadSize = sizeOfIndex; - memcpy(bi, &index, sizeof(index)); - memcpy(bi + sizeof(index), headers.data(), headers.size() * sizeof(IndexElement)); - return std::move(buffer); -} - -} // namespace data_flow -} // namespace o2 diff --git a/Utilities/DataFlow/src/FakeTimeframeGeneratorDevice.cxx b/Utilities/DataFlow/src/FakeTimeframeGeneratorDevice.cxx deleted file mode 100644 index 81c07bcc82f5c..0000000000000 --- a/Utilities/DataFlow/src/FakeTimeframeGeneratorDevice.cxx +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include - -#include "DataFlow/FakeTimeframeGeneratorDevice.h" -#include "DataFlow/FakeTimeframeBuilder.h" -#include "DataFlow/TimeframeParser.h" -#include "Headers/SubframeMetadata.h" -#include "Headers/DataHeader.h" -#include -#include - -using DataHeader = o2::header::DataHeader; - -namespace -{ -struct OneShotReadBuf : public std::streambuf { - OneShotReadBuf(char* s, std::size_t n) - { - setg(s, s, s + n); - } -}; -} // namespace -namespace o2 -{ -namespace data_flow -{ - -FakeTimeframeGeneratorDevice::FakeTimeframeGeneratorDevice() - : O2Device{}, mOutChannelName{}, mMaxTimeframes{}, mTimeframeCount{0} -{ -} - -void FakeTimeframeGeneratorDevice::InitTask() -{ - mOutChannelName = GetConfig()->GetValue(OptionKeyOutputChannelName); - mMaxTimeframes = GetConfig()->GetValue(OptionKeyMaxTimeframes); -} - -bool FakeTimeframeGeneratorDevice::ConditionalRun() -{ - auto addPartFn = [this](FairMQParts& parts, char* buffer, size_t size) { - parts.AddPart(this->NewMessage( - buffer, - size, - [](void* data, void* hint) { delete[](char*) data; }, - nullptr)); - }; - auto sendFn = [this](FairMQParts& parts) { this->Send(parts, this->mOutChannelName); }; - auto zeroFiller = [](char* b, size_t s) { memset(b, 0, s); }; - - std::vector specs = { - {.origin = "TPC", - .dataDescription = "CLUSTERS", - .bufferFiller = zeroFiller, - .bufferSize = 1000}, - {.origin = "ITS", - .dataDescription = "CLUSTERS", - .bufferFiller = zeroFiller, - .bufferSize = 500}}; - - try { - size_t totalSize; - auto buffer = fakeTimeframeGenerator(specs, totalSize); - OneShotReadBuf osrb(buffer.get(), totalSize); - std::istream s(&osrb); - - streamTimeframe(s, - addPartFn, - sendFn); - } catch (std::runtime_error& e) { - LOG(ERROR) << e.what() << "\n"; - } - - mTimeframeCount++; - - if (mTimeframeCount < mMaxTimeframes) { - return true; - } - return false; -} - -} // namespace data_flow -} // namespace o2 diff --git a/Utilities/DataFlow/src/HeartbeatSampler.cxx b/Utilities/DataFlow/src/HeartbeatSampler.cxx deleted file mode 100644 index d960a5941d9c9..0000000000000 --- a/Utilities/DataFlow/src/HeartbeatSampler.cxx +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -// @file HeartbeatSampler.h -// @author Matthias Richter -// @since 2017-02-03 -// @brief Heartbeat sampler device - -#include // this_thread::sleep_for -#include - -#include "DataFlow/HeartbeatSampler.h" -#include "Headers/HeartbeatFrame.h" -#include - -void o2::data_flow::HeartbeatSampler::InitTask() -{ - mPeriod = GetConfig()->GetValue(OptionKeyPeriod); - mOutputChannelName = GetConfig()->GetValue(OptionKeyOutputChannelName); -} - -bool o2::data_flow::HeartbeatSampler::ConditionalRun() -{ - std::this_thread::sleep_for(std::chrono::nanoseconds(mPeriod)); - - o2::header::HeartbeatStatistics hbfPayload; - - o2::header::DataHeader dh; - dh.dataDescription = o2::header::gDataDescriptionHeartbeatFrame; - dh.dataOrigin = o2::header::DataOrigin("SMPL"); - dh.subSpecification = 0; - dh.payloadSize = sizeof(hbfPayload); - - // Note: the block type of both header an trailer members of the envelope - // structure are autmatically initialized to the appropriate block type - // and size '1' (i.e. only one 64bit word) - o2::header::HeartbeatFrameEnvelope specificHeader; - specificHeader.header.orbit = mCount; - specificHeader.trailer.hbAccept = 1; - - O2Message outgoing; - - // build multipart message from header and payload - o2::base::addDataBlock(outgoing, {dh, specificHeader}, NewSimpleMessage(hbfPayload)); - - // send message - Send(outgoing, mOutputChannelName.c_str()); - outgoing.fParts.clear(); - - mCount++; - return true; -} diff --git a/Utilities/DataFlow/src/SubframeBuilderDevice.cxx b/Utilities/DataFlow/src/SubframeBuilderDevice.cxx deleted file mode 100644 index adad827e04a1f..0000000000000 --- a/Utilities/DataFlow/src/SubframeBuilderDevice.cxx +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file SubframeBuilderDevice.cxx -/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel -/// @since 2017-02-07 -/// @brief Demonstrator device for a subframe builder - -#include // this_thread::sleep_for -#include -#include - -#include "DataFlow/SubframeBuilderDevice.h" -#include "DataFlow/SubframeUtils.h" -#include "Headers/SubframeMetadata.h" -#include "Headers/HeartbeatFrame.h" -#include "Headers/DataHeader.h" -#include - -using HeartbeatHeader = o2::header::HeartbeatHeader; -using HeartbeatTrailer = o2::header::HeartbeatTrailer; -using DataHeader = o2::header::DataHeader; -using SubframeId = o2::dataflow::SubframeId; - -o2::data_flow::SubframeBuilderDevice::SubframeBuilderDevice() - : O2Device() -{ -} - -o2::data_flow::SubframeBuilderDevice::~SubframeBuilderDevice() = default; - -void o2::data_flow::SubframeBuilderDevice::InitTask() -{ - mOrbitDuration = GetConfig()->GetValue(OptionKeyOrbitDuration); - mOrbitsPerTimeframe = GetConfig()->GetValue(OptionKeyOrbitsPerTimeframe); - mInputChannelName = GetConfig()->GetValue(OptionKeyInputChannelName); - mOutputChannelName = GetConfig()->GetValue(OptionKeyOutputChannelName); - mFLPId = GetConfig()->GetValue(OptionKeyFLPId); - mStripHBF = GetConfig()->GetValue(OptionKeyStripHBF); - - LOG(INFO) << "Obtaining data from DataPublisher\n"; - // Now that we have all the information lets create the policies to do the - // payload extraction and merging and create the actual PayloadMerger. - - // We extract the timeframeId from the number of orbits. - // FIXME: handle multiple socket ids - Merger::IdExtractor makeId = [this](std::unique_ptr& msg) -> SubframeId { - HeartbeatHeader* hbh = reinterpret_cast(msg->GetData()); - SubframeId id = {.timeframeId = hbh->orbit / this->mOrbitsPerTimeframe, - .socketId = 0}; - return id; - }; - - // We extract the payload differently depending on wether we want to strip - // the header or not. - Merger::PayloadExtractor payloadExtractor = [this](char** out, char* in, size_t inSize) -> size_t { - if (!this->mStripHBF) { - return Merger::fullPayloadExtractor(out, in, inSize); - } - return o2::dataflow::extractDetectorPayloadStrip(out, in, inSize); - }; - - // Whether a given timeframe is complete depends on how many orbits per - // timeframe we want. - Merger::MergeCompletionCheker checkIfComplete = - [this](Merger::MergeableId id, Merger::MessageMap& map) { - return map.count(id) < this->mOrbitsPerTimeframe; - }; - - mMerger.reset(new Merger(makeId, checkIfComplete, payloadExtractor)); - OnData(mInputChannelName.c_str(), &o2::data_flow::SubframeBuilderDevice::HandleData); -} - -bool o2::data_flow::SubframeBuilderDevice::BuildAndSendFrame(FairMQParts& inParts) -{ - auto id = mMerger->aggregate(inParts.At(1)); - - char** outBuffer; - size_t outSize = mMerger->finalise(outBuffer, id); - // In this case we do not have enough subtimeframes for id, - // so we simply return. - if (outSize == 0) { - return true; - } - // If we reach here, it means we do have enough subtimeframes. - - // top level subframe header, the DataHeader is going to be used with - // description "SUBTIMEFRAMEMD" - // this should be defined in a common place, and also the origin - // the origin can probably name a detector identifier, but not sure if - // all CRUs of a FLP in all cases serve a single detector - o2::header::DataHeader dh; - dh.dataDescription = o2::header::DataDescription("SUBTIMEFRAMEMD"); - dh.dataOrigin = o2::header::DataOrigin("FLP"); - dh.subSpecification = mFLPId; - dh.payloadSize = sizeof(SubframeMetadata); - - DataHeader payloadheader(*o2::header::get((byte*)inParts.At(0)->GetData())); - - // subframe meta information as payload - SubframeMetadata md; - // id is really the first orbit in the timeframe. - md.startTime = id.timeframeId * mOrbitsPerTimeframe * static_cast(mOrbitDuration); - md.duration = mOrbitDuration * mOrbitsPerTimeframe; - LOG(INFO) << "Start time for subframe (" << md.startTime << ", " - << md.duration - << ")"; - - // Add the metadata about the merged subtimeframes - // FIXME: do we really need this? - O2Message outgoing; - o2::base::addDataBlock(outgoing, dh, NewSimpleMessage(md)); - - // Add the actual merged payload. - o2::base::addDataBlock(outgoing, payloadheader, - NewMessage( - *outBuffer, outSize, - [](void* data, void* hint) { delete[] reinterpret_cast(hint); }, *outBuffer)); - // send message - Send(outgoing, mOutputChannelName.c_str()); - // FIXME: do we actually need this? outgoing should go out of scope - outgoing.fParts.clear(); - - return true; -} - -bool o2::data_flow::SubframeBuilderDevice::HandleData(FairMQParts& msgParts, int /*index*/) -{ - // loop over header payload pairs in the incoming multimessage - // for each pair - // - check timestamp - // - create new subtimeframe if none existing where the timestamp of the data fits - // - add pair to the corresponding subtimeframe - - // check for completed subtimeframes and send all completed frames - // the builder does not implement the routing to the EPN, this is done in the - // specific FLP-EPN setup - // to fit into the simple emulation of event/frame ids in the flpSender the order of - // subtimeframes needs to be preserved - BuildAndSendFrame(msgParts); - return true; -} diff --git a/Utilities/DataFlow/src/TimeframeParser.cxx b/Utilities/DataFlow/src/TimeframeParser.cxx deleted file mode 100644 index 4e3cc4f2001b8..0000000000000 --- a/Utilities/DataFlow/src/TimeframeParser.cxx +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file TimeframeValidatorDevice.cxx -/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel -/// @since 2017-02-07 -/// @brief Validator device for a full time frame - -#include // this_thread::sleep_for -#include -#include - -#include "DataFlow/TimeframeParser.h" -#include "Headers/SubframeMetadata.h" -#include "Headers/DataHeader.h" -#include "TimeFrame/TimeFrame.h" - -#include -#include - -using DataHeader = o2::header::DataHeader; -using DataDescription = o2::header::DataDescription; -using IndexElement = o2::dataformats::IndexElement; - -namespace o2 -{ -namespace data_flow -{ - -// Possible states for the parsing of a timeframe -// PARSE_BEGIN_STREAM -> -enum ParsingState { - PARSE_BEGIN_STREAM = 0, - PARSE_BEGIN_TIMEFRAME, - PARSE_BEGIN_PAIR, - PARSE_DATA_HEADER, - PARSE_CONCRETE_HEADER, - PARSE_PAYLOAD, - PARSE_END_PAIR, - PARSE_END_TIMEFRAME, - PARSE_END_STREAM, - ERROR -}; - -struct StreamingState { - StreamingState() = default; - - ParsingState state = PARSE_BEGIN_STREAM; - bool hasDataHeader = false; - bool hasConcreteHeader = false; - void* payloadBuffer = nullptr; - void* headerBuffer = nullptr; - DataHeader dh; // The current DataHeader being parsed -}; - -void streamTimeframe(std::istream& stream, - std::function onAddPart, - std::function onSend) -{ - FairMQParts parts; - StreamingState state; - assert(state.state == PARSE_BEGIN_STREAM); - while (true) { - switch (state.state) { - case PARSE_BEGIN_STREAM: - LOG(INFO) << "In PARSE_BEGIN_STREAM\n"; - state.state = PARSE_BEGIN_TIMEFRAME; - break; - case PARSE_BEGIN_TIMEFRAME: - LOG(INFO) << "In PARSE_BEGIN_TIMEFRAME\n"; - state.state = PARSE_BEGIN_PAIR; - break; - case PARSE_BEGIN_PAIR: - LOG(INFO) << "In PARSE_BEGIN_PAIR\n"; - state.state = PARSE_DATA_HEADER; - state.hasDataHeader = false; - state.payloadBuffer = nullptr; - state.headerBuffer = nullptr; - break; - case PARSE_DATA_HEADER: - LOG(INFO) << "In PARSE_DATA_HEADER\n"; - if (state.hasDataHeader) { - throw std::runtime_error("DataHeader already present."); - } else if (state.payloadBuffer) { - throw std::runtime_error("Unexpected payload."); - } - LOG(INFO) << "Reading dataheader of " << sizeof(state.dh) << " bytes\n"; - stream.read(reinterpret_cast(&state.dh), sizeof(state.dh)); - // If we have a TIMEFRAMEINDEX part and we find the eof, we are done. - if (stream.eof()) { - throw std::runtime_error("Premature end of stream"); - } - - // Otherwise we move to the state which is responsible for parsing the - // kind of header. - state.state = PARSE_CONCRETE_HEADER; - break; - case PARSE_CONCRETE_HEADER: - LOG(INFO) << "In PARSE_CONCRETE_HEADER\n"; - if (state.headerBuffer) { - throw std::runtime_error("File has two consecutive headers"); - } - if (state.dh.headerSize < sizeof(DataHeader)) { - std::ostringstream str; - str << "Bad header size. Should be greater then " - << sizeof(DataHeader) - << ". Found " << state.dh.headerSize << "\n"; - throw std::runtime_error(str.str()); - } - // We get the full header size and read the rest of the header - state.headerBuffer = malloc(state.dh.headerSize); - memcpy(state.headerBuffer, &state.dh, sizeof(state.dh)); - LOG(INFO) << "Reading rest of the header of " << state.dh.headerSize - sizeof(state.dh) << " bytes\n"; - stream.read(reinterpret_cast(state.headerBuffer) + sizeof(state.dh), - state.dh.headerSize - sizeof(state.dh)); - // Handle the case the file was truncated. - if (stream.eof()) { - throw std::runtime_error("Unexpected end of file"); - } - onAddPart(parts, reinterpret_cast(state.headerBuffer), state.dh.headerSize); - // Move to parse the payload - state.state = PARSE_PAYLOAD; - break; - case PARSE_PAYLOAD: - LOG(INFO) << "In PARSE_PAYLOAD\n"; - if (state.payloadBuffer) { - throw std::runtime_error("File has two consecutive payloads"); - } - state.payloadBuffer = new char[state.dh.payloadSize]; - LOG(INFO) << "Reading payload of " << state.dh.payloadSize << " bytes\n"; - stream.read(reinterpret_cast(state.payloadBuffer), state.dh.payloadSize); - if (stream.eof()) { - throw std::runtime_error("Unexpected end of file"); - } - onAddPart(parts, reinterpret_cast(state.payloadBuffer), state.dh.payloadSize); - state.state = PARSE_END_PAIR; - break; - case PARSE_END_PAIR: - LOG(INFO) << "In PARSE_END_PAIR\n"; - state.state = state.dh == DataDescription("TIMEFRAMEINDEX") ? PARSE_END_TIMEFRAME : PARSE_BEGIN_PAIR; - break; - case PARSE_END_TIMEFRAME: - LOG(INFO) << "In PARSE_END_TIMEFRAME\n"; - onSend(parts); - // Check if we have more. If not, we can declare success. - stream.peek(); - if (stream.eof()) { - state.state = PARSE_END_STREAM; - } else { - state.state = PARSE_BEGIN_TIMEFRAME; - } - break; - case PARSE_END_STREAM: - return; - break; - default: - break; - } - } -} - -void streamTimeframe(std::ostream& stream, FairMQParts& parts) -{ - if (parts.Size() < 2) { - throw std::runtime_error("Expecting at least 2 parts\n"); - } - - auto indexHeader = o2::header::get(parts.At(parts.Size() - 2)->GetData()); - // FIXME: Provide iterator pair API for the index - // Index should really be something which provides an - // iterator pair API so that we can sort / find / lower_bound - // easily. Right now we simply use it a C-style array. - auto index = reinterpret_cast(parts.At(parts.Size() - 1)->GetData()); - - LOG(INFO) << "This time frame has " << parts.Size() << " parts.\n"; - auto indexEntries = indexHeader->payloadSize / sizeof(IndexElement); - if (indexHeader->dataDescription != DataDescription("TIMEFRAMEINDEX")) { - throw std::runtime_error("Could not find a valid index header\n"); - } - LOG(INFO) << indexHeader->dataDescription.str << "\n"; - LOG(INFO) << "This time frame has " << indexEntries << "entries in the index.\n"; - if ((indexEntries * 2 + 2) != (parts.Size())) { - std::stringstream err; - err << "Mismatched index and received parts. Expected " - << (parts.Size() - 2 * 2) << " found " << indexEntries; - throw std::runtime_error(err.str()); - } - - LOG(INFO) << "Everything is fine with received timeframe\n"; - for (size_t i = 0; i < parts.Size(); ++i) { - stream.write(reinterpret_cast(parts.At(i)->GetData()), - parts.At(i)->GetSize()); - } -} - -} // namespace data_flow -} // namespace o2 diff --git a/Utilities/DataFlow/src/TimeframeReaderDevice.cxx b/Utilities/DataFlow/src/TimeframeReaderDevice.cxx deleted file mode 100644 index 5113838bb0a5a..0000000000000 --- a/Utilities/DataFlow/src/TimeframeReaderDevice.cxx +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include - -#include "DataFlow/TimeframeReaderDevice.h" -#include "DataFlow/TimeframeParser.h" -#include "Headers/SubframeMetadata.h" -#include "Headers/DataHeader.h" -#include - -using DataHeader = o2::header::DataHeader; - -namespace o2 -{ -namespace data_flow -{ - -TimeframeReaderDevice::TimeframeReaderDevice() - : O2Device{}, mOutChannelName{}, mFile{} -{ -} - -void TimeframeReaderDevice::InitTask() -{ - mOutChannelName = GetConfig()->GetValue(OptionKeyOutputChannelName); - mInFileName = GetConfig()->GetValue(OptionKeyInputFileName); - mSeen.clear(); -} - -bool TimeframeReaderDevice::ConditionalRun() -{ - auto addPartFn = [this](FairMQParts& parts, char* buffer, size_t size) { - parts.AddPart(this->NewMessage( - buffer, - size, - [](void* data, void* hint) { delete[](char*) data; }, - nullptr)); - }; - auto sendFn = [this](FairMQParts& parts) { this->Send(parts, this->mOutChannelName); }; - - // FIXME: For the moment we support a single file. This should really be a glob. We - // should also have a strategy for watching directories. - std::vector files; - files.push_back(mInFileName); - for (auto&& fn : files) { - mFile.open(fn, std::ofstream::in | std::ofstream::binary); - try { - streamTimeframe(mFile, - addPartFn, - sendFn); - } catch (std::runtime_error& e) { - LOG(ERROR) << e.what() << "\n"; - } - mSeen.push_back(fn); - } - - return false; -} - -} // namespace data_flow -} // namespace o2 diff --git a/Utilities/DataFlow/src/TimeframeValidationTool.cxx b/Utilities/DataFlow/src/TimeframeValidationTool.cxx deleted file mode 100644 index b3f1168006fc6..0000000000000 --- a/Utilities/DataFlow/src/TimeframeValidationTool.cxx +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "DataFlow/TimeframeParser.h" -#include "fairmq/FairMQParts.h" -#include "Framework/Logger.h" -#include -#include -#include -#include -#include -#include -#include - -// A simple tool which verifies timeframe files -int main(int argc, char** argv) -{ - int c; - opterr = 0; - - while ((c = getopt(argc, argv, "")) != -1) { - switch (c) { - case '?': - if (isprint(optopt)) { - fprintf(stderr, "Unknown option `-%c'.\n", optopt); - } else { - fprintf(stderr, - "Unknown option character `\\x%x'.\n", - optopt); - } - return 1; - default: - abort(); - } - } - - std::vector filenames; - for (size_t index = optind; index < argc; index++) { - filenames.emplace_back(std::string(argv[index])); - } - - for (auto&& fn : filenames) { - LOG(INFO) << "Processing file" << fn << "\n"; - std::ifstream s(fn); - FairMQParts parts; - auto onAddParts = [](FairMQParts& p, char* buffer, size_t size) { - }; - auto onSend = [](FairMQParts& p) { - }; - - try { - o2::data_flow::streamTimeframe(s, onAddParts, onSend); - } catch (std::runtime_error& e) { - LOG(ERROR) << e.what() << std::endl; - exit(1); - } - } -} diff --git a/Utilities/DataFlow/src/TimeframeValidatorDevice.cxx b/Utilities/DataFlow/src/TimeframeValidatorDevice.cxx deleted file mode 100644 index 4f62bc621ef46..0000000000000 --- a/Utilities/DataFlow/src/TimeframeValidatorDevice.cxx +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file TimeframeValidatorDevice.cxx -/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel -/// @since 2017-02-07 -/// @brief Validator device for a full time frame - -#include // this_thread::sleep_for -#include - -#include "DataFlow/TimeframeValidatorDevice.h" -#include "TimeFrame/TimeFrame.h" -#include "Headers/SubframeMetadata.h" -#include "Headers/DataHeader.h" -#include "O2Device/Compatibility.h" - -#include - -using DataHeader = o2::header::DataHeader; -using DataOrigin = o2::header::DataOrigin; -using DataDescription = o2::header::DataDescription; -using IndexElement = o2::dataformats::IndexElement; - -o2::data_flow::TimeframeValidatorDevice::TimeframeValidatorDevice() - : O2Device(), mInChannelName() -{ -} - -void o2::data_flow::TimeframeValidatorDevice::InitTask() -{ - mInChannelName = GetConfig()->GetValue(OptionKeyInputChannelName); -} - -void o2::data_flow::TimeframeValidatorDevice::Run() -{ - while (compatibility::FairMQ13::IsRunning(this)) { - FairMQParts timeframeParts; - if (Receive(timeframeParts, mInChannelName, 0, 100) <= 0) { - continue; - } - - if (timeframeParts.Size() < 2) { - LOG(ERROR) << "Expecting at least 2 parts\n"; - } - - auto indexHeader = o2::header::get(timeframeParts.At(timeframeParts.Size() - 2)->GetData()); - // FIXME: Provide iterator pair API for the index - // Index should really be something which provides an - // iterator pair API so that we can sort / find / lower_bound - // easily. Right now we simply use it a C-style array. - auto index = reinterpret_cast(timeframeParts.At(timeframeParts.Size() - 1)->GetData()); - - // TODO: fill this with checks on time frame - LOG(INFO) << "This time frame has " << timeframeParts.Size() << " parts.\n"; - auto indexEntries = indexHeader->payloadSize / sizeof(DataHeader); - if (indexHeader->dataDescription != DataDescription("TIMEFRAMEINDEX")) { - LOG(ERROR) << "Could not find a valid index header\n"; - } - LOG(INFO) << indexHeader->dataDescription.str << "\n"; - LOG(INFO) << "This time frame has " << indexEntries << "entries in the index.\n"; - if ((indexEntries * 2 + 2) != (timeframeParts.Size())) { - LOG(ERROR) << "Mismatched index and received parts\n"; - } - - // - Use the index to find out if we have TPC data - // - Get the part with the TPC data - // - Validate TPCCluster dummy data - // - Validate ITSRaw dummy data - int tpcIndex = -1; - int itsIndex = -1; - - for (int ii = 0; ii < indexEntries; ++ii) { - IndexElement& ie = index[ii]; - assert(ie.second >= 0); - LOG(DEBUG) << ie.first.dataDescription.str << " " - << ie.first.dataOrigin.str << std::endl; - if ((ie.first.dataOrigin == header::gDataOriginTPC) && (ie.first.dataDescription == header::gDataDescriptionClusters)) { - tpcIndex = ie.second; - } - if ((ie.first.dataOrigin == header::gDataOriginITS) && (ie.first.dataDescription == header::gDataDescriptionClusters)) { - itsIndex = ie.second; - } - } - - if (tpcIndex < 0) { - LOG(ERROR) << "Could not find expected TPC payload\n"; - continue; - } - if (itsIndex < 0) { - LOG(ERROR) << "Could not find expected ITS payload\n"; - continue; - } - LOG(DEBUG) << "TPC Index " << tpcIndex << "\n"; - LOG(DEBUG) << "ITS Index " << itsIndex << "\n"; - - // Data header it at position - 1 - auto tpcHeader = reinterpret_cast(timeframeParts.At(tpcIndex)->GetData()); - if ((tpcHeader->dataDescription != header::gDataDescriptionClusters) || - (tpcHeader->dataOrigin != header::gDataOriginTPC)) { - LOG(ERROR) << "Wrong data description. Expecting TPC - CLUSTERS, found " - << tpcHeader->dataOrigin.str << " - " - << tpcHeader->dataDescription.str << "\n"; - continue; - } - auto tpcPayload = reinterpret_cast(timeframeParts.At(tpcIndex + 1)->GetData()); - if (tpcHeader->payloadSize % sizeof(TPCTestCluster)) { - LOG(ERROR) << "TPC - CLUSTERS Size Mismatch\n"; - } - auto numOfClusters = tpcHeader->payloadSize / sizeof(TPCTestCluster); - for (size_t ci = 0; ci < numOfClusters; ++ci) { - TPCTestCluster& cluster = tpcPayload[ci]; - if (cluster.z != 1.5) { - LOG(ERROR) << "TPC Data mismatch. Expecting z = 1.5 got " << cluster.z << "\n"; - break; - } - if (cluster.timeStamp != ci) { - LOG(ERROR) << "TPC Data mismatch. Expecting " << ci << " got " << cluster.timeStamp << "\n"; - break; - } - } - - // Data header it at position - 1 - auto itsHeader = reinterpret_cast(timeframeParts.At(itsIndex)->GetData()); - if ((itsHeader->dataDescription != header::gDataDescriptionClusters) || (itsHeader->dataOrigin != header::gDataOriginITS)) { - LOG(ERROR) << "Wrong data description. Expecting ITS - CLUSTERS, found " - << itsHeader->dataOrigin.str << " - " << itsHeader->dataDescription.str << "\n"; - continue; - } - auto itsPayload = reinterpret_cast(timeframeParts.At(itsIndex + 1)->GetData()); - if (itsHeader->payloadSize % sizeof(ITSRawData)) { - LOG(ERROR) << "ITS - CLUSTERS Size Mismatch.\n"; - } - numOfClusters = itsHeader->payloadSize / sizeof(ITSRawData); - for (size_t ci = 0; ci < numOfClusters; ++ci) { - ITSRawData& cluster = itsPayload[ci]; - if (cluster.timeStamp != ci) { - LOG(ERROR) << "ITS Data mismatch. Expecting " << ci - << " got " << cluster.timeStamp << "\n"; - break; - } - } - LOG(INFO) << "Everything is fine with received timeframe\n"; - } -} diff --git a/Utilities/DataFlow/src/TimeframeWriterDevice.cxx b/Utilities/DataFlow/src/TimeframeWriterDevice.cxx deleted file mode 100644 index 151ac59424880..0000000000000 --- a/Utilities/DataFlow/src/TimeframeWriterDevice.cxx +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file TimeframeValidatorDevice.cxx -/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel -/// @since 2017-02-07 -/// @brief Validator device for a full time frame - -#include // this_thread::sleep_for -#include - -#include "DataFlow/TimeframeWriterDevice.h" -#include "DataFlow/TimeframeParser.h" -#include "TimeFrame/TimeFrame.h" -#include "Headers/SubframeMetadata.h" -#include "Headers/DataHeader.h" -#include "O2Device/Compatibility.h" - -#include -#include - -using DataHeader = o2::header::DataHeader; -using IndexElement = o2::dataformats::IndexElement; - -namespace o2 -{ -namespace data_flow -{ - -TimeframeWriterDevice::TimeframeWriterDevice() - : O2Device{}, mInChannelName{}, mFile{}, mMaxTimeframes{}, mMaxFileSize{}, mMaxFiles{}, mFileCount{0} -{ -} - -void TimeframeWriterDevice::InitTask() -{ - mInChannelName = GetConfig()->GetValue(OptionKeyInputChannelName); - mOutFileName = GetConfig()->GetValue(OptionKeyOutputFileName); - mMaxTimeframes = GetConfig()->GetValue(OptionKeyMaxTimeframesPerFile); - mMaxFileSize = GetConfig()->GetValue(OptionKeyMaxFileSize); - mMaxFiles = GetConfig()->GetValue(OptionKeyMaxFiles); -} - -void TimeframeWriterDevice::Run() -{ - boost::filesystem::path p(mOutFileName); - size_t streamedTimeframes = 0; - bool needsNewFile = true; - while (compatibility::FairMQ13::IsRunning(this) && mFileCount < mMaxFiles) { - // In case we need to process more than one file, - // the filename is split in basename and extension - // and we call the files `.`. - if (needsNewFile) { - std::string filename = mOutFileName; - if (mMaxFiles > 1) { - std::string base_path(mOutFileName, 0, mOutFileName.find_last_of(".")); - std::string extension(mOutFileName, mOutFileName.find_last_of(".")); - filename = base_path + std::to_string(mFileCount) + extension; - } - LOG(INFO) << "Opening " << filename << " for output\n"; - mFile.open(filename.c_str(), std::ofstream::out | std::ofstream::binary); - needsNewFile = false; - } - - FairMQParts timeframeParts; - if (Receive(timeframeParts, mInChannelName, 0, 100) <= 0) { - continue; - } - - streamTimeframe(mFile, timeframeParts); - if ((mFile.tellp() > mMaxFileSize) || (streamedTimeframes++ > mMaxTimeframes)) { - mFile.flush(); - mFile.close(); - mFileCount++; - needsNewFile = true; - } - } -} - -void TimeframeWriterDevice::PostRun() -{ - if (mFile.is_open()) { - mFile.flush(); - mFile.close(); - } -} - -} // namespace data_flow -} // namespace o2 diff --git a/Utilities/DataFlow/src/runEPNReceiver.cxx b/Utilities/DataFlow/src/runEPNReceiver.cxx deleted file mode 100644 index b4681116052df..0000000000000 --- a/Utilities/DataFlow/src/runEPNReceiver.cxx +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/** - * runEPNReceiver.cxx - * - * @since 2013-01-21 - * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos - */ - -#include "runFairMQDevice.h" -#include "DataFlow/EPNReceiverDevice.h" - -#include - -namespace bpo = boost::program_options; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - ("buffer-timeout", bpo::value()->default_value(1000), "Buffer timeout in milliseconds") - ("num-flps", bpo::value()->required(), "Number of FLPs") - ("test-mode", bpo::value()->default_value(0), "Run in test mode") - ("in-chan-name", bpo::value()->default_value("stf2"), "Name of the input channel (sub-time frames)") - ("out-chan-name", bpo::value()->default_value("tf"), "Name of the output channel (time frames)") - ("ack-chan-name", bpo::value()->default_value("ack"), "Name of the acknowledgement channel"); - // clang-format on -} - -FairMQDevice* getDevice(const FairMQProgOptions& config) -{ - return new o2::devices::EPNReceiverDevice(); -} diff --git a/Utilities/DataFlow/src/runFLPSender.cxx b/Utilities/DataFlow/src/runFLPSender.cxx deleted file mode 100644 index ed2b94d648954..0000000000000 --- a/Utilities/DataFlow/src/runFLPSender.cxx +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "runFairMQDevice.h" -#include "DataFlow/FLPSenderDevice.h" - -#include - -namespace bpo = boost::program_options; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - ("flp-index", bpo::value()->default_value(0), "FLP Index (for debugging in test mode)") - ("event-size", bpo::value()->default_value(1000), "Event size in bytes (test mode)") - ("num-epns", bpo::value()->required(), "Number of EPNs") - ("test-mode", bpo::value()->default_value(0), "Run in test mode") - ("send-offset", bpo::value()->default_value(0), "Offset for staggered sending") - ("send-delay", bpo::value()->default_value(8), "Delay for staggered sending") - ("in-chan-name", bpo::value()->default_value("stf1"), "Name of the input channel (sub-time frames)") - ("out-chan-name", bpo::value()->default_value("stf2"), "Name of the output channel (sub-time frames)"); - // clang-format on -} - -FairMQDevice* getDevice(const FairMQProgOptions& config) -{ - return new o2::devices::FLPSenderDevice(); -} diff --git a/Utilities/DataFlow/src/runFakeTimeframeGeneratorDevice.cxx b/Utilities/DataFlow/src/runFakeTimeframeGeneratorDevice.cxx deleted file mode 100644 index 755cf2b8f111f..0000000000000 --- a/Utilities/DataFlow/src/runFakeTimeframeGeneratorDevice.cxx +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "runFairMQDevice.h" -#include "DataFlow/FakeTimeframeGeneratorDevice.h" -#include - -namespace bpo = boost::program_options; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - (o2::data_flow::FakeTimeframeGeneratorDevice::OptionKeyOutputChannelName, - bpo::value()->default_value("output"), - "Name of the output channel"); - options.add_options() - (o2::data_flow::FakeTimeframeGeneratorDevice::OptionKeyMaxTimeframes, - bpo::value()->default_value("1"), - "Number of timeframes to generate"); - // clang-format on -} - -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) -{ - return new o2::data_flow::FakeTimeframeGeneratorDevice(); -} diff --git a/Utilities/DataFlow/src/runHeartbeatSampler.cxx b/Utilities/DataFlow/src/runHeartbeatSampler.cxx deleted file mode 100644 index e5fcd7b12c6b2..0000000000000 --- a/Utilities/DataFlow/src/runHeartbeatSampler.cxx +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "runFairMQDevice.h" -#include "DataFlow/HeartbeatSampler.h" - -namespace bpo = boost::program_options; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - (o2::data_flow::HeartbeatSampler::OptionKeyPeriod, - bpo::value()->default_value(1000000000), - "sampling period") - (o2::data_flow::HeartbeatSampler::OptionKeyOutputChannelName, - bpo::value()->default_value("output"), - "Name of the output channel"); - // clang-format on -} - -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) -{ - return new o2::data_flow::HeartbeatSampler(); -} diff --git a/Utilities/DataFlow/src/runSubframeBuilderDevice.cxx b/Utilities/DataFlow/src/runSubframeBuilderDevice.cxx deleted file mode 100644 index 1c26bf66d6e5f..0000000000000 --- a/Utilities/DataFlow/src/runSubframeBuilderDevice.cxx +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "runFairMQDevice.h" -#include "DataFlow/SubframeBuilderDevice.h" - -namespace bpo = boost::program_options; - -constexpr uint32_t o2::data_flow::SubframeBuilderDevice::DefaultOrbitDuration; -constexpr uint32_t o2::data_flow::SubframeBuilderDevice::DefaultOrbitsPerTimeframe; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - (o2::data_flow::SubframeBuilderDevice::OptionKeyOrbitDuration, - bpo::value()->default_value(o2::data_flow::SubframeBuilderDevice::DefaultOrbitDuration), - "Orbit duration") - (o2::data_flow::SubframeBuilderDevice::OptionKeyOrbitsPerTimeframe, - bpo::value()->default_value(o2::data_flow::SubframeBuilderDevice::DefaultOrbitsPerTimeframe), - "Orbits per timeframe") - (o2::data_flow::SubframeBuilderDevice::OptionKeyInputChannelName, - bpo::value()->default_value("input"), - "Name of the input channel") - (o2::data_flow::SubframeBuilderDevice::OptionKeyOutputChannelName, - bpo::value()->default_value("output"), - "Name of the output channel") - (o2::data_flow::SubframeBuilderDevice::OptionKeyDetector, - bpo::value()->default_value("TPC"), - "Name of detector as data source") - (o2::data_flow::SubframeBuilderDevice::OptionKeyFLPId, - bpo::value()->default_value(0), - "ID of the FLP used as data source") - (o2::data_flow::SubframeBuilderDevice::OptionKeyStripHBF, - bpo::bool_switch()->default_value(false), - "Strip HBH & HBT from each HBF"); - // clang-format on -} - -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) -{ - return new o2::data_flow::SubframeBuilderDevice(); -} diff --git a/Utilities/DataFlow/src/runTimeframeReaderDevice.cxx b/Utilities/DataFlow/src/runTimeframeReaderDevice.cxx deleted file mode 100644 index 57574a0dd7811..0000000000000 --- a/Utilities/DataFlow/src/runTimeframeReaderDevice.cxx +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "runFairMQDevice.h" -#include "DataFlow/TimeframeReaderDevice.h" - -namespace bpo = boost::program_options; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - (o2::data_flow::TimeframeReaderDevice::OptionKeyOutputChannelName, - bpo::value()->default_value("output"), - "Name of the output channel"); - options.add_options() - (o2::data_flow::TimeframeReaderDevice::OptionKeyInputFileName, - bpo::value()->default_value("data.o2tf"), - "Name of the input file"); - // clang-format on -} - -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) -{ - return new o2::data_flow::TimeframeReaderDevice(); -} diff --git a/Utilities/DataFlow/src/runTimeframeValidatorDevice.cxx b/Utilities/DataFlow/src/runTimeframeValidatorDevice.cxx deleted file mode 100644 index 34b5e90d46e69..0000000000000 --- a/Utilities/DataFlow/src/runTimeframeValidatorDevice.cxx +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "runFairMQDevice.h" -#include "DataFlow/TimeframeValidatorDevice.h" - -namespace bpo = boost::program_options; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - (o2::data_flow::TimeframeValidatorDevice::OptionKeyInputChannelName, - bpo::value()->default_value("input"), - "Name of the input channel"); - // clang-format on -} - -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) -{ - return new o2::data_flow::TimeframeValidatorDevice(); -} diff --git a/Utilities/DataFlow/src/runTimeframeWriterDevice.cxx b/Utilities/DataFlow/src/runTimeframeWriterDevice.cxx deleted file mode 100644 index 112c0cb3f8be1..0000000000000 --- a/Utilities/DataFlow/src/runTimeframeWriterDevice.cxx +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "runFairMQDevice.h" -#include "DataFlow/TimeframeWriterDevice.h" - -namespace bpo = boost::program_options; - -void addCustomOptions(bpo::options_description& options) -{ - // clang-format off - options.add_options() - (o2::data_flow::TimeframeWriterDevice::OptionKeyInputChannelName, - bpo::value()->default_value("input"), - "Name of the input channel"); - options.add_options() - (o2::data_flow::TimeframeWriterDevice::OptionKeyOutputFileName, - bpo::value()->default_value("data.o2tf"), - "Name of the input channel"); - options.add_options() - (o2::data_flow::TimeframeWriterDevice::OptionKeyMaxFiles, - bpo::value()->default_value(1), - "Maximum number of files to write"); - options.add_options() - (o2::data_flow::TimeframeWriterDevice::OptionKeyMaxTimeframesPerFile, - bpo::value()->default_value(1), - "Maximum number of timeframes per file"); - options.add_options() - (o2::data_flow::TimeframeWriterDevice::OptionKeyMaxFileSize, - bpo::value()->default_value(-1), - "Maximum size per file"); - // clang-format on -} - -FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) -{ - return new o2::data_flow::TimeframeWriterDevice(); -} diff --git a/Utilities/DataFlow/test/test_PayloadMerger01.cxx b/Utilities/DataFlow/test/test_PayloadMerger01.cxx deleted file mode 100644 index 40cc1e252a84c..0000000000000 --- a/Utilities/DataFlow/test/test_PayloadMerger01.cxx +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. -#define BOOST_TEST_MODULE Test Utilities DataFlowTest -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include - -#include -#include -#include "DataFlow/PayloadMerger.h" -#include "DataFlow/FakeTimeframeBuilder.h" -#include "DataFlow/TimeframeParser.h" -#include "DataFlow/SubframeUtils.h" -#include "fairmq/FairMQTransportFactory.h" -#include "fairmq/FairMQParts.h" - -using SubframeId = o2::dataflow::SubframeId; -using HeartbeatHeader = o2::header::HeartbeatHeader; -using HeartbeatTrailer = o2::header::HeartbeatTrailer; - -SubframeId fakeAddition(o2::dataflow::PayloadMerger& merger, - std::shared_ptr& transport, - int64_t orbit) -{ - // Create a message - // - // We set orbit to be always the same and the actual contents to be 127 - static size_t dummyMessageSize = 1000; - auto msg = transport->CreateMessage(dummyMessageSize); - char* b = reinterpret_cast(msg->GetData()) + sizeof(HeartbeatHeader); - for (size_t i = 0; i < (dummyMessageSize - sizeof(HeartbeatHeader)); ++i) { - b[i] = orbit; - } - b[0] = 127; - HeartbeatHeader* header = reinterpret_cast(msg->GetData()); - header->orbit = orbit; - return merger.aggregate(msg); -} - -BOOST_AUTO_TEST_CASE(PayloadMergerTest) -{ - auto zmq = FairMQTransportFactory::CreateTransportFactory("zeromq"); - - // Needs three subtimeframes to merge them - auto checkIfComplete = [](SubframeId id, o2::dataflow::PayloadMerger::MessageMap& m) -> bool { - return m.count(id) >= 3; - }; - - // Id is given by the orbit, 2 orbits per timeframe - auto makeId = [](std::unique_ptr& msg) { - auto header = reinterpret_cast(msg->GetData()); - return o2::dataflow::makeIdFromHeartbeatHeader(*header, 0, 2); - }; - - o2::dataflow::PayloadMerger merger(makeId, checkIfComplete, o2::dataflow::extractDetectorPayloadStrip); - char* finalBuf = new char[3000]; - size_t finalSize = 0; - auto id = fakeAddition(merger, zmq, 1); - finalSize = merger.finalise(&finalBuf, id); - BOOST_CHECK(finalSize == 0); // Not enough parts, not merging yet. - id = fakeAddition(merger, zmq, 1); - finalSize = merger.finalise(&finalBuf, id); - BOOST_CHECK(finalSize == 0); // Not enough parts, not merging yet. - id = fakeAddition(merger, zmq, 2); - finalSize = merger.finalise(&finalBuf, id); - BOOST_CHECK(finalSize == 0); // Different ID, not merging yet. - id = fakeAddition(merger, zmq, 1); - finalSize = merger.finalise(&finalBuf, id); - BOOST_CHECK(finalSize); // Now we merge! - size_t partSize = (1000 - sizeof(HeartbeatHeader) - sizeof(HeartbeatTrailer)); - BOOST_CHECK(finalSize == 3 * partSize); // This should be the calculated size - for (size_t i = 0; i < finalSize; ++i) { - BOOST_CHECK(finalBuf[i] == ((i % partSize) == 0 ? 127 : 1)); - } -} diff --git a/Utilities/DataFlow/test/test_SubframeUtils01.cxx b/Utilities/DataFlow/test/test_SubframeUtils01.cxx deleted file mode 100644 index 32605f9075808..0000000000000 --- a/Utilities/DataFlow/test/test_SubframeUtils01.cxx +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#define BOOST_TEST_MODULE Test Utilities DataFlowTest -#define BOOST_TEST_MAIN -#define BOOST_TEST_DYN_LINK - -#include "DataFlow/SubframeUtils.h" -#include -#include - -BOOST_AUTO_TEST_CASE(SubframeUtils01) -{ - o2::dataflow::SubframeId a; - a.timeframeId = 0; - a.socketId = 1; - o2::dataflow::SubframeId b; - b.timeframeId = 1; - b.socketId = 0; - BOOST_CHECK(a < b); - char* buf = new char[1000]; - memset(buf, 126, 1000); - for (size_t i = sizeof(o2::header::HeartbeatHeader); i < 1000 - sizeof(o2::header::HeartbeatHeader); ++i) { - buf[i] = 0; - } - BOOST_CHECK(buf[0] == 126); - BOOST_CHECK(buf[sizeof(o2::header::HeartbeatHeader)] == 0); - BOOST_CHECK(buf[sizeof(o2::header::HeartbeatHeader) - 1] == 126); - char* realPayload = nullptr; - size_t realSize = o2::dataflow::extractDetectorPayloadStrip(&realPayload, buf, 1000); - BOOST_CHECK(realPayload != nullptr); - BOOST_CHECK(realSize == 1000 - sizeof(o2::header::HeartbeatHeader) - sizeof(o2::header::HeartbeatTrailer)); - BOOST_CHECK(realPayload == buf + sizeof(o2::header::HeartbeatHeader)); - BOOST_CHECK(realPayload[0] == 0); - - o2::header::HeartbeatHeader header1; - header1.orbit = 0; - o2::header::HeartbeatHeader header2; - header2.orbit = 255; - o2::header::HeartbeatHeader header3; - header3.orbit = 256; - - auto id1 = o2::dataflow::makeIdFromHeartbeatHeader(header1, 1, 256); - auto id2 = o2::dataflow::makeIdFromHeartbeatHeader(header2, 1, 256); - auto id3 = o2::dataflow::makeIdFromHeartbeatHeader(header3, 1, 256); - BOOST_CHECK(!(id1 < id2)); // Maybe we should provide an == operator - BOOST_CHECK(!(id2 < id1)); - BOOST_CHECK(id1 < id3); - BOOST_CHECK(id2 < id3); - BOOST_CHECK(id1.timeframeId == 0); - BOOST_CHECK(id3.timeframeId == 1); -} diff --git a/Utilities/DataFlow/test/test_TimeframeParser.cxx b/Utilities/DataFlow/test/test_TimeframeParser.cxx deleted file mode 100644 index 37e6f5af09c36..0000000000000 --- a/Utilities/DataFlow/test/test_TimeframeParser.cxx +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "DataFlow/TimeframeParser.h" -#include "DataFlow/FakeTimeframeBuilder.h" -#include "Headers/DataHeader.h" -#include "Framework/Logger.h" -#include -#include -#include - -struct OneShotReadBuf : public std::streambuf { - OneShotReadBuf(char* s, std::size_t n) - { - setg(s, s, s + n); - } -}; - -using DataHeader = o2::header::DataHeader; - -int main(int argc, char** argv) -{ - // Construct a dummy timeframe. - // Stream it and get the parts - FairMQParts parts; - auto onAddParts = [](FairMQParts& p, char* buffer, size_t size) { - LOG(INFO) << "Adding part to those to be sent.\n"; - }; - auto onSend = [](FairMQParts& p) { - LOG(INFO) << "Everything OK. Sending parts\n"; - }; - - // Prepare a test timeframe to be streamed - auto zeroFiller = [](char* b, size_t s) { memset(b, 0, s); }; - std::vector specs = { - {.origin = "TPC", - .dataDescription = "CLUSTERS", - .bufferFiller = zeroFiller, - .bufferSize = 1000}}; - - size_t testBufferSize; - auto testBuffer = fakeTimeframeGenerator(specs, testBufferSize); - - OneShotReadBuf osrb(testBuffer.get(), testBufferSize); - std::istream s(&osrb); - - try { - o2::data_flow::streamTimeframe(s, onAddParts, onSend); - } catch (std::runtime_error& e) { - LOG(ERROR) << e.what() << std::endl; - exit(1); - } -}