diff --git a/omnidata/omnidata-spark-connector/connector/pom.xml b/omnidata/omnidata-spark-connector/connector/pom.xml
index dda5fe43f387e6d1fa7346321109b821f8cd0322..0cb895b9888435d682020e2a47872788ffc1083f 100644
--- a/omnidata/omnidata-spark-connector/connector/pom.xml
+++ b/omnidata/omnidata-spark-connector/connector/pom.xml
@@ -24,6 +24,7 @@
2.12.0
1.6.1
1.35.0
+ 2.12
@@ -73,9 +74,99 @@
curator-recipes
${dep.curator.version}
+
+
+
+ io.airlift
+ log
+ 206
+ test
+
+
+ io.airlift
+ stats
+ 206
+ test
+
+
+ org.apache.lucene
+ lucene-analyzers-common
+ 7.2.1
+ test
+
+
+ it.unimi.dsi
+ fastutil
+ 6.5.9
+ test
+
+
+ io.airlift
+ bytecode
+ 1.2
+ test
+
+
+ io.hetu.core
+ presto-parser
+ ${dep.hetu.version}
+ test
+
+
+ io.airlift
+ json
+ 206
+ test
+
+
+ org.testng
+ testng
+ 6.10
+ test
+
+
+ org.mockito
+ mockito-core
+ 1.9.5
+ test
+
+
+ objenesis
+ org.objenesis
+
+
+
+
+ org.scalatest
+ scalatest_${scala.binary.version}
+ 3.2.3
+ test
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ test-jar
+ test
+ ${spark.version}
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ test-jar
+ ${spark.version}
+ test
+
+
+ org.apache.spark
+ spark-catalyst_${scala.binary.version}
+ test-jar
+ test
+ ${spark.version}
+
src/main/scala
+ src/test/java
org.codehaus.mojo
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java
index 7802b7b5ab0fbd0880b123f6ae69ac6cf1acf546..f0c8c1fc282a8dc471178bf5f9db98f7172f3761 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java
@@ -18,9 +18,9 @@
package com.huawei.boostkit.omnidata.spark;
+import org.apache.spark.sql.execution.vectorized.OmniColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
@@ -44,6 +44,12 @@ public class PageDeRunLength {
}
}
+ // Passed to getColumnVector by every decompress method to pick the output vector class.
+ private final boolean isOperatorCombineEnabled;
+
+ /**
+ * Creates a run-length decompressor.
+ *
+ * @param isOperatorCombineEnabled when true the decompressed columns are built as OmniColumnVector,
+ * otherwise as OnHeapColumnVector
+ */
+ public PageDeRunLength(boolean isOperatorCombineEnabled) {
+ this.isOperatorCombineEnabled = isOperatorCombineEnabled;
+ }
+
/**
* decompress byteColumnVector
*
@@ -54,7 +60,8 @@ public class PageDeRunLength {
public Optional decompressByteArray(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
byte value = writableColumnVector.getByte(0);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ByteType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -78,7 +85,8 @@ public class PageDeRunLength {
public Optional decompressBooleanArray(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
boolean value = writableColumnVector.getBoolean(0);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.BooleanType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -102,7 +110,8 @@ public class PageDeRunLength {
public Optional decompressIntArray(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
int value = writableColumnVector.getInt(0);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -126,7 +135,8 @@ public class PageDeRunLength {
public Optional decompressShortArray(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
short value = writableColumnVector.getShort(0);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -150,7 +160,8 @@ public class PageDeRunLength {
public Optional decompressLongArray(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
long value = writableColumnVector.getLong(0);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.LongType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -174,7 +185,8 @@ public class PageDeRunLength {
public Optional decompressFloatArray(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
float value = writableColumnVector.getFloat(0);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.FloatType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -198,7 +210,8 @@ public class PageDeRunLength {
public Optional decompressDoubleArray(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
double value = writableColumnVector.getDouble(0);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.DoubleType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -221,7 +234,8 @@ public class PageDeRunLength {
*/
public Optional decompressVariableWidth(int positionCount,
WritableColumnVector writableColumnVector) throws Exception {
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.StringType);
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
if (writableColumnVector.isNullAt(0)) {
columnVector.putNulls(0, positionCount);
} else {
@@ -247,7 +261,8 @@ public class PageDeRunLength {
int precision = ((DecimalType) writableColumnVector.dataType()).precision();
int scale = ((DecimalType) writableColumnVector.dataType()).scale();
Decimal value = writableColumnVector.getDecimal(0, precision, scale);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, writableColumnVector.dataType());
+ WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount,
+ writableColumnVector);
for (int rowId = 0; rowId < positionCount; rowId++) {
if (writableColumnVector.isNullAt(rowId)) {
columnVector.putNull(rowId);
@@ -262,4 +277,15 @@ public class PageDeRunLength {
}
return Optional.of(columnVector);
}
-}
+
+ /**
+  * Builds the output vector for a decompressed column, reusing the data type of the source vector.
+  *
+  * @param isOperatorCombineEnabled true to build an OmniColumnVector, false to build an OnHeapColumnVector
+  * @param positionCount capacity handed to the new vector's constructor
+  * @param writableColumnVector source vector whose dataType() is copied to the new vector
+  * @return the newly constructed vector
+  */
+ private WritableColumnVector getColumnVector(boolean isOperatorCombineEnabled, int positionCount,
+         WritableColumnVector writableColumnVector) {
+     if (!isOperatorCombineEnabled) {
+         return new OnHeapColumnVector(positionCount, writableColumnVector.dataType());
+     }
+     return new OmniColumnVector(positionCount, writableColumnVector.dataType(), true);
+ }
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java
index 5d46338bdb5d2791491d53623dddc6725d588c55..3eb827103eecfceed7c601d439fc480d290208f3 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java
@@ -30,13 +30,16 @@ import com.huawei.boostkit.omnidata.exception.OmniDataException;
import io.airlift.slice.SliceInput;
import io.airlift.slice.Slices;
import io.prestosql.spi.type.DateType;
-
import io.prestosql.spi.type.Decimals;
+
+import org.apache.spark.sql.execution.vectorized.OmniColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
@@ -53,6 +56,11 @@ import java.util.TimeZone;
* @since 2021-03-30
*/
public class PageDecoding extends AbstractDecoding> {
+ private static final Logger LOG = LoggerFactory.getLogger(PageDecoding.class);
+
+ /**
+ * Reflected field that is set to the position count on every decoded column vector.
+ */
private static Field filedElementsAppended;
static {
@@ -64,73 +72,51 @@ public class PageDecoding extends AbstractDecoding decodeArray(Optional type, SliceInput sliceInput) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("not support array decode");
}
@Override
public Optional decodeByteArray(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ByteType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putByte(position, sliceInput.readByte());
- } else {
- columnVector.putNull(position);
- }
- }
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "byte");
}
@Override
public Optional decodeBooleanArray(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.BooleanType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- boolean value = sliceInput.readByte() != 0;
- columnVector.putBoolean(position, value);
- } else {
- columnVector.putNull(position);
- }
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.BooleanType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.BooleanType);
}
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "boolean");
}
@Override
public Optional decodeIntArray(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putInt(position, sliceInput.readInt());
- } else {
- columnVector.putNull(position);
- }
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.IntegerType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType);
}
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "int");
}
@Override
@@ -141,85 +127,44 @@ public class PageDecoding extends AbstractDecoding decodeShortArray(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putShort(position, sliceInput.readShort());
- } else {
- columnVector.putNull(position);
- }
- }
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.ShortType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType);
}
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "short");
}
@Override
public Optional decodeLongArray(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.LongType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putLong(position, sliceInput.readLong());
- } else {
- columnVector.putNull(position);
- }
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.LongType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.LongType);
}
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "long");
}
@Override
public Optional decodeFloatArray(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.FloatType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putFloat(position, intBitsToFloat(sliceInput.readInt()));
- } else {
- columnVector.putNull(position);
- }
- }
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "float");
}
@Override
public Optional decodeDoubleArray(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.DoubleType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putDouble(position, longBitsToDouble(sliceInput.readLong()));
- } else {
- columnVector.putNull(position);
- }
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.DoubleType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.DoubleType);
}
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "double");
}
@Override
@@ -242,7 +187,12 @@ public class PageDecoding extends AbstractDecoding decodeDate(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.DateType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putInt(position, sliceInput.readInt());
- } else {
- columnVector.putNull(position);
- }
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.DateType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.DateType);
}
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "date");
}
@Override
public Optional decodeLongToInt(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putInt(position, (int) sliceInput.readLong());
- } else {
- columnVector.putNull(position);
- }
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.IntegerType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType);
}
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToInt");
}
@Override
public Optional decodeLongToShort(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putShort(position, (short) sliceInput.readLong());
- } else {
- columnVector.putNull(position);
- }
- }
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, DataTypes.ShortType, true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType);
}
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToShort");
}
@Override
public Optional decodeLongToByte(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ByteType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putByte(position, (byte) sliceInput.readLong());
- } else {
- columnVector.putNull(position);
- }
- }
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToByte");
}
@Override
public Optional decodeLongToFloat(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.FloatType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- columnVector.putFloat(position, intBitsToFloat((int) sliceInput.readLong()));
- } else {
- columnVector.putNull(position);
- }
- }
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToFloat");
}
@Override
public Optional decodeDecimal(Optional type, SliceInput sliceInput, String decodeName) {
int positionCount = sliceInput.readInt();
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
- if (!(type.get() instanceof DecimalDecodeType)) {
- Optional.empty();
+ DecimalDecodeType decimalDecodeType;
+ if ((type.get() instanceof DecimalDecodeType)) {
+ decimalDecodeType = (DecimalDecodeType) type.get();
+ } else {
+ return Optional.empty();
}
- DecimalDecodeType decimalDecodeType = (DecimalDecodeType) type.get();
int scale = decimalDecodeType.getScale();
int precision = decimalDecodeType.getPrecision();
- OnHeapColumnVector columnVector = new OnHeapColumnVector(positionCount, new DecimalType(precision, scale));
+ WritableColumnVector columnVector;
+ if (isOperatorCombineEnabled) {
+ columnVector = new OmniColumnVector(positionCount, new DecimalType(precision, scale), true);
+ } else {
+ columnVector = new OnHeapColumnVector(positionCount, new DecimalType(precision, scale));
+ }
+ boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
for (int position = 0; position < positionCount; position++) {
if (valueIsNull == null || !valueIsNull[position]) {
- BigInteger value = null;
+ BigInteger value;
switch (decodeName) {
case "LONG_ARRAY":
value = BigInteger.valueOf(sliceInput.readLong());
@@ -455,24 +361,8 @@ public class PageDecoding extends AbstractDecoding decodeTimestamp(Optional type, SliceInput sliceInput) {
int positionCount = sliceInput.readInt();
-
- boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, TimestampType);
- for (int position = 0; position < positionCount; position++) {
- if (valueIsNull == null || !valueIsNull[position]) {
- // milliseconds to microsecond
- int rawOffset = TimeZone.getDefault().getRawOffset();
- columnVector.putLong(position, (sliceInput.readLong() - rawOffset) * 1000);
- } else {
- columnVector.putNull(position);
- }
- }
- try {
- PageDecoding.filedElementsAppended.set(columnVector, positionCount);
- } catch (Exception e) {
- throw new OmniDataException(e.getMessage());
- }
- return Optional.of(columnVector);
+ return getWritableColumnVector(sliceInput, positionCount, columnVector, "timestamp");
}
private Optional typeToDecodeName(Optional optType) {
@@ -513,4 +403,67 @@ public class PageDecoding extends AbstractDecoding getWritableColumnVector(SliceInput sliceInput, int positionCount,
+ WritableColumnVector columnVector, String type) {
+ boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
+ for (int position = 0; position < positionCount; position++) {
+ if (valueIsNull == null || !valueIsNull[position]) {
+ putData(columnVector, sliceInput, position, type);
+ } else {
+ columnVector.putNull(position);
+ }
+ }
+ try {
+ PageDecoding.filedElementsAppended.set(columnVector, positionCount);
+ } catch (Exception e) {
+ throw new OmniDataException(e.getMessage());
+ }
+ return Optional.of(columnVector);
+ }
+
+ /**
+  * Reads one value from the slice and stores it in the column vector at the given position.
+  * The type tag selects both the read call on the slice and the put call on the vector.
+  *
+  * @param columnVector vector that receives the value
+  * @param sliceInput input the encoded value is read from
+  * @param position row index to write
+  * @param type tag supplied by the calling decode method, e.g. "int", "longToShort" or "timestamp"
+  * @throws UnsupportedOperationException if the tag is not one of the handled types
+  */
+ private void putData(WritableColumnVector columnVector, SliceInput sliceInput, int position, String type) {
+     switch (type) {
+         case "byte":
+             columnVector.putByte(position, sliceInput.readByte());
+             break;
+         case "boolean":
+             // any non-zero byte is treated as true
+             columnVector.putBoolean(position, sliceInput.readByte() != 0);
+             break;
+         case "int":
+         case "date":
+             columnVector.putInt(position, sliceInput.readInt());
+             break;
+         case "short":
+             columnVector.putShort(position, sliceInput.readShort());
+             break;
+         case "long":
+             columnVector.putLong(position, sliceInput.readLong());
+             break;
+         case "float":
+             columnVector.putFloat(position, intBitsToFloat(sliceInput.readInt()));
+             break;
+         case "double":
+             columnVector.putDouble(position, longBitsToDouble(sliceInput.readLong()));
+             break;
+         case "longToInt":
+             columnVector.putInt(position, (int) sliceInput.readLong());
+             break;
+         case "longToShort":
+             columnVector.putShort(position, (short) sliceInput.readLong());
+             break;
+         case "longToByte":
+             columnVector.putByte(position, (byte) sliceInput.readLong());
+             break;
+         case "longToFloat":
+             columnVector.putFloat(position, intBitsToFloat((int) sliceInput.readLong()));
+             break;
+         case "timestamp":
+             // milliseconds to microsecond, after subtracting the default time zone's raw offset
+             int rawOffset = TimeZone.getDefault().getRawOffset();
+             columnVector.putLong(position, (sliceInput.readLong() - rawOffset) * 1000);
+             break;
+         default:
+             // An unknown tag used to be ignored, which left the row unwritten and the slice unread,
+             // so every following value would be decoded from the wrong offset.
+             throw new UnsupportedOperationException("unsupported decode type: " + type);
+     }
+ }
}
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java
index 062afec51d9ac89a0ec95e27498a18c30e0823fc..656aa69e77b01c763b0d5ddcb24b00217f91682c 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java
@@ -30,6 +30,8 @@ import io.airlift.slice.SliceInput;
import io.hetu.core.transport.execution.buffer.SerializedPage;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Optional;
@@ -39,16 +41,28 @@ import java.util.Optional;
* @since 2021-03-30
*/
public class PageDeserializer implements Deserializer {
+ private static final Logger LOG = LoggerFactory.getLogger(PageDeserializer.class);
+
private final PageDecoding decoding;
private final DecodeType[] columnTypes;
private final int[] columnOrders;
- public PageDeserializer(DecodeType[] columnTypes, int[] columnOrders) {
+ /**
+ * initialize page deserializer
+ *
+ * @param columnTypes column type
+ * @param columnOrders column index
+ * @param isOperatorCombineEnabled whether combine is enabled
+ */
+ public PageDeserializer(DecodeType[] columnTypes, int[] columnOrders, boolean isOperatorCombineEnabled) {
this.columnTypes = columnTypes;
- this.decoding = new PageDecoding();
+ this.decoding = new PageDecoding(isOperatorCombineEnabled);
this.columnOrders = columnOrders;
+ if (isOperatorCombineEnabled) {
+ LOG.info("OmniRuntime PushDown deserialization info: deserialize to OmniColumnVector");
+ }
}
@Override
@@ -56,6 +70,7 @@ public class PageDeserializer implements Deserializer {
if (page.isEncrypted()) {
throw new UnsupportedOperationException("unsupported compressed page.");
}
+
SliceInput sliceInput = page.getSlice().getInput();
int numberOfBlocks = sliceInput.readInt();
int returnLength = columnOrders.length;
@@ -88,5 +103,4 @@ public class PageDeserializer implements Deserializer {
}
return columnVectors;
}
-
-}
+}
\ No newline at end of file
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
index 57dc84a1beba09dfde351c5272f253e1d15f89a7..9f8b928a8815fba5bdaa9f061f23d1d60a10df9b 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
@@ -26,8 +26,6 @@ import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import com.huawei.boostkit.omnidata.decode.type.DecodeType;
import com.huawei.boostkit.omnidata.decode.type.LongDecodeType;
import com.huawei.boostkit.omnidata.decode.type.RowDecodeType;
-import com.huawei.boostkit.omnidata.exception.OmniDataException;
-import com.huawei.boostkit.omnidata.exception.OmniErrorCode;
import com.huawei.boostkit.omnidata.model.AggregationInfo;
import com.huawei.boostkit.omnidata.model.Column;
import com.huawei.boostkit.omnidata.model.Predicate;
@@ -83,7 +81,6 @@ import org.apache.spark.sql.catalyst.expressions.Or;
import org.apache.spark.sql.catalyst.expressions.Remainder;
import org.apache.spark.sql.catalyst.expressions.Subtract;
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction;
-import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.ndp.AggExeInfo;
import org.apache.spark.sql.execution.ndp.FilterExeInfo;
import org.apache.spark.sql.execution.ndp.PushDownInfo;
@@ -126,7 +123,7 @@ public class DataIoAdapter {
private boolean hasNextPage = false;
- private DataReaderImpl orcDataReader = null;
+ private DataReaderImpl orcDataReader = null;
private List columnTypesList = new ArrayList<>();
@@ -156,6 +153,10 @@ public class DataIoAdapter {
private static final Logger LOG = LoggerFactory.getLogger(DataIoAdapter.class);
+ private boolean isPushDownAgg = true;
+
+ private boolean isOperatorCombineEnabled;
+
/**
* Contact with Omni-Data-Server
*
@@ -177,69 +178,81 @@ public class DataIoAdapter {
// initCandidates
initCandidates(pageCandidate, filterOutPut);
- // create AggregationInfo
- // init agg candidates
- List partitionColumnBatch = JavaConverters.seqAsJavaList(partitionColumn);
- for (Attribute attribute : partitionColumnBatch) {
- partitionColumnName.add(attribute.name());
- }
- List aggExecutionList =
- JavaConverters.seqAsJavaList(pushDownOperators.aggExecutions());
- if (aggExecutionList.size() == 0) {
+ // add partition column
+ JavaConverters.seqAsJavaList(partitionColumn).forEach(a -> partitionColumnName.add(a.name()));
+
+ // init column info
+ if (pushDownOperators.aggExecutions().size() == 0) {
+ isPushDownAgg = false;
initColumnInfo(sparkOutPut);
}
- DataSource dataSource = initDataSource(pageCandidate);
- RowExpression rowExpression = initFilter(pushDownOperators.filterExecutions());
- Optional prestoFilter = rowExpression == null ?
- Optional.empty() : Optional.of(rowExpression);
- Optional aggregations =
- initAggAndGroupInfo(aggExecutionList);
- // create limitLong
+
+ // create filter
+ Optional filterRowExpression = initFilter(pushDownOperators.filterExecutions());
+
+ // create agg
+ Optional aggregations = initAggAndGroupInfo(pushDownOperators.aggExecutions());
+
+ // create limit
OptionalLong limitLong = NdpUtils.convertLimitExeInfo(pushDownOperators.limitExecution());
+ // create TaskSource
+ DataSource dataSource = initDataSource(pageCandidate);
Predicate predicate = new Predicate(
- omnidataTypes, omnidataColumns, prestoFilter, omnidataProjections,
+ omnidataTypes, omnidataColumns, filterRowExpression, omnidataProjections,
ImmutableMap.of(), ImmutableMap.of(), aggregations, limitLong);
TaskSource taskSource = new TaskSource(dataSource, predicate, 1048576);
+
+ // create deserializer
+ this.isOperatorCombineEnabled =
+ pageCandidate.isOperatorCombineEnabled() && NdpUtils.checkOmniOpColumns(omnidataColumns);
PageDeserializer deserializer = initPageDeserializer();
- WritableColumnVector[] page = null;
- int failedTimes = 0;
- String[] sdiHostArray = pageCandidate.getSdiHosts().split(",");
- int randomIndex = (int) (Math.random() * sdiHostArray.length);
- List sdiHostList = new ArrayList<>(Arrays.asList(sdiHostArray));
- Optional availableSdiHost = getRandomAvailableSdiHost(sdiHostArray,
+
+ // get available host
+ String[] pushDownHostArray = pageCandidate.getpushDownHosts().split(",");
+ List pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray));
+ Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray,
JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts()));
- availableSdiHost.ifPresent(sdiHostList::add);
- Iterator sdiHosts = sdiHostList.iterator();
- Set sdiHostSet = new HashSet<>();
- sdiHostSet.add(sdiHostArray[randomIndex]);
- while (sdiHosts.hasNext()) {
- String sdiHost;
+ availablePushDownHost.ifPresent(pushDownHostList::add);
+ return getIterator(pushDownHostList.iterator(), taskSource, pushDownHostArray, deserializer,
+ pushDownHostList.size());
+ }
+
+ private Iterator getIterator(Iterator pushDownHosts, TaskSource taskSource,
+ String[] pushDownHostArray, PageDeserializer deserializer,
+ int pushDownHostsSize) throws UnknownHostException {
+ int randomIndex = (int) (Math.random() * pushDownHostArray.length);
+ int failedTimes = 0;
+ WritableColumnVector[] page = null;
+ Set pushDownHostSet = new HashSet<>();
+ pushDownHostSet.add(pushDownHostArray[randomIndex]);
+ while (pushDownHosts.hasNext()) {
+ String pushDownHost;
if (failedTimes == 0) {
- sdiHost = sdiHostArray[randomIndex];
+ pushDownHost = pushDownHostArray[randomIndex];
} else {
- sdiHost = sdiHosts.next();
- if (sdiHostSet.contains(sdiHost)) {
+ pushDownHost = pushDownHosts.next();
+ if (pushDownHostSet.contains(pushDownHost)) {
continue;
}
}
- String ipAddress = InetAddress.getByName(sdiHost).getHostAddress();
+ String ipAddress = InetAddress.getByName(pushDownHost).getHostAddress();
Properties properties = new Properties();
properties.put("omnidata.client.target.list", ipAddress);
properties.put("omnidata.client.task.timeout", taskTimeout);
- LOG.info("Push down node info: [hostname :{} ,ip :{}]", sdiHost, ipAddress);
+ LOG.info("Push down node info: [hostname :{} ,ip :{}]", pushDownHost, ipAddress);
try {
- orcDataReader = new DataReaderImpl(
+ orcDataReader = new DataReaderImpl<>(
properties, taskSource, deserializer);
hasNextPage = true;
- page = (WritableColumnVector[]) orcDataReader.getNextPageBlocking();
+ page = orcDataReader.getNextPageBlocking();
if (orcDataReader.isFinished()) {
orcDataReader.close();
hasNextPage = false;
}
break;
} catch (Exception e) {
- LOG.warn("Push down failed node info [hostname :{} ,ip :{}]", sdiHost, ipAddress, e);
+ LOG.warn("Push down failed node info [hostname :{} ,ip :{}]", pushDownHost, ipAddress, e);
++failedTimes;
if (orcDataReader != null) {
orcDataReader.close();
@@ -247,7 +260,7 @@ public class DataIoAdapter {
}
}
}
- int retryTime = Math.min(TASK_FAILED_TIMES, sdiHostList.size());
+ int retryTime = Math.min(TASK_FAILED_TIMES, pushDownHostsSize);
if (failedTimes >= retryTime) {
LOG.warn("No Omni-data-server to Connect, Task has tried {} times.", retryTime);
throw new TaskExecutionException("No Omni-data-server to Connect");
@@ -257,8 +270,9 @@ public class DataIoAdapter {
return l.iterator();
}
- private Optional getRandomAvailableSdiHost(String[] sdiHostArray, Map fpuHosts) {
- List existingHosts = Arrays.asList(sdiHostArray);
+ private Optional getRandomAvailablePushDownHost(String[] pushDownHostArray,
+ Map fpuHosts) {
+ List existingHosts = Arrays.asList(pushDownHostArray);
List allHosts = new ArrayList<>(fpuHosts.values());
allHosts.removeAll(existingHosts);
if (allHosts.size() > 0) {
@@ -270,20 +284,20 @@ public class DataIoAdapter {
}
public boolean hasNextIterator(List