From 971812d0bb779e138fc0b05a3afa21a38afbc4bc Mon Sep 17 00:00:00 2001
From: jemappellehc
Date: Wed, 26 Mar 2025 18:16:54 +0800
Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=90=91=E9=87=8F=E6=95=B0?=
 =?UTF-8?q?=E6=8D=AE=E5=BA=93=E8=BF=81=E7=A7=BB=E8=84=9A=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../zh/docs/DataVec/DataVec-Overview.md      |   4 +-
 .../zh/docs/{SQLReference => DataVec}/PQ.md  |   0
 .../zh/docs/DataVec/milvus2datavec.md        | 535 +++++++++++------
 content/docs-lite/zh/menu/index.md           |  12 +-
 content/zh/docs/DataVec/DataVec-Overview.md  |   4 +-
 .../zh/docs/{SQLReference => DataVec}/PQ.md  |   0
 content/zh/docs/DataVec/milvus2datavec.md    | 567 ++++++++++++------
 content/zh/menu/index.md                     |  10 +
 8 files changed, 766 insertions(+), 366 deletions(-)
 rename content/docs-lite/zh/docs/{SQLReference => DataVec}/PQ.md (100%)
 rename content/zh/docs/{SQLReference => DataVec}/PQ.md (100%)

diff --git a/content/docs-lite/zh/docs/DataVec/DataVec-Overview.md b/content/docs-lite/zh/docs/DataVec/DataVec-Overview.md
index 56037031c..a365f130a 100644
--- a/content/docs-lite/zh/docs/DataVec/DataVec-Overview.md
+++ b/content/docs-lite/zh/docs/DataVec/DataVec-Overview.md
@@ -27,9 +27,9 @@ DataVec架构与特性实现详情可参考[向量存储引擎](DataVec-architec
 
 ### 索引支持
 - [IVFFLAT](../SQLReference/向量索引.md##IVFFlat) 倒排索引
-- [IVF-PQ](../SQLReference/PQ.md##IVF-PQ) 量化压缩倒排索引
+- [IVF-PQ](PQ.md##IVF-PQ) 量化压缩倒排索引
 - [HNSW](../SQLReference/向量索引.md##HNSW) 图索引
-- [HNSW-PQ](../SQLReference/PQ.md##HNSW-PQ) 量化压缩图索引
+- [HNSW-PQ](PQ.md##HNSW-PQ) 量化压缩图索引
 
 ## 生态对接
 openGauss DataVec 提供Python、Java、Node.js、Go等多语言生态对接,让你能够通过API调用,快速使能向量数据库能力。同时, DataVec拥抱开源第三方组件,在RAG场景下做到快速兼容,多样选择。
diff --git a/content/docs-lite/zh/docs/SQLReference/PQ.md b/content/docs-lite/zh/docs/DataVec/PQ.md
similarity index 100%
rename from content/docs-lite/zh/docs/SQLReference/PQ.md
rename to content/docs-lite/zh/docs/DataVec/PQ.md
diff --git a/content/docs-lite/zh/docs/DataVec/milvus2datavec.md b/content/docs-lite/zh/docs/DataVec/milvus2datavec.md
index 0ed381af4..abad35430 100644
--- a/content/docs-lite/zh/docs/DataVec/milvus2datavec.md
+++ b/content/docs-lite/zh/docs/DataVec/milvus2datavec.md
@@ -33,192 +33,381 @@ database = postgres
 [Table]
 milvus_collection_name = test
 opengauss_table_name = test
+
+[SparseVector]
+# openGauss supports at most 1000 dimensions for sparsevec
+default_dimension = 1000
+
+[Output]
+folder = output
+
+[Migration]
+cleanup_temp_files = true
 ```
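+
+以下是一份按实际环境填写的 config.ini 完整示例(各小节与键名和迁移脚本读取的配置一一对应,其中主机、端口、密码均为示意值,请替换为真实部署信息):
+
+```
+[Milvus]
+host = 192.168.0.10
+port = 19530
+
+[openGauss]
+user = gaussdb
+password = YourPassword@123
+host = 192.168.0.20
+port = 5432
+database = postgres
+
+[Table]
+milvus_collection_name = test
+opengauss_table_name = test
+
+[SparseVector]
+default_dimension = 1000
+
+[Output]
+folder = output
+
+[Migration]
+cleanup_temp_files = true
+```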
 milvus2datavec.py迁移脚本如下:
 ```python
 import psycopg2
 import csv
-from pymilvus import connections, Collection
-from concurrent.futures import ThreadPoolExecutor
+from pymilvus import connections, Collection, utility
 import configparser
 import numpy as np
-import re
 import os
+import json
+import logging
+from typing import List, Dict, Any, Optional, Union
+from datetime import datetime
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[logging.FileHandler('migration.log'), logging.StreamHandler()]
+)
+logger = logging.getLogger(__name__)
+
+
+class MilvusToOpenGaussMigrator:
+    def __init__(self, config_file: str = 'config.ini'):
+        self.config = self._load_config(config_file)
+        self.csv_file_path = self._get_csv_file_path()
+        self.fields = []
+        self.MAX_WINDOW_SIZE = 16384  # Milvus default max query window
+        self.SPARSE_DIMENSION = self.config.getint('SparseVector', 'default_dimension', fallback=1000)
+        self.MAX_SPARSE_DIMENSION = 1000
+        self.milvus_version = None
+
+    def _load_config(self, config_file: str) -> configparser.ConfigParser:
+        """Load configuration file"""
+        config = configparser.ConfigParser()
+        try:
+            if not config.read(config_file):
+                raise FileNotFoundError(f"Config file {config_file} not found")
+            return config
+        except Exception as e:
+            logger.error(f"Failed to load config: {e}")
+            raise
+
+    def _get_csv_file_path(self) -> str:
+        """Generate CSV file path with timestamp"""
+        output_folder = self.config.get('Output', 'folder', fallback='output')
+        os.makedirs(output_folder, exist_ok=True)
+        milvus_collection = self.config.get('Table', 'milvus_collection_name')
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        return os.path.join(output_folder, f"{milvus_collection}_{timestamp}.csv")
+
+    def _connect_milvus(self) -> Collection:
+        """Connect to Milvus"""
+        try:
+            connections.connect(
+                alias="default",
+                host=self.config.get('Milvus', 'host'),
+                port=self.config.get('Milvus', 'port')
+            )
+
+            self.milvus_version = utility.get_server_version()
+            logger.info(f"Connected to Milvus {self.milvus_version}")
+
+            collection_name = self.config.get('Table', 'milvus_collection_name')
+            collection = Collection(collection_name)
+            collection.load()
+            self.fields = [field.name for field in collection.schema.fields]
+            logger.info(f"Loaded collection: {collection_name}")
+            return collection
+        except Exception as e:
+            logger.error(f"Milvus connection failed: {e}")
+            raise
+
+    def _connect_opengauss(self) -> psycopg2.extensions.connection:
+        """Connect to openGauss"""
+        try:
+            conn = psycopg2.connect(
+                user=self.config.get('openGauss', 'user'),
+                password=self.config.get('openGauss', 'password'),
+                host=self.config.get('openGauss', 'host'),
+                port=self.config.get('openGauss', 'port'),
+                database=self.config.get('openGauss', 'database')
+            )
+            logger.info("Connected to openGauss")
+            return conn
+        except Exception as e:
+            logger.error(f"openGauss connection failed: {e}")
+            raise
+
+    def _process_sparse_vector(self, sparse_data: Union[dict, bytes, list], dimension: int) -> str:
+        """Convert to openGauss SPARSEVEC format: '{index:value,...}/dim'"""
+        if sparse_data is None:
+            return "NULL"
+
+        try:
+            # Convert to {index: value} dict
+            if dimension is None or dimension <= 0:
+                dimension = self.MAX_SPARSE_DIMENSION
+
+            sparse_dict = {}
+
+            if isinstance(sparse_data, dict):
+                # Milvus sparse indices are 0-based while openGauss sparsevec
+                # indices are 1-based, so shift every index by one.
+                sparse_dict = {
+                    int(k) + 1: float(v)
+                    for k, v in sparse_data.items()
+                }
+            else:
+                raise ValueError(f"Unsupported format: {type(sparse_data)}")
+
+            if not sparse_dict:
+                return "{}/" + str(dimension)
+
+            try:
+                # Sort by index to ensure consistent output
+                sorted_items = sorted(sparse_dict.items(), key=lambda x: x[0])
+                entries = ",".join(f"{k}:{v}" for k, v in sorted_items)
+                return "{" + entries + "}/" + str(dimension)
+            except Exception as sort_error:
+                logger.warning(f"Sorting failed, using unsorted vector: {sort_error}")
+                entries = ",".join(f"{k}:{v}" for k, v in sparse_dict.items())
+                return "{" + entries + "}/" + str(dimension)
+
+        except Exception as e:
+            logger.error(f"Sparse vector conversion failed: {e}")
+            return "NULL"
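+
+    # Conversion example (illustrative): with the dict {0: 0.5, 2: 0.25} and
+    # dimension 1000, _process_sparse_vector returns '{1:0.5,3:0.25}/1000',
+    # i.e. the 0-based Milvus indices come out shifted to 1-based.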
+
+    def _process_field_value(self, value: Any, field_type: str, dimension: Optional[int] = None) -> str:
+        """Convert field value to CSV string"""
+        if value is None:
+            return "NULL"
+        elif field_type == "SPARSE_FLOAT_VECTOR":
+            return self._process_sparse_vector(value, dimension or self.SPARSE_DIMENSION)
+        elif isinstance(value, (list, np.ndarray)):
+            return "[" + ",".join(str(x) for x in value) + "]"
+        elif isinstance(value, dict):
+            return json.dumps(value)
+        else:
+            return str(value)
+
+    def _create_opengauss_table(self, conn: psycopg2.extensions.connection, collection: Collection) -> None:
+        """Create table with SPARSEVEC columns"""
+        table_name = self.config.get('Table', 'opengauss_table_name')
+        cursor = conn.cursor()
+
+        try:
+            # Check if table exists
+            cursor.execute(f"SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename = '{table_name}');")
+            if cursor.fetchone()[0]:
+                logger.warning(f"Table {table_name} exists")
+                return
+
+            # Build CREATE TABLE statement
+            columns = []
+            for field in collection.schema.fields:
+                dim = field.dim if hasattr(field, 'dim') else None
+                pg_type = self._milvus_to_opengauss_type(field.dtype.name, dim)
+                columns.append(f"{field.name} {pg_type}")
+
+            # Add primary key if exists
+            pk_fields = [f.name for f in collection.schema.fields if f.is_primary]
+            if pk_fields:
+                columns.append(f"PRIMARY KEY ({', '.join(pk_fields)})")
+
+            create_sql = f"CREATE TABLE {table_name} ({', '.join(columns)});"
+            cursor.execute(create_sql)
+            conn.commit()
+            logger.info(f"Created table: {table_name}")
+        except Exception as e:
+            conn.rollback()
+            logger.error(f"Table creation failed: {e}")
+            raise
+        finally:
+            cursor.close()
+
+    def _query_milvus_data(self, collection: Collection, limit: int, offset: int) -> List[Dict]:
+        """Query data with sparse vector support"""
+        try:
+            # Get field metadata
+            field_meta = {f.name: {"type": f.dtype.name, "dim": getattr(f, 'dim', None)}
+                          for f in collection.schema.fields}
+
+            # Adjust query window
+            if offset + limit > self.MAX_WINDOW_SIZE:
+                new_limit = self.MAX_WINDOW_SIZE - offset
+                if new_limit <= 0:
+                    return []
+                limit = new_limit
+                logger.warning(f"Adjusted limit to {limit}")
+
+            # Query data
+            results = collection.query(
+                expr="",
+                output_fields=self.fields,
+                limit=limit,
+                offset=offset
+            )
+
+            # Process results
+            processed = []
+            for row in results:
+                processed_row = {}
+                for field in self.fields:
+                    meta = field_meta[field]
+                    value = row.get(field)
+                    processed_row[field] = self._process_field_value(
+                        value, meta["type"], meta["dim"])
+                processed.append(processed_row)
+
+            return processed
+        except Exception as e:
+            logger.error(f"Query failed: {e}")
+            raise
+
+    def _export_to_csv_chunked(self, collection: Collection) -> List[str]:
+        """Export data to CSV chunks"""
+        file_paths = []
+        total_rows = 0
+        offset = 0
+
+        try:
+            # Flush and get row count
+            collection.flush()
+            total_count = collection.num_entities
+            logger.info(f"Total rows to export: {total_count}")
+
+            while total_rows < total_count:
+                # Create chunk file
+                chunk_id = len(file_paths) + 1
+                chunk_file = f"{os.path.splitext(self.csv_file_path)[0]}_part{chunk_id}.csv"
+                file_paths.append(chunk_file)
+
+                with open(chunk_file, 'w', newline='') as f:
+                    writer = csv.DictWriter(f, fieldnames=self.fields)
+                    writer.writeheader()
+
+                    chunk_rows = 0
+                    while chunk_rows < 100000:  # 100K rows per chunk
+                        batch = self._query_milvus_data(
+                            collection,
+                            limit=min(50000, 100000 - chunk_rows),
+                            offset=offset
+                        )
+                        if not batch:
+                            break
+
+                        writer.writerows(batch)
+                        chunk_rows += len(batch)
+                        total_rows += len(batch)
+                        offset += len(batch)
+
+                        if total_rows % 10000 == 0:
+                            logger.info(f"Exported {total_rows}/{total_count} rows")
+
+                if chunk_rows == 0:
+                    # No rows came back (e.g. the Milvus query window is
+                    # exhausted): drop the empty chunk and stop instead of
+                    # looping forever on empty batches.
+                    os.remove(chunk_file)
+                    file_paths.pop()
+                    logger.warning(f"Export stopped early: {total_rows}/{total_count} rows")
+                    break
+
+                logger.info(f"Created chunk {chunk_id}: {chunk_rows} rows")
+
+            return file_paths
+        except Exception as e:
+            # Cleanup failed export
+            for f in file_paths:
+                try:
+                    os.remove(f)
+                except OSError:
+                    pass
+            logger.error(f"Export failed: {e}")
+            raise
+
+    def _import_to_opengauss(self, conn: psycopg2.extensions.connection, file_paths:
List[str]) -> None: + """Import CSV data to openGauss""" + table_name = self.config.get('Table', 'opengauss_table_name') + cursor = conn.cursor() + + try: + # Prepare for bulk import + cursor.execute(f"TRUNCATE TABLE {table_name};") + conn.commit() + + total_rows = 0 + for i, csv_file in enumerate(file_paths, 1): + with open(csv_file, 'r') as f: + # Use COPY for bulk load + copy_sql = f""" + COPY {table_name} ({', '.join(self.fields)}) + FROM STDIN WITH (FORMAT CSV, HEADER, NULL 'NULL'); + """ + cursor.copy_expert(copy_sql, f) + conn.commit() + + rows_imported = cursor.rowcount + total_rows += rows_imported + logger.info(f"Imported {rows_imported} rows from {csv_file}") + + logger.info(f"Total imported: {total_rows} rows") + except Exception as e: + conn.rollback() + logger.error(f"Import failed: {e}") + raise + finally: + cursor.close() + + def run_migration(self) -> None: + """Execute full migration workflow""" + start_time = datetime.now() + logger.info("Starting migration") + + try: + # Step 1: Connect to Milvus + milvus_collection = self._connect_milvus() + + # Step 2: Export data to CSV + csv_files = self._export_to_csv_chunked(milvus_collection) + + # Step 3: Connect to openGauss and create table + opengauss_conn = self._connect_opengauss() + self._create_opengauss_table(opengauss_conn, milvus_collection) + + # Step 4: Import to openGauss + self._import_to_opengauss(opengauss_conn, csv_files) + + # Cleanup + if self.config.getboolean('Migration', 'cleanup_temp_files', fallback=True): + for f in csv_files: + try: + os.remove(f) + except Exception as e: + logger.warning(f"Failed to delete {f}: {e}") + + logger.info(f"Migration completed in {datetime.now() - start_time}") + except Exception as e: + logger.error(f"Migration failed: {e}") + raise + finally: + if 'opengauss_conn' in locals(): + opengauss_conn.close() + connections.disconnect("default") + + @staticmethod + def _milvus_to_opengauss_type(milvus_type: str, dim: Optional[int] = None) -> str: + """Map Milvus types to openGauss types""" + type_map = { + "Int64": "BIGINT", + "Int32": "INTEGER", + "Int16": "SMALLINT", + "Int8": "SMALLINT", + "Float": "REAL", + "Double": "DOUBLE PRECISION", + "Bool": "BOOLEAN", + "VarChar": "VARCHAR", + "String": "TEXT", + "Json": "JSONB", + "FLOAT_VECTOR": f"VECTOR({dim})" if dim else "VECTOR", + "BINARY_VECTOR": f"BIT({dim})" if dim else "BIT", + "SPARSE_FLOAT_VECTOR": "SPARSEVEC" + } + return type_map.get(milvus_type, "TEXT") -# Read database configuration from config.ini -config = configparser.ConfigParser() -config.read('config.ini') -# Connect to Milvus -def connect_milvus(): - milvus_host = config.get('Milvus', 'host') - milvus_port = config.get('Milvus', 'port') - try: - connections.connect(alias="default", host=milvus_host, port=milvus_port) - print("Connected to Milvus successfully!") - except Exception as e: - print(f"Failed to connect to Milvus: {e}") - -# Query data from Milvus collection in pages -def query_milvus_paginated(collection_name, limit=1000, offset=0): - collection = Collection(collection_name) - collection.load() - results = collection.query(expr="", output_fields=[field.name for field in collection.schema.fields], limit=limit, offset=offset) - return results - -# Connect to openGauss -def connect_opengauss(): - opengauss_user = config.get('openGauss', 'user') - opengauss_password = config.get('openGauss', 'password') - opengauss_host = config.get('openGauss', 'host') - opengauss_port = config.get('openGauss', 'port') - opengauss_database = config.get('openGauss', 
'database') - try: - conn = psycopg2.connect( - user=opengauss_user, - password=opengauss_password, - host=opengauss_host, - port=opengauss_port, - database=opengauss_database - ) - print("Connected to openGauss successfully!") - return conn - except Exception as e: - print(f"Failed to connect to openGauss: {e}") - return None - -# Define the data type mapping from Milvus to openGauss -def milvus_to_opengauss_type(milvus_field_type): - type_mapping = { - "Int64": "BIGINT", - "Int8": "SMALLINT", - "Int16": "SMALLINT", - "Int32": "INTEGER", - "Float": "FLOAT", - "Double": "DOUBLE PRECISION", - "VarChar": "VARCHAR", - "String": "TEXT", - "Json": "TEXT", - "FLOAT_VECTOR": "VECTOR", - "BINARY_VECTOR": "BIT", - "SPARSE_FLOAT_VECTOR": "SPARSEVECTOR", - "Bool": "BOOLEAN", - - } - return type_mapping.get(milvus_field_type, "TEXT") - -# Create a table in openGauss -def create_table_opengauss(conn, table_name, fields, collection): - cursor = conn.cursor() - columns = [] - for field in fields: - # Manually find the field with the specified name - milvus_field = next((f for f in collection.schema.fields if f.name == field), None) - if milvus_field: - opengauss_type = milvus_to_opengauss_type(milvus_field.dtype.name) - columns.append(f"{field} {opengauss_type}") - else: - print(f"Field {field} not found in collection schema.") - columns_str = ", ".join(columns) - create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});" - try: - cursor.execute(create_table_query) - conn.commit() - print(f"Table {table_name} created successfully!") - except Exception as e: - print(f"Failed to create table: {e}") - finally: - cursor.close() - -# Save Milvus data to a CSV file in batches -def save_to_csv_batch(data, fields, csv_file_path): - with open(csv_file_path, 'a', newline='', encoding='utf-8') as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - if csvfile.tell() == 0: - writer.writeheader() - for row in data: - new_row = {} - for field in fields: - value = row[field] - milvus_field = next((f for f in collection.schema.fields if f.name == field), None) - if milvus_field: - if isinstance(value, (list, np.ndarray)): - new_row[field] = "[" + ", ".join(str(x) for x in value) + "]" - else: - new_row[field] = str(value) - - writer.writerow(new_row) - print(f"Batch data saved to {csv_file_path} successfully!") - -# Export data from Milvus to a CSV file -def export_milvus_to_csv(milvus_collection_name, csv_file_path, limit=1000): - offset = 0 - collection = Collection(milvus_collection_name) - collection.load() - initial_data = query_milvus_paginated(milvus_collection_name, limit=1) - fields = [field.name for field in collection.schema.fields] - - while True: - milvus_data = query_milvus_paginated(milvus_collection_name, limit=limit, offset=offset) - if not milvus_data: - break - save_to_csv_batch(milvus_data, fields, csv_file_path) - offset += limit - -# Import data from a CSV file to openGauss -def import_csv_to_opengauss(conn, table_name, csv_file_path, fields): - cursor = conn.cursor() +if __name__ == "__main__": try: - # Truncate the target table - cursor.execute(f"TRUNCATE TABLE {table_name};") - with open(csv_file_path, 'r', encoding='utf-8') as csvfile: - columns = ', '.join(fields) - copy_query = f"COPY {table_name} ({columns}) FROM STDIN WITH (FORMAT CSV, HEADER);" - cursor.copy_expert(copy_query, csvfile) - conn.commit() - print("Data imported from CSV to openGauss successfully!") + migrator = MilvusToOpenGaussMigrator() + migrator.run_migration() except Exception as e: - 
print(f"Failed to import data from CSV to openGauss: {e}") - finally: - cursor.close() - -if __name__ == "__main__": - # Read table name from config file - milvus_collection_name = config.get('Table', 'milvus_collection_name') - opengauss_table_name = config.get('Table', 'opengauss_table_name') - - # Generate the CSV file path based on the table name - output_folder = 'output' - if not os.path.exists(output_folder): - os.makedirs(output_folder) - - #csv_file_path = f'{opengauss_table_name}.csv' - - # Generate the CSV file path based on the table name - csv_file_path = os.path.join(output_folder, f'{milvus_collection_name}.csv') - - # Connect to Milvus - connect_milvus() - collection = Collection(milvus_collection_name) - collection.load() - initial_data = query_milvus_paginated(milvus_collection_name, limit=1) - fields = [field.name for field in collection.schema.fields] - - # Connect to openGauss - opengauss_conn = connect_opengauss() - - if opengauss_conn: - # Create a table in openGauss - create_table_opengauss(opengauss_conn, opengauss_table_name, fields, collection) - - # Export data from Milvus to a CSV file - export_milvus_to_csv(milvus_collection_name, csv_file_path) - - # Import data from a CSV file to openGauss - import_csv_to_opengauss(opengauss_conn, opengauss_table_name, csv_file_path, fields) - - opengauss_conn.close() + logger.error(f"Migration failed: {e}") + exit(1) ``` diff --git a/content/docs-lite/zh/menu/index.md b/content/docs-lite/zh/menu/index.md index 38e099ea3..3454549bd 100644 --- a/content/docs-lite/zh/menu/index.md +++ b/content/docs-lite/zh/menu/index.md @@ -626,8 +626,18 @@ headless: true - [向量数据库]({{< relref "./docs/DataVec/DataVec-Overview.md" >}}) - [快速入门指南]({{< relref "./docs/DataVec/DataVec-quickstart.md" >}}) - [向量存储引擎]({{< relref "./docs/DataVec/DataVec-architecture.md" >}}) - - [工具编排使用]({{< relref "./docs/DataVec/DataVec-integrations.md" >}}) + - [PQ]({{< relref "./docs/DataVec/PQ.md" >}}) + - [工具编排使用]({{< relref "./docs/DataVec/DataVec-integrations.md" >}}) + - [使用openGauss部署Dify]({{< relref "./docs/DataVec/dify.md" >}}) + - [使用openGauss部署AnythingLLM]({{< relref "./docs/DataVec/anythingllm.md" >}}) + - [从Milvus迁移至openGauss DataVec]({{< relref "./docs/DataVec/milvus2datavec.md" >}}) + - [Python SDK对接向量数据库]({{< relref "./docs/DataVec/integrationPython.md" >}}) + - [Java SDK对接向量数据库]({{< relref "./docs/DataVec/integrationJava.md" >}}) + - [Go SDK对接向量数据库]({{< relref "./docs/DataVec/integrationGo.md" >}}) - [教程案例指导]({{< relref "./docs/DataVec/DataVec-tutorials.md" >}}) + - [打破AI黑盒,拥抱开源力量:基于openGauss+DeepSeek的本地知识库,打造你的专属AI助手!]({{< relref "./docs/DataVec/openGauss-RAG实践.md" >}}) + - [openGauss Datavec + Dify,快速搭建你的智能助手平台]({{< relref "./docs/DataVec/openGauss-Dify.md" >}}) + - [Spring Boot集成openGauss DataVec实现向量化检索]({{< relref "./docs/DataVec/openGauss-Springboot.md" >}} - [安全加固指南]({{< relref "./docs/SecHarden/secHarden.md" >}}) - [数据库加固概述]({{< relref "./docs/SecHarden/数据库加固概述.md" >}}) - [安全配置规范基线]({{< relref "./docs/SecHarden/安全配置规范基线.md" >}}) diff --git a/content/zh/docs/DataVec/DataVec-Overview.md b/content/zh/docs/DataVec/DataVec-Overview.md index e44f0778a..ed685312a 100644 --- a/content/zh/docs/DataVec/DataVec-Overview.md +++ b/content/zh/docs/DataVec/DataVec-Overview.md @@ -27,9 +27,9 @@ DataVec架构与特性实现详情可参考[向量存储引擎](DataVec-architec ### 索引支持 - [IVFFLAT](../SQLReference/向量索引.md##IVFFlat) 倒排索引 -- [IVF-PQ](../SQLReference/PQ.md##IVF-PQ) 量化压缩倒排索引 +- [IVF-PQ](PQ.md##IVF-PQ) 量化压缩倒排索引 - [HNSW](../SQLReference/向量索引.md##HNSW) 图索引 -- 
diff --git a/content/docs-lite/zh/menu/index.md b/content/docs-lite/zh/menu/index.md
index 38e099ea3..3454549bd 100644
--- a/content/docs-lite/zh/menu/index.md
+++ b/content/docs-lite/zh/menu/index.md
@@ -626,8 +626,18 @@ headless: true
 - [向量数据库]({{< relref "./docs/DataVec/DataVec-Overview.md" >}})
   - [快速入门指南]({{< relref "./docs/DataVec/DataVec-quickstart.md" >}})
   - [向量存储引擎]({{< relref "./docs/DataVec/DataVec-architecture.md" >}})
-  - [工具编排使用]({{< relref "./docs/DataVec/DataVec-integrations.md" >}})
+  - [PQ]({{< relref "./docs/DataVec/PQ.md" >}})
+  - [工具编排使用]({{< relref "./docs/DataVec/DataVec-integrations.md" >}})
+    - [使用openGauss部署Dify]({{< relref "./docs/DataVec/dify.md" >}})
+    - [使用openGauss部署AnythingLLM]({{< relref "./docs/DataVec/anythingllm.md" >}})
+    - [从Milvus迁移至openGauss DataVec]({{< relref "./docs/DataVec/milvus2datavec.md" >}})
+    - [Python SDK对接向量数据库]({{< relref "./docs/DataVec/integrationPython.md" >}})
+    - [Java SDK对接向量数据库]({{< relref "./docs/DataVec/integrationJava.md" >}})
+    - [Go SDK对接向量数据库]({{< relref "./docs/DataVec/integrationGo.md" >}})
   - [教程案例指导]({{< relref "./docs/DataVec/DataVec-tutorials.md" >}})
+    - [打破AI黑盒,拥抱开源力量:基于openGauss+DeepSeek的本地知识库,打造你的专属AI助手!]({{< relref "./docs/DataVec/openGauss-RAG实践.md" >}})
+    - [openGauss Datavec + Dify,快速搭建你的智能助手平台]({{< relref "./docs/DataVec/openGauss-Dify.md" >}})
+    - [Spring Boot集成openGauss DataVec实现向量化检索]({{< relref "./docs/DataVec/openGauss-Springboot.md" >}})
 - [安全加固指南]({{< relref "./docs/SecHarden/secHarden.md" >}})
   - [数据库加固概述]({{< relref "./docs/SecHarden/数据库加固概述.md" >}})
   - [安全配置规范基线]({{< relref "./docs/SecHarden/安全配置规范基线.md" >}})
diff --git a/content/zh/docs/DataVec/DataVec-Overview.md b/content/zh/docs/DataVec/DataVec-Overview.md
index e44f0778a..ed685312a 100644
--- a/content/zh/docs/DataVec/DataVec-Overview.md
+++ b/content/zh/docs/DataVec/DataVec-Overview.md
@@ -27,9 +27,9 @@ DataVec架构与特性实现详情可参考[向量存储引擎](DataVec-architec
 
 ### 索引支持
 - [IVFFLAT](../SQLReference/向量索引.md##IVFFlat) 倒排索引
-- [IVF-PQ](../SQLReference/PQ.md##IVF-PQ) 量化压缩倒排索引
+- [IVF-PQ](PQ.md##IVF-PQ) 量化压缩倒排索引
 - [HNSW](../SQLReference/向量索引.md##HNSW) 图索引
-- [HNSW-PQ](../SQLReference/PQ.md##HNSW-PQ) 量化压缩图索引
+- [HNSW-PQ](PQ.md##HNSW-PQ) 量化压缩图索引
 
 ## 生态对接
 openGauss DataVec 提供Python、Java、Node.js、Go等多语言生态对接,让你能够通过API调用,快速使能向量数据库能力。同时, DataVec拥抱开源第三方组件,在RAG场景下做到快速兼容,多样选择。
diff --git a/content/zh/docs/SQLReference/PQ.md b/content/zh/docs/DataVec/PQ.md
similarity index 100%
rename from content/zh/docs/SQLReference/PQ.md
rename to content/zh/docs/DataVec/PQ.md
diff --git a/content/zh/docs/DataVec/milvus2datavec.md b/content/zh/docs/DataVec/milvus2datavec.md
index 424f2eef1..e53d9a14d 100644
--- a/content/zh/docs/DataVec/milvus2datavec.md
+++ b/content/zh/docs/DataVec/milvus2datavec.md
@@ -15,7 +15,8 @@ pip3 install numpy
 ```
 
 ## 迁移操作
-### 1. 参考如下迁移脚本milvus2datavec.py及配置文件config.ini,根据本地部署的Milvus与openGauss进行配置修改:
+
+1. 参考如下迁移脚本milvus2datavec.py及配置文件config.ini,根据本地部署的Milvus与openGauss进行配置修改:
 
 迁移配置文件config.ini如下:
 ```
@@ -32,192 +33,381 @@ database = postgres
 [Table]
 milvus_collection_name = test
 opengauss_table_name = test
+
+[SparseVector]
+# openGauss supports at most 1000 dimensions for sparsevec
+default_dimension = 1000
+
+[Output]
+folder = output
+
+[Migration]
+cleanup_temp_files = true
 ```
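+
+其中 default_dimension 仅在集合元数据中取不到稀疏向量维度时作为兜底值。迁移完成后,稀疏向量以 '{索引:值,...}/维度' 的 SPARSEVEC 字面量存储,可在 gsql 中按如下方式做格式验证(示意 SQL,索引从 1 开始计数):
+
+```
+select '{1:0.5,3:0.25}/1000'::sparsevec;
+```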
 milvus2datavec.py迁移脚本如下:
 ```python
 import psycopg2
 import csv
-from pymilvus import connections, Collection
-from concurrent.futures import ThreadPoolExecutor
+from pymilvus import connections, Collection, utility
 import configparser
 import numpy as np
-import re
 import os
+import json
+import logging
+from typing import List, Dict, Any, Optional, Union
+from datetime import datetime
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[logging.FileHandler('migration.log'), logging.StreamHandler()]
+)
+logger = logging.getLogger(__name__)
+
+
+class MilvusToOpenGaussMigrator:
+    def __init__(self, config_file: str = 'config.ini'):
+        self.config = self._load_config(config_file)
+        self.csv_file_path = self._get_csv_file_path()
+        self.fields = []
+        self.MAX_WINDOW_SIZE = 16384  # Milvus default max query window
+        self.SPARSE_DIMENSION = self.config.getint('SparseVector', 'default_dimension', fallback=1000)
+        self.MAX_SPARSE_DIMENSION = 1000
+        self.milvus_version = None
+
+    def _load_config(self, config_file: str) -> configparser.ConfigParser:
+        """Load configuration file"""
+        config = configparser.ConfigParser()
+        try:
+            if not config.read(config_file):
+                raise FileNotFoundError(f"Config file {config_file} not found")
+            return config
+        except Exception as e:
+            logger.error(f"Failed to load config: {e}")
+            raise
+
+    def _get_csv_file_path(self) -> str:
+        """Generate CSV file path with timestamp"""
+        output_folder = self.config.get('Output', 'folder', fallback='output')
+        os.makedirs(output_folder, exist_ok=True)
+        milvus_collection = self.config.get('Table', 'milvus_collection_name')
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        return os.path.join(output_folder, f"{milvus_collection}_{timestamp}.csv")
+
+    def _connect_milvus(self) -> Collection:
+        """Connect to Milvus"""
+        try:
+            connections.connect(
+                alias="default",
+                host=self.config.get('Milvus', 'host'),
+                port=self.config.get('Milvus', 'port')
+            )
+
+            self.milvus_version = utility.get_server_version()
+            logger.info(f"Connected to Milvus {self.milvus_version}")
+
+            collection_name = self.config.get('Table', 'milvus_collection_name')
+            collection = Collection(collection_name)
+            collection.load()
+            self.fields = [field.name for field in collection.schema.fields]
+            logger.info(f"Loaded collection: {collection_name}")
+            return collection
+        except Exception as e:
+            logger.error(f"Milvus connection failed: {e}")
+            raise
+
+    def _connect_opengauss(self) -> psycopg2.extensions.connection:
+        """Connect to openGauss"""
+        try:
+            conn = psycopg2.connect(
+                user=self.config.get('openGauss', 'user'),
+                password=self.config.get('openGauss', 'password'),
+                host=self.config.get('openGauss', 'host'),
+                port=self.config.get('openGauss', 'port'),
+                database=self.config.get('openGauss', 'database')
+            )
+            logger.info("Connected to openGauss")
+            return conn
+        except Exception as e:
+            logger.error(f"openGauss connection failed: {e}")
+            raise
+
+    def _process_sparse_vector(self, sparse_data: Union[dict, bytes, list], dimension: int) -> str:
+        """Convert to openGauss SPARSEVEC format: '{index:value,...}/dim'"""
+        if sparse_data is None:
+            return "NULL"
+
+        try:
+            # Convert to {index: value} dict
+            if dimension is None or dimension <= 0:
+                dimension = self.MAX_SPARSE_DIMENSION
+
+            sparse_dict = {}
+
+            if isinstance(sparse_data, dict):
+                # Milvus sparse indices are 0-based while openGauss sparsevec
+                # indices are 1-based, so shift every index by one.
+                sparse_dict = {
+                    int(k) + 1: float(v)
+                    for k, v in sparse_data.items()
+                }
+            else:
+                raise ValueError(f"Unsupported format: {type(sparse_data)}")
+
+            if not sparse_dict:
+                return "{}/" + str(dimension)
+
+            try:
+                # Sort by index to ensure consistent output
+                sorted_items = sorted(sparse_dict.items(), key=lambda x: x[0])
+                entries = ",".join(f"{k}:{v}" for k, v in sorted_items)
+                return "{" + entries + "}/" + str(dimension)
+            except Exception as sort_error:
+                logger.warning(f"Sorting failed, using unsorted vector: {sort_error}")
+                entries = ",".join(f"{k}:{v}" for k, v in sparse_dict.items())
+                return "{" + entries + "}/" + str(dimension)
+
+        except Exception as e:
+            logger.error(f"Sparse vector conversion failed: {e}")
+            return "NULL"
+
+    def _process_field_value(self, value: Any, field_type: str, dimension: Optional[int] = None) -> str:
+        """Convert field value to CSV string"""
+        if value is None:
+            return "NULL"
+        elif field_type == "SPARSE_FLOAT_VECTOR":
+            return self._process_sparse_vector(value, dimension or self.SPARSE_DIMENSION)
+        elif isinstance(value, (list, np.ndarray)):
+            return "[" + ",".join(str(x) for x in value) + "]"
+        elif isinstance(value, dict):
+            return json.dumps(value)
+        else:
+            return str(value)
+
+    def _create_opengauss_table(self, conn: psycopg2.extensions.connection, collection: Collection) -> None:
+        """Create table with SPARSEVEC columns"""
+        table_name = self.config.get('Table', 'opengauss_table_name')
+        cursor = conn.cursor()
+
+        try:
+            # Check if table exists
+            cursor.execute(f"SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename = '{table_name}');")
+            if cursor.fetchone()[0]:
+                logger.warning(f"Table {table_name} exists")
+                return
+
+            # Build CREATE TABLE statement
+            columns = []
+            for field in collection.schema.fields:
+                dim = field.dim if hasattr(field, 'dim') else None
+                pg_type = self._milvus_to_opengauss_type(field.dtype.name, dim)
+                columns.append(f"{field.name} {pg_type}")
+
+            # Add primary key if exists
+            pk_fields = [f.name for f in collection.schema.fields if f.is_primary]
+            if pk_fields:
+                columns.append(f"PRIMARY KEY ({', '.join(pk_fields)})")
+
+            create_sql = f"CREATE TABLE {table_name} ({', '.join(columns)});"
+            cursor.execute(create_sql)
+            conn.commit()
+            logger.info(f"Created table: {table_name}")
+        except Exception as e:
+            conn.rollback()
+            logger.error(f"Table creation failed: {e}")
+            raise
+        finally:
+            cursor.close()
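+
+    # Illustrative example of a statement this method may generate (actual
+    # columns depend on the collection schema):
+    #   CREATE TABLE test (id BIGINT, embedding VECTOR(768), PRIMARY KEY (id));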
+
+    def _query_milvus_data(self, collection: Collection, limit: int, offset: int) -> List[Dict]:
+        """Query data with sparse vector support"""
+        try:
+            # Get field metadata
+            field_meta = {f.name: {"type": f.dtype.name, "dim": getattr(f, 'dim', None)}
+                          for f in collection.schema.fields}
+
+            # Adjust query window
+            if offset + limit > self.MAX_WINDOW_SIZE:
+                new_limit = self.MAX_WINDOW_SIZE - offset
+                if new_limit <= 0:
+                    return []
+                limit = new_limit
+                logger.warning(f"Adjusted limit to {limit}")
+
+            # Query data
+            results = collection.query(
+                expr="",
+                output_fields=self.fields,
+                limit=limit,
+                offset=offset
+            )
+
+            # Process results
+            processed = []
+            for row in results:
+                processed_row = {}
+                for field in self.fields:
+                    meta = field_meta[field]
+                    value = row.get(field)
+                    processed_row[field] = self._process_field_value(
+                        value, meta["type"], meta["dim"])
+                processed.append(processed_row)
+
+            return processed
+        except Exception as e:
+            logger.error(f"Query failed: {e}")
+            raise
+
+    def _export_to_csv_chunked(self, collection: Collection) -> List[str]:
+        """Export data to CSV chunks"""
+        file_paths = []
+        total_rows = 0
+        offset = 0
+
+        try:
+            # Flush and get row count
+            collection.flush()
+            total_count = collection.num_entities
+            logger.info(f"Total rows to export: {total_count}")
+
+            while total_rows < total_count:
+                # Create chunk file
+                chunk_id = len(file_paths) + 1
+                chunk_file = f"{os.path.splitext(self.csv_file_path)[0]}_part{chunk_id}.csv"
+                file_paths.append(chunk_file)
+
+                with open(chunk_file, 'w', newline='') as f:
+                    writer = csv.DictWriter(f, fieldnames=self.fields)
+                    writer.writeheader()
+
+                    chunk_rows = 0
+                    while chunk_rows < 100000:  # 100K rows per chunk
+                        batch = self._query_milvus_data(
+                            collection,
+                            limit=min(50000, 100000 - chunk_rows),
+                            offset=offset
+                        )
+                        if not batch:
+                            break
+
+                        writer.writerows(batch)
+                        chunk_rows += len(batch)
+                        total_rows += len(batch)
+                        offset += len(batch)
+
+                        if total_rows % 10000 == 0:
+                            logger.info(f"Exported {total_rows}/{total_count} rows")
+
+                if chunk_rows == 0:
+                    # No rows came back (e.g. the Milvus query window is
+                    # exhausted): drop the empty chunk and stop instead of
+                    # looping forever on empty batches.
+                    os.remove(chunk_file)
+                    file_paths.pop()
+                    logger.warning(f"Export stopped early: {total_rows}/{total_count} rows")
+                    break
+
+                logger.info(f"Created chunk {chunk_id}: {chunk_rows} rows")
+
+            return file_paths
+        except Exception as e:
+            # Cleanup failed export
+            for f in file_paths:
+                try:
+                    os.remove(f)
+                except OSError:
+                    pass
+            logger.error(f"Export failed: {e}")
+            raise
+
+    def _import_to_opengauss(self, conn: psycopg2.extensions.connection, file_paths: List[str]) -> None:
+        """Import CSV data to openGauss"""
+        table_name = self.config.get('Table', 'opengauss_table_name')
+        cursor = conn.cursor()
+
+        try:
+            # Prepare for bulk import
+            cursor.execute(f"TRUNCATE TABLE {table_name};")
+            conn.commit()
+
+            total_rows = 0
+            for i, csv_file in enumerate(file_paths, 1):
+                with open(csv_file, 'r') as f:
+                    # Use COPY for bulk load
+                    copy_sql = f"""
+                        COPY {table_name} ({', '.join(self.fields)})
+                        FROM STDIN WITH (FORMAT CSV, HEADER, NULL 'NULL');
+                    """
+                    cursor.copy_expert(copy_sql, f)
+                    conn.commit()
+
+                    rows_imported = cursor.rowcount
+                    total_rows += rows_imported
+                    logger.info(f"Imported {rows_imported} rows from {csv_file}")
+
+            logger.info(f"Total imported: {total_rows} rows")
+        except Exception as e:
+            conn.rollback()
+            logger.error(f"Import failed: {e}")
+            raise
+        finally:
+            cursor.close()
+
+    def run_migration(self) -> None:
+        """Execute full migration workflow"""
+        start_time = datetime.now()
+        logger.info("Starting migration")
+
+        try:
+            # Step 1: Connect to Milvus
+            milvus_collection = self._connect_milvus()
+
+            # Step 2: Export data to CSV
+            csv_files = self._export_to_csv_chunked(milvus_collection)
+
+            # Step 3: Connect to openGauss and create table
+            opengauss_conn = self._connect_opengauss()
+            self._create_opengauss_table(opengauss_conn, milvus_collection)
+
+            #
Step 4: Import to openGauss + self._import_to_opengauss(opengauss_conn, csv_files) + + # Cleanup + if self.config.getboolean('Migration', 'cleanup_temp_files', fallback=True): + for f in csv_files: + try: + os.remove(f) + except Exception as e: + logger.warning(f"Failed to delete {f}: {e}") + + logger.info(f"Migration completed in {datetime.now() - start_time}") + except Exception as e: + logger.error(f"Migration failed: {e}") + raise + finally: + if 'opengauss_conn' in locals(): + opengauss_conn.close() + connections.disconnect("default") + + @staticmethod + def _milvus_to_opengauss_type(milvus_type: str, dim: Optional[int] = None) -> str: + """Map Milvus types to openGauss types""" + type_map = { + "Int64": "BIGINT", + "Int32": "INTEGER", + "Int16": "SMALLINT", + "Int8": "SMALLINT", + "Float": "REAL", + "Double": "DOUBLE PRECISION", + "Bool": "BOOLEAN", + "VarChar": "VARCHAR", + "String": "TEXT", + "Json": "JSONB", + "FLOAT_VECTOR": f"VECTOR({dim})" if dim else "VECTOR", + "BINARY_VECTOR": f"BIT({dim})" if dim else "BIT", + "SPARSE_FLOAT_VECTOR": "SPARSEVEC" + } + return type_map.get(milvus_type, "TEXT") -# Read database configuration from config.ini -config = configparser.ConfigParser() -config.read('config.ini') -# Connect to Milvus -def connect_milvus(): - milvus_host = config.get('Milvus', 'host') - milvus_port = config.get('Milvus', 'port') - try: - connections.connect(alias="default", host=milvus_host, port=milvus_port) - print("Connected to Milvus successfully!") - except Exception as e: - print(f"Failed to connect to Milvus: {e}") - -# Query data from Milvus collection in pages -def query_milvus_paginated(collection_name, limit=1000, offset=0): - collection = Collection(collection_name) - collection.load() - results = collection.query(expr="", output_fields=[field.name for field in collection.schema.fields], limit=limit, offset=offset) - return results - -# Connect to openGauss -def connect_opengauss(): - opengauss_user = config.get('openGauss', 'user') - opengauss_password = config.get('openGauss', 'password') - opengauss_host = config.get('openGauss', 'host') - opengauss_port = config.get('openGauss', 'port') - opengauss_database = config.get('openGauss', 'database') - try: - conn = psycopg2.connect( - user=opengauss_user, - password=opengauss_password, - host=opengauss_host, - port=opengauss_port, - database=opengauss_database - ) - print("Connected to openGauss successfully!") - return conn - except Exception as e: - print(f"Failed to connect to openGauss: {e}") - return None - -# Define the data type mapping from Milvus to openGauss -def milvus_to_opengauss_type(milvus_field_type): - type_mapping = { - "Int64": "BIGINT", - "Int8": "SMALLINT", - "Int16": "SMALLINT", - "Int32": "INTEGER", - "Float": "FLOAT", - "Double": "DOUBLE PRECISION", - "VarChar": "VARCHAR", - "String": "TEXT", - "Json": "TEXT", - "FLOAT_VECTOR": "VECTOR", - "BINARY_VECTOR": "BIT", - "SPARSE_FLOAT_VECTOR": "SPARSEVECTOR", - "Bool": "BOOLEAN", - - } - return type_mapping.get(milvus_field_type, "TEXT") - -# Create a table in openGauss -def create_table_opengauss(conn, table_name, fields, collection): - cursor = conn.cursor() - columns = [] - for field in fields: - # Manually find the field with the specified name - milvus_field = next((f for f in collection.schema.fields if f.name == field), None) - if milvus_field: - opengauss_type = milvus_to_opengauss_type(milvus_field.dtype.name) - columns.append(f"{field} {opengauss_type}") - else: - print(f"Field {field} not found in collection schema.") - columns_str 
= ", ".join(columns) - create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});" - try: - cursor.execute(create_table_query) - conn.commit() - print(f"Table {table_name} created successfully!") - except Exception as e: - print(f"Failed to create table: {e}") - finally: - cursor.close() - -# Save Milvus data to a CSV file in batches -def save_to_csv_batch(data, fields, csv_file_path): - with open(csv_file_path, 'a', newline='', encoding='utf-8') as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - if csvfile.tell() == 0: - writer.writeheader() - for row in data: - new_row = {} - for field in fields: - value = row[field] - milvus_field = next((f for f in collection.schema.fields if f.name == field), None) - if milvus_field: - if isinstance(value, (list, np.ndarray)): - new_row[field] = "[" + ", ".join(str(x) for x in value) + "]" - else: - new_row[field] = str(value) - - writer.writerow(new_row) - print(f"Batch data saved to {csv_file_path} successfully!") - -# Export data from Milvus to a CSV file -def export_milvus_to_csv(milvus_collection_name, csv_file_path, limit=1000): - offset = 0 - collection = Collection(milvus_collection_name) - collection.load() - initial_data = query_milvus_paginated(milvus_collection_name, limit=1) - fields = [field.name for field in collection.schema.fields] - - while True: - milvus_data = query_milvus_paginated(milvus_collection_name, limit=limit, offset=offset) - if not milvus_data: - break - save_to_csv_batch(milvus_data, fields, csv_file_path) - offset += limit - -# Import data from a CSV file to openGauss -def import_csv_to_opengauss(conn, table_name, csv_file_path, fields): - cursor = conn.cursor() +if __name__ == "__main__": try: - # Truncate the target table - cursor.execute(f"TRUNCATE TABLE {table_name};") - with open(csv_file_path, 'r', encoding='utf-8') as csvfile: - columns = ', '.join(fields) - copy_query = f"COPY {table_name} ({columns}) FROM STDIN WITH (FORMAT CSV, HEADER);" - cursor.copy_expert(copy_query, csvfile) - conn.commit() - print("Data imported from CSV to openGauss successfully!") + migrator = MilvusToOpenGaussMigrator() + migrator.run_migration() except Exception as e: - print(f"Failed to import data from CSV to openGauss: {e}") - finally: - cursor.close() - -if __name__ == "__main__": - # Read table name from config file - milvus_collection_name = config.get('Table', 'milvus_collection_name') - opengauss_table_name = config.get('Table', 'opengauss_table_name') - - # Generate the CSV file path based on the table name - output_folder = 'output' - if not os.path.exists(output_folder): - os.makedirs(output_folder) - - #csv_file_path = f'{opengauss_table_name}.csv' - - # Generate the CSV file path based on the table name - csv_file_path = os.path.join(output_folder, f'{milvus_collection_name}.csv') - - # Connect to Milvus - connect_milvus() - collection = Collection(milvus_collection_name) - collection.load() - initial_data = query_milvus_paginated(milvus_collection_name, limit=1) - fields = [field.name for field in collection.schema.fields] - - # Connect to openGauss - opengauss_conn = connect_opengauss() - - if opengauss_conn: - # Create a table in openGauss - create_table_opengauss(opengauss_conn, opengauss_table_name, fields, collection) - - # Export data from Milvus to a CSV file - export_milvus_to_csv(milvus_collection_name, csv_file_path) - - # Import data from a CSV file to openGauss - import_csv_to_opengauss(opengauss_conn, opengauss_table_name, csv_file_path, fields) - - 
opengauss_conn.close()
+        logger.error(f"Migration failed: {e}")
+        exit(1)
 ```
@@ -233,18 +423,19 @@
 python3 milvus2datavec.py
 ```
 
 4. 登录openGauss,查看数据是否完成迁移:
-4.1 进入容器:
-```
-$ docker exec -it <container_name> bash
-```
-4.2 登录`omm`超级用户:
-```
-$ su omm
-$ gsql -d postgres -p 5432
-```
-
-4.3 查看迁移表数据量:
-```
-$ select count(*) from test;
-```
\ No newline at end of file
+   4.1 进入容器:
+   ```
+   $ docker exec -it <container_name> bash
+   ```
+
+   4.2 登录`omm`超级用户:
+   ```
+   $ su omm
+   $ gsql -d postgres -p 5432
+   ```
+
+   4.3 查看迁移表数据量:
+   ```
+   $ select count(*) from test;
+   ```
\ No newline at end of file
diff --git a/content/zh/menu/index.md b/content/zh/menu/index.md
index fa6a451e0..9281c3b79 100644
--- a/content/zh/menu/index.md
+++ b/content/zh/menu/index.md
@@ -910,8 +910,18 @@ headless: true
 - [向量数据库]({{< relref "./docs/DataVec/DataVec-Overview.md" >}})
   - [快速入门指南]({{< relref "./docs/DataVec/DataVec-quickstart.md" >}})
   - [向量存储引擎]({{< relref "./docs/DataVec/DataVec-architecture.md" >}})
+  - [PQ]({{< relref "./docs/DataVec/PQ.md" >}})
   - [工具编排使用]({{< relref "./docs/DataVec/DataVec-integrations.md" >}})
+    - [使用openGauss部署Dify]({{< relref "./docs/DataVec/dify.md" >}})
+    - [使用openGauss部署AnythingLLM]({{< relref "./docs/DataVec/anythingllm.md" >}})
+    - [从Milvus迁移至openGauss DataVec]({{< relref "./docs/DataVec/milvus2datavec.md" >}})
+    - [Python SDK对接向量数据库]({{< relref "./docs/DataVec/integrationPython.md" >}})
+    - [Java SDK对接向量数据库]({{< relref "./docs/DataVec/integrationJava.md" >}})
+    - [Go SDK对接向量数据库]({{< relref "./docs/DataVec/integrationGo.md" >}})
   - [教程案例指导]({{< relref "./docs/DataVec/DataVec-tutorials.md" >}})
+    - [打破AI黑盒,拥抱开源力量:基于openGauss+DeepSeek的本地知识库,打造你的专属AI助手!]({{< relref "./docs/DataVec/openGauss-RAG实践.md" >}})
+    - [openGauss Datavec + Dify,快速搭建你的智能助手平台]({{< relref "./docs/DataVec/openGauss-Dify.md" >}})
+    - [Spring Boot集成openGauss DataVec实现向量化检索]({{< relref "./docs/DataVec/openGauss-Springboot.md" >}})
 - [AI特性指南]({{< relref "./docs/AIFeatureGuide/AI特性.md" >}})
   - [AI4DB: 数据库自治运维]({{< relref "./docs/AIFeatureGuide/AI4DB-数据库自治运维.md" >}})
   - [DBMind模式说明]({{< relref "./docs/AIFeatureGuide/DBMind模式说明.md" >}})
-- 
Gitee