diff --git a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt index e57d702e6e60dd59d9f70cb39e7f2bef91060945..45780185a3c4dad66193e71bc6b13e506be34591 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt @@ -20,7 +20,7 @@ set (SOURCE_FILES jni/jni_common.cpp jni/ParquetColumnarBatchJniReader.cpp tablescan/ParquetReader.cpp - ) + io/ParquetObsFile.cc) #Find required protobuf package find_package(Protobuf REQUIRED) diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/ParquetObsFile.cc b/omnioperator/omniop-spark-extension/cpp/src/io/ParquetObsFile.cc new file mode 100644 index 0000000000000000000000000000000000000000..32b294853e6a7ccf0bf47da47856ee9db9763fbc --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/io/ParquetObsFile.cc @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include "ParquetObsFile.hh" +#include "securec.h" +#include "common/debug.h" + +using namespace arrow::io; +using namespace arrow; + +namespace spark::reader { + std::shared_ptr readObsFile(const std::string& path, ObsConfig *obsInfo) { + return std::shared_ptr(new ObsReadableFile(path, obsInfo)); + } + + typedef struct CallbackData { + char *buf; + uint64_t length; + uint64_t readLength; + obs_status retStatus; + } CallbackData; + + obs_status responsePropertiesCallback(const obs_response_properties *properties, void *data) { + if (NULL == properties) { + LogsError("OBS error, obs_response_properties is null!"); + return OBS_STATUS_ErrorUnknown; + } + CallbackData *ret = (CallbackData *)data; + ret->length = properties->content_length; + return OBS_STATUS_OK; + } + + void commonErrorHandle(const obs_error_details *error) { + if (!error) { + return; + } + if (error->message) { + LogsError("OBS error message: %s", error->message); + } + if (error->resource) { + LogsError("OBS error resource: %s", error->resource); + } + if (error->further_details) { + LogsError("OBS error further details: %s", error->further_details); + } + if (error->extra_details_count) { + LogsError("OBS error extra details:"); + for (int i = 0; i < error->extra_details_count; i++) { + LogsError("[name] %s: [value] %s", error->extra_details[i].name, error->extra_details[i].value); + } + } + } + + void responseCompleteCallback(obs_status status, const obs_error_details *error, void *data) { + if (data) { + CallbackData *ret = (CallbackData *)data; + ret->retStatus = status; + } + commonErrorHandle(error); + } + + obs_status getObjectDataCallback(int buffer_size, const char *buffer, void *data) { + CallbackData *callbackData = (CallbackData *)data; + int read = buffer_size; + if (callbackData->readLength + buffer_size > callbackData->length) { + LogsError("OBS get object failed, read buffer size(%d) is bigger than the remaining buffer\ + (totalLength[%ld] - readLength[%ld] = 
%ld).\n", + buffer_size, callbackData->length, callbackData->readLength, + callbackData->length - callbackData->readLength); + return OBS_STATUS_InvalidParameter; + } + memcpy_s(callbackData->buf + callbackData->readLength, callbackData->length - callbackData->readLength, buffer, read); + callbackData->readLength += read; + return OBS_STATUS_OK; + } + + obs_status ObsReadableFile::obsInit() { + obs_status status = OBS_STATUS_BUTT; + status = obs_initialize(OBS_INIT_ALL); + if (OBS_STATUS_OK != status) { + LogsError("OBS initialize failed(%s).", obs_get_status_name(status)); + throw std::runtime_error("OBS initialize failed."); + } + return status; + } + + obs_status ObsReadableFile::obsInitStatus = obsInit(); + + void ObsReadableFile::getObsInfo(ObsConfig *obsConf) { + memcpy_s(&obsInfo, sizeof(ObsConfig), obsConf, sizeof(ObsConfig)); + + std::string obsFilename = filename.substr(OBS_PROTOCOL_SIZE); + uint64_t splitNum = obsFilename.find_first_of("/"); + std::string bucket = obsFilename.substr(0, splitNum); + uint32_t bucketLen = bucket.length(); + strcpy_s(obsInfo.bucket, bucketLen + 1, bucket.c_str()); + option.bucket_options.bucket_name = obsInfo.bucket; + + memset_s(&objectInfo, sizeof(obs_object_info), 0, sizeof(obs_object_info)); + std::string key = obsFilename.substr(splitNum + 1); + strcpy_s(obsInfo.objectKey, key.length() + 1, key.c_str()); + objectInfo.key = obsInfo.objectKey; + + if (obsInfo.hostLen > bucketLen && strncmp(obsInfo.hostName, obsInfo.bucket, bucketLen) == 0) { + obsInfo.hostLen = obsInfo.hostLen - bucketLen - 1; + memmove_s(obsInfo.hostName, obsInfo.hostLen, obsInfo.hostName + bucketLen + 1, obsInfo.hostLen); + obsInfo.hostName[obsInfo.hostLen - 1] = '\0'; + } + + option.bucket_options.host_name = obsInfo.hostName; + option.bucket_options.access_key = obsInfo.accessKey; + option.bucket_options.secret_access_key = obsInfo.secretKey; + option.bucket_options.token = obsInfo.token; + } + + ObsReadableFile::ObsReadableFile(std::string _filename, ObsConfig *obsConf) { + filename = 
_filename; + init_obs_options(&option); + + getObsInfo(obsConf); + + CallbackData data; + data.retStatus = OBS_STATUS_BUTT; + data.length = 0; + obs_response_handler responseHandler = { + &responsePropertiesCallback, + &responseCompleteCallback + }; + + get_object_metadata(&option, &objectInfo, 0, &responseHandler, &data); + if (OBS_STATUS_OK != data.retStatus) { + throw std::runtime_error("get obs object(" + filename + ") metadata failed, error_code: " + + obs_get_status_name(data.retStatus)); + } + totalLength = data.length; + + memset_s(&conditions, sizeof(obs_get_conditions), 0, sizeof(obs_get_conditions)); + init_get_properties(&conditions); + } + + Result> ObsReadableFile::ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, io::default_io_context().pool())); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, buffer->mutable_data())); + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + buffer->ZeroPadding(); + } + return std::move(buffer); + } + + Result ObsReadableFile::ReadAt(int64_t offset, int64_t length, void* buf) { + if (!buf) { + throw std::runtime_error("Buffer is null."); + } + conditions.start_byte = offset; + conditions.byte_count = length; + + obs_get_object_handler handler = { + { &responsePropertiesCallback, + &responseCompleteCallback}, + &getObjectDataCallback + }; + + CallbackData data; + data.retStatus = OBS_STATUS_BUTT; + data.length = length; + data.readLength = 0; + data.buf = reinterpret_cast(buf); + do { + // the data.buf offset is processed in the callback function getObjectDataCallback + uint64_t tmpRead = data.readLength; + get_object(&option, &objectInfo, &conditions, 0, &handler, &data); + if (OBS_STATUS_OK != data.retStatus) { + LogsError("get obs object failed, length=%ld, readLength=%ld, offset=%ld", + data.length, data.readLength, offset); + throw std::runtime_error("get obs object(" + filename + 
") failed, error_code: " + + obs_get_status_name(data.retStatus)); + } + + // read data buffer size = 0, no more remaining data need to read + if (tmpRead == data.readLength) { + break; + } + conditions.start_byte = offset + data.readLength; + conditions.byte_count = length - data.readLength; + } while (data.readLength < length); + + return data.readLength; + } +} diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/ParquetObsFile.hh b/omnioperator/omniop-spark-extension/cpp/src/io/ParquetObsFile.hh new file mode 100644 index 0000000000000000000000000000000000000000..143f0441ad59d3c36c8f2da7efb752bca97a9cf5 --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/io/ParquetObsFile.hh @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef PARQURTOBSFILE_H +#define PARQURTOBSFILE_H + +#include "eSDKOBS.h" +#include +#include +#include +#include + +#define OBS_READ_SIZE 1024 +#define OBS_KEY_SIZE 2048 +#define OBS_TOKEN_SIZE 8192 +#define OBS_PROTOCOL_SIZE 6 + +using namespace arrow::io; +using namespace arrow; + +namespace spark::reader { + typedef struct ObsConfig { + char hostName[OBS_KEY_SIZE]; + char accessKey[OBS_KEY_SIZE]; + char secretKey[OBS_KEY_SIZE]; + char token[OBS_TOKEN_SIZE]; + char bucket[OBS_KEY_SIZE]; + char objectKey[OBS_KEY_SIZE]; + uint32_t hostLen; + } ObsConfig; + + std::shared_ptr readObsFile(const std::string& path, ObsConfig *obsInfo); + + class ObsReadableFile : public RandomAccessFile { + private: + obs_options option; + obs_object_info objectInfo; + obs_get_conditions conditions; + ObsConfig obsInfo; + + std::string filename; + uint64_t totalLength; + const uint64_t READ_SIZE = OBS_READ_SIZE * OBS_READ_SIZE; + + static obs_status obsInitStatus; + + static obs_status obsInit(); + + bool is_open_ = true; + + void getObsInfo(ObsConfig *obsInfo); + + public: + ObsReadableFile(std::string _filename, ObsConfig *obsInfo); + + Result> ReadAt(int64_t position, int64_t nbytes) override; + + Result ReadAt(int64_t offset, int64_t length, void* buf) override; + + Status Close() override { + if (is_open_) { + is_open_ = false; + return Status::OK(); + } + return Status::OK(); + } + + bool closed() const override { + return !is_open_; + } + + Status CheckClosed() { + if (!is_open_) { + return Status::Invalid("Operation on closed OBS file"); + } + return Status::OK(); + } + + Result GetSize() override { + return totalLength; + } + + Result Read(int64_t nbytes, void* out) override { + return Result(Status::NotImplemented("Not implemented")); + } + + Result> Read(int64_t nbytes) override { + return Result>(Status::NotImplemented("Not implemented")); + } + + Status Seek(int64_t position) override { + return Status::NotImplemented("Not implemented"); + } + + Result Tell() 
const override { + return Result(Status::NotImplemented("Not implemented")); + } + + ~ObsReadableFile() {} + }; +} + +#endif diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp index fda647658a2477e3c7ac213fa9223a64cab09f39..e24bff186a5ac7ad36076fd0b4346c79bbc18eb5 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp @@ -41,6 +41,39 @@ std::vector GetIndices(JNIEnv *env, jobject jsonObj, const char* name) return indices; } +void parseObs(JNIEnv* env, jobject jsonObj, ObsConfig &obsInfo) { + jobject obsObject = env->CallObjectMethod(jsonObj, jsonMethodObj, env->NewStringUTF("obsInfo")); + if (obsObject == NULL) { + LogsWarn("get obs info failed, obs info is null."); + return; + } + + jstring jEndpoint = (jstring)env->CallObjectMethod(obsObject, jsonMethodString, env->NewStringUTF("endpoint")); + auto endpointCharPtr = env->GetStringUTFChars(jEndpoint, JNI_FALSE); + std::string endpoint = endpointCharPtr; + obsInfo.hostLen = endpoint.length() + 1; + strcpy_s(obsInfo.hostName, obsInfo.hostLen, endpoint.c_str()); + env->ReleaseStringUTFChars(jEndpoint, endpointCharPtr); + + jstring jAk = (jstring)env->CallObjectMethod(obsObject, jsonMethodString, env->NewStringUTF("ak")); + auto akCharPtr = env->GetStringUTFChars(jAk, JNI_FALSE); + std::string ak = akCharPtr; + strcpy_s(obsInfo.accessKey, ak.length() + 1, ak.c_str()); + env->ReleaseStringUTFChars(jAk, akCharPtr); + + jstring jSk = (jstring)env->CallObjectMethod(obsObject, jsonMethodString, env->NewStringUTF("sk")); + auto skCharPtr = env->GetStringUTFChars(jSk, JNI_FALSE); + std::string sk = skCharPtr; + strcpy_s(obsInfo.secretKey, sk.length() + 1, sk.c_str()); + env->ReleaseStringUTFChars(jSk, skCharPtr); + + jstring jToken = 
(jstring)env->CallObjectMethod(obsObject, jsonMethodString, env->NewStringUTF("token")); + auto tokenCharPtr = env->GetStringUTFChars(jToken, JNI_FALSE); + std::string token = tokenCharPtr; + strcpy_s(obsInfo.token, token.length() + 1, token.c_str()); + env->ReleaseStringUTFChars(jToken, tokenCharPtr); +} + JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_ParquetColumnarBatchJniReader_initializeReader(JNIEnv *env, jobject jObj, jobject jsonObj) { @@ -63,8 +96,11 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_ParquetColumnarBatchJ auto row_group_indices = GetIndices(env, jsonObj, "rowGroupIndices"); auto column_indices = GetIndices(env, jsonObj, "columnIndices"); + ObsConfig obsInfo = {}; + parseObs(env, jsonObj, obsInfo); + ParquetReader *pReader = new ParquetReader(); - auto state = pReader->InitRecordReader(file, capacity, row_group_indices, column_indices, ugiString); + auto state = pReader->InitRecordReader(file, capacity, row_group_indices, column_indices, ugiString, obsInfo); if (state != Status::OK()) { env->ThrowNew(runtimeExceptionClass, state.ToString().c_str()); return 0; diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp index a21c97df98817caa421a0436ffc0d8a44123e81a..ea72097091cfe93ddaa261e81126c0bd4ac37553 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp @@ -86,7 +86,8 @@ Filesystem* spark::reader::GetFileSystemPtr(std::string& path, std::string& ugi) } Status ParquetReader::InitRecordReader(std::string& filePath, int64_t capacity, - const std::vector& row_group_indices, const std::vector& column_indices, std::string& ugi) + const std::vector& row_group_indices, const std::vector& column_indices, + std::string& ugi, ObsConfig& obsInfo) { arrow::MemoryPool* pool = default_memory_pool(); @@ -99,11 +100,16 @@ Status 
ParquetReader::InitRecordReader(std::string& filePath, int64_t capacity, auto arrow_reader_properties = parquet::ArrowReaderProperties(); arrow_reader_properties.set_batch_size(capacity); - // Get the file from filesystem - mutex_.lock(); - Filesystem* fs = GetFileSystemPtr(filePath, ugi); - mutex_.unlock(); - ARROW_ASSIGN_OR_RAISE(auto file, fs->filesys_ptr->OpenInputFile(filePath)); + std::shared_ptr file; + if (0 == strncmp(filePath.c_str(), "obs://", OBS_PROTOCOL_SIZE)) { + file = readObsFile(filePath, &obsInfo); + } else { + // Get the file from filesystem + mutex_.lock(); + Filesystem* fs = GetFileSystemPtr(filePath, ugi); + mutex_.unlock(); + ARROW_ASSIGN_OR_RAISE(file, fs->filesys_ptr->OpenInputFile(filePath)); + } FileReaderBuilder reader_builder; ARROW_RETURN_NOT_OK(reader_builder.Open(file, reader_properties)); diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h index 9ef59abe7822f3b73f8876c4acead3010117fdc3..9a55d785ca9a3e926cea28ebea392e7e73680da7 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h @@ -34,6 +34,8 @@ #include #include #include +#include +#include namespace spark::reader { class ParquetReader { @@ -41,7 +43,8 @@ namespace spark::reader { ParquetReader() {} arrow::Status InitRecordReader(std::string& path, int64_t capacity, - const std::vector& row_group_indices, const std::vector& column_indices, std::string& ugi); + const std::vector& row_group_indices, const std::vector& column_indices, std::string& ugi, + ObsConfig& obsInfo); arrow::Status ReadNextBatch(std::shared_ptr *batch); diff --git a/omnioperator/omniop-spark-extension/cpp/test/tablescan/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/test/tablescan/CMakeLists.txt index 0f026d752ed3fbecd746a7ac8cd85e730f7076f4..2d8dcdbeb34e7e98075b1b513ae1bfd24920fe52 100644 --- 
a/omnioperator/omniop-spark-extension/cpp/test/tablescan/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/test/tablescan/CMakeLists.txt @@ -6,5 +6,7 @@ set(SCAN_TEST_TARGET tablescantest) add_library(${SCAN_TEST_TARGET} STATIC ${SCAN_TESTS_LIST} parquet_scan_test.cpp) target_compile_options(${SCAN_TEST_TARGET} PUBLIC ) +target_link_libraries(${SCAN_TEST_TARGET} eSDKOBS) + target_include_directories(${SCAN_TEST_TARGET} PUBLIC $ENV{JAVA_HOME}/include) target_include_directories(${SCAN_TEST_TARGET} PUBLIC $ENV{JAVA_HOME}/include/linux) diff --git a/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp index a7da7f0ff79da724350cc3bbc3f62fcff68b948b..39c30151e3d4d81ffc8fa6fff7dc5e7766b153a0 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp @@ -44,7 +44,8 @@ TEST(read, test_parquet_reader) ParquetReader *reader = new ParquetReader(); std::string ugi = "root@sample"; - auto state1 = reader->InitRecordReader(filename, 1024, row_group_indices, column_indices, ugi); + ObsConfig obsInfo; + auto state1 = reader->InitRecordReader(filename, 1024, row_group_indices, column_indices, ugi, obsInfo); ASSERT_EQ(state1, Status::OK()); std::shared_ptr batch; diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/ObsConf.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/ObsConf.java index 244ee1204602b038bc404313da9126d480cbcf43..0c9228c88b02573d742706a46cecfeda8d40f258 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/ObsConf.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/ObsConf.java @@ -26,6 +26,7 @@ import com.obs.services.model.ISecurityKey; import org.apache.hadoop.conf.Configuration; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.json.JSONObject; public class ObsConf { private static final Logger LOG = LoggerFactory.getLogger(ObsConf.class); @@ -164,4 +165,15 @@ public class ObsConf { } } } + + public static JSONObject constructObsJSONObject() { + JSONObject obsJsonItem = new JSONObject(); + obsJsonItem.put("endpoint", ObsConf.getEndpoint()); + synchronized (ObsConf.getLock()) { + obsJsonItem.put("ak", ObsConf.getAk()); + obsJsonItem.put("sk", ObsConf.getSk()); + obsJsonItem.put("token", ObsConf.getToken()); + } + return obsJsonItem; + } } diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java index c2ba2b7cf6e0bf681dde531f3b13ee93ebed2c49..128ff6ca190012de4ece35e25e13bf3266fdfff2 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java @@ -159,7 +159,7 @@ public class OrcColumnarBatchJniReader { } // just used for obs - job.put("obsInfo", constructObsJSONObject()); + job.put("obsInfo", ObsConf.constructObsJSONObject()); reader = initializeReader(path, job); return reader; @@ -364,17 +364,6 @@ public class OrcColumnarBatchJniReader { } } - public JSONObject constructObsJSONObject() { - JSONObject obsJsonItem = new JSONObject(); - obsJsonItem.put("endpoint", ObsConf.getEndpoint()); - synchronized (ObsConf.getLock()) { - obsJsonItem.put("ak", ObsConf.getAk()); - obsJsonItem.put("sk", ObsConf.getSk()); - obsJsonItem.put("token", ObsConf.getToken()); - } - return obsJsonItem; - } - public static void tokenDebug(String mesg) { try { LOGGER.debug("\n\n=============" + mesg + "=============\n" + UserGroupInformation.getCurrentUser().toString()); diff 
--git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java index 3a5cffb09c4792e2731dcd9ab8b377417f888b4c..c45f33bb50a565d0937304817d9a8f6daf86e3e3 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java @@ -18,6 +18,7 @@ package com.huawei.boostkit.spark.jni; +import com.huawei.boostkit.spark.ObsConf; import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.vector.*; @@ -46,6 +47,8 @@ public class ParquetColumnarBatchJniReader { job.put("rowGroupIndices", rowgroupIndices.stream().mapToInt(Integer::intValue).toArray()); job.put("columnIndices", columnIndices.stream().mapToInt(Integer::intValue).toArray()); job.put("ugi", ugi); + // just used for obs + job.put("obsInfo", ObsConf.constructObsJSONObject()); parquetReader = initializeReader(job); return parquetReader; }