diff --git a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt
index 7256a02cbaa0928592d0701e5b117f833321c72c..31c50564588ee7281d2e268f24972aedab245940 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt
+++ b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt
@@ -17,7 +17,9 @@ set (SOURCE_FILES
         jni/jni_common.cpp
         jni/ParquetColumnarBatchJniReader.cpp
         tablescan/ParquetReader.cpp
-        )
+        io/orcfile/OrcFileRewrite.cc
+        hdfs/hdfs_internal.cpp
+        io/orcfile/HdfsFileInputStreamV2.cpp)
 
 #Find required protobuf package
 find_package(Protobuf REQUIRED)
diff --git a/omnioperator/omniop-spark-extension/cpp/src/hdfs/hdfs_internal.cpp b/omnioperator/omniop-spark-extension/cpp/src/hdfs/hdfs_internal.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..3c10e5b8a01906039d26c3a9ae1fc1a9f3984044
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/hdfs/hdfs_internal.cpp
@@ -0,0 +1,74 @@
+//
+// Created by l00451143 on 2023/11/27.
+//
+
+#include "hdfs_internal.h"
+
+#include <iostream>
+
+using namespace orc;
+
+LibHdfsShim::LibHdfsShim() : fs_(nullptr), file_(nullptr) {
+}
+
+LibHdfsShim::~LibHdfsShim() {
+    // Close the open file first, then drop the filesystem connection.
+    if (fs_ != nullptr && file_ != nullptr) {
+        this->CloseFile();
+    }
+    if (fs_ != nullptr) {
+        this->Disconnect();
+    }
+}
+
+StatusCode LibHdfsShim::Connect(const char *url, tPort port) {
+    this->fs_ = hdfsConnect(url, port);
+    if (!fs_) {
+        return StatusCode::FSConnectError;
+    }
+    return StatusCode::OK;
+}
+
+StatusCode LibHdfsShim::OpenFile(const char *path, int bufferSize, short replication,
+                                 int32_t blocksize) {
+    this->file_ = hdfsOpenFile(this->fs_, path, O_RDONLY, bufferSize, replication, blocksize);
+    if (!file_) {
+        this->Disconnect();
+        return StatusCode::OpenFileError;
+    }
+    return StatusCode::OK;
+}
+
+int64_t LibHdfsShim::GetFileSize(const char *path) {
+    hdfsFileInfo* fileInfo = hdfsGetPathInfo(this->fs_, path);
+    if (!fileInfo) {
+        // hdfsGetPathInfo returns NULL on error; report failure to the caller.
+        return -1;
+    }
+    int64_t size = fileInfo->mSize;
+    hdfsFreeFileInfo(fileInfo, 1);
+    return size;
+}
+
+int32_t LibHdfsShim::Read(void *buffer, int32_t length, int64_t offset) {
+    return hdfsPread(this->fs_, this->file_, offset, buffer, length);
+}
+
+int LibHdfsShim::CloseFile() {
+    return hdfsCloseFile(this->fs_, this->file_);
+}
+
+int LibHdfsShim::Disconnect() {
+    return hdfsDisconnect(this->fs_);
+}
diff --git a/omnioperator/omniop-spark-extension/cpp/src/hdfs/hdfs_internal.h b/omnioperator/omniop-spark-extension/cpp/src/hdfs/hdfs_internal.h
new file mode 100644
index 0000000000000000000000000000000000000000..be153c1f307afdd7f429934bdec35cd1f8500306
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/hdfs/hdfs_internal.h
@@ -0,0 +1,38 @@
+//
+// Created by l00451143 on 2023/11/27.
+//
+
+#ifndef SPARK_THESTRAL_PLUGIN_HDFS_INTERNAL_H
+#define SPARK_THESTRAL_PLUGIN_HDFS_INTERNAL_H
+
+#include "include/hdfs.h"
+#include "status.h"
+
+namespace orc {
+
+class LibHdfsShim {
+public:
+    LibHdfsShim();
+    ~LibHdfsShim();
+
+    // Connects to the filesystem and keeps the resulting hdfsFS handle.
+    StatusCode Connect(const char* url, tPort port);
+    // Opens the file read-only and keeps the resulting hdfsFile handle.
+    StatusCode OpenFile(const char* path, int bufferSize, short replication, int32_t blocksize);
+    // Positional read; returns the number of bytes read, or -1 on error.
+    int32_t Read(void* buffer, int32_t length, int64_t offset);
+
+    // Returns the file size in bytes, or -1 on error.
+    int64_t GetFileSize(const char* path);
+
+private:
+    hdfsFS fs_;
+    hdfsFile file_;
+
+    int CloseFile();
+
+    int Disconnect();
+};
+
+} // namespace orc
+
+#endif // SPARK_THESTRAL_PLUGIN_HDFS_INTERNAL_H
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/hdfs/status.h b/omnioperator/omniop-spark-extension/cpp/src/hdfs/status.h
new file mode 100644
index 0000000000000000000000000000000000000000..185f9870c21d42c6a73d2723a3617097ce86ace4
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/hdfs/status.h
@@ -0,0 +1,22 @@
+//
+// Created by l00451143 on 2023/11/27.
+//
+
+#ifndef SPARK_THESTRAL_PLUGIN_STATUS_H
+#define SPARK_THESTRAL_PLUGIN_STATUS_H
+
+namespace orc {
+
+    enum StatusCode : char {
+        OK = 0,
+        FSConnectError = 1,
+        OpenFileError = 2,
+        ReadFileError = 3,
+        InfoFileError = 4
+    };
+
+    class Status {
+    public:
+        static bool ok(StatusCode code) { return code == OK; }
+    };
+}
+
+#endif // SPARK_THESTRAL_PLUGIN_STATUS_H
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/include/hdfs.h b/omnioperator/omniop-spark-extension/cpp/src/include/hdfs.h
new file mode 100644
index 0000000000000000000000000000000000000000..b8f47dbe14326e93f9cc55474b6a63600bce711f
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/include/hdfs.h
@@ -0,0 +1,1086 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBHDFS_HDFS_H
+#define LIBHDFS_HDFS_H
+
+#include <errno.h>  /* for EINTERNAL, etc. */
+#include <fcntl.h>  /* for O_RDONLY, O_WRONLY */
+#include <stdint.h> /* for uint64_t, etc. */
+#include <time.h>   /* for time_t */
+
+/*
+ * Support export of DLL symbols during libhdfs build, and import of DLL symbols
+ * during client application build. A client application may optionally define
+ * symbol LIBHDFS_DLL_IMPORT in its build. This is not strictly required, but
+ * the compiler can produce more efficient code with it.
+ */
+#ifdef WIN32
+    #ifdef LIBHDFS_DLL_EXPORT
+        #define LIBHDFS_EXTERNAL __declspec(dllexport)
+    #elif LIBHDFS_DLL_IMPORT
+        #define LIBHDFS_EXTERNAL __declspec(dllimport)
+    #else
+        #define LIBHDFS_EXTERNAL
+    #endif
+#else
+    #ifdef LIBHDFS_DLL_EXPORT
+        #define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
+    #elif LIBHDFS_DLL_IMPORT
+        #define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
+    #else
+        #define LIBHDFS_EXTERNAL
+    #endif
+#endif
+
+#ifndef O_RDONLY
+#define O_RDONLY 1
+#endif
+
+#ifndef O_WRONLY
+#define O_WRONLY 2
+#endif
+
+#ifndef EINTERNAL
+#define EINTERNAL 255
+#endif
+
+#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
+  "org/apache/hadoop/io/ElasticByteBufferPool"
+
+/** All APIs set errno to meaningful values */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+/**
+ * Some utility decls used in libhdfs.
+ */
+struct hdfsBuilder;
+typedef int32_t tSize;   /// size of data for read/write io ops
+typedef time_t tTime;    /// time type in seconds
+typedef int64_t tOffset; /// offset within the file
+typedef uint16_t tPort;  /// port
+typedef enum tObjectKind {
+    kObjectKindFile = 'F',
+    kObjectKindDirectory = 'D',
+} tObjectKind;
+struct hdfsStreamBuilder;
+
+
+/**
+ * The C reflection of org.apache.hadoop.fs.FileSystem.
+ */
+struct hdfs_internal;
+typedef struct hdfs_internal* hdfsFS;
+
+struct hdfsFile_internal;
+typedef struct hdfsFile_internal* hdfsFile;
+
+struct hadoopRzOptions;
+
+struct hadoopRzBuffer;
+
+/**
+ * Determine if a file is open for read.
+ *
+ * @param file The HDFS file
+ * @return 1 if the file is open for read; 0 otherwise
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileIsOpenForRead(hdfsFile file);
+
+/**
+ * Determine if a file is open for write.
+ *
+ * @param file The HDFS file
+ * @return 1 if the file is open for write; 0 otherwise
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileIsOpenForWrite(hdfsFile file);
+
+struct hdfsReadStatistics {
+    uint64_t totalBytesRead;
+    uint64_t totalLocalBytesRead;
+    uint64_t totalShortCircuitBytesRead;
+    uint64_t totalZeroCopyBytesRead;
+};
+
+/**
+ * Get read statistics about a file. This is only applicable to files
+ * opened for reading.
+ *
+ * @param file The HDFS file
+ * @param stats (out parameter) on a successful return, the read
+ *              statistics. Unchanged otherwise. You must free the
+ *              returned statistics with hdfsFileFreeReadStatistics.
+ * @return 0 if the statistics were successfully returned,
+ *         -1 otherwise. On a failure, please check errno against
+ *         ENOTSUP. webhdfs, LocalFilesystem, and so forth may
+ *         not support read statistics.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileGetReadStatistics(hdfsFile file,
+                              struct hdfsReadStatistics **stats);
+
+/**
+ * @param stats HDFS read statistics for a file.
+ *
+ * @return the number of remote bytes read.
+ */
+LIBHDFS_EXTERNAL
+int64_t hdfsReadStatisticsGetRemoteBytesRead(
+                        const struct hdfsReadStatistics *stats);
+
+/**
+ * Clear the read statistics for a file.
+ *
+ * @param file The file to clear the read statistics of.
+ *
+ * @return 0 on success; the error code otherwise.
+ *         EINVAL: the file is not open for reading.
+ *         ENOTSUP: the file does not support clearing the read
+ *         statistics.
+ *         Errno will also be set to this code on failure.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileClearReadStatistics(hdfsFile file);
+
+/**
+ * Free some HDFS read statistics.
+ *
+ * @param stats The HDFS read statistics to free.
+ */
+LIBHDFS_EXTERNAL
+void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats);
+
+struct hdfsHedgedReadMetrics {
+    uint64_t hedgedReadOps;
+    uint64_t hedgedReadOpsWin;
+    uint64_t hedgedReadOpsInCurThread;
+};
+
+/**
+ * Get cluster wide hedged read metrics.
+ *
+ * @param fs The configured filesystem handle
+ * @param metrics (out parameter) on a successful return, the hedged read
+ *                metrics. Unchanged otherwise. You must free the returned
+ *                statistics with hdfsFreeHedgedReadMetrics.
+ * @return 0 if the metrics were successfully returned, -1 otherwise.
+ *         On a failure, please check errno against
+ *         ENOTSUP. webhdfs, LocalFilesystem, and so forth may
+ *         not support hedged read metrics.
+ */
+LIBHDFS_EXTERNAL
+int hdfsGetHedgedReadMetrics(hdfsFS fs, struct hdfsHedgedReadMetrics **metrics);
+
+/**
+ * Free HDFS hedged read metrics.
+ *
+ * @param metrics The HDFS hedged read metrics to free
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeHedgedReadMetrics(struct hdfsHedgedReadMetrics *metrics);
+
+/**
+ * hdfsConnectAsUser - Connect to an hdfs file system as a specific user.
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @param user The user name (this is a hadoop domain user); NULL is
+ *             equivalent to hdfsConnect(host, port).
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user);
+
+/**
+ * hdfsConnect - Connect to an hdfs file system.
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnect(const char* nn, tPort port);
+
+/**
+ * hdfsConnectAsUserNewInstance - Connect to an hdfs file system.
+ *
+ * Forces a new instance to be created.
+ *
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @param user The user name to use when connecting.
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user);
+
+/**
+ * hdfsConnectNewInstance - Connect to an hdfs file system.
+ *
+ * Forces a new instance to be created.
+ *
+ * @param nn The NameNode. See hdfsBuilderSetNameNode for details.
+ * @param port The port on which the server is listening.
+ * @return Returns a handle to the filesystem or NULL on error.
+ * @deprecated Use hdfsBuilderConnect instead.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsConnectNewInstance(const char* nn, tPort port);
+
+/**
+ * Connect to HDFS using the parameters defined by the builder.
+ *
+ * The HDFS builder will be freed, whether or not the connection was
+ * successful.
+ *
+ * Every successful call to hdfsBuilderConnect should be matched with a call
+ * to hdfsDisconnect, when the hdfsFS is no longer needed.
+ *
+ * @param bld The HDFS builder
+ * @return Returns a handle to the filesystem, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld);
+
+/**
+ * Create an HDFS builder.
+ *
+ * @return The HDFS builder, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+struct hdfsBuilder *hdfsNewBuilder(void);
+
+/**
+ * Force the builder to always create a new instance of the FileSystem,
+ * rather than possibly finding one in the cache.
+ *
+ * @param bld The HDFS builder
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld);
+
+/**
+ * Set the HDFS NameNode to connect to.
+ *
+ * @param bld The HDFS builder
+ * @param nn The NameNode to use.
+ *
+ *           If the string given is 'default', the default NameNode
+ *           configuration will be used (from the XML configuration files).
+ *
+ *           If NULL is given, a LocalFileSystem will be created.
+ *
+ *           If the string starts with a protocol type such as file:// or
+ *           hdfs://, this protocol type will be used. If not, the
+ *           hdfs:// protocol type will be used.
+ *
+ *           You may specify a NameNode port in the usual way by
+ *           passing a string of the format hdfs://<hostname>:<port>.
+ *           Alternately, you may set the port with
+ *           hdfsBuilderSetNameNodePort. However, you must not pass the
+ *           port in two different ways.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn);
+
+/**
+ * Set the port of the HDFS NameNode to connect to.
+ *
+ * @param bld The HDFS builder
+ * @param port The port.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port);
+
+/**
+ * Set the username to use when connecting to the HDFS cluster.
+ *
+ * @param bld The HDFS builder
+ * @param userName The user name. The string will be shallow-copied.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName);
+
+/**
+ * Set the path to the Kerberos ticket cache to use when connecting to
+ * the HDFS cluster.
+ *
+ * @param bld The HDFS builder
+ * @param kerbTicketCachePath The Kerberos ticket cache path. The string
+ *                            will be shallow-copied.
+ */
+LIBHDFS_EXTERNAL
+void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
+                                       const char *kerbTicketCachePath);
+
+/**
+ * Free an HDFS builder.
+ *
+ * It is normally not necessary to call this function since
+ * hdfsBuilderConnect frees the builder.
+ *
+ * @param bld The HDFS builder
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeBuilder(struct hdfsBuilder *bld);
+
+/**
+ * Set a configuration string for an HdfsBuilder.
+ *
+ * @param key The key to set.
+ * @param val The value, or NULL to set no value.
+ *            This will be shallow-copied. You are responsible for
+ *            ensuring that it remains valid until the builder is
+ *            freed.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ */
+LIBHDFS_EXTERNAL
+int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
+                          const char *val);
+
+/**
+ * Get a configuration string.
+ *
+ * @param key The key to find
+ * @param val (out param) The value. This will be set to NULL if the
+ *            key isn't found. You must free this string with
+ *            hdfsConfStrFree.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ *         Failure to find the key is not an error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsConfGetStr(const char *key, char **val);
+
+/**
+ * Get a configuration integer.
+ *
+ * @param key The key to find
+ * @param val (out param) The value. This will NOT be changed if the
+ *            key isn't found.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ *         Failure to find the key is not an error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsConfGetInt(const char *key, int32_t *val);
+
+/**
+ * Free a configuration string found with hdfsConfGetStr.
+ *
+ * @param val A configuration string obtained from hdfsConfGetStr
+ */
+LIBHDFS_EXTERNAL
+void hdfsConfStrFree(char *val);
+
+/**
+ * hdfsDisconnect - Disconnect from the hdfs file system.
+ * @param fs The configured filesystem handle.
+ * @return Returns 0 on success, -1 on error.
+ *         Even if there is an error, the resources associated with the
+ *         hdfsFS will be freed.
+ */
+LIBHDFS_EXTERNAL
+int hdfsDisconnect(hdfsFS fs);
+
+/**
+ * hdfsOpenFile - Open an hdfs file in the given mode.
+ * @deprecated Use the hdfsStreamBuilder functions instead.
+ * This function does not support setting block sizes bigger than 2 GB.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path The full path to the file.
+ * @param flags - an | of bits/fcntl.h file flags - supported flags are
+ * O_RDONLY, O_WRONLY (meaning create or overwrite, i.e., implies O_TRUNC),
+ * O_WRONLY|O_APPEND. Other flags are generally ignored other than
+ * (O_RDWR || (O_EXCL & O_CREAT)), which return NULL and set errno equal
+ * ENOTSUP.
+ * @param bufferSize Size of buffer for read/write - pass 0 if you want
+ * to use the default configured values.
+ * @param replication Block replication - pass 0 if you want to use
+ * the default configured values.
+ * @param blocksize Size of block - pass 0 if you want to use the
+ * default configured values. Note that if you want a block size bigger
+ * than 2 GB, you must use the hdfsStreamBuilder API rather than this
+ * deprecated function.
+ * @return Returns the handle to the open file or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
+                      int bufferSize, short replication, tSize blocksize);
+
+/**
+ * hdfsStreamBuilderAlloc - Allocate an HDFS stream builder.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path The full path to the file. Will be deep-copied.
+ * @param flags The open flags, as in hdfsOpenFile.
+ * @return Returns the hdfsStreamBuilder, or NULL on error.
+ */
+LIBHDFS_EXTERNAL
+struct hdfsStreamBuilder *hdfsStreamBuilderAlloc(hdfsFS fs,
+                                                 const char *path, int flags);
+
+/**
+ * hdfsStreamBuilderFree - Free an HDFS file builder.
+ *
+ * It is normally not necessary to call this function since
+ * hdfsStreamBuilderBuild frees the builder.
+ *
+ * @param bld The hdfsStreamBuilder to free.
+ */
+LIBHDFS_EXTERNAL
+void hdfsStreamBuilderFree(struct hdfsStreamBuilder *bld);
+
+/**
+ * hdfsStreamBuilderSetBufferSize - Set the stream buffer size.
+ *
+ * @param bld The hdfs stream builder.
+ * @param bufferSize The buffer size to set.
+ *
+ * @return 0 on success, or -1 on error. Errno will be set on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsStreamBuilderSetBufferSize(struct hdfsStreamBuilder *bld,
+                                   int32_t bufferSize);
+
+/**
+ * hdfsStreamBuilderSetReplication - Set the replication for the stream.
+ * This is only relevant for output streams, which will create new blocks.
+ *
+ * @param bld The hdfs stream builder.
+ * @param replication The replication to set.
+ *
+ * @return 0 on success, or -1 on error. Errno will be set on error.
+ *         If you call this on an input stream builder, you will get
+ *         EINVAL, because this configuration is not relevant to input
+ *         streams.
+ */
+LIBHDFS_EXTERNAL
+int hdfsStreamBuilderSetReplication(struct hdfsStreamBuilder *bld,
+                                    int16_t replication);
+
+/**
+ * hdfsStreamBuilderSetDefaultBlockSize - Set the default block size for
+ * the stream. This is only relevant for output streams, which will create
+ * new blocks.
+ *
+ * @param bld The hdfs stream builder.
+ * @param defaultBlockSize The default block size to set.
+ *
+ * @return 0 on success, or -1 on error. Errno will be set on error.
+ *         If you call this on an input stream builder, you will get
+ *         EINVAL, because this configuration is not relevant to input
+ *         streams.
+ */
+LIBHDFS_EXTERNAL
+int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
+                                         int64_t defaultBlockSize);
+
+/**
+ * hdfsStreamBuilderBuild - Build the stream by calling open or create.
+ *
+ * @param bld The hdfs stream builder. This pointer will be freed, whether
+ *            or not the open succeeds.
+ *
+ * @return the stream pointer on success, or NULL on error. Errno will be
+ *         set on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld);
+
+/**
+ * hdfsTruncateFile - Truncate an hdfs file to the given length.
+ * @param fs The configured filesystem handle.
+ * @param path The full path to the file.
+ * @param newlength The size the file is to be truncated to
+ * @return 1 if the file has been truncated to the desired newlength
+ *         and is immediately available to be reused for write operations
+ *         such as append.
+ *         0 if a background process of adjusting the length of the last
+ *         block has been started, and clients should wait for it to
+ *         complete before proceeding with further file updates.
+ *         -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength);
+
+/**
+ * hdfsUnbufferFile - Reduce the buffering done on a file.
+ *
+ * @param file The file to unbuffer.
+ * @return 0 on success
+ *         ENOTSUP if the file does not support unbuffering
+ *         Errno will also be set to this value.
+ */
+LIBHDFS_EXTERNAL
+int hdfsUnbufferFile(hdfsFile file);
+
+/**
+ * hdfsCloseFile - Close an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Returns 0 on success, -1 on error.
+ *         On error, errno will be set appropriately.
+ *         If the hdfs file was valid, the memory associated with it will
+ *         be freed at the end of this call, even if there was an I/O
+ *         error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsCloseFile(hdfsFS fs, hdfsFile file);
+
+
+/**
+ * hdfsExists - Checks if a given path exists on the filesystem
+ * @param fs The configured filesystem handle.
+ * @param path The path to look for
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsExists(hdfsFS fs, const char *path);
+
+
+/**
+ * hdfsSeek - Seek to given offset in file.
+ * This works only for files opened in read-only mode.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param desiredPos Offset into the file to seek into.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
+
+
+/**
+ * hdfsTell - Get the current offset in the file, in bytes.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Current offset, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsTell(hdfsFS fs, hdfsFile file);
+
+
+/**
+ * hdfsRead - Read data from an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param buffer The buffer to copy read bytes into.
+ * @param length The length of the buffer.
+ * @return On success, a positive number indicating how many bytes
+ *         were read.
+ *         On end-of-file, 0.
+ *         On error, -1. Errno will be set to the error code.
+ *         Just like the POSIX read function, hdfsRead will return -1
+ *         and set errno to EINTR if data is temporarily unavailable,
+ *         but we are not yet at the end of the file.
+ */
+LIBHDFS_EXTERNAL
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
+
+/**
+ * hdfsPread - Positional read of data from an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param position Position from which to read
+ * @param buffer The buffer to copy read bytes into.
+ * @param length The length of the buffer.
+ * @return See hdfsRead
+ */
+LIBHDFS_EXTERNAL
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
+                void* buffer, tSize length);
+
+
+/**
+ * hdfsWrite - Write data into an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param buffer The data.
+ * @param length The number of bytes to write.
+ * @return Returns the number of bytes written, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer,
+                tSize length);
+
+
+/**
+ * hdfsFlush - Flush the data.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFlush(hdfsFS fs, hdfsFile file);
+
+
+/**
+ * hdfsHFlush - Flush out the data in the client's user buffer. After the
+ * return of this call, new readers will see the data.
+ * @param fs configured filesystem handle
+ * @param file file handle
+ * @return 0 on success, -1 on error and sets errno
+ */
+LIBHDFS_EXTERNAL
+int hdfsHFlush(hdfsFS fs, hdfsFile file);
+
+
+/**
+ * hdfsHSync - Similar to posix fsync: flushes out the data in the client's
+ * user buffer all the way to the disk device (but the disk may have
+ * it in its cache).
+ * @param fs configured filesystem handle
+ * @param file file handle
+ * @return 0 on success, -1 on error and sets errno
+ */
+LIBHDFS_EXTERNAL
+int hdfsHSync(hdfsFS fs, hdfsFile file);
+
+
+/**
+ * hdfsAvailable - Number of bytes that can be read from this
+ * input stream without blocking.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @return Returns available bytes; -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsAvailable(hdfsFS fs, hdfsFile file);
+
+
+/**
+ * hdfsCopy - Copy file from one filesystem to another.
+ * @param srcFS The handle to source filesystem.
+ * @param src The path of source file.
+ * @param dstFS The handle to destination filesystem.
+ * @param dst The path of destination file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+
+/**
+ * hdfsMove - Move file from one filesystem to another.
+ * @param srcFS The handle to source filesystem.
+ * @param src The path of source file.
+ * @param dstFS The handle to destination filesystem.
+ * @param dst The path of destination file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst);
+
+
+/**
+ * hdfsDelete - Delete file.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @param recursive if path is a directory and set to
+ * non-zero, the directory is deleted else throws an exception. In
+ * case of a file the recursive argument is irrelevant.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsDelete(hdfsFS fs, const char* path, int recursive);
+
+/**
+ * hdfsRename - Rename file.
+ * @param fs The configured filesystem handle.
+ * @param oldPath The path of the source file.
+ * @param newPath The path of the destination file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath);
+
+
+/**
+ * hdfsGetWorkingDirectory - Get the current working directory for
+ * the given filesystem.
+ * @param fs The configured filesystem handle.
+ * @param buffer The user-buffer to copy path of cwd into.
+ * @param bufferSize The length of user-buffer.
+ * @return Returns buffer, NULL on error.
+ */
+LIBHDFS_EXTERNAL
+char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize);
+
+
+/**
+ * hdfsSetWorkingDirectory - Set the working directory. All relative
+ * paths will be resolved relative to it.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the new 'cwd'.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsSetWorkingDirectory(hdfsFS fs, const char* path);
+
+
+/**
+ * hdfsCreateDirectory - Make the given file and all non-existent
+ * parents into directories.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the directory.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsCreateDirectory(hdfsFS fs, const char* path);
+
+
+/**
+ * hdfsSetReplication - Set the replication of the specified
+ * file to the supplied value
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @return Returns 0 on success, -1 on error.
+ */
+LIBHDFS_EXTERNAL
+int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication);
+
+
+/**
+ * hdfsFileInfo - Information about a file/directory.
+ */
+typedef struct {
+    tObjectKind mKind;  /* file or directory */
+    char *mName;        /* the name of the file */
+    tTime mLastMod;     /* the last modification time for the file in seconds */
+    tOffset mSize;      /* the size of the file in bytes */
+    short mReplication; /* the count of replicas */
+    tOffset mBlockSize; /* the block size for the file */
+    char *mOwner;       /* the owner of the file */
+    char *mGroup;       /* the group associated with the file */
+    short mPermissions; /* the permissions associated with the file */
+    tTime mLastAccess;  /* the last access time for the file in seconds */
+} hdfsFileInfo;
+
+
+/**
+ * hdfsListDirectory - Get list of files/directories for a given
+ * directory-path. hdfsFreeFileInfo should be called to deallocate memory.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the directory.
+ * @param numEntries Set to the number of files/directories in path.
+ * @return Returns a dynamically-allocated array of hdfsFileInfo
+ * objects; NULL on error or empty directory.
+ * errno is set to non-zero on error or zero on success.
+ */
+LIBHDFS_EXTERNAL
+hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path,
+                                int *numEntries);
+
+
+/**
+ * hdfsGetPathInfo - Get information about a path as a (dynamically
+ * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be
+ * called when the pointer is no longer needed.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @return Returns a dynamically-allocated hdfsFileInfo object;
+ * NULL on error.
+ */
+LIBHDFS_EXTERNAL
+hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path);
+
+
+/**
+ * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields)
+ * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
+ * objects.
+ * @param numEntries The size of the array.
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries);
+
+/**
+ * hdfsFileIsEncrypted: determine if a file is encrypted based on its
+ * hdfsFileInfo.
+ * @return -1 if there was an error (errno will be set), 0 if the file is
+ * not encrypted, 1 if the file is encrypted.
+ */
+LIBHDFS_EXTERNAL
+int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo);
+
+
+/**
+ * hdfsGetHosts - Get hostnames where a particular block (determined by
+ * pos & blocksize) of a file is stored. The last element in the array
+ * is NULL. Due to replication, a single block could be present on
+ * multiple hosts.
+ * @param fs The configured filesystem handle.
+ * @param path The path of the file.
+ * @param start The start of the block.
+ * @param length The length of the block.
+ * @return Returns a dynamically-allocated 2-d array of blocks-hosts;
+ * NULL on error.
+ */
+LIBHDFS_EXTERNAL
+char*** hdfsGetHosts(hdfsFS fs, const char* path,
+                     tOffset start, tOffset length);
+
+
+/**
+ * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts
+ * @param blockHosts The dynamically-allocated array of blocks-hosts
+ * returned by hdfsGetHosts.
+ */
+LIBHDFS_EXTERNAL
+void hdfsFreeHosts(char ***blockHosts);
+
+
+/**
+ * hdfsGetDefaultBlockSize - Get the default blocksize.
+ *
+ * @param fs The configured filesystem handle.
+ * @deprecated Use hdfsGetDefaultBlockSizeAtPath instead.
+ *
+ * @return Returns the default blocksize, or -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetDefaultBlockSize(hdfsFS fs);
+
+
+/**
+ * hdfsGetDefaultBlockSizeAtPath - Get the default blocksize at the
+ * filesystem indicated by a given path.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path The given path will be used to locate the actual
+ *             filesystem. The full path does not have to exist.
+ *
+ * @return Returns the default blocksize, or -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path);
+
+
+/**
+ * hdfsGetCapacity - Return the raw capacity of the filesystem.
+ * @param fs The configured filesystem handle.
+ * @return Returns the raw-capacity; -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetCapacity(hdfsFS fs);
+
+
+/**
+ * hdfsGetUsed - Return the total raw size of all files in the filesystem.
+ * @param fs The configured filesystem handle.
+ * @return Returns the total-size; -1 on error.
+ */
+LIBHDFS_EXTERNAL
+tOffset hdfsGetUsed(hdfsFS fs);
+
+/**
+ * Change the user and/or group of a file or directory.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path the path to the file or directory
+ * @param owner User string. Set to NULL for 'no change'
+ * @param group Group string. Set to NULL for 'no change'
+ * @return 0 on success else -1
+ */
+LIBHDFS_EXTERNAL
+int hdfsChown(hdfsFS fs, const char* path, const char *owner,
+              const char *group);
+
+/**
+ * hdfsChmod
+ * @param fs The configured filesystem handle.
+ * @param path the path to the file or directory
+ * @param mode the bitmask to set it to
+ * @return 0 on success else -1
+ */
+LIBHDFS_EXTERNAL
+int hdfsChmod(hdfsFS fs, const char* path, short mode);
+
+/**
+ * hdfsUtime
+ * @param fs The configured filesystem handle.
+ * @param path the path to the file or directory
+ * @param mtime new modification time or -1 for no change
+ * @param atime new access time or -1 for no change
+ * @return 0 on success else -1
+ */
+LIBHDFS_EXTERNAL
+int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
+
+/**
+ * Allocate a zero-copy options structure.
+ *
+ * You must free all options structures allocated with this function using
+ * hadoopRzOptionsFree.
+ *
+ * @return A zero-copy options structure, or NULL if one could
+ *         not be allocated. If NULL is returned, errno will
+ *         contain the error number.
+ */
+LIBHDFS_EXTERNAL
+struct hadoopRzOptions *hadoopRzOptionsAlloc(void);
+
+/**
+ * Determine whether we should skip checksums in hadoopReadZero.
+ *
+ * @param opts The options structure.
+ * @param skip Nonzero to skip checksums sometimes; zero to always
+ *             check them.
+ *
+ * @return 0 on success; -1 plus errno on failure.
+ */
+LIBHDFS_EXTERNAL
+int hadoopRzOptionsSetSkipChecksum(
+        struct hadoopRzOptions *opts, int skip);
+
+/**
+ * Set the ByteBufferPool to use with hadoopReadZero.
+ *
+ * @param opts The options structure.
+ * @param className If this is NULL, we will not use any
+ *                  ByteBufferPool. If this is non-NULL, it will be
+ *                  treated as the name of the pool class to use.
+ *                  For example, you can use
+ *                  ELASTIC_BYTE_BUFFER_POOL_CLASS.
+ *
+ * @return 0 if the ByteBufferPool class was found and
+ *         instantiated;
+ *         -1 plus errno otherwise.
+ */
+LIBHDFS_EXTERNAL
+int hadoopRzOptionsSetByteBufferPool(
+        struct hadoopRzOptions *opts, const char *className);
+
+/**
+ * Free a hadoopRzOptions structure.
+ *
+ * @param opts The options structure to free.
+ *             Any associated ByteBufferPool will also be freed.
+ */
+LIBHDFS_EXTERNAL
+void hadoopRzOptionsFree(struct hadoopRzOptions *opts);
+
+/**
+ * Perform a byte buffer read.
+ * If possible, this will be a zero-copy (mmap) read.
+ *
+ * @param file The file to read from.
+ * @param opts An options structure created by hadoopRzOptionsAlloc.
+ * @param maxLength The maximum length to read. We may read fewer bytes
+ *                  than this length.
+ *
+ * @return On success, we will return a new hadoopRzBuffer.
+ *         This buffer will continue to be valid and readable
+ *         until it is released by hadoopRzBufferFree. Failure to
+ *         release a buffer will lead to a memory leak.
+ *         You can access the data within the hadoopRzBuffer with
+ *         hadoopRzBufferGet. If you have reached EOF, the data
+ *         within the hadoopRzBuffer will be NULL. You must still
+ *         free hadoopRzBuffer instances containing NULL.
+ *
+ *         On failure, we will return NULL plus an errno code.
+ *         errno = EOPNOTSUPP indicates that we could not do a
+ *         zero-copy read, and there was no ByteBufferPool
+ *         supplied.
+ */
+LIBHDFS_EXTERNAL
+struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+        struct hadoopRzOptions *opts, int32_t maxLength);
+
+/**
+ * Determine the length of the buffer returned from hadoopReadZero.
+ *
+ * @param buffer a buffer returned from hadoopReadZero.
+ * @return the length of the buffer.
+ */
+LIBHDFS_EXTERNAL
+int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer);
+
+/**
+ * Get a pointer to the raw buffer returned from hadoopReadZero.
+ *
+ * To find out how many bytes this buffer contains, call
+ * hadoopRzBufferLength.
+ *
+ * @param buffer a buffer returned from hadoopReadZero.
+ * @return a pointer to the start of the buffer. This will be
+ *         NULL when end-of-file has been reached.
+ */
+LIBHDFS_EXTERNAL
+const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer);
+
+/**
+ * Release a buffer obtained through hadoopReadZero.
+ *
+ * @param file The hdfs stream that created this buffer. This must be
+ *             the same stream you called hadoopReadZero on.
+ * @param buffer The buffer to release.
+ */
+LIBHDFS_EXTERNAL
+void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
+
+/**
+ * Get the last exception root cause that happened in the context of the
+ * current thread, i.e. the thread that called into libHDFS.
+ *
+ * The pointer returned by this function is guaranteed to be valid until
+ * the next call into libHDFS by the current thread.
+ * Users of this function should not free the pointer.
+ *
+ * A NULL will be returned if no exception information could be retrieved
+ * for the previous call.
+ *
+ * @return The root cause as a C-string.
+ */
+LIBHDFS_EXTERNAL
+char* hdfsGetLastExceptionRootCause();
+
+/**
+ * Get the last exception stack trace that happened in the context of the
+ * current thread, i.e. the thread that called into libHDFS.
+ *
+ * The pointer returned by this function is guaranteed to be valid until
+ * the next call into libHDFS by the current thread.
+ * Users of this function should not free the pointer.
+ *
+ * A NULL will be returned if no exception information could be retrieved
+ * for the previous call.
+ *
+ * @return The stack trace as a C-string.
+ */
+LIBHDFS_EXTERNAL
+char* hdfsGetLastExceptionStackTrace();
+
+#ifdef __cplusplus
+}
+#endif
+
+#undef LIBHDFS_EXTERNAL
+#endif /*LIBHDFS_HDFS_H*/
+
+/**
+ * vim: ts=4: sw=4: et
+ */
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/HdfsFileInputStreamV2.cpp b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/HdfsFileInputStreamV2.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..80e626cce2a7240e3e4f8fdd5ebb3605e9646e0c
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/HdfsFileInputStreamV2.cpp
@@ -0,0 +1,103 @@
+//
+// Created by l00451143 on 2023/11/27.
+//
+
+#include <memory>
+#include <string>
+
+#include "hdfspp/uri.h"
+#include "OrcFileRewrite.hh"
+#include "hdfs/hdfs_internal.h"
+
+namespace orc {
+
+    class HdfsFileInputStreamV2 : public InputStream {
+    private:
+        std::string filepath_;
+        uint64_t total_length_;
+        const uint64_t READ_SIZE = 1024 * 1024; // 1 MB
+
+        std::unique_ptr<LibHdfsShim> file_system_;
+
+    public:
+        explicit HdfsFileInputStreamV2(const std::string& path) {
+            this->file_system_ = std::make_unique<LibHdfsShim>();
+
+            hdfs::URI uri;
+            try {
+                uri = hdfs::URI::parse_from_string(path);
+            } catch (const hdfs::uri_parse_error&) {
+                throw ParseError("Malformed URI: " + path);
+            }
+
+            this->filepath_ = uri.get_path();
+
+            StatusCode fs_status = file_system_->Connect(uri.get_host().c_str(), static_cast<tPort>(uri.get_port()));
+            if (fs_status != OK) {
+                throw ParseError("URI: " + path + ", failed to connect to filesystem.");
+            }
+
+            StatusCode file_status = file_system_->OpenFile(filepath_.c_str(), 0, 0, 0);
+            if (file_status != OK) {
+                throw ParseError("file path: " + filepath_ + ", failed to open file.");
+            }
+
+            int64_t file_size = file_system_->GetFileSize(filepath_.c_str());
+            if (file_size < 0) {
+                throw ParseError("file path: " + filepath_ + ", failed to get file size.");
+            }
+            this->total_length_ = static_cast<uint64_t>(file_size);
+        }
+
+        ~HdfsFileInputStreamV2() override {
+        }
+
+        uint64_t getLength() const override {
+            return this->total_length_;
+        }
+
+        uint64_t getNaturalReadSize() const override {
+            return this->READ_SIZE;
+        }
+
+        const std::string& getName() const override {
+            return filepath_;
+        }
+
+        void read(void* buf,
+                  uint64_t length,
+                  uint64_t offset) override {
+            if (!buf) {
+                throw ParseError("Buffer is null");
+            }
+
+            char* buf_ptr = reinterpret_cast<char*>(buf);
+            uint64_t total_bytes_read = 0;
+            int32_t last_bytes_read = 0;
+
+            do {
+                last_bytes_read = file_system_->Read(buf_ptr,
+                        static_cast<int32_t>(length - total_bytes_read),
+                        static_cast<int64_t>(offset + total_bytes_read));
+                if (last_bytes_read < 0) {
+                    throw ParseError("Error reading bytes from the file.");
+                }
+                if (last_bytes_read == 0) {
+                    // hdfsPread returns 0 at end of file; bail out instead of spinning.
+                    throw ParseError("Unexpected end of file while reading.");
+                }
+                total_bytes_read += last_bytes_read;
+                buf_ptr += last_bytes_read;
+            } while (total_bytes_read < length);
+        }
+    };
+
+    std::unique_ptr<InputStream> readHdfsFileRewrite(const std::string& path, std::vector<hdfs::Token>& tokens) {
+        // Tokens are accepted for future authentication support but are not used yet.
+        return std::unique_ptr<InputStream>(new HdfsFileInputStreamV2(path));
+    }
+}
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OrcFileRewrite.cc b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OrcFileRewrite.cc
new file mode 100644
index 0000000000000000000000000000000000000000..8ec77da2ce30c96cbab5ab4f6dfd768f4648a502
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OrcFileRewrite.cc
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OrcFileRewrite.hh"
+#include "orc/Exceptions.hh"
+#include "io/Adaptor.hh"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/stat.h>
+
+#ifdef _MSC_VER
+#include <io.h>
+#define S_IRUSR _S_IREAD
+#define S_IWUSR _S_IWRITE
+#define stat _stat64
+#define fstat _fstat64
+#else
+#include <unistd.h>
+#define O_BINARY 0
+#endif
+
+namespace orc {
+  std::unique_ptr<InputStream> readFileRewrite(const std::string& path, std::vector<hdfs::Token>& tokens) {
+    if (strncmp(path.c_str(), "hdfs://", 7) == 0) {
+      return orc::readHdfsFileRewrite(path, tokens);
+    } else if (strncmp(path.c_str(), "file:", 5) == 0) {
+      return orc::readLocalFile(path.substr(5));
+    } else {
+      return orc::readLocalFile(path);
+    }
+  }
+}
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OrcFileRewrite.hh b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OrcFileRewrite.hh
new file mode 100644
index 0000000000000000000000000000000000000000..e7bcee95cecd9dd8b0ac7a120be74a507e47d8a5
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/orcfile/OrcFileRewrite.hh
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_FILE_REWRITE_HH
+#define ORC_FILE_REWRITE_HH
+
+#include <vector>
+
+#include "hdfspp/options.h"
+#include "orc/OrcFile.hh"
+
+/** \file OrcFileRewrite.hh
+    @brief The top level interface to ORC.
+*/
+
+namespace orc {
+
+  /**
+   * Create a stream to a local file, or to an HDFS file if the path
+   * begins with "hdfs://".
+   * @param path the name of the file in the local file system or HDFS
+   * @param tokens delegation tokens forwarded to the HDFS reader
+   */
+  ORC_UNIQUE_PTR<InputStream> readFileRewrite(const std::string& path, std::vector<hdfs::Token>& tokens);
+
+  /**
+   * Create a stream to an HDFS file.
+   * @param path the uri of the file in HDFS
+   * @param tokens delegation tokens forwarded to the reader (currently unused)
+   */
+  ORC_UNIQUE_PTR<InputStream> readHdfsFileRewrite(const std::string& path, std::vector<hdfs::Token>& tokens);
+}
+
+#endif
diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp
index c0f4c1ae17031e933dd4467328c629d8045c3f20..df67ac4297f0906f8b2a3346ba1798928ef42e39 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp
@@ -52,7 +52,8 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_OrcColumnarBatchJniRe
         env->ReleaseStringUTFChars(serTailJstr, ptr);
     }
 
-    std::unique_ptr<orc::Reader> reader = createReader(orc::readFile(filePath), readerOptions);
+    std::vector<hdfs::Token> tokens;
+    std::unique_ptr<orc::Reader> reader = createReader(orc::readFileRewrite(filePath, tokens), readerOptions);
     env->ReleaseStringUTFChars(path, pathPtr);
     orc::Reader *readerNew = reader.release();
     return (jlong)(readerNew);
diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h
index 714d97ee67df137da8c6dcc79f8aa2173a33066d..878af0242301d5e4d0ac3375108a9b7e86d7ee58 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h
@@ -44,6 +44,8 @@
 #include <orc/OrcFile.hh>
 #include "common/debug.h"
 
+#include "io/orcfile/OrcFileRewrite.hh"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
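Usage sketch (not part of the patch): the snippet below shows how the new readFileRewrite entry point is meant to be driven outside the JNI layer. The namenode address and file path are illustrative assumptions, and the hdfs::Token element type simply mirrors the declaration in OrcFileRewrite.hh; only the "hdfs://" prefix routing is established by the patch itself.

// usage_sketch.cpp -- minimal, assumption-laden example; build against the
// vendored orc and hdfspp headers added by this patch.
#include <iostream>
#include <vector>

#include "io/orcfile/OrcFileRewrite.hh"

int main() {
    // The token vector is accepted but ignored by the current HDFS path
    // (see readHdfsFileRewrite above); its element type follows the header.
    std::vector<hdfs::Token> tokens;
    try {
        // An "hdfs://" prefix routes to HdfsFileInputStreamV2; a "file:"
        // prefix or a bare path falls back to orc::readLocalFile().
        auto stream = orc::readFileRewrite(
            "hdfs://namenode:9000/warehouse/tbl/part-00000.orc", tokens); // hypothetical URI
        std::cout << stream->getName() << ": " << stream->getLength() << " bytes" << std::endl;
    } catch (const std::exception& e) {
        // HdfsFileInputStreamV2 reports connect/open/size failures as orc::ParseError.
        std::cerr << "open failed: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}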