diff --git a/dli_prefect/dli_block.py b/dli_prefect/dli_block.py
new file mode 100644
index 0000000000000000000000000000000000000000..006e210db71a379e44411324a9a89ac46a77f3ac
--- /dev/null
+++ b/dli_prefect/dli_block.py
@@ -0,0 +1,473 @@
+import asyncio
+from logging import Logger
+from typing import Optional
+
+from dli.dli_client import DliClient
+from dli.exception import DliException
+from dli.table import TableSchema
+from prefect.blocks.core import Block
+from prefect.exceptions import MissingContextError
+from prefect.logging.loggers import get_logger, get_run_logger
+from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
+from pydantic import Field, SecretStr
+
+
+class Column:
+    def __init__(self, name, data_type):
+        self.name = name
+        self.data_type = data_type
+
+
+class DliBlock(Block):
+    _logo_url = "https://res.hc-cdn.com/console-ui-common/default/logo.svg"
+    _block_type_name = "HuaweiCloud DLI"
+    _documentation_url = "https://support.huaweicloud.com/sdkreference-dli/dli_04_0033.html"
+
+    auth_mode = 'AK/SK'
+
+    huawei_cloud_access_key_id: Optional[SecretStr] = Field(
+        default=None,
+        description="A specific Huawei Cloud access key ID.",
+        title="Huawei Cloud Access Key ID",
+    )
+    huawei_cloud_secret_access_key: Optional[SecretStr] = Field(
+        default=None,
+        description="A specific Huawei Cloud secret access key.",
+        title="Huawei Cloud Access Key Secret",
+    )
+    huawei_cloud_security_token: Optional[SecretStr] = Field(
+        default=None,
+        description="SecurityToken of a temporary access key. "
+                    "You can authenticate with either a temporary token or an AK/SK pair.",
+        title="Huawei Cloud Security Token",
+    )
+    region: Optional[SecretStr] = Field(
+        default=None,
+        description="Region where the DLI service is deployed, for example cn-north-4.",
+        title="Region",
+    )
+    project_id: Optional[SecretStr] = Field(
+        default=None,
+        description="Project ID of your Huawei Cloud account, used for API authentication.",
+        title="Project ID",
+    )
+    end_point: str = Field(
+        default="dli.cn-north-4.myhuaweicloud.com",
+        description=(
+            "Service address for connecting to DLI. The value can contain the protocol type, domain name, "
+            "and port number. Example: https://your-endpoint:443."
+        ),
+        title="End Point",
+    )
+    obs_endpoint: Optional[SecretStr] = Field(
+        default="https://obs.cn-south-1.myhuaweicloud.com",
+        description=(
+            "Service address for connecting to OBS. The value can contain the protocol type, domain name, "
+            "and port number. Example: https://your-endpoint:443. (For security purposes, HTTPS is recommended.)"
+        ),
+        title="Obs End Point",
+    )
+    username: Optional[SecretStr] = Field(
+        default=None,
+        description="Your Huawei Cloud account name",
+        title="Username",
+    )
+    queue_name: Optional[SecretStr] = Field(
+        default=None,
+        description="Queue name",
+        title="Queue Name",
+    )
+
+    @property
+    def logger(self) -> Logger:
+        """
+        Returns a logger based on whether the DliBlock
+        is called from within a flow or task run context.
+        If a run context is present, returns a run logger;
+        otherwise, returns a default logger labeled with the class's name.
+
+        Returns:
+            The run logger or a default logger with the class's name.
+ """ + try: + return get_run_logger() + except MissingContextError: + return get_logger(self.__class__.__name__) + + @sync_compatible + async def upload_resource(self, kind, obs_jar_paths, group_name): + """ + Upload different types of resource packages to Huawei Cloud's OBS service + Args: + kind: Type of resource package, the currently supported package types include: + jar: User jar file + pyfile: User Python file + file: User file + modelfile: User AI model file + obs_jar_paths: OBS path for the corresponding resource package, structured as: + {bucketName}.{obs domain}/{jarPath}/{jarName}. + For example: "https://bucketname.obs.cn-north-1.myhuaweicloud.com/jarname.jar" + group_name: Name of the group to which the resource package belongs. + + Returns: + + """ + + try: + dli_client = self._get_dli_client() + await run_sync_in_worker_thread(dli_client.upload_resource, kind, obs_jar_paths, group_name) + self.logger.info(f"Resource uploaded successfully for kind '{kind}' in group '{group_name}'.") + except DliException as e: + self.logger.error(f"Failed to upload resource for kind '{kind}' in group '{group_name}': {e}") + + @sync_compatible + async def delete_resource(self, resource_name, group_name): + """ + Delete specified resource package + Args: + resource_name: Name of the resource + group_name: Name of the group to which the resource package belongs. + + Returns: + + """ + try: + dli_client = self._get_dli_client() + await run_sync_in_worker_thread(dli_client.delete_resource, resource_name, group_name) + self.logger.info(f"Resource '{resource_name}' in group '{group_name}' deleted successfully.") + except DliException as e: + self.logger.error(f"Failed to delete resource '{resource_name}' in group '{group_name}': {e}") + + @sync_compatible + async def create_db(self, db_name: str = None): + """ + Create a database from DLI + + Args: + db_name: The database you want to delete + + Returns: + + """ + client = self._get_dli_client() + try: + db = await run_sync_in_worker_thread(client.create_database, db_name) + self.logger.info(f"Database '{db_name}' created successfully.") + return db + except DliException as e: + self.logger.error(f"Failed to create database '{db_name}': {e}") + + @sync_compatible + async def delete_database(self, db_name: str): + """ + Delete a database from DLI + + Args: + db_name: The database you want to delete + + Returns: + + """ + dli_client = self._get_dli_client() + try: + await run_sync_in_worker_thread(dli_client.delete_database, db_name) + self.logger.info(f"Database '{db_name}' deleted successfully.") + except DliException as e: + self.logger.error(f"Failed to delete database '{db_name}': {e}") + + @sync_compatible + async def list_all_dbs(self): + """ + Show all databases from DLI + Returns: All databases + """ + client = self._get_dli_client() + try: + dbs = await run_sync_in_worker_thread(client.list_databases) + self.logger.info("List of all databases:") + for db in dbs: + self.logger.info(db) + except DliException as e: + self.logger.error(f"Failed to list all databases: {e}") + + @sync_compatible + async def create_obs_tbl(self, db_name, tbl_name): + """ + Delete a database from OBS + + Args: + db_name: The database you want to delete + tbl_name: The table you want to delete + + Returns: + + """ + cols = [ + Column('col_1', 'string'), + Column('col_2', 'string'), + Column('col_3', 'smallint'), + Column('col_4', 'int'), + Column('col_5', 'bigint'), + Column('col_6', 'double'), + Column('col_7', 'decimal(10,0)'), + Column('col_8', 'boolean'), + 
+            Column('col_9', 'date'),
+            Column('col_10', 'timestamp')
+        ]
+        tbl_schema = TableSchema(tbl_name, cols)
+        try:
+            table = await run_sync_in_worker_thread(
+                self._get_dli_client().create_obs_table,
+                db_name,
+                tbl_schema,
+                'obs://bucket/obj',
+                'csv'
+            )
+            self.logger.info(f"OBS table '{tbl_name}' created successfully.")
+            self.logger.info("%s", table)
+            return table
+        except DliException as e:
+            self.logger.error(f"Failed to create OBS table '{tbl_name}': {e}")
+
+    @sync_compatible
+    async def delete_tbls(self, db_name):
+        """
+        Delete all tables in the specified DLI database.
+
+        Args:
+            db_name: The database whose tables you want to delete.
+
+        Returns:
+
+        """
+        try:
+            dli_client = self._get_dli_client()
+            tbls = await run_sync_in_worker_thread(dli_client.list_tables, db_name)
+            for tbl in tbls:
+                await run_sync_in_worker_thread(dli_client.delete_table, db_name, tbl.name)
+                self.logger.info(f"Table '{tbl.name}' in database '{db_name}' deleted successfully.")
+
+        except DliException as e:
+            self.logger.error(f"Failed to delete tables in database '{db_name}': {e}")
+
+    @sync_compatible
+    async def list_all_tbls(self, db_name):
+        """
+        List all tables in the specified database.
+
+        Args:
+            db_name: The database whose tables you want to list.
+
+        Returns:
+
+        """
+        try:
+            dli_client = self._get_dli_client()
+            tbls = await run_sync_in_worker_thread(dli_client.list_tables, db_name, with_detail=True)
+
+            for tbl in tbls:
+                self.logger.info(tbl.name)
+
+        except DliException as e:
+            self.logger.error(f"Failed to list tables in database '{db_name}': {e}")
+
+    @sync_compatible
+    async def import_data(self, db_name, tbl_name, queue_name):
+        """
+        Import data from OBS into the specified table.
+
+        Args:
+            db_name: Target database name.
+            tbl_name: Target table name.
+            queue_name: Queue used to run the import job.
+
+        Returns:
+
+        """
+        options = {
+            "with_column_header": True,
+            "delimiter": ",",
+            "quote_char": "\"",
+            "escape_char": "\\",
+            "date_format": "yyyy/MM/dd",
+            "timestamp_format": "yyyy/MM/dd hh:mm:ss"
+        }
+
+        try:
+            dli_client = self._get_dli_client()
+            job_id, status = await run_sync_in_worker_thread(
+                dli_client.import_table,
+                tbl_name,
+                db_name,
+                'obs://bucket/obj/data.csv',
+                'csv',
+                queue_name=queue_name,
+                options=options
+            )
+
+            self.logger.info(f"Import job for table '{tbl_name}' in database '{db_name}' started.")
+            self.logger.info(f"Job ID: {job_id}")
+            self.logger.info(f"Job Status: {status}")
+
+        except DliException as e:
+            self.logger.error(f"Failed to import data for table '{tbl_name}': {e}")
+
+    @sync_compatible
+    async def export_data(self, db_name, tbl_name, queue_name):
+        """
+        Export table data to OBS.
+
+        Args:
+            db_name: The database containing the table to export.
+            tbl_name: The table you want to export.
+            queue_name: Queue used to run the export job.
+
+        Returns:
+
+        """
+        try:
+            dli_client = self._get_dli_client()
+            job_id, status = await run_sync_in_worker_thread(
+                dli_client.export_table,
+                tbl_name,
+                db_name,
+                'obs://bucket/obj',
+                queue_name=queue_name
+            )
+
+            self.logger.info(f"Export job for table '{tbl_name}' in database '{db_name}' started.")
+            self.logger.info(f"Job ID: {job_id}")
+            self.logger.info(f"Job Status: {status}")
+
+        except DliException as e:
+            self.logger.error(f"Failed to export data for table '{tbl_name}': {e}")
+
+    @sync_compatible
+    async def run_sql(self, db_name, queue_name):
+        """
+        Run a SQL query on DLI and export the result to OBS.
+
+        Args:
+            db_name: The database to run the query against.
+            queue_name: The queue used to run the query.
+
+        Returns:
+
+        """
+        try:
+            dli_client = self._get_dli_client()
+            sql_job = await run_sync_in_worker_thread(
+                dli_client.execute_sql,
+                'select * from tbl_dli_for_test',
+                db_name,
+                queue_name=queue_name
+            )
+            result_set = await run_sync_in_worker_thread(sql_job.get_result, queue_name=queue_name)
+        except DliException as e:
+            self.logger.error(f"Failed to execute SQL query: {e}")
+            return
+
+        if result_set.row_count == 0:
+            return
+
+        for row in result_set:
+            self.logger.info(row)
+
+        try:
+            status = await run_sync_in_worker_thread(
+                sql_job.export_result,
+                'obs://bucket/obj',
+                queue_name=queue_name
+            )
+            self.logger.info(f"Export SQL query result to OBS status: {status}")
+
+        except DliException as e:
+            self.logger.error(f"Failed to export SQL query result to OBS: {e}")
+
+    @sync_compatible
+    async def cancel_sql(self, job_id):
+        """
+        Cancel a SQL job on DLI.
+
+        Args:
+            job_id: The ID of the SQL job you want to cancel.
+
+        Returns:
+
+        """
+        try:
+            dli_client = self._get_dli_client()
+            await run_sync_in_worker_thread(dli_client.cancel_sql, job_id)
+            self.logger.info(f"SQL job with ID '{job_id}' canceled successfully.")
+        except DliException as e:
+            self.logger.error(f"Failed to cancel SQL job with ID '{job_id}': {e}")
+
+    @sync_compatible
+    async def submit_spark_batch_job(self, batch_queue_name, batch_job_info):
+        """
+        Submit a Spark batch job to DLI and wait for it to finish.
+
+        Args:
+            batch_queue_name: The batch queue name.
+            batch_job_info: The batch job information.
+
+        Returns:
+
+        """
+        try:
+            dli_client = self._get_dli_client()
+            batch_job = await run_sync_in_worker_thread(
+                dli_client.submit_spark_batch_job,
+                batch_queue_name,
+                batch_job_info
+            )
+        except DliException as e:
+            self.logger.error(f"Failed to submit Spark batch job: {e}")
+            return
+
+        self.logger.info(f"Spark batch job submitted successfully. Job ID: {batch_job.job_id}")
+
+        while True:
+            # Use a non-blocking sleep so the event loop is not stalled while polling.
+            await asyncio.sleep(3)
+            job_status = await run_sync_in_worker_thread(batch_job.get_job_status)
+            self.logger.info(f'Job status: {job_status}')
+            if job_status == 'dead' or job_status == 'success':
+                break
+
+        logs = await run_sync_in_worker_thread(batch_job.get_driver_log, 500)
+        for log_line in logs:
+            self.logger.info(log_line)
+
+    @sync_compatible
+    async def del_spark_batch(self, batch_id):
+        """
+        Delete a Spark batch job from DLI.
+
+        Args:
+            batch_id: The batch job ID.
+
+        Returns:
+
+        """
+        try:
+            dli_client = self._get_dli_client()
+            resp = await run_sync_in_worker_thread(dli_client.del_spark_batch_job, batch_id)
+            self.logger.info(resp.msg)
+        except DliException as e:
+            self.logger.error(f"Failed to delete Spark batch job with ID '{batch_id}': {e}")
+
+    def _get_dli_client(self) -> DliClient:
+        """
+        Return an authenticated DLI client. You can authenticate with either a temporary token or an AK/SK pair.
+        Returns: DliClient
+        """
+        if not self.huawei_cloud_access_key_id or not self.huawei_cloud_secret_access_key:
+            raise ValueError("Both huawei_cloud_access_key_id and huawei_cloud_secret_access_key are required.")
+        return DliClient(
+            region=self.region.get_secret_value(),
+            project_id=self.project_id.get_secret_value(),
+            auth_mode='AK/SK',
+            ak=self.huawei_cloud_access_key_id.get_secret_value(),
+            sk=self.huawei_cloud_secret_access_key.get_secret_value(),
+            endpoint=self.end_point,
+            obs_endpoint=self.obs_endpoint.get_secret_value(),
+        )
\ No newline at end of file
diff --git a/dli_prefect/example/cancel_sql.py b/dli_prefect/example/cancel_sql.py
new file mode 100644
index 0000000000000000000000000000000000000000..efa3f32e7d963f7959c5979ad57478bc3345ade5
--- /dev/null
+++ b/dli_prefect/example/cancel_sql.py
@@ -0,0 +1,22 @@
+import pytest
+from dli_block_test import DliBlock
+from prefect import flow
+
+huaweicloud_dli_block = DliBlock.load("dli-test-1")
+client = huaweicloud_dli_block._get_dli_client()
+
+
+@pytest.fixture
+def job_id():
+    return "*****"
+
+
+@flow
+def test_cancel_sql(job_id: str):
+    cancel_sql = huaweicloud_dli_block.cancel_sql(job_id)
+    assert cancel_sql is None
+
+
+if __name__ == "__main__":
+    job_id = '*****'
+    test_cancel_sql(job_id)
diff --git a/dli_prefect/example/create_db.py b/dli_prefect/example/create_db.py
new file mode 100644
index 0000000000000000000000000000000000000000..56c17592edcfe0d58586b51952de4d1956b3f91e
--- /dev/null
+++ b/dli_prefect/example/create_db.py
@@ -0,0 +1,22 @@
+from prefect import flow
+from dli_block_test import DliBlock
+import pytest
+
+huaweicloud_dli_block = DliBlock.load("dli-test-1")
+client = huaweicloud_dli_block._get_dli_client()
+
+
+@pytest.fixture
+def db_name():
+    return "new1"
+
+
+@flow
+def test_create_db(db_name: str):
+    created_db = huaweicloud_dli_block.create_db(db_name)
+    assert created_db is not None
+
+
+if __name__ == "__main__":
+    db_name = 'new1'
+    test_create_db(db_name)
diff --git a/dli_prefect/example/delete_database.py b/dli_prefect/example/delete_database.py
new file mode 100644
index 0000000000000000000000000000000000000000..84215676e34ddaafc8b0cd5f299aeb2b06cb2243
--- /dev/null
+++ b/dli_prefect/example/delete_database.py
@@ -0,0 +1,22 @@
+from prefect import flow
+from dli_block_test import DliBlock
+import pytest
+
+huaweicloud_dli_block = DliBlock.load("dli-test-1")
+client = huaweicloud_dli_block._get_dli_client()
+
+
+@pytest.fixture
+def db_name():
+    return "new1"
+
+
+@flow
+def test_delete_database(db_name: str):
+    delete_database = huaweicloud_dli_block.delete_database(db_name)
+    assert delete_database is None
+
+
+if __name__ == "__main__":
+    db_name = 'new1'
+    test_delete_database(db_name)
diff --git a/dli_prefect/example/list_all_dbs.py b/dli_prefect/example/list_all_dbs.py
new file mode 100644
index 0000000000000000000000000000000000000000..2580a3d218d09e181b9501ad78301a1e08c3629f
--- /dev/null
+++ b/dli_prefect/example/list_all_dbs.py
@@ -0,0 +1,15 @@
+from prefect import flow
+from dli_block_test import DliBlock
+
+huaweicloud_dli_block = DliBlock.load("dli-test-1")
+client = huaweicloud_dli_block._get_dli_client()
+
+
+@flow
+def test_list_all_dbs():
+    list_all_dbs = huaweicloud_dli_block.list_all_dbs()
+    assert list_all_dbs is None
+
+
+if __name__ == "__main__":
+    test_list_all_dbs()
diff --git a/dli_prefect/example/list_all_tbls.py b/dli_prefect/example/list_all_tbls.py
new file mode 100644
index 0000000000000000000000000000000000000000..dedd2ef596184642a8f0cad26430ffea064af84a
--- /dev/null
+++ b/dli_prefect/example/list_all_tbls.py
@@ -0,0 +1,22 @@
+from prefect import flow
+from dli_block_test import DliBlock
+import pytest
+
+huaweicloud_dli_block = DliBlock.load("dli-test-1")
+client = huaweicloud_dli_block._get_dli_client()
+
+
+@pytest.fixture
+def db_name():
+    return "db1"
+
+
+@flow
+def test_list_all_tbls(db_name: str):
+    list_all_tbls = huaweicloud_dli_block.list_all_tbls(db_name)
+    assert list_all_tbls is None
+
+
+if __name__ == "__main__":
+    db_name = 'db1'
+    test_list_all_tbls(db_name)
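
Usage sketch (not part of the diff): the example files above each exercise a single method. Assuming a DliBlock has been saved under the block document name "dli-test-1", as those examples do, a single flow could drive the block end to end like this. The module path, database, table, and queue names below are placeholder assumptions, not values taken from the diff.

from prefect import flow

from dli_prefect.dli_block import DliBlock  # assumed import path; the examples import from dli_block_test


@flow
def dli_end_to_end():
    # Load the saved block document; "dli-test-1" matches the name used in the examples.
    dli = DliBlock.load("dli-test-1")

    # Every method on the block is @sync_compatible, so it can be called
    # synchronously from a sync flow or awaited from an async one.
    dli.create_db("demo_db")                               # "demo_db" is a placeholder
    dli.create_obs_tbl("demo_db", "demo_tbl")              # the column schema is hardcoded in the block
    dli.list_all_tbls("demo_db")
    dli.import_data("demo_db", "demo_tbl", "demo_queue")   # "demo_queue" is a placeholder
    dli.export_data("demo_db", "demo_tbl", "demo_queue")
    dli.delete_tbls("demo_db")
    dli.delete_database("demo_db")


if __name__ == "__main__":
    dli_end_to_end()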