From bac6b9547ea02bbe06c8a814698a919d7db28ac1 Mon Sep 17 00:00:00 2001 From: btxod Date: Thu, 2 Jan 2025 16:14:02 +0800 Subject: [PATCH 01/17] feat:AzureBlob storage support. --- linkis-commons/linkis-storage/pom.xml | 19 ++ .../storage/fs/impl/AzureBlobFileSystem.java | 310 ++++++++++++++++++ .../storage/utils/StorageConfiguration.java | 4 + .../linkis/storage/utils/StorageUtils.java | 1 + linkis-dist/package/conf/linkis.properties | 5 +- 5 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index def795ebd..d23294e8f 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -99,6 +99,25 @@ aws-java-sdk-s3 1.12.261 + + + com.azure + azure-sdk-bom + 1.2.30 + + + + com.azure + azure-storage-blob + 12.29.0 + + + + com.azure + azure-identity + 1.14.2 + + diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java new file mode 100644 index 000000000..7dce75e75 --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -0,0 +1,310 @@ +package org.apache.linkis.storage.fs.impl; + +import com.azure.identity.DefaultAzureCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlockBlobClient; +import org.apache.commons.math3.util.Pair; +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.storage.exception.StorageWarnException; +import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageConfiguration; +import org.apache.linkis.storage.utils.StorageUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; + +public class AzureBlobFileSystem extends FileSystem { + + /** + * manipulate Azure storage resources and Blob container + * 管理命名空间下的存储资源和Blob容器 + */ + private BlobServiceClient serviceClient; + + /** + * getBlobContainerClient + * + * @param containerName + * @return client which can manipulate Azure Storage containers and their blobs.
+ * 操作一个容器和其blobs的客户端 + */ + private BlobContainerClient getBlobContainerClient(String containerName) { + return serviceClient.getBlobContainerClient(containerName); + } + + /** + * @param dest + * @return a pair of containerName and blobName. + */ + private Pair azureLocation(FsPath dest) { + String path = dest.getPath(); + // split to container and blob object, + // container/dir/blobname will split to container and dir/blobname + String[] names = path.split("/", 2); + return Pair.create(names[0], names[1]); + } + + private Pair azureLocation(String dest) { + FsPath fp = new FsPath(dest); + return this.azureLocation(fp); + } + + /** + * init serviceClient + * + * @param properties + * @throws IOException + */ + @Override + public void init(Map properties) throws IOException { + + /** + * The storage account provides the top-level namespace for the Blob service. + * 每个账户提供了一个顶级的命名空间 + * TODO 是否需要对多账户支持 + */ + String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); + + /* + * The default credential first checks environment variables for configuration + * If environment configuration is incomplete, it will try managed identity + */ + DefaultAzureCredential defaultCredential = + new DefaultAzureCredentialBuilder().build(); + + // Azure SDK client builders accept the credential as a parameter + serviceClient = new BlobServiceClientBuilder() + .endpoint("https://" + acctName + ".blob.core.windows.net/") + .credential(defaultCredential) + .buildClient(); + } + + /** + * name of the fileSystem + * + * @return + */ + @Override + public String fsName() { + return StorageUtils.AZURE; + } + + @Override + public String rootUserName() { + return ""; + } + + /** + * @param dest + * @return + * @throws IOException + */ + @Override + public FsPath get(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return path; + } else { + throw new StorageWarnException( + TO_BE_UNKNOW.getErrorCode(), + "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); + } + } + + /** + * Opens a blob input stream to download the blob. + * + * @param dest + * @return + * @throws BlobStorageException – If a storage service error occurred. + */ + @Override + public InputStream read(FsPath dest) { + Pair result = azureLocation(dest); + BlobClient blobclient = getBlobContainerClient(result.getFirst()) + .getBlobClient(result.getSecond()); + return blobclient.openInputStream(); + } + + /** + * @param dest + * @param overwrite + * @return + * @throws BlobStorageException – If a storage service error occurred. + * @see BlockBlobClient #getBlobOutputStream + */ + @Override + public OutputStream write(FsPath dest, boolean overwrite) { + + Pair result = azureLocation(dest); + BlobClient blobclient = getBlobContainerClient(result.getFirst()) + .getBlobClient(result.getSecond()); + return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); + } + + /** + * create a blob
+ * 创建一个对象("文件") + * + * @param dest + * @return + * @throws IOException + */ + @Override + public boolean create(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return false; + } + Pair names = this.azureLocation(dest); + BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getFirst()); + client.getBlobClient(names.getSecond()); + return true; + } + + /** + * Flat listing 5000 results at a time,without deleted.
+ * 扁平化展示未删除的blob对象,最多5000条 + * TODO 分页接口,迭代器接口? + * + * @param path + * @return + * @throws IOException + */ + @Override + public List list(FsPath path) throws IOException { + Pair result = azureLocation(path); + return getBlobContainerClient(result.getFirst()).listBlobs().stream() + // TODO 是否过滤已删除对象 + .filter(item -> !item.isDeleted()) + .map(item -> new FsPath(item.getName())) + .collect(Collectors.toList()); + } + + @Override + public boolean canRead(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } + } + + @Override + public boolean canWrite(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } + } + + @Override + public boolean exists(FsPath dest) throws IOException { + Pair file = this.azureLocation(dest); + return getBlobContainerClient(file.getFirst()) + .getBlobClient(file.getSecond()) + .exists(); + } + + @Override + public boolean delete(FsPath dest) throws IOException { + Pair file = this.azureLocation(dest); + return getBlobContainerClient(file.getFirst()) + .getBlobClient(file.getSecond()) + .deleteIfExists(); + } + + @Override + public boolean copy(String origin, String dest) throws IOException { + Pair oriNames = this.azureLocation(origin); + Pair destNames = this.azureLocation(dest); + + BlobClient oriClient = getBlobContainerClient(oriNames.getFirst()) + .getBlobClient(oriNames.getSecond()); + BlockBlobClient destClient = getBlobContainerClient(destNames.getFirst()) + .getBlobClient(destNames.getSecond()).getBlockBlobClient(); + destClient.uploadFromUrl(oriClient.getBlobUrl(), true); + return true; + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + // 没有事务性保证 + this.copy(oldDest.getPath(), newDest.getPath()); + this.delete(oldDest); + return true; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + return this.create(dest.getPath()); + } + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + return this.mkdir(dest); + } + + // TODO 下面这些方法可能都无法支持 + @Override + public String listRoot() throws IOException { + return ""; + } + + @Override + public long getTotalSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public boolean canExecute(FsPath dest) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean setGroup(FsPath dest, String group) throws IOException { + return false; + } + + @Override + public boolean setPermission(FsPath dest, String permission) throws IOException { + return false; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java index 70a3839b6..7a80d35f7 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java @@ -158,4 +158,8 @@ public class StorageConfiguration { public static CommonVars S3_BUCKET = new CommonVars("linkis.storage.s3.bucket", "", null, null); + + public static CommonVars AZURE_ACCT_NAME = + new CommonVars("linkis.storage.azure.acctName", "", null, null); + } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java index 07bc0510b..e446c5f0c 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java @@ -55,6 +55,7 @@ public class StorageUtils { public static final String FILE = "file"; public static final String OSS = "oss"; public static final String S3 = "s3"; + public static final String AZURE = "azureBlob"; public static final String FILE_SCHEMA = "file://"; public static final String HDFS_SCHEMA = "hdfs://"; diff --git a/linkis-dist/package/conf/linkis.properties b/linkis-dist/package/conf/linkis.properties index c04294c1b..303e53349 100644 --- a/linkis-dist/package/conf/linkis.properties +++ b/linkis-dist/package/conf/linkis.properties @@ -117,4 +117,7 @@ linkis.storage.s3.access.key= linkis.storage.s3.secret.key= linkis.storage.s3.endpoint= linkis.storage.s3.region= -linkis.storage.s3.bucket= \ No newline at end of file +linkis.storage.s3.bucket= + +# azure file system +linkis.storage.azure.acctName= \ No newline at end of file -- Gitee From 342bb21dae2c3cbaa8fd1a320703b5342e7cbffa Mon Sep 17 00:00:00 2001 From: btxod Date: Fri, 3 Jan 2025 15:00:56 +0800 Subject: [PATCH 02/17] feat:dependencies management. --- linkis-commons/linkis-storage/pom.xml | 21 ++++++++------------- pom.xml | 7 +++++++ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index d23294e8f..50e1c2305 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -6,9 +6,9 @@ ~ The ASF licenses this file to You under the Apache License, Version 2.0 ~ (the "License"); you may not use this file except in compliance with ~ the License. You may obtain a copy of the License at - ~ + ~ ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ + ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -101,21 +101,16 @@ - com.azure - azure-sdk-bom - 1.2.30 + com.azure + azure-storage-blob - - com.azure - azure-storage-blob - 12.29.0 + com.azure + azure-storage-common - - com.azure - azure-identity - 1.14.2 + com.azure + azure-identity diff --git a/pom.xml b/pom.xml index 327a455a3..8e7ce87df 100644 --- a/pom.xml +++ b/pom.xml @@ -1361,6 +1361,13 @@ spring-cloud-starter-alibaba-nacos-discovery ${spring.cloud.nacos.version} + + com.azure + azure-sdk-bom + 1.2.30 + pom + import + -- Gitee From 6d335a94c624673dfa80779be7e3c37de8ebf5a3 Mon Sep 17 00:00:00 2001 From: btxod Date: Fri, 3 Jan 2025 17:24:08 +0800 Subject: [PATCH 03/17] feat:dependencies management. --- linkis-commons/linkis-storage/pom.xml | 12 ++++++------ pom.xml | 9 ++------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index 50e1c2305..bbbd29908 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -101,16 +101,16 @@ - com.azure - azure-storage-blob + com.azure + azure-storage-blob - com.azure - azure-storage-common + com.azure + azure-storage-common - com.azure - azure-identity + com.azure + azure-identity diff --git a/pom.xml b/pom.xml index 8e7ce87df..fb0f0f643 100644 --- a/pom.xml +++ b/pom.xml @@ -218,6 +218,7 @@ 2.2.9.RELEASE Hoxton.SR12 2.2.9.RELEASE + 1.2.30 UTF-8 @@ -1364,7 +1365,7 @@ com.azure azure-sdk-bom - 1.2.30 + ${azure.blob.bom} pom import @@ -1422,12 +1423,6 @@ spring-test test - - org.postgresql - postgresql - ${postgresql.connector.version} - test - -- Gitee From e3fecdb9b0b76cd4ab374e19d7c945d13f804418 Mon Sep 17 00:00:00 2001 From: btxod Date: Fri, 3 Jan 2025 17:28:18 +0800 Subject: [PATCH 04/17] chore:format code. --- .../storage/fs/impl/AzureBlobFileSystem.java | 572 +++++++++--------- .../storage/utils/StorageConfiguration.java | 3 +- 2 files changed, 292 insertions(+), 283 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index 7dce75e75..a7a129496 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -1,20 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.linkis.storage.fs.impl; -import com.azure.identity.DefaultAzureCredential; -import com.azure.identity.DefaultAzureCredentialBuilder; -import com.azure.storage.blob.BlobClient; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.BlobServiceClientBuilder; -import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.specialized.BlockBlobClient; -import org.apache.commons.math3.util.Pair; import org.apache.linkis.common.io.FsPath; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.fs.FileSystem; import org.apache.linkis.storage.utils.StorageConfiguration; import org.apache.linkis.storage.utils.StorageUtils; +import org.apache.commons.math3.util.Pair; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -22,289 +32,289 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import com.azure.identity.DefaultAzureCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlockBlobClient; + import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; public class AzureBlobFileSystem extends FileSystem { - /** - * manipulate Azure storage resources and Blob container - * 管理命名空间下的存储资源和Blob容器 - */ - private BlobServiceClient serviceClient; - - /** - * getBlobContainerClient - * - * @param containerName - * @return client which can manipulate Azure Storage containers and their blobs.
- * 操作一个容器和其blobs的客户端 - */ - private BlobContainerClient getBlobContainerClient(String containerName) { - return serviceClient.getBlobContainerClient(containerName); - } - - /** - * @param dest - * @return a pair of containerName and blobName. - */ - private Pair azureLocation(FsPath dest) { - String path = dest.getPath(); - // split to container and blob object, - // container/dir/blobname will split to container and dir/blobname - String[] names = path.split("/", 2); - return Pair.create(names[0], names[1]); - } - - private Pair azureLocation(String dest) { - FsPath fp = new FsPath(dest); - return this.azureLocation(fp); - } - - /** - * init serviceClient - * - * @param properties - * @throws IOException - */ - @Override - public void init(Map properties) throws IOException { - - /** - * The storage account provides the top-level namespace for the Blob service. - * 每个账户提供了一个顶级的命名空间 - * TODO 是否需要对多账户支持 - */ - String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); - - /* - * The default credential first checks environment variables for configuration - * If environment configuration is incomplete, it will try managed identity - */ - DefaultAzureCredential defaultCredential = - new DefaultAzureCredentialBuilder().build(); - - // Azure SDK client builders accept the credential as a parameter - serviceClient = new BlobServiceClientBuilder() - .endpoint("https://" + acctName + ".blob.core.windows.net/") - .credential(defaultCredential) - .buildClient(); - } - - /** - * name of the fileSystem - * - * @return - */ - @Override - public String fsName() { - return StorageUtils.AZURE; - } - - @Override - public String rootUserName() { - return ""; - } - - /** - * @param dest - * @return - * @throws IOException - */ - @Override - public FsPath get(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return path; - } else { - throw new StorageWarnException( - TO_BE_UNKNOW.getErrorCode(), - "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); - } - } - - /** - * Opens a blob input stream to download the blob. - * - * @param dest - * @return - * @throws BlobStorageException – If a storage service error occurred. - */ - @Override - public InputStream read(FsPath dest) { - Pair result = azureLocation(dest); - BlobClient blobclient = getBlobContainerClient(result.getFirst()) - .getBlobClient(result.getSecond()); - return blobclient.openInputStream(); - } - - /** - * @param dest - * @param overwrite - * @return - * @throws BlobStorageException – If a storage service error occurred. - * @see BlockBlobClient #getBlobOutputStream - */ - @Override - public OutputStream write(FsPath dest, boolean overwrite) { - - Pair result = azureLocation(dest); - BlobClient blobclient = getBlobContainerClient(result.getFirst()) - .getBlobClient(result.getSecond()); - return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); - } + /** manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 */ + private BlobServiceClient serviceClient; + + /** + * getBlobContainerClient + * + * @param containerName + * @return client which can manipulate Azure Storage containers and their blobs.
+ * 操作一个容器和其blobs的客户端 + */ + private BlobContainerClient getBlobContainerClient(String containerName) { + return serviceClient.getBlobContainerClient(containerName); + } + + /** + * @param dest + * @return a pair of containerName and blobName. + */ + private Pair azureLocation(FsPath dest) { + String path = dest.getPath(); + // split to container and blob object, + // container/dir/blobname will split to container and dir/blobname + String[] names = path.split("/", 2); + return Pair.create(names[0], names[1]); + } + + private Pair azureLocation(String dest) { + FsPath fp = new FsPath(dest); + return this.azureLocation(fp); + } + + /** + * init serviceClient + * + * @param properties + * @throws IOException + */ + @Override + public void init(Map properties) throws IOException { /** - * create a blob
- * 创建一个对象("文件") - * - * @param dest - * @return - * @throws IOException + * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 + * TODO 是否需要对多账户支持 */ - @Override - public boolean create(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return false; - } - Pair names = this.azureLocation(dest); - BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getFirst()); - client.getBlobClient(names.getSecond()); - return true; - } + String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); - /** - * Flat listing 5000 results at a time,without deleted.
- * 扁平化展示未删除的blob对象,最多5000条 - * TODO 分页接口,迭代器接口? - * - * @param path - * @return - * @throws IOException + /* + * The default credential first checks environment variables for configuration + * If environment configuration is incomplete, it will try managed identity */ - @Override - public List list(FsPath path) throws IOException { - Pair result = azureLocation(path); - return getBlobContainerClient(result.getFirst()).listBlobs().stream() - // TODO 是否过滤已删除对象 - .filter(item -> !item.isDeleted()) - .map(item -> new FsPath(item.getName())) - .collect(Collectors.toList()); - } - - @Override - public boolean canRead(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; - } - } - - @Override - public boolean canWrite(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; - } - } - - @Override - public boolean exists(FsPath dest) throws IOException { - Pair file = this.azureLocation(dest); - return getBlobContainerClient(file.getFirst()) - .getBlobClient(file.getSecond()) - .exists(); - } - - @Override - public boolean delete(FsPath dest) throws IOException { - Pair file = this.azureLocation(dest); - return getBlobContainerClient(file.getFirst()) - .getBlobClient(file.getSecond()) - .deleteIfExists(); - } - - @Override - public boolean copy(String origin, String dest) throws IOException { - Pair oriNames = this.azureLocation(origin); - Pair destNames = this.azureLocation(dest); - - BlobClient oriClient = getBlobContainerClient(oriNames.getFirst()) - .getBlobClient(oriNames.getSecond()); - BlockBlobClient destClient = getBlobContainerClient(destNames.getFirst()) - .getBlobClient(destNames.getSecond()).getBlockBlobClient(); - destClient.uploadFromUrl(oriClient.getBlobUrl(), true); - return true; - } - - @Override - public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { - // 没有事务性保证 - this.copy(oldDest.getPath(), newDest.getPath()); - this.delete(oldDest); - return true; - } - - @Override - public boolean mkdir(FsPath dest) throws IOException { - return this.create(dest.getPath()); - } - - @Override - public boolean mkdirs(FsPath dest) throws IOException { - return this.mkdir(dest); - } - - // TODO 下面这些方法可能都无法支持 - @Override - public String listRoot() throws IOException { - return ""; - } - - @Override - public long getTotalSpace(FsPath dest) throws IOException { - return 0; + DefaultAzureCredential defaultCredential = new DefaultAzureCredentialBuilder().build(); + + // Azure SDK client builders accept the credential as a parameter + serviceClient = + new BlobServiceClientBuilder() + .endpoint("https://" + acctName + ".blob.core.windows.net/") + .credential(defaultCredential) + .buildClient(); + } + + /** + * name of the fileSystem + * + * @return + */ + @Override + public String fsName() { + return StorageUtils.AZURE; + } + + @Override + public String rootUserName() { + return ""; + } + + /** + * @param dest + * @return + * @throws IOException + */ + @Override + public FsPath get(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return path; + } else { + throw new StorageWarnException( + TO_BE_UNKNOW.getErrorCode(), + "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); } - - @Override - public long getFreeSpace(FsPath dest) throws IOException { - return 0; + } + + /** + * Opens a blob input stream to download the blob. + * + * @param dest + * @return + * @throws BlobStorageException – If a storage service error occurred. + */ + @Override + public InputStream read(FsPath dest) { + Pair result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getFirst()).getBlobClient(result.getSecond()); + return blobclient.openInputStream(); + } + + /** + * @param dest + * @param overwrite + * @return + * @throws BlobStorageException – If a storage service error occurred. + * @see BlockBlobClient #getBlobOutputStream + */ + @Override + public OutputStream write(FsPath dest, boolean overwrite) { + + Pair result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getFirst()).getBlobClient(result.getSecond()); + return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); + } + + /** + * create a blob
+ * 创建一个对象("文件") + * + * @param dest + * @return + * @throws IOException + */ + @Override + public boolean create(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return false; } - - @Override - public long getUsableSpace(FsPath dest) throws IOException { - return 0; + Pair names = this.azureLocation(dest); + BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getFirst()); + client.getBlobClient(names.getSecond()); + return true; + } + + /** + * Flat listing 5000 results at a time,without deleted.
+ * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? + * + * @param path + * @return + * @throws IOException + */ + @Override + public List list(FsPath path) throws IOException { + Pair result = azureLocation(path); + return getBlobContainerClient(result.getFirst()).listBlobs().stream() + // TODO 是否过滤已删除对象 + .filter(item -> !item.isDeleted()) + .map(item -> new FsPath(item.getName())) + .collect(Collectors.toList()); + } + + @Override + public boolean canRead(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; } - - @Override - public boolean canExecute(FsPath dest) throws IOException { - return false; - } - - @Override - public boolean setOwner(FsPath dest, String user, String group) throws IOException { - return false; - } - - @Override - public boolean setOwner(FsPath dest, String user) throws IOException { - return false; - } - - @Override - public boolean setGroup(FsPath dest, String group) throws IOException { - return false; - } - - @Override - public boolean setPermission(FsPath dest, String permission) throws IOException { - return false; - } - - @Override - public void close() throws IOException { - + } + + @Override + public boolean canWrite(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; } + } + + @Override + public boolean exists(FsPath dest) throws IOException { + Pair file = this.azureLocation(dest); + return getBlobContainerClient(file.getFirst()).getBlobClient(file.getSecond()).exists(); + } + + @Override + public boolean delete(FsPath dest) throws IOException { + Pair file = this.azureLocation(dest); + return getBlobContainerClient(file.getFirst()).getBlobClient(file.getSecond()).deleteIfExists(); + } + + @Override + public boolean copy(String origin, String dest) throws IOException { + Pair oriNames = this.azureLocation(origin); + Pair destNames = this.azureLocation(dest); + + BlobClient oriClient = + getBlobContainerClient(oriNames.getFirst()).getBlobClient(oriNames.getSecond()); + BlockBlobClient destClient = + getBlobContainerClient(destNames.getFirst()) + .getBlobClient(destNames.getSecond()) + .getBlockBlobClient(); + destClient.uploadFromUrl(oriClient.getBlobUrl(), true); + return true; + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + // 没有事务性保证 + this.copy(oldDest.getPath(), newDest.getPath()); + this.delete(oldDest); + return true; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + return this.create(dest.getPath()); + } + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + return this.mkdir(dest); + } + + // TODO 下面这些方法可能都无法支持 + @Override + public String listRoot() throws IOException { + return ""; + } + + @Override + public long getTotalSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public boolean canExecute(FsPath dest) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean setGroup(FsPath dest, String group) throws IOException { + return false; + } + + @Override + public boolean setPermission(FsPath dest, String permission) throws IOException { + return false; + } + + @Override + public void close() throws IOException {} } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java index 7a80d35f7..16c75164d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java @@ -160,6 +160,5 @@ public class StorageConfiguration { new CommonVars("linkis.storage.s3.bucket", "", null, null); public static CommonVars AZURE_ACCT_NAME = - new CommonVars("linkis.storage.azure.acctName", "", null, null); - + new CommonVars("linkis.storage.azure.acctName", "", null, null); } -- Gitee From 301ee59880e3b95011258cb7d6a70a6fc6173bc8 Mon Sep 17 00:00:00 2001 From: btxod Date: Tue, 7 Jan 2025 16:00:14 +0800 Subject: [PATCH 05/17] feat:use https as fs name. --- .../impl/BuildAzureBlobFileSystem.java | 60 +++++++++++++++++++ .../storage/fs/impl/AzureBlobFileSystem.java | 2 +- .../storage/utils/StorageConfiguration.java | 3 +- .../linkis/storage/utils/StorageUtils.java | 19 +++--- 4 files changed, 73 insertions(+), 11 deletions(-) create mode 100644 linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java new file mode 100644 index 000000000..a33bc84fc --- /dev/null +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.factory.impl; + +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.storage.factory.BuildFactory; +import org.apache.linkis.storage.fs.impl.AzureBlobFileSystem; +import org.apache.linkis.storage.utils.StorageUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class BuildAzureBlobFileSystem implements BuildFactory { + private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class); + + @Override + public Fs getFs(String user, String proxyUser) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public Fs getFs(String user, String proxyUser, String label) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public String fsName() { + return StorageUtils.BLOB; + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index a7a129496..c8454617d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -112,7 +112,7 @@ public class AzureBlobFileSystem extends FileSystem { */ @Override public String fsName() { - return StorageUtils.AZURE; + return StorageUtils.BLOB; } @Override diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java index 16c75164d..2fec7779a 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java @@ -71,7 +71,8 @@ public class StorageConfiguration { new CommonVars<>( "wds.linkis.storage.build.fs.classes", "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," - + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem", + + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem" + +"org.apache.linkis.storage.factory.impl.AzureBlobFileSystem", null, null); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java index e446c5f0c..111738f02 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageUtils.java @@ -17,6 +17,9 @@ package org.apache.linkis.storage.utils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.utils.CloseableUtils; import org.apache.linkis.common.io.Fs; import org.apache.linkis.common.io.FsPath; import org.apache.linkis.common.io.resultset.ResultSet; @@ -30,10 +33,8 @@ import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.resultset.ResultSetFactory; import org.apache.linkis.storage.resultset.ResultSetReaderFactory; import org.apache.linkis.storage.resultset.ResultSetWriterFactory; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.curator.utils.CloseableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.*; import java.lang.reflect.Method; @@ -43,9 +44,6 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.CONFIGURATION_NOT_READ; public class StorageUtils { @@ -55,12 +53,13 @@ public class StorageUtils { public static final String FILE = "file"; public static final String OSS = "oss"; public static final String S3 = "s3"; - public static final String AZURE = "azureBlob"; + public static final String BLOB = "https"; public static final String FILE_SCHEMA = "file://"; public static final String HDFS_SCHEMA = "hdfs://"; public static final String OSS_SCHEMA = "oss://"; public static final String S3_SCHEMA = "s3://"; + public static final String BLOB_SCHEMA = "https://"; private static final NumberFormat nf = NumberFormat.getInstance(); @@ -217,7 +216,9 @@ public class StorageUtils { * @return */ public static FsPath getFsPath(String path) { - if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA)) { + if (path.startsWith(FILE_SCHEMA) + || path.startsWith(HDFS_SCHEMA) + || path.startsWith(BLOB_SCHEMA)) { return new FsPath(path); } else { return new FsPath(FILE_SCHEMA + path); -- Gitee From 29ef48907c5be4e9de60db0b954e3e7dc8cf9913 Mon Sep 17 00:00:00 2001 From: btxod Date: Tue, 7 Jan 2025 16:07:54 +0800 Subject: [PATCH 06/17] =?UTF-8?q?chore:=E8=AF=AF=E5=88=A0=E4=BA=86pg?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pom.xml b/pom.xml index fb0f0f643..7c2105b43 100644 --- a/pom.xml +++ b/pom.xml @@ -1423,6 +1423,12 @@ spring-test test + + org.postgresql + postgresql + ${postgresql.connector.version} + test + -- Gitee From 0be97fc3792bf765a53b806d8b74daefe928ddd8 Mon Sep 17 00:00:00 2001 From: btxod Date: Tue, 7 Jan 2025 17:03:41 +0800 Subject: [PATCH 07/17] fix:add comma. --- .../org/apache/linkis/storage/utils/StorageConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java index 2fec7779a..3df005e06 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java @@ -71,7 +71,7 @@ public class StorageConfiguration { new CommonVars<>( "wds.linkis.storage.build.fs.classes", "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," - + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem" + + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," +"org.apache.linkis.storage.factory.impl.AzureBlobFileSystem", null, null); -- Gitee From bfdefdbb2caa8648b8eb6a5617d29f09fb74ce62 Mon Sep 17 00:00:00 2001 From: btxod Date: Tue, 7 Jan 2025 17:14:15 +0800 Subject: [PATCH 08/17] fix:use correct builders name. --- .../linkis/storage/factory/impl/BuildAzureBlobFileSystem.java | 1 - .../org/apache/linkis/storage/utils/StorageConfiguration.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java index a33bc84fc..292bb952e 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.linkis.storage.factory.impl; import org.apache.linkis.common.io.Fs; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java index 3df005e06..b56f46b68 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java @@ -72,7 +72,7 @@ public class StorageConfiguration { "wds.linkis.storage.build.fs.classes", "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," - +"org.apache.linkis.storage.factory.impl.AzureBlobFileSystem", + +"org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem", null, null); -- Gitee From b44c2096b5e25a4987078f7ddf1e8aaa17cc4afa Mon Sep 17 00:00:00 2001 From: btxod Date: Tue, 7 Jan 2025 17:22:40 +0800 Subject: [PATCH 09/17] chore:testcase suit --- .../apache/linkis/storage/utils/StorageConfigurationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala index 6534b25c6..cfb87a1fe 100644 --- a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala +++ b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala @@ -63,7 +63,7 @@ class StorageConfigurationTest { ) Assertions.assertEquals( "org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," + - "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem", + "org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem,org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem", storagebuildfsclasses ) Assertions.assertTrue(issharenode) -- Gitee From fa2bee3bdec84e165672d5e7bd687a0bcad3e79c Mon Sep 17 00:00:00 2001 From: btxod Date: Mon, 13 Jan 2025 17:18:33 +0800 Subject: [PATCH 10/17] =?UTF-8?q?feat:CR=E4=BF=AE=E6=94=B9.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/fs/impl/AzureBlobFileSystem.java | 629 ++++++++++-------- .../storage/utils/StorageConfiguration.java | 2 + linkis-dist/package/conf/linkis.properties | 3 +- 3 files changed, 359 insertions(+), 275 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index c8454617d..67475aecf 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -17,304 +17,385 @@ package org.apache.linkis.storage.fs.impl; +import com.azure.core.util.polling.SyncPoller; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobCopyInfo; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.BlockBlobClient; import org.apache.linkis.common.io.FsPath; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.fs.FileSystem; import org.apache.linkis.storage.utils.StorageConfiguration; import org.apache.linkis.storage.utils.StorageUtils; -import org.apache.commons.math3.util.Pair; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import com.azure.identity.DefaultAzureCredential; -import com.azure.identity.DefaultAzureCredentialBuilder; -import com.azure.storage.blob.BlobClient; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.BlobServiceClientBuilder; -import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.specialized.BlockBlobClient; - import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; +import static org.apache.linkis.storage.utils.StorageUtils.BLOB_SCHEMA; public class AzureBlobFileSystem extends FileSystem { - /** manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 */ - private BlobServiceClient serviceClient; - - /** - * getBlobContainerClient - * - * @param containerName - * @return client which can manipulate Azure Storage containers and their blobs.
- * 操作一个容器和其blobs的客户端 - */ - private BlobContainerClient getBlobContainerClient(String containerName) { - return serviceClient.getBlobContainerClient(containerName); - } - - /** - * @param dest - * @return a pair of containerName and blobName. - */ - private Pair azureLocation(FsPath dest) { - String path = dest.getPath(); - // split to container and blob object, - // container/dir/blobname will split to container and dir/blobname - String[] names = path.split("/", 2); - return Pair.create(names[0], names[1]); - } - - private Pair azureLocation(String dest) { - FsPath fp = new FsPath(dest); - return this.azureLocation(fp); - } - - /** - * init serviceClient - * - * @param properties - * @throws IOException - */ - @Override - public void init(Map properties) throws IOException { + private static final String SLASH = "/"; + + public static class PahtInfo { + private String schema = "http://"; // http + private String domain; // + private String container; // container name + private String blobName; // blob name + private String tail; + + public PahtInfo(String domain, String container, String blobName) { + this.domain = domain; + this.container = container; + this.blobName = blobName; + if (blobName != null) { + String[] names = blobName.split(SLASH, -1); + tail = names[names.length - 1]; + } + } + + public String toFullName() { + return schema + domain + SLASH + container + SLASH + blobName; + } + + public String getSchema() { + return schema; + } + + public String getDomain() { + return domain; + } + + public String getContainer() { + return container; + } + + public String getBlobName() { + return blobName; + } + + public String getTail() { + return tail; + } + + @Override + public String toString() { + return "PahtInfo{" + + "schema='" + schema + '\'' + + ", domain='" + domain + '\'' + + ", container='" + container + '\'' + + ", blobName='" + blobName + '\'' + + ", tail='" + tail + '\'' + + '}'; + } + } + + /** + * manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 + */ + private BlobServiceClient serviceClient; + + /** + * getBlobContainerClient + * + * @param containerName + * @return client which can manipulate Azure Storage containers and their blobs.
+ * 操作一个容器和其blobs的客户端 + */ + private BlobContainerClient getBlobContainerClient(String containerName) { + return serviceClient.getBlobContainerClient(containerName); + } + + private PahtInfo azureLocation(String path) { + return this.azureLocation(new FsPath(path)); + } + + /** + * @param dest + * @return domain name,container name,blob name + */ + private PahtInfo azureLocation(FsPath dest) { + //https://myaccount.blob.core.windows.net/mycontainer/dir/blobname + // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname + String path = dest.getPath(); + // myaccount.blob.core.windows.net/mycontainer/dir/blobname + // will split to myaccount.blob.core.windows.net + // and mycontainer/dir/blobname + String[] paths = path.split(SLASH, 2); + if (paths.length < 2) { + throw new IllegalArgumentException("file path error,with out container:" + path); + } + // split to container and blob object, + // container/dir/blobname will split to container and dir/blobname + String[] names = paths[1].split(SLASH, 2); + if (names.length < 2) { + return new PahtInfo(paths[0], names[0], null); + } else { + return new PahtInfo(paths[0], names[0], names[1]); + } + } + + /** + * init serviceClient + * + * @param properties + * @throws IOException + */ + @Override + public void init(Map properties) throws IOException { + + /** + * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 + */ + String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); + String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR.getValue(properties); + // Azure SDK client builders accept the credential as a parameter + serviceClient = + new BlobServiceClientBuilder() + .endpoint(BLOB_SCHEMA + acctName + ".blob.core.windows.net/") + .connectionString(connectStr) + .buildClient(); + } + + /** + * name of the fileSystem + * + * @return + */ + @Override + public String fsName() { + return StorageUtils.BLOB; + } + + @Override + public String rootUserName() { + return ""; + } + + /** + * @param dest + * @return + * @throws IOException + */ + @Override + public FsPath get(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return path; + } else { + throw new StorageWarnException( + TO_BE_UNKNOW.getErrorCode(), + "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); + } + } + + /** + * Opens a blob input stream to download the blob. + * + * @param dest + * @return + * @throws BlobStorageException – If a storage service error occurred. + */ + @Override + public InputStream read(FsPath dest) { + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.openInputStream(); + } /** - * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 - * TODO 是否需要对多账户支持 + * @param dest + * @param overwrite + * @return + * @throws BlobStorageException – If a storage service error occurred. + * @see BlockBlobClient #getBlobOutputStream */ - String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); + @Override + public OutputStream write(FsPath dest, boolean overwrite) { - /* - * The default credential first checks environment variables for configuration - * If environment configuration is incomplete, it will try managed identity + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); + } + + /** + * create a blob
+ * 创建一个对象("文件") + * + * @param dest + * @return + * @throws IOException */ - DefaultAzureCredential defaultCredential = new DefaultAzureCredentialBuilder().build(); - - // Azure SDK client builders accept the credential as a parameter - serviceClient = - new BlobServiceClientBuilder() - .endpoint("https://" + acctName + ".blob.core.windows.net/") - .credential(defaultCredential) - .buildClient(); - } - - /** - * name of the fileSystem - * - * @return - */ - @Override - public String fsName() { - return StorageUtils.BLOB; - } - - @Override - public String rootUserName() { - return ""; - } - - /** - * @param dest - * @return - * @throws IOException - */ - @Override - public FsPath get(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return path; - } else { - throw new StorageWarnException( - TO_BE_UNKNOW.getErrorCode(), - "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); + @Override + public boolean create(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return false; + } + PahtInfo names = this.azureLocation(dest); + // TODO 如果是路径的话后面补一个文件. + if (!names.getTail().contains(".")) { + String tmp = names.toFullName() + SLASH + "_tmp.txt"; + names = this.azureLocation(tmp); + } + BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer()); + try (BlobOutputStream bos = + client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) { + bos.write(1); + bos.flush(); + } + + return true; + } + + /** + * Flat listing 5000 results at a time,without deleted.
+ * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? + * + * @param path + * @return + * @throws IOException + */ + @Override + public List list(FsPath path) throws IOException { + final PahtInfo result = azureLocation(path); + return getBlobContainerClient(result.getContainer()).listBlobs().stream() + // Azure不会返回已删除对象 + .filter(item -> !item.isDeleted()) + .map(item -> { + FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName()); + // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明 + if (item.getProperties().getContentType() == null) { + tmp.setIsdir(true); + } + return tmp; + }) + .collect(Collectors.toList()); + } + + @Override + public boolean canRead(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } + } + + @Override + public boolean canWrite(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } } - } - - /** - * Opens a blob input stream to download the blob. - * - * @param dest - * @return - * @throws BlobStorageException – If a storage service error occurred. - */ - @Override - public InputStream read(FsPath dest) { - Pair result = azureLocation(dest); - BlobClient blobclient = - getBlobContainerClient(result.getFirst()).getBlobClient(result.getSecond()); - return blobclient.openInputStream(); - } - - /** - * @param dest - * @param overwrite - * @return - * @throws BlobStorageException – If a storage service error occurred. - * @see BlockBlobClient #getBlobOutputStream - */ - @Override - public OutputStream write(FsPath dest, boolean overwrite) { - - Pair result = azureLocation(dest); - BlobClient blobclient = - getBlobContainerClient(result.getFirst()).getBlobClient(result.getSecond()); - return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); - } - - /** - * create a blob
- * 创建一个对象("文件") - * - * @param dest - * @return - * @throws IOException - */ - @Override - public boolean create(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return false; + + @Override + public boolean exists(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); + } + + @Override + public boolean delete(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).deleteIfExists(); } - Pair names = this.azureLocation(dest); - BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getFirst()); - client.getBlobClient(names.getSecond()); - return true; - } - - /** - * Flat listing 5000 results at a time,without deleted.
- * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? - * - * @param path - * @return - * @throws IOException - */ - @Override - public List list(FsPath path) throws IOException { - Pair result = azureLocation(path); - return getBlobContainerClient(result.getFirst()).listBlobs().stream() - // TODO 是否过滤已删除对象 - .filter(item -> !item.isDeleted()) - .map(item -> new FsPath(item.getName())) - .collect(Collectors.toList()); - } - - @Override - public boolean canRead(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; + + @Override + public boolean copy(String origin, String dest) throws IOException { + PahtInfo oriNames = this.azureLocation(origin); + PahtInfo destNames = this.azureLocation(dest); + + BlobClient oriClient = + getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName()); + BlockBlobClient destClient = + getBlobContainerClient(destNames.getContainer()) + .getBlobClient(destNames.getBlobName()) + .getBlockBlobClient(); + SyncPoller poller = destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2)); + poller.waitForCompletion(); + return true; + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + // 没有事务性保证 + this.copy(oldDest.getPath(), newDest.getPath()); + this.delete(oldDest); + return true; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + return this.create(dest.getPath()); } - } - - @Override - public boolean canWrite(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + return this.mkdir(dest); + } + + // 下面这些方法可能都无法支持 + @Override + public String listRoot() throws IOException { + return ""; + } + + @Override + public long getTotalSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public boolean canExecute(FsPath dest) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean setGroup(FsPath dest, String group) throws IOException { + return false; + } + + @Override + public boolean setPermission(FsPath dest, String permission) throws IOException { + return false; + } + + @Override + public void close() throws IOException { } - } - - @Override - public boolean exists(FsPath dest) throws IOException { - Pair file = this.azureLocation(dest); - return getBlobContainerClient(file.getFirst()).getBlobClient(file.getSecond()).exists(); - } - - @Override - public boolean delete(FsPath dest) throws IOException { - Pair file = this.azureLocation(dest); - return getBlobContainerClient(file.getFirst()).getBlobClient(file.getSecond()).deleteIfExists(); - } - - @Override - public boolean copy(String origin, String dest) throws IOException { - Pair oriNames = this.azureLocation(origin); - Pair destNames = this.azureLocation(dest); - - BlobClient oriClient = - getBlobContainerClient(oriNames.getFirst()).getBlobClient(oriNames.getSecond()); - BlockBlobClient destClient = - getBlobContainerClient(destNames.getFirst()) - .getBlobClient(destNames.getSecond()) - .getBlockBlobClient(); - destClient.uploadFromUrl(oriClient.getBlobUrl(), true); - return true; - } - - @Override - public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { - // 没有事务性保证 - this.copy(oldDest.getPath(), newDest.getPath()); - this.delete(oldDest); - return true; - } - - @Override - public boolean mkdir(FsPath dest) throws IOException { - return this.create(dest.getPath()); - } - - @Override - public boolean mkdirs(FsPath dest) throws IOException { - return this.mkdir(dest); - } - - // TODO 下面这些方法可能都无法支持 - @Override - public String listRoot() throws IOException { - return ""; - } - - @Override - public long getTotalSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public long getFreeSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public long getUsableSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public boolean canExecute(FsPath dest) throws IOException { - return false; - } - - @Override - public boolean setOwner(FsPath dest, String user, String group) throws IOException { - return false; - } - - @Override - public boolean setOwner(FsPath dest, String user) throws IOException { - return false; - } - - @Override - public boolean setGroup(FsPath dest, String group) throws IOException { - return false; - } - - @Override - public boolean setPermission(FsPath dest, String permission) throws IOException { - return false; - } - - @Override - public void close() throws IOException {} } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java index b56f46b68..d2199c9a9 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/utils/StorageConfiguration.java @@ -162,4 +162,6 @@ public class StorageConfiguration { public static CommonVars AZURE_ACCT_NAME = new CommonVars("linkis.storage.azure.acctName", "", null, null); + public static CommonVars AZURE_ACCT_CONNECT_STR = + new CommonVars("linkis.storage.azure.connectstr", "", null, null); } diff --git a/linkis-dist/package/conf/linkis.properties b/linkis-dist/package/conf/linkis.properties index 303e53349..019adc366 100644 --- a/linkis-dist/package/conf/linkis.properties +++ b/linkis-dist/package/conf/linkis.properties @@ -120,4 +120,5 @@ linkis.storage.s3.region= linkis.storage.s3.bucket= # azure file system -linkis.storage.azure.acctName= \ No newline at end of file +linkis.storage.azure.acctName= +linkis.storage.azure.connectstr= -- Gitee From 9ca0ac381cb16e538d22ddfc93c643233d29e47c Mon Sep 17 00:00:00 2001 From: btxod Date: Tue, 21 Jan 2025 16:41:23 +0800 Subject: [PATCH 11/17] chore:upgrade netty --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7c2105b43..a3fcde0cd 100644 --- a/pom.xml +++ b/pom.xml @@ -155,7 +155,7 @@ 1.19.4 2.23.1 - 4.1.86.Final + 4.1.108 9.4.48.v20220622 4.5.13 ${httpclient.version} -- Gitee From 763343d1c7307723e28a3b030e39b82491b386b3 Mon Sep 17 00:00:00 2001 From: btxod Date: Thu, 23 Jan 2025 10:36:31 +0800 Subject: [PATCH 12/17] chore:add log --- .../linkis/storage/fs/impl/AzureBlobFileSystem.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index 67475aecf..9ad65ee31 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -31,6 +31,8 @@ import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.fs.FileSystem; import org.apache.linkis.storage.utils.StorageConfiguration; import org.apache.linkis.storage.utils.StorageUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -45,6 +47,8 @@ import static org.apache.linkis.storage.utils.StorageUtils.BLOB_SCHEMA; public class AzureBlobFileSystem extends FileSystem { + private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class); + private static final String SLASH = "/"; public static class PahtInfo { @@ -223,8 +227,9 @@ public class AzureBlobFileSystem extends FileSystem { */ @Override public OutputStream write(FsPath dest, boolean overwrite) { - + LOG.info("azblob,write:{}",dest.getPath()); PahtInfo result = azureLocation(dest); + LOG.info("azblob,write:{}",result); BlobClient blobclient = getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); @@ -240,6 +245,7 @@ public class AzureBlobFileSystem extends FileSystem { */ @Override public boolean create(String dest) throws IOException { + LOG.info("azblob,create:{}",dest); FsPath path = new FsPath(dest); if (exists(path)) { return false; @@ -305,7 +311,9 @@ public class AzureBlobFileSystem extends FileSystem { @Override public boolean exists(FsPath dest) throws IOException { + LOG.info("azblob exists:{}",dest); PahtInfo file = this.azureLocation(dest); + LOG.info("azblob exists:{}",file); return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); } @@ -341,6 +349,7 @@ public class AzureBlobFileSystem extends FileSystem { @Override public boolean mkdir(FsPath dest) throws IOException { + LOG.info("azblob,mkdir:{}",dest.getPath()); return this.create(dest.getPath()); } -- Gitee From 2ef438d9a82734933fd8b9b12d4c995070cedb5e Mon Sep 17 00:00:00 2001 From: btxod Date: Thu, 23 Jan 2025 14:14:17 +0800 Subject: [PATCH 13/17] fix:canExecute means canAccess. --- .../org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index 9ad65ee31..f132f09d5 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -381,7 +381,7 @@ public class AzureBlobFileSystem extends FileSystem { @Override public boolean canExecute(FsPath dest) throws IOException { - return false; + return true; } @Override -- Gitee From 007d91152a999660f6c825c4a1d6bf6c393683a7 Mon Sep 17 00:00:00 2001 From: btxod Date: Thu, 23 Jan 2025 14:46:15 +0800 Subject: [PATCH 14/17] =?UTF-8?q?chore:=E5=BF=BD=E7=95=A5=E5=8F=AA?= =?UTF-8?q?=E6=9C=89=E5=AE=B9=E5=99=A8=E7=9A=84=E6=9F=A5=E8=AF=A2.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/fs/impl/AzureBlobFileSystem.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index f132f09d5..54dd02745 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -26,6 +26,7 @@ import com.azure.storage.blob.models.BlobCopyInfo; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.specialized.BlobOutputStream; import com.azure.storage.blob.specialized.BlockBlobClient; +import org.apache.commons.lang3.StringUtils; import org.apache.linkis.common.io.FsPath; import org.apache.linkis.storage.exception.StorageWarnException; import org.apache.linkis.storage.fs.FileSystem; @@ -227,9 +228,9 @@ public class AzureBlobFileSystem extends FileSystem { */ @Override public OutputStream write(FsPath dest, boolean overwrite) { - LOG.info("azblob,write:{}",dest.getPath()); + LOG.info("azblob,write:{}", dest.getPath()); PahtInfo result = azureLocation(dest); - LOG.info("azblob,write:{}",result); + LOG.info("azblob,write:{}", result); BlobClient blobclient = getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); @@ -245,7 +246,7 @@ public class AzureBlobFileSystem extends FileSystem { */ @Override public boolean create(String dest) throws IOException { - LOG.info("azblob,create:{}",dest); + LOG.info("azblob,create:{}", dest); FsPath path = new FsPath(dest); if (exists(path)) { return false; @@ -311,9 +312,11 @@ public class AzureBlobFileSystem extends FileSystem { @Override public boolean exists(FsPath dest) throws IOException { - LOG.info("azblob exists:{}",dest); PahtInfo file = this.azureLocation(dest); - LOG.info("azblob exists:{}",file); + if (StringUtils.isBlank(file.getBlobName())) { + LOG.warn("no file specified:{}", file); + return true; + } return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); } @@ -349,7 +352,7 @@ public class AzureBlobFileSystem extends FileSystem { @Override public boolean mkdir(FsPath dest) throws IOException { - LOG.info("azblob,mkdir:{}",dest.getPath()); + LOG.info("azblob,mkdir:{}", dest.getPath()); return this.create(dest.getPath()); } -- Gitee From 659e87bff6c392147959469890bbc2aa3f6a8168 Mon Sep 17 00:00:00 2001 From: btxod Date: Thu, 23 Jan 2025 15:03:44 +0800 Subject: [PATCH 15/17] =?UTF-8?q?chore:=E5=BF=BD=E7=95=A5=E5=8F=AA?= =?UTF-8?q?=E6=9C=89=E5=AE=B9=E5=99=A8=E7=9A=84=E6=9F=A5=E8=AF=A2.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/linkis/storage/fs/impl/AzureBlobFileSystem.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index 54dd02745..11344b2ec 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -228,9 +228,7 @@ public class AzureBlobFileSystem extends FileSystem { */ @Override public OutputStream write(FsPath dest, boolean overwrite) { - LOG.info("azblob,write:{}", dest.getPath()); PahtInfo result = azureLocation(dest); - LOG.info("azblob,write:{}", result); BlobClient blobclient = getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); @@ -246,7 +244,6 @@ public class AzureBlobFileSystem extends FileSystem { */ @Override public boolean create(String dest) throws IOException { - LOG.info("azblob,create:{}", dest); FsPath path = new FsPath(dest); if (exists(path)) { return false; @@ -352,7 +349,6 @@ public class AzureBlobFileSystem extends FileSystem { @Override public boolean mkdir(FsPath dest) throws IOException { - LOG.info("azblob,mkdir:{}", dest.getPath()); return this.create(dest.getPath()); } -- Gitee From dc01b3094b5c308fc26b167408fdabbc219e435a Mon Sep 17 00:00:00 2001 From: btxod Date: Thu, 23 Jan 2025 15:42:41 +0800 Subject: [PATCH 16/17] feat:add azure support. --- .../apache/linkis/filesystem/validator/PathValidator.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala b/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala index b2d5db4fe..827b67cc2 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala @@ -85,10 +85,11 @@ class PathValidator extends Logging { } def checkPath(path: String, username: String): Unit = { - // unchecked hdfs,oss,s3 + // unchecked hdfs,oss,s3,azureblob if ( - (path.contains(StorageUtils.HDFS_SCHEMA)) || (path - .contains(StorageUtils.OSS_SCHEMA)) || (path.contains(StorageUtils.S3_SCHEMA)) + (path.contains(StorageUtils.HDFS_SCHEMA)) || (path + .contains(StorageUtils.OSS_SCHEMA)) || (path.contains(StorageUtils.S3_SCHEMA) + || (path.contains(StorageUtils.BLOB_SCHEMA)) ) { return } -- Gitee From 3959f812cfa4b94bfa0d327cdb77a9f31ad4d7df Mon Sep 17 00:00:00 2001 From: btxod Date: Thu, 23 Jan 2025 15:59:19 +0800 Subject: [PATCH 17/17] fix:amend --- .../org/apache/linkis/filesystem/validator/PathValidator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala b/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala index 827b67cc2..b3d155f6d 100644 --- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala +++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/scala/org/apache/linkis/filesystem/validator/PathValidator.scala @@ -88,7 +88,7 @@ class PathValidator extends Logging { // unchecked hdfs,oss,s3,azureblob if ( (path.contains(StorageUtils.HDFS_SCHEMA)) || (path - .contains(StorageUtils.OSS_SCHEMA)) || (path.contains(StorageUtils.S3_SCHEMA) + .contains(StorageUtils.OSS_SCHEMA)) || (path.contains(StorageUtils.S3_SCHEMA)) || (path.contains(StorageUtils.BLOB_SCHEMA)) ) { return -- Gitee