From c119d51b0ccb346ae8e9e444ad29f915aa9fe4e6 Mon Sep 17 00:00:00 2001 From: xuli <1061529620@qq.com> Date: Sat, 17 Jun 2023 06:35:06 +0000 Subject: [PATCH 1/2] =?UTF-8?q?=E9=80=82=E9=85=8Drow=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=97=B6=EF=BC=8Cdecimal=E6=95=B0=E6=8D=AE=E5=9C=A8=E8=A7=A3?= =?UTF-8?q?=E5=8E=8B=E6=97=B6=EF=BC=8C=E9=9C=80=E8=A6=81=E5=81=9Anull?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: xuli <1061529620@qq.com> --- .../com/huawei/boostkit/omnidata/spark/PageDeRunLength.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java index f0c8c1fc2..52cd6b1ec 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java @@ -264,7 +264,7 @@ public class PageDeRunLength { WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, writableColumnVector); for (int rowId = 0; rowId < positionCount; rowId++) { - if (writableColumnVector.isNullAt(rowId)) { + if (writableColumnVector.isNullAt(rowId) || value == null) { columnVector.putNull(rowId); } else { columnVector.putDecimal(rowId, value, precision); -- Gitee From 234a6edd9e42c0bc0822112776aef7d497509c21 Mon Sep 17 00:00:00 2001 From: xuli <1061529620@qq.com> Date: Sat, 17 Jun 2023 07:57:12 +0000 Subject: [PATCH 2/2] =?UTF-8?q?=E9=80=82=E9=85=8Drow=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=97=B6=EF=BC=8Cdecimal=E6=95=B0=E6=8D=AE=E5=9C=A8=E8=A7=A3?= =?UTF-8?q?=E5=8E=8B=E6=97=B6=EF=BC=8C=E9=9C=80=E8=A6=81=E5=81=9Anull?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: xuli <1061529620@qq.com> --- .../org/apache/spark/sql/DataIoAdapter.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java index 0a9270ca0..ae4312aa6 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java @@ -215,11 +215,20 @@ public class DataIoAdapter { PageDeserializer deserializer = initPageDeserializer(); // get available host - String[] pushDownHostArray = pageCandidate.getpushDownHosts().split(","); - List pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray)); - Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray, - JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts())); - availablePushDownHost.ifPresent(pushDownHostList::add); + List pushDownHostList = new ArrayList<>(); + String[] pushDownHostArray; + if (pageCandidate.getpushDownHosts().length() == 0) { + Optional availablePushDownHost = getRandomAvailablePushDownHost(new String[]{}, + JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts())); + availablePushDownHost.ifPresent(pushDownHostList::add); + pushDownHostArray = pushDownHostList.toArray(new String[]{}); + } else { + pushDownHostArray = pageCandidate.getpushDownHosts().split(","); + pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray)); + Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray, + JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts())); + availablePushDownHost.ifPresent(pushDownHostList::add); + } return getIterator(pushDownHostList.iterator(), taskSource, pushDownHostArray, deserializer, pushDownHostList.size()); } @@ -282,8 +291,9 @@ public class DataIoAdapter { List allHosts = new ArrayList<>(fpuHosts.values()); allHosts.removeAll(existingHosts); if (allHosts.size() > 0) { - LOG.info("Add another available host: " + allHosts.get(0)); - return Optional.of(allHosts.get(0)); + int randomIndex = (int) (Math.random() * allHosts.size()); + LOG.info("Add another available host: " + allHosts.get(randomIndex)); + return Optional.of(allHosts.get(randomIndex)); } else { return Optional.empty(); } -- Gitee