diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java index f0c8c1fc282a8dc471178bf5f9db98f7172f3761..52cd6b1ecaff38e53bb491d62c6d59cd88387517 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/com/huawei/boostkit/omnidata/spark/PageDeRunLength.java @@ -264,7 +264,7 @@ public class PageDeRunLength { WritableColumnVector columnVector = getColumnVector(isOperatorCombineEnabled, positionCount, writableColumnVector); for (int rowId = 0; rowId < positionCount; rowId++) { - if (writableColumnVector.isNullAt(rowId)) { + if (writableColumnVector.isNullAt(rowId) || value == null) { columnVector.putNull(rowId); } else { columnVector.putDecimal(rowId, value, precision); diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java index 0a9270ca0cf8ebee67550c250299b22ec7d86801..ae4312aa60a4e10fcde38f5041335475d4de2b72 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java +++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java @@ -215,11 +215,20 @@ public class DataIoAdapter { PageDeserializer deserializer = initPageDeserializer(); // get available host - String[] pushDownHostArray = pageCandidate.getpushDownHosts().split(","); - List pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray)); - Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray, - JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts())); - availablePushDownHost.ifPresent(pushDownHostList::add); + List pushDownHostList = new ArrayList<>(); + String[] pushDownHostArray; + if (pageCandidate.getpushDownHosts().length() == 0) { + Optional availablePushDownHost = getRandomAvailablePushDownHost(new String[]{}, + JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts())); + availablePushDownHost.ifPresent(pushDownHostList::add); + pushDownHostArray = pushDownHostList.toArray(new String[]{}); + } else { + pushDownHostArray = pageCandidate.getpushDownHosts().split(","); + pushDownHostList = new ArrayList<>(Arrays.asList(pushDownHostArray)); + Optional availablePushDownHost = getRandomAvailablePushDownHost(pushDownHostArray, + JavaConverters.mapAsJavaMap(pushDownOperators.fpuHosts())); + availablePushDownHost.ifPresent(pushDownHostList::add); + } return getIterator(pushDownHostList.iterator(), taskSource, pushDownHostArray, deserializer, pushDownHostList.size()); } @@ -282,8 +291,9 @@ public class DataIoAdapter { List allHosts = new ArrayList<>(fpuHosts.values()); allHosts.removeAll(existingHosts); if (allHosts.size() > 0) { - LOG.info("Add another available host: " + allHosts.get(0)); - return Optional.of(allHosts.get(0)); + int randomIndex = (int) (Math.random() * allHosts.size()); + LOG.info("Add another available host: " + allHosts.get(randomIndex)); + return Optional.of(allHosts.get(randomIndex)); } else { return Optional.empty(); }