Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.types.RowType;

import org.apache.arrow.c.ArrowArray;
Expand Down Expand Up @@ -70,10 +71,19 @@ public ArrowFormatCWriter(ArrowFormatWriter arrowFormatWriter) {
schema = ArrowSchema.allocateNew(allocator);
}

public ArrowFormatWriter formatWriter() {
return realWriter;
}

public boolean write(InternalRow currentRow) {
return realWriter.write(currentRow);
}

public void write(
ColumnVector[] columns, @Nullable int[] pickedInColumn, int startIndex, int batchRows) {
realWriter.write(columns, pickedInColumn, startIndex, batchRows);
}

public ArrowCStruct toCStruct() {
VectorSchemaRoot vectorSchemaRoot = realWriter.getVectorSchemaRoot();
return ArrowUtils.serializeToCStruct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.arrow.writer.ArrowFieldWriters;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VariantType;
import org.apache.paimon.utils.Preconditions;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -187,6 +189,15 @@ public boolean write(InternalRow currentRow) {
return true;
}

public void write(
ColumnVector[] columns, @Nullable int[] pickedInColumn, int startIndex, int batchRows) {
Preconditions.checkState(rowId == 0, "rowId must be 0 before writing columns.");
for (int i = 0; i < columns.length; i++) {
fieldWriters[i].write(columns[i], pickedInColumn, startIndex, batchRows);
}
rowId = batchRows;
}

public long memoryUsed() {
vectorSchemaRoot.setRowCount(rowId);
long memoryUsed = 0;
Expand All @@ -213,6 +224,14 @@ public void close() {
allocator.close();
}

public int getBatchSize() {
return batchSize;
}

public ArrowFieldWriter[] getFieldWriters() {
return fieldWriters;
}

public VectorSchemaRoot getVectorSchemaRoot() {
return vectorSchemaRoot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import org.apache.paimon.arrow.vector.ArrowCStruct;
import org.apache.paimon.arrow.vector.ArrowFormatCWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.format.BundleFormatWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.VectorizedBundleRecords;

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
Expand All @@ -34,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;

/** Arrow bundle writer. */
Expand Down Expand Up @@ -72,6 +77,9 @@ public void addElement(InternalRow internalRow) {
public void writeBundle(BundleRecords bundleRecords) throws IOException {
if (bundleRecords instanceof ArrowBundleRecords) {
add(((ArrowBundleRecords) bundleRecords).getVectorSchemaRoot());
} else if (bundleRecords instanceof VectorizedBundleRecords) {
VectorizedBundleRecords records = (VectorizedBundleRecords) bundleRecords;
add(records.batch(), records.selected());
} else {
for (InternalRow row : bundleRecords) {
addElement(row);
Expand All @@ -98,6 +106,26 @@ public void add(VectorSchemaRoot vsr) {
}
}

public void add(VectorizedColumnBatch batch, @Nullable int[] selected) {
if (!arrowFormatWriter.empty()) {
flush();
}

int batchSize = arrowFormatWriter.formatWriter().getBatchSize();
ColumnVector[] columns = batch.columns;
int totalNumRows = selected != null ? selected.length : batch.getNumRows();

int startIndex = 0;
while (startIndex < totalNumRows) {
int batchRows = Math.min(batchSize, totalNumRows - startIndex);
arrowFormatWriter.write(columns, selected, startIndex, batchRows);
startIndex += batchRows;
if (startIndex < totalNumRows) {
flush();
}
}
}

@Override
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
return suggestedCheck && (underlyingStream.getPos() > targetSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected void doWrite(
for (int i = 0; i < batchRows; i++) {
int row = getRowNumber(startIndex, i, pickedInColumn);
if (columnVector.isNullAt(row)) {
varCharVector.setNull(row);
varCharVector.setNull(i);
} else {
byte[] value = ((BytesColumnVector) columnVector).getBytes(row).getBytes();
varCharVector.setSafe(i, value);
Expand Down Expand Up @@ -154,7 +154,7 @@ protected void doWrite(
for (int i = 0; i < batchRows; i++) {
int row = getRowNumber(startIndex, i, pickedInColumn);
if (columnVector.isNullAt(row)) {
bitVector.setNull(row);
bitVector.setNull(i);
} else {
int value = ((BooleanColumnVector) columnVector).getBoolean(row) ? 1 : 0;
bitVector.setSafe(i, value);
Expand Down Expand Up @@ -473,7 +473,7 @@ protected void doWrite(
if (columnVector.isNullAt(row)) {
timeMilliVector.setNull(i);
} else {
int value = ((IntColumnVector) columnVector).getInt(i);
int value = ((IntColumnVector) columnVector).getInt(row);
timeMilliVector.setSafe(i, value);
}
}
Expand Down
Loading