diff --git a/omnioperator/omniop-hive-extension/pom.xml b/omnioperator/omniop-hive-extension/pom.xml index 1050d4a1b71818330f9b5a719109f15b4cac4574..1659738790acbc5b690d00eb01caabca5589fc08 100644 --- a/omnioperator/omniop-hive-extension/pom.xml +++ b/omnioperator/omniop-hive-extension/pom.xml @@ -180,16 +180,16 @@ 3.0.0 provided - - junit - junit - 4.13.1 - test - org.apache.hive hive-cli 3.1.0 + provided + + + junit + junit + 4.13.1 test diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java index 9ae9a50dd511db1f1860ea96f2941d3b2a30e056..291bb5321cc6f41b259acfaec9da8f5305b956a8 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java @@ -51,6 +51,7 @@ import com.huawei.boostkit.hive.shuffle.OmniVecBatchSerDe; import nova.hetu.omniruntime.constants.FunctionType; +import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.QueryPlan; @@ -104,6 +105,7 @@ import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCoalesce; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat; @@ -192,7 +194,8 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { public void run(HookContext hookContext) throws Exception { this.hookContext = hookContext; - omniHiveConf = new OmniHiveConf(hookContext.getConf()); + HiveConf hookContextConf = hookContext.getConf(); + omniHiveConf = new OmniHiveConf(hookContextConf); stringLength = omniHiveConf.stringLength; clearGlobalVar(); QueryPlan queryPlan = hookContext.getQueryPlan(); @@ -214,6 +217,25 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { if (tezWork == null || checkDataType()) { return; } + SessionState sessionState = SessionState.get(); + String fileName = null; + if (sessionState instanceof CliSessionState) { + fileName = ((CliSessionState) sessionState).fileName; + } + String[] enhancedSqls = omniHiveConf.enhancedSqls; + if (enhancedSqls != null && fileName != null) { + long containerSize = omniHiveConf.enhancedSize != 0 ? omniHiveConf.enhancedSize + : (long) (1.5 * hookContextConf.getLong("hive.tez.container.size", 0)); + for (String enhancedSql : omniHiveConf.enhancedSqls) { + if (fileName.contains(enhancedSql)) { + hookContextConf.setLong("hive.tez.container.size", containerSize); + hookContextConf.setLong("tez.am.resource.memory.mb", containerSize); + hookContextConf.setLong("tez.task.resource.memory.mb", containerSize); + break; + } + } + + } if (OMNI_OPERATOR.contains(OperatorType.REDUCESINK)) { initMapJoinOpToWork(); initReduceSinkReplaceable(); diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveConf.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveConf.java index 1f3ee070d9e771bee19cfec8df85610b92476c6e..0b9905b401aba4302a9471a889d22830534546d4 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveConf.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniHiveConf.java @@ -39,6 +39,8 @@ class OmniHiveConf { public boolean enableAdaptivePartialAggregation; public long adaptivePartialAggregationMinRows; public double adaptivePartialAggregationRatio; + public String[] enhancedSqls; + public long enhancedSize; public OmniHiveConf(HiveConf hiveConf) { enableOperatorJoin = hiveConf.getBoolean("omni.hive.join.enabled", true); @@ -66,6 +68,8 @@ class OmniHiveConf { enableAdaptivePartialAggregation = hiveConf.getBoolean("omni.hive.adaptivePartialAggregation.enabled", false); adaptivePartialAggregationMinRows = hiveConf.getLong("omni.hive.adaptivePartialAggregationMinRows", 500000); adaptivePartialAggregationRatio = hiveConf.getDouble("omni.hive.adaptivePartialAggregationRatio", 0.8); + enhancedSqls = hiveConf.getStrings("omni.hive.enhanced.sqls"); + enhancedSize = hiveConf.getLong("omni.hive.enhanced.size", 0); } public boolean isEnableOperator(OperatorType operatorType) {