Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion net/curl/inc/ROOT/RCurlConnection.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ private:
void SetupErrorBuffer();
void SetOptions();
void ResetHandle();
RResult<void> SetUrl(const std::string &url);
void Perform(RStatus &status);

public:
Expand Down Expand Up @@ -109,6 +108,10 @@ public:
void ClearCredentials();
EHTTPCredentialsType GetCredentialsType() const;

/// Retargets this connection to `url`, reusing the underlying handle so curl can keep the connection
/// to the same host alive across requests. Call before a Send*Req to address a different object.
/// Returns an error if the URL cannot be set on the handle.
RResult<void> SetUrl(const std::string &url);
/// Checks if the resource exists and, if it does, returns the value of the content-length header as size
RStatus SendHeadReq(std::uint64_t &remoteSize);
/// Reads the given ranges from the remote resource. The ranges can be in any order and also overlapping. They
Expand Down
30 changes: 28 additions & 2 deletions net/curl/test/curl_connection.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,32 @@ TEST(RCurlConnection, Put)
EXPECT_EQ(std::string(reinterpret_cast<const char *>(payload), payloadLen), body);
}

/// Retargets a connection with SetUrl() and verifies that the following PUT request addresses the new
/// path and delivers the payload unchanged.
TEST(RCurlConnection, SetUrlThenPut)
{
   TServerSocket sock(0, false, TServerSocket::kDefaultBacklog, -1, ESocketBindOption::kInaddrLoopback);
   const std::string baseUrl =
      std::string("http://") + sock.GetLocalInetAddress().GetHostAddress() + ":" + std::to_string(sock.GetLocalPort());

   const unsigned char payload[] = "object body";
   const std::size_t payloadLen = sizeof(payload) - 1; // exclude null terminator

   // The connection is created with the base URL; SetUrl retargets it to a per-request URL (the
   // mechanism that lets one connection be reused across many objects on the same host).
   //
   // This setup is done *before* the receiver thread is spawned: ASSERT_TRUE returns from the test body
   // on failure, and leaving this scope while a std::thread is still joinable calls std::terminate,
   // which would abort the whole test binary instead of reporting a single failed test.
   ROOT::Internal::RCurlConnection conn(baseUrl);
   auto urlStatus = conn.SetUrl(baseUrl + "/myobject/42");
   ASSERT_TRUE(static_cast<bool>(urlStatus));

   // From here on, no fatal assertion may run before threadRecv.join().
   std::string headers;
   std::string body;
   std::thread threadRecv(TaskRecvPut, &sock, &headers, &body);

   auto status = conn.SendPutReq(payload, payloadLen);

   threadRecv.join();
   EXPECT_TRUE(static_cast<bool>(status));
   // The request line must start the captured headers and carry the retargeted path.
   EXPECT_EQ(0u, headers.find("PUT /myobject/42 ")) << headers.substr(0, 40);
   EXPECT_EQ(std::string(reinterpret_cast<const char *>(payload), payloadLen), body);
}

/// GET (range read) after PUT on the same handle — verifies that WRITEFUNCTION is set correctly
/// in SendRangesReq after a PUT cleared it.
TEST(RCurlConnection, GetAfterPut)
Expand Down Expand Up @@ -194,8 +220,8 @@ TEST(RCurlConnection, GetAfterPut)
}
}

std::string response = "HTTP/1.1 200 OK\r\nContent-Length: " + std::to_string(expectedBody.size()) +
"\r\n\r\n" + expectedBody;
std::string response =
"HTTP/1.1 200 OK\r\nContent-Length: " + std::to_string(expectedBody.size()) + "\r\n\r\n" + expectedBody;
s->SendRaw(response.data(), response.size());
s->Close();
};
Expand Down
7 changes: 7 additions & 0 deletions tree/ntuple/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
# @author Jakob Blomer CERN
############################################################################

# RNTuple optionally writes to S3-compatible object storage via libcurl (RCurlHttp).
# When curl is disabled the variable is left unset, so its expansion in the DEPENDENCIES list of
# ROOT_STANDARD_LIBRARY_PACKAGE below adds nothing.
if(curl)
set(ROOTNTuple_OPTIONAL_DEPENDENCIES RCurlHttp)
endif()

