From 5e3117146e31f71f0b10064a747b78612faedf56 Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Thu, 11 Jun 2026 16:46:41 +0200 Subject: [PATCH 1/7] [ntuple] Prevent redundant processor (re-)connection in chains --- tree/ntuple/src/RNTupleProcessor.cxx | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tree/ntuple/src/RNTupleProcessor.cxx b/tree/ntuple/src/RNTupleProcessor.cxx index 3a00539e0a82c..ed6d7c702b730 100644 --- a/tree/ntuple/src/RNTupleProcessor.cxx +++ b/tree/ntuple/src/RNTupleProcessor.cxx @@ -347,13 +347,21 @@ ROOT::Experimental::RNTupleChainProcessor::AddFieldToEntry(const std::string &fi ROOT::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::LoadEntry(ROOT::NTupleSize_t entryNumber) { - ROOT::NTupleSize_t localEntryNumber = entryNumber; - std::size_t currProcessorNumber = 0; + // If the requested entry number is lower than the current entry number, we have to again localise the correct local + // entry number starting from the first processor in the chain. Otherwise, we can continue looking from the inner + // processor that is currently connected, which is much faster when the chain consists of many inner processors. if (entryNumber < fCurrentEntryNumber) { fCurrentProcessorNumber = 0; ConnectInnerProcessor(fCurrentProcessorNumber); } + std::size_t currProcessorNumber = fCurrentProcessorNumber; + ROOT::NTupleSize_t entriesSeen = 0; + for (unsigned i = 0; i < currProcessorNumber; ++i) { + entriesSeen += fInnerProcessors[i]->GetNEntries(); + } + ROOT::NTupleSize_t localEntryNumber = entryNumber - entriesSeen; + // As long as the entry fails to load from the current processor, we decrement the local entry number with the number // of entries in this processor and try with the next processor until we find the correct local entry number. while (fInnerProcessors[currProcessorNumber]->LoadEntry(localEntryNumber) == kInvalidNTupleIndex) { From 0c7ea383c725c196b9ce7a0f3870d08daf92a43a Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Fri, 12 Jun 2026 13:28:00 +0200 Subject: [PATCH 2/7] [ntuple] Remove unnecessary init and connect of single processors --- tree/ntuple/inc/ROOT/RNTupleProcessor.hxx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx b/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx index 5598260ad3ae5..22c1d399f3ce4 100644 --- a/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx @@ -615,9 +615,8 @@ private: /// \brief Get the total number of entries in this processor. ROOT::NTupleSize_t GetNEntries() final { - Initialize(); if (fNEntries == ROOT::kInvalidNTupleIndex) - Connect(fFieldIdxs); + Initialize(); return fNEntries; } From 91e6ec111b115c6726c0fef73d74b154ba0818bf Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Fri, 12 Jun 2026 14:05:08 +0200 Subject: [PATCH 3/7] [ntuple] Use cached entry counts before calling GetNEntries --- tree/ntuple/src/RNTupleProcessor.cxx | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tree/ntuple/src/RNTupleProcessor.cxx b/tree/ntuple/src/RNTupleProcessor.cxx index ed6d7c702b730..077503fe2cc66 100644 --- a/tree/ntuple/src/RNTupleProcessor.cxx +++ b/tree/ntuple/src/RNTupleProcessor.cxx @@ -358,7 +358,10 @@ ROOT::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::LoadEntry(ROOT::NT std::size_t currProcessorNumber = fCurrentProcessorNumber; ROOT::NTupleSize_t entriesSeen = 0; for (unsigned i = 0; i < currProcessorNumber; ++i) { - entriesSeen += fInnerProcessors[i]->GetNEntries(); + if (fInnerNEntries[i] == kInvalidNTupleIndex) { + fInnerNEntries[i] = fInnerProcessors[i]->GetNEntries(); + } + entriesSeen += fInnerNEntries[i]; } ROOT::NTupleSize_t localEntryNumber = entryNumber - entriesSeen; From 6a8efec757654f94dbd9625284cda4b4bf8e2fd7 Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Mon, 25 May 2026 09:12:04 +0200 Subject: [PATCH 4/7] [ntuple] Add `RNTupleProcessor::AddAllFields` --- tree/ntuple/inc/ROOT/RNTupleProcessor.hxx | 18 +++++++ tree/ntuple/src/RNTupleProcessor.cxx | 53 +++++++++++++++++++ tree/ntuple/test/ntuple_processor.cxx | 56 +++++++++++++++++++++ tree/ntuple/test/ntuple_processor_chain.cxx | 20 ++++++++ tree/ntuple/test/ntuple_processor_join.cxx | 18 +++++++ 5 files changed, 165 insertions(+) diff --git a/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx b/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx index 22c1d399f3ce4..3951677f18d47 100644 --- a/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx @@ -31,9 +31,12 @@ namespace ROOT { namespace Experimental { +class RNTupleProcessor; namespace Internal { struct RNTupleProcessorEntryLoader; +const RNTupleProcessorEntry * +LoadFullRNTupleProcessorEntry(ROOT::Experimental::RNTupleProcessor &processor, bool includeSubfields); } // namespace Internal // clang-format off @@ -247,6 +250,8 @@ that is returned by RequestField(). // clang-format on class RNTupleProcessor { friend struct ROOT::Experimental::Internal::RNTupleProcessorEntryLoader; // for unit tests + friend const Internal::RNTupleProcessorEntry * + ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(RNTupleProcessor &processor, bool includeSubfields); friend class RNTupleSingleProcessor; friend class RNTupleChainProcessor; friend class RNTupleJoinProcessor; @@ -317,6 +322,10 @@ protected: AddFieldToEntry(const std::string &fieldName, const std::string &typeName, void *valuePtr, const Internal::RNTupleProcessorProvenance &provenance) = 0; + // TODO docs + virtual void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, + bool includeSubfields) = 0; + ///////////////////////////////////////////////////////////////////////////// /// \brief Add the entry mappings for this processor to the provided join table. /// @@ -634,6 +643,9 @@ private: const std::string &fieldName, const std::string &typeName, void *valuePtr = nullptr, const Internal::RNTupleProcessorProvenance &provenance = Internal::RNTupleProcessorProvenance()) final; + void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, + bool includeSubfields) final; + ///////////////////////////////////////////////////////////////////////////// /// \brief Add the entry mappings for this processor to the provided join table. /// @@ -729,6 +741,9 @@ private: const std::string &fieldName, const std::string &typeName, void *valuePtr = nullptr, const Internal::RNTupleProcessorProvenance &provenance = Internal::RNTupleProcessorProvenance()) final; + void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, + bool includeSubfields) final; + ///////////////////////////////////////////////////////////////////////////// /// \brief Add the entry mappings for this processor to the provided join table. /// @@ -829,6 +844,9 @@ private: const std::string &fieldName, const std::string &typeName, void *valuePtr = nullptr, const Internal::RNTupleProcessorProvenance &provenance = Internal::RNTupleProcessorProvenance()) final; + void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, + bool includeSubfields) final; + ///////////////////////////////////////////////////////////////////////////// /// \brief Add the entry mappings for this processor to the provided join table. /// diff --git a/tree/ntuple/src/RNTupleProcessor.cxx b/tree/ntuple/src/RNTupleProcessor.cxx index 077503fe2cc66..9a5b8a3c02e60 100644 --- a/tree/ntuple/src/RNTupleProcessor.cxx +++ b/tree/ntuple/src/RNTupleProcessor.cxx @@ -23,6 +23,13 @@ #include +const ROOT::Experimental::Internal::RNTupleProcessorEntry * +ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(RNTupleProcessor &processor, bool includeSubfields) +{ + processor.AddAllFieldsToEntry(RNTupleProcessorProvenance(), /*addPrefixProvenance=*/false, includeSubfields); + return processor.fEntry.get(); +} + std::unique_ptr ROOT::Experimental::RNTupleOpenSpec::CreatePageSource() const { if (const std::string *storagePath = std::get_if(&fStorage)) @@ -212,6 +219,36 @@ ROOT::Experimental::RNTupleSingleProcessor::AddFieldToEntry(const std::string &f return *fieldIdx; } +void ROOT::Experimental::RNTupleSingleProcessor::AddAllFieldsToEntry( + const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) +{ + Initialize(); + auto &desc = fPageSource->GetSharedDescriptorGuard().GetRef(); + auto fnAddSubfields = [this, &desc, &provenance, &addPrefixProvenance](const ROOT::RFieldDescriptor &field, + auto &fn) -> void { + std::string fieldName = desc.GetQualifiedFieldName(field.GetId()); + if (addPrefixProvenance) + fieldName = provenance.Get() + "." + fieldName; + + AddFieldToEntry(fieldName, field.GetTypeName(), nullptr, provenance); + for (const auto &subfield : desc.GetFieldIterable(field.GetId())) { + fn(subfield, fn); + } + }; + + for (const auto &field : desc.GetTopLevelFields()) { + if (includeSubfields) { + fnAddSubfields(field, fnAddSubfields); + } else { + std::string fieldName = desc.GetQualifiedFieldName(field.GetId()); + if (addPrefixProvenance) + fieldName = provenance.Get() + "." + fieldName; + + AddFieldToEntry(fieldName, field.GetTypeName(), nullptr, provenance); + } + } +} + ROOT::NTupleSize_t ROOT::Experimental::RNTupleSingleProcessor::LoadEntry(ROOT::NTupleSize_t entryNumber) { if (entryNumber >= fNEntries || !fEntry) @@ -345,6 +382,13 @@ ROOT::Experimental::RNTupleChainProcessor::AddFieldToEntry(const std::string &fi return fInnerProcessors[fCurrentProcessorNumber]->AddFieldToEntry(fieldName, typeName, valuePtr, provenance); } +void ROOT::Experimental::RNTupleChainProcessor::AddAllFieldsToEntry( + const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) +{ + Initialize(); + fInnerProcessors[0]->AddAllFieldsToEntry(provenance, addPrefixProvenance, includeSubfields); +} + ROOT::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::LoadEntry(ROOT::NTupleSize_t entryNumber) { // If the requested entry number is lower than the current entry number, we have to again localise the correct local @@ -527,6 +571,15 @@ ROOT::Experimental::RNTupleJoinProcessor::AddFieldToEntry(const std::string &fie } } +void ROOT::Experimental::RNTupleJoinProcessor::AddAllFieldsToEntry( + const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) +{ + Initialize(); + fPrimaryProcessor->AddAllFieldsToEntry(provenance, addPrefixProvenance, includeSubfields); + auto auxProvenance = provenance.Evolve(fAuxiliaryProcessor->GetProcessorName()); + fAuxiliaryProcessor->AddAllFieldsToEntry(auxProvenance, /*addPrefixProvenance=*/true, includeSubfields); +} + void ROOT::Experimental::RNTupleJoinProcessor::SetAuxiliaryFieldValidity(bool isValid) { for (const auto &fieldIdx : fAuxiliaryFieldIdxs) { diff --git a/tree/ntuple/test/ntuple_processor.cxx b/tree/ntuple/test/ntuple_processor.cxx index e96c6a6b3f9c6..7706f3e56b9c2 100644 --- a/tree/ntuple/test/ntuple_processor.cxx +++ b/tree/ntuple/test/ntuple_processor.cxx @@ -305,6 +305,24 @@ TEST_F(RNTupleProcessorTest, Subfields) } } +TEST_F(RNTupleProcessorTest, AddAllFields) +{ + auto proc = RNTupleProcessor::Create({fNTupleNames[0], fFileNames[0]}); + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/false); + auto fieldIdxs = entry->GetFieldIndices(); + + EXPECT_EQ(fieldIdxs.size(), 4); +} + +TEST_F(RNTupleProcessorTest, AddAllFieldsAndSubfields) +{ + auto proc = RNTupleProcessor::Create({fNTupleNames[0], fFileNames[0]}); + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/true); + auto fieldIdxs = entry->GetFieldIndices(); + + EXPECT_EQ(fieldIdxs.size(), 13); +} + TEST_F(RNTupleProcessorTest, PrintStructureSingle) { auto proc = RNTupleProcessor::Create({fNTupleNames[0], fFileNames[0]}); @@ -798,3 +816,41 @@ TEST_F(RNTupleProcessorTest, PrintStructureJoinedChainAsymmetric) " +-----------------------------+\n"; EXPECT_EQ(exp2, os2.str()); } + +TEST_F(RNTupleProcessorTest, AddAllFieldsComposed) +{ + auto primaryProc = RNTupleProcessor::Create({fNTupleNames[0], fFileNames[0]}); + + auto auxProcIntermediate = RNTupleProcessor::Create({fNTupleNames[2], fFileNames[2]}, "ntuple_aux2"); + + auto auxProc = RNTupleProcessor::CreateJoin( + RNTupleProcessor::CreateChain({{fNTupleNames[1], fFileNames[1]}, {fNTupleNames[2], fFileNames[2]}}), + std::move(auxProcIntermediate), {"i"}); + + auto proc = RNTupleProcessor::CreateJoin(std::move(primaryProc), std::move(auxProc), {}); + + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/false); + auto fieldIdxs = entry->GetFieldIndices(); + + // 11 fields instead of 10, because the join field is also included. + EXPECT_EQ(fieldIdxs.size(), 11); +} + +TEST_F(RNTupleProcessorTest, AddAllFieldsAndSubfieldsComposed) +{ + auto primaryProc = RNTupleProcessor::Create({fNTupleNames[0], fFileNames[0]}); + + auto auxProcIntermediate = RNTupleProcessor::Create({fNTupleNames[2], fFileNames[2]}, "ntuple_aux2"); + + auto auxProc = RNTupleProcessor::CreateJoin( + RNTupleProcessor::CreateChain({{fNTupleNames[1], fFileNames[1]}, {fNTupleNames[2], fFileNames[2]}}), + std::move(auxProcIntermediate), {"i"}); + + auto proc = RNTupleProcessor::CreateJoin(std::move(primaryProc), std::move(auxProc), {}); + + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/true); + auto fieldIdxs = entry->GetFieldIndices(); + + // 36 fields instead of 25, because the join field is also included. + EXPECT_EQ(fieldIdxs.size(), 36); +} diff --git a/tree/ntuple/test/ntuple_processor_chain.cxx b/tree/ntuple/test/ntuple_processor_chain.cxx index 8e5c390e3f7e9..063fc1edda8a5 100644 --- a/tree/ntuple/test/ntuple_processor_chain.cxx +++ b/tree/ntuple/test/ntuple_processor_chain.cxx @@ -143,6 +143,26 @@ TEST_F(RNTupleChainProcessorTest, MissingFields) EXPECT_EQ(15, proc->GetNEntriesProcessed()); } +TEST_F(RNTupleChainProcessorTest, AddAllFields) +{ + auto proc = RNTupleProcessor::CreateChain( + {{fNTupleName, fFileNames[0]}, {fNTupleName, fFileNames[2]}, {fNTupleName, fFileNames[1]}}); + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/false); + auto fieldIdxs = entry->GetFieldIndices(); + + EXPECT_EQ(fieldIdxs.size(), 2); +} + +TEST_F(RNTupleChainProcessorTest, AddAllFieldsAndSubfields) +{ + auto proc = RNTupleProcessor::CreateChain( + {{fNTupleName, fFileNames[0]}, {fNTupleName, fFileNames[2]}, {fNTupleName, fFileNames[1]}}); + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/true); + auto fieldIdxs = entry->GetFieldIndices(); + + EXPECT_EQ(fieldIdxs.size(), 3); +} + TEST_F(RNTupleChainProcessorTest, EmptyNTuples) { FileRaii fileGuard("test_ntuple_processor_empty_ntuples.root"); diff --git a/tree/ntuple/test/ntuple_processor_join.cxx b/tree/ntuple/test/ntuple_processor_join.cxx index 5bd235cbae4b1..ebaf5d426cf37 100644 --- a/tree/ntuple/test/ntuple_processor_join.cxx +++ b/tree/ntuple/test/ntuple_processor_join.cxx @@ -164,6 +164,24 @@ TEST(RNTupleJoinProcessor, NameConflict) } } +TEST_F(RNTupleJoinProcessorTest, AddAllFields) +{ + auto proc = RNTupleProcessor::CreateJoin({fNTupleNames[1], fFileNames[1]}, {fNTupleNames[2], fFileNames[2]}, {}); + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/false); + auto fieldIdxs = entry->GetFieldIndices(); + + EXPECT_EQ(fieldIdxs.size(), 5); +} + +TEST_F(RNTupleJoinProcessorTest, AddAllFieldsAndSubfields) +{ + auto proc = RNTupleProcessor::CreateJoin({fNTupleNames[1], fFileNames[1]}, {fNTupleNames[2], fFileNames[2]}, {}); + auto entry = ROOT::Experimental::Internal::LoadFullRNTupleProcessorEntry(*proc, /*includeSubfields=*/true); + auto fieldIdxs = entry->GetFieldIndices(); + + EXPECT_EQ(fieldIdxs.size(), 6); +} + TEST_F(RNTupleJoinProcessorTest, UnalignedSingleJoinField) { auto proc = RNTupleProcessor::CreateJoin({fNTupleNames[0], fFileNames[0]}, {fNTupleNames[1], fFileNames[1]}, {"i"}); From 8ccf02827b779d2ed7644fbf7de9af2660930194 Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Mon, 25 May 2026 12:46:16 +0200 Subject: [PATCH 5/7] [ntuple] Change internal name of join field To make it less potentially ambigious. --- tree/ntuple/src/RNTupleProcessor.cxx | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tree/ntuple/src/RNTupleProcessor.cxx b/tree/ntuple/src/RNTupleProcessor.cxx index 9a5b8a3c02e60..a58f8381bc6d8 100644 --- a/tree/ntuple/src/RNTupleProcessor.cxx +++ b/tree/ntuple/src/RNTupleProcessor.cxx @@ -157,9 +157,9 @@ ROOT::Experimental::RNTupleSingleProcessor::CreateAndConnectField(const std::str std::string onDiskFieldName = qualifiedFieldName; - // Strip the "_join" prefix (for join fields) from the field name, if present. - if (onDiskFieldName.find("_join.") == 0) { - onDiskFieldName = onDiskFieldName.substr(6); + // Strip the "R_rntproc_join_" prefix (for join fields) from the field name, if present. + if (onDiskFieldName.find("R_rntproc_join_") == 0) { + onDiskFieldName = onDiskFieldName.substr(15); } const auto &desc = fPageSource->GetSharedDescriptorGuard().GetRef(); @@ -512,7 +512,7 @@ void ROOT::Experimental::RNTupleJoinProcessor::Initialize( // We prepend the name of the primary processor in this case to prevent reading from the wrong join field in // composed join operations. - auto fieldIdx = AddFieldToEntry(fProcessorName + "._join." + joinField, "std::uint64_t", nullptr, + auto fieldIdx = AddFieldToEntry(fProcessorName + ".R_rntproc_join_" + joinField, "std::uint64_t", nullptr, Internal::RNTupleProcessorProvenance(fProcessorName)); fJoinFieldIdxs.insert(fieldIdx); } From 7ec438f358e2f89935ead3caea702bcc8c96b5c5 Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Mon, 8 Jun 2026 08:52:04 +0200 Subject: [PATCH 6/7] [DF][WIP] Add initial RNTupleProcessorDS Based on the single and chain processor. Some issues with array type handling still need to be sorted out. No support for IMT. --- tree/dataframe/CMakeLists.txt | 2 + .../dataframe/inc/ROOT/RNTupleProcessorDS.hxx | 122 +++ tree/dataframe/src/RNTupleProcessorDS.cxx | 580 ++++++++++++ tree/dataframe/test/CMakeLists.txt | 4 + .../test/datasource_ntuple_processor.cxx | 875 ++++++++++++++++++ tree/ntuple/inc/ROOT/RNTupleProcessor.hxx | 48 + .../ntuple/inc/ROOT/RNTupleProcessorEntry.hxx | 16 +- tree/ntuple/src/RNTupleProcessor.cxx | 77 +- tree/ntuple/src/RNTupleProcessorEntry.cxx | 8 +- 9 files changed, 1728 insertions(+), 4 deletions(-) create mode 100644 tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx create mode 100644 tree/dataframe/src/RNTupleProcessorDS.cxx create mode 100644 tree/dataframe/test/datasource_ntuple_processor.cxx diff --git a/tree/dataframe/CMakeLists.txt b/tree/dataframe/CMakeLists.txt index 01ddbc3e3774e..6cf5aa2cc3375 100644 --- a/tree/dataframe/CMakeLists.txt +++ b/tree/dataframe/CMakeLists.txt @@ -101,6 +101,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(ROOTDataFrame ROOT/RDF/PyROOTHelpers.hxx ROOT/RDF/RDFDescription.hxx ROOT/RNTupleDS.hxx + ROOT/RNTupleProcessorDS.hxx ${RDATAFRAME_EXTRA_HEADERS} SOURCES src/RActionBase.cxx @@ -143,6 +144,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(ROOTDataFrame src/RTrivialDS.cxx src/RDFDescription.cxx src/RNTupleDS.cxx + src/RNTupleProcessorDS.cxx DICTIONARY_OPTIONS -writeEmptyRootPCM ${RDATAFRAME_EXTRA_INCLUDES} diff --git a/tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx b/tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx new file mode 100644 index 0000000000000..bd3a415e987ce --- /dev/null +++ b/tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx @@ -0,0 +1,122 @@ +/// \file RNTupleProcessorDS.hxx +/// \ingroup NTuple ROOT7 +/// \author Florine de Geus +/// \date 2025-06-18 +/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback +/// is welcome! + +/************************************************************************* + * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. * + * All rights reserved. * + * * + * For the licensing terms see $ROOTSYS/LICENSE. * + * For the list of contributors see $ROOTSYS/README/CREDITS. * + *************************************************************************/ + +#ifndef ROOT_RNTupleProcessorDS +#define ROOT_RNTupleProcessorDS + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace ROOT { +class RDataFrame; +} // namespace ROOT + +namespace ROOT::Experimental::RDF { +class RNTupleProcessorDS final : public ROOT::RDF::RDataSource { + std::unique_ptr fProcessor; + + /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d). + /// This enables the column reader to rewire the field IDs when the file changes (chain), + /// using the fully qualified name as a search key in the descriptor of the other page sources. + std::vector fColumnNames; + std::vector fColumnTypes; + + std::vector fActiveColumnReaders; + + /// \brief Holds useful information about fields added to the RNTupleProcessorDS + struct RFieldInfo { + ROOT::DescriptorId_t fFieldId; + std::string fFieldName; + std::string fTypeName; + std::size_t fNRepetitions; + // Enable `std::vector::emplace_back` for this type + RFieldInfo(ROOT::DescriptorId_t fieldId, std::string_view fieldName, std::string_view typeName, + std::size_t nRepetitions) + : fFieldId(fieldId), fFieldName(fieldName), fTypeName(typeName), fNRepetitions(nRepetitions) + { + } + }; + + /// Provides the RDF column "colName" given the field identified by fieldID. For records and collections, + /// AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info + /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an + /// `std::vector`, with + /// ~~~{.cpp} + /// struct Jet { + /// float pt; + /// float eta; + /// }; + /// ~~~ + /// AddField will recurse into `Jet.pt` and `Jet.eta` and provide the two inner fields as `ROOT::VecOps::RVec` + /// each. + /// + /// In case the field is a collection of type `ROOT::VecOps::RVec`, `std::vector` or `std::array`, its corresponding + /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however, + /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set + /// will be added as a `std::set`, but `Jet.[pt|eta] will be added as `ROOT::VecOps::RVec). + void AddField(const ROOT::RFieldBase &field, std::string_view colName, std::vector fieldInfos, + bool convertToRVec = true); + +public: + RNTupleProcessorDS(std::unique_ptr processor); + // Rule of five + RNTupleProcessorDS(const RNTupleProcessorDS &) = delete; + RNTupleProcessorDS &operator=(const RNTupleProcessorDS &) = delete; + RNTupleProcessorDS(RNTupleProcessorDS &&) = delete; + RNTupleProcessorDS &operator=(RNTupleProcessorDS &&) = delete; + ~RNTupleProcessorDS() final; + + void SetNSlots(unsigned int nSlots) final; + // FIXME(fdegeus) get correct number of files (needs to be added in RNTupleProcessor) + std::size_t GetNFiles() const final { return 1; } + const std::vector &GetColumnNames() const final { return fColumnNames; } + bool HasColumn(std::string_view colName) const final; + std::string GetTypeName(std::string_view colName) const final; + std::vector> GetEntryRanges() final; + std::string GetLabel() final { return "RNTupleProcessorDS"; } + + void Initialize() final; + void InitSlot(unsigned int slot, ULong64_t firstEntry) final; + void FinalizeSlot(unsigned int slot) final; + void Finalize() final; + + std::unique_ptr + GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &) final; + + ROOT::RDF::RSampleInfo + CreateSampleInfo(unsigned int, + const std::unordered_map &) const final; + + // Old API, unused + bool SetEntry(unsigned int, ULong64_t) final; + +protected: + Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final; +}; +} // namespace ROOT::Experimental::RDF + +namespace ROOT::Experimental::RDF { +RDataFrame FromRNTupleProcessor(std::unique_ptr processor); +} // namespace ROOT::Experimental::RDF + +#endif // ROOT_RNTupleProcessorDS diff --git a/tree/dataframe/src/RNTupleProcessorDS.cxx b/tree/dataframe/src/RNTupleProcessorDS.cxx new file mode 100644 index 0000000000000..5cb8538ee65cd --- /dev/null +++ b/tree/dataframe/src/RNTupleProcessorDS.cxx @@ -0,0 +1,580 @@ +/// \file RNTupleProcessorDS.cxx +/// \author Jakob Blomer +/// \author Enrico Guiraud +/// \date 2018-10-04 + +/************************************************************************* + * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. * + * All rights reserved. * + * * + * For the licensing terms see $ROOTSYS/LICENSE. * + * For the list of contributors see $ROOTSYS/README/CREDITS. * + *************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +// namespace ROOT::Experimental::RDF { +// class RNTupleProcessorDS; +// } + +// clang-format off +/** +* \class ROOT::Experimental::RDF::RNTupleProcessorDS +* \ingroup dataframe +* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data. +* +* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple(). +* +* For each column containing an array or a collection, a corresponding column `#colname` is available to access +* `colname.size()` without reading and deserializing the collection values. +* +**/ +// clang-format on +namespace ROOT::Experimental::Internal::RDF { +class RRDFCardinalityFieldBase : public ROOT::RFieldBase { +protected: + // We construct these fields and know that they match the page source + void ReconcileOnDiskField(const RNTupleDescriptor &) final {} + + RRDFCardinalityFieldBase(std::string_view name, std::string_view type) + : ROOT::RFieldBase(name, type, ROOT::ENTupleStructure::kPlain, false /* isSimple */) + { + } + + // Field is only used for reading + void GenerateColumns() final { throw RException(R__FAIL("Cardinality fields must only be used for reading")); } + void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final + { + GenerateColumnsImpl(desc); + } + +public: + RRDFCardinalityFieldBase(const RRDFCardinalityFieldBase &other) = delete; + RRDFCardinalityFieldBase &operator=(const RRDFCardinalityFieldBase &other) = delete; + RRDFCardinalityFieldBase(RRDFCardinalityFieldBase &&other) = default; + RRDFCardinalityFieldBase &operator=(RRDFCardinalityFieldBase &&other) = default; + ~RRDFCardinalityFieldBase() override = default; + + const RColumnRepresentations &GetColumnRepresentations() const final + { + static RColumnRepresentations representations({{ENTupleColumnType::kSplitIndex64}, + {ENTupleColumnType::kIndex64}, + {ENTupleColumnType::kSplitIndex32}, + {ENTupleColumnType::kIndex32}}, + {}); + return representations; + } +}; + +/// An artificial field that transforms an RNTuple column that contains the offset of collections into +/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g. +/// `R_rdf_sizeof_jets` for a collection named `jets`. +/// +/// This is similar to the RCardinalityField but it presents itself as an integer type. +/// The template argument T must be an integral type. +template +class RRDFCardinalityField final : public RRDFCardinalityFieldBase { + static_assert(std::is_integral_v, "T must be an integral type"); + + inline void CheckSize(ROOT::NTupleSize_t size) const + { + if constexpr (std::is_same_v || std::is_same_v) + return; + if (size > static_cast(std::numeric_limits::max())) { + throw RException(R__FAIL(std::string("integer overflow in field ") + GetFieldName() + + ". Please read the column with a larger-sized integral type.")); + } + } + +protected: + std::unique_ptr CloneImpl(std::string_view newName) const final + { + return std::make_unique(newName); + } + void ConstructValue(void *where) const final { *static_cast(where) = 0; } + +public: + RRDFCardinalityField(std::string_view name) + : RRDFCardinalityFieldBase(name, ROOT::Internal::GetRenormalizedTypeName(typeid(T))) + { + } + RRDFCardinalityField(const RRDFCardinalityField &other) = delete; + RRDFCardinalityField &operator=(const RRDFCardinalityField &other) = delete; + RRDFCardinalityField(RRDFCardinalityField &&other) = default; + RRDFCardinalityField &operator=(RRDFCardinalityField &&other) = default; + ~RRDFCardinalityField() override = default; + + std::size_t GetValueSize() const final { return sizeof(T); } + std::size_t GetAlignment() const final { return alignof(T); } + + /// Get the number of elements of the collection identified by globalIndex + void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final + { + RNTupleLocalIndex collectionStart; + ROOT::NTupleSize_t size; + fPrincipalColumn->GetCollectionInfo(globalIndex, &collectionStart, &size); + CheckSize(size); + *static_cast(to) = size; + } + + /// Get the number of elements of the collection identified by clusterIndex + void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final + { + RNTupleLocalIndex collectionStart; + ROOT::NTupleSize_t size; + fPrincipalColumn->GetCollectionInfo(localIndex, &collectionStart, &size); + CheckSize(size); + *static_cast(to) = size; + } +}; + +/** + * @brief An artificial field that provides the size of a fixed-size array + * + * This is the implementation of `R_rdf_sizeof_column` in case `column` contains + * fixed-size arrays on disk. + */ +class RArraySizeField final : public ROOT::RFieldBase { +private: + std::size_t fArrayLength; + + std::unique_ptr CloneImpl(std::string_view newName) const final + { + return std::make_unique(newName, fArrayLength); + } + void GenerateColumns() final { throw RException(R__FAIL("RArraySizeField fields must only be used for reading")); } + void GenerateColumns(const ROOT::RNTupleDescriptor &) final {} + void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final + { + *static_cast(to) = fArrayLength; + } + void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final + { + *static_cast(to) = fArrayLength; + } + + // We construct these fields and know that they match the page source + void ReconcileOnDiskField(const RNTupleDescriptor &) final {} + +public: + RArraySizeField(std::string_view name, std::size_t arrayLength) + : ROOT::RFieldBase(name, ROOT::Internal::GetRenormalizedTypeName(typeid(std::size_t)), + ROOT::ENTupleStructure::kPlain, false /* isSimple */), + fArrayLength(arrayLength) + { + } + RArraySizeField(const RArraySizeField &other) = delete; + RArraySizeField &operator=(const RArraySizeField &other) = delete; + RArraySizeField(RArraySizeField &&other) = default; + RArraySizeField &operator=(RArraySizeField &&other) = default; + ~RArraySizeField() final = default; + + void ConstructValue(void *where) const final { *static_cast(where) = 0; } + std::size_t GetValueSize() const final { return sizeof(std::size_t); } + std::size_t GetAlignment() const final { return alignof(std::size_t); } +}; + +/// Every RDF column is represented by exactly one RNTuple field +class RNTupleProcessorColumnReader : public ROOT::Detail::RDF::RColumnReaderBase { + RNTupleProcessor *fProcessor; ///< The processor managed by the data source + RNTupleProcessorOptionalPtr fValuePtr; ///< Container for the value of the column + +public: + RNTupleProcessorColumnReader(RNTupleProcessor &processor, RNTupleProcessorOptionalPtr &valuePtr) + : fProcessor(&processor), fValuePtr(std::move(valuePtr)) + { + } + ~RNTupleProcessorColumnReader() override = default; + + void *GetImpl(Long64_t entry) final + { + assert(entry == static_cast(fProcessor->GetCurrentEntryNumber())); + return fValuePtr.GetRawPtr(); + } +}; +} // namespace ROOT::Experimental::Internal::RDF + +ROOT::Experimental::RDF::RNTupleProcessorDS::~RNTupleProcessorDS() = default; + +void ROOT::Experimental::RDF::RNTupleProcessorDS::AddField(const ROOT::RFieldBase &field, std::string_view colName, + std::vector fieldInfos, + bool convertToRVec) +{ + // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple + // using the following types and with a top-level field named "event" of type Event: + // + // struct Event { + // int id; + // std::vector tracks; + // }; + // struct Track { + // std::vector hits; + // }; + // struct Hit { + // float x; + // float y; + // }; + // + // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord). + // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the + // tree of sub fields and expose the following RDF columns: + // + // "event" [Event] + // "event.id" [int] + // "event.tracks" [RVec] + // "R_rdf_sizeof_event.tracks" [unsigned int] + // "event.tracks.hits" [RVec>] + // "R_rdf_sizeof_event.tracks.hits" [RVec] + // "event.tracks.hits.x" [RVec>] + // "R_rdf_sizeof_event.tracks.hits.x" [RVec] + // "event.tracks.hits.y" [RVec>] + // "R_rdf_sizeof_event.tracks.hits.y" [RVec] + + const auto &nRepetitions = field.GetNRepetitions(); + if ((field.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) { + // The field is a collection or a fixed-size array. + // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits", + // fieldInfos would already contain the fieldID of "event.tracks" + fieldInfos.emplace_back(field.GetOnDiskId(), field.GetFieldName(), field.GetTypeName(), nRepetitions); + } + + if (field.GetStructure() == ROOT::ENTupleStructure::kCollection) { + // Inner fields of collections are provided as projected collections of only that inner field, + // E.g. we provide a projected collection RVec> for "event.tracks.hits.x" in the example + // above. + bool representableAsRVec = + convertToRVec && (field.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" || + field.GetTypeName().substr(0, 12) == "std::vector<" || field.GetTypeName() == ""); + const auto *f = field.GetConstSubfields()[0]; + AddField(*f, colName, fieldInfos, representableAsRVec); + + // Note that at the end of the recursion, we handled the inner sub collections as well as the + // collection as whole, so we are done. + return; + + } else if (nRepetitions > 0) { + // Fixed-size array, same logic as ROOT::RVec. + const auto *f = field.GetConstSubfields()[0]; + AddField(*f, colName, fieldInfos); + return; + } else if (field.GetStructure() == ROOT::ENTupleStructure::kRecord) { + // Inner fields of records are provided as individual RDF columns, e.g. "event.id" + for (const auto &f : field.GetConstSubfields()) { + auto innerName = colName.empty() ? f->GetFieldName() : (std::string(colName) + "." + f->GetFieldName()); + // Inner fields of collections of records are always exposed as ROOT::RVec + AddField(*f, innerName, fieldInfos); + } + + // Do not add untyped record fields + if (field.GetTypeName() == "") + return; + } + + // The fieldID could be the root field or the class of fieldId might not be loaded. + // In these cases, only the inner fields are exposed as RDF columns. + auto fieldOrException = ROOT::RFieldBase::Create(field.GetFieldName(), field.GetTypeName()); + if (!fieldOrException) + return; + auto valueField = fieldOrException.Unwrap(); + if (const auto cardinalityField = dynamic_cast(valueField.get())) { + // Cardinality fields in RDataFrame are presented as integers + if (cardinalityField->As32Bit()) { + valueField = std::make_unique>(field.GetFieldName()); + } else if (cardinalityField->As64Bit()) { + valueField = std::make_unique>(field.GetFieldName()); + } else { + R__ASSERT(false && "cardinality field stored with an unexpected integer type"); + } + } + + valueField->SetOnDiskId(field.GetOnDiskId()); + auto valueSubfields = valueField->GetMutableSubfields(); + const auto fieldSubfields = field.GetConstSubfields(); + for (unsigned i = 0; i < valueSubfields.size(); ++i) { + valueSubfields[i]->SetOnDiskId(fieldSubfields[i]->GetOnDiskId()); + } + + std::unique_ptr cardinalityField; + // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks") + if (!fieldInfos.empty()) { + const auto &info = fieldInfos.back(); + const std::string name = "R_rdf_sizeof_" + info.fFieldName; + if (info.fNRepetitions > 0) { + cardinalityField = std::make_unique(name, info.fNRepetitions); + } else { + cardinalityField = std::make_unique>(name); + } + cardinalityField->SetOnDiskId(info.fFieldId); + } + + for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) { + const auto &fieldInfo = *i; + + const auto valueFieldName = valueField->GetFieldName(); + + if (fieldInfo.fNRepetitions > 0) { + // Fixed-size array, read it as ROOT::RVec in memory + valueField = + std::make_unique(valueFieldName, valueField->Clone("_0"), fieldInfo.fNRepetitions); + } else { + // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep + // their original type. + if (convertToRVec) { + valueField = std::make_unique(valueFieldName, valueField->Clone("_0")); + } else { + auto outerFieldType = fieldInfo.fTypeName; + valueField = ROOT::RFieldBase::Create(valueFieldName, outerFieldType).Unwrap(); + } + } + + valueField->SetOnDiskId(fieldInfo.fFieldId); + + // Skip the inner-most collection level to construct the cardinality column + // It's taken care of by the `if (!fieldInfos.empty())` scope above + if (i != fieldInfos.rbegin()) { + const auto cardinalityFieldName = cardinalityField->GetFieldName(); + if (fieldInfo.fNRepetitions > 0) { + // This collection level refers to a fixed-size array + cardinalityField = std::make_unique( + cardinalityFieldName, cardinalityField->Clone("_0"), fieldInfo.fNRepetitions); + } else { + // This collection level refers to an RVec + cardinalityField = std::make_unique(cardinalityFieldName, cardinalityField->Clone("_0")); + } + + cardinalityField->SetOnDiskId(fieldInfo.fFieldId); + } + } + + if (cardinalityField) { + std::string cardinalityFieldName = "R_rdf_sizeof_" + std::string(colName); + fColumnNames.emplace_back(cardinalityFieldName); + fColumnTypes.emplace_back(cardinalityField->GetTypeName()); + fProcessor->AddFieldToEntry(std::move(cardinalityField), cardinalityFieldName, nullptr, + Internal::RNTupleProcessorProvenance()); + } + + fieldInfos.emplace_back(field.GetOnDiskId(), field.GetFieldName(), field.GetTypeName(), nRepetitions); + fColumnNames.emplace_back(colName); + fColumnTypes.emplace_back(valueField->GetTypeName()); + + if (fieldInfos.size() > 1) { + fProcessor->AddFieldToEntry(std::move(valueField), std::string(colName), nullptr, + Internal::RNTupleProcessorProvenance()); + } +} + +ROOT::Experimental::RDF::RNTupleProcessorDS::RNTupleProcessorDS( + std::unique_ptr processor) + : fProcessor(std::move(processor)) +{ + // Do not add the subfields now, this is handled by AddField. + fProcessor->AddAllFieldsToEntry(Internal::RNTupleProcessorProvenance(), /*addPrefixProvenance=*/false, + /*includeSubfields=*/false); + const auto &entry = fProcessor->GetEntry(); + for (auto fieldIdx : entry.GetFieldIndices()) { + const auto &field = entry.GetValue(fieldIdx).GetField(); + AddField(field, entry.GetQualifiedFieldName(fieldIdx), + std::vector()); + } +} + +ROOT::RDF::RDataSource::Record_t +ROOT::Experimental::RDF::RNTupleProcessorDS::GetColumnReadersImpl(std::string_view /* name */, + const std::type_info & /* ti */) +{ + // This datasource uses the newer GetColumnReaders() API + return {}; +} + +std::unique_ptr +ROOT::Experimental::RDF::RNTupleProcessorDS::GetColumnReaders(unsigned int /* slot */, std::string_view name, + const std::type_info &tid) +{ + // At this point we can assume that `name` will be found in fColumnNames + const auto requestedType = ROOT::Internal::GetRenormalizedTypeName(ROOT::Internal::RDF::TypeID2TypeName(tid)); + + // First check if a field with the requested type already exists. If that is the case, we can immediately create a + // column reader for it. + if (fProcessor->GetEntry().FindFieldIndex(name, requestedType)) { + auto valuePtr = fProcessor->RequestField(std::string(name), requestedType); + auto reader = std::make_unique(*fProcessor, valuePtr); + fActiveColumnReaders.emplace_back(reader.get()); + + return reader; + } + + // Secondly, check whether a field with the same name, but a different on-disk type exists. If this is not the case, + // first try to add the field. If the field cannot be added, throw an exception. + // Otherwise, we create a new field with the requested type and add it to the processor entry + // before creating a column reader for it. + auto fieldIdx = fProcessor->GetEntry().FindFieldIndex(name); + if (!fieldIdx) { + try { + auto valuePtr = fProcessor->RequestField(std::string(name), requestedType); + auto reader = std::make_unique(*fProcessor, valuePtr); + fActiveColumnReaders.emplace_back(reader.get()); + + return reader; + } catch (const ROOT::RException &) { + throw std::runtime_error("RNTupleProcessorDS: Column \"" + std::string(name) + "\" does not exist"); + } + } + + const auto &field = fProcessor->GetEntry().GetField(*fieldIdx); + const std::string strName = std::string(name); + std::unique_ptr newField; + if (dynamic_cast(&field)) { + if (requestedType == "bool") { + newField = std::make_unique>(strName); + } else if (requestedType == "char") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::int8_t") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::uint8_t") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::int16_t") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::uint16_t") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::int32_t") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::uint32_t") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::int64_t") { + newField = std::make_unique>(strName); + } else if (requestedType == "std::uint64_t") { + newField = std::make_unique>(strName); + } else { + throw std::runtime_error("RNTupleProcessorDS: Could not create field with type \"" + requestedType + + "\" for column \"" + strName + "\""); + } + } else { + auto newAltProtoFieldOrException = ROOT::RFieldBase::Create(strName, requestedType); + if (!newAltProtoFieldOrException) { + throw std::runtime_error("RNTupleProcessorDS: Could not create field with type \"" + requestedType + + "\" for column \"" + strName + "\""); + } + newField = newAltProtoFieldOrException.Unwrap(); + } + newField->SetOnDiskId(field.GetOnDiskId()); + + try { + fProcessor->AddFieldToEntry(std::move(newField), strName, nullptr, Internal::RNTupleProcessorProvenance()); + } catch (const ROOT::RException &) { + std::string msg = "RNTupleProcessorDS: invalid type \"" + requestedType + "\" for column \"" + strName + + "\" with on-disk type \"" + field.GetTypeName() + "\""; + throw std::runtime_error(msg); + } + + auto valuePtr = fProcessor->RequestField(std::string(name), requestedType); + auto reader = std::make_unique(*fProcessor, valuePtr); + fActiveColumnReaders.emplace_back(reader.get()); + + return reader; +} + +std::vector> ROOT::Experimental::RDF::RNTupleProcessorDS::GetEntryRanges() +{ + std::vector> ranges; + if (fProcessor->GetNEntries() == fProcessor->GetNEntriesProcessed()) + return ranges; + ranges.emplace_back(0, fProcessor->GetNEntries()); + return ranges; +} + +void ROOT::Experimental::RDF::RNTupleProcessorDS::InitSlot(unsigned int /* slot */, ULong64_t /* firstEntry */) +{ + assert(fNSlots == 1 && "MT not supported"); +} + +void ROOT::Experimental::RDF::RNTupleProcessorDS::FinalizeSlot(unsigned int /* slot */) +{ + assert(fNSlots == 1 && "MT not supported"); +} + +std::string ROOT::Experimental::RDF::RNTupleProcessorDS::GetTypeName(std::string_view colName) const +{ + auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName); + + if (colNamePos == fColumnNames.end()) { + auto msg = std::string("RNTupleProcessorDS: There is no column with name \"") + std::string(colName) + "\""; + throw std::runtime_error(msg); + } + + const auto index = std::distance(fColumnNames.begin(), colNamePos); + return fColumnTypes[index]; +} + +bool ROOT::Experimental::RDF::RNTupleProcessorDS::HasColumn(std::string_view colName) const +{ + return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end(); +} + +void ROOT::Experimental::RDF::RNTupleProcessorDS::Initialize() +{ + fProcessor->Reset(); + fProcessor->Connect(fProcessor->GetEntry().GetFieldIndices(), Internal::RNTupleProcessorProvenance(), + /*updateFields=*/false); + return; +} + +void ROOT::Experimental::RDF::RNTupleProcessorDS::Finalize() +{ + return; +} + +void ROOT::Experimental::RDF::RNTupleProcessorDS::SetNSlots(unsigned int nSlots) +{ + assert(fNSlots == 0); + assert(nSlots == 1); + fNSlots = nSlots; +} + +bool ROOT::Experimental::RDF::RNTupleProcessorDS::SetEntry(unsigned int /* slot */, ULong64_t entry) +{ + if (fProcessor->GetCurrentEntryNumber() != entry) + return fProcessor->LoadEntry(entry) != ROOT::kInvalidNTupleIndex; + return true; +} + +ROOT::RDataFrame +ROOT::Experimental::RDF::FromRNTupleProcessor(std::unique_ptr processor) +{ + if (dynamic_cast(processor.get())) { + throw std::runtime_error("RNTupleProcessorDS: Joins are not yet supported"); + } + return ROOT::RDataFrame(std::make_unique(std::move(processor))); +} + +ROOT::RDF::RSampleInfo ROOT::Experimental::RDF::RNTupleProcessorDS::CreateSampleInfo( + unsigned int /* slot */, const std::unordered_map &sampleMap) const +{ + const auto &ntupleID = fProcessor->GetProcessorName(); + + // TODO: There is no support for RNTuple in RDatasetSpec, thus the sample map + // is always empty at the moment. + if (sampleMap.empty()) + return ROOT::RDF::RSampleInfo(ntupleID, std::make_pair(0, fProcessor->GetNEntries())); + + if (sampleMap.find(ntupleID) == sampleMap.end()) + throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples."); + + return ROOT::RDF::RSampleInfo(ntupleID, std::make_pair(0, fProcessor->GetNEntries()), sampleMap.at(ntupleID)); +} diff --git a/tree/dataframe/test/CMakeLists.txt b/tree/dataframe/test/CMakeLists.txt index 490b883fe7cbb..c8e87ef65e224 100644 --- a/tree/dataframe/test/CMakeLists.txt +++ b/tree/dataframe/test/CMakeLists.txt @@ -113,6 +113,7 @@ ROOT_GENERATE_DICTIONARY(ClassWithNestedSameNameDict DEPENDENCIES RIO) ROOT_ADD_GTEST(datasource_ntuple datasource_ntuple.cxx LIBRARIES ROOTDataFrame) +ROOT_ADD_GTEST(datasource_ntuple_processor datasource_ntuple_processor.cxx LIBRARIES ROOTDataFrame) if(MSVC) set_source_files_properties(datasource_ntuple.cxx PROPERTIES COMPILE_FLAGS /bigobj) endif() @@ -133,6 +134,9 @@ endif() ROOT_GENERATE_DICTIONARY(ClassWithArraysDict ${CMAKE_CURRENT_SOURCE_DIR}/ClassWithArrays.h MODULE datasource_ntuple LINKDEF ClassWithArraysLinkDef.h OPTIONS -inlineInputHeader DEPENDENCIES ROOTVecOps) +ROOT_GENERATE_DICTIONARY(ClassWithArraysDictProc ${CMAKE_CURRENT_SOURCE_DIR}/ClassWithArrays.h + MODULE datasource_ntuple_processor LINKDEF ClassWithArraysLinkDef.h OPTIONS -inlineInputHeader + DEPENDENCIES ROOTVecOps) #### TESTS REQUIRING EXTRA ROOT FEATURES #### if (imt) diff --git a/tree/dataframe/test/datasource_ntuple_processor.cxx b/tree/dataframe/test/datasource_ntuple_processor.cxx new file mode 100644 index 0000000000000..4b004f64d7af1 --- /dev/null +++ b/tree/dataframe/test/datasource_ntuple_processor.cxx @@ -0,0 +1,875 @@ +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include + +#include "ClassWithArrays.h" + +#include + +using ROOT::RNTupleModel; +using ROOT::RNTupleWriter; +using ROOT::Experimental::RNTupleProcessor; +using ROOT::Experimental::RDF::RNTupleProcessorDS; + +namespace { + +class FileRAII { +private: + std::string fPath; + +public: + explicit FileRAII(const std::string &path) : fPath(path) {} + FileRAII(const FileRAII &) = delete; + FileRAII &operator=(const FileRAII &) = delete; + ~FileRAII() { std::remove(fPath.c_str()); } + std::string GetPath() const { return fPath; } +}; + +} // namespace + +template +void EXPECT_VEC_EQ(const V1 &v1, const V2 &v2) +{ + ASSERT_EQ(v1.size(), v2.size()); + for (std::size_t i = 0ul; i < v1.size(); ++i) { + EXPECT_EQ(v1[i], v2[i]); + } +} + +class RNTupleProcessorDSTest : public ::testing::Test { +protected: + std::string fFileName = "RNTupleProcessorDS_test.root"; + std::string fNtplName = "ntuple"; + + void SetUp() override + { + auto model = RNTupleModel::Create(); + *model->MakeField("pt") = 42; + *model->MakeField("energy") = 7; + *model->MakeField("tag") = "xyz"; + *model->MakeField>("jets") = std::vector{1.f, 2.f}; + auto fldNnlo = model->MakeField>>("nnlo"); + fldNnlo->push_back(std::vector()); + fldNnlo->push_back(std::vector{1.0}); + fldNnlo->push_back(std::vector{1.0, 2.0, 4.0, 8.0}); + *model->MakeField("rvec") = ROOT::RVecI{1, 2, 3}; + auto fldElectron = model->MakeField("electron"); + fldElectron->pt = 137.0; + auto fldVecElectron = model->MakeField>("VecElectron"); + for (int i = 0; i < 128; ++i) + fldVecElectron->push_back(*fldElectron); + auto fldNElectron = std::make_unique>>("nElectron"); + model->AddProjectedField(std::move(fldNElectron), [](const std::string &) { return "VecElectron"; }); + { + auto ntuple = RNTupleWriter::Recreate(std::move(model), fNtplName, fFileName); + ntuple->Fill(); + } + } + + void TearDown() override { std::remove(fFileName.c_str()); } +}; + +TEST_F(RNTupleProcessorDSTest, ColTypeNames) +{ + RNTupleProcessorDS ds(RNTupleProcessor::Create({fNtplName, fFileName})); + + auto colNames = ds.GetColumnNames(); + ASSERT_EQ(16, colNames.size()); + + EXPECT_TRUE(ds.HasColumn("pt")); + EXPECT_TRUE(ds.HasColumn("energy")); + EXPECT_TRUE(ds.HasColumn("rvec")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_nnlo")); + EXPECT_TRUE(ds.HasColumn("electron")); + EXPECT_TRUE(ds.HasColumn("electron.pt")); + EXPECT_TRUE(ds.HasColumn("VecElectron")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_VecElectron")); + EXPECT_TRUE(ds.HasColumn("VecElectron.pt")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_VecElectron.pt")); + EXPECT_TRUE(ds.HasColumn("nElectron")); + EXPECT_FALSE(ds.HasColumn("Address")); + + EXPECT_STREQ("std::string", ds.GetTypeName("tag").c_str()); + EXPECT_STREQ("float", ds.GetTypeName("energy").c_str()); + EXPECT_EQ(ROOT::Internal::GetRenormalizedTypeName(typeid(std::size_t)), ds.GetTypeName("R_rdf_sizeof_jets")); + EXPECT_STREQ("ROOT::VecOps::RVec", ds.GetTypeName("rvec").c_str()); + EXPECT_STREQ("std::uint64_t", ds.GetTypeName("nElectron").c_str()); + + try { + ds.GetTypeName("Address"); + FAIL() << "should not be able to get a type for a non-existent column"; + } catch (const std::runtime_error &err) { + EXPECT_THAT(err.what(), testing::HasSubstr("RNTupleProcessorDS: There is no column with name \"Address\"")); + } +} + +TEST_F(RNTupleProcessorDSTest, NFiles) +{ + RNTupleProcessorDS ds(RNTupleProcessor::Create({fNtplName, fFileName})); + + EXPECT_EQ(1, ds.GetNFiles()); +} + +TEST_F(RNTupleProcessorDSTest, Basic) +{ + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({fNtplName, fFileName})); + auto pt_max = df.Max("pt"); + auto pt_sum = df.Sum("pt"); + EXPECT_EQ(42.f, *pt_max); + EXPECT_EQ(42.f, *pt_sum); + EXPECT_EQ(*pt_sum, *pt_max); +} + +TEST_F(RNTupleProcessorDSTest, CardinalityColumn) +{ + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({fNtplName, fFileName})); + + // Check that the special column # works without jitting... + auto identity = [](std::size_t sz) { return sz; }; + auto max_njets = df.Define("njets", identity, {"R_rdf_sizeof_jets"}).Max("njets"); + auto max_njets2 = df.Max("#jets"); + auto max_rvec = df.Max("#rvec"); + EXPECT_EQ(*max_njets, *max_njets2); + EXPECT_EQ(*max_njets, 2); + // EXPECT_EQ(*max_rvec, 3); + + // ...and with jitting + auto max_njets_jitted = df.Define("njets", "R_rdf_sizeof_jets").Max("njets"); + auto max_njets_jitted2 = df.Define("njets", "#jets").Max("njets"); + auto max_njets_jitted3 = df.Max("#jets"); + auto max_rvec2 = df.Max("#rvec"); + EXPECT_EQ(*max_njets_jitted, *max_njets_jitted2); + EXPECT_EQ(*max_njets_jitted3, *max_njets_jitted2); + EXPECT_EQ(2, *max_njets_jitted); + EXPECT_EQ(3, *max_rvec2); +} + +TEST_F(RNTupleProcessorDSTest, ProjectedCardinalityColumn) +{ + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({fNtplName, fFileName})); + + EXPECT_EQ(128u, *df.Filter("nElectron == 128").Max("nElectron")); + + EXPECT_EQ(128u, *df.Filter([](std::uint64_t x) { return x == 128; }, {"nElectron"}).Max("nElectron")); + EXPECT_EQ(128u, *df.Filter([](std::int32_t x) { return x == 128; }, {"nElectron"}).Max("nElectron")); + EXPECT_EQ(128u, *df.Filter([](std::uint32_t x) { return x == 128; }, {"nElectron"}).Max("nElectron")); + EXPECT_EQ(128u, *df.Filter([](std::int16_t x) { return x == 128; }, {"nElectron"}).Max("nElectron")); + EXPECT_EQ(128u, *df.Filter([](std::uint16_t x) { return x == 128; }, {"nElectron"}).Max("nElectron")); + EXPECT_EQ(128u, *df.Filter([](std::uint8_t x) { return x == 128; }, {"nElectron"}).Max("nElectron")); + EXPECT_EQ(128u, *df.Filter([](bool x) { return x; }, {"nElectron"}).Max("nElectron")); + try { + *df.Filter([](std::int8_t x) { return x == 0; }, {"nElectron"}).Count(); + FAIL() << "integer overflow should fail"; + } catch (const ROOT::RException &e) { + EXPECT_THAT(e.what(), ::testing::HasSubstr("integer overflow")); + } +} + +static void ReadTest(const std::string &name, const std::string &fname) +{ + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({name, fname})); + + auto count = df.Count(); + auto sumpt = df.Sum("pt"); + auto tag = df.Take("tag"); + auto njets = df.Take("R_rdf_sizeof_jets"); + auto sumjets = df.Sum>("jets"); + auto sumnnlosize = df.Sum>("R_rdf_sizeof_nnlo"); + auto sumvec = [](float red, const ROOT::RVec> &nnlo) { + auto sum = 0.f; + for (auto &v : nnlo) + for (auto e : v) + sum += e; + return red + sum; + }; + auto sumnnlo = df.Aggregate(sumvec, std::plus{}, "nnlo", 0.f); + auto rvec = df.Take("rvec"); + auto vectorasrvec = df.Take("jets"); + auto sumElectronPt = df.Aggregate([](float &acc, const Electron &e) { acc += e.pt; }, + [](float a, float b) { return a + b; }, "electron"); + auto sumVecElectronPt1 = df.Aggregate( + [](float &acc, const ROOT::RVec &ve) { + for (const auto &e : ve) + acc += e.pt; + }, + [](float a, float b) { return a + b; }, "VecElectron"); + auto sumVecElectronPt2 = df.Aggregate( + [](float &acc, const ROOT::RVec &pts) { + for (auto pt : pts) + acc += pt; + }, + [](float a, float b) { return a + b; }, "VecElectron.pt"); + + EXPECT_EQ(1ull, count.GetValue()); + EXPECT_DOUBLE_EQ(42.f, sumpt.GetValue()); + EXPECT_EQ(1ull, tag.GetValue().size()); + EXPECT_EQ(std::string("xyz"), tag.GetValue()[0]); + EXPECT_EQ(1ull, njets.GetValue().size()); + EXPECT_EQ(2u, njets.GetValue()[0]); + EXPECT_EQ(3.f, sumjets.GetValue()); + EXPECT_EQ(16.f, sumnnlo.GetValue()); + EXPECT_EQ(5u, sumnnlosize.GetValue()); + EXPECT_TRUE(All(rvec->at(0) == ROOT::RVecI{1, 2, 3})); + EXPECT_TRUE(All(vectorasrvec->at(0) == ROOT::RVecF{1.f, 2.f})); + EXPECT_FLOAT_EQ(137.0, sumElectronPt.GetValue()); + EXPECT_FLOAT_EQ(128. * 137.0, sumVecElectronPt1.GetValue()); + EXPECT_FLOAT_EQ(128. * 137.0, sumVecElectronPt2.GetValue()); +} + +static void ChainTest(const std::string &name, const std::string &fname) +{ + auto proc1 = RNTupleProcessor::Create({name, fname}); + auto df1 = ROOT::RDataFrame(std::make_unique(std::move(proc1))); + EXPECT_DOUBLE_EQ(42.0, df1.Sum("pt").GetValue()); + + auto proc2 = RNTupleProcessor::CreateChain({{name, fname}, {name, fname}}); + auto df2 = ROOT::RDataFrame(std::make_unique(std::move(proc2))); + EXPECT_DOUBLE_EQ(84.0, df2.Sum("pt").GetValue()); + + std::vector openSpecs(1000, {name, fname}); + auto proc1000 = RNTupleProcessor::CreateChain(openSpecs); + auto df1000 = ROOT::RDataFrame(std::make_unique(std::move(proc1000))); + EXPECT_DOUBLE_EQ(42000.0, df1000.Sum("pt").GetValue()); + + FileRAII guardFile1("RNTupleProcessorDS_test_chain_1.root"); + FileRAII guardFile2("RNTupleProcessorDS_test_chain_2.root"); + FileRAII guardFile3("RNTupleProcessorDS_test_chain_3.root"); + + { + auto model = RNTupleModel::Create(); + auto fldElectron = model->MakeField("e"); + auto writer = RNTupleWriter::Recreate(std::move(model), "chain", guardFile1.GetPath()); + fldElectron->pt = 1.0; + writer->Fill(); + } + { + auto model = RNTupleModel::Create(); + // Add dummy field to ensure that the Electron fields in the files have different field IDs + model->MakeField("dummy1"); + auto fldElectron = model->MakeField("e"); + auto writer = RNTupleWriter::Recreate(std::move(model), "chain", guardFile2.GetPath()); + // empty file in chain, no entries + } + { + auto model = RNTupleModel::Create(); + model->MakeField("dummy1"); + model->MakeField("dummy2"); + auto fldElectron = model->MakeField("e"); + auto writer = RNTupleWriter::Recreate(std::move(model), "chain", guardFile3.GetPath()); + fldElectron->pt = 2.0; + writer->Fill(); + fldElectron->pt = 3.0; + writer->Fill(); + } + + auto proc3 = RNTupleProcessor::CreateChain( + {{"chain", guardFile1.GetPath()}, {"chain", guardFile2.GetPath()}, {"chain", guardFile3.GetPath()}}); + auto df3 = ROOT::Experimental::RDF::FromRNTupleProcessor(std::move(proc3)); + // TODO Address NFiles in the processor + // EXPECT_EQ(3, df3.Describe().GetNFiles()); + auto sumElectronPt = + df3.Aggregate([](float &acc, const Electron &e) { acc += e.pt; }, [](float a, float b) { return a + b; }, "e"); + EXPECT_FLOAT_EQ(6.0, sumElectronPt.GetValue()); + // Trigger the event loop again + EXPECT_FLOAT_EQ(6.0, sumElectronPt.GetValue()); +} + +TEST_F(RNTupleProcessorDSTest, Read) +{ + ReadTest(fNtplName, fFileName); +} + +TEST_F(RNTupleProcessorDSTest, Chain) +{ + ChainTest(fNtplName, fFileName); +} + +TEST(RNTupleProcessorDS, Join) +{ + FileRAII guardFile1("RNTupleProcessorDS_test_join_1.root"); + FileRAII guardFile2("RNTupleProcessorDS_test_join_2.root"); + + { + auto model = RNTupleModel::Create(); + auto fldElectron = model->MakeField("electrons"); + auto writer = RNTupleWriter::Recreate(std::move(model), "primary", guardFile1.GetPath()); + for (unsigned i = 0; i < 10; ++i) { + fldElectron->pt = static_cast(i); + writer->Fill(); + } + } + { + auto model = RNTupleModel::Create(); + auto fldJets = model->MakeField>("jets"); + auto writer = RNTupleWriter::Recreate(std::move(model), "auxiliary", guardFile2.GetPath()); + for (unsigned i = 0; i < 10; ++i) { + *fldJets = {static_cast(i) * .1f, static_cast(i) * .2f}; + writer->Fill(); + } + } + + auto proc = RNTupleProcessor::CreateJoin({"primary", guardFile1.GetPath()}, {"auxiliary", guardFile2.GetPath()}, {}); + try { + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(std::move(proc)); + FAIL() << "creating a datasource from a join processor should throw"; + } catch (const std::runtime_error &err) { + EXPECT_THAT(err.what(), testing::HasSubstr("RNTupleProcessorDS: Joins are not yet supported")); + } +} + +// #ifdef R__USE_IMT +// struct IMTRAII { +// IMTRAII() { ROOT::EnableImplicitMT(); } +// ~IMTRAII() { ROOT::DisableImplicitMT(); } +// }; + +// TEST_F(RNTupleProcessorDSTest, ReadMT) +// { +// IMTRAII _; + +// ReadTest(fNtplName, fFileName); +// } + +// TEST_F(RNTupleProcessorDSTest, ChainMT) +// { +// IMTRAII _; + +// ChainTest(fNtplName, fFileName); +// } + +// TEST_F(RNTupleProcessorDSTest, ChainTailScheduling) +// { +// IMTRAII _; + +// FileRAII guardFile1("RNTupleProcessorDS_test_chain_tail_scheduling_1.root"); +// FileRAII guardFile2("RNTupleProcessorDS_test_chain_tail_scheduling_2.root"); +// FileRAII guardFile3("RNTupleProcessorDS_test_chain_tail_scheduling_3.root"); + +// { +// auto model = RNTupleModel::Create(); +// auto ptrX = model->MakeField("x"); +// auto writer = RNTupleWriter::Recreate(std::move(model), "chain", guardFile1.GetPath()); +// for (unsigned i = 0; i < 2; ++i) { +// *ptrX = i; +// writer->Fill(); +// writer->CommitCluster(); +// } +// } +// { +// auto model = RNTupleModel::Create(); +// model->MakeField("x"); +// auto writer = RNTupleWriter::Recreate(std::move(model), "chain", guardFile2.GetPath()); +// // Empty file +// } +// { +// auto model = RNTupleModel::Create(); +// auto ptrX = model->MakeField("x"); +// auto writer = RNTupleWriter::Recreate(std::move(model), "chain", guardFile3.GetPath()); +// for (unsigned i = 0; i < 11; ++i) { +// *ptrX = i; +// writer->Fill(); +// writer->CommitCluster(); +// } +// } + +// auto df = ROOT::RDataFrame(std::make_unique( +// "chain", std::vector{guardFile1.GetPath(), guardFile2.GetPath(), guardFile3.GetPath()})); +// EXPECT_EQ(3, df.Describe().GetNFiles()); +// auto sumX = df.Aggregate([](int &acc, int x) { acc += x; }, [](int a, int b) { return a + b; }, "x"); +// EXPECT_EQ(56, sumX.GetValue()); +// } +// #endif + +TEST_F(RNTupleProcessorDSTest, ModifyColumnValues) +{ + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({fNtplName, fFileName})); + auto dfCorrected = + df.Define("jetsCorrected", + [](ROOT::RVec &jets) { + for (auto &jet : jets) { + jet *= 1.1; + } + return jets; + }, + {"jets"}) + .Define("jetsFiltered", [](const ROOT::RVec &jets) { return jets[jets <= 2.f]; }, {"jetsCorrected"}); + + ROOT::RVec jetsExpected{1.f, 2.f}; + ROOT::RVec jetsCorrectedExpected{1.1f, 2.2f}; + ROOT::RVec jetsFilteredExpected{1.1f}; + + // In the same action, we expect "jets" and "jetsCorrected" to be equal, with the modified values after the first + // "Define", because "jets" is modified in-place here. + dfCorrected.Foreach( + [&jetsCorrectedExpected, &jetsFilteredExpected](const ROOT::RVec &jets, const ROOT::RVec &jetsC, + const ROOT::RVec &jetsF) { + EXPECT_VEC_EQ(jetsCorrectedExpected, jets); + EXPECT_VEC_EQ(jets, jetsC); + EXPECT_VEC_EQ(jetsFilteredExpected, jetsF); + }, + {"jets", "jetsCorrected", "jetsFiltered"}); + + // Even though "jetsCorrected" is not used, "jets" should still be modified because "jetsFiltered" is defined from + // "jetsCorrected". + dfCorrected.Foreach( + [&jetsCorrectedExpected, &jetsFilteredExpected](const ROOT::RVec &jets, const ROOT::RVec &jetsF) { + EXPECT_VEC_EQ(jetsCorrectedExpected, jets); + EXPECT_VEC_EQ(jetsFilteredExpected, jetsF); + }, + {"jets", "jetsFiltered"}); + + // In separate actions, we expect "jets" to have its original on-disk value, but "jetsCorrected" (and by + // extension, "jetsFiltered") to have the modified values. + auto jets = dfCorrected.Take>("jets").GetValue(); + auto jetsCorrected = dfCorrected.Take>("jetsCorrected").GetValue(); + auto jetsFiltered = dfCorrected.Take>("jetsFiltered").GetValue(); + + ASSERT_EQ(1ull, jets.size()); + ASSERT_EQ(1ull, jetsCorrected.size()); + ASSERT_EQ(1ull, jetsFiltered.size()); + EXPECT_VEC_EQ(jetsExpected, jets[0]); + EXPECT_VEC_EQ(jetsCorrectedExpected, jetsCorrected[0]); + EXPECT_VEC_EQ(jetsFilteredExpected, jetsFiltered[0]); +} + +TEST(RNTupleProcessorDS, CollectionFieldTypes) +{ + // NB: The other tests already cover std::vector and std::array (and nestings thereof). + FileRAII fileGuard{"RNTupleProcessorDS_test_collection_field_types.root"}; + { + auto model = RNTupleModel::Create(); + *model->MakeField>("intSet") = std::set{3, 1, 2}; + *model->MakeField>("electronSet") = + std::set{Electron{1.f}, Electron{2.f}, Electron{3.f}}; + *model->MakeField>>("electronSetVec") = + std::set>{{Electron{1.f}, Electron{2.f}}, {Electron{3.f}}}; + *model->MakeField>>("electronSetSet") = + std::set>{{Electron{1.f}, Electron{2.f}}, {Electron{3.f}}}; + + // Untyped collection + auto fldJetPt = ROOT::RVectorField::CreateUntyped("jet_pt", std::make_unique>("_0")); + model->AddField(std::move(fldJetPt)); + + // Untyped collection with an untyped record, with a projection + std::vector> muon; + muon.emplace_back(std::make_unique>("muon_pt")); + auto fldMuonRecord = std::make_unique("_0", std::move(muon)); + auto fldMuons = ROOT::RVectorField::CreateUntyped("muon", std::move(fldMuonRecord)); + model->AddField(std::move(fldMuons)); + auto muonPtField = ROOT::RFieldBase::Create("muon_pt", "ROOT::VecOps::RVec").Unwrap(); + model->AddProjectedField(std::move(muonPtField), [](const std::string &fieldName) { + if (fieldName == "muon_pt") + return "muon"; + else + return "muon._0.muon_pt"; + }); + + auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard.GetPath()); + ntuple->Fill(); + } + + RNTupleProcessorDS ds(RNTupleProcessor::Create({"ntuple", fileGuard.GetPath()})); + + auto colNames = ds.GetColumnNames(); + + ASSERT_EQ(20, colNames.size()); + + EXPECT_TRUE(ds.HasColumn("intSet")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_intSet")); + EXPECT_TRUE(ds.HasColumn("electronSet")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_electronSet")); + EXPECT_TRUE(ds.HasColumn("electronSet.pt")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_electronSet.pt")); + EXPECT_TRUE(ds.HasColumn("jet_pt")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_jet_pt")); + EXPECT_TRUE(ds.HasColumn("muon_pt")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_muon_pt")); + EXPECT_TRUE(ds.HasColumn("muon.muon_pt")); + EXPECT_TRUE(ds.HasColumn("R_rdf_sizeof_muon.muon_pt")); + + EXPECT_STREQ("std::set", ds.GetTypeName("intSet").c_str()); + EXPECT_STREQ("std::set", ds.GetTypeName("electronSet").c_str()); + EXPECT_STREQ("ROOT::VecOps::RVec", ds.GetTypeName("electronSet.pt").c_str()); + EXPECT_STREQ("std::set>", ds.GetTypeName("electronSetSet").c_str()); + EXPECT_STREQ("ROOT::VecOps::RVec>", ds.GetTypeName("electronSetSet.pt").c_str()); + + // TODO(fdegeus) figure out how to (cleanly) still add inner vectors etc. as RVecs. + EXPECT_STREQ("std::set>", ds.GetTypeName("electronSetVec").c_str()); + EXPECT_STREQ("ROOT::VecOps::RVec", ds.GetTypeName("jet_pt").c_str()); + EXPECT_STREQ("ROOT::VecOps::RVec", ds.GetTypeName("muon_pt").c_str()); + EXPECT_STREQ(ds.GetTypeName("muon.muon_pt").c_str(), ds.GetTypeName("muon_pt").c_str()); +} + +TEST_F(RNTupleProcessorDSTest, AlternativeColumnTypes) +{ + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({fNtplName, fFileName})); + + // Alternative inner type + auto usingDouble = df.Define("nJets", [](const ROOT::RVec &jets) { return jets.size(); }, {"jets"}) + .Take>("nJets") + .GetValue(); + EXPECT_EQ(2ull, ROOT::VecOps::Sum(usingDouble)); + + // Alternative outer type (original on-disk type) + auto asStdVec = df.Define("nJets", [](const std::vector &jets) { return jets.size(); }, {"jets"}) + .Take>("nJets") + .GetValue(); + EXPECT_EQ(2ull, ROOT::VecOps::Sum(asStdVec)); + + // Original datasource protofield type + auto asRVec = df.Define("nJets", [](const ROOT::RVec &jets) { return jets.size(); }, {"jets"}) + .Take>("nJets") + .GetValue(); + EXPECT_EQ(2ull, ROOT::VecOps::Sum(asRVec)); + + auto multipleAlternativeTypes = + df.Define("nJets", [](const std::vector &jets) { return jets.size(); }, {"jets"}) + .Define("smallestJet", [](const std::multiset &jets) { return *(jets.begin()); }, {"jets"}) + .Min("smallestJet") + .GetValue(); + EXPECT_FLOAT_EQ(1.f, multipleAlternativeTypes); + + auto jitted = df.Define("jetsType", "ROOT::Internal::RDF::TypeID2TypeName(typeid(jets))") + .Take("jetsType") + .GetValue(); + EXPECT_EQ("ROOT::VecOps::RVec", jitted[0]); + + // Original protofield type of ROOT::RVec>, test with different ROOT::RVec/std::vector combinations + auto nestedStdVecStdVec = + df.Define("nNnlo", [](const std::vector> &nnlo) { return nnlo.size(); }, {"nnlo"}) + .Take>("nNnlo") + .GetValue(); + EXPECT_EQ(3ull, ROOT::VecOps::Sum(nestedStdVecStdVec)); + + auto nestedStdVecRVec = + df.Define("nNnlo", [](const std::vector> &nnlo) { return nnlo.size(); }, {"nnlo"}) + .Take>("nNnlo") + .GetValue(); + EXPECT_EQ(3ull, ROOT::VecOps::Sum(nestedStdVecRVec)); + + auto nestedRVecStdVec = + df.Define("nNnlo", [](const ROOT::RVec> &nnlo) { return nnlo.size(); }, {"nnlo"}) + .Take>("nNnlo") + .GetValue(); + EXPECT_EQ(3ull, ROOT::VecOps::Sum(nestedRVecStdVec)); + + // Check that the ROOT RtypesCore typedefs are handled properly + auto usingTypeAlias1 = df.Define("nJets", [](const std::vector &jets) { return jets.size(); }, {"jets"}) + .Take>("nJets") + .GetValue(); + EXPECT_EQ(2ull, ROOT::VecOps::Sum(usingTypeAlias1)); + + auto usingTypeAlias2 = + df.Define("vecSum", [](const ROOT::RVec &rvec) { return ROOT::VecOps::Sum(rvec); }, {"rvec"}) + .Take("vecSum") + .GetValue(); + EXPECT_EQ(6, usingTypeAlias2[0]); + + try { + // Invalid outer field type + auto dfInvalid = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({fNtplName, fFileName})); + dfInvalid.Define("firstJet", [](const std::pair &jets) { return jets.first; }, {"jets"}) + .Take>("firstJet") + .GetValue(); + FAIL() << "specifying templated actions with incompatible column types should throw"; + } catch (const std::runtime_error &err) { + EXPECT_THAT(err.what(), + testing::HasSubstr("RNTupleProcessorDS: invalid type \"std::pair\" for column " + "\"jets\" with on-disk type \"std::vector\"")); + } + + try { + // Invalid inner field types + auto dfInvalid = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({fNtplName, fFileName})); + dfInvalid.Define("nJets", [](const std::vector &jets) { return jets.size(); }, {"jets"}) + .Take>("nJets") + .GetValue(); + FAIL() << "specifying templated actions with incompatible column types should throw"; + } catch (const std::runtime_error &err) { + EXPECT_THAT(err.what(), + testing::HasSubstr("RNTupleProcessorDS: invalid type \"std::vector\" for column " + "\"jets\" with on-disk type \"std::vector\"")); + } +} + +const static std::array>, 3> arraysDatasetCol4El{ + ROOT::RVec>{ + {ROOT::RVecI{1, 2}, ROOT::RVecI{4, 5, 6}, ROOT::RVecI{42, 43, 44, 45}}, + }, + ROOT::RVec>{ + {ROOT::RVecI{55, 66}, ROOT::RVecI{-18, 33, std::numeric_limits::max()}, + ROOT::RVecI{42, std::numeric_limits::min(), 44, 1888}}, + {ROOT::RVecI{10, 11}, ROOT::RVecI{-32, -33, -34}, ROOT::RVecI{2953, -20, 343212}}, + }, + ROOT::RVec>{ + {ROOT::RVecI{-32, -33, -34}, ROOT::RVecI{42, -43, 44, -45}, ROOT::RVecI{1}}, + {ROOT::RVecI{0, 0, std::numeric_limits::min(), 42}, ROOT::RVecI{-32, -33, -34}, + ROOT::RVecI{30000, 40000, 50000}}, + {ROOT::RVecI{0, 0, std::numeric_limits::min(), 42}, ROOT::RVecI{-32, -33, -34}, + ROOT::RVecI{std::numeric_limits::min(), std::numeric_limits::max(), + std::numeric_limits::min()}}, + }}; + +class RNTupleProcessorDSArraysDataset : public ::testing::Test { +protected: + std::string fFileName = "rntupleProcessords_arrays_dataset.root"; + std::string fNtplName = "ntuple"; + + void SetUp() override + { + auto model = RNTupleModel::Create(); + auto col1_arr = model->MakeField>("col1_arr"); + auto col2_arr_rvec = model->MakeField>("col2_arr_rvec"); + auto col3_rvec_arr = model->MakeField>>("col3_rvec_arr"); + // FIXME(fdegeus) + // auto col4_arr_rvec_arr_rvec = + // model->MakeField>, 3>>("col4_arr_rvec_arr_rvec"); + // auto col5_class_with_arrays = model->MakeField("col5_class_with_arrays"); + auto ntuple = RNTupleWriter::Recreate(std::move(model), fNtplName, fFileName); + for (int i = 1; i <= 5; i++) { + + *col1_arr = {1 * i, 2 * i, 3 * i}; + + *col2_arr_rvec = {ROOT::RVecI{i}, ROOT::RVecI{1 * i, 2 * i, 3 * i}, + ROOT::RVecI{-1 * i, -2 * i, -3 * i, -4 * i, -5 * i}}; + + ROOT::RVec> rvecVal; + rvecVal.reserve(i); + for (auto j = 0; j < i; j++) { + rvecVal.emplace_back(std::array{1 * i, 2 * i, 3 * i}); + } + *col3_rvec_arr = rvecVal; + + // FIXME(fdegeus) + // *col4_arr_rvec_arr_rvec = arraysDatasetCol4El; + + // std::array col5_member_1{42.f, std::numeric_limits::max(), 0.f}; + // std::array col5_member_2{ROOT::RVecF{1.f}, ROOT::RVecF{2.f, 3.f}, + // ROOT::RVecF{4.f, 5.f, 6.f}}; ROOT::RVec> col5_member_3{col5_member_1, col5_member_1, + // col5_member_1, col5_member_1}; *col5_class_with_arrays = + // ClassWithArrays{std::move(col5_member_1), std::move(col5_member_2), std::move(col5_member_3)}; + + ntuple->Fill(); + } + } + + void TearDown() override { std::remove(fFileName.c_str()); } +}; + +void ReadArraysTest(const std::string &name, const std::string &fname) +{ + // These tests use the columns that contain std::array data on disk as RVecs + // reading them into RVecs and checking their values. + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({name, fname})); + + auto count = df.Count(); + + auto col1Take = df.Take("col1_arr"); + auto col2Take = df.Take>("col2_arr_rvec"); + auto col3Take = df.Take>("col3_rvec_arr"); + // FIXME(fdegeus) + // auto col4Take = df.Take>>>("col4_arr_rvec_arr_rvec"); + // auto col5TakeArr = df.Take("col5_class_with_arrays.fArr"); + // auto col5TakeArrRVec = df.Take>("col5_class_with_arrays.fArrRVec"); + // auto col5TakeRVecArr = df.Take>("col5_class_with_arrays.fRVecArr"); + + EXPECT_EQ(5ull, count.GetValue()); + + auto col1Val = col1Take.GetValue(); + EXPECT_EQ(col1Val.size(), 5); + for (int i = 1; i <= 5; i++) { + EXPECT_VEC_EQ(col1Val[i - 1], ROOT::RVecI{1 * i, 2 * i, 3 * i}); + } + + auto col2Val = col2Take.GetValue(); + EXPECT_EQ(col2Val.size(), 5); + for (int i = 1; i <= 5; i++) { + EXPECT_VEC_EQ(col2Val[i - 1][0], ROOT::RVecI{i}); + EXPECT_VEC_EQ(col2Val[i - 1][1], ROOT::RVecI{1 * i, 2 * i, 3 * i}); + EXPECT_VEC_EQ(col2Val[i - 1][2], ROOT::RVecI{-1 * i, -2 * i, -3 * i, -4 * i, -5 * i}); + } + + auto col3Val = col3Take.GetValue(); + EXPECT_EQ(col3Val.size(), 5); + for (int i = 1; i <= 5; i++) { + for (auto j = 0; j < i; j++) { + EXPECT_VEC_EQ(col3Val[i - 1][j], std::array{1 * i, 2 * i, 3 * i}); + } + } + + // FIXME(fdegeus) + // auto col4Val = col4Take.GetValue(); + // EXPECT_EQ(col4Val.size(), 5); + + // // Each event contains the values of arraysDatasetCol4El in column 4 + // for (unsigned i = 0; i < 5; i++) { + // // Unpack the three top-level std::arrays + // const auto &arr1 = col4Val[i][0]; + // const auto &arr2 = col4Val[i][1]; + // const auto &arr3 = col4Val[i][2]; + + // // The first contains an RVec with only one element + // for (unsigned k = 0; k < arr1.size(); k++) { + // EXPECT_VEC_EQ(arr1[0][k], arraysDatasetCol4El[0][0][k]); + // } + + // // The others contain more than one element in their sub-level RVecs + // for (unsigned j = 0; j < arr2.size(); j++) { + // for (unsigned k = 0; k < arr2[j].size(); k++) { + // EXPECT_VEC_EQ(arr2[j][k], arraysDatasetCol4El[1][j][k]); + // } + // } + + // for (unsigned j = 0; j < arr3.size(); j++) { + // for (unsigned k = 0; k < arr3[j].size(); k++) { + // EXPECT_VEC_EQ(arr3[j][k], arraysDatasetCol4El[2][j][k]); + // } + // } + // } + + // auto col5ValArr = col5TakeArr.GetValue(); + // auto col5ValArrRVec = col5TakeArrRVec.GetValue(); + // auto col5ValRVecArr = col5TakeRVecArr.GetValue(); + // EXPECT_EQ(col5ValArr.size(), 5); + // EXPECT_EQ(col5ValArrRVec.size(), 5); + // EXPECT_EQ(col5ValRVecArr.size(), 5); + + // std::array col5_member_1{42.f, std::numeric_limits::max(), 0.f}; + // for (const auto &arr : col5ValArr) { + // EXPECT_VEC_EQ(arr, col5_member_1); + // } + + // std::array col5_member_2{ROOT::RVecF{1.f}, ROOT::RVecF{2.f, 3.f}, ROOT::RVecF{4.f, 5.f, 6.f}}; + // for (const auto &arr : col5ValArrRVec) { + // for (unsigned i = 0; i < 3; i++) { + // EXPECT_VEC_EQ(arr[i], col5_member_2[i]); + // } + // } + + // ROOT::RVec> col5_member_3{col5_member_1, col5_member_1, col5_member_1, col5_member_1}; + // for (const auto &arr : col5ValRVecArr) { + // for (unsigned i = 0; i < 4; i++) { + // EXPECT_VEC_EQ(arr[i], col5_member_3[i]); + // } + // } +} + +TEST_F(RNTupleProcessorDSArraysDataset, Read) +{ + ReadArraysTest(fNtplName, fFileName); +} + +// #ifdef R__USE_IMT + +// TEST_F(RNTupleProcessorDSArraysDataset, ReadMT) +// { +// IMTRAII _; + +// ReadArraysTest(fNtplName, fFileName); +// } +// #endif + +void UseArraysAsRVec(const std::string &name, const std::string &fname) +{ + // These tests use the columns that contain std::array data on disk as RVecs + // passing them as arguments to functions that expect RVecs. + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({name, fname})); + + auto df1 = df.Define("col1_short", [](const ROOT::RVecI &arr) { return ROOT::VecOps::Take(arr, 2); }, {"col1_arr"}); + auto take1 = df1.Take("col1_short"); + + // Exercise jitting + auto df2 = df1.Define("sum_col2", + "int sum = 0; for (const auto &v: col3_rvec_arr) sum += ROOT::VecOps::Sum(v); return sum;"); + auto take2 = df2.Take("sum_col2"); + + auto take1Val = take1.GetValue(); + for (int i = 0; i < 5; i++) { + EXPECT_VEC_EQ(take1Val[i], ROOT::RVecI{(i + 1), (i + 1) * 2}); + } + + std::array expectedVals{6, 24, 54, 96, 150}; + EXPECT_VEC_EQ(take2.GetValue(), expectedVals); +} + +TEST_F(RNTupleProcessorDSArraysDataset, UseArrays) +{ + UseArraysAsRVec(fNtplName, fFileName); +} + +// #ifdef R__USE_IMT + +// TEST_F(RNTupleProcessorDSArraysDataset, UseArraysMT) +// { +// IMTRAII _; + +// UseArraysAsRVec(fNtplName, fFileName); +// } +// #endif + +void UseArraySizeColumn(const std::string &name, const std::string &fname) +{ + // These tests use the columns that contain std::array data on disk as RVecs + // checking the size of the collection with the R_rdf_sizeof_* columns + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(RNTupleProcessor::Create({name, fname})); + + auto sizeOfCol1 = df.Take("R_rdf_sizeof_col1_arr"); + // Use # here to exercise that too + auto sizeOfCol2 = df.Take>("#col2_arr_rvec"); + // auto sizeOfCol4 = df.Take>>>("R_rdf_sizeof_col4_arr_rvec_arr_rvec"); + + EXPECT_VEC_EQ(sizeOfCol1.GetValue(), std::array{3, 3, 3, 3, 3}); + + for (const auto &sizeVec : sizeOfCol2) { + EXPECT_VEC_EQ(sizeVec, ROOT::RVecULL{1, 3, 5}); + } + + // FIXME(fdegeus) + // // The sizes of elements in col4 are + // // { { { 2, 3, 4 } }, { { 2, 3, 4 }, { 2, 3, 3 } }, { { 3, 4, 1 }, { 4, 3, 3 }, { 4, 3, 3 } } } + // auto sizeOfCol4Val = sizeOfCol4.GetValue(); + // for (unsigned i = 0; i < 5; i++) { + // // Unpack the three top-level vectors + // const auto &rvec1 = sizeOfCol4Val[i][0]; + // const auto &rvec2 = sizeOfCol4Val[i][1]; + // const auto &rvec3 = sizeOfCol4Val[i][2]; + + // // The first contains an RVec with only one element + // EXPECT_VEC_EQ(rvec1[0], ROOT::RVecULL{2, 3, 4}); + + // // The others contain more than one element in their sub-level RVecs + // ROOT::RVec expectedRVecs2{{2, 3, 4}, {2, 3, 3}}; + // for (unsigned j = 0; j < rvec2.size(); j++) { + // EXPECT_VEC_EQ(rvec2[j], expectedRVecs2[j]); + // } + + // ROOT::RVec expectedRVecs3{{3, 4, 1}, {4, 3, 3}, {4, 3, 3}}; + // for (unsigned j = 0; j < rvec3.size(); j++) { + // EXPECT_VEC_EQ(rvec3[j], expectedRVecs3[j]); + // } + // } +} + +TEST_F(RNTupleProcessorDSArraysDataset, UseArraySizeColumn) +{ + UseArraySizeColumn(fNtplName, fFileName); +} + +// #ifdef R__USE_IMT + +// TEST_F(RNTupleProcessorDSArraysDataset, UseArraySizeColumnMT) +// { +// IMTRAII _; + +// UseArraySizeColumn(fNtplName, fFileName); +// } +// #endif diff --git a/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx b/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx index 3951677f18d47..c05bdee30d1a0 100644 --- a/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleProcessor.hxx @@ -37,8 +37,16 @@ namespace Internal { struct RNTupleProcessorEntryLoader; const RNTupleProcessorEntry * LoadFullRNTupleProcessorEntry(ROOT::Experimental::RNTupleProcessor &processor, bool includeSubfields); + +namespace RDF { +class RNTupleProcessorColumnReader; +} } // namespace Internal +namespace RDF { +class RNTupleProcessorDS; +} // namespace RDF + // clang-format off /** \class ROOT::Experimental::RNTupleOpenSpec @@ -255,6 +263,8 @@ class RNTupleProcessor { friend class RNTupleSingleProcessor; friend class RNTupleChainProcessor; friend class RNTupleJoinProcessor; + friend class ROOT::Experimental::RDF::RNTupleProcessorDS; + friend class ROOT::Experimental::Internal::RDF::RNTupleProcessorColumnReader; protected: std::string fProcessorName; @@ -287,6 +297,13 @@ protected: virtual void Connect(const std::unordered_set &fieldIdxs, const Internal::RNTupleProcessorProvenance &provenance, bool updateFields) = 0; + void Reset() + { + fNEntriesProcessed = 0; + fCurrentEntryNumber = ROOT::kInvalidDescriptorId; + fCurrentProcessorNumber = 0; + } + ///////////////////////////////////////////////////////////////////////////// /// \brief Load the entry identified by the provided entry number. /// @@ -295,6 +312,9 @@ protected: /// \return `entryNumber` if the entry was successfully loaded, `kInvalidNTupleIndex` otherwise. virtual ROOT::NTupleSize_t LoadEntry(ROOT::NTupleSize_t entryNumber) = 0; + Internal::RNTupleProcessorEntry &GetEntry() { return *fEntry; } + const Internal::RNTupleProcessorEntry &GetEntry() const { return *fEntry; } + ///////////////////////////////////////////////////////////////////////////// /// \brief Get the total number of entries in this processor virtual ROOT::NTupleSize_t GetNEntries() = 0; @@ -322,6 +342,10 @@ protected: AddFieldToEntry(const std::string &fieldName, const std::string &typeName, void *valuePtr, const Internal::RNTupleProcessorProvenance &provenance) = 0; + virtual Internal::RNTupleProcessorEntry::FieldIndex_t + AddFieldToEntry(std::unique_ptr field, const std::string &fieldName, void *valuePtr, + const Internal::RNTupleProcessorProvenance &provenance) = 0; + // TODO docs virtual void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) = 0; @@ -401,6 +425,16 @@ public: if constexpr (!std::is_void_v) { typeName = ROOT::Internal::GetRenormalizedTypeName(typeid(T)); } + + // The field already exists, so return the existing one. + if (auto fieldIdx = fEntry->FindFieldIndex(fieldName, typeName)) { + auto value = fEntry->GetValue(*fieldIdx); + // Need to check that the provided pointer is not conflicting with the existing one + // TODO do this more elegantly/better. At least throw an exception instead of an assert. + assert(valuePtr == nullptr || valuePtr == value.GetPtr().get()); + return RNTupleProcessorOptionalPtr(fEntry.get(), *fieldIdx); + } + auto fieldIdx = AddFieldToEntry(fieldName, typeName, valuePtr, Internal::RNTupleProcessorProvenance()); return RNTupleProcessorOptionalPtr(fEntry.get(), fieldIdx); } @@ -601,6 +635,8 @@ private: std::unique_ptr CreateAndConnectField(const std::string &qualifiedFieldName, const std::string &typeName); + std::unique_ptr ConnectField(std::unique_ptr field); + ///////////////////////////////////////////////////////////////////////////// /// \brief Initialize the processor by creating an (initially empty) `fEntry`, or setting an existing one. /// @@ -643,6 +679,10 @@ private: const std::string &fieldName, const std::string &typeName, void *valuePtr = nullptr, const Internal::RNTupleProcessorProvenance &provenance = Internal::RNTupleProcessorProvenance()) final; + Internal::RNTupleProcessorEntry::FieldIndex_t + AddFieldToEntry(std::unique_ptr field, const std::string &fieldName, void *valuePtr, + const Internal::RNTupleProcessorProvenance &provenance) final; + void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) final; @@ -741,6 +781,10 @@ private: const std::string &fieldName, const std::string &typeName, void *valuePtr = nullptr, const Internal::RNTupleProcessorProvenance &provenance = Internal::RNTupleProcessorProvenance()) final; + Internal::RNTupleProcessorEntry::FieldIndex_t + AddFieldToEntry(std::unique_ptr field, const std::string &fieldName, void *valuePtr, + const Internal::RNTupleProcessorProvenance &provenance) final; + void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) final; @@ -844,6 +888,10 @@ private: const std::string &fieldName, const std::string &typeName, void *valuePtr = nullptr, const Internal::RNTupleProcessorProvenance &provenance = Internal::RNTupleProcessorProvenance()) final; + Internal::RNTupleProcessorEntry::FieldIndex_t + AddFieldToEntry(std::unique_ptr field, const std::string &fieldName, void *valuePtr, + const Internal::RNTupleProcessorProvenance &provenance) final; + void AddAllFieldsToEntry(const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) final; diff --git a/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx b/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx index d72ac8f282247..a61aba8bb2d4a 100644 --- a/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx @@ -159,9 +159,12 @@ public: /// /// \param[in] canonicalFieldName The name of the field in the entry, including its processor name prefixes and /// parent field names, if applicable. + /// \param[in] typeName Type of the field, if relevant. If no type name is provided, the first field corresponding to + /// the provided name is returned. /// /// \return A `std::optional` containing the field index if it was found. - std::optional FindFieldIndex(std::string_view canonicalFieldName, std::string_view typeName) const; + std::optional + FindFieldIndex(std::string_view canonicalFieldName, std::string_view typeName = "") const; ///////////////////////////////////////////////////////////////////////////// /// \brief Add a new field to the entry. @@ -183,6 +186,17 @@ public: /// \param[in] field The new field to use in the entry. void UpdateField(FieldIndex_t fieldIdx, std::unique_ptr field); + ///////////////////////////////////////////////////////////////////////////// + /// \brief Get a field by name (and optionally type) + /// + /// \param[in] canonicalFieldName The name of the field in the entry, including its processor name prefixes and + /// parent field names, if applicable. + /// \param[in] typeName Type of the field, if relevant. If no type name is provided, the first field corresponding to + /// the provided name is returned. + /// + /// \return A pointer to the field, or a `nullptr` if the field does not exist in the entry. + const ROOT::RFieldBase &GetField(FieldIndex_t fieldIdx) const; + ///////////////////////////////////////////////////////////////////////////// /// \brief Bind a new value pointer to a field in the entry. /// diff --git a/tree/ntuple/src/RNTupleProcessor.cxx b/tree/ntuple/src/RNTupleProcessor.cxx index a58f8381bc6d8..f60afb6972ed8 100644 --- a/tree/ntuple/src/RNTupleProcessor.cxx +++ b/tree/ntuple/src/RNTupleProcessor.cxx @@ -163,8 +163,6 @@ ROOT::Experimental::RNTupleSingleProcessor::CreateAndConnectField(const std::str } const auto &desc = fPageSource->GetSharedDescriptorGuard().GetRef(); - ROOT::RFieldZero fieldZero; - ROOT::Internal::SetAllowFieldSubstitutions(fieldZero, true); const auto onDiskFieldId = desc.FindFieldId(onDiskFieldName); @@ -187,6 +185,14 @@ ROOT::Experimental::RNTupleSingleProcessor::CreateAndConnectField(const std::str } field->SetOnDiskId(onDiskFieldId); + return ConnectField(std::move(field)); +} + +std::unique_ptr +ROOT::Experimental::RNTupleSingleProcessor::ConnectField(std::unique_ptr field) +{ + ROOT::RFieldZero fieldZero; + ROOT::Internal::SetAllowFieldSubstitutions(fieldZero, true); fieldZero.Attach(std::move(field)); ROOT::Internal::CallConnectPageSourceOnField(fieldZero, *fPageSource); return std::move(fieldZero.ReleaseSubfields()[0]); @@ -219,6 +225,33 @@ ROOT::Experimental::RNTupleSingleProcessor::AddFieldToEntry(const std::string &f return *fieldIdx; } +ROOT::Experimental::Internal::RNTupleProcessorEntry::FieldIndex_t +ROOT::Experimental::RNTupleSingleProcessor::AddFieldToEntry(std::unique_ptr field, + const std::string &fieldName, void *valuePtr, + const Internal::RNTupleProcessorProvenance &provenance) +{ + auto fieldIdx = fEntry->FindFieldIndex(fieldName, field->GetTypeName()); + if (!fieldIdx) { + // Strip the processor name prefix(es), if present. + std::string qualifiedFieldName = fieldName; + if (provenance.IsPresentInFieldName(qualifiedFieldName)) { + qualifiedFieldName = qualifiedFieldName.substr(provenance.Get().size() + 1); + } + + field = ConnectField(std::move(field)); + + if (!field) { + throw RException(R__FAIL("cannot register field with name \"" + qualifiedFieldName + + "\" because it is not present in the on-disk information of the RNTuple(s) this " + "processor is created from")); + } + + fieldIdx = fEntry->AddField(qualifiedFieldName, std::move(field), valuePtr, provenance); + } + + return *fieldIdx; +} + void ROOT::Experimental::RNTupleSingleProcessor::AddAllFieldsToEntry( const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) { @@ -347,6 +380,7 @@ ROOT::NTupleSize_t ROOT::Experimental::RNTupleChainProcessor::GetNEntries() for (unsigned i = 0; i < fInnerProcessors.size(); ++i) { if (fInnerNEntries[i] == kInvalidNTupleIndex) { + fInnerProcessors[i]->Initialize(fEntry); fInnerNEntries[i] = fInnerProcessors[i]->GetNEntries(); } @@ -382,6 +416,14 @@ ROOT::Experimental::RNTupleChainProcessor::AddFieldToEntry(const std::string &fi return fInnerProcessors[fCurrentProcessorNumber]->AddFieldToEntry(fieldName, typeName, valuePtr, provenance); } +ROOT::Experimental::Internal::RNTupleProcessorEntry::FieldIndex_t +ROOT::Experimental::RNTupleChainProcessor::AddFieldToEntry(std::unique_ptr field, + const std::string &fieldName, void *valuePtr, + const Internal::RNTupleProcessorProvenance &provenance) +{ + return fInnerProcessors[fCurrentProcessorNumber]->AddFieldToEntry(std::move(field), fieldName, valuePtr, provenance); +} + void ROOT::Experimental::RNTupleChainProcessor::AddAllFieldsToEntry( const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) { @@ -571,6 +613,37 @@ ROOT::Experimental::RNTupleJoinProcessor::AddFieldToEntry(const std::string &fie } } +ROOT::Experimental::Internal::RNTupleProcessorEntry::FieldIndex_t +ROOT::Experimental::RNTupleJoinProcessor::AddFieldToEntry(std::unique_ptr field, + const std::string &fieldName, void *valuePtr, + const Internal::RNTupleProcessorProvenance &provenance) +{ + auto auxProvenance = provenance.Evolve(fAuxiliaryProcessor->GetProcessorName()); + if (auxProvenance.IsPresentInFieldName(fieldName)) { + // If the primaryProcessor has a field with the name of the auxProcessor (either as a "proper" field or because + // the primary processor itself is a join where its auxProcessor bears the same name as the current auxProcessor), + // there will be name conflicts, so error out. + if (fPrimaryProcessor->CanReadFieldFromDisk(fieldName)) { + throw RException(R__FAIL("ambiguous field name: \"" + fieldName + + "\" is present in the primary RNTupleProcessor \"" + + fPrimaryProcessor->GetProcessorName() + + "\", but may also refer to a field in the auxiliary RNTupleProcessor named \"" + + fAuxiliaryProcessor->GetProcessorName() + + "\". To avoid this ambiguity, rename the auxiliary RNTupleProcessor.")); + } + + auto fieldIdx = fAuxiliaryProcessor->AddFieldToEntry(std::move(field), fieldName, valuePtr, auxProvenance); + if (fieldIdx) + fAuxiliaryFieldIdxs.insert(fieldIdx); + return fieldIdx; + } else { + auto fieldIdx = fPrimaryProcessor->AddFieldToEntry(std::move(field), fieldName, valuePtr, provenance); + if (fieldIdx) + fFieldIdxs.insert(fieldIdx); + return fieldIdx; + } +} + void ROOT::Experimental::RNTupleJoinProcessor::AddAllFieldsToEntry( const Internal::RNTupleProcessorProvenance &provenance, bool addPrefixProvenance, bool includeSubfields) { diff --git a/tree/ntuple/src/RNTupleProcessorEntry.cxx b/tree/ntuple/src/RNTupleProcessorEntry.cxx index bfe126d9da9c0..b25f04e68df10 100644 --- a/tree/ntuple/src/RNTupleProcessorEntry.cxx +++ b/tree/ntuple/src/RNTupleProcessorEntry.cxx @@ -43,7 +43,7 @@ ROOT::Experimental::Internal::RNTupleProcessorEntry::FindFieldIndex(std::string_ assert(!fieldIdxs.empty()); for (auto idx : fieldIdxs) { - if (fProcessorValues[idx].fField->GetTypeName() == typeName) { + if (fProcessorValues[idx].fField->GetTypeName() == typeName || typeName == "") { return idx; } } @@ -76,6 +76,12 @@ ROOT::Experimental::Internal::RNTupleProcessorEntry::AddField(const std::string return fieldIdx; } +const ROOT::RFieldBase &ROOT::Experimental::Internal::RNTupleProcessorEntry::GetField(FieldIndex_t fieldIdx) const +{ + assert(fieldIdx < fProcessorValues.size()); + return *fProcessorValues[fieldIdx].fField; +} + void ROOT::Experimental::Internal::RNTupleProcessorEntry::UpdateField(FieldIndex_t fieldIdx, std::unique_ptr field) { From 56d04c0102d5531962a332f3befba657b6c09309 Mon Sep 17 00:00:00 2001 From: Florine de Geus Date: Thu, 11 Jun 2026 10:51:44 +0200 Subject: [PATCH 7/7] [DF][WIP] Add RNTupleProcessorDS support for joins TODO: test unaligned joins --- .../dataframe/inc/ROOT/RNTupleProcessorDS.hxx | 3 +- tree/dataframe/src/RNTupleProcessorDS.cxx | 24 ++++--- .../test/datasource_ntuple_processor.cxx | 70 +++++++++++++++++-- .../ntuple/inc/ROOT/RNTupleProcessorEntry.hxx | 4 +- 4 files changed, 83 insertions(+), 18 deletions(-) diff --git a/tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx b/tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx index bd3a415e987ce..539cf184169b9 100644 --- a/tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx +++ b/tree/dataframe/inc/ROOT/RNTupleProcessorDS.hxx @@ -74,7 +74,8 @@ class RNTupleProcessorDS final : public ROOT::RDF::RDataSource { /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however, /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set /// will be added as a `std::set`, but `Jet.[pt|eta] will be added as `ROOT::VecOps::RVec). - void AddField(const ROOT::RFieldBase &field, std::string_view colName, std::vector fieldInfos, + void AddField(const ROOT::RFieldBase &field, std::string_view colName, + Internal::RNTupleProcessorProvenance procProvenance, std::vector fieldInfos, bool convertToRVec = true); public: diff --git a/tree/dataframe/src/RNTupleProcessorDS.cxx b/tree/dataframe/src/RNTupleProcessorDS.cxx index 5cb8538ee65cd..3fb212b68dc59 100644 --- a/tree/dataframe/src/RNTupleProcessorDS.cxx +++ b/tree/dataframe/src/RNTupleProcessorDS.cxx @@ -213,6 +213,7 @@ class RNTupleProcessorColumnReader : public ROOT::Detail::RDF::RColumnReaderBase ROOT::Experimental::RDF::RNTupleProcessorDS::~RNTupleProcessorDS() = default; void ROOT::Experimental::RDF::RNTupleProcessorDS::AddField(const ROOT::RFieldBase &field, std::string_view colName, + Internal::RNTupleProcessorProvenance procProvenance, std::vector fieldInfos, bool convertToRVec) { @@ -262,7 +263,7 @@ void ROOT::Experimental::RDF::RNTupleProcessorDS::AddField(const ROOT::RFieldBas convertToRVec && (field.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" || field.GetTypeName().substr(0, 12) == "std::vector<" || field.GetTypeName() == ""); const auto *f = field.GetConstSubfields()[0]; - AddField(*f, colName, fieldInfos, representableAsRVec); + AddField(*f, colName, procProvenance, fieldInfos, representableAsRVec); // Note that at the end of the recursion, we handled the inner sub collections as well as the // collection as whole, so we are done. @@ -271,14 +272,14 @@ void ROOT::Experimental::RDF::RNTupleProcessorDS::AddField(const ROOT::RFieldBas } else if (nRepetitions > 0) { // Fixed-size array, same logic as ROOT::RVec. const auto *f = field.GetConstSubfields()[0]; - AddField(*f, colName, fieldInfos); + AddField(*f, colName, procProvenance, fieldInfos); return; } else if (field.GetStructure() == ROOT::ENTupleStructure::kRecord) { // Inner fields of records are provided as individual RDF columns, e.g. "event.id" for (const auto &f : field.GetConstSubfields()) { auto innerName = colName.empty() ? f->GetFieldName() : (std::string(colName) + "." + f->GetFieldName()); // Inner fields of collections of records are always exposed as ROOT::RVec - AddField(*f, innerName, fieldInfos); + AddField(*f, innerName, procProvenance, fieldInfos); } // Do not add untyped record fields @@ -314,7 +315,8 @@ void ROOT::Experimental::RDF::RNTupleProcessorDS::AddField(const ROOT::RFieldBas // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks") if (!fieldInfos.empty()) { const auto &info = fieldInfos.back(); - const std::string name = "R_rdf_sizeof_" + info.fFieldName; + const std::string name = + (procProvenance.Empty() ? "R_rdf_sizeof_" : procProvenance.Get() + "_R_rdf_sizeof_") + info.fFieldName; if (info.fNRepetitions > 0) { cardinalityField = std::make_unique(name, info.fNRepetitions); } else { @@ -363,7 +365,8 @@ void ROOT::Experimental::RDF::RNTupleProcessorDS::AddField(const ROOT::RFieldBas } if (cardinalityField) { - std::string cardinalityFieldName = "R_rdf_sizeof_" + std::string(colName); + std::string cardinalityFieldName = + (procProvenance.Empty() ? "R_rdf_sizeof_" : procProvenance.Get() + "_R_rdf_sizeof_") + std::string(colName); fColumnNames.emplace_back(cardinalityFieldName); fColumnTypes.emplace_back(cardinalityField->GetTypeName()); fProcessor->AddFieldToEntry(std::move(cardinalityField), cardinalityFieldName, nullptr, @@ -371,11 +374,13 @@ void ROOT::Experimental::RDF::RNTupleProcessorDS::AddField(const ROOT::RFieldBas } fieldInfos.emplace_back(field.GetOnDiskId(), field.GetFieldName(), field.GetTypeName(), nRepetitions); - fColumnNames.emplace_back(colName); + std::string canonicalName = (procProvenance.Empty() ? "" : procProvenance.Get() + ".") + std::string(colName); + fColumnNames.emplace_back(canonicalName); fColumnTypes.emplace_back(valueField->GetTypeName()); + // Add nested fields explicitly to the entry, because they may be mapped differently. if (fieldInfos.size() > 1) { - fProcessor->AddFieldToEntry(std::move(valueField), std::string(colName), nullptr, + fProcessor->AddFieldToEntry(std::move(valueField), canonicalName, nullptr, Internal::RNTupleProcessorProvenance()); } } @@ -390,7 +395,7 @@ ROOT::Experimental::RDF::RNTupleProcessorDS::RNTupleProcessorDS( const auto &entry = fProcessor->GetEntry(); for (auto fieldIdx : entry.GetFieldIndices()) { const auto &field = entry.GetValue(fieldIdx).GetField(); - AddField(field, entry.GetQualifiedFieldName(fieldIdx), + AddField(field, entry.GetQualifiedFieldName(fieldIdx), entry.GetFieldProvenance(fieldIdx), std::vector()); } } @@ -557,9 +562,6 @@ bool ROOT::Experimental::RDF::RNTupleProcessorDS::SetEntry(unsigned int /* slot ROOT::RDataFrame ROOT::Experimental::RDF::FromRNTupleProcessor(std::unique_ptr processor) { - if (dynamic_cast(processor.get())) { - throw std::runtime_error("RNTupleProcessorDS: Joins are not yet supported"); - } return ROOT::RDataFrame(std::make_unique(std::move(processor))); } diff --git a/tree/dataframe/test/datasource_ntuple_processor.cxx b/tree/dataframe/test/datasource_ntuple_processor.cxx index 4b004f64d7af1..335077ce9b912 100644 --- a/tree/dataframe/test/datasource_ntuple_processor.cxx +++ b/tree/dataframe/test/datasource_ntuple_processor.cxx @@ -318,12 +318,72 @@ TEST(RNTupleProcessorDS, Join) } auto proc = RNTupleProcessor::CreateJoin({"primary", guardFile1.GetPath()}, {"auxiliary", guardFile2.GetPath()}, {}); - try { - auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(std::move(proc)); - FAIL() << "creating a datasource from a join processor should throw"; - } catch (const std::runtime_error &err) { - EXPECT_THAT(err.what(), testing::HasSubstr("RNTupleProcessorDS: Joins are not yet supported")); + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(std::move(proc)); + + auto count = df.Count(); + EXPECT_EQ(10, count.GetValue()); + + auto electronFilterCount = df.Filter([](float pt) { return pt > 4.f; }, {"electrons.pt"}).Count(); + EXPECT_EQ(5, electronFilterCount.GetValue()); + + auto jetSum = df.Aggregate( + [](float &acc, const ROOT::RVec &jets) { + for (const auto &jet : jets) + acc += jet; + }, + [](float a, float b) { return a + b; }, "auxiliary.jets"); + EXPECT_FLOAT_EQ(13.5, jetSum.GetValue()); +} + +TEST(RNTupleProcessorDS, JoinUnaligned) +{ + FileRAII guardFile1("RNTupleProcessorDS_test_join_unaligned_1.root"); + FileRAII guardFile2("RNTupleProcessorDS_test_join_unaligned_2.root"); + + { + auto model = RNTupleModel::Create(); + auto fldId = model->MakeField("idx"); + auto fldElectron = model->MakeField("electrons"); + auto writer = RNTupleWriter::Recreate(std::move(model), "primary", guardFile1.GetPath()); + for (unsigned i = 0; i < 10; ++i) { + // only fill odd events + if (i % 2 == 0) + continue; + *fldId = i; + fldElectron->pt = static_cast(i); + writer->Fill(); + } + } + { + auto model = RNTupleModel::Create(); + auto fldId = model->MakeField("idx"); + auto fldJets = model->MakeField>("jets"); + auto writer = RNTupleWriter::Recreate(std::move(model), "auxiliary", guardFile2.GetPath()); + // fill in reverse + for (int i = 9; i >= 0; --i) { + *fldId = i; + *fldJets = {static_cast(i) * .1f, static_cast(i) * .2f}; + writer->Fill(); + } } + + auto proc = + RNTupleProcessor::CreateJoin({"primary", guardFile1.GetPath()}, {"auxiliary", guardFile2.GetPath()}, {"idx"}); + auto df = ROOT::Experimental::RDF::FromRNTupleProcessor(std::move(proc)); + + auto count = df.Count(); + EXPECT_EQ(5, count.GetValue()); + + auto electronFilterCount = df.Filter([](float pt) { return pt > 4.f; }, {"electrons.pt"}).Count(); + EXPECT_EQ(3, electronFilterCount.GetValue()); + + auto jetSum = df.Aggregate( + [](float &acc, const ROOT::RVec &jets) { + for (const auto &jet : jets) + acc += jet; + }, + [](float a, float b) { return a + b; }, "auxiliary.jets"); + EXPECT_FLOAT_EQ(7.5, jetSum.GetValue()); } // #ifdef R__USE_IMT diff --git a/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx b/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx index a61aba8bb2d4a..b9ef8fcbde487 100644 --- a/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx +++ b/tree/ntuple/inc/ROOT/RNTupleProcessorEntry.hxx @@ -76,8 +76,10 @@ public: /// \param[in] fieldName Field name to check. bool IsPresentInFieldName(std::string_view fieldName) const { - return !fProvenance.empty() && fieldName.find(fProvenance + ".") == 0; + return !fProvenance.empty() && fieldName.find(fProvenance) == 0; } + + bool Empty() const { return fProvenance.empty(); } }; // clang-format off