diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index f144be7cec68cf42303713bd7e24f36a63dfaffd..bb16d39ee1ca9a439cf0fb2284884c4caffe0e74 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -348,20 +348,6 @@ object ColumnarShuffleExchangeExec extends Logging { newIter }, isOrderSensitive = isOrderSensitive) case h@HashPartitioning(expressions, numPartitions) => - //containsRollUp(expressions): Avoid data skew caused by rollup expressions. - //expressions.length > 6: Avoid q11 data skew - //expressions.length == 3: Avoid q28 data skew when the resin rule is enabled. - if (containsRollUp(expressions) || expressions.length > 6 || expressions.length == 3) { - rdd.mapPartitionsWithIndexInternal((_, cbIter) => { - val partitionKeyExtractor: InternalRow => Any = { - val projection = - UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) - row => projection(row).getInt(0) - } - val newIter = computePartitionId(cbIter, partitionKeyExtractor) - newIter - }, isOrderSensitive = isOrderSensitive) - } else { rdd.mapPartitionsWithIndexInternal((_, cbIter) => { val addPid2ColumnBatch = addPidToColumnBatch() // omni project @@ -399,7 +385,6 @@ object ColumnarShuffleExchangeExec extends Logging { } } }, isOrderSensitive = isOrderSensitive) - } case SinglePartition => rdd.mapPartitionsWithIndexInternal((_, cbIter) => { cbIter.map { cb => (0, cb) }