Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,15 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<FragmentToBatch>& f
// Serialization happens in here, so that we can
// get rid of the intermediate tree 2 table object, saving memory.
auto batch = source.finalize();
if (!batch) {
throw std::runtime_error("FragmentToBatch::finalize() returned null RecordBatch");
}
auto mock = std::make_shared<arrow::io::MockOutputStream>();
int64_t expectedSize = 0;
auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
if (!mockWriter.ok()) {
throw std::runtime_error(fmt::format("Unable to create mock stream writer: {}", mockWriter.status().ToString()));
}
arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);

expectedSize = mock->Tell().ValueOrDie();
Expand All @@ -275,6 +281,9 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<FragmentToBatch>& f
}

auto deferredWriterStream = source.streamer(buffer);
if (!deferredWriterStream) {
throw std::runtime_error("FragmentToBatch streamer returned null OutputStream");
}

auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema());
if (outBatch.ok() == false) {
Expand Down