Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,22 @@

import static org.apache.parquet.bytes.BytesInput.concat;

import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
import it.unimi.dsi.fastutil.doubles.Double2IntMap;
import it.unimi.dsi.fastutil.doubles.DoubleIterator;
import it.unimi.dsi.fastutil.floats.Float2IntLinkedOpenHashMap;
import it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.Float2IntMap;
import it.unimi.dsi.fastutil.floats.FloatIterator;
import it.unimi.dsi.fastutil.ints.Int2IntLinkedOpenHashMap;
import it.unimi.dsi.fastutil.floats.Float2IntOpenHashMap;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.Long2IntMap;
import it.unimi.dsi.fastutil.longs.LongIterator;
import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
Expand Down Expand Up @@ -231,7 +230,8 @@ public String memUsageString(String prefix) {
public static class PlainBinaryDictionaryValuesWriter extends DictionaryValuesWriter {

/* type specific dictionary content */
protected Object2IntMap<Binary> binaryDictionaryContent = new Object2IntLinkedOpenHashMap<>();
protected Object2IntMap<Binary> binaryDictionaryContent = new Object2IntOpenHashMap<>();
protected List<Binary> dictionaryValues = new ArrayList<>();

public PlainBinaryDictionaryValuesWriter(
int maxDictionaryByteSize,
Expand All @@ -246,8 +246,10 @@ public PlainBinaryDictionaryValuesWriter(
public void writeBytes(Binary v) {
int id = binaryDictionaryContent.getInt(v);
if (id == -1) {
id = binaryDictionaryContent.size();
binaryDictionaryContent.put(v.copy(), id);
id = dictionaryValues.size();
Binary copied = v.copy();
binaryDictionaryContent.put(copied, id);
dictionaryValues.add(copied);
// length as int (4 bytes) + actual bytes
dictionaryByteSize += 4L + v.length();
}
Expand All @@ -260,12 +262,9 @@ public DictionaryPage toDictPageAndClose() {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder =
new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
Iterator<Binary> binaryIterator =
binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Binary entry = binaryIterator.next();
dictionaryEncoder.writeBytes(entry);
dictionaryEncoder.writeBytes(dictionaryValues.get(i));
}
return dictPage(dictionaryEncoder);
}
Expand All @@ -280,21 +279,16 @@ public int getDictionarySize() {
@Override
protected void clearDictionaryContent() {
binaryDictionaryContent.clear();
dictionaryValues.clear();
}

@Override
public void fallBackDictionaryEncodedData(ValuesWriter writer) {
// build reverse dictionary
Binary[] reverseDictionary = new Binary[getDictionarySize()];
for (Object2IntMap.Entry<Binary> entry : binaryDictionaryContent.object2IntEntrySet()) {
reverseDictionary[entry.getIntValue()] = entry.getKey();
}

// fall back to plain encoding
// fall back to plain encoding using the ordered dictionary values list
IntIterator iterator = encodedValues.iterator();
while (iterator.hasNext()) {
int id = iterator.next();
writer.writeBytes(reverseDictionary[id]);
writer.writeBytes(dictionaryValues.get(id));
}
}
}
Expand All @@ -317,8 +311,10 @@ public PlainFixedLenArrayDictionaryValuesWriter(
public void writeBytes(Binary value) {
int id = binaryDictionaryContent.getInt(value);
if (id == -1) {
id = binaryDictionaryContent.size();
binaryDictionaryContent.put(value.copy(), id);
id = dictionaryValues.size();
Binary copied = value.copy();
binaryDictionaryContent.put(copied, id);
dictionaryValues.add(copied);
dictionaryByteSize += length;
}
encodedValues.add(id);
Expand All @@ -330,12 +326,9 @@ public DictionaryPage toDictPageAndClose() {
// return a dictionary only if we actually used it
FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(
length, lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
Iterator<Binary> binaryIterator =
binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
Binary entry = binaryIterator.next();
dictionaryEncoder.writeBytes(entry);
dictionaryEncoder.writeBytes(dictionaryValues.get(i));
}
return dictPage(dictionaryEncoder);
}
Expand All @@ -346,7 +339,8 @@ public DictionaryPage toDictPageAndClose() {
public static class PlainLongDictionaryValuesWriter extends DictionaryValuesWriter {

/* type specific dictionary content */
private Long2IntMap longDictionaryContent = new Long2IntLinkedOpenHashMap();
private Long2IntMap longDictionaryContent = new Long2IntOpenHashMap();
private LongArrayList dictionaryValues = new LongArrayList();

public PlainLongDictionaryValuesWriter(
int maxDictionaryByteSize,
Expand All @@ -361,8 +355,9 @@ public PlainLongDictionaryValuesWriter(
public void writeLong(long v) {
int id = longDictionaryContent.get(v);
if (id == -1) {
id = longDictionaryContent.size();
id = dictionaryValues.size();
longDictionaryContent.put(v, id);
dictionaryValues.add(v);
dictionaryByteSize += 8;
}
encodedValues.add(id);
Expand All @@ -374,10 +369,9 @@ public DictionaryPage toDictPageAndClose() {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder =
new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
LongIterator longIterator = longDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
dictionaryEncoder.writeLong(longIterator.nextLong());
dictionaryEncoder.writeLong(dictionaryValues.getLong(i));
}
return dictPage(dictionaryEncoder);
}
Expand All @@ -392,32 +386,25 @@ public int getDictionarySize() {
@Override
protected void clearDictionaryContent() {
longDictionaryContent.clear();
dictionaryValues.clear();
}

@Override
public void fallBackDictionaryEncodedData(ValuesWriter writer) {
// build reverse dictionary
long[] reverseDictionary = new long[getDictionarySize()];
ObjectIterator<Long2IntMap.Entry> entryIterator =
longDictionaryContent.long2IntEntrySet().iterator();
while (entryIterator.hasNext()) {
Long2IntMap.Entry entry = entryIterator.next();
reverseDictionary[entry.getIntValue()] = entry.getLongKey();
}

// fall back to plain encoding
IntIterator iterator = encodedValues.iterator();
while (iterator.hasNext()) {
int id = iterator.next();
writer.writeLong(reverseDictionary[id]);
writer.writeLong(dictionaryValues.getLong(id));
}
}
}

public static class PlainDoubleDictionaryValuesWriter extends DictionaryValuesWriter {

/* type specific dictionary content */
private Double2IntMap doubleDictionaryContent = new Double2IntLinkedOpenHashMap();
private Double2IntMap doubleDictionaryContent = new Double2IntOpenHashMap();
private DoubleArrayList dictionaryValues = new DoubleArrayList();

public PlainDoubleDictionaryValuesWriter(
int maxDictionaryByteSize,
Expand All @@ -432,8 +419,9 @@ public PlainDoubleDictionaryValuesWriter(
public void writeDouble(double v) {
int id = doubleDictionaryContent.get(v);
if (id == -1) {
id = doubleDictionaryContent.size();
id = dictionaryValues.size();
doubleDictionaryContent.put(v, id);
dictionaryValues.add(v);
dictionaryByteSize += 8;
}
encodedValues.add(id);
Expand All @@ -445,10 +433,9 @@ public DictionaryPage toDictPageAndClose() {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder =
new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
dictionaryEncoder.writeDouble(doubleIterator.nextDouble());
dictionaryEncoder.writeDouble(dictionaryValues.getDouble(i));
}
return dictPage(dictionaryEncoder);
}
Expand All @@ -463,32 +450,25 @@ public int getDictionarySize() {
@Override
protected void clearDictionaryContent() {
doubleDictionaryContent.clear();
dictionaryValues.clear();
}

@Override
public void fallBackDictionaryEncodedData(ValuesWriter writer) {
// build reverse dictionary
double[] reverseDictionary = new double[getDictionarySize()];
ObjectIterator<Double2IntMap.Entry> entryIterator =
doubleDictionaryContent.double2IntEntrySet().iterator();
while (entryIterator.hasNext()) {
Double2IntMap.Entry entry = entryIterator.next();
reverseDictionary[entry.getIntValue()] = entry.getDoubleKey();
}

// fall back to plain encoding
IntIterator iterator = encodedValues.iterator();
while (iterator.hasNext()) {
int id = iterator.next();
writer.writeDouble(reverseDictionary[id]);
writer.writeDouble(dictionaryValues.getDouble(id));
}
}
}

public static class PlainIntegerDictionaryValuesWriter extends DictionaryValuesWriter {

/* type specific dictionary content */
private Int2IntMap intDictionaryContent = new Int2IntLinkedOpenHashMap();
private Int2IntMap intDictionaryContent = new Int2IntOpenHashMap();
private IntArrayList dictionaryValues = new IntArrayList();

public PlainIntegerDictionaryValuesWriter(
int maxDictionaryByteSize,
Expand All @@ -503,8 +483,9 @@ public PlainIntegerDictionaryValuesWriter(
public void writeInteger(int v) {
int id = intDictionaryContent.get(v);
if (id == -1) {
id = intDictionaryContent.size();
id = dictionaryValues.size();
intDictionaryContent.put(v, id);
dictionaryValues.add(v);
dictionaryByteSize += 4;
}
encodedValues.add(id);
Expand All @@ -516,11 +497,9 @@ public DictionaryPage toDictPageAndClose() {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder =
new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
it.unimi.dsi.fastutil.ints.IntIterator intIterator =
intDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
dictionaryEncoder.writeInteger(intIterator.nextInt());
dictionaryEncoder.writeInteger(dictionaryValues.getInt(i));
}
return dictPage(dictionaryEncoder);
}
Expand All @@ -535,32 +514,25 @@ public int getDictionarySize() {
@Override
protected void clearDictionaryContent() {
intDictionaryContent.clear();
dictionaryValues.clear();
}

@Override
public void fallBackDictionaryEncodedData(ValuesWriter writer) {
// build reverse dictionary
int[] reverseDictionary = new int[getDictionarySize()];
ObjectIterator<Int2IntMap.Entry> entryIterator =
intDictionaryContent.int2IntEntrySet().iterator();
while (entryIterator.hasNext()) {
Int2IntMap.Entry entry = entryIterator.next();
reverseDictionary[entry.getIntValue()] = entry.getIntKey();
}

// fall back to plain encoding
IntIterator iterator = encodedValues.iterator();
while (iterator.hasNext()) {
int id = iterator.next();
writer.writeInteger(reverseDictionary[id]);
writer.writeInteger(dictionaryValues.getInt(id));
}
}
}

public static class PlainFloatDictionaryValuesWriter extends DictionaryValuesWriter {

/* type specific dictionary content */
private Float2IntMap floatDictionaryContent = new Float2IntLinkedOpenHashMap();
private Float2IntMap floatDictionaryContent = new Float2IntOpenHashMap();
private FloatArrayList dictionaryValues = new FloatArrayList();

public PlainFloatDictionaryValuesWriter(
int maxDictionaryByteSize,
Expand All @@ -575,8 +547,9 @@ public PlainFloatDictionaryValuesWriter(
public void writeFloat(float v) {
int id = floatDictionaryContent.get(v);
if (id == -1) {
id = floatDictionaryContent.size();
id = dictionaryValues.size();
floatDictionaryContent.put(v, id);
dictionaryValues.add(v);
dictionaryByteSize += 4;
}
encodedValues.add(id);
Expand All @@ -588,10 +561,9 @@ public DictionaryPage toDictPageAndClose() {
// return a dictionary only if we actually used it
PlainValuesWriter dictionaryEncoder =
new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
FloatIterator floatIterator = floatDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
dictionaryEncoder.writeFloat(floatIterator.nextFloat());
dictionaryEncoder.writeFloat(dictionaryValues.getFloat(i));
}
return dictPage(dictionaryEncoder);
}
Expand All @@ -606,24 +578,16 @@ public int getDictionarySize() {
@Override
protected void clearDictionaryContent() {
floatDictionaryContent.clear();
dictionaryValues.clear();
}

@Override
public void fallBackDictionaryEncodedData(ValuesWriter writer) {
// build reverse dictionary
float[] reverseDictionary = new float[getDictionarySize()];
ObjectIterator<Float2IntMap.Entry> entryIterator =
floatDictionaryContent.float2IntEntrySet().iterator();
while (entryIterator.hasNext()) {
Float2IntMap.Entry entry = entryIterator.next();
reverseDictionary[entry.getIntValue()] = entry.getFloatKey();
}

// fall back to plain encoding
IntIterator iterator = encodedValues.iterator();
while (iterator.hasNext()) {
int id = iterator.next();
writer.writeFloat(reverseDictionary[id]);
writer.writeFloat(dictionaryValues.getFloat(id));
}
}
}
Expand Down