diff --git a/omnidata/omnidata-spark-connector/connector/pom.xml b/omnidata/omnidata-spark-connector/connector/pom.xml index dda5fe43f387e6d1fa7346321109b821f8cd0322..0cb895b9888435d682020e2a47872788ffc1083f 100644 --- a/omnidata/omnidata-spark-connector/connector/pom.xml +++ b/omnidata/omnidata-spark-connector/connector/pom.xml @@ -24,6 +24,7 @@ 2.12.0 1.6.1 1.35.0 + 2.12 @@ -73,9 +74,99 @@ curator-recipes ${dep.curator.version} + + + + io.airlift + log + 206 + test + + + io.airlift + stats + 206 + test + + + org.apache.lucene + lucene-analyzers-common + 7.2.1 + test + + + it.unimi.dsi + fastutil + 6.5.9 + test + + + io.airlift + bytecode + 1.2 + test + + + io.hetu.core + presto-parser + ${dep.hetu.version} + test + + + io.airlift + json + 206 + test + + + org.testng + testng + 6.10 + test + + + org.mockito + mockito-core + 1.9.5 + test + + + objenesis + org.objenesis + + + + + org.scalatest + scalatest_${scala.binary.version} + 3.2.3 + test + + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + test + ${spark.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + test-jar + ${spark.version} + test + + + org.apache.spark + spark-catalyst_${scala.binary.version} + test-jar + test + ${spark.version} + src/main/scala + src/test/java org.codehaus.mojo diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java index 7802b7b5ab0fbd0880b123f6ae69ac6cf1acf546..f0c8c1fc282a8dc471178bf5f9db98f7172f3761 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java @@ -18,9 +18,9 @@ package com.huawei.boostkit.omnidata.spark; +import org.apache.spark.sql.execution.vectorized.OmniColumnVector; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.DecimalType; @@ -44,6 +44,12 @@ public class PageDeRunLength { } } + private final boolean isOperatorCombineEnabled; + + public PageDeRunLength(boolean isOperatorCombineEnabled) { + this.isOperatorCombineEnabled = isOperatorCombineEnabled; + } + /** * decompress byteColumnVector * @@ -54,7 +60,8 @@ public class PageDeRunLength { public Optional decompressByteArray(int positionCount, WritableColumnVector writableColumnVector) throws Exception { byte value = writableColumnVector.getByte(0); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ByteType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) { columnVector.putNulls(0, positionCount); } else { @@ -78,7 +85,8 @@ public class PageDeRunLength { public Optional decompressBooleanArray(int positionCount, WritableColumnVector writableColumnVector) throws Exception { boolean value = writableColumnVector.getBoolean(0); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.BooleanType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) 
{ columnVector.putNulls(0, positionCount); } else { @@ -102,7 +110,8 @@ public class PageDeRunLength { public Optional decompressIntArray(int positionCount, WritableColumnVector writableColumnVector) throws Exception { int value = writableColumnVector.getInt(0); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) { columnVector.putNulls(0, positionCount); } else { @@ -126,7 +135,8 @@ public class PageDeRunLength { public Optional decompressShortArray(int positionCount, WritableColumnVector writableColumnVector) throws Exception { short value = writableColumnVector.getShort(0); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) { columnVector.putNulls(0, positionCount); } else { @@ -150,7 +160,8 @@ public class PageDeRunLength { public Optional decompressLongArray(int positionCount, WritableColumnVector writableColumnVector) throws Exception { long value = writableColumnVector.getLong(0); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.LongType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) { columnVector.putNulls(0, positionCount); } else { @@ -174,7 +185,8 @@ public class PageDeRunLength { public Optional decompressFloatArray(int positionCount, WritableColumnVector writableColumnVector) throws Exception { float value = writableColumnVector.getFloat(0); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.FloatType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) { columnVector.putNulls(0, positionCount); } else { @@ -198,7 +210,8 @@ public class PageDeRunLength { public Optional decompressDoubleArray(int positionCount, WritableColumnVector writableColumnVector) throws Exception { double value = writableColumnVector.getDouble(0); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.DoubleType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) { columnVector.putNulls(0, positionCount); } else { @@ -221,7 +234,8 @@ public class PageDeRunLength { */ public Optional decompressVariableWidth(int positionCount, WritableColumnVector writableColumnVector) throws Exception { - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.StringType); + WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); if (writableColumnVector.isNullAt(0)) { columnVector.putNulls(0, positionCount); } else { @@ -247,7 +261,8 @@ public class PageDeRunLength { int precision = ((DecimalType) writableColumnVector.dataType()).precision(); int scale = ((DecimalType) writableColumnVector.dataType()).scale(); Decimal value = writableColumnVector.getDecimal(0, precision, scale); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, writableColumnVector.dataType()); + 
WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, + writableColumnVector); for (int rowId = 0; rowId < positionCount; rowId++) { if (writableColumnVector.isNullAt(rowId)) { columnVector.putNull(rowId); @@ -262,4 +277,15 @@ public class PageDeRunLength { } return Optional.of(columnVector); } -} + + private WritableColumnVector getColumnVector(boolean isOperatorCombineEnabled, int positionCount, + WritableColumnVector writableColumnVector) { + WritableColumnVector columnVector ; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, writableColumnVector.dataType(), true); + } else { + columnVector = new OnHeapColumnVector(positionCount, writableColumnVector.dataType()); + } + return columnVector; + } +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java index 5d46338bdb5d2791491d53623dddc6725d588c55..3eb827103eecfceed7c601d439fc480d290208f3 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDecoding.java @@ -30,13 +30,16 @@ import com.huawei.boostkit.omnidata.exception.OmniDataException; import io.airlift.slice.SliceInput; import io.airlift.slice.Slices; import io.prestosql.spi.type.DateType; - import io.prestosql.spi.type.Decimals; + +import org.apache.spark.sql.execution.vectorized.OmniColumnVector; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.DecimalType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -53,6 +56,11 @@ import java.util.TimeZone; * @since 2021-03-30 */ public class PageDecoding extends AbstractDecoding> { + private static final Logger LOG = LoggerFactory.getLogger(PageDecoding.class); + + /** + * Log appended files. 
+ */ private static Field filedElementsAppended; static { @@ -64,73 +72,51 @@ public class PageDecoding extends AbstractDecoding decodeArray(Optional type, SliceInput sliceInput) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("not support array decode"); } @Override public Optional decodeByteArray(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ByteType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putByte(position, sliceInput.readByte()); - } else { - columnVector.putNull(position); - } - } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "byte"); } @Override public Optional decodeBooleanArray(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.BooleanType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - boolean value = sliceInput.readByte() != 0; - columnVector.putBoolean(position, value); - } else { - columnVector.putNull(position); - } + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.BooleanType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.BooleanType); } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "boolean"); } @Override public Optional decodeIntArray(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putInt(position, sliceInput.readInt()); - } else { - columnVector.putNull(position); - } + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.IntegerType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType); } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "int"); } @Override @@ -141,85 +127,44 @@ public class PageDecoding extends AbstractDecoding decodeShortArray(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector 
columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putShort(position, sliceInput.readShort()); - } else { - columnVector.putNull(position); - } - } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.ShortType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType); } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "short"); } @Override public Optional decodeLongArray(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.LongType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putLong(position, sliceInput.readLong()); - } else { - columnVector.putNull(position); - } + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.LongType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.LongType); } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "long"); } @Override public Optional decodeFloatArray(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.FloatType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putFloat(position, intBitsToFloat(sliceInput.readInt())); - } else { - columnVector.putNull(position); - } - } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "float"); } @Override public Optional decodeDoubleArray(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.DoubleType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putDouble(position, longBitsToDouble(sliceInput.readLong())); - } else { - columnVector.putNull(position); - } + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.DoubleType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.DoubleType); } - try { - 
PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "double"); } @Override @@ -242,7 +187,12 @@ public class PageDecoding extends AbstractDecoding decodeDate(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.DateType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putInt(position, sliceInput.readInt()); - } else { - columnVector.putNull(position); - } + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.DateType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.DateType); } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "date"); } @Override public Optional decodeLongToInt(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putInt(position, (int) sliceInput.readLong()); - } else { - columnVector.putNull(position); - } + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.IntegerType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.IntegerType); } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToInt"); } @Override public Optional decodeLongToShort(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putShort(position, (short) sliceInput.readLong()); - } else { - columnVector.putNull(position); - } - } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, DataTypes.ShortType, true); + } else { + columnVector = new OnHeapColumnVector(positionCount, DataTypes.ShortType); } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToShort"); } @Override public Optional decodeLongToByte(Optional 
type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.ByteType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putByte(position, (byte) sliceInput.readLong()); - } else { - columnVector.putNull(position); - } - } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToByte"); } @Override public Optional decodeLongToFloat(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, DataTypes.FloatType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - columnVector.putFloat(position, intBitsToFloat((int) sliceInput.readLong())); - } else { - columnVector.putNull(position); - } - } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "longToFloat"); } @Override public Optional decodeDecimal(Optional type, SliceInput sliceInput, String decodeName) { int positionCount = sliceInput.readInt(); - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); - if (!(type.get() instanceof DecimalDecodeType)) { - Optional.empty(); + DecimalDecodeType decimalDecodeType; + if ((type.get() instanceof DecimalDecodeType)) { + decimalDecodeType = (DecimalDecodeType) type.get(); + } else { + return Optional.empty(); } - DecimalDecodeType decimalDecodeType = (DecimalDecodeType) type.get(); int scale = decimalDecodeType.getScale(); int precision = decimalDecodeType.getPrecision(); - OnHeapColumnVector columnVector = new OnHeapColumnVector(positionCount, new DecimalType(precision, scale)); + WritableColumnVector columnVector; + if (isOperatorCombineEnabled) { + columnVector = new OmniColumnVector(positionCount, new DecimalType(precision, scale), true); + } else { + columnVector = new OnHeapColumnVector(positionCount, new DecimalType(precision, scale)); + } + boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); for (int position = 0; position < positionCount; position++) { if (valueIsNull == null || !valueIsNull[position]) { - BigInteger value = null; + BigInteger value; switch (decodeName) { case "LONG_ARRAY": value = BigInteger.valueOf(sliceInput.readLong()); @@ -455,24 +361,8 @@ public class PageDecoding extends AbstractDecoding decodeTimestamp(Optional type, SliceInput sliceInput) { int positionCount = sliceInput.readInt(); - - boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); WritableColumnVector columnVector = new OnHeapColumnVector(positionCount, TimestampType); - for (int position = 0; position < positionCount; position++) { - if (valueIsNull == null || !valueIsNull[position]) { - // milliseconds to microsecond - int rawOffset = TimeZone.getDefault().getRawOffset(); - 
columnVector.putLong(position, (sliceInput.readLong() - rawOffset) * 1000); - } else { - columnVector.putNull(position); - } - } - try { - PageDecoding.filedElementsAppended.set(columnVector, positionCount); - } catch (Exception e) { - throw new OmniDataException(e.getMessage()); - } - return Optional.of(columnVector); + return getWritableColumnVector(sliceInput, positionCount, columnVector, "timestamp"); } private Optional typeToDecodeName(Optional optType) { @@ -513,4 +403,67 @@ public class PageDecoding extends AbstractDecoding getWritableColumnVector(SliceInput sliceInput, int positionCount, + WritableColumnVector columnVector, String type) { + boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); + for (int position = 0; position < positionCount; position++) { + if (valueIsNull == null || !valueIsNull[position]) { + putData(columnVector, sliceInput, position, type); + } else { + columnVector.putNull(position); + } + } + try { + PageDecoding.filedElementsAppended.set(columnVector, positionCount); + } catch (Exception e) { + throw new OmniDataException(e.getMessage()); + } + return Optional.of(columnVector); + } + + private void putData(WritableColumnVector columnVector, SliceInput sliceInput, int position, String type) { + switch (type) { + case "byte": + columnVector.putByte(position, sliceInput.readByte()); + break; + case "boolean": + columnVector.putBoolean(position, sliceInput.readByte() != 0); + break; + case "int": + case "date": + columnVector.putInt(position, sliceInput.readInt()); + break; + case "short": + columnVector.putShort(position, sliceInput.readShort()); + break; + case "long": + columnVector.putLong(position, sliceInput.readLong()); + break; + case "float": + columnVector.putFloat(position, intBitsToFloat(sliceInput.readInt())); + break; + case "double": + columnVector.putDouble(position, longBitsToDouble(sliceInput.readLong())); + break; + case "longToInt": + columnVector.putInt(position, (int) sliceInput.readLong()); + break; + case "longToShort": + columnVector.putShort(position, (short) sliceInput.readLong()); + break; + case "longToByte": + columnVector.putByte(position, (byte) sliceInput.readLong()); + break; + case "longToFloat": + columnVector.putFloat(position, intBitsToFloat((int) sliceInput.readLong())); + break; + case "timestamp": + // milliseconds to microsecond + int rawOffset = TimeZone.getDefault().getRawOffset(); + columnVector.putLong(position, (sliceInput.readLong() - rawOffset) * 1000); + break; + default: + } + } } diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java index 062afec51d9ac89a0ec95e27498a18c30e0823fc..656aa69e77b01c763b0d5ddcb24b00217f91682c 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeserializer.java @@ -30,6 +30,8 @@ import io.airlift.slice.SliceInput; import io.hetu.core.transport.execution.buffer.SerializedPage; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Optional; @@ -39,16 +41,28 @@ import java.util.Optional; * @since 2021-03-30 */ public class PageDeserializer implements Deserializer { + private static final Logger LOG = 
LoggerFactory.getLogger(PageDeserializer.class); + private final PageDecoding decoding; private final DecodeType[] columnTypes; private final int[] columnOrders; - public PageDeserializer(DecodeType[] columnTypes, int[] columnOrders) { + /** + * initialize page deserializer + * + * @param columnTypes column type + * @param columnOrders column index + * @param isOperatorCombineEnabled whether combine is enabled + */ + public PageDeserializer(DecodeType[] columnTypes, int[] columnOrders, boolean isOperatorCombineEnabled) { this.columnTypes = columnTypes; - this.decoding = new PageDecoding(); + this.decoding = new PageDecoding(isOperatorCombineEnabled); this.columnOrders = columnOrders; + if (isOperatorCombineEnabled) { + LOG.info("OmniRuntime PushDown deserialization info: deserialize to OmniColumnVector"); + } } @Override @@ -56,6 +70,7 @@ public class PageDeserializer implements Deserializer { if (page.isEncrypted()) { throw new UnsupportedOperationException("unsupported compressed page."); } + SliceInput sliceInput = page.getSlice().getInput(); int numberOfBlocks = sliceInput.readInt(); int returnLength = columnOrders.length; @@ -88,5 +103,4 @@ public class PageDeserializer implements Deserializer { } return columnVectors; } - -} +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java index 57dc84a1beba09dfde351c5272f253e1d15f89a7..9f8b928a8815fba5bdaa9f061f23d1d60a10df9b 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java @@ -26,8 +26,6 @@ import static io.prestosql.spi.type.BooleanType.BOOLEAN; import com.huawei.boostkit.omnidata.decode.type.DecodeType; import com.huawei.boostkit.omnidata.decode.type.LongDecodeType; import com.huawei.boostkit.omnidata.decode.type.RowDecodeType; -import com.huawei.boostkit.omnidata.exception.OmniDataException; -import com.huawei.boostkit.omnidata.exception.OmniErrorCode; import com.huawei.boostkit.omnidata.model.AggregationInfo; import com.huawei.boostkit.omnidata.model.Column; import com.huawei.boostkit.omnidata.model.Predicate; @@ -83,7 +81,6 @@ import org.apache.spark.sql.catalyst.expressions.Or; import org.apache.spark.sql.catalyst.expressions.Remainder; import org.apache.spark.sql.catalyst.expressions.Subtract; import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction; -import org.apache.spark.sql.execution.datasources.PartitionedFile; import org.apache.spark.sql.execution.ndp.AggExeInfo; import org.apache.spark.sql.execution.ndp.FilterExeInfo; import org.apache.spark.sql.execution.ndp.PushDownInfo; @@ -126,7 +123,7 @@ public class DataIoAdapter { private boolean hasNextPage = false; - private DataReaderImpl orcDataReader = null; + private DataReaderImpl orcDataReader = null; private List columnTypesList = new ArrayList<>(); @@ -156,6 +153,10 @@ public class DataIoAdapter { private static final Logger LOG = LoggerFactory.getLogger(DataIoAdapter.class); + private boolean isPushDownAgg = true; + + private boolean isOperatorCombineEnabled; + /** * Contact with Omni-Data-Server * @@ -177,69 +178,81 @@ public class DataIoAdapter { // initCandidates initCandidates(pageCandidate, filterOutPut); - // create AggregationInfo - // init agg candidates - List partitionColumnBatch = 
JavaConverters.seqAsJavaList(partitionColumn); - for (Attribute attribute : partitionColumnBatch) { - partitionColumnName.add(attribute.name()); - } - List aggExecutionList = - JavaConverters.seqAsJavaList(pushDownOperators.aggExecutions()); - if (aggExecutionList.size() == 0) { + // add partition column + JavaConverters.seqAsJavaList(partitionColumn).forEach(a -> partitionColumnName.add(a.name())); + + // init column info + if (pushDownOperators.aggExecutions().size() == 0) { + isPushDownAgg = false; initColumnInfo(sparkOutPut); } - DataSource dataSource = initDataSource(pageCandidate); - RowExpression rowExpression = initFilter(pushDownOperators.filterExecutions()); - Optional prestoFilter = rowExpression == null ? - Optional.empty() : Optional.of(rowExpression); - Optional aggregations = - initAggAndGroupInfo(aggExecutionList); - // create limitLong + + // create filter + Optional filterRowExpression = initFilter(pushDownOperators.filterExecutions()); + + // create agg + Optional aggregations = initAggAndGroupInfo(pushDownOperators.aggExecutions()); + + // create limit OptionalLong limitLong = NdpUtils.convertLimitExeInfo(pushDownOperators.limitExecution()); + // create TaskSource + DataSource dataSource = initDataSource(pageCandidate); Predicate predicate = new Predicate( - omnidataTypes, omnidataColumns, prestoFilter, omnidataProjections, + omnidataTypes, omnidataColumns, filterRowExpression, omnidataProjections, ImmutableMap.of(), ImmutableMap.of(), aggregations, limitLong); TaskSource taskSource = new TaskSource(dataSource, predicate, 1048576); + + // create deserializer + this.isOperatorCombineEnabled = + pageCandidate.isOperatorCombineEnabled() && NdpUtils.checkOmniOpColumns(omnidataColumns); PageDeserializer deserializer = initPageDeserializer(); - WritableColumnVector[] page = null; - int failedTimes = 0; - String[] sdiHostArray = pageCandidate.getSdiHosts().split(","); - int randomIndex = (int) (Math.random() * sdiHostArray.length); - List sdiHostList = new ArrayList<>(Arrays.asList(sdiHostArray)); - Optional availableSdiHost = getRandomAvailableSdiHost(sdiHostArray, + + // get available host + String[] pushDownHostArray = pageCandidate.getpushDownHosts().split(","); + List pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray)); + Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray, JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts())); - availableSdiHost.ifPresent(sdiHostList::add); - Iterator sdiHosts = sdiHostList.iterator(); - Set sdiHostSet = new HashSet<>(); - sdiHostSet.add(sdiHostArray[randomIndex]); - while (sdiHosts.hasNext()) { - String sdiHost; + availablePushDownHost.ifPresent(pushDownHostList::add); + return getIterator(pushDownHostList.iterator(), taskSource, pushDownHostArray, deserializer, + pushDownHostList.size()); + } + + private Iterator getIterator(Iterator pushDownHosts, TaskSource taskSource, + String[] pushDownHostArray, PageDeserializer deserializer, + int pushDownHostsSize) throws UnknownHostException { + int randomIndex = (int) (Math.random() * pushDownHostArray.length); + int failedTimes = 0; + WritableColumnVector[] page = null; + Set pushDownHostSet = new HashSet<>(); + pushDownHostSet.add(pushDownHostArray[randomIndex]); + while (pushDownHosts.hasNext()) { + String pushDownHost; if (failedTimes == 0) { - sdiHost = sdiHostArray[randomIndex]; + pushDownHost = pushDownHostArray[randomIndex]; } else { - sdiHost = sdiHosts.next(); - if (sdiHostSet.contains(sdiHost)) { + pushDownHost = 
pushDownHosts.next(); + if (pushDownHostSet.contains(pushDownHost)) { continue; } } - String ipAddress = InetAddress.getByName(sdiHost).getHostAddress(); + String ipAddress = InetAddress.getByName(pushDownHost).getHostAddress(); Properties properties = new Properties(); properties.put("omnidata.client.target.list", ipAddress); properties.put("omnidata.client.task.timeout", taskTimeout); - LOG.info("Push down node info: [hostname :{} ,ip :{}]", sdiHost, ipAddress); + LOG.info("Push down node info: [hostname :{} ,ip :{}]", pushDownHost, ipAddress); try { - orcDataReader = new DataReaderImpl( + orcDataReader = new DataReaderImpl<>( properties, taskSource, deserializer); hasNextPage = true; - page = (WritableColumnVector[]) orcDataReader.getNextPageBlocking(); + page = orcDataReader.getNextPageBlocking(); if (orcDataReader.isFinished()) { orcDataReader.close(); hasNextPage = false; } break; } catch (Exception e) { - LOG.warn("Push down failed node info [hostname :{} ,ip :{}]", sdiHost, ipAddress, e); + LOG.warn("Push down failed node info [hostname :{} ,ip :{}]", pushDownHost, ipAddress, e); ++failedTimes; if (orcDataReader != null) { orcDataReader.close(); @@ -247,7 +260,7 @@ public class DataIoAdapter { } } } - int retryTime = Math.min(TASK_FAILED_TIMES, sdiHostList.size()); + int retryTime = Math.min(TASK_FAILED_TIMES, pushDownHostsSize); if (failedTimes >= retryTime) { LOG.warn("No Omni-data-server to Connect, Task has tried {} times.", retryTime); throw new TaskExecutionException("No Omni-data-server to Connect"); @@ -257,8 +270,9 @@ public class DataIoAdapter { return l.iterator(); } - private Optional getRandomAvailableSdiHost(String[] sdiHostArray, Map fpuHosts) { - List existingHosts = Arrays.asList(sdiHostArray); + private Optional getRandomAvailablePushDownHost(String[] pushDownHostArray, + Map fpuHosts) { + List existingHosts = Arrays.asList(pushDownHostArray); List allHosts = new ArrayList<>(fpuHosts.values()); allHosts.removeAll(existingHosts); if (allHosts.size() > 0) { @@ -270,20 +284,20 @@ public class DataIoAdapter { } public boolean hasNextIterator(List pageList, PageToColumnar pageToColumnarClass, - PartitionedFile partitionFile, boolean isVectorizedReader) - throws Exception { + boolean isVectorizedReader) { if (!hasNextPage) { return false; } - WritableColumnVector[] page = (WritableColumnVector[]) orcDataReader.getNextPageBlocking(); + WritableColumnVector[] page = orcDataReader.getNextPageBlocking(); if (orcDataReader.isFinished()) { orcDataReader.close(); + hasNextPage = false; return false; } List l = new ArrayList<>(); l.add(page); pageList.addAll(pageToColumnarClass - .transPageToColumnar(l.iterator(), isVectorizedReader)); + .transPageToColumnar(l.iterator(), isVectorizedReader, isOperatorCombineEnabled)); return true; } @@ -305,6 +319,7 @@ public class DataIoAdapter { listAtt = JavaConverters.seqAsJavaList(filterOutPut); TASK_FAILED_TIMES = pageCandidate.getMaxFailedTimes(); taskTimeout = pageCandidate.getTaskTimeout(); + isPushDownAgg = true; } private RowExpression extractNamedExpression(Expression namedExpression) { @@ -327,9 +342,7 @@ public class DataIoAdapter { omnidataColumns.add(new Column(columnId, aggColumnName, prestoType, isPartitionKey, partitionValue)); columnNameSet.add(aggColumnName); - if (null == columnNameMap.get(aggColumnName)) { - columnNameMap.put(aggColumnName, columnNameMap.size()); - } + columnNameMap.computeIfAbsent(aggColumnName, k -> columnNameMap.size()); omnidataProjections.add(new InputReferenceExpression(aggProjectionId, 
prestoType)); } @@ -545,28 +558,6 @@ public class DataIoAdapter { new AggregationInfo(aggregationMap, groupingKeys)); } - private Optional extractAggAndGroupExpression( - List aggExecutionList) { - Optional resAggregationInfo = Optional.empty(); - for (AggExeInfo aggExeInfo : aggExecutionList) { - List aggregateExpressions = JavaConverters.seqAsJavaList( - aggExeInfo.aggregateExpressions()); - List namedExpressions = JavaConverters.seqAsJavaList( - aggExeInfo.groupingExpressions()); - resAggregationInfo = createAggregationInfo(aggregateExpressions, namedExpressions); - } - return resAggregationInfo; - } - - private RowExpression extractFilterExpression(Seq filterExecution) { - List filterExecutionList = JavaConverters.seqAsJavaList(filterExecution); - RowExpression resRowExpression = null; - for (FilterExeInfo filterExeInfo : filterExecutionList) { - resRowExpression = reverseExpressionTree(filterExeInfo.filter()); - } - return resRowExpression; - } - private RowExpression reverseExpressionTree(Expression filterExpression) { RowExpression resRowExpression = null; if (filterExpression == null) { @@ -599,7 +590,6 @@ public class DataIoAdapter { ExpressionOperator expressionOperType = ExpressionOperator.valueOf(filterExpression.getClass().getSimpleName()); Expression left; - Expression right; String operatorName; switch (expressionOperType) { case Or: @@ -624,7 +614,7 @@ public class DataIoAdapter { left = ((EqualTo) filterExpression).left(); } return getRowExpression(left, - "equal", rightExpressions); + "EQUAL", rightExpressions); case IsNotNull: Signature isnullSignature = new Signature( QualifiedObjectName.valueOfDefaultFunction("not"), @@ -644,11 +634,11 @@ public class DataIoAdapter { if (((LessThan) filterExpression).left() instanceof Literal) { rightExpressions.add(((LessThan) filterExpression).left()); left = ((LessThan) filterExpression).right(); - operatorName = "greater_than"; + operatorName = "GREATER_THAN"; } else { rightExpressions.add(((LessThan) filterExpression).right()); left = ((LessThan) filterExpression).left(); - operatorName = "less_than"; + operatorName = "LESS_THAN"; } return getRowExpression(left, operatorName, rightExpressions); @@ -656,11 +646,11 @@ public class DataIoAdapter { if (((GreaterThan) filterExpression).left() instanceof Literal) { rightExpressions.add(((GreaterThan) filterExpression).left()); left = ((GreaterThan) filterExpression).right(); - operatorName = "less_than"; + operatorName = "LESS_THAN"; } else { rightExpressions.add(((GreaterThan) filterExpression).right()); left = ((GreaterThan) filterExpression).left(); - operatorName = "greater_than"; + operatorName = "GREATER_THAN"; } return getRowExpression(left, operatorName, rightExpressions); @@ -668,11 +658,11 @@ public class DataIoAdapter { if (((GreaterThanOrEqual) filterExpression).left() instanceof Literal) { rightExpressions.add(((GreaterThanOrEqual) filterExpression).left()); left = ((GreaterThanOrEqual) filterExpression).right(); - operatorName = "less_than_or_equal"; + operatorName = "LESS_THAN_OR_EQUAL"; } else { rightExpressions.add(((GreaterThanOrEqual) filterExpression).right()); left = ((GreaterThanOrEqual) filterExpression).left(); - operatorName = "greater_than_or_equal"; + operatorName = "GREATER_THAN_OR_EQUAL"; } return getRowExpression(left, operatorName, rightExpressions); @@ -680,11 +670,11 @@ public class DataIoAdapter { if (((LessThanOrEqual) filterExpression).left() instanceof Literal) { rightExpressions.add(((LessThanOrEqual) filterExpression).left()); left = 
((LessThanOrEqual) filterExpression).right(); - operatorName = "greater_than_or_equal"; + operatorName = "GREATER_THAN_OR_EQUAL"; } else { rightExpressions.add(((LessThanOrEqual) filterExpression).right()); left = ((LessThanOrEqual) filterExpression).left(); - operatorName = "less_than_or_equal"; + operatorName = "LESS_THAN_OR_EQUAL"; } return getRowExpression(left, operatorName, rightExpressions); @@ -729,9 +719,9 @@ public class DataIoAdapter { filterProjectionId = expressionInfo.getProjectionId(); } // deal with right expression - List argumentValues = new ArrayList<>(); + List argumentValues; List multiArguments = new ArrayList<>(); - int rightProjectionId = -1; + int rightProjectionId; RowExpression rowExpression; if (rightExpression != null && rightExpression.size() > 0 && rightExpression.get(0) instanceof AttributeReference) { @@ -756,9 +746,9 @@ public class DataIoAdapter { return rowExpression; } - // column projection赋值 + // column projection private int putFilterValue(Expression valueExpression, Type prestoType) { - // Filter赋值 + // set filter int columnId = NdpUtils.getColumnId(valueExpression.toString()) - columnOffset; String filterColumnName = valueExpression.toString().split("#")[0].toLowerCase(Locale.ENGLISH); if (null != fieldMap.get(filterColumnName)) { @@ -767,14 +757,17 @@ public class DataIoAdapter { boolean isPartitionKey = partitionColumnName.contains(filterColumnName); int filterProjectionId = fieldMap.size(); fieldMap.put(filterColumnName, filterProjectionId); - filterTypesList.add(NdpUtils.transDataIoDataType(valueExpression.dataType())); - filterOrdersList.add(filterProjectionId); + String partitionValue = NdpUtils.getPartitionValue(filePath, filterColumnName); columnNameSet.add(filterColumnName); - omnidataProjections.add(new InputReferenceExpression(filterProjectionId, prestoType)); omnidataColumns.add(new Column(columnId, filterColumnName, prestoType, isPartitionKey, partitionValue)); - omnidataTypes.add(prestoType); + if (isPushDownAgg) { + filterTypesList.add(NdpUtils.transDataIoDataType(valueExpression.dataType())); + filterOrdersList.add(filterProjectionId); + omnidataProjections.add(new InputReferenceExpression(filterProjectionId, prestoType)); + omnidataTypes.add(prestoType); + } if (null == columnNameMap.get(filterColumnName)) { columnNameMap.put(filterColumnName, columnNameMap.size()); } @@ -804,22 +797,18 @@ public class DataIoAdapter { private List getValue(List rightExpression, String operatorName, String sparkType) { - Object objectValue; List argumentValues = new ArrayList<>(); if (null == rightExpression || rightExpression.size() == 0) { return argumentValues; } - switch (operatorName.toLowerCase(Locale.ENGLISH)) { - case "in": - List inValue = new ArrayList<>(); - for (Expression rExpression : rightExpression) { - inValue.add(rExpression.toString()); - } - argumentValues = inValue; - break; - default: - argumentValues.add(rightExpression.get(0).toString()); - break; + if ("in".equals(operatorName.toLowerCase(Locale.ENGLISH))) { + List inValue = new ArrayList<>(); + for (Expression rExpression : rightExpression) { + inValue.add(rExpression.toString()); + } + argumentValues = inValue; + } else { + argumentValues.add(rightExpression.get(0).toString()); } return argumentValues; } @@ -830,9 +819,9 @@ public class DataIoAdapter { DecodeType[] filterTypes = filterTypesList.toArray(new DecodeType[0]); int[] filterOrders = filterOrdersList.stream().mapToInt(Integer::intValue).toArray(); if (columnTypes.length == 0) { - return new 
PageDeserializer(filterTypes, filterOrders); + return new PageDeserializer(filterTypes, filterOrders, isOperatorCombineEnabled); } else { - return new PageDeserializer(columnTypes, columnOrders); + return new PageDeserializer(columnTypes, columnOrders, isOperatorCombineEnabled); } } @@ -852,14 +841,28 @@ public class DataIoAdapter { return dataSource; } - private RowExpression initFilter(Seq filterExecutions) { - return extractFilterExpression(filterExecutions); + private Optional initFilter(Seq filterExecutions) { + List filterExecutionList = JavaConverters.seqAsJavaList(filterExecutions); + Optional resRowExpression = Optional.empty(); + for (FilterExeInfo filterExeInfo : filterExecutionList) { + resRowExpression = Optional.ofNullable(reverseExpressionTree(filterExeInfo.filter())); + } + return resRowExpression; } private Optional initAggAndGroupInfo( - List aggExecutionList) { - // create AggregationInfo - return extractAggAndGroupExpression(aggExecutionList); + Seq aggExeInfoSeq) { + List aggExecutionList = + JavaConverters.seqAsJavaList(aggExeInfoSeq); + Optional resAggregationInfo = Optional.empty(); + for (AggExeInfo aggExeInfo : aggExecutionList) { + List aggregateExpressions = JavaConverters.seqAsJavaList( + aggExeInfo.aggregateExpressions()); + List namedExpressions = JavaConverters.seqAsJavaList( + aggExeInfo.groupingExpressions()); + resAggregationInfo = createAggregationInfo(aggregateExpressions, namedExpressions); + } + return resAggregationInfo; } private void initColumnInfo(Seq sparkOutPut) { @@ -887,6 +890,8 @@ public class DataIoAdapter { ++filterColumnId; } } -} - + public boolean isOperatorCombineEnabled() { + return isOperatorCombineEnabled; + } +} diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java index 7333e4df1eb80d18357661fb59b89ecce30257a1..ad886b69c32e76c88b249828871b63b9dff967f1 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java @@ -20,6 +20,7 @@ package org.apache.spark.sql; import com.huawei.boostkit.omnidata.decode.type.*; +import com.huawei.boostkit.omnidata.model.Column; import io.airlift.slice.Slice; import io.prestosql.spi.relation.ConstantExpression; import io.prestosql.spi.type.*; @@ -33,6 +34,8 @@ import org.apache.spark.sql.execution.ndp.LimitExeInfo; import org.apache.spark.sql.types.*; import org.apache.spark.sql.types.DateType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -69,6 +72,26 @@ import static java.lang.Float.parseFloat; */ public class NdpUtils { + /** + * Types supported by OmniOperator. 
+ */ + public static final Set supportTypes = new HashSet() { + { + add(StandardTypes.INTEGER); + add(StandardTypes.DATE); + add(StandardTypes.SMALLINT); + add(StandardTypes.BIGINT); + add(StandardTypes.VARCHAR); + add(StandardTypes.CHAR); + add(StandardTypes.DECIMAL); + add(StandardTypes.ROW); + add(StandardTypes.DOUBLE); + add(StandardTypes.VARBINARY); + add(StandardTypes.BOOLEAN); + } + }; + private static final Logger LOG = LoggerFactory.getLogger(NdpUtils.class); + public static int getColumnOffset(StructType dataSchema, Seq outPut) { List attributeList = JavaConverters.seqAsJavaList(outPut); String columnName = ""; @@ -92,6 +115,7 @@ public class NdpUtils { Seq aggExeInfo) { String columnName = ""; int columnTempId = 0; + boolean isFind = false; if (aggExeInfo != null && aggExeInfo.size() > 0) { List aggExecutionList = JavaConverters.seqAsJavaList(aggExeInfo); for (AggExeInfo aggExeInfoTemp : aggExecutionList) { @@ -106,10 +130,13 @@ public class NdpUtils { Matcher matcher = pattern.matcher(expression.toString()); if (matcher.find()) { columnTempId = Integer.parseInt(matcher.group(1)); + isFind = true; break; } } - break; + if (isFind) { + break; + } } List namedExpressions = JavaConverters.seqAsJavaList( aggExeInfoTemp.groupingExpressions()); @@ -238,7 +265,7 @@ public class NdpUtils { if (DATE.equals(prestoType)) { return new DateDecodeType(); } - throw new RuntimeException("unsupported this prestoType:" + prestoType); + throw new UnsupportedOperationException("unsupported this prestoType:" + prestoType); } public static DecodeType transDataIoDataType(DataType dataType) { @@ -271,7 +298,7 @@ public class NdpUtils { case "datetype": return new DateDecodeType(); default: - throw new RuntimeException("unsupported this type:" + strType); + throw new UnsupportedOperationException("unsupported this type:" + strType); } } @@ -439,4 +466,21 @@ public class NdpUtils { } return isInDate; } + + /** + * Check if the input pages contains datatypes unsuppoted by OmniColumnVector. 
+ * + * @param columns Input columns + * @return false if contains unsupported type + */ + public static boolean checkOmniOpColumns(List columns) { + for (Column column : columns) { + String base = column.getType().getTypeSignature().getBase(); + if (!supportTypes.contains(base)) { + LOG.info("Unsupported operator data type {}, rollback", base); + return false; + } + } + return true; + } } diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageCandidate.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageCandidate.java index 8ca14685e342845cd93de93cdf50190fcbf7e7a0..db8dfeef8024284c2a5d9037dfd30612de80e0b0 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageCandidate.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageCandidate.java @@ -31,24 +31,27 @@ public class PageCandidate { public int columnOffset; - public String sdiHosts; + public String pushDownHosts; - private String fileFormat; + private final String fileFormat; public int maxFailedTimes; - private int taskTimeout; + private final int taskTimeout; - public PageCandidate(String filePath, Long startPos, Long splitLen, int columnOffset, - String sdiHosts, String fileFormat, int maxFailedTimes, int taskTimeout) { + private final boolean isOperatorCombineEnabled; + + public PageCandidate(String filePath, Long startPos, Long splitLen, int columnOffset, String pushDownHosts, + String fileFormat, int maxFailedTimes, int taskTimeout, boolean isOperatorCombineEnabled) { this.filePath = filePath; this.startPos = startPos; this.splitLen = splitLen; this.columnOffset = columnOffset; - this.sdiHosts = sdiHosts; + this.pushDownHosts = pushDownHosts; this.fileFormat = fileFormat; this.maxFailedTimes = maxFailedTimes; this.taskTimeout = taskTimeout; + this.isOperatorCombineEnabled = isOperatorCombineEnabled; } public Long getStartPos() { @@ -67,8 +70,8 @@ public class PageCandidate { return columnOffset; } - public String getSdiHosts() { - return sdiHosts; + public String getpushDownHosts() { + return pushDownHosts; } public String getFileFormat() { @@ -82,4 +85,8 @@ public class PageCandidate { public int getTaskTimeout() { return taskTimeout; } + + public boolean isOperatorCombineEnabled() { + return isOperatorCombineEnabled; + } } diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageToColumnar.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageToColumnar.java index 42e7bc1bdf9bfc7533ef5d253bbc2d3a1fb74eb5..ddee828f23f9f35789244b215e72a1e0b6011c00 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageToColumnar.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PageToColumnar.java @@ -21,10 +21,10 @@ package org.apache.spark.sql; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.execution.vectorized.MutableColumnarRow; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.collection.Seq; import java.io.Serializable; @@ -36,20 +36,20 @@ import java.util.List; * PageToColumnar */ public class PageToColumnar 
implements Serializable { - StructType structType = null; - Seq outPut = null; + private static final Logger LOG = LoggerFactory.getLogger(PageToColumnar.class); + + StructType structType; + Seq outPut; + public PageToColumnar(StructType structType, Seq outPut) { this.structType = structType; this.outPut = outPut; } public List transPageToColumnar(Iterator writableColumnVectors, - boolean isVectorizedReader) { - scala.collection.Iterator structFieldIterator = structType.iterator(); - List columnType = new ArrayList<>(); - - while (structFieldIterator.hasNext()) { - columnType.add(structFieldIterator.next().dataType()); + boolean isVectorizedReader, boolean isOperatorCombineEnabled) { + if (isOperatorCombineEnabled) { + LOG.info("OmniRuntime PushDown column info: OmniColumnVector transform to Columnar"); } List internalRowList = new ArrayList<>(); while (writableColumnVectors.hasNext()) { @@ -58,25 +58,22 @@ public class PageToColumnar implements Serializable { continue; } int positionCount = columnVector[0].getElementsAppended(); - if (positionCount > 0) { - if (isVectorizedReader) { - ColumnarBatch columnarBatch = new ColumnarBatch(columnVector); - columnarBatch.setNumRows(positionCount); - internalRowList.add(columnarBatch); - } else { - for (int j = 0; j < positionCount; j++) { - MutableColumnarRow mutableColumnarRow = - new MutableColumnarRow(columnVector); - mutableColumnarRow.rowId = j; - internalRowList.add(mutableColumnarRow); - } + if (positionCount <= 0) { + continue; + } + if (isVectorizedReader) { + ColumnarBatch columnarBatch = new ColumnarBatch(columnVector); + columnarBatch.setNumRows(positionCount); + internalRowList.add(columnarBatch); + } else { + for (int j = 0; j < positionCount; j++) { + MutableColumnarRow mutableColumnarRow = + new MutableColumnarRow(columnVector); + mutableColumnarRow.rowId = j; + internalRowList.add(mutableColumnarRow); } } } return internalRowList; } } - - - - diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PushDownManager.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PushDownManager.java index a1278adabfbe329973535b61ea959119fa282131..75d7b1cc0b1fb06f45a6f5f2c4b765d798a90b50 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PushDownManager.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/PushDownManager.java @@ -47,9 +47,8 @@ public class PushDownManager { private static final int ZOOKEEPER_RETRY_INTERVAL_MS = 1000; - public scala.collection.Map getZookeeperData( - int timeOut, String parentPath, String zkAddress) throws Exception { - Map fpuMap = new HashMap<>(); + public scala.collection.Map getZookeeperData( + int timeOut, String parentPath, String zkAddress) throws Exception { CuratorFramework zkClient = CuratorFrameworkFactory.builder() .connectString(zkAddress) .sessionTimeoutMs(timeOut) @@ -67,12 +66,11 @@ public class PushDownManager { if (!path.contains("-lock-")) { byte[] data = zkClient.getData().forPath(parentPath + "/" + path); PushDownData statusInfo = mapper.readValue(data, PushDownData.class); - fpuMap.put(path, statusInfo.getDatanodeHost()); pushDownInfoMap.put(path, statusInfo); } } if (checkAllPushDown(pushDownInfoMap)) { - return javaMapToScala(fpuMap); + return javaMapToScala(pushDownInfoMap); } else { return javaMapToScala(new HashMap<>()); } @@ -110,11 +108,11 @@ public class PushDownManager { return true; } - private static scala.collection.Map 
javaMapToScala(Map kafkaParams) { + private static scala.collection.Map javaMapToScala(Map kafkaParams) { scala.collection.Map scalaMap = JavaConverters.mapAsScalaMap(kafkaParams); Object objTest = Map$.MODULE$.newBuilder().$plus$plus$eq(scalaMap.toSeq()); Object resultTest = ((scala.collection.mutable.Builder) objTest).result(); - scala.collection.Map retMap = (scala.collection.Map) resultTest; + scala.collection.Map retMap = (scala.collection.Map) resultTest; return retMap; } -} +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 91bcafbad5f9ba962f6397803c5159e273d585a9..3e4c6410ae1b8e551b6f257f8693b0046da0ab61 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -17,24 +17,25 @@ package org.apache.spark.sql.execution -import java.util.concurrent.TimeUnit._ +import com.sun.xml.internal.bind.v2.TODO +import java.util.concurrent.TimeUnit._ import scala.collection.mutable.HashMap - import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path - import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{PushDownData, PushDownManager, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.datasources.{FileScanRDDPushDown, _} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.execution.ndp.NdpSupport +import org.apache.spark.sql.execution.ndp.NdpConf.{getNdpPartialPushdown, getNdpPartialPushdownEnable, getTaskTimeout} +import org.apache.spark.sql.execution.ndp.{NdpConf, NdpSupport} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -42,6 +43,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils import org.apache.spark.util.collection.BitSet +import scala.util.Random + trait DataSourceScanExec extends LeafExecNode { def relation: BaseRelation def tableIdentifier: Option[TableIdentifier] @@ -160,7 +163,7 @@ case class RowDataSourceScanExec( * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule * [[DisableUnnecessaryBucketedScan]] for details. 
*/ -case class FileSourceScanExec( +abstract class BaseFileSourceScanExec( @transient relation: HadoopFsRelation, output: Seq[Attribute], requiredSchema: StructType, @@ -169,7 +172,7 @@ case class FileSourceScanExec( optionalNumCoalescedBuckets: Option[Int], dataFilters: Seq[Expression], tableIdentifier: Option[TableIdentifier], - partiTionColumn: Seq[Attribute], + partitionColumn: Seq[Attribute], disableBucketedScan: Boolean = false ) extends DataSourceScanExec with NdpSupport { @@ -573,13 +576,8 @@ case class FileSourceScanExec( FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } } - if (isPushDown) { - new FileScanRDDPushDown(fsRelation.sparkSession, filePartitions, requiredSchema, output, - relation.dataSchema, ndpOperators, partiTionColumn, supportsColumnar, fsRelation.fileFormat) - } else { - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) - } + RDDPushDown(fsRelation, filePartitions, readFile) } /** @@ -620,13 +618,7 @@ case class FileSourceScanExec( val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - if (isPushDown) { - new FileScanRDDPushDown(fsRelation.sparkSession, partitions, requiredSchema, output, - relation.dataSchema, ndpOperators, partiTionColumn, supportsColumnar, fsRelation.fileFormat) - } else { - // TODO 重写一个FileScanRDD 重新调用 - new FileScanRDD(fsRelation.sparkSession, readFile, partitions) - } + RDDPushDown(fsRelation, partitions, readFile) } // Filters unused DynamicPruningExpression expressions - one which has been replaced @@ -655,8 +647,73 @@ case class FileSourceScanExec( optionalNumCoalescedBuckets, QueryPlan.normalizePredicates(dataFilters, filterOutput), None, - partiTionColumn.map(QueryPlan.normalizeExpressions(_, output)), + partitionColumn.map(QueryPlan.normalizeExpressions(_, output)), disableBucketedScan ) } + + private def RDDPushDown(fsRelation: HadoopFsRelation, filePartitions: Seq[FilePartition], readFile: (PartitionedFile) => Iterator[InternalRow]): RDD[InternalRow] = { + if (isPushDown) { + val partialCondition = allFilterExecInfo.nonEmpty && aggExeInfos.isEmpty && limitExeInfo.isEmpty && getNdpPartialPushdownEnable(fsRelation.sparkSession) + val partialPdRate = getNdpPartialPushdown(fsRelation.sparkSession) + var partialChildOutput = Seq[Attribute]() + if (partialCondition) { + partialChildOutput = allFilterExecInfo.head.child.output + logInfo(s"partial push down rate: ${partialPdRate}") + } + new FileScanRDDPushDown(fsRelation.sparkSession, filePartitions, requiredSchema, output, + relation.dataSchema, ndpOperators, partitionColumn, supportsColumnar, fsRelation.fileFormat, readFile, partialCondition, partialPdRate, zkRate, partialChildOutput) + } else { + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + } + } } + +case class FileSourceScanExec( + @transient relation: HadoopFsRelation, + output: Seq[Attribute], + requiredSchema: StructType, + partitionFilters: Seq[Expression], + optionalBucketSet: Option[BitSet], + optionalNumCoalescedBuckets: Option[Int], + dataFilters: Seq[Expression], + tableIdentifier: Option[TableIdentifier], + partitionColumn: Seq[Attribute], + disableBucketedScan: Boolean = false) + extends BaseFileSourceScanExec( + relation, + output, + requiredSchema, + partitionFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + dataFilters, + tableIdentifier, + partitionColumn, + disableBucketedScan) { + +} + +case class NdpFileSourceScanExec( + @transient relation: HadoopFsRelation, + output: 
Seq[Attribute], + requiredSchema: StructType, + partitionFilters: Seq[Expression], + optionalBucketSet: Option[BitSet], + optionalNumCoalescedBuckets: Option[Int], + dataFilters: Seq[Expression], + tableIdentifier: Option[TableIdentifier], + partitionColumn: Seq[Attribute], + disableBucketedScan: Boolean = false) + extends BaseFileSourceScanExec( + relation, + output, + requiredSchema, + partitionFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + dataFilters, + tableIdentifier, + partitionColumn, + disableBucketedScan) { + } diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala index c55ed09a0b0c25588ac5bd1e2878ba2b5c9b2709..6aaed2f7be442daacbc7855d479a91f6dc6c60c6 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala +++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala @@ -18,21 +18,25 @@ package org.apache.spark.sql.execution.datasources import java.util - import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.parquet.io.ParquetDecodingException import org.apache.spark.{SparkUpgradeException, TaskContext, Partition => RDDPartition} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.InputMetrics import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.{DataIoAdapter, NdpUtils, PageCandidate, PageToColumnar, PushDownManager, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.execution.ndp.{NdpConf, PushDownInfo} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BasePredicate, Expression, Predicate, UnsafeProjection} +import org.apache.spark.sql.execution.{QueryExecutionException, RowToColumnConverter} +import org.apache.spark.sql.execution.ndp.{FilterExeInfo, NdpConf, PushDownInfo} +import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch +import java.io.FileNotFoundException +import scala.util.Random + /** * An RDD that scans a list of file partitions. 
@@ -46,7 +50,12 @@ class FileScanRDDPushDown( pushDownOperators: PushDownInfo, partitionColumns: Seq[Attribute], isColumnVector: Boolean, - fileFormat: FileFormat) + fileFormat: FileFormat, + readFunction: (PartitionedFile) => Iterator[InternalRow], + partialCondition: Boolean, + partialPdRate: Double, + zkPdRate: Double, + partialChildOutput: Seq[Attribute]) extends RDD[InternalRow](sparkSession.sparkContext, Nil) { var columnOffset = -1 @@ -81,125 +90,43 @@ class FileScanRDDPushDown( scala.collection.mutable.Map[String, scala.collection.mutable.Map[String, Seq[Expression]]]() var projectId = 0 val expressions: util.ArrayList[Object] = new util.ArrayList[Object]() + val enableOffHeapColumnVector: Boolean = sparkSession.sessionState.conf.offHeapColumnVectorEnabled + val columnBatchSize: Int = sparkSession.sessionState.conf.columnBatchSize + val converters = new RowToColumnConverter(StructType.fromAttributes(output)) private val timeOut = NdpConf.getNdpZookeeperTimeout(sparkSession) private val parentPath = NdpConf.getNdpZookeeperPath(sparkSession) private val zkAddress = NdpConf.getNdpZookeeperAddress(sparkSession) private val taskTimeout = NdpConf.getTaskTimeout(sparkSession) + private val operatorCombineEnabled = NdpConf.getNdpOperatorCombineEnabled(sparkSession) override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = { val pageToColumnarClass = new PageToColumnar(requiredSchema, output) - - val iterator = new Iterator[Object] with AutoCloseable { - private val inputMetrics = context.taskMetrics().inputMetrics - private val existingBytesRead = inputMetrics.bytesRead - private val getBytesReadCallback = - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() - private def incTaskInputMetricsBytesRead(): Unit = { - inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) - } - - private[this] val files = split.asInstanceOf[FilePartition].files.toIterator - private[this] var currentFile: PartitionedFile = null - private[this] var currentIterator: Iterator[Object] = null - private[this] val sdiHosts = split.asInstanceOf[FilePartition].sdi - val dataIoClass = new DataIoAdapter() - - def hasNext: Boolean = { - // Kill the task in case it has been marked as killed. This logic is from - // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order - // to avoid performance overhead. - context.killTaskIfInterrupted() - val hasNext = currentIterator != null && currentIterator.hasNext - if (hasNext) { - hasNext - } else { - val tmp: util.ArrayList[Object] = new util.ArrayList[Object]() - var hasnextIterator = false - try { - hasnextIterator = dataIoClass.hasNextIterator(tmp, pageToColumnarClass, - currentFile, isColumnVector) - } catch { - case e : Exception => - throw e - } - val ret = if (hasnextIterator && tmp.size() > 0) { - currentIterator = tmp.asScala.iterator - hasnextIterator - } else { - nextIterator() - } - ret - } - } - def next(): Object = { - val nextElement = currentIterator.next() - // TODO: we should have a better separation of row based and batch based scan, so that we - // don't need to run this `if` for every record. 
- if (nextElement.isInstanceOf[ColumnarBatch]) { - incTaskInputMetricsBytesRead() - inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows()) - } else { - // too costly to update every record - if (inputMetrics.recordsRead % - SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { - incTaskInputMetricsBytesRead() - } - inputMetrics.incRecordsRead(1) - } - nextElement - } - - /** Advances to the next file. Returns true if a new non-empty iterator is available. */ - private def nextIterator(): Boolean = { - if (files.hasNext) { - currentFile = files.next() - // logInfo(s"Reading File $currentFile") - InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) - val pageCandidate = new PageCandidate(currentFile.filePath, currentFile.start, - currentFile.length, columnOffset, sdiHosts, - fileFormat.toString, maxFailedTimes, taskTimeout) - val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output, - partitionColumns, filterOutput, pushDownOperators) - currentIterator = pageToColumnarClass.transPageToColumnar(dataIoPage, - isColumnVector).asScala.iterator - try { - hasNext - } catch { - case e: SchemaColumnConvertNotSupportedException => - val message = "Parquet column cannot be converted in " + - s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + - s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" - throw new QueryExecutionException(message, e) - case e: ParquetDecodingException => - if (e.getCause.isInstanceOf[SparkUpgradeException]) { - throw e.getCause - } else if (e.getMessage.contains("Can not read value at")) { - val message = "Encounter error while reading parquet files. " + - "One possible cause: Parquet column cannot be converted in the " + - "corresponding files. Details: " - throw new QueryExecutionException(message, e) - } - throw e - } - } else { - currentFile = null - InputFileBlockHolder.unset() - false - } - } - - override def close(): Unit = { - incTaskInputMetricsBytesRead() - InputFileBlockHolder.unset() - } + var iterator : PushDownIterator = null + if (isPartialPushDown(partialCondition, partialPdRate, zkPdRate)) { + logDebug("partial push down task on spark") + val partialFilterCondition = pushDownOperators.filterExecutions.reduce((a, b) => FilterExeInfo(And(a.filter, b.filter), partialChildOutput)) + val predicate = Predicate.create(partialFilterCondition.filter, partialChildOutput) + predicate.initialize(0) + iterator = new PartialPushDownIterator(split, context, pageToColumnarClass, predicate) + } else { + logDebug("partial push down task on omnidata") + iterator = new PushDownIterator(split, context, pageToColumnarClass) } - // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener[Unit](_ => iterator.close()) iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack. 
} + def isPartialPushDown(partialCondition: Boolean, partialPdRate: Double, zkPdRate: Double): Boolean = { + var res = false + val randomNum = Random.nextDouble; + if (partialCondition && (randomNum > partialPdRate || randomNum > zkPdRate)) { + res = true + } + res + } + override protected def getPartitions: Array[RDDPartition] = { filePartitions.map { partitionFile => { val retHost = mutable.HashMap.empty[String, Long] @@ -215,7 +142,12 @@ class FileScanRDDPushDown( var mapNum = 0 if (fpuMap == null) { val pushDownManagerClass = new PushDownManager() - fpuMap = pushDownManagerClass.getZookeeperData(timeOut, parentPath, zkAddress) + val fMap = pushDownManagerClass.getZookeeperData(timeOut, parentPath, zkAddress) + val hostMap = mutable.Map[String,String]() + for (kv <- fMap) { + hostMap.put(kv._1, kv._2.getDatanodeHost) + } + fpuMap = hostMap } while (datanode.hasNext && mapNum < maxFailedTimes) { val datanodeStr = datanode.next()._1 @@ -243,4 +175,193 @@ class FileScanRDDPushDown( override protected def getPreferredLocations(split: RDDPartition): Seq[String] = { split.asInstanceOf[FilePartition].preferredLocations() } + + class PushDownIterator(split: RDDPartition, + context: TaskContext, + pageToColumnarClass: PageToColumnar) + extends Iterator[Object] with AutoCloseable { + + val inputMetrics: InputMetrics = context.taskMetrics().inputMetrics + val existingBytesRead: Long = inputMetrics.bytesRead + val getBytesReadCallback: () => Long = + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + def incTaskInputMetricsBytesRead(): Unit = { + inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) + } + + val files: Iterator[PartitionedFile] = split.asInstanceOf[FilePartition].files.toIterator + var currentFile: PartitionedFile = null + var currentIterator: Iterator[Object] = null + val sdiHosts: String = split.asInstanceOf[FilePartition].sdi + val dataIoClass = new DataIoAdapter() + + def hasNext: Boolean = { + // Kill the task in case it has been marked as killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. + context.killTaskIfInterrupted() + val hasNext = currentIterator != null && currentIterator.hasNext + if (hasNext) { + hasNext + } else { + val tmp: util.ArrayList[Object] = new util.ArrayList[Object]() + var hasnextIterator = false + try { + hasnextIterator = dataIoClass.hasNextIterator(tmp, pageToColumnarClass, isColumnVector) + } catch { + case e : Exception => + throw e + } + val ret = if (hasnextIterator && tmp.size() > 0) { + currentIterator = tmp.asScala.iterator + hasnextIterator + } else { + nextIterator() + } + ret + } + } + def next(): Object = { + val nextElement = currentIterator.next() + // TODO: we should have a better separation of row based and batch based scan, so that we + // don't need to run this `if` for every record. + if (nextElement.isInstanceOf[ColumnarBatch]) { + incTaskInputMetricsBytesRead() + inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows()) + } else { + // too costly to update every record + if (inputMetrics.recordsRead % + SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { + incTaskInputMetricsBytesRead() + } + inputMetrics.incRecordsRead(1) + } + nextElement + } + + /** Advances to the next file. Returns true if a new non-empty iterator is available. 
*/ + def nextIterator(): Boolean = { + if (files.hasNext) { + currentFile = files.next() + // logInfo(s"Reading File $currentFile") + InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + val pageCandidate = new PageCandidate(currentFile.filePath, currentFile.start, + currentFile.length, columnOffset, sdiHosts, + fileFormat.toString, maxFailedTimes, taskTimeout,operatorCombineEnabled) + val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output, + partitionColumns, filterOutput, pushDownOperators) + currentIterator = pageToColumnarClass.transPageToColumnar(dataIoPage, + isColumnVector, dataIoClass.isOperatorCombineEnabled).asScala.iterator + iteHasNext() + } else { + unset() + } + } + + def iteHasNext(): Boolean = { + try { + hasNext + } catch { + case e: SchemaColumnConvertNotSupportedException => + val message = "Parquet column cannot be converted in " + + s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + + s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" + throw new QueryExecutionException(message, e) + case e: ParquetDecodingException => + if (e.getCause.isInstanceOf[SparkUpgradeException]) { + throw e.getCause + } else if (e.getMessage.contains("Can not read value at")) { + val message = "Encounter error while reading parquet files. " + + "One possible cause: Parquet column cannot be converted in the " + + "corresponding files. Details: " + throw new QueryExecutionException(message, e) + } + throw e + } + } + + def unset(): Boolean = { + currentFile = null + InputFileBlockHolder.unset() + false + } + + override def close(): Unit = { + incTaskInputMetricsBytesRead() + InputFileBlockHolder.unset() + } + } + + class PartialPushDownIterator(split: RDDPartition, + context: TaskContext, + pageToColumnarClass: PageToColumnar, + predicate: BasePredicate) + extends PushDownIterator(split: RDDPartition, context: TaskContext, pageToColumnarClass: PageToColumnar) { + + override def hasNext: Boolean = { + // Kill the task in case it has been marked as killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. 
+ context.killTaskIfInterrupted() + (currentIterator != null && currentIterator.hasNext) || nextIterator() + } + + override def nextIterator(): Boolean = { + if (files.hasNext) { + currentFile = files.next() + InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + predicate.initialize(0) + currentIterator = readCurrentFile() + .map {c => + val rowIterator = c.rowIterator().asScala + val ri = rowIterator.filter { row => + val r = predicate.eval(row) + r + } + + val localOutput = output + val toUnsafe = UnsafeProjection.create(localOutput, filterOutput) + val projectRi = ri.map(toUnsafe) + val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { + OffHeapColumnVector.allocateColumns(columnBatchSize, StructType.fromAttributes(output)) + } else { + OnHeapColumnVector.allocateColumns(columnBatchSize, StructType.fromAttributes(output)) + } + val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) + + TaskContext.get().addTaskCompletionListener[Unit] { _ => + cb.close() + } + + cb.setNumRows(0) + vectors.foreach(_.reset()) + var rowCount = 0 + while (rowCount < columnBatchSize && projectRi.hasNext) { + val row = projectRi.next() + converters.convert(row, vectors.toArray) + rowCount += 1 + } + cb.setNumRows(rowCount) + cb + } + iteHasNext() + } else { + unset() + } + } + + private def readCurrentFile(): Iterator[ColumnarBatch] = { + try { + readFunction(currentFile).asInstanceOf[Iterator[ColumnarBatch]] + } catch { + case e: FileNotFoundException => + throw new FileNotFoundException( + e.getMessage + "\n" + + "It is possible the underlying files have been updated. " + + "You can explicitly invalidate the cache in Spark by " + + "running 'REFRESH TABLE tableName' command in SQL or " + + "by recreating the Dataset/DataFrame involved.") + } + } + } } diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpPushDown.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpPushDown.scala index e365f1f9d17fd9f4222d91b389958538015b12be..ff1558d402d838a7ed1eaf586dc444b1ddb1ff88 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpPushDown.scala +++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpPushDown.scala @@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.ndp import java.util.{Locale, Properties} + import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{PushDownManager, SparkSession} +import org.apache.spark.sql.{PushDownData, PushDownManager, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BinaryExpression, Expression, NamedExpression, PredicateHelper, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.{Partial, PartialMerge} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, GlobalLimitExec, LeafExecNode, LocalLimitExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, GlobalLimitExec, LeafExecNode, LocalLimitExec, NdpFileSourceScanExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import 
org.apache.spark.sql.execution.datasources.HadoopFsRelation @@ -41,6 +42,7 @@ case class NdpPushDown(sparkSession: SparkSession) extends Rule[SparkPlan] with PredicateHelper { private val pushDownEnabled = NdpConf.getNdpEnabled(sparkSession) private var fpuHosts: scala.collection.Map[String, String] = _ + private var zkRate: Double = 1.0 // filter performance blackList: like, startswith, endswith, contains private val filterWhiteList = Set("or", "and", "not", "equalto", "isnotnull", "lessthan", "greaterthan", "greaterthanorequal", "lessthanorequal", "in", "literal", "isnull", @@ -102,7 +104,19 @@ case class NdpPushDown(sparkSession: SparkSession) def shouldPushDown(): Boolean = { val pushDownManagerClass = new PushDownManager() - fpuHosts = pushDownManagerClass.getZookeeperData(timeOut, parentPath, zkAddress) + val fpuMap = pushDownManagerClass.getZookeeperData(timeOut, parentPath, zkAddress) + val fmap = mutable.Map[String,String]() + var rts = 0 + var mts = 0 + for (kv <- fpuMap) { + fmap.put(kv._1, kv._2.getDatanodeHost) + rts += kv._2.getRunningTasks + mts += kv._2.getMaxTasks + } + if (rts != 0 && mts != 0 && (rts.toDouble / mts.toDouble) > 0.4) { + zkRate = 0.5 + } + fpuHosts = fmap fpuHosts.nonEmpty } @@ -165,11 +179,26 @@ case class NdpPushDown(sparkSession: SparkSession) if (s.scan.isPushDown) { s.scan match { case f: FileSourceScanExec => - val scan = f.copy(output = s.scanOutput) - scan.pushDown(s.scan) - scan.fpuHosts(fpuHosts) - logInfo(s"Push down with [${scan.ndpOperators}]") - scan + val ndpScan = NdpFileSourceScanExec( + f.relation, + s.scanOutput, + f.requiredSchema, + f.partitionFilters, + f.optionalBucketSet, + f.optionalNumCoalescedBuckets, + f.dataFilters, + f.tableIdentifier, + f.partitionColumn, + f.disableBucketedScan + ) + ndpScan.pushZkRate(zkRate) + if (s.scan.allFilterExecInfo.nonEmpty) { + ndpScan.partialPushDownFilterList(s.scan.allFilterExecInfo) + } + ndpScan.pushDown(s.scan) + ndpScan.fpuHosts(fpuHosts) + logInfo(s"Push down with [${ndpScan.ndpOperators}]") + ndpScan case _ => throw new UnsupportedOperationException() } } else { @@ -178,6 +207,7 @@ case class NdpPushDown(sparkSession: SparkSession) } } + def pushDownOperator(plan: SparkPlan): SparkPlan = { val p = pushDownOperatorInternal(plan) replaceWrapper(p) @@ -234,6 +264,7 @@ case class NdpPushDown(sparkSession: SparkSession) logInfo(s"Fail to push down filter, since ${s.scan.nodeName} contains dynamic pruning") f } else { + s.scan.partialPushDownFilter(f); // TODO: move selectivity info to pushdown-info if (filterSelectivityEnabled && selectivity.nonEmpty) { logInfo(s"Selectivity: ${selectivity.get}") @@ -320,6 +351,7 @@ object NdpConf { val NDP_ENABLED = "spark.sql.ndp.enabled" val PARQUET_MERGESCHEMA = "spark.sql.parquet.mergeSchema" val NDP_FILTER_SELECTIVITY_ENABLE = "spark.sql.ndp.filter.selectivity.enable" + val NDP_OPERATOR_COMBINE_ENABLED = "spark.sql.ndp.operator.combine.enable" val NDP_TABLE_SIZE_THRESHOLD = "spark.sql.ndp.table.size.threshold" val NDP_ZOOKEEPER_TIMEOUT = "spark.sql.ndp.zookeeper.timeout" val NDP_ALIVE_OMNIDATA = "spark.sql.ndp.alive.omnidata" @@ -335,6 +367,8 @@ object NdpConf { val NDP_PKI_DIR = "spark.sql.ndp.pki.dir" val NDP_MAX_FAILED_TIMES = "spark.sql.ndp.max.failed.times" val NDP_CLIENT_TASK_TIMEOUT = "spark.sql.ndp.task.timeout" + val NDP_PARTIAL_PUSHDOWN = "spark.sql.ndp.partial.pushdown" + val NDP_PARTIAL_PUSHDOWN_ENABLE = "spark.sql.ndp.partial.pushdown.enable" def toBoolean(key: String, value: String, sparkSession: SparkSession): Boolean = { try { @@ -399,6 
+433,11 @@ object NdpConf { sparkSession.conf.getOption(NDP_FILTER_SELECTIVITY_ENABLE).getOrElse("true"), sparkSession) } + def getNdpOperatorCombineEnabled(sparkSession: SparkSession): Boolean = { + toBoolean(NDP_OPERATOR_COMBINE_ENABLED, + sparkSession.conf.getOption(NDP_OPERATOR_COMBINE_ENABLED).getOrElse("false"), sparkSession) + } + def getNdpTableSizeThreshold(sparkSession: SparkSession): Long = { val result = toNumber(NDP_TABLE_SIZE_THRESHOLD, sparkSession.conf.getOption(NDP_TABLE_SIZE_THRESHOLD).getOrElse("10240"), @@ -427,6 +466,21 @@ object NdpConf { result } + def getNdpPartialPushdown(sparkSession: SparkSession): Double = { + val partialNum = toNumber(NDP_PARTIAL_PUSHDOWN, + sparkSession.conf.getOption(NDP_PARTIAL_PUSHDOWN).getOrElse("1"), + _.toDouble, "double", sparkSession) + checkDoubleValue(NDP_PARTIAL_PUSHDOWN, partialNum, + rate => rate >= 0.0 && rate <= 1.0, + s"The $NDP_PARTIAL_PUSHDOWN value must be in [0.0, 1.0].", sparkSession) + partialNum + } + + def getNdpPartialPushdownEnable(sparkSession: SparkSession): Boolean = { + toBoolean(NDP_PARTIAL_PUSHDOWN_ENABLE, + sparkSession.conf.getOption(NDP_PARTIAL_PUSHDOWN_ENABLE).getOrElse("false"), sparkSession) + } + def getNdpUdfWhitelist(sparkSession: SparkSession): Option[String] = { sparkSession.conf.getOption(NDP_UDF_WHITELIST) } diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpSupport.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpSupport.scala index b20178ef4eb3c412abbea75f213f937b7e3da5df..665867a8c69ff686d027536f9dc76517c772ee34 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpSupport.scala +++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/ndp/NdpSupport.scala @@ -18,13 +18,16 @@ package org.apache.spark.sql.execution.ndp -import scala.collection.mutable.ListBuffer +import org.apache.spark.sql.PushDownData +import scala.collection.mutable.ListBuffer import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{FilterExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.BaseAggregateExec +import scala.collection.mutable + // filter in aggregate could be push down through aggregate, separate filter and aggregate case class AggExeInfo( aggregateExpressions: Seq[AggregateFunction], @@ -48,6 +51,16 @@ trait NdpSupport extends SparkPlan { val aggExeInfos = new ListBuffer[AggExeInfo]() var limitExeInfo: Option[LimitExeInfo] = None var fpuHosts: scala.collection.Map[String, String] = _ + val allFilterExecInfo = new ListBuffer[FilterExec]() + var zkRate: Double = 1.0 + + def partialPushDownFilter(filter: FilterExec): Unit = { + allFilterExecInfo += filter + } + + def partialPushDownFilterList(filters: ListBuffer[FilterExec]): Unit = { + allFilterExecInfo ++= filters + } def pushDownFilter(filter: FilterExeInfo): Unit = { filterExeInfos += filter @@ -78,6 +91,10 @@ trait NdpSupport extends SparkPlan { def isPushDown: Boolean = filterExeInfos.nonEmpty || aggExeInfos.nonEmpty || limitExeInfo.nonEmpty + + def pushZkRate(pRate: Double): Unit = { + zkRate = pRate + } } object NdpSupport { @@ -85,4 +102,4 @@ object NdpSupport { AggExeInfo(agg.aggregateExpressions.map(_.aggregateFunction), 
agg.groupingExpressions, agg.output) } -} +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/stub/pom.xml b/omnidata/omnidata-spark-connector/stub/pom.xml index df308e1e8bbe4b72c4df1938e2a3d249b17c386e..29bd539be796061f7b31a54e763720968c458cfe 100644 --- a/omnidata/omnidata-spark-connector/stub/pom.xml +++ b/omnidata/omnidata-spark-connector/stub/pom.xml @@ -15,8 +15,15 @@ jar 1.6.1 + 3.1.1 + + org.apache.spark + spark-hive_2.12 + ${spark.version} + compile + com.google.inject guice @@ -49,6 +56,7 @@ org.apache.maven.plugins maven-compiler-plugin + 3.1 8 8 diff --git a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/AbstractDecoding.java b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/AbstractDecoding.java index 043e176cf5a69fe194d1b88c6d418e15a2f97de7..3d43b9b3275f6bda564a6de2270b2ab7f4851373 100644 --- a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/AbstractDecoding.java +++ b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/AbstractDecoding.java @@ -20,11 +20,23 @@ package com.huawei.boostkit.omnidata.decode; import com.huawei.boostkit.omnidata.decode.type.DecodeType; +import com.huawei.boostkit.omnidata.decode.type.LongToByteDecodeType; +import com.huawei.boostkit.omnidata.decode.type.LongToFloatDecodeType; +import com.huawei.boostkit.omnidata.decode.type.LongToIntDecodeType; +import com.huawei.boostkit.omnidata.decode.type.LongToShortDecodeType; +import com.huawei.boostkit.omnidata.exception.OmniDataException; import io.airlift.slice.SliceInput; +import io.prestosql.spi.type.DateType; +import io.prestosql.spi.type.RowType; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; +import java.util.stream.IntStream; /** * Abstract decoding @@ -33,29 +45,133 @@ import java.util.Optional; * @since 2021-07-31 */ public abstract class AbstractDecoding implements Decoding { + private static final Map DECODE_METHODS; + + static { + DECODE_METHODS = new HashMap<>(); + Method[] methods = Decoding.class.getDeclaredMethods(); + for (Method method : methods) { + if (method.isAnnotationPresent(Decode.class)) { + DECODE_METHODS.put(method.getAnnotation(Decode.class).value(), method); + } + } + } private Method getDecodeMethod(String decodeName) { - return null; + return DECODE_METHODS.get(decodeName); } private String getDecodeName(SliceInput input) { - return null; + int length = input.readInt(); + byte[] bytes = new byte[length]; + input.readBytes(bytes); + + return new String(bytes, StandardCharsets.UTF_8); } private Optional typeToDecodeName(DecodeType type) { - return null; + Class javaType = null; + if (type.getJavaType().isPresent()) { + javaType = type.getJavaType().get(); + } + if (javaType == double.class) { + return Optional.of("DOUBLE_ARRAY"); + } else if (javaType == float.class) { + return Optional.of("FLOAT_ARRAY"); + } else if (javaType == int.class) { + return Optional.of("INT_ARRAY"); + } else if (javaType == long.class) { + return Optional.of("LONG_ARRAY"); + } else if (javaType == byte.class) { + return Optional.of("BYTE_ARRAY"); + } else if (javaType == boolean.class) { + return Optional.of("BOOLEAN_ARRAY"); + } else if (javaType == short.class) { + return Optional.of("SHORT_ARRAY"); + } else if (javaType == String.class) { + return 
Optional.of("VARIABLE_WIDTH"); + } else if (javaType == RowType.class) { + return Optional.of("ROW"); + } else if (javaType == DateType.class) { + return Optional.of("DATE"); + } else if (javaType == LongToIntDecodeType.class) { + return Optional.of("LONG_TO_INT"); + } else if (javaType == LongToShortDecodeType.class) { + return Optional.of("LONG_TO_SHORT"); + } else if (javaType == LongToByteDecodeType.class) { + return Optional.of("LONG_TO_BYTE"); + } else if (javaType == LongToFloatDecodeType.class) { + return Optional.of("LONG_TO_FLOAT"); + } else { + return Optional.empty(); + } } @Override public T decode(Optional type, SliceInput sliceInput) { - return null; + try { + String decodeName = getDecodeName(sliceInput); + if (type.isPresent()) { + Optional decodeNameOpt = typeToDecodeName(type.get()); + if ("DECIMAL".equals(decodeNameOpt.orElse(decodeName)) && !"RLE".equals(decodeName)) { + Method method = getDecodeMethod("DECIMAL"); + return (T) method.invoke(this, type, sliceInput, decodeName); + } + if (!"RLE".equals(decodeName)) { + decodeName = decodeNameOpt.orElse(decodeName); + } + } + Method method = getDecodeMethod(decodeName); + return (T) method.invoke(this, type, sliceInput); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new OmniDataException("decode failed " + e.getMessage()); + } } + /** + * decode empty bits. + * + * @param sliceInput input message + * @param positionCount the index of position + * @return corresponding optional object + * */ public Optional decodeNullBits(SliceInput sliceInput, int positionCount) { - return null; + if (!sliceInput.readBoolean()) { + return Optional.empty(); + } + + // read null bits 8 at a time + boolean[] valueIsNull = new boolean[positionCount]; + for (int position = 0; position < (positionCount & ~0b111); position += 8) { + boolean[] nextEightValue = getIsNullValue(sliceInput.readByte()); + int finalPosition = position; + IntStream.range(0, 8).forEach(pos -> valueIsNull[finalPosition + pos] = nextEightValue[pos]); + } + + // read last null bits + if ((positionCount & 0b111) > 0) { + byte value = sliceInput.readByte(); + int maskInt = 0b1000_0000; + for (int pos = positionCount & ~0b111; pos < positionCount; pos++) { + valueIsNull[pos] = ((value & maskInt) != 0); + maskInt >>>= 1; + } + } + + return Optional.of(valueIsNull); } private boolean[] getIsNullValue(byte value) { - return null; + boolean[] isNullValue = new boolean[8]; + isNullValue[0] = ((value & 0b1000_0000) != 0); + isNullValue[1] = ((value & 0b0100_0000) != 0); + isNullValue[2] = ((value & 0b0010_0000) != 0); + isNullValue[3] = ((value & 0b0001_0000) != 0); + isNullValue[4] = ((value & 0b0000_1000) != 0); + isNullValue[5] = ((value & 0b0000_0100) != 0); + isNullValue[6] = ((value & 0b0000_0010) != 0); + isNullValue[7] = ((value & 0b0000_0001) != 0); + + return isNullValue; } -} +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/ArrayDecodeType.java b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/ArrayDecodeType.java index ca2f9942a0429325719f9de5c148a622e2fec4f8..c03a92380fefc0c72255ac554949f28af5f8f9fb 100644 --- a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/ArrayDecodeType.java +++ b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/ArrayDecodeType.java @@ -29,9 +29,18 @@ import java.util.Optional; * @since 2021-07-31 
*/ public class ArrayDecodeType implements DecodeType { + private final T elementType; + + public ArrayDecodeType(T elementType) { + this.elementType = elementType; + } + + public T getElementType() { + return elementType; + } + @Override public Optional> getJavaType() { return Optional.empty(); } -} - +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/IntDecodeType.java b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/IntDecodeType.java index 49331b421d92470bb484f07b875caa2e34aeb5ac..763b295d30809a7accd04287b1255d8f9b608891 100644 --- a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/IntDecodeType.java +++ b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/IntDecodeType.java @@ -30,7 +30,6 @@ import java.util.Optional; public class IntDecodeType implements DecodeType { @Override public Optional> getJavaType() { - return Optional.empty(); + return Optional.of(int.class); } -} - +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/MapDecodeType.java b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/MapDecodeType.java index 651e4e776fcc0fa771fb499209499f845747d4b7..f3a5351c4f8488bc329e678b265464041ea4eca1 100644 --- a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/MapDecodeType.java +++ b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/decode/type/MapDecodeType.java @@ -30,9 +30,16 @@ import java.util.Optional; * @since 2021-07-31 */ public class MapDecodeType implements DecodeType { + private final K keyType; + private final V valueType; + + public MapDecodeType(K keyType, V valueType) { + this.keyType = keyType; + this.valueType = valueType; + } + @Override public Optional> getJavaType() { return Optional.empty(); } -} - +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/exception/OmniDataException.java b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/exception/OmniDataException.java index c3da4708db16a7f5830dac5f7f1f1dc1cf876df7..77915733320a2a136a7faa515edc7830df575923 100644 --- a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/exception/OmniDataException.java +++ b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/exception/OmniDataException.java @@ -23,9 +23,14 @@ import static com.huawei.boostkit.omnidata.exception.OmniErrorCode.OMNIDATA_GENE public class OmniDataException extends RuntimeException { public OmniDataException(String message) { + super(message); } public OmniErrorCode getErrorCode() { return OMNIDATA_GENERIC_ERROR; } -} + @Override + public String getMessage() { + return super.getMessage(); + } +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/serialize/OmniDataBlockEncodingSerde.java b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/serialize/OmniDataBlockEncodingSerde.java index a1baaad829c74eff6b063ff92e27d09e1055c5a9..2b7f8c7debb1d1eea9a499a150ef4c0be0e240c2 100644 --- 
a/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/serialize/OmniDataBlockEncodingSerde.java +++ b/omnidata/omnidata-spark-connector/stub/src/main/java/com/huawei/boostkit/omnidata/serialize/OmniDataBlockEncodingSerde.java @@ -19,25 +19,81 @@ package com.huawei.boostkit.omnidata.serialize; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.collect.ImmutableMap; + import io.airlift.slice.SliceInput; import io.airlift.slice.SliceOutput; import io.prestosql.spi.block.*; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Optional; + /** * Block Encoding Serde * * @since 2021-07-31 */ public final class OmniDataBlockEncodingSerde implements BlockEncodingSerde { + private final Map blockEncodings; + + public OmniDataBlockEncodingSerde() { + blockEncodings = + ImmutableMap.builder() + .put(VariableWidthBlockEncoding.NAME, new VariableWidthBlockEncoding()) + .put(ByteArrayBlockEncoding.NAME, new ByteArrayBlockEncoding()) + .put(ShortArrayBlockEncoding.NAME, new ShortArrayBlockEncoding()) + .put(IntArrayBlockEncoding.NAME, new IntArrayBlockEncoding()) + .put(LongArrayBlockEncoding.NAME, new LongArrayBlockEncoding()) + .put(Int128ArrayBlockEncoding.NAME, new Int128ArrayBlockEncoding()) + .put(DictionaryBlockEncoding.NAME, new DictionaryBlockEncoding()) + .put(ArrayBlockEncoding.NAME, new ArrayBlockEncoding()) + .put(RowBlockEncoding.NAME, new RowBlockEncoding()) + .put(SingleRowBlockEncoding.NAME, new SingleRowBlockEncoding()) + .put(RunLengthBlockEncoding.NAME, new RunLengthBlockEncoding()) + .put(LazyBlockEncoding.NAME, new LazyBlockEncoding()) + .build(); + } + + private static String readLengthPrefixedString(SliceInput sliceInput) { + int length = sliceInput.readInt(); + byte[] bytes = new byte[length]; + sliceInput.readBytes(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + private static void writeLengthPrefixedString(SliceOutput sliceOutput, String value) { + byte[] bytes = value.getBytes(UTF_8); + sliceOutput.writeInt(bytes.length); + sliceOutput.writeBytes(bytes); + } @Override - public Block readBlock(SliceInput input) { - return null; + public Block readBlock(SliceInput input) { + return blockEncodings.get(readLengthPrefixedString(input)).readBlock(this, input); } @Override public void writeBlock(SliceOutput output, Block block) { + Block readBlock = block; + while (true) { + String encodingName = readBlock.getEncodingName(); - } -} + BlockEncoding blockEncoding = blockEncodings.get(encodingName); + + Optional replacementBlock = blockEncoding.replacementBlockForWrite(readBlock); + if (replacementBlock.isPresent()) { + readBlock = replacementBlock.get(); + continue; + } + writeLengthPrefixedString(output, encodingName); + + blockEncoding.writeBlock(this, output, readBlock); + + break; + } + } +} \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/stub/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java b/omnidata/omnidata-spark-connector/stub/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java new file mode 100644 index 0000000000000000000000000000000000000000..9d6b2acdfb3237474db6c8336941031098303bfc --- /dev/null +++ b/omnidata/omnidata-spark-connector/stub/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java @@ -0,0 +1,258 @@ +package org.apache.spark.sql.execution.vectorized; + +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + 
* OmniColumnVector + */ +public class OmniColumnVector extends WritableColumnVector { + public OmniColumnVector(int capacity, DataType type, boolean initVec) { + super(capacity, type); + } + + @Override + public int getDictId(int rowId) { + return 0; + } + + @Override + protected void reserveInternal(int capacity) { + + } + + @Override + public void putNotNull(int rowId) { + + } + + @Override + public void putNull(int rowId) { + + } + + @Override + public void putNulls(int rowId, int count) { + + } + + @Override + public void putNotNulls(int rowId, int count) { + + } + + @Override + public void putBoolean(int rowId, boolean value) { + + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { + + } + + @Override + public void putByte(int rowId, byte value) { + + } + + @Override + public void putBytes(int rowId, int count, byte value) { + + } + + @Override + public void putBytes(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putShort(int rowId, short value) { + + } + + @Override + public void putShorts(int rowId, int count, short value) { + + } + + @Override + public void putShorts(int rowId, int count, short[] src, int srcIndex) { + + } + + @Override + public void putShorts(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putInt(int rowId, int value) { + + } + + @Override + public void putInts(int rowId, int count, int value) { + + } + + @Override + public void putInts(int rowId, int count, int[] src, int srcIndex) { + + } + + @Override + public void putInts(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putLong(int rowId, long value) { + + } + + @Override + public void putLongs(int rowId, int count, long value) { + + } + + @Override + public void putLongs(int rowId, int count, long[] src, int srcIndex) { + + } + + @Override + public void putLongs(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putFloat(int rowId, float value) { + + } + + @Override + public void putFloats(int rowId, int count, float value) { + + } + + @Override + public void putFloats(int rowId, int count, float[] src, int srcIndex) { + + } + + @Override + public void putFloats(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putFloatsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putDouble(int rowId, double value) { + + } + + @Override + public void putDoubles(int rowId, int count, double value) { + + } + + @Override + public void putDoubles(int rowId, int count, double[] src, int srcIndex) { + + } + + @Override + public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putDoublesLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + + } + + @Override + public void putArray(int rowId, int offset, int length) { + + } + + @Override + public int putByteArray(int rowId, byte[] value, int offset, int count) { + return 0; + } + + @Override + protected UTF8String getBytesAsUTF8String(int rowId, int count) { + return null; + } + + @Override + public int getArrayLength(int rowId) { + return 0; + } + + @Override + public int getArrayOffset(int rowId) { + return 0; + } + + @Override + protected WritableColumnVector 
reserveNewColumn(int capacity, DataType type) { + return null; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public boolean getBoolean(int rowId) { + return false; + } + + @Override + public byte getByte(int rowId) { + return 0; + } + + @Override + public short getShort(int rowId) { + return 0; + } + + @Override + public int getInt(int rowId) { + return 0; + } + + @Override + public long getLong(int rowId) { + return 0; + } + + @Override + public float getFloat(int rowId) { + return 0; + } + + @Override + public double getDouble(int rowId) { + return 0; + } +}
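
PartialPushDownIterator keeps pushed-down filters on the Spark side by folding them into a single conjunction with And and evaluating it through Predicate.create, as FileScanRDDPushDown.compute does above. Below is a minimal sketch of that catalyst-internal API against an in-memory row, assuming Spark 3.1 on the classpath; the attribute name, literal values, and object name are illustrative only, not part of the patch.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, GreaterThan, LessThan, Literal, Predicate}
import org.apache.spark.sql.types.IntegerType

object PartialFilterSketch {
  def main(args: Array[String]): Unit = {
    val col = AttributeReference("a", IntegerType)()
    // Fold the pushed-down filters into one expression, as compute() does with FilterExeInfo.
    val filters: Seq[Expression] = Seq(GreaterThan(col, Literal(1)), LessThan(col, Literal(10)))
    val combined = filters.reduce(And(_, _))
    val predicate = Predicate.create(combined, Seq(col))
    predicate.initialize(0)                   // partition index, mirroring compute()
    println(predicate.eval(InternalRow(5)))   // true: 1 < 5 < 10
    println(predicate.eval(InternalRow(42)))  // false: the row would be filtered out
  }
}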
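
The new NdpConf keys added above (spark.sql.ndp.partial.pushdown.enable, spark.sql.ndp.partial.pushdown, spark.sql.ndp.operator.combine.enable) are ordinary session options. A minimal sketch of how they might be set, assuming the connector jar is on the classpath; the application name, master, and the 0.7 rate are illustrative assumptions.

import org.apache.spark.sql.SparkSession

object NdpPartialPushdownConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ndp-partial-pushdown-demo")  // illustrative name
      .master("local[*]")                    // illustrative master
      // Let filter-only scans be split between Spark and OmniData (default false).
      .config("spark.sql.ndp.partial.pushdown.enable", "true")
      // Rate in [0.0, 1.0]; with 0.7, roughly the 30% of partitions whose random
      // draw exceeds the rate are evaluated on Spark, the rest on OmniData.
      .config("spark.sql.ndp.partial.pushdown", "0.7")
      // Combine supported operators into one push-down request (default false).
      .config("spark.sql.ndp.operator.combine.enable", "true")
      .getOrCreate()

    // Scans planned from here on pass through NdpPushDown, which reads these
    // values via getNdpPartialPushdownEnable / getNdpPartialPushdown.
    spark.stop()
  }
}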
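
AbstractDecoding.decodeNullBits reads the null vector eight positions at a time, most significant bit first. The standalone sketch below reproduces the same bit layout and is handy for checking the ordering against getIsNullValue; the object and method names are made up for illustration.

object NullBitsSketch {
  /** Expands one byte into eight "is null" flags, most significant bit first. */
  def unpack(value: Byte): Array[Boolean] =
    Array.tabulate(8)(i => (value & (0x80 >>> i)) != 0)

  def main(args: Array[String]): Unit = {
    // 0b1010_0000 marks positions 0 and 2 of this group of eight as null,
    // matching what getIsNullValue produces for the same byte.
    println(unpack(0xA0.toByte).mkString(", "))
    // prints: true, false, true, false, false, false, false, false
  }
}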