diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
index b96aa2242cf13782851aa11c548eedf20f064f86..9991b1468bee3c503d10b61c1f962f0f7260596f 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
@@ -907,7 +907,7 @@ case class ColumnarMultipleOperatorExec(
       buildOp1.getOutput
     val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1,
       probeHashColsExp1, buildOutputCols1, buildOutputTypes1, buildOpFactory1,
-      if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()},
+      if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
         IS_SKIP_VERIFY_EXP))
     val lookupOp1 = lookupOpFactory1.createOperator()
     // close operator
@@ -942,7 +942,7 @@ case class ColumnarMultipleOperatorExec(
       buildOp2.getOutput
     val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2,
       probeHashColsExp2, buildOutputCols2, buildOutputTypes2, buildOpFactory2,
-      if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()},
+      if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
         IS_SKIP_VERIFY_EXP))
     val lookupOp2 = lookupOpFactory2.createOperator()
@@ -978,7 +978,7 @@ case class ColumnarMultipleOperatorExec(
       buildOp3.getOutput
     val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3,
       probeHashColsExp3, buildOutputCols3, buildOutputTypes3, buildOpFactory3,
-      if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()},
+      if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
         IS_SKIP_VERIFY_EXP))
     val lookupOp3 = lookupOpFactory3.createOperator()
@@ -1014,7 +1014,7 @@ case class ColumnarMultipleOperatorExec(
       buildOp4.getOutput
     val lookupOpFactory4 = new OmniLookupJoinWithExprOperatorFactory(probeTypes4, probeOutputCols4,
       probeHashColsExp4, buildOutputCols4, buildOutputTypes4, buildOpFactory4,
-      if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()},
+      if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
         IS_SKIP_VERIFY_EXP))
     val lookupOp4 = lookupOpFactory4.createOperator()
@@ -1283,7 +1283,7 @@ case class ColumnarMultipleOperatorExec1(
       buildOp1.getOutput
     val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1,
       probeHashColsExp1, buildOutputCols1, buildOutputTypes1, buildOpFactory1,
-      if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()},
+      if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
        IS_SKIP_VERIFY_EXP))
     val lookupOp1 = lookupOpFactory1.createOperator()
@@ -1319,7 +1319,7 @@ case class ColumnarMultipleOperatorExec1(
       buildOp2.getOutput
     val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2,
       probeHashColsExp2, buildOutputCols2, buildOutputTypes2, buildOpFactory2,
-      if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()},
+      if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
         IS_SKIP_VERIFY_EXP))
     val lookupOp2 = lookupOpFactory2.createOperator()
@@ -1355,7 +1355,7 @@ case class ColumnarMultipleOperatorExec1(
       buildOp3.getOutput
     val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3,
       probeHashColsExp3, buildOutputCols3, buildOutputTypes3, buildOpFactory3,
-      if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()},
+      if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
         IS_SKIP_VERIFY_EXP))
     val lookupOp3 = lookupOpFactory3.createOperator()
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
index f1dbe3875cd9d23e4fd9d2beb45801d8b7e83a7a..36504ffde39e37b1cbcf14ec2ad4213ae8ad8a94 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
@@ -413,7 +413,7 @@ case class ColumnarBroadcastHashJoinExec(
     val startLookupCodegen = System.nanoTime()
     val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols,
-      probeHashColsExp, buildOutputCols, buildOutputTypes, buildOpFactory, filter,
+      probeHashColsExp, buildOutputCols, buildOutputTypes, buildOpFactory, filter, false,
       new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()),
         IS_SKIP_VERIFY_EXP))
     val lookupOp = lookupOpFactory.createOperator()
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
index cb9f2061f4c1ee61d0ea6aca6538ce5d6164ebeb..a11ba62c44c3f78eac759db93599ef532df1073c 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter, ExistenceJoin}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ExplainUtils, ShuffledHashJoinExecShim, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, ExplainUtils, ShuffledHashJoinExecShim, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.util.SparkMemoryUtils
 import org.apache.spark.sql.execution.vectorized.OmniColumnVector
@@ -241,7 +241,7 @@ case class ColumnarShuffledHashJoinExec(
       val startLookupCodegen = System.nanoTime()
       val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp,
-        buildOutputCols, buildOutputTypes, buildOpFactory, filter, new OperatorConfig(SpillConfig.NONE,
+        buildOutputCols, buildOutputTypes, buildOpFactory, filter, buildPlan.isInstanceOf[ColumnarShuffleExchangeExec], new OperatorConfig(SpillConfig.NONE,
         new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP))
       val lookupOp = lookupOpFactory.createOperator()
       lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen)
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index c8888db59230c9a3f68b419344cf78971086cd27..1d26660aa33c4f58f47480a9fd7e05a7122af3a2 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -43,6 +43,8 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
     .setAppName("test tableWriteBasicFunctionSuit")
     .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, "com.huawei.boostkit.spark.ColumnarPlugin")
     .set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+    .set(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, "LEGACY")
+    .set(SQLConf.PARQUET_REBASE_MODE_IN_READ.key, "LEGACY")
     .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager")

   override def beforeAll(): Unit = {
@@ -129,7 +131,7 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       case _: ColumnarDataWritingCommandExec => true
      case _ => false
     })
-    assert(columnarDataWrite.isDefined, "use columnar data writing command")
+    assert(!columnarDataWrite.isDefined, "should not use columnar data writing command")
     val createTable = spark.sql("create table table_write_ut_map_test" +
       " (id int, grades MAP) using orc")
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index b32c3983d8331a4fa94cc5351f21f7d78da6727b..e1d9064d3cfa9a02b8d70090e164486107b7f99a 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -42,6 +42,8 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
     .setAppName("test tableWriteBasicFunctionSuit")
     .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, "com.huawei.boostkit.spark.ColumnarPlugin")
     .set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+    .set(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, "LEGACY")
+    .set(SQLConf.PARQUET_REBASE_MODE_IN_READ.key, "LEGACY")
     .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager")

   override def beforeAll(): Unit = {
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index b32c3983d8331a4fa94cc5351f21f7d78da6727b..e1d9064d3cfa9a02b8d70090e164486107b7f99a 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -42,6 +42,8 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
     .setAppName("test tableWriteBasicFunctionSuit")
     .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, "com.huawei.boostkit.spark.ColumnarPlugin")
     .set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+    .set(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, "LEGACY")
+    .set(SQLConf.PARQUET_REBASE_MODE_IN_READ.key, "LEGACY")
     .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.OmniColumnarShuffleManager")

   override def beforeAll(): Unit = {