ROOT_STANDARD_LIBRARY_PACKAGE(ROOTNTuple
HEADERS
ROOT/RCluster.hxx
Expand Down Expand Up @@ -100,6 +105,7 @@ DEPENDENCIES
Imt
RIO
ROOTVecOps
${ROOTNTuple_OPTIONAL_DEPENDENCIES}
)

target_link_libraries(ROOTNTuple PRIVATE xxHash::xxHash)
Expand All @@ -125,6 +131,7 @@ endif()
# S3 page storage backend: header, source and compile definition are only added when curl is enabled.
if(curl)
set(ROOTNTuple_EXTRA_HEADERS ${ROOTNTuple_EXTRA_HEADERS} ROOT/RPageStorageS3.hxx)
target_sources(ROOTNTuple PRIVATE src/RPageStorageS3.cxx)
# R__ENABLE_S3 is the macro the ROOTNTuple sources test with #ifdef before including
# ROOT/RPageStorageS3.hxx and before creating the S3 sink.
target_compile_definitions(ROOTNTuple PRIVATE R__ENABLE_S3)
# NOTE(review): presumably required for the JSON (de)serialization of the S3 anchor -- confirm.
target_link_libraries(ROOTNTuple PRIVATE nlohmann_json::nlohmann_json)
endif()

Expand Down
1 change: 0 additions & 1 deletion tree/ntuple/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ protected:
/// Updates the descriptor and calls InitImpl() that handles the backend-specific details (file, DAOS, etc.)
void InitImpl(RNTupleModel &model) final;

virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) = 0;
virtual RNTupleLocator
CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
/// Vector commit of preprocessed pages. The `ranges` array specifies a range of sealed pages to be
Expand Down
1 change: 0 additions & 1 deletion tree/ntuple/inc/ROOT/RPageStorageDaos.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ private:
protected:
using RPagePersistentSink::InitImpl;
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final;
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final;
RNTupleLocator
CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
std::vector<RNTupleLocator>
Expand Down
3 changes: 1 addition & 2 deletions tree/ntuple/inc/ROOT/RPageStorageFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ private:
protected:
using RPagePersistentSink::InitImpl;
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final;
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) override;
RNTupleLocator
CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) override;
std::vector<RNTupleLocator>
CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask) final;
std::uint64_t StageClusterImpl() final;
Expand Down
73 changes: 70 additions & 3 deletions tree/ntuple/inc/ROOT/RPageStorageS3.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
#ifndef ROOT_RPageStorageS3
#define ROOT_RPageStorageS3

#include <ROOT/RCurlConnection.hxx>
#include <ROOT/RError.hxx>
#include <ROOT/RNTuple.hxx>
#include <ROOT/RPageStorage.hxx>

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>

namespace ROOT {
namespace Experimental {
Expand All @@ -43,9 +47,10 @@ struct RNTupleAnchorS3 {
std::uint16_t fVersionMajor = RNTuple::kVersionMajor;
std::uint16_t fVersionMinor = RNTuple::kVersionMinor;
std::uint16_t fVersionPatch = RNTuple::kVersionPatch;
/// Pattern for resolving object IDs to full S3 URLs.
/// ${baseurl} is replaced with the anchor URL, ${objid} with the numeric object ID.
std::string fUrlTemplate;
/// Pattern for resolving object IDs to full S3 URLs. ${baseurl} is replaced with the anchor URL,
/// ${objid} with the numeric object ID. Defaults to the scheme this writer uses; the reader
/// overrides it from the stored anchor.
std::string fUrlTemplate = "${baseurl}/${objid}";
/// Object ID and byte offset of the compressed header within the S3 object
std::uint64_t fHeaderObjId = 0;
std::uint64_t fHeaderOffset = 0;
Expand All @@ -67,6 +72,68 @@ struct RNTupleAnchorS3 {
static RResult<RNTupleAnchorS3> CreateFromJSON(const std::string &json);
};

/// \brief Translate an ntpl+s3 URI into its plain HTTP(S) equivalent.
///
/// Accepts `ntpl+s3+http://host/bucket/path` and `ntpl+s3+https://host/bucket/path`, returning the
/// URL with the scheme replaced by http or https respectively. Throws RException on any other scheme.
///
/// \param[in] uri The URI to translate; must use the `ntpl+s3+http://` or `ntpl+s3+https://` scheme.
/// \return The same location expressed with the plain `http` or `https` scheme.
/// \throws RException if `uri` uses any other scheme.
std::string ParseS3Url(std::string_view uri);

// clang-format off
/**
\class ROOT::Experimental::Internal::RPageSinkS3
\ingroup NTuple
\brief Storage provider that writes ntuple pages into S3-compatible object storage.
Currently implements Mode B (one sealed page per S3 object, kTypeObject64 locators).
Mode A (multiple packed pages per object, kTypeMulti locators) will be added separately.
\warning The S3 backend is experimental and under active development.
*/
// clang-format on
class RPageSinkS3 : public ROOT::Internal::RPagePersistentSink {
private:
/// HTTP base URL for this ntuple (derived from the s3 scheme URI); never has a trailing slash
std::string fBaseUrl;
/// One HTTP connection reused for every upload, so curl keeps it alive across objects on the same
/// host instead of re-handshaking per object.
ROOT::Internal::RCurlConnection fConnection;
/// Object ID counter; incremented for each object written.
std::uint64_t fObjectId{0};
/// Tracks the number of bytes committed to the current cluster (reset in StageClusterImpl)
std::uint64_t fNBytesCurrentCluster{0};
/// Anchor metadata populated during the write path and uploaded last in CommitDatasetImpl
RNTupleAnchorS3 fAnchor;

/// Resolve a numeric object ID to its full HTTP URL
/// NOTE(review): presumably built from fBaseUrl following the anchor's URL template -- confirm in the .cxx.
std::string MakeObjectUrl(std::uint64_t objId) const;
/// Upload raw bytes to the given S3 URL via an HTTP PUT request
/// \param[in] url  Full HTTP(S) URL of the target object
/// \param[in] data Pointer to the first byte to upload
/// \param[in] size Number of bytes to upload
/// NOTE(review): returns void, so failures are presumably reported by throwing -- confirm in the .cxx.
void PutObject(const std::string &url, const unsigned char *data, std::size_t size);

/// Tag to select the internal constructor that takes an already-resolved base URL.
struct RFromBaseUrl {};
/// Internal constructor used by CloneAsHidden: the public constructor derives the base URL by parsing
/// an s3 scheme URI, whereas a clone already has a resolved base URL to write under.
RPageSinkS3(std::string_view ntupleName, std::string baseUrl, const ROOT::RNTupleWriteOptions &options,
RFromBaseUrl);

protected:
// Keep the base class InitImpl overload(s) visible; the override below would otherwise hide them.
using RPagePersistentSink::InitImpl;
/// Backend-specific part of the sink initialization; receives the serialized header and its length.
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final;
/// Commits a single sealed page and returns the locator recorded for it. Per the class description,
/// each sealed page is written to its own S3 object and addressed by a kTypeObject64 locator.
RNTupleLocator
CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
/// Finishes the current cluster; fNBytesCurrentCluster is reset here.
/// NOTE(review): the return value is presumably the number of bytes written for the cluster -- confirm.
std::uint64_t StageClusterImpl() final;
/// Commits the serialized page list of a cluster group (`length` is the size of the buffer) and
/// returns the locator under which it was stored.
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final;
// Keep the base class CommitDatasetImpl overload(s) visible alongside the override below.
using RPagePersistentSink::CommitDatasetImpl;
/// Commits the serialized footer; the anchor (fAnchor) is uploaded last as part of this step.
ROOT::Internal::RNTupleLink CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final;

public:
/// Creates a sink for the ntuple `ntupleName`. `uri` is an s3 scheme URI (`ntpl+s3+http://...` or
/// `ntpl+s3+https://...`) from which the HTTP base URL is derived.
RPageSinkS3(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleWriteOptions &options);
~RPageSinkS3() override;

/// Creates another sink named `name` with the write options `opts`.
/// NOTE(review): presumably writes under an already-resolved base URL via the RFromBaseUrl constructor -- confirm.
std::unique_ptr<ROOT::Internal::RPageSink>
CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const final;
}; // class RPageSinkS3

} // namespace Internal
} // namespace Experimental
} // namespace ROOT
Expand Down
25 changes: 24 additions & 1 deletion tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
#include <ROOT/RNTupleZip.hxx>
#include <ROOT/RPageAllocator.hxx>
#include <ROOT/RPageSinkBuf.hxx>
#include <ROOT/StringUtils.hxx>
#ifdef R__ENABLE_DAOS
#include <ROOT/RPageStorageDaos.hxx>
#endif
#ifdef R__ENABLE_S3
#include <ROOT/RPageStorageS3.hxx>
#endif

#include <Compression.h>
#include <TError.h>
Expand Down Expand Up @@ -188,6 +192,9 @@ ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_vie
throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif

if (ROOT::StartsWith(location, "ntpl+s3+http://") || ROOT::StartsWith(location, "ntpl+s3+https://"))
throw RException(R__FAIL("S3 read support is not yet implemented."));

return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
}

Expand Down Expand Up @@ -920,6 +927,14 @@ ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::st
#endif
}

if (ROOT::StartsWith(location, "ntpl+s3+http://") || ROOT::StartsWith(location, "ntpl+s3+https://")) {
#ifdef R__ENABLE_S3
return std::make_unique<ROOT::Experimental::Internal::RPageSinkS3>(ntupleName, location, options);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this need the (guarded) ROOT/RPageStorageS3.hxx include?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is included near the top of the file at line 30 as

#ifdef R__ENABLE_S3
#include <ROOT/RPageStorageS3.hxx>
#endif

#else
throw RException(R__FAIL("This RNTuple build does not support S3."));
#endif
}

// Otherwise assume that the user wants us to create a file.
return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
}
Expand Down Expand Up @@ -1169,9 +1184,17 @@ void ROOT::Internal::RPagePersistentSink::CommitPage(ColumnHandle_t columnHandle
{
fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());

auto element = columnHandle.fColumn->GetElement();
RPageStorage::RSealedPage sealedPage;
{
RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
sealedPage = SealPage(page, *element);
}
fCounters->fSzZip.Add(page.GetNBytes());

ROOT::RClusterDescriptor::RPageInfo pageInfo;
pageInfo.SetNElements(page.GetNElements());
pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
pageInfo.SetLocator(CommitSealedPageImpl(columnHandle.fPhysicalId, sealedPage));
pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
}
Expand Down
14 changes: 0 additions & 14 deletions tree/ntuple/src/RPageStorageDaos.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -257,20 +257,6 @@ void ROOT::Experimental::Internal::RPageSinkDaos::InitImpl(unsigned char *serial
WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
}

/// Seals `page` with the column's element, adds the unsealed page size to the zip size counter and
/// hands the sealed page to CommitSealedPageImpl(), whose locator is returned.
ROOT::RNTupleLocator
ROOT::Experimental::Internal::RPageSinkDaos::CommitPageImpl(ColumnHandle_t columnHandle,
                                                            const ROOT::Internal::RPage &page)
{
   auto colElement = columnHandle.fColumn->GetElement();

   // The timer is confined to the lambda so that its lifetime spans only the SealPage() call.
   const auto sealed = [&] {
      Detail::RNTupleAtomicTimer zipTimer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
      return SealPage(page, *colElement);
   }();

   fCounters->fSzZip.Add(page.GetNBytes());

   const auto physicalId = columnHandle.fPhysicalId;
   return CommitSealedPageImpl(physicalId, sealed);
}

ROOT::RNTupleLocator
ROOT::Experimental::Internal::RPageSinkDaos::CommitSealedPageImpl(ROOT::DescriptorId_t,
const RPageStorage::RSealedPage &sealedPage)
Expand Down
14 changes: 0 additions & 14 deletions tree/ntuple/src/RPageStorageFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,6 @@ ROOT::Internal::RPageSinkFile::WriteSealedPage(const RPageStorage::RSealedPage &
return result;
}

/// Seals `page` with the column's element, adds the unsealed page size to the zip size counter and
/// writes the sealed page via WriteSealedPage(), passing the packed size of the page's elements.
ROOT::RNTupleLocator
ROOT::Internal::RPageSinkFile::CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page)
{
   auto colElement = columnHandle.fColumn->GetElement();

   // The timer is confined to the lambda so that its lifetime spans only the SealPage() call.
   const auto sealed = [&] {
      RNTupleAtomicTimer zipTimer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
      return SealPage(page, *colElement);
   }();

   fCounters->fSzZip.Add(page.GetNBytes());

   const auto nElements = page.GetNElements();
   return WriteSealedPage(sealed, colElement->GetPackedSize(nElements));
}

ROOT::RNTupleLocator ROOT::Internal::RPageSinkFile::CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId,
const RPageStorage::RSealedPage &sealedPage)
{
Expand Down
Loading
Loading