diff --git a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt
index 7256a02cbaa0928592d0701e5b117f833321c72c..455723aa12855e3d771df7e36492c3e96d0f8fbe 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt
+++ b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt
@@ -10,6 +10,8 @@ set (SOURCE_FILES
         io/OutputStream.cc
         io/SparkFile.cc
         io/WriterOptions.cc
+        io/orcfile/OmniOrcFile.cc
+        io/orcfile/OmniOrcHdfsFile.cc
         shuffle/splitter.cpp
         common/common.cpp
         jni/SparkJniWrapper.cpp
@@ -17,7 +19,12 @@ set (SOURCE_FILES
         jni/jni_common.cpp
         jni/ParquetColumnarBatchJniReader.cpp
         tablescan/ParquetReader.cpp
-        )
+        io/arrowadapter/FileSystemAdapter.cc
+        io/arrowadapter/UtilInternal.cc
+        io/arrowadapter/HdfsAdapter.cc
+        io/arrowadapter/LocalfsAdapter.cc
+        io/UriInfo.cc
+)
 
 #Find required protobuf package
 find_package(Protobuf REQUIRED)
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/UriInfo.cc b/omnioperator/omniop-spark-extension/cpp/src/io/UriInfo.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2c635ac64e90cbab431d71c25ab6acb51682b9f8
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/UriInfo.cc
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <utility>
+
+#include "UriInfo.h"
+
+UriInfo::UriInfo(std::string _uri, std::string _scheme, std::string _path, std::string _host,
+                 std::string _port): host_string(std::move(_host)),
+                                     scheme_string(std::move(_scheme)),
+                                     port_string(std::move(_port)),
+                                     path_string(std::move(_path)),
+                                     uri_string(std::move(_uri))
+{
+}
+
+UriInfo::UriInfo(std::string _scheme, std::string _path, std::string _host,
+                 std::string _port): host_string(std::move(_host)),
+                                     scheme_string(std::move(_scheme)),
+                                     port_string(std::move(_port)),
+                                     path_string(std::move(_path)),
+                                     uri_string("Uninitialized origin URI!")
+{
+}
+
+UriInfo::~UriInfo() {}
+
+const std::string UriInfo::scheme() const {
+    return scheme_string;
+}
+
+const std::string UriInfo::host() const {
+    return host_string;
+}
+
+const std::string UriInfo::port() const {
+    return port_string;
+}
+
+const std::string UriInfo::path() const {
+    return path_string;
+}
+
+const std::string UriInfo::ToString() const {
+    return uri_string;
+}
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/UriInfo.h b/omnioperator/omniop-spark-extension/cpp/src/io/UriInfo.h
new file mode 100644
index 0000000000000000000000000000000000000000..30ca0a456eda0a8edbbb9da69dddf4a5f9dc044d
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/UriInfo.h
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef URI_INFO_H
+#define URI_INFO_H
+
+#include <string>
+
+/// \brief A parsed URI
+class UriInfo {
+public:
+    UriInfo(std::string _uri, std::string _scheme, std::string _path, std::string _host, std::string _port);
+
+    UriInfo(std::string _scheme, std::string _path, std::string _host, std::string _port);
+
+    ~UriInfo();
+
+    /// The URI scheme, such as "file" or "hdfs".
+    const std::string scheme() const;
+
+    /// The URI host name, such as "localhost", "127.0.0.1" or "::1", or the empty
+    /// string if the URI does not have a host component.
+    const std::string host() const;
+
+    /// The URI path component.
+    const std::string path() const;
+
+    /// The URI port number, as a string such as "80", or the empty string if the URI
+    /// does not have a port number component.
+    const std::string port() const;
+
+    /// Get the string representation of this URI.
+    const std::string ToString() const;
+
+private:
+    std::string host_string;
+    std::string scheme_string;
+    std::string port_string;
+    std::string path_string;
+    std::string uri_string;
+};
+#endif
\ No newline at end of file
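For illustration, a minimal sketch of how the new UriInfo type is meant to be used: it carries pre-parsed URI components and does no parsing of its own. The host, port, and path values below are made-up examples, not taken from the patch:

    #include <iostream>
    #include "io/UriInfo.h"

    int main() {
        // Components arrive already split. Note that the 4-argument constructor
        // leaves ToString() returning the "Uninitialized origin URI!" placeholder,
        // so callers that need the full URI must use the 5-argument form.
        UriInfo uri("hdfs", "/user/hive/warehouse/part-0.orc", "namenode1", "9000");
        std::cout << uri.scheme() << "://" << uri.host() << ":" << uri.port()
                  << uri.path() << std::endl;
        return 0;
    }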
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/FileSystemAdapter.cc b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/FileSystemAdapter.cc
new file mode 100644
index 0000000000000000000000000000000000000000..6ea2a87258dcc2759b918d962f7ab5b1a4251873
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/FileSystemAdapter.cc
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "FileSystemAdapter.h"
+#include "arrow/filesystem/hdfs.h"
+#include "HdfsAdapter.h"
+#include "LocalfsAdapter.h"
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/path_util.h"
+#include "UtilInternal.h"
+#include "arrow/io/slow.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/parallel.h"
+
+namespace arrow_adapter {
+
+using arrow::internal::Uri;
+using arrow::fs::internal::RemoveLeadingSlash;
+using arrow::fs::internal::ToSlashes;
+using arrow::fs::FileSystem;
+using arrow::fs::HadoopFileSystem;
+using arrow::fs::LocalFileSystem;
+using arrow::fs::internal::MockFileSystem;
+using arrow::Result;
+
+namespace {
+
+Result<std::shared_ptr<FileSystem>>
+FileSystemFromUriReal(const UriInfo &uri, const arrow::io::IOContext &io_context, std::string *out_path) {
+    const auto scheme = uri.scheme();
+
+    if (scheme == "file") {
+        std::string path;
+        ARROW_ASSIGN_OR_RAISE(auto options, buildLocalfsOptionsFromUri(uri, &path));
+        if (out_path != nullptr) {
+            *out_path = path;
+        }
+        return std::make_shared<LocalFileSystem>(options, io_context);
+    }
+
+    if (scheme == "hdfs" || scheme == "viewfs") {
+        ARROW_ASSIGN_OR_RAISE(auto options, buildHdfsOptionsFromUri(uri));
+        if (out_path != nullptr) {
+            *out_path = uri.path();
+        }
+        ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options, io_context));
+        return hdfs;
+    }
+
+    if (scheme == "mock") {
+        // MockFileSystem does not have an absolute / relative path distinction,
+        // normalize path by removing leading slash.
+        if (out_path != nullptr) {
+            *out_path = std::string(RemoveLeadingSlash(uri.path()));
+        }
+        return std::make_shared<MockFileSystem>(CurrentTimePoint(), io_context);
+    }
+
+    return arrow::fs::FileSystemFromUri(uri.ToString(), io_context, out_path);
+}
+
+}  // namespace
+
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(const UriInfo &uri,
+                                                            std::string *out_path) {
+    return FileSystemFromUriOrPath(uri, arrow::io::IOContext(), out_path);
+}
+
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
+    const UriInfo &uri, const arrow::io::IOContext &io_context,
+    std::string *out_path) {
+    const auto& uri_string = uri.ToString();
+    if (arrow::fs::internal::DetectAbsolutePath(uri_string)) {
+        // Normalize path separators
+        if (out_path != nullptr) {
+            *out_path = ToSlashes(uri_string);
+        }
+        return std::make_shared<LocalFileSystem>();
+    }
+    return FileSystemFromUriReal(uri, io_context, out_path);
+}
+
+}
+// namespace arrow_adapter
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/FileSystemAdapter.h b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/FileSystemAdapter.h
new file mode 100644
index 0000000000000000000000000000000000000000..cbb10e6fadfa538ec438c0cd17a534453beefb68
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/FileSystemAdapter.h
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <iosfwd>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/filesystem/type_fwd.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/compare.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/type_fwd.h"
+#include "arrow/util/visibility.h"
+#include "arrow/util/windows_fixup.h"
+#include "io/UriInfo.h"
+
+namespace arrow_adapter {
+
+using arrow::Result;
+
+using arrow::fs::FileSystem;
+
+/// \defgroup filesystem-factories Functions for creating FileSystem instances
+
+/// @{
+
+/// \brief Create a new FileSystem by URI
+///
+/// Same as the overload below, but using a default IO context and a UriInfo
+/// that was already parsed by the caller.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(const UriInfo &uri,
+                                                            std::string* out_path = NULLPTR);
+
+/// \brief Create a new FileSystem by URI with a custom IO context
+///
+/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3",
+/// "gs" and "gcs". In addition, non-URIs are recognized and treated as
+/// local filesystem paths; only absolute local filesystem paths are allowed.
+///
+/// \param[in] uri a URI-based path, ex: file:///some/local/path
+/// \param[in] io_context an IOContext which will be associated with the filesystem
+/// \param[out] out_path (optional) Path inside the filesystem.
+/// \return out_fs FileSystem instance.
+ARROW_EXPORT
+Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
+    const UriInfo &uri, const arrow::io::IOContext &io_context,
+    std::string *out_path = NULLPTR);
+
+/// @}
+
+}
+// namespace arrow_adapter
\ No newline at end of file
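A sketch of how a caller might resolve a filesystem through this adapter; error handling is minimal and the HDFS endpoint below is a made-up example:

    #include <memory>
    #include <string>
    #include "arrow/filesystem/filesystem.h"
    #include "arrow/result.h"
    #include "io/UriInfo.h"
    #include "io/arrowadapter/FileSystemAdapter.h"

    arrow::Status OpenViaAdapter() {
        UriInfo uri("hdfs://nn1:9000/warehouse/t/f.parquet", "hdfs",
                    "/warehouse/t/f.parquet", "nn1", "9000");
        std::string out_path;
        // Dispatch happens on uri.scheme(): "file", "hdfs"/"viewfs" and "mock"
        // are handled by the adapter itself; any other scheme falls back to
        // arrow::fs::FileSystemFromUri on uri.ToString().
        ARROW_ASSIGN_OR_RAISE(auto fs, arrow_adapter::FileSystemFromUriOrPath(uri, &out_path));
        ARROW_ASSIGN_OR_RAISE(auto file, fs->OpenInputFile(out_path));
        return arrow::Status::OK();
    }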
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/HdfsAdapter.cc b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/HdfsAdapter.cc
new file mode 100644
index 0000000000000000000000000000000000000000..efcf2a754254abe9732a84008588885ac5e6a904
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/HdfsAdapter.cc
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+
+#include "arrow/filesystem/hdfs.h"
+#include "arrow/util/value_parsing.h"
+#include "HdfsAdapter.h"
+
+namespace arrow_adapter {
+
+using arrow::internal::ParseValue;
+
+using arrow::Result;
+using arrow::fs::HdfsOptions;
+
+Result<HdfsOptions> buildHdfsOptionsFromUri(const UriInfo &uri) {
+    HdfsOptions options;
+
+    std::string host;
+    host = uri.scheme() + "://" + uri.host();
+
+    // configure endpoint; "" and "-1" (the JNI side's "no port" sentinel) both
+    // fall back to port 0 so the hdfs FileSystem impl picks the configured default
+    int32_t port;
+    if (uri.port().empty() || (port = atoi(uri.port().c_str())) == -1) {
+        options.ConfigureEndPoint(host, 0);
+    } else {
+        options.ConfigureEndPoint(host, port);
+    }
+
+    return options;
+}
+
+}
+// namespace arrow_adapter
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/HdfsAdapter.h b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/HdfsAdapter.h
new file mode 100644
index 0000000000000000000000000000000000000000..445083638c3ca68b375bf09d76e3b7201ac98fc5
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/HdfsAdapter.h
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/hdfs.h"
+#include "io/UriInfo.h"
+
+namespace arrow_adapter {
+
+using arrow::Result;
+using arrow::fs::HdfsOptions;
+
+ARROW_EXPORT
+Result<HdfsOptions> buildHdfsOptionsFromUri(const UriInfo &uri);
+
+}
+// namespace arrow_adapter
\ No newline at end of file
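A hedged sketch of the HDFS option building above; the namenode name is a made-up example, and port "-1" mirrors what java.net.URI.getPort() produces when no port is set:

    #include "arrow/result.h"
    #include "io/UriInfo.h"
    #include "io/arrowadapter/HdfsAdapter.h"

    arrow::Status ConfigureHdfs() {
        UriInfo uri("hdfs", "/data/file.orc", "namenode1", "-1");
        ARROW_ASSIGN_OR_RAISE(auto options, arrow_adapter::buildHdfsOptionsFromUri(uri));
        // The endpoint is now host "hdfs://namenode1" with port 0, so libhdfs
        // resolves the actual port from the cluster configuration.
        (void)options;  // silence unused-variable warnings in this sketch
        return arrow::Status::OK();
    }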
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/LocalfsAdapter.cc b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/LocalfsAdapter.cc
new file mode 100644
index 0000000000000000000000000000000000000000..97b91b22a14a3bfff5f3fdc1a0a42fbf1acb08da
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/LocalfsAdapter.cc
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/filesystem/localfs.h"
+#include "arrow/util/io_util.h"
+#include "LocalfsAdapter.h"
+#include "arrow/result.h"
+
+namespace arrow_adapter {
+
+using ::arrow::internal::IOErrorFromErrno;
+using ::arrow::internal::NativePathString;
+using ::arrow::internal::PlatformFilename;
+using arrow::Result;
+using arrow::fs::LocalFileSystemOptions;
+using arrow::Status;
+
+Result<LocalFileSystemOptions> buildLocalfsOptionsFromUri(const UriInfo &uri, std::string* out_path) {
+    // The URI path maps directly onto the local filesystem path.
+    const auto host = uri.host();
+    if (!host.empty()) {
+        return Status::Invalid("Unsupported hostname in non-Windows local URI: '",
+                               uri.ToString(), "'");
+    } else {
+        *out_path = uri.path();
+    }
+
+    return LocalFileSystemOptions();
+}
+
+}
+// namespace arrow_adapter
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/LocalfsAdapter.h b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/LocalfsAdapter.h
new file mode 100644
index 0000000000000000000000000000000000000000..47f256706e5ebf06c31c68795226b970509aa845
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/LocalfsAdapter.h
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/localfs.h"
+#include "io/UriInfo.h"
+
+namespace arrow_adapter {
+
+using arrow::Result;
+using arrow::fs::LocalFileSystemOptions;
+using arrow::Status;
+
+ARROW_EXPORT
+Result<LocalFileSystemOptions> buildLocalfsOptionsFromUri(const UriInfo &uri, std::string* out_path);
+
+}
+// namespace arrow_adapter
\ No newline at end of file
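The local adapter only accepts host-less URIs. A small sketch of both outcomes, with made-up paths:

    #include <string>
    #include "io/UriInfo.h"
    #include "io/arrowadapter/LocalfsAdapter.h"

    void LocalfsExamples() {
        std::string out_path;
        UriInfo ok("file", "/tmp/data.orc", "", "");
        auto r1 = arrow_adapter::buildLocalfsOptionsFromUri(ok, &out_path);
        // r1.ok() is true and out_path == "/tmp/data.orc"

        UriInfo bad("file", "/tmp/data.orc", "somehost", "");
        auto r2 = arrow_adapter::buildLocalfsOptionsFromUri(bad, &out_path);
        // r2 carries Status::Invalid: non-empty hosts are rejected,
        // matching Arrow's non-Windows local URI rules.
    }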
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/UtilInternal.cc b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/UtilInternal.cc
new file mode 100644
index 0000000000000000000000000000000000000000..d82676df4ec2c6c47cc0bb4995d862884ee0bb37
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/UtilInternal.cc
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UtilInternal.h"
+
+namespace arrow_adapter {
+
+using arrow::fs::TimePoint;
+
+TimePoint CurrentTimePoint() {
+    auto now = std::chrono::system_clock::now();
+    return TimePoint(
+        std::chrono::duration_cast<TimePoint::duration>(now.time_since_epoch()));
+}
+
+}
+// namespace arrow_adapter
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/UtilInternal.h b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/UtilInternal.h
new file mode 100644
index 0000000000000000000000000000000000000000..67d51eb4646ff3b641c8164cc84d331797016fc0
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/arrowadapter/UtilInternal.h
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow_adapter {
+
+using arrow::fs::TimePoint;
+
+ARROW_EXPORT
+TimePoint CurrentTimePoint();
+
+}
+// namespace arrow_adapter
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcFile.cc b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcFile.cc
new file mode 100644
index 0000000000000000000000000000000000000000..af2077da80801eaf8e7e4572d1847bf753880442
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcFile.cc
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OmniOrcFile.hh"
+#include "orc/Exceptions.hh"
+
+#ifdef _MSC_VER
+#else
+#define O_BINARY 0
+#endif
+
+namespace orc {
+    std::unique_ptr<InputStream> readOmniFile(const UriInfo &uri) {
+        if (uri.scheme() == "hdfs") {
+            return orc::readOmniHdfsFile(uri);
+        } else {
+            return orc::readLocalFile(uri.path());
+        }
+    }
+}
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcFile.hh b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcFile.hh
new file mode 100644
index 0000000000000000000000000000000000000000..302b3c858807fe5cbd191daf6ef5073441cd4032
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcFile.hh
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_FILE_REWRITE_HH
+#define ORC_FILE_REWRITE_HH
+
+#include <memory>
+
+#include "hdfspp/options.h"
+#include "orc/OrcFile.hh"
+#include "io/UriInfo.h"
+
+/** \file OmniOrcFile.hh
+    @brief The top level interface to ORC.
+*/
+
+namespace orc {
+
+    /**
+     * Create a stream to a local file, or to an HDFS file when the URI
+     * scheme is "hdfs".
+     * @param uri the URI of the file in the local file system or HDFS
+     */
+    std::unique_ptr<InputStream> readOmniFile(const UriInfo &uri);
+
+    /**
+     * Create a stream to an HDFS file.
+     * @param uri the URI of the file in HDFS
+     */
+    std::unique_ptr<InputStream> readOmniHdfsFile(const UriInfo &uri);
+}
+
+#endif
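readOmniFile is the single entry point the ORC JNI reader uses; a minimal sketch of opening a reader over it, mirroring the unit test added later in this patch (the path is an example):

    #include <memory>
    #include "io/orcfile/OmniOrcFile.hh"
    #include "orc/OrcFile.hh"

    std::unique_ptr<orc::Reader> OpenOrcReader() {
        // "hdfs" URIs go through the libhdfs++ input stream defined below;
        // any other scheme falls back to orc::readLocalFile(uri.path()).
        UriInfo uri("file", "/tmp/orc_data_all_type", "", "");
        orc::ReaderOptions opts;
        return orc::createReader(orc::readOmniFile(uri), opts);
    }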
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcHdfsFile.cc b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcHdfsFile.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2c7528022c9a54a692536fa602b51132ea84cd94
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OmniOrcHdfsFile.cc
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OmniOrcFile.hh"
+#include "orc/Exceptions.hh"
+#include "hdfspp/hdfspp.h"
+
+namespace orc {
+
+    class OmniHdfsFileInputStream : public InputStream {
+    private:
+        UriInfo uri;
+        // Stored copy of the path: uri.path() returns by value, and getName()
+        // must return a reference that outlives the call.
+        std::string filename;
+        std::unique_ptr<hdfs::FileHandle> file;
+        std::unique_ptr<hdfs::FileSystem> file_system;
+        uint64_t totalLength;
+        const uint64_t READ_SIZE = 1024 * 1024; // 1 MB
+
+    public:
+        explicit OmniHdfsFileInputStream(const UriInfo& _uri): uri(_uri), filename(_uri.path()) {
+            hdfs::ConfigParser parser;
+            if (!parser.LoadDefaultResources()) {
+                throw ParseError("Could not load default resources. ");
+            }
+            auto stats = parser.ValidateResources();
+            // validating core-site.xml
+            if (!stats[0].second.ok()) {
+                throw ParseError(stats[0].first + " is invalid: " + stats[0].second.ToString());
+            }
+            // validating hdfs-site.xml
+            if (!stats[1].second.ok()) {
+                throw ParseError(stats[1].first + " is invalid: " + stats[1].second.ToString());
+            }
+            hdfs::Options options;
+            if (!parser.get_options(options)) {
+                throw ParseError("Could not load Options object. ");
+            }
+            hdfs::IoService* io_service = hdfs::IoService::New();
+            // Wrapping file_system into a unique pointer to guarantee deletion
+            file_system = std::unique_ptr<hdfs::FileSystem>(
+                hdfs::FileSystem::New(io_service, "", options));
+            if (file_system == nullptr) {
+                throw ParseError("Can't create FileSystem object. ");
+            }
+            hdfs::Status status;
+
+            // Checking if the user supplied the host
+            if (!uri.host().empty()) {
+                // uri.port() is already a string; empty means "use the default port"
+                std::string port = uri.port();
+                status = file_system->Connect(uri.host(), port);
+                if (!status.ok()) {
+                    throw ParseError("Can't connect to " + uri.host()
+                                     + ":" + port + ". " + status.ToString());
+                }
+            } else {
+                status = file_system->ConnectToDefaultFs();
+                if (!status.ok()) {
+                    if (!options.defaultFS.get_host().empty()) {
+                        throw ParseError("Error connecting to "
+                                         + options.defaultFS.str() + ". " + status.ToString());
+                    } else {
+                        throw ParseError(
+                            "Error connecting to the cluster: defaultFS is empty. "
+                            + status.ToString());
+                    }
+                }
+            }
+
+            if (file_system == nullptr) {
+                throw ParseError("Can't connect the file system. ");
+            }
+
+            hdfs::FileHandle *file_raw = nullptr;
+            status = file_system->Open(uri.path(), &file_raw);
+            if (!status.ok()) {
+                throw ParseError("Can't open "
+                                 + uri.path() + ". " + status.ToString());
+            }
+            // Wrapping file_raw into a unique pointer to guarantee deletion
+            file.reset(file_raw);
+
+            hdfs::StatInfo stat_info;
+            status = file_system->GetFileInfo(uri.path(), stat_info);
+            if (!status.ok()) {
+                throw ParseError("Can't stat "
+                                 + uri.path() + ". " + status.ToString());
" + status.ToString()); + } + totalLength = stat_info.length; + } + + uint64_t getLength() const override { + return totalLength; + } + + uint64_t getNaturalReadSize() const override { + return READ_SIZE; + } + + void read(void* buf, + uint64_t length, + uint64_t offset) override { + + if (!buf) { + throw ParseError("Buffer is null"); + } + + char* buf_ptr = reinterpret_cast(buf); + hdfs::Status status; + size_t total_bytes_read = 0; + size_t last_bytes_read = 0; + + do { + status = file->PositionRead(buf_ptr, + static_cast(length) - total_bytes_read, + static_cast(offset + total_bytes_read), &last_bytes_read); + if(!status.ok()) { + throw ParseError("Error reading the file: " + status.ToString()); + } + total_bytes_read += last_bytes_read; + buf_ptr += last_bytes_read; + } while (total_bytes_read < length); + } + + const std::string& getName() const override { + return uri.path(); + } + + ~OmniHdfsFileInputStream() override; + }; + + OmniHdfsFileInputStream::~OmniHdfsFileInputStream() { + } + + std::unique_ptr readOmniHdfsFile(const UriInfo &uri) { + return std::unique_ptr(new OmniHdfsFileInputStream(uri)); + } +} diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp index c0f4c1ae17031e933dd4467328c629d8045c3f20..8876f2ec5e173d96edc214ef32cfccab464efbaf 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp @@ -20,16 +20,18 @@ #include "OrcColumnarBatchJniReader.h" #include #include "jni_common.h" +#include "io/UriInfo.h" using namespace omniruntime::vec; using namespace omniruntime::type; using namespace std; using namespace orc; + static constexpr int32_t MAX_DECIMAL64_DIGITS = 18; JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_OrcColumnarBatchJniReader_initializeReader(JNIEnv *env, - jobject jObj, jstring path, jobject jsonObj) + jobject jObj, jobject jsonObj) { JNI_FUNC_START @@ -39,8 +41,6 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_OrcColumnarBatchJniRe jlong tailLocation = env->CallLongMethod(jsonObj, jsonMethodLong, env->NewStringUTF("tailLocation")); jstring serTailJstr = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("serializedTail")); - const char *pathPtr = env->GetStringUTFChars(path, nullptr); - std::string filePath(pathPtr); orc::MemoryPool *pool = orc::getDefaultPool(); orc::ReaderOptions readerOptions; readerOptions.setMemoryPool(*pool); @@ -52,8 +52,27 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_OrcColumnarBatchJniRe env->ReleaseStringUTFChars(serTailJstr, ptr); } - std::unique_ptr reader = createReader(orc::readFile(filePath), readerOptions); - env->ReleaseStringUTFChars(path, pathPtr); + jstring schemaJstr = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("scheme")); + const char *schemaPtr = env->GetStringUTFChars(schemaJstr, nullptr); + std::string schemaStr(schemaPtr); + env->ReleaseStringUTFChars(schemaJstr, schemaPtr); + + jstring fileJstr = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("path")); + const char *filePtr = env->GetStringUTFChars(fileJstr, nullptr); + std::string fileStr(filePtr); + env->ReleaseStringUTFChars(fileJstr, filePtr); + + jstring hostJstr = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("host")); + const char *hostPtr = 
+    std::string hostStr(hostPtr);
+    env->ReleaseStringUTFChars(hostJstr, hostPtr);
+
+    jint port = (jint)env->CallIntMethod(jsonObj, jsonMethodInt, env->NewStringUTF("port"));
+    UriInfo uri(schemeStr, fileStr, hostStr, std::to_string(port));
+
+    std::unique_ptr<orc::Reader> reader;
+    reader = createReader(orc::readOmniFile(uri), readerOptions);
+
     orc::Reader *readerNew = reader.release();
     return (jlong)(readerNew);
     JNI_FUNC_END(runtimeExceptionClass)
diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h
index 714d97ee67df137da8c6dcc79f8aa2173a33066d..6c7560a7fc6146b8b82e0a654c17f1314401297e 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h
@@ -33,7 +33,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -69,7 +69,7 @@ enum class PredicateOperatorType {
- * Signature: (Ljava/lang/String;Lorg/json/simple/JSONObject;)J
+ * Signature: (Lorg/json/simple/JSONObject;)J
  */
 JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_OrcColumnarBatchJniReader_initializeReader
-    (JNIEnv* env, jobject jObj, jstring path, jobject job);
+    (JNIEnv* env, jobject jObj, jobject job);
 
 /*
  * Class: com_huawei_boostkit_spark_jni_OrcColumnarBatchJniReader
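The Get/Release pairs in the two JNI readers above and below are repetitive; a hypothetical RAII wrapper (JniUtfString is not part of this patch, just a sketch) would shrink each field read to one line:

    #include <jni.h>
    #include <string>

    // Hypothetical helper, not part of this patch: releases the UTF chars
    // automatically, replacing the manual GetStringUTFChars/ReleaseStringUTFChars pairs.
    class JniUtfString {
    public:
        JniUtfString(JNIEnv* env, jstring jstr)
            : env_(env), jstr_(jstr), chars_(env->GetStringUTFChars(jstr, nullptr)) {}
        ~JniUtfString() {
            if (chars_ != nullptr) {
                env_->ReleaseStringUTFChars(jstr_, chars_);
            }
        }
        std::string str() const { return chars_ == nullptr ? "" : chars_; }
    private:
        JNIEnv* env_;
        jstring jstr_;
        const char* chars_;
    };

Usage would then read, e.g., std::string schemeStr = JniUtfString(env, schemeJstr).str();.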
diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp
index fda647658a2477e3c7ac213fa9223a64cab09f39..682193168b6021402f71f8a1cc572a143d1d1562 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp
@@ -20,6 +20,7 @@
 #include "ParquetColumnarBatchJniReader.h"
 #include "jni_common.h"
 #include "tablescan/ParquetReader.h"
+#include "io/UriInfo.h"
 
 using namespace omniruntime::vec;
 using namespace omniruntime::type;
@@ -28,6 +29,7 @@
 using namespace arrow;
 using namespace parquet::arrow;
 using namespace spark::reader;
+
 std::vector<int> GetIndices(JNIEnv *env, jobject jsonObj, const char* name)
 {
     jintArray indicesArray = (jintArray)env->CallObjectMethod(jsonObj, jsonMethodObj, env->NewStringUTF(name));
@@ -45,17 +47,36 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_ParquetColumnarBatchJniReader_initializeReader(JNIEnv *env,
     jobject jObj, jobject jsonObj)
 {
     JNI_FUNC_START
-    // Get filePath
-    jstring path = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("filePath"));
-    const char *filePath = env->GetStringUTFChars(path, JNI_FALSE);
-    std::string file(filePath);
-    env->ReleaseStringUTFChars(path, filePath);
+    // Get the full uri string
+    jstring uri = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("uri"));
+    const char *uriStr = env->GetStringUTFChars(uri, JNI_FALSE);
+    std::string uriString(uriStr);
+    env->ReleaseStringUTFChars(uri, uriStr);
 
     jstring ugiTemp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("ugi"));
     const char *ugi = env->GetStringUTFChars(ugiTemp, JNI_FALSE);
     std::string ugiString(ugi);
     env->ReleaseStringUTFChars(ugiTemp, ugi);
 
+    jstring schemeTmp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("scheme"));
+    const char *scheme = env->GetStringUTFChars(schemeTmp, JNI_FALSE);
+    std::string schemeString(scheme);
+    env->ReleaseStringUTFChars(schemeTmp, scheme);
+
+    jstring hostTmp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("host"));
+    const char *host = env->GetStringUTFChars(hostTmp, JNI_FALSE);
+    std::string hostString(host);
+    env->ReleaseStringUTFChars(hostTmp, host);
+
+    jstring pathTmp = (jstring)env->CallObjectMethod(jsonObj, jsonMethodString, env->NewStringUTF("path"));
+    const char *path = env->GetStringUTFChars(pathTmp, JNI_FALSE);
+    std::string pathString(path);
+    env->ReleaseStringUTFChars(pathTmp, path);
+
+    jint port = (jint)env->CallIntMethod(jsonObj, jsonMethodInt, env->NewStringUTF("port"));
+
+    UriInfo uriInfo(uriString, schemeString, pathString, hostString, std::to_string(port));
+
     // Get capacity for each record batch
     int64_t capacity = (int64_t)env->CallLongMethod(jsonObj, jsonMethodLong, env->NewStringUTF("capacity"));
 
@@ -64,7 +85,7 @@
     auto column_indices = GetIndices(env, jsonObj, "columnIndices");
 
     ParquetReader *pReader = new ParquetReader();
-    auto state = pReader->InitRecordReader(file, capacity, row_group_indices, column_indices, ugiString);
+    auto state = pReader->InitRecordReader(uriInfo, capacity, row_group_indices, column_indices, ugiString);
     if (state != Status::OK()) {
         env->ThrowNew(runtimeExceptionClass, state.ToString().c_str());
         return 0;
diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp
index 4f917e22c1d51d4e15ed0fa2a861408441a9d040..21e50b5ca2a996f73faa8d676e5a955ee27f6afd 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp
@@ -19,8 +19,9 @@
 #include
 #include
-#include <arrow/filesystem/filesystem.h>
+#include "io/arrowadapter/FileSystemAdapter.h"
 #include
+#include "io/UriInfo.h"
 #include "jni/jni_common.h"
 #include "ParquetReader.h"
@@ -30,6 +31,8 @@
 using namespace arrow;
 using namespace parquet::arrow;
 using namespace arrow::compute;
 using namespace spark::reader;
+using namespace arrow::internal;
+
 static std::mutex mutex_;
 static std::map<std::string, Filesystem*> restore_filesysptr;
@@ -69,15 +72,16 @@ std::string spark::reader::GetFileSystemKey(std::string& path, std::string& ugi)
     return result;
 }
 
-Filesystem* spark::reader::GetFileSystemPtr(std::string& path, std::string& ugi, arrow::Status &status)
+Filesystem* spark::reader::GetFileSystemPtr(UriInfo &uri, std::string& ugi, arrow::Status &status)
 {
-    auto key = GetFileSystemKey(path, ugi);
+    std::string fullPath = uri.ToString();
+    auto key = GetFileSystemKey(fullPath, ugi);
 
     // if not find key, create the filesystem ptr
     auto iter = restore_filesysptr.find(key);
     if (iter == restore_filesysptr.end()) {
         Filesystem* fs = new Filesystem();
-        auto result = fs::FileSystemFromUriOrPath(path);
+        auto result = arrow_adapter::FileSystemFromUriOrPath(uri);
         status = result.status();
         if (!status.ok()) {
             return nullptr;
@@ -89,8 +93,9 @@
     return restore_filesysptr[key];
 }
 
-Status ParquetReader::InitRecordReader(std::string& filePath, int64_t capacity,
-    const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, std::string& ugi)
+Status ParquetReader::InitRecordReader(UriInfo &uri, int64_t capacity,
+    const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
+    std::string& ugi)
 {
     arrow::MemoryPool* pool = default_memory_pool();
 
@@ -104,12 +109,12 @@ Status ParquetReader::InitRecordReader(std::string& filePath, int64_t capacity,
     // Get the file from filesystem
     Status result;
     mutex_.lock();
-    Filesystem* fs = GetFileSystemPtr(filePath, ugi, result);
+    Filesystem* fs = GetFileSystemPtr(uri, ugi, result);
     mutex_.unlock();
     if (fs == nullptr || fs->filesys_ptr == nullptr) {
         return Status::IOError(result);
     }
-    ARROW_ASSIGN_OR_RAISE(auto file, fs->filesys_ptr->OpenInputFile(filePath));
+    ARROW_ASSIGN_OR_RAISE(auto file, fs->filesys_ptr->OpenInputFile(uri.ToString()));
 
     FileReaderBuilder reader_builder;
     ARROW_RETURN_NOT_OK(reader_builder.Open(file, reader_properties));
diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h
index 8fef9d495801bd51f5c04d9ddf86f241a76715d0..15b89869bb630c9d4d98cf5e3e2f9ccf209b6f24 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h
+++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h
@@ -34,14 +34,19 @@
 #include
 #include
 #include
+#include "io/UriInfo.h"
+
+using namespace arrow::internal;
+
 namespace spark::reader {
     class ParquetReader {
     public:
         ParquetReader() {}
 
-        arrow::Status InitRecordReader(std::string& path, int64_t capacity,
-            const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, std::string& ugi);
+        arrow::Status InitRecordReader(UriInfo &uri, int64_t capacity,
+            const std::vector<int>& row_group_indices,
+            const std::vector<int>& column_indices, std::string& ugi);
 
         arrow::Status ReadNextBatch(std::shared_ptr<arrow::RecordBatch> *batch);
@@ -62,7 +67,7 @@
 
     std::string GetFileSystemKey(std::string& path, std::string& ugi);
 
-    Filesystem* GetFileSystemPtr(std::string& path, std::string& ugi, arrow::Status &status);
+    Filesystem* GetFileSystemPtr(UriInfo &uri, std::string& ugi, arrow::Status &status);
 
     int CopyToOmniVec(std::shared_ptr<arrow::DataType> vcType, int &omniTypeId, uint64_t &omniVecId,
         std::shared_ptr<arrow::Array> array);
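A sketch of the new InitRecordReader contract, mirroring the updated parquet_scan_test further below (the file path and ugi values are made-up examples):

    #include <memory>
    #include <string>
    #include <vector>
    #include "arrow/status.h"
    #include "io/UriInfo.h"
    #include "tablescan/ParquetReader.h"

    arrow::Status ReadOneBatch() {
        // The 5-argument UriInfo keeps ToString() meaningful, which matters here
        // because it serves both as the filesystem cache key and as the open path.
        UriInfo uri("/tmp/parquet_test.parquet", "", "", "", "-1");
        std::vector<int> row_groups{0};
        std::vector<int> columns{0, 1};
        std::string ugi = "root@sample";
        auto reader = std::make_unique<spark::reader::ParquetReader>();
        ARROW_RETURN_NOT_OK(reader->InitRecordReader(uri, 1024, row_groups, columns, ugi));
        std::shared_ptr<arrow::RecordBatch> batch;
        return reader->ReadNextBatch(&batch);
    }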
diff --git a/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt
index ba1ad3a773c35a101cf728f00a19ba30b0dae607..1ce718c70e6670d53b6b5d50d31b07543c889784 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt
+++ b/omnioperator/omniop-spark-extension/cpp/test/CMakeLists.txt
@@ -1,5 +1,7 @@
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR} TEST_ROOT_SRCS)
 
+add_subdirectory(io/arrowadapter)
+add_subdirectory(io/orcfile)
 add_subdirectory(shuffle)
 add_subdirectory(utils)
 add_subdirectory(tablescan)
@@ -10,6 +12,8 @@ set(MY_LINK
         shuffletest
         utilstest
         tablescantest
+        arrowadaptertest
+        orcfiletest
         )
 
 # find gtest package
diff --git a/omnioperator/omniop-spark-extension/cpp/test/io/arrowadapter/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/test/io/arrowadapter/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..aec5bbc4032b32ea7840a61e10f043e70e3b007d
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/test/io/arrowadapter/CMakeLists.txt
@@ -0,0 +1,7 @@
+aux_source_directory(${CMAKE_CURRENT_LIST_DIR} ARROW_ADAPTER_TESTS_LIST)
+set(ARROW_ADAPTER_TARGET arrowadaptertest)
+add_library(${ARROW_ADAPTER_TARGET} STATIC ${ARROW_ADAPTER_TESTS_LIST})
+target_compile_options(${ARROW_ADAPTER_TARGET} PUBLIC )
+target_include_directories(${ARROW_ADAPTER_TARGET} PUBLIC ${CMAKE_BINARY_DIR}/src)
+target_include_directories(${ARROW_ADAPTER_TARGET} PUBLIC $ENV{JAVA_HOME}/include)
+target_include_directories(${ARROW_ADAPTER_TARGET} PUBLIC $ENV{JAVA_HOME}/include/linux)
diff --git a/omnioperator/omniop-spark-extension/cpp/test/io/arrowadapter/OmniFileSystemTest.cc b/omnioperator/omniop-spark-extension/cpp/test/io/arrowadapter/OmniFileSystemTest.cc
new file mode 100644
index 0000000000000000000000000000000000000000..0df553a52a9f812cbe0797a2b3640e7022473973
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/test/io/arrowadapter/OmniFileSystemTest.cc
@@ -0,0 +1,200 @@
+/**
+ * Copyright (C) 2020-2022. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <memory>
+#include "gtest/gtest.h"
+#include "io/arrowadapter/FileSystemAdapter.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/result.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/io_util.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/localfs.h"
+#include "../../utils/test_utils.h"
+#include "arrow/util/uri.h"
+
+using namespace arrow::fs::internal;
+using arrow::fs::TimePoint;
+using arrow::fs::FileSystem;
+using arrow_adapter::FileSystemFromUriOrPath;
+using arrow::internal::TemporaryDir;
+using arrow::fs::LocalFileSystem;
+using arrow::fs::LocalFileSystemOptions;
+using arrow::internal::PlatformFilename;
+using arrow::internal::FileDescriptor;
+using arrow::Result;
+using arrow::fs::HadoopFileSystem;
+using arrow::fs::HdfsOptions;
+
+class TestMockFS : public ::testing::Test {
+public:
+    void SetUp() override {
+        time_ = TimePoint(TimePoint::duration(42));
+        fs_ = std::make_shared<MockFileSystem>(time_);
+    }
+
+    std::vector<MockDirInfo> AllDirs() {
+        return arrow::internal::checked_pointer_cast<MockFileSystem>(fs_)->AllDirs();
+    }
+
+    void CheckDirs(const std::vector<MockDirInfo>& expected) {
+        ASSERT_EQ(AllDirs(), expected);
+    }
+
+protected:
+    TimePoint time_;
+    std::shared_ptr<FileSystem> fs_;
+};
+
+TEST_F(TestMockFS, FileSystemFromUriOrPath) {
+    std::string path;
+    UriInfo uri1("mock", "", "", "-1");
+    ASSERT_OK_AND_ASSIGN(fs_, FileSystemFromUriOrPath(uri1, &path));
+    ASSERT_EQ(path, "");
+    CheckDirs({});  // Ensures it's a MockFileSystem
+
+    UriInfo uri2("mock", "foo/bar", "", "-1");
+    ASSERT_OK_AND_ASSIGN(fs_, FileSystemFromUriOrPath(uri2, &path));
+    ASSERT_EQ(path, "foo/bar");
+    CheckDirs({});
+
+    UriInfo uri3("mock", "/foo/bar", "", "-1");
+    ASSERT_OK_AND_ASSIGN(fs_, FileSystemFromUriOrPath(uri3, &path));
+    ASSERT_EQ(path, "foo/bar");
+    CheckDirs({});
+}
+
+struct CommonPathFormatter {
+    std::string operator()(std::string fn) { return fn; }
+    bool supports_uri() { return true; }
+};
+
+using PathFormatters = ::testing::Types<CommonPathFormatter>;
+
+// Non-overloaded version of FileSystemFromUri, for template resolution
+Result<std::shared_ptr<FileSystem>> FSFromUriOrPath(const UriInfo& uri,
+                                                    std::string* out_path = NULLPTR) {
+    return arrow_adapter::FileSystemFromUriOrPath(uri, out_path);
+}
+
+template <typename PathFormatter>
+class TestLocalFs : public ::testing::Test {
+public:
+    void SetUp() override {
+        ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("test-localfs-"));
+        local_path_ = EnsureTrailingSlash(path_formatter_(temp_dir_->path().ToString()));
+        MakeFileSystem();
+    }
+
+    void MakeFileSystem() {
+        local_fs_ = std::make_shared<LocalFileSystem>(options_);
+    }
+
+    template <typename FileSystemFromUriFunc>
+    void CheckFileSystemFromUriFunc(const UriInfo& uri,
+                                    FileSystemFromUriFunc&& fs_from_uri) {
+        if (!path_formatter_.supports_uri()) {
+            return;  // skip
+        }
+        std::string path;
+        ASSERT_OK_AND_ASSIGN(fs_, fs_from_uri(uri, &path));
+        ASSERT_EQ(path, local_path_);
+
+        // Test that the right location on disk is accessed
+        CreateFile(fs_.get(), local_path_ + "abc", "some data");
+        CheckConcreteFile(this->temp_dir_->path().ToString() + "abc", 9);
+    }
+
+    void TestFileSystemFromUri(const UriInfo& uri) {
+        CheckFileSystemFromUriFunc(uri, FSFromUriOrPath);
+    }
+
+    void CheckConcreteFile(const std::string& path, int64_t expected_size) {
+        ASSERT_OK_AND_ASSIGN(auto fn, PlatformFilename::FromString(path));
+        ASSERT_OK_AND_ASSIGN(FileDescriptor fd, ::arrow::internal::FileOpenReadable(fn));
+        auto result = ::arrow::internal::FileGetSize(fd.fd());
+        ASSERT_OK_AND_ASSIGN(int64_t size, result);
+        ASSERT_EQ(size, expected_size);
+    }
+
+    void TestLocalUri(const UriInfo& uri, const std::string& expected_path) {
+        CheckLocalUri(uri, expected_path, FSFromUriOrPath);
+    }
+
+    template <typename FileSystemFromUriFunc>
+    void CheckLocalUri(const UriInfo& uri, const std::string& expected_path,
+                       FileSystemFromUriFunc&& fs_from_uri) {
+        if (!path_formatter_.supports_uri()) {
+            return;  // skip
+        }
+        std::string path;
+        ASSERT_OK_AND_ASSIGN(fs_, fs_from_uri(uri, &path));
+        ASSERT_EQ(fs_->type_name(), "local");
+        ASSERT_EQ(path, expected_path);
+    }
+
+    void TestInvalidUri(const UriInfo& uri) {
+        if (!path_formatter_.supports_uri()) {
+            return;  // skip
+        }
+        ASSERT_RAISES(Invalid, FSFromUriOrPath(uri));
+    }
+
+protected:
+    std::unique_ptr<TemporaryDir> temp_dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::string local_path_;
+    PathFormatter path_formatter_;
+    std::shared_ptr<LocalFileSystem> local_fs_;
+    LocalFileSystemOptions options_ = LocalFileSystemOptions::Defaults();
+};
+
+TYPED_TEST_SUITE(TestLocalFs, PathFormatters);
+
+TYPED_TEST(TestLocalFs, FileSystemFromUriFile) {
+    std::string path;
+    ASSERT_OK_AND_ASSIGN(auto uri_string, arrow::internal::UriFromAbsolutePath(this->local_path_));
+    UriInfo uri1(uri_string, "", uri_string, "", "-1");
+    this->TestFileSystemFromUri(uri1);
+
+    path = "/foo/bar";
+    UriInfo uri2("file", path, "", "-1");
+    this->TestLocalUri(uri2, path);
+
+    path = "/some path/%percent";
+    UriInfo uri3("file", path, "", "-1");
+    this->TestLocalUri(uri3, path);
+
+    path = "/some path/%中文魑魅魍魉";
+    UriInfo uri4("file", path, "", "-1");
+    this->TestLocalUri(uri4, path);
+}
+
+TYPED_TEST(TestLocalFs, FileSystemFromUriNoScheme) {
+    UriInfo uri1(this->local_path_, "", "", "", "-1");
+    this->TestFileSystemFromUri(uri1);
+
+    UriInfo uri2("foo/bar", "", "", "", "-1");
+    this->TestInvalidUri(uri2);
+}
diff --git a/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..cdb765aa3581f7885e5df8716fffc343c58f0c20
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/CMakeLists.txt
@@ -0,0 +1,11 @@
+aux_source_directory(${CMAKE_CURRENT_LIST_DIR} ORC_FILE_TESTS_LIST)
+set(MAIN_PATH ${CMAKE_CURRENT_SOURCE_DIR})
+
+configure_file(orcfile_test.h.in ${CMAKE_CURRENT_SOURCE_DIR}/orcfile_test.h)
+set(ORC_FILE_TARGET orcfiletest)
+
+add_library(${ORC_FILE_TARGET} STATIC ${ORC_FILE_TESTS_LIST})
+target_compile_options(${ORC_FILE_TARGET} PUBLIC )
+target_include_directories(${ORC_FILE_TARGET} PUBLIC ${CMAKE_BINARY_DIR}/src)
+target_include_directories(${ORC_FILE_TARGET} PUBLIC $ENV{JAVA_HOME}/include)
+target_include_directories(${ORC_FILE_TARGET} PUBLIC $ENV{JAVA_HOME}/include/linux)
diff --git a/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/OmniOrcHdfsFileTest.cc b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/OmniOrcHdfsFileTest.cc
new file mode 100644
index 0000000000000000000000000000000000000000..63119058a047fb1ba8cd15d377f54be520afba09
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/OmniOrcHdfsFileTest.cc
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2020-2022. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "gtest/gtest.h"
+#include "io/orcfile/OmniOrcFile.hh"
+#include "orcfile_test.h"
+
+TEST(OrcReader, createLocalFileReader) {
+    std::string filename = "/resources/orc_data_all_type";
+    filename = PROJECT_PATH + filename;
+
+    std::unique_ptr<orc::Reader> reader;
+    std::unique_ptr<orc::RowReader> rowReader;
+    std::unique_ptr<orc::ColumnVectorBatch> batch;
+    orc::ReaderOptions readerOpts;
+    orc::RowReaderOptions rowReaderOpts;
+    std::list<uint64_t> cols;
+
+    cols.push_back(1);
+    rowReaderOpts.include(cols);
+    UriInfo uriInfo("file", filename, "", "");
+    reader = orc::createReader(orc::readOmniFile(uriInfo), readerOpts);
+    EXPECT_NE(nullptr, reader);
+}
diff --git a/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/orcfile_test.h.in b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/orcfile_test.h.in
new file mode 100644
index 0000000000000000000000000000000000000000..5ca616ec499c349478cb839213a4eb7bb289439c
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/orcfile_test.h.in
@@ -0,0 +1 @@
+#define PROJECT_PATH "@MAIN_PATH@"
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/resources/orc_data_all_type b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/resources/orc_data_all_type
new file mode 100644
index 0000000000000000000000000000000000000000..9cc57fa78ccdae728d2d902f587c30c337b0e4a5
Binary files /dev/null and b/omnioperator/omniop-spark-extension/cpp/test/io/orcfile/resources/orc_data_all_type differ
diff --git a/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp
index a7da7f0ff79da724350cc3bbc3f62fcff68b948b..4f4a3d78815cce6844486dacd2426ff6c12d8e20 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp
@@ -27,6 +27,7 @@
 using namespace spark::reader;
 using namespace arrow;
 using namespace omniruntime::vec;
+
 /*
  * CREATE TABLE `parquet_test` ( `c1` int, `c2` varChar(60), `c3` string, `c4` bigint,
  * `c5` char(40), `c6` float, `c7` double, `c8` decimal(9,8), `c9` decimal(18,5),
@@ -44,7 +45,8 @@
     ParquetReader *reader = new ParquetReader();
     std::string ugi = "root@sample";
-    auto state1 = reader->InitRecordReader(filename, 1024, row_group_indices, column_indices, ugi);
+    UriInfo uri(filename, "", "", "", "-1");
+    auto state1 = reader->InitRecordReader(uri, 1024, row_group_indices, column_indices, ugi);
     ASSERT_EQ(state1, Status::OK());
 
     std::shared_ptr<RecordBatch> batch;
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
index 9010cf1504f70eb66fde51c72f60a39318320214..7451eece73b422da1f7670586fd867a31134f38e 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
@@ -481,4 +481,20 @@ void DeletePathAll(const char* path) {
         rmdir(path);
     }
 }
-}
\ No newline at end of file
+}
+
+void CreateFile(FileSystem* fs, const std::string& path, const std::string& data) {
+    ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+    ASSERT_OK(stream->Write(data));
+    ASSERT_OK(stream->Close());
+}
+
+void AssertFileInfo(FileSystem* fs, const std::string& path, FileType type) {
+    ASSERT_OK_AND_ASSIGN(FileInfo info, fs->GetFileInfo(path));
+    AssertFileInfo(info, path, type);
+}
+
+void AssertFileInfo(const FileInfo& info, const std::string& path, FileType type) {
+    ASSERT_EQ(info.path(), path);
+    ASSERT_EQ(info.type(), type) << "For path '" << info.path() << "'";
+}
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
index b7380254a687ed6f3eaf8234df944feac9087404..a6d0eff8681bc68333a5bb18e3d249f605313dbc 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
@@ -27,6 +27,14 @@
 #include
 #include "shuffle/splitter.h"
 #include "jni/concurrent_map.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/result.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/filesystem/type_fwd.h"
+
+using arrow::fs::FileSystem;
+using arrow::fs::FileInfo;
+using arrow::fs::FileType;
 
 static ConcurrentMap<std::shared_ptr<Splitter>> testShuffleSplitterHolder;
@@ -131,4 +139,10 @@
 void GetFilePath(const char *path, const char *filename, char *filepath);
 
 void DeletePathAll(const char* path);
 
+void CreateFile(FileSystem* fs, const std::string& path, const std::string& data);
+
+void AssertFileInfo(const FileInfo& info, const std::string& path, FileType type);
+
+void AssertFileInfo(FileSystem* fs, const std::string& path, FileType type);
+
 #endif //SPARK_THESTRAL_PLUGIN_TEST_UTILS_H
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java
index d80a236533c6b2b3305b2f443b759877239d6089..94b7cd5eec76e8a3d4d86546b882bad392a38529 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java
+++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java
diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java
index d80a236533c6b2b3305b2f443b759877239d6089..94b7cd5eec76e8a3d4d86546b882bad392a38529 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java
+++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java
@@ -35,6 +35,7 @@ import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.net.URI;
 
 public class OrcColumnarBatchJniReader {
 
@@ -133,10 +134,10 @@ public class OrcColumnarBatchJniReader {
     /**
      * Init Orc reader.
      *
-     * @param path split file path
+     * @param uri split file path
      * @param options split file options
      */
-    public long initializeReaderJava(String path, ReaderOptions options) {
+    public long initializeReaderJava(URI uri, ReaderOptions options) {
         JSONObject job = new JSONObject();
         if (options.getOrcTail() == null) {
             job.put("serializedTail", "");
@@ -144,10 +145,15 @@ public class OrcColumnarBatchJniReader {
             job.put("serializedTail", options.getOrcTail().getSerializedTail().toString());
         }
         job.put("tailLocation", 9223372036854775807L);
-        reader = initializeReader(path, job);
+
+        job.put("scheme", uri.getScheme() == null ? "" : uri.getScheme());
+        job.put("host", uri.getHost() == null ? "" : uri.getHost());
+        job.put("path", uri.getPath() == null ? "" : uri.getPath());
+        job.put("port", uri.getPort());
+
+        reader = initializeReader(job);
         return reader;
     }
-
     /**
      * Init Orc RecordReader.
      *
@@ -285,7 +291,7 @@ public class OrcColumnarBatchJniReader {
         return (int)rtn;
     }
 
-    public native long initializeReader(String path, JSONObject job);
+    public native long initializeReader(JSONObject job);
 
     public native long initializeRecordReader(long reader, JSONObject job);
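The diff does not show how the native `initializeReader` consumes the new scheme/host/path/port JSON fields; presumably it rebuilds a `UriInfo` from them. A minimal sketch of that mapping, where `buildUriFromJob` and its parameters are hypothetical stand-ins for the JNI JSON extraction; only the `UriInfo(scheme, path, host, port)` constructor comes from this change set.

```cpp
// Hypothetical helper: the JNI layer is assumed to have already pulled the
// four values out of the JSON "job" object populated on the Java side.
#include <string>
#include "io/UriInfo.h"

UriInfo buildUriFromJob(const std::string& scheme, const std::string& host,
                        const std::string& path, int port) {
    // java.net.URI#getPort() returns -1 when the URI carries no port; UriInfo
    // stores the port as a string, so it is forwarded as text.
    return UriInfo(scheme, path, host, std::to_string(port));
}
```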
"" : uri.getPath()); parquetReader = initializeReader(job); return parquetReader; } @@ -101,7 +109,7 @@ public class ParquetColumnarBatchJniReader { } nativeGetId++; } - return (int)rtn; + return (int) rtn; } public void close() { diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/orc/OmniOrcColumnarBatchReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/orc/OmniOrcColumnarBatchReader.java index c170b04e4a4b678d962200772cf0c542bed591c4..bb6b4e8273c087b041901ccf9b12bbba5309915b 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/orc/OmniOrcColumnarBatchReader.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/orc/OmniOrcColumnarBatchReader.java @@ -134,7 +134,7 @@ public class OmniOrcColumnarBatchReader extends RecordReader rowgroupIndices = getFilteredBlocks(split.getStart(), split.getEnd()); List columnIndices = getColumnIndices(requestedSchema.getColumns(), fileSchema.getColumns()); String ugi = UserGroupInformation.getCurrentUser().toString(); - reader.initializeReaderJava(split.getPath().toString(), capacity, rowgroupIndices, columnIndices, ugi); + reader.initializeReaderJava(split.getPath(), capacity, rowgroupIndices, columnIndices, ugi); // Add missing Cols flags. initializeInternal(); } diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderDataTypeTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderDataTypeTest.java index 73db9a981dba57a4551ee832ce32b4298983115d..d2740bb98864d278e0df9179a87d243189869850 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderDataTypeTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderDataTypeTest.java @@ -30,8 +30,12 @@ import org.junit.Before; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.OrcFile; import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import static org.junit.Assert.*; @@ -54,12 +58,16 @@ public class OrcColumnarBatchJniReaderDataTypeTest extends TestCase { } public void initReaderJava() { - JSONObject job = new JSONObject(); - job.put("serializedTail",""); - job.put("tailLocation",9223372036854775807L); File directory = new File("src/test/java/com/huawei/boostkit/spark/jni/orcsrc/000000_0"); - System.out.println(directory.getAbsolutePath()); - orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReader(directory.getAbsolutePath(), job); + String absolutePath = directory.getAbsolutePath(); + System.out.println(absolutePath); + URI uri = null; + try { + uri = new URI(absolutePath); + } catch (URISyntaxException ignore) { + } + assertTrue(uri != null); + orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReaderJava(uri, OrcFile.readerOptions(new Configuration())); assertTrue(orcColumnarBatchJniReader.reader != 0); } diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderNotPushDownTest.java 
diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderNotPushDownTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderNotPushDownTest.java
index d9fe13683343f4299ad2b4b2290b0cbf47d761e1..7a220470b766a4b0b2ddb21e951f81d2fbe4c312 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderNotPushDownTest.java
+++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderNotPushDownTest.java
@@ -30,8 +30,12 @@ import org.junit.Before;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 
 import static org.junit.Assert.*;
@@ -54,12 +58,16 @@ public class OrcColumnarBatchJniReaderNotPushDownTest extends TestCase {
     }
 
     public void initReaderJava() {
-        JSONObject job = new JSONObject();
-        job.put("serializedTail","");
-        job.put("tailLocation",9223372036854775807L);
         File directory = new File("src/test/java/com/huawei/boostkit/spark/jni/orcsrc/000000_0");
-        System.out.println(directory.getAbsolutePath());
-        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReader(directory.getAbsolutePath(), job);
+        String absolutePath = directory.getAbsolutePath();
+        System.out.println(absolutePath);
+        URI uri = null;
+        try {
+            uri = new URI(absolutePath);
+        } catch (URISyntaxException ignore) {
+        }
+        assertTrue(uri != null);
+        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReaderJava(uri, OrcFile.readerOptions(new Configuration()));
         assertTrue(orcColumnarBatchJniReader.reader != 0);
     }
diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderPushDownTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderPushDownTest.java
index 87f0cc1d2920982de3b73d9046d173a8f2c8fbb8..3930bd67c8d7620d3e758f57a9f483c0832cee9e 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderPushDownTest.java
+++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderPushDownTest.java
@@ -23,6 +23,8 @@ import junit.framework.TestCase;
 import org.apache.hadoop.mapred.join.ArrayListBackedIterator;
 import org.apache.orc.OrcFile.ReaderOptions;
 import org.apache.orc.Reader.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
 import org.hamcrest.Condition;
 import org.json.JSONObject;
 import org.junit.After;
@@ -38,6 +40,8 @@ import nova.hetu.omniruntime.vector.Vec;
 
 import java.io.File;
 import java.lang.reflect.Array;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,12 +64,16 @@ public class OrcColumnarBatchJniReaderPushDownTest extends TestCase {
     }
 
     public void initReaderJava() {
-        JSONObject job = new JSONObject();
-        job.put("serializedTail","");
-        job.put("tailLocation",9223372036854775807L);
         File directory = new File("src/test/java/com/huawei/boostkit/spark/jni/orcsrc/000000_0");
-        System.out.println(directory.getAbsolutePath());
-        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReader(directory.getAbsolutePath(), job);
+        String absolutePath = directory.getAbsolutePath();
+        System.out.println(absolutePath);
+        URI uri = null;
+        try {
+            uri = new URI(absolutePath);
+        } catch (URISyntaxException ignore) {
+        }
+        assertTrue(uri != null);
+        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReaderJava(uri, OrcFile.readerOptions(new Configuration()));
         assertTrue(orcColumnarBatchJniReader.reader != 0);
     }
diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCNotPushDownTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCNotPushDownTest.java
index 484365c537231b46816e139b090d2384f08b5588..6262bb6b2dbd923b7c25375caf079fce1b3716d4 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCNotPushDownTest.java
+++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCNotPushDownTest.java
@@ -22,6 +22,8 @@ import junit.framework.TestCase;
 import nova.hetu.omniruntime.vector.IntVec;
 import nova.hetu.omniruntime.vector.LongVec;
 import nova.hetu.omniruntime.vector.VarcharVec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
 import org.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
@@ -30,6 +32,8 @@ import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 
 import static org.junit.Assert.*;
@@ -52,12 +56,16 @@ public class OrcColumnarBatchJniReaderSparkORCNotPushDownTest extends TestCase {
     }
 
     public void initReaderJava() {
-        JSONObject job = new JSONObject();
-        job.put("serializedTail","");
-        job.put("tailLocation",9223372036854775807L);
         File directory = new File("src/test/java/com/huawei/boostkit/spark/jni/orcsrc/part-00000-2d6ca713-08b0-4b40-828c-f7ee0c81bb9a-c000.snappy.orc");
-        System.out.println(directory.getAbsolutePath());
-        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReader(directory.getAbsolutePath(), job);
+        String absolutePath = directory.getAbsolutePath();
+        System.out.println(absolutePath);
+        URI uri = null;
+        try {
+            uri = new URI(absolutePath);
+        } catch (URISyntaxException ignore) {
+        }
+        assertTrue(uri != null);
+        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReaderJava(uri, OrcFile.readerOptions(new Configuration()));
         assertTrue(orcColumnarBatchJniReader.reader != 0);
     }
diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCPushDownTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCPushDownTest.java
index b03d60aac4b61291c614bce9f7a52503918a1106..e7c1d334e12a4b60627fcd2f8012aa4901cf3120 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCPushDownTest.java
+++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderSparkORCPushDownTest.java
@@ -24,6 +24,8 @@ import nova.hetu.omniruntime.vector.IntVec;
 import nova.hetu.omniruntime.vector.LongVec;
 import nova.hetu.omniruntime.vector.VarcharVec;
 import nova.hetu.omniruntime.vector.Vec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
 import org.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
@@ -32,6 +34,8 @@ import org.junit.Test;
 import org.junit.runners.MethodSorters;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 
 import static org.junit.Assert.*;
@@ -54,12 +58,16 @@ public class OrcColumnarBatchJniReaderSparkORCPushDownTest extends TestCase {
     }
 
     public void initReaderJava() {
-        JSONObject job = new JSONObject();
-        job.put("serializedTail","");
-        job.put("tailLocation",9223372036854775807L);
         File directory = new File("src/test/java/com/huawei/boostkit/spark/jni/orcsrc/part-00000-2d6ca713-08b0-4b40-828c-f7ee0c81bb9a-c000.snappy.orc");
-        System.out.println(directory.getAbsolutePath());
-        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReader(directory.getAbsolutePath(), job);
+        String absolutePath = directory.getAbsolutePath();
+        System.out.println(absolutePath);
+        URI uri = null;
+        try {
+            uri = new URI(absolutePath);
+        } catch (URISyntaxException ignore) {
+        }
+        assertTrue(uri != null);
+        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReaderJava(uri, OrcFile.readerOptions(new Configuration()));
         assertTrue(orcColumnarBatchJniReader.reader != 0);
     }
diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderTest.java
index 99801bcfb86567a5a2cb44dc43e4428496b00ed3..d4bcd0a5894690d9dc9f5bdf292363024cb35855 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderTest.java
+++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReaderTest.java
@@ -42,6 +42,8 @@ import org.junit.Test;
 import org.junit.runners.MethodSorters;
 import org.apache.hadoop.conf.Configuration;
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import org.apache.orc.Reader.Options;
@@ -92,7 +94,13 @@ public class OrcColumnarBatchJniReaderTest extends TestCase {
         OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
         File directory = new File("src/test/java/com/huawei/boostkit/spark/jni/orcsrc/000000_0");
         String path = directory.getAbsolutePath();
-        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReaderJava(path, readerOptions);
+        URI uri = null;
+        try {
+            uri = new URI(path);
+        } catch (URISyntaxException ignore) {
+        }
+        assertTrue(uri != null);
+        orcColumnarBatchJniReader.reader = orcColumnarBatchJniReader.initializeReaderJava(uri, readerOptions);
         assertTrue(orcColumnarBatchJniReader.reader != 0);
     }
diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReaderTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReaderTest.java
index 5996413555c00cd1dedc2fe81bf50da5efe3c097..eca8989816b18bef63ad8fe1801fd8a7c10f17c8 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReaderTest.java
+++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReaderTest.java
@@ -20,6 +20,7 @@ package com.huawei.boostkit.spark.jni;
 
 import junit.framework.TestCase;
 import nova.hetu.omniruntime.vector.*;
+import org.apache.hadoop.fs.Path;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.FixMethodOrder;
@@ -46,8 +47,8 @@ public class ParquetColumnarBatchJniReaderTest extends TestCase {
         List<Integer> columnIndices = new ArrayList<>();
         Collections.addAll(columnIndices, 0, 1, 3, 6, 7, 8, 9, 10, 12);
         File file = new File("../cpp/test/tablescan/resources/parquet_data_all_type");
-        String path = file.getAbsolutePath();
-        parquetColumnarBatchJniReader.initializeReaderJava(path, 100000, rowGroupIndices, columnIndices, "root@sample");
+        parquetColumnarBatchJniReader.initializeReaderJava(new Path(file.getAbsolutePath()), 100000,
+            rowGroupIndices, columnIndices, "root@sample");
         vecs = new Vec[9];
     }
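For comparison with the Java-side Path.toUri() handoff above, the two shapes of the five-argument `UriInfo(uri, scheme, path, host, port)` constructor used in this change set: parquet_scan_test keeps the raw string in the uri slot with port "-1", while a fully qualified location populates every component. The HDFS values below are illustrative.

```cpp
// The two five-argument UriInfo shapes: uri, scheme, path, host, port.
#include <iostream>
#include "io/UriInfo.h"

int main() {
    // Bare local path, as in parquet_scan_test: only uri and port are set.
    UriInfo local("/tmp/parquet_data_all_type", "", "", "", "-1");

    // Fully qualified HDFS location (illustrative): every component populated.
    UriInfo hdfs("hdfs://host1:9000/user/t.parquet", "hdfs",
                 "/user/t.parquet", "host1", "9000");

    std::cout << local.ToString() << " " << hdfs.ToString() << std::endl;
    return 0;
}
```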