From 16fb3aa4a0e1240ace5d01302f51a4fcd034fe51 Mon Sep 17 00:00:00 2001 From: Anllcik <654610542@qq.com> Date: Thu, 21 Dec 2023 16:10:10 +0800 Subject: [PATCH] add cache --- .../boostkit/hive/cache/BytesColumnCache.java | 11 ++ .../boostkit/hive/cache/ColumnCache.java | 18 +++ .../hive/cache/DecimalColumnCache.java | 11 ++ .../hive/cache/DoubleColumnCache.java | 10 ++ .../boostkit/hive/cache/LongColumnCache.java | 11 ++ .../boostkit/hive/cache/VarcharCache.java | 6 + .../huawei/boostkit/hive/cache/VecBuffer.java | 24 +++ .../boostkit/hive/cache/VecBufferCache.java | 120 ++++++++++++++ .../boostkit/hive/cache/VectorCache.java | 16 ++ .../hive/converter/DoubleVecConverter.java | 148 +++++++++++++++++ .../hive/converter/IntVecConverter.java | 103 ++++++++++++ .../hive/converter/LongVecConverter.java | 149 ++++++++++++++++++ .../boostkit/hive/converter/VecConverter.java | 139 ++++++++++++++++ 13 files changed, 766 insertions(+) create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/BytesColumnCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/ColumnCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DecimalColumnCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DoubleColumnCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/LongColumnCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VarcharCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBuffer.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBufferCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VectorCache.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/DoubleVecConverter.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/IntVecConverter.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/LongVecConverter.java create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/VecConverter.java diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/BytesColumnCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/BytesColumnCache.java new file mode 100644 index 000000000..4f4abdeea --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/BytesColumnCache.java @@ -0,0 +1,11 @@ +package com.huawei.boostkit.hive.cache; + +import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; + +public class BytesColumnCache extends ColumnCache { + public final VarcharCache[] dataCache; + + public BytesColumnCache() { + dataCache = new VarcharCache[BATCH]; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/ColumnCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/ColumnCache.java new file mode 100644 index 000000000..d7b34a403 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/ColumnCache.java @@ -0,0 +1,18 @@ +package com.huawei.boostkit.hive.cache; + +import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; + +public class ColumnCache { + public boolean[] isNull; + + public boolean noNulls = true; + + public ColumnCache() { + isNull = new boolean[BATCH]; + } + + public void reset() { + noNulls = true; + isNull = new boolean[BATCH]; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DecimalColumnCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DecimalColumnCache.java new file mode 100644 index 000000000..f8789dc83 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DecimalColumnCache.java @@ -0,0 +1,11 @@ +package com.huawei.boostkit.hive.cache; + +import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; + +public class DecimalColumnCache extends ColumnCache{ + public final byte[][] dataCache; + + public DecimalColumnCache(){ + dataCache=new byte[BATCH][16]; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DoubleColumnCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DoubleColumnCache.java new file mode 100644 index 000000000..afe666c6c --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/DoubleColumnCache.java @@ -0,0 +1,10 @@ +package com.huawei.boostkit.hive.cache; + +import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; + +public class DoubleColumnCache extends ColumnCache{ + public final double[] dataCache; + public DoubleColumnCache(){ + dataCache=new double[BATCH]; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/LongColumnCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/LongColumnCache.java new file mode 100644 index 000000000..3673f2eb7 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/LongColumnCache.java @@ -0,0 +1,11 @@ +package com.huawei.boostkit.hive.cache; + +import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; + +public class LongColumnCache extends ColumnCache{ + public final long[] dataCache; + + public LongColumnCache(){ + dataCache=new long[BATCH]; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VarcharCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VarcharCache.java new file mode 100644 index 000000000..f3d73dfe1 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VarcharCache.java @@ -0,0 +1,6 @@ +package com.huawei.boostkit.hive.cache; + +public class VarcharCache { + public int offset; + public byte[] values; +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBuffer.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBuffer.java new file mode 100644 index 000000000..f43b63fd7 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBuffer.java @@ -0,0 +1,24 @@ +package com.huawei.boostkit.hive.cache; + +import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; + +public class VecBuffer { + public byte[] byteBuffer; + public byte[] isNull; + public int[] offset; + public boolean noNulls; + public boolean isChar; + + public VecBuffer(int colLength, boolean isChar) { + this.isChar = isChar; + offset = new int[BATCH + 1]; + + byteBuffer = new byte[colLength * BATCH]; + isNull = new byte[BATCH]; + if (!isChar) { + for (int i = 0; i < offset.length; i++) { + offset[i] = colLength * i; + } + } + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBufferCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBufferCache.java new file mode 100644 index 000000000..698ef8687 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VecBufferCache.java @@ -0,0 +1,120 @@ +package com.huawei.boostkit.hive.cache; + +import static com.huawei.boostkit.hive.cache.VectorCache.BATCH; +import static com.huawei.boostkit.hive.shuffle.OmniVecBatchSerDe.TYPE_LEN; +import static com.huawei.boostkit.hive.shuffle.omniVecBatchSerDe.getEstimateLen; +import static com.huawei.boostkit.hive.shuffle.VecSerdeBody; + +import nova.hetu.omniruntime.vector.BooleanVec; +import nova.hetu.omniruntime.vector.Decimal128Vec; +import nova.hetu.omniruntime.vector.DoubleVec; +import nova.hetu.omniruntime.vector.IntVec; +import nova.hetu.omniruntime.vector.LongVec; +import nova.hetu.omniruntime.vector.ShortVec; +import nova.hetu.omniruntime.vector.VarcharVec; +import nova.hetu.omniruntime.vector.Vec; + +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.util.List; + +public class VecBufferCache { + public VecBuffer[] cache; + public PrimitiveObjectInspector.PrimitiveCategory[] categories; + + private int[] columnTypeLen; + + public VecBufferCache(int colNum, List typeInfos) { + this.categories = typeInfos.stream().map(typeInfo -> ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()).toArray(PrimitiveObjectInspector.PrimitiveCategory[]::new); + columnTypeLen = new int[this.categories.length]; + cache = new VecBuffer[colNum]; + for (int i = 0; i < this.categories.length; i++) { + columnTypeLen[i] = TYPE_LEN.getOrDefault(this.categories[i], 0); + if (columnTypeLen[i] == 0) { + cache[i] = new VecBuffer(getEstimateLen((PrimitiveTypeInfo) typeInfos.get(i)), true); + } else { + cache[i] = new VecBuffer(columnTypeLen[i], false); + } + } + } + + public void addVecSerdBody(VecSerdBody[] vecSerdBodies, int rowCount, int offset) { + for (int i = 0; i < vecSerdBodies.length; i++) { + int cacheIndex = i + offset; + System.arraycopy(vecSerdBodies[i].value, 0, cache[cacheIndex].byteBuffer, cache[cacheIndex].offset[rowCount], vecSerdBodies[i].length); + if (cache[cacheIndex].isChar) { + cache[cacheIndex].offset[rowCount + 1] = cache[cacheIndex].offset[rowCount] + vecSerdBodies[i].length; + } + if (cache[cacheIndex].noNulls && vecSerdBodies[i].isNull == 1) { + cache[cacheIndex].noNulls = false; + } + cache[cacheIndex].isNull[rowCount] = vecSerdBodies[i].isNull; + } + } + + public Vec[] getValueVecBatchCache(int rowCount) { + if (rowCount == 0) { + return null; + } + int vectorCount = cache.length; + Vec[] vecs = new Vec[vectorCount]; + for (int i = 0; i < vectorCount; i++) { + vecs[i] = buildVec(i, rowCount); + } + reset(); + return vecs; + } + + public void reset() { + for (int i = 0; i < this.categories.length; i++) { + cache[i].noNulls = true; + } + } + + private Vec buildVec(int index, int rowCount) { + Vec vec; + switch (categories[index]) { + case INT: + case DATE: + vec = new IntVec(rowCount); + break; + case LONG: + case TIMESTAMP: + vec = new LongVec(rowCount); + break; + case SHORT: + vec = new ShortVec(rowCount); + break; + case BOOLEAN: + vec = new BooleanVec(rowCount); + break; + case DOUBLE: + vec = new DoubleVec(rowCount); + break; + case VARCHAR: + case CHAR: + case STRING: + vec = new VarcharVec(cache[index].offset[rowCount], rowCount); + if (rowCount = BATCH) { + ((VarcharVec) vec).setOffsetBuf(cache[index].offset, rowCount); + } else { + int[] offsets = new int[rowCount + 1]; + System.arraycopy(cache[index].offset, 0, offsets, 0, rowCount + 1); + ((VarcharVec) vec).setOffsetBuf(offsets, rowCount); + } + break; + case DECIMAL: + vec = new Decimal128Vec(rowCount); + break; + default: + throw new IllegalStateException("Unexpected value: " + categories[index]); + } + vec.setValuesBuf(cache[index].byteBuffer, cache[index].offset[rowCount]); + if (!cache[index].noNulls) { + vec.setNullsBuf(cache[index].isNull, rowCount); + } + return vec; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VectorCache.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VectorCache.java new file mode 100644 index 000000000..422674b5d --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/cache/VectorCache.java @@ -0,0 +1,16 @@ +package com.huawei.boostkit.hive.cache; + +import static org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch.DEFAULT_SIZE; + +public class VectorCache { + public static final int BATCH = 4 * DEFAULT_SIZE; + + public Object[][] dataCache; + + public VectorCache() { + } + + public VectorCache(int colNum) { + dataCache = new Object[colNum][BATCH]; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/DoubleVecConverter.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/DoubleVecConverter.java new file mode 100644 index 000000000..8e5288c4d --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/DoubleVecConverter.java @@ -0,0 +1,148 @@ +package com.huawei.boostkit.hive.converter; + +import com.huawei.boostkit.hive.cache.ColumnCache; +import com.huawei.boostkit.hive.cache.DoubleColumnCache; + +import nova.hetu.omniruntime.vector.DictionaryVec; +import nova.hetu.omniruntime.vector.DoubleVec; +import nova.hetu.omniruntime.vector.Vec; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.lazy.LazyDouble; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.DoubleWritable; + +public class DoubleVecConverter implements VecConverter { + public Object fromOmniVec(Vec vec, int index) { + if (vec.isNull(index)) { + return null; + } + if (vec instanceof DictionaryVec) { + DictionaryVec dictionaryVec = (DictionaryVec) vec; + return dictionaryVec.getDouble(index); + } + DoubleVec DoubleVec = (DoubleVec) vec; + return DoubleVec.get(index); + } + + @Override + public Object calculateValue(Object col) { + if (col == null) { + return null; + } + double doubleValue; + if (col instanceof LazyDouble) { + LazyDouble lazyDouble = (LazyDouble) col; + doubleValue = lazyDouble.getWritableObject().get(); + } else if (col instanceof DoubleWritable) { + doubleValue = ((DoubleWritable) col).get(); + } else { + doubleValue = (double) col; + } + return doubleValue; + } + + @Override + public Vec toOmniVec(Object[] col, int columnSize) { + DoubleVec doubleVec = new DoubleVec(columnSize); + double[] doubleValues = new double[columnSize]; + for (int i = 0; i < columnSize; i++) { + if (col[i] == null) { + doubleVec.setNull(i); + continue; + } + doubleValues[i] = (double) col[i]; + } + doubleVec.put(doubleValues, 0, 0, columnSize); + return doubleVec; + } + + @Override + public Vec toOmniVec(ColumnCache columnCache, int columnSize) { + DoubleVec doubleVec = new DoubleVec(columnSize); + DoubleColumnCache doubleColumnCache = (DoubleColumnCache) columnCache; + if (doubleColumnCache.noNulls) { + for (int i = 0; i < columnSize; i++) { + doubleVec.set(i, doubleColumnCache.dataCache[i]); + } + } else { + for (int i = 0; i < columnSize; i++) { + if (doubleColumnCache.isNull[i]) { + doubleVec.setNull(i); + } else { + doubleVec.set(i, doubleColumnCache.dataCache[i]); + } + } + } + return doubleVec; + } + + @Override + public void setValueFromColumnVector(VectorizedRowBatch vectorizedRowBatch, int vectorColIndex, + ColumnCache columnCache, int colIndex, int rowCount, + PrimitiveTypeInfo primitiveTypeInfo) { + DoubleColumnVector columnVector = (DoubleColumnVector) vectorizedRowBatch.cols[vectorColIndex]; + DoubleColumnCache doubleColumnCache = (DoubleColumnCache) columnCache; + double[] vector = columnVector.vector; + if (!columnVector.noNulls) { + doubleColumnCache.noNulls = false; + } + if (columnVector.isRepeating) { + if (columnVector.isNull[0]) { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + doubleColumnCache.isNull[rowCount + i] = true; + } + } else { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + doubleColumnCache.dataCache[rowCount + i] = vector[0]; + } + } + } else if (vectorizedRowBatch.selectedInUse) { + if (columnVector.noNulls) { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + doubleColumnCache.dataCache[rowCount + i] = vector[vectorizedRowBatch.selected[i]]; + } + } else { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + if (columnVector.isNull[vectorizedRowBatch.selected[i]]) { + doubleColumnCache.isNull[rowCount + i] = true; + } else { + doubleColumnCache.dataCache[rowCount + i] = vector[vectorizedRowBatch.selected[i]]; + } + } + } + } else { + if (columnVector.noNulls) { + System.arraycopy(vector, 0, doubleColumnCache.dataCache, rowCount, vectorizedRowBatch.size); + } else { + System.arraycopy(vector, 0, doubleColumnCache.dataCache, rowCount, vectorizedRowBatch.size); + System.arraycopy(columnVector.isNull, 0, doubleColumnCache.isNull, rowCount, vectorizedRowBatch.size); + } + } + } + + @Override + public ColumnVector getColumnVectorFromOmniVec(Vec vec, int start, int end) { + DoubleColumnVector doubleColumnVector = new DoubleColumnVector(); + for (int i = start; i < end; i++) { + if (vec.isNull(i)) { + doubleColumnVector.vector[i - start] = 0.00; + doubleColumnVector.isNull[i - start] = true; + doubleColumnVector.noNulls = false; + continue; + } + double value; + if (vec instanceof DictionaryVec) { + DictionaryVec dictionaryVec = (DictionaryVec) vec; + value = dictionaryVec.getDouble(i); + } else { + DoubleVec doubleVec = (DoubleVec) vec; + value = doubleVec.get(i); + } + doubleColumnVector.vector[i - start] = value; + } + return doubleColumnVector; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/IntVecConverter.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/IntVecConverter.java new file mode 100644 index 000000000..c4813579d --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/IntVecConverter.java @@ -0,0 +1,103 @@ +package com.huawei.boostkit.hive.converter; + +import com.huawei.boostkit.hive.cache.ColumnCache; +import com.huawei.boostkit.hive.cache.LongColumnCache; + +import nova.hetu.omniruntime.vector.DictionaryVec; +import nova.hetu.omniruntime.vector.IntVec; +import nova.hetu.omniruntime.vector.Vec; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.serde2.lazy.LazyInteger; +import org.apache.hadoop.io.IntWritable; + + +public class IntVecConverter implements VecConverter { + public Object fromOmniVec(Vec vec, int index) { + if (vec.isNull(index)) { + return null; + } + if (vec instanceof DictionaryVec) { + DictionaryVec dictionaryVec = (DictionaryVec) vec; + return dictionaryVec.getInt(index); + } + IntVec intVec = (IntVec) vec; + return intVec.get(index); + } + + @Override + public Object calculateValue(Object col) { + if (col == null) { + return null; + } + int intValue; + if (col instanceof LazyInteger) { + LazyInteger lazyInteger = (LazyInteger) col; + intValue = lazyInteger.getWritableObject().get(); + } else if (col instanceof IntWritable) { + intValue = ((IntWritable) col).get(); + } else { + intValue = (int) col; + } + return intValue; + } + + @Override + public Vec toOmniVec(Object[] col, int columnSize) { + IntVec intVec = new IntVec(columnSize); + int[] intValues = new int[columnSize]; + for (int i = 0; i < columnSize; i++) { + if (col[i] == null) { + intVec.setNull(i); + continue; + } + intValues[i] = (int) col[i]; + } + intVec.put(intValues, 0, 0, columnSize); + return intVec; + } + + @Override + public Vec toOmniVec(ColumnCache columnCache, int columnSize) { + IntVec intVec = new IntVec(columnSize); + LongColumnCache longColumnCache = (LongColumnCache) columnCache; + if (longColumnCache.noNulls) { + for (int i = 0; i < columnSize; i++) { + intVec.set(i, (int) longColumnCache.dataCache[i]); + } + } else { + for (int i = 0; i < columnSize; i++) { + if (longColumnCache.isNull[i]) { + intVec.setNull(i); + } else { + intVec.set(i, (int) longColumnCache.dataCache[i]); + } + } + } + return intVec; + } + + @Override + public ColumnVector getColumnVectorFromOmniVec(Vec vec, int start, int end) { + LongColumnVector longColumnVector = new LongColumnVector(); + for (int i = start; i < end; i++) { + if (vec.isNull(i)) { + longColumnVector.vector[i - start] = 1L; + longColumnVector.isNull[i - start] = true; + longColumnVector.noNulls = false; + continue; + } + long value; + if (vec instanceof DictionaryVec) { + DictionaryVec dictionaryVec = (DictionaryVec) vec; + value = dictionaryVec.getInt(i); + } else { + IntVec intVec = (IntVec) vec; + value = intVec.get(i); + } + longColumnVector.vector[i - start] = value; + } + return longColumnVector; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/LongVecConverter.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/LongVecConverter.java new file mode 100644 index 000000000..b67ca44b2 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/LongVecConverter.java @@ -0,0 +1,149 @@ +package com.huawei.boostkit.hive.converter; + +import com.huawei.boostkit.hive.cache.ColumnCache; +import com.huawei.boostkit.hive.cache.LongColumnCache; + +import nova.hetu.omniruntime.vector.DictionaryVec; +import nova.hetu.omniruntime.vector.LongVec; +import nova.hetu.omniruntime.vector.Vec; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.lazy.LazyLong; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.LongWritable; + + +public class LongVecConverter implements VecConverter { + public Object fromOmniVec(Vec vec, int index) { + if (vec.isNull(index)) { + return null; + } + if (vec instanceof DictionaryVec) { + DictionaryVec dictionaryVec = (DictionaryVec) vec; + return dictionaryVec.getLong(index); + } + LongVec longVec = (LongVec) vec; + return longVec.get(index); + } + + @Override + public Object calculateValue(Object col) { + if (col == null) { + return null; + } + long longValue; + if (col instanceof LazyLong) { + LazyLong lazyLong = (LazyLong) col; + longValue = lazyLong.getWritableObject().get(); + } else if (col instanceof LongWritable) { + longValue = ((LongWritable) col).get(); + } else { + longValue = (long) col; + } + return longValue; + } + + @Override + public Vec toOmniVec(Object[] col, int columnSize) { + LongVec longVec = new LongVec(columnSize); + long[] longValues = new long[columnSize]; + for (int i = 0; i < columnSize; i++) { + if (col[i] == null) { + longVec.setNull(i); + continue; + } + longValues[i] = (long) col[i]; + } + longVec.put(longValues, 0, 0, columnSize); + return longVec; + } + + @Override + public Vec toOmniVec(ColumnCache columnCache, int columnSize) { + LongVec longVec = new LongVec(columnSize); + LongColumnCache longColumnCache = (LongColumnCache) columnCache; + if (longColumnCache.noNulls) { + for (int i = 0; i < columnSize; i++) { + longVec.set(i, longColumnCache.dataCache[i]); + } + } else { + for (int i = 0; i < columnSize; i++) { + if (longColumnCache.isNull[i]) { + longVec.setNull(i); + } else { + longVec.set(i, longColumnCache.dataCache[i]); + } + } + } + return longVec; + } + + @Override + public void setValueFromColumnVector(VectorizedRowBatch vectorizedRowBatch, int vectorColIndex, + ColumnCache columnCache, int colIndex, int rowCount, + PrimitiveTypeInfo primitiveTypeInfo) { + LongColumnVector columnVector = (LongColumnVector) vectorizedRowBatch.cols[vectorColIndex]; + LongColumnCache longColumnCache = (LongColumnCache) columnCache; + long[] vector = columnVector.vector; + if (!columnVector.noNulls) { + longColumnCache.noNulls = false; + } + if (columnVector.isRepeating) { + if (columnVector.isNull[0]) { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + longColumnCache.isNull[rowCount + i] = true; + } + } else { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + longColumnCache.dataCache[rowCount + i] = vector[0]; + } + } + } else if (vectorizedRowBatch.selectedInUse) { + if (columnVector.noNulls) { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + longColumnCache.dataCache[rowCount + i] = vector[vectorizedRowBatch.selected[i]]; + } + } else { + for (int i = 0; i < vectorizedRowBatch.size; i++) { + if (columnVector.isNull[vectorizedRowBatch.selected[i]]) { + longColumnCache.isNull[rowCount + i] = true; + } else { + longColumnCache.dataCache[rowCount + i] = vector[vectorizedRowBatch.selected[i]]; + } + } + } + } else { + if (columnVector.noNulls) { + System.arraycopy(vector, 0, longColumnCache.dataCache, rowCount, vectorizedRowBatch.size); + } else { + System.arraycopy(vector, 0, longColumnCache.dataCache, rowCount, vectorizedRowBatch.size); + System.arraycopy(columnVector.isNull, 0, longColumnCache.isNull, rowCount, vectorizedRowBatch.size); + } + } + } + + @Override + public ColumnVector getColumnVectorFromOmniVec(Vec vec, int start, int end) { + LongColumnVector longColumnVector = new LongColumnVector(); + for (int i = start; i < end; i++) { + if (vec.isNull(i)) { + longColumnVector.vector[i - start] = 1L; + longColumnVector.isNull[i - start] = true; + longColumnVector.noNulls = false; + continue; + } + long value; + if (vec instanceof DictionaryVec) { + DictionaryVec dictionaryVec = (DictionaryVec) vec; + value = dictionaryVec.getLong(i); + } else { + LongVec longVec = (LongVec) vec; + value = longVec.get(i); + } + longColumnVector.vector[i - start] = value; + } + return longColumnVector; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/VecConverter.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/VecConverter.java new file mode 100644 index 000000000..02ebf9915 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/converter/VecConverter.java @@ -0,0 +1,139 @@ +package com.huawei.boostkit.hive.converter; + +import com.huawei.boostkit.hive.cache.ColumnCache; +import com.huawei.boostkit.hive.cache.VectorCache; + +import nova.hetu.omniruntime.vector.Vec; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; + +import java.util.HashMap; +import java.util.Map; + +public interface VecConverter { + Map CONVERTER_MAP = + new HashMap() { + { + put(PrimitiveObjectInspector.PrimitiveCategory.BYTE, new ByteVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.SHORT, new ShortVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.INT, new IntVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.LONG, new LongVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN, new BooleanVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.FLOAT, new DoubleVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.DOUBLE, new DoubleVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.STRING, new StringVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.CHAR, new CharVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.VARCHAR, new VarcharVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP, new TimestampVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.DATE, new DateVecConverter()); + put(PrimitiveObjectInspector.PrimitiveCategory.DECIMAL, new DecimalVecConverter()); + } + }; + + /** + * convert vec to java type data + * + * @param vec omni vec + * @param index the row index + * @return row data + */ + default Object fromOmniVec(Vec vec, int index) { + throw new RuntimeException(String.format("%s doesn't support fromOmniVec(Vec vec)", + this.getClass().getSimpleName())); + } + + /** + * convert vec to java type data + * + * @param vec omni vec + * @param index the row index + * @param primitiveObjectInspector primitiveObjectInspector + * @return row data + */ + default Object fromOmniVec(Vec vec, int index, PrimitiveObjectInspector primitiveObjectInspector) { + return fromOmniVec(vec, index); + } + + /** + * convert data from hive type to java type + * + * @param col hive data + * @return converted data + */ + default Object calculateValue(Object col) { + throw new RuntimeException(String.format("%s doesn't support calculate(Object col)", + this.getClass().getSimpleName())); + } + + /** + * convert data from hive type to java type + * + * @param col hive data + * @param primitiveTypeInfo primitiveTypeInfo + * @return converted data + */ + default Object calculateValue(Object col, PrimitiveTypeInfo primitiveTypeInfo) { + return calculateValue(col); + } + + /** + * convert data to vec + * + * @param col data + * @param columnSize columnSize + * @return omni vec + */ + default Vec toOmniVec(Object[] col, int columnSize) { + throw new RuntimeException(String.format("%s doesn't support toOmniVec(Object[] col, int columnSize)", + this.getClass().getSimpleName())); + } + + default Vec toOmniVec(Object[] col, int columnSize, PrimitiveTypeInfo primitiveTypeInfo) { + return toOmniVec(col, columnSize); + } + + default Vec toOmniVec(ColumnCache columnCache, int columnSize, PrimitiveTypeInfo primitiveTypeInfo) { + return toOmniVec(columnCache, columnSize); + } + + default Vec toOmniVec(ColumnCache columnCache, int columnSize) { + throw new RuntimeException(String.format("%s doesn't support toOmniVec(ColumnCache columnCache,int columnSize)", + this.getClass().getSimpleName())); + } + + default void setValueFromColumnVector(VectorizedRowBatch vectorizedRowBatch, int vectorColIndex, + VectorCache vectorCache, int colIndex, int rowCount) { + throw new RuntimeException(String.format("%s doesn't support setValueFromColumnVector(VectorizedRowBatch " + + "vectorizedRowBatch, int vectorColIndex, VectorCache vectorCache,int colIndex,int rowCount)", + this.getClass().getSimpleName())); + } + + default void setValueFromColumnVector(VectorizedRowBatch vectorizedRowBatch, int vectorColIndex, + VectorCache vectorCache, int colIndex, int rowCount, + PrimitiveTypeInfo primitiveTypeInfo) { + setValueFromColumnVector(vectorizedRowBatch, vectorColIndex, vectorCache, colIndex, rowCount); + } + + default void setValueFromColumnVector(VectorizedRowBatch vectorizedRowBatch, int vectorColIndex, + ColumnCache columnCache, int colIndex, int rowCount) { + } + + default void setValueFromColumnVector(VectorizedRowBatch vectorizedRowBatch, int vectorColIndex, + ColumnCache columnCache, int colIndex, int rowCount, + PrimitiveTypeInfo primitiveTypeInfo) { + setValueFromColumnVector(vectorizedRowBatch, vectorColIndex, columnCache, colIndex, rowCount); + } + + default ColumnVector getColumnVectorFromOmniVec(Vec vec, int start, int end) { + throw new RuntimeException(String.format("%s doesn't support getColumnVectorFromOmniVec(Vec vec, int start, " + + "int end)", this.getClass().getSimpleName())); + } + + default ColumnVector getColumnVectorFromOmniVec(Vec vec, int start, int end, + PrimitiveObjectInspector primitiveObjectInspector) { + return getColumnVectorFromOmniVec(vec, start, end); + } +} -- Gitee