diff --git a/setup.cfg b/setup.cfg index 639f349d98249751e85b74d313c7cb048c02b409..619cb6e90180da5d3ad39a1d11419d57e0b6a7b8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = quingo -version = 0.1.4 +version = 0.1.5 author = Xiang Fu author_email = gtaifu@gmail.com use_2to3 = False diff --git a/src/quingo/core/manager.py b/src/quingo/core/manager.py index b4ec7406845c131975de3381c8acfb6f96ba787c..36198921c6ac6cad5d0c9c5bd94faea9a53dc998 100755 --- a/src/quingo/core/manager.py +++ b/src/quingo/core/manager.py @@ -9,13 +9,20 @@ import subprocess import logging from pathlib import Path from .compiler_config import get_mlir_path, get_xtext_path -from quingo.core.utils import quingo_err, quingo_msg, quingo_warning, get_logger, quingo_info -logger = get_logger((__name__).split('.')[-1]) +from quingo.core.utils import ( + quingo_err, + quingo_msg, + quingo_warning, + get_logger, + quingo_info, +) + +logger = get_logger((__name__).split(".")[-1]) def remove_comment(qu_src): - search_annotation_string = r'\/\/.*\n' - return re.sub(search_annotation_string, '', qu_src) + search_annotation_string = r"\/\/.*\n" + return re.sub(search_annotation_string, "", qu_src) def get_ret_type(qg_filename: str, qg_func_name: str): @@ -27,24 +34,25 @@ def get_ret_type(qg_filename: str, qg_func_name: str): quantum function called by the host program. qg_func_name (str) : the name of the Quingo function. """ - with open(qg_filename, 'r', encoding='utf-8') as qu_src_file: + with open(qg_filename, "r", encoding="utf-8") as qu_src_file: qu_src = qu_src_file.read() qu_src = remove_comment(qu_src) - search_string = r'\boperation\b\s+' + \ - qg_func_name + r'\s*\((.*)\)\s*:(.*)\s*\{' + search_string = r"\boperation\b\s+" + qg_func_name + r"\s*\((.*)\)\s*:(.*)\s*\{" op_def_components = re.search(search_string, qu_src) if op_def_components is None: - raise ValueError("Cannot find the operation ({}) in the given Quingo source " - "file ({}).".format(qg_func_name, qg_filename)) + raise ValueError( + "Cannot find the operation ({}) in the given Quingo source " + "file ({}).".format(qg_func_name, qg_filename) + ) - ret_type = re.sub(r'\d', '', op_def_components.groups()[1].strip()) + ret_type = re.sub(r"\d", "", op_def_components.groups()[1].strip()) return ret_type -class Runtime_system_manager(): +class Runtime_system_manager: def __init__(self, verbose=False, log_level=logging.WARNING, **kwargs): """The quingo runtime system manager. @@ -66,7 +74,7 @@ class Runtime_system_manager(): self.verbose = verbose self.log_level = log_level - self.config_execution('one_shot', 1) + self.config_execution("one_shot", 1) self.compiler_name = None # to be set self.backend = None # to be connected @@ -90,11 +98,10 @@ class Runtime_system_manager(): self.max_unroll = 100 if self.verbose: - logger.debug("Python version: {}".format( - platform.python_version())) + logger.debug("Python version: {}".format(platform.python_version())) def set_call_kernel_success(self, success): - assert(isinstance(success, bool)) + assert isinstance(success, bool) if not success: self.qasm_file_path = None self.success_on_last_execution = success @@ -113,7 +120,7 @@ class Runtime_system_manager(): Args: v (bool): True or False. """ - assert(isinstance(v, bool)) + assert isinstance(v, bool) self.verbose = v backend = self.get_backend() if backend is not None: @@ -134,9 +141,8 @@ class Runtime_system_manager(): - xtext - mlir """ - if (compiler_name not in self.supported_compilers): - raise ValueError( - "Found unsupported compiler: {}".format(compiler_name)) + if compiler_name not in self.supported_compilers: + raise ValueError("Found unsupported compiler: {}".format(compiler_name)) self.compiler_name = compiler_name @@ -145,16 +151,18 @@ class Runtime_system_manager(): def get_backend_or_connect_default(self): if self.backend is None: - quingo_info("No backend has been connected. " - "Trying to connect the default PyQCAS backend...") - if not self.connect_backend('pyqcas_quantumsim'): + quingo_info( + "No backend has been connected. " + "Trying to connect the default PyQCAS backend..." + ) + if not self.connect_backend("pyqcas_quantumsim"): raise SystemError("Cannot connect to the default backend.") return self.backend def get_backend_name(self): - '''return the name of the backend that is being used. + """return the name of the backend that is being used. An empty string will be returned if no backend has been set. - ''' + """ backend = self.get_backend() if backend is None: return "" @@ -175,24 +183,31 @@ class Runtime_system_manager(): backend_hub = Backend_hub() if not backend_hub.support(backend_name): - logger.error("The chosen backend ({}) is currently not " - "supported by qgrtsys.".format(backend_name)) + logger.error( + "The chosen backend ({}) is currently not " + "supported by qgrtsys.".format(backend_name) + ) - raise ValueError('Undefined backend ({})'.format(backend_name)) + raise ValueError("Undefined backend ({})".format(backend_name)) try: self.backend = backend_hub.get_instance(backend_name) except Exception as e: quingo_err( - "Cannot connect backend '{}' with the following error:".format(backend_name)) + "Cannot connect backend '{}' with the following error:".format( + backend_name + ) + ) quingo_err("{}".format(e)) - quingo_info("To fix this problem, you could explicitly connect another " - "backend use the the following method: \n" - " `quingo_interface.connect_backend()`\n" - " or, install the corresponding simulation backend using:\n" - " `pip install pyqcas`\n" - " or\n" - " `pip install pyqcisim`\n") + quingo_info( + "To fix this problem, you could explicitly connect another " + "backend use the the following method: \n" + " `quingo_interface.connect_backend()`\n" + " or, install the corresponding simulation backend using:\n" + " `pip install pyqcas`\n" + " or\n" + " `pip install pyqcisim`\n" + ) return False if self.backend is None: @@ -216,36 +231,39 @@ class Runtime_system_manager(): return success def config_path(self, qg_filename: str, qg_func_name: str): - '''Configure the following paths of the following files or directories: - - The project root direcotry (`prj_root_dir`). - - The build directory (`build_dir`), which is used to buffer generarted files. - Create this directory if it does not exist. - - the main file (`main_file_fn`) generated by the runtime system, which contains - the `main` operation. - - the qasm file (`qasm_file_path`) generated by the compiler. - ''' + """Configure the following paths of the following files or directories: + - The project root direcotry (`prj_root_dir`). + - The build directory (`build_dir`), which is used to buffer generarted files. + Create this directory if it does not exist. + - the main file (`main_file_fn`) generated by the runtime system, which contains + the `main` operation. + - the qasm file (`qasm_file_path`) generated by the compiler. + """ self.resolved_qg_filename = Path(qg_filename).resolve() # ensure there is a build directory in the same directory as the source file. self.prj_root_dir = Path(self.resolved_qg_filename).parent self.build_dir = self.prj_root_dir / gc.build_dirname - if self.build_dir.exists(): # clear the existing build directory to remove old files. + if ( + self.build_dir.exists() + ): # clear the existing build directory to remove old files. shutil.rmtree(str(self.build_dir)) - self.build_dir.mkdir() # create a new emtpy build dir. + self.build_dir.mkdir() # create a new emtpy build dir. # the basename of qg_filename without extension self.qg_stem = self.resolved_qg_filename.stem - self.main_file_fn = (self.build_dir / ('main_' + self.qg_stem)).with_suffix( - gc.quingo_suffix) + self.main_file_fn = (self.build_dir / ("main_" + self.qg_stem)).with_suffix( + gc.quingo_suffix + ) qasm_fn_no_ext = self.build_dir / qg_func_name backend = self.get_backend_or_connect_default() qisa_used = backend.get_qisa() - if qisa_used == 'eqasm': + if qisa_used == "eqasm": self.qasm_file_path = qasm_fn_no_ext.with_suffix(gc.eqasm_suffix) - elif qisa_used == 'qcis': + elif qisa_used == "qcis": self.qasm_file_path = qasm_fn_no_ext.with_suffix(gc.qcis_suffix) else: quingo_err("Found unsupported QISA to use: {}".format(qisa_used)) @@ -254,8 +272,8 @@ class Runtime_system_manager(): def get_last_qasm(self): if self.qasm_file_path is None or not self.qasm_file_path.is_file(): - return '' - with self.qasm_file_path.open('r') as f: + return "" + with self.qasm_file_path.open("r") as f: return f.read() def main_process(self, qg_filename: str, qg_func_name: str, *args): @@ -276,7 +294,7 @@ class Runtime_system_manager(): return False if self.verbose: - quingo_msg("Start compilation ... ", end='') + quingo_msg("Start compilation ... ", end="") # generate the quingo file which contains the main function. self.gen_main_func_file(qg_filename, qg_func_name, *args) @@ -289,20 +307,24 @@ class Runtime_system_manager(): logger.debug("The compiler exited successfully.") if not self.qasm_file_path.is_file(): - quingo_err("Error: expected qasm file ({}) has not been generated. Aborts.".format( - self.qasm_file_path)) + quingo_err( + "Error: expected qasm file ({}) has not been generated. Aborts.".format( + self.qasm_file_path + ) + ) return False # compilation finished successfully - logger.debug("The qasm file has been generated at: {}".format( - self.qasm_file_path)) + logger.debug( + "The qasm file has been generated at: {}".format(self.qasm_file_path) + ) if not self.execute(): # execute the eQASM file quingo_err("Execution failed. Abort.") return False if self.verbose: - quingo_msg('Execution finished.') + quingo_msg("Execution finished.") # read back the results self.result = self.get_backend().read_result() @@ -317,22 +339,28 @@ class Runtime_system_manager(): backend = self.get_backend_or_connect_default() if backend.available is False: raise EnvironmentError( - "The backend {} is not available.".format(backend.name())) + "The backend {} is not available.".format(backend.name()) + ) - if self.mode == 'state_vector' and not backend.is_simulator(): + if self.mode == "state_vector" and not backend.is_simulator(): raise ValueError( - "Cannot retrieve state vector from a non-simulator backend.") + "Cannot retrieve state vector from a non-simulator backend." + ) if self.verbose: quingo_msg( - "Uploading the program to the backend {}...".format(backend.name())) + "Uploading the program to the backend {}...".format(backend.name()) + ) if not backend.upload_program(self.qasm_file_path): quingo_err( - "Failed to upload the program to the backend {}.".format(backend.name())) - quingo_info(" Suggestion: are you uploading QCIS program to an eQASM backend " - "or eQASM program to a QCIS backend?\n" - " If so, please specify the compiler and backend accordingly.") + "Failed to upload the program to the backend {}.".format(backend.name()) + ) + quingo_info( + " Suggestion: are you uploading QCIS program to an eQASM backend " + "or eQASM program to a QCIS backend?\n" + " If so, please specify the compiler and backend accordingly." + ) return False if self.verbose: @@ -350,34 +378,38 @@ class Runtime_system_manager(): for r, d, f in os.walk(prj_dir): for file in f: file_path = Path(r) / file - if file_path.suffix in ['.qu', '.qfg']: + if file_path.suffix in [".qu", ".qfg"]: valid_file_list.append(file_path) - logger.debug("imported files:\n\t" + - "\n\t".join(['"{}"'.format(str(f)) for f in valid_file_list])) + logger.debug( + "imported files:\n\t" + + "\n\t".join(['"{}"'.format(str(f)) for f in valid_file_list]) + ) return valid_file_list def get_compiler_cmd(self, compiler_name): - assert(compiler_name in self.supported_compilers) + assert compiler_name in self.supported_compilers - if compiler_name == 'mlir': + if compiler_name == "mlir": quingoc_path = get_mlir_path() if quingoc_path is None: quingo_err( - "Cannot find the mlir-based quingoc compiler in the system path.") + "Cannot find the mlir-based quingoc compiler in the system path." + ) quingo_info( "To resolve this problem, you can install quingoc with two ways:\n" - "1. run the following command \"python -m quingo.install_quingoc\"\n" + '1. run the following command "python -m quingo.install_quingoc"\n' "2. Dowload quingoc from https://gitee.com/quingo/quingoc-release/releases and save " "it at a directory in the system path \n" "or configure its path by calling this method inside python:\n" - " `quingo.quingo_interface.set_mlir_compiler_path()`") + " `quingo.quingo_interface.set_mlir_compiler_path()`" + ) return None else: return '"{}"'.format(quingoc_path) - if compiler_name == 'xtext': + if compiler_name == "xtext": xtext_path = get_xtext_path() if xtext_path is None: quingo_err("Cannot find the Xtext-based Quingo compiler.") @@ -385,29 +417,32 @@ class Runtime_system_manager(): "To resolve this issue, please download the quingo.jar from " "https://github.com/Quingo/compiler_xtext/releases " "and configure its path by calling this method inside python:\n" - " `quingo.quingo_interface.set_xtext_compiler_path()`") + " `quingo.quingo_interface.set_xtext_compiler_path()`" + ) return None return 'java -jar "{}"'.format(xtext_path) def compile(self): """Compiles the quingo files and generate corresponding quantum assembly code. - Both the compiler and backend are selected based on the configuration. + Both the compiler and backend are selected based on the configuration. """ if self.compiler_name is None: - quingo_info("No Quingo compiler has been set. " - "Trying to use the default xtext-based Quingo compiler...") + quingo_info( + "No Quingo compiler has been set. " + "Trying to use the default xtext-based Quingo compiler..." + ) self.set_compiler("xtext") compiler_name = self.get_compiler() - compile_cmd_head = self.get_compiler_cmd(compiler_name) - if compile_cmd_head is None: # Failure + cmd_invoke_compiler = self.get_compiler_cmd(compiler_name) + if cmd_invoke_compiler is None: # Failure return False - if compiler_name == 'mlir': - logger.debug(self.compose_mlir_cmd(compile_cmd_head, print=True)) - compile_cmd = self.compose_mlir_cmd(compile_cmd_head, False) + if compiler_name == "mlir": + logger.debug(self.compose_mlir_cmd(cmd_invoke_compiler, print=True)) + compile_cmd = self.compose_mlir_cmd(cmd_invoke_compiler, print=False) - elif compiler_name == 'xtext': + elif compiler_name == "xtext": # the Quingo files written by the programmer # qgrtsys recursively scans the root directory of the project to get all quingo files. # however, qgrtsys will only use the files which are imported by `qg_filename` @@ -439,27 +474,31 @@ class Runtime_system_manager(): compile_files.extend(user_files) compile_files.extend(default_files) - logger.debug(self.compose_xtext_cmd( - compile_cmd_head, compile_files, print=True)) + logger.debug( + self.compose_xtext_cmd(cmd_invoke_compiler, compile_files, print=True) + ) compile_cmd = self.compose_xtext_cmd( - compile_cmd_head, compile_files, False) + cmd_invoke_compiler, compile_files, False + ) else: raise ValueError("Found undefined compiler to use.") - ret_value = subprocess.run(compile_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, shell=True) - if ret_value.stdout != '': + ret_value = subprocess.run( + compile_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + shell=True, + ) + if ret_value.stdout != "": logger.info(ret_value.stdout.strip()) - if ret_value.stderr != '': - msg = "Error message from the compiler:\n\t{}".format( - ret_value.stderr) + if ret_value.stderr != "": + msg = "Error message from the compiler:\n\t{}".format(ret_value.stderr) quingo_err(msg) logger.error(msg) - if (ret_value.returncode != 0): # failure + if ret_value.returncode != 0: # failure return False else: # success return True @@ -472,28 +511,59 @@ class Runtime_system_manager(): else: head = "" separator = "" - str_files = " ".join(['"{}"'.format(str(f)) - for f in compile_files]) - - compile_cmd = head + quingo_compiler + ' ' + str_files + \ - separator + ' -o "{}"'.format(str(self.qasm_file_path)) + \ - separator + " -s " + str(self.shared_addr) + \ - separator + " -t " + str(self.static_addr) + \ - separator + " -d " + str(self.dynamic_addr) + \ - separator + " -u " + str(self.max_unroll) + str_files = " ".join(['"{}"'.format(str(f)) for f in compile_files]) + + compile_cmd = ( + head + + quingo_compiler + + " " + + str_files + + separator + + ' -o "{}"'.format(str(self.qasm_file_path)) + + separator + + " -s " + + str(self.shared_addr) + + separator + + " -t " + + str(self.static_addr) + + separator + + " -d " + + str(self.dynamic_addr) + + separator + + " -u " + + str(self.max_unroll) + ) return compile_cmd - def compose_mlir_cmd(self, quingo_compiler, print: bool = False): + def compose_mlir_cmd(self, quingoc_path, print: bool = False): if print: - head = "\n" + header = "\n" separator = "\n\t " else: - head = "" + header = "" separator = "" - compile_cmd = head + quingo_compiler + ' ' + str(self.main_file_fn) + \ - separator + ' -o "{}"'.format(self.qasm_file_path) + # compile_cmd = ( + # head + # + quingoc_path + # + " " + # + str(self.main_file_fn) + # + separator + # + ' -o "{}"'.format(self.qasm_file_path) + # ) + + # The project root directory is added to the compiler module search path. + compile_cmd = ( + '{header}{qgc_path} {main_fn}{sep} -I {root_dir} -o "{qasm_fn}"'.format( + header=header, + qgc_path=quingoc_path, + main_fn=str(self.main_file_fn), + sep=separator, + root_dir=self.prj_root_dir, + qasm_fn=self.qasm_file_path, + ) + ) return compile_cmd @@ -509,12 +579,11 @@ class Runtime_system_manager(): qg_func_name (str) : the name of the Quingo function. args (list): a variable length of parameters passed to the quantum function """ - qg_file_content = self.resolved_qg_filename.open('r').read() + qg_file_content = self.resolved_qg_filename.open("r").read() main_func_str = self.main_func(qg_filename, qg_func_name, *args) try: - self.main_file_fn.write_text( - qg_file_content + '\n' + main_func_str) + self.main_file_fn.write_text(qg_file_content + "\n" + main_func_str) except: raise IOError("Cannot write the file: ", self.main_file_fn) @@ -529,8 +598,9 @@ class Runtime_system_manager(): var_name_list = [] arg_str_list = [] - logger.debug("calling function '{}' with parameters: {}".format( - qg_func_name, args)) + logger.debug( + "calling function '{}' with parameters: {}".format(qg_func_name, args) + ) if len(args) != 0: for (i, arg) in enumerate(args): @@ -545,28 +615,32 @@ class Runtime_system_manager(): if len(arg_strs) > 0: arg_strs = "\n " + arg_strs - if self.ret_type == 'unit': + if self.ret_type == "unit": str_ret = "" else: str_ret = "return " func_call = " {ret}{func_name}({parameters});".format( - ret=str_ret, func_name=qg_func_name, parameters=str_params) + ret=str_ret, func_name=qg_func_name, parameters=str_params + ) func_str = "\noperation main() : {} {{{}\n{}\n}}".format( - self.ret_type, arg_strs, func_call) + self.ret_type, arg_strs, func_call + ) return func_str def config_execution(self, mode: str, num_shots: int = 1): - '''Configure the execution mode to 'one_shot' or 'state_vector'. + """Configure the execution mode to 'one_shot' or 'state_vector'. When the execution mode is 'one_shot', the number of times to run the uploaded quantum circuit can be configured using the parameter `num_shots` at the same time. - ''' + """ - if mode not in ['one_shot', 'state_vector']: - raise ValueError("Found unrecognized execution mode: '{}'.".format( - mode) + "Allowed values are: 'one_shot' or 'state_vector'.") + if mode not in ["one_shot", "state_vector"]: + raise ValueError( + "Found unrecognized execution mode: '{}'.".format(mode) + + "Allowed values are: 'one_shot' or 'state_vector'." + ) self.mode = mode self.num_shots = num_shots @@ -582,21 +656,23 @@ class Runtime_system_manager(): def read_result(self, start_addr): if self.success_on_last_execution is False: - quingo_warning('Last execution fails and no result is read back.') + quingo_warning("Last execution fails and no result is read back.") return None qisa_used = self.get_backend().get_qisa() - if qisa_used == 'eqasm': + if qisa_used == "eqasm": data_trans = dt.Data_transfer() data_trans.set_data_block(self.result) pydata = data_trans.bin_to_pydata(self.ret_type, start_addr) - logger.debug( - "The data converted from the binary is: \n{}\n".format(pydata)) + logger.debug("The data converted from the binary is: \n{}\n".format(pydata)) return pydata - elif qisa_used == 'qcis': + elif qisa_used == "qcis": return self.result else: raise ValueError( - "Reading result from a program with unsupported QISA: {}".format(qisa_used)) + "Reading result from a program with unsupported QISA: {}".format( + qisa_used + ) + ) diff --git a/src/tests/pydata2bin.py b/src/tests/pydata2bin.py index ace7d17368f9672782bb532eca3043a1794092bc..a1d7d32c23cfc54e32e5959dfe7e64b4cc4068a2 100755 --- a/src/tests/pydata2bin.py +++ b/src/tests/pydata2bin.py @@ -1,32 +1,29 @@ -import qgrtsys.global_config as gc -import qgrtsys.core.data_transfer as dt +import quingo.global_config as gc +import quingo.core.data_transfer as dt import struct def int2bytes(value): - '''Convert a Python interger to Quingo binary data. - ''' - assert(isinstance(value, int)) + """Convert a Python interger to Quingo binary data.""" + assert isinstance(value, int) return value.to_bytes(gc.QU_INT_SIZE, byteorder=gc.endian, signed=True) def bool2bytes(value): - '''Convert a Python boolean value to Quingo binary data. - ''' - assert(isinstance(value, bool)) + """Convert a Python boolean value to Quingo binary data.""" + assert isinstance(value, bool) return value.to_bytes(gc.QU_BOOL_SIZE, byteorder=gc.endian, signed=True) def double2bytes(value): - '''Convert a Python boolean value to Quingo binary data. - ''' - assert(isinstance(value, float)) - return struct.pack(' [length, ele0, ele1, ele2, ...] # the pointer is written at the stack part, and the body at heap part. @@ -128,9 +125,9 @@ class Pydata_2_qg_bin(): return end_pos def write_tuple(self, tuple_val: tuple, pos: int): - '''Convert the python tuple `tuple_val` into quingo binary and write this binary into buf + """Convert the python tuple `tuple_val` into quingo binary and write this binary into buf with the starting address being `pos`. - ''' + """ for ele in tuple_val: pos = self.convert(ele, pos) @@ -143,26 +140,26 @@ class Pydata_2_qg_bin(): return len(list_val) * self.stack_part_size(list_val[0]) def stack_part_size(self, pydata: str): - '''The binary converted from the python data contains two parts: the stack part and the + """The binary converted from the python data contains two parts: the stack part and the heap part. This function calculates the size of stack part of the quingo binary converted from the python value `pydata`. - ''' + """ data_type = dt.check_if_param_type(pydata) res = 0 - if data_type == 'int': + if data_type == "int": res = gc.QU_INT_SIZE - elif data_type == 'bool': + elif data_type == "bool": res = gc.QU_BOOL_SIZE - elif data_type == 'list': + elif data_type == "list": res = gc.QU_INT_SIZE - elif data_type == 'double': + elif data_type == "double": res = gc.QU_DOUBLE_SIZE - elif data_type == 'tuple': + elif data_type == "tuple": for ele in pydata: res += self.stack_part_size(ele) @@ -172,7 +169,7 @@ class Pydata_2_qg_bin(): return res def convert(self, pydata, pos: int = 0): - '''Convert the pydata into quingo binary, and write this binary into buf. + """Convert the pydata into quingo binary, and write this binary into buf. Args: - pydata: the python data to convert @@ -180,32 +177,33 @@ class Pydata_2_qg_bin(): Return: - int: the address of buf to allocate the next binary data - ''' + """ - assert(isinstance(pos, int)) + assert isinstance(pos, int) data_type = dt.check_if_param_type(pydata) - self.heap_head = max(self.heap_head, pos + - self.stack_part_size(pydata)) + self.heap_head = max(self.heap_head, pos + self.stack_part_size(pydata)) - if data_type == 'int': + if data_type == "int": pos = self.write_int(pydata, pos) - elif data_type == 'bool': + elif data_type == "bool": pos = self.write_bool(pydata, pos) - elif data_type == 'double': + elif data_type == "double": pos = self.write_double(pydata, pos) - elif data_type == 'list': + elif data_type == "list": pos = self.write_list(pydata, pos) - elif data_type == 'tuple': + elif data_type == "tuple": pos = self.write_tuple(pydata, pos) else: - raise ValueError("Found undefined types ({}).\n\tSupported types include " - "int, bool, double, list, and tuple.".format(pydata)) + raise ValueError( + "Found undefined types ({}).\n\tSupported types include " + "int, bool, double, list, and tuple.".format(pydata) + ) return pos diff --git a/src/tests/test_dt_long_bool_arr.py b/src/tests/test_dt_long_bool_arr.py index 2917fcf3cc3aed6f50b7d4638587d0f53c65ad71..456c25d8413e41e4eea348cf8ae3088dc0033d1e 100755 --- a/src/tests/test_dt_long_bool_arr.py +++ b/src/tests/test_dt_long_bool_arr.py @@ -1,25 +1,25 @@ -import qgrtsys.core.data_transfer as dt +import quingo.core.data_transfer as dt import pathlib def test_read_long_bool_arr(): cur_dir = pathlib.Path(__file__).parent.absolute() - data_dir = cur_dir / 'data' - bool_arr_fn = data_dir / 'res_gen_ran_seq.bin' + data_dir = cur_dir / "data" + bool_arr_fn = data_dir / "res_gen_ran_seq.bin" data_trans = dt.Data_transfer() - with open(bool_arr_fn, 'rb') as bin_file: + with open(bool_arr_fn, "rb") as bin_file: result = bin_file.read() data_trans.set_data_block(result) - ret_type = 'bool[]' + ret_type = "bool[]" res = data_trans.bin_to_pydata(ret_type, start_addr=0) - assert(len(res) == 10000) + assert len(res) == 10000 -if __name__ == '__main__': +if __name__ == "__main__": test_read_long_bool_arr() diff --git a/src/tests/test_dt_roundtrip.py b/src/tests/test_dt_roundtrip.py index 8419aee60df4e4d8efca42eeff38dc4ee004056a..7467642a20918eee12009249180464c5970490c3 100755 --- a/src/tests/test_dt_roundtrip.py +++ b/src/tests/test_dt_roundtrip.py @@ -1,6 +1,6 @@ -from qgrtsys.tests.pydata2bin import Pydata_2_qg_bin -import qgrtsys.core.data_transfer as dt -from qgrtsys.core.utils import * +from pydata2bin import Pydata_2_qg_bin +import quingo.core.data_transfer as dt +from quingo.core.utils import * import pytest import random @@ -26,21 +26,23 @@ def check_equal(val1, val2): if type(val1) is not type(val2): return False - if (isinstance(val1, float) or isinstance(val2, float)): + if isinstance(val1, float) or isinstance(val2, float): return val1 == pytest.approx(val2, rel=1e-7) - elif(isinstance(val1, int) or isinstance(val1, bool)): + elif isinstance(val1, int) or isinstance(val1, bool): return val1 == val2 - elif(isinstance(val1, list) or isinstance(val1, tuple)): - if (len(val1) != len(val2)): + elif isinstance(val1, list) or isinstance(val1, tuple): + if len(val1) != len(val2): return False return all(map(lambda x, y: check_equal(x, y), val1, val2)) else: - raise ValueError("Found unsupported value type ({}). \n\tCurrently only int, bool, float, " - "list, and tuple are supported.".format(type(val1))) + raise ValueError( + "Found unsupported value type ({}). \n\tCurrently only int, bool, float, " + "list, and tuple are supported.".format(type(val1)) + ) def roundtrip(type_str, value): @@ -62,14 +64,20 @@ def roundtrip(type_str, value): logger.debug("The decoded data is:{}\n".format(res_py)) - assert(check_equal(res_py, value)) + assert check_equal(res_py, value) def test_roundtrip(): n = 20 while n > 0: - type_str = gen_type_str(weight_list=4, weight_tuple=4, - weight_bool=2, weight_int_double=10, layer=0, max_num=5) + type_str = gen_type_str( + weight_list=4, + weight_tuple=4, + weight_bool=2, + weight_int_double=10, + layer=0, + max_num=5, + ) if type_str == None: continue @@ -79,7 +87,9 @@ def test_roundtrip(): n -= 1 -def gen_type_str(weight_list, weight_tuple, weight_bool, weight_int_double, layer, max_num): +def gen_type_str( + weight_list, weight_tuple, weight_bool, weight_int_double, layer, max_num +): """This function generates a random string of type. Args: @@ -92,7 +102,7 @@ def gen_type_str(weight_list, weight_tuple, weight_bool, weight_int_double, laye max_num (int) : the maximum number of the values in a tuple. """ - if(layer >= 5): + if layer >= 5: return tmp_bool = weight_bool @@ -105,52 +115,68 @@ def gen_type_str(weight_list, weight_tuple, weight_bool, weight_int_double, laye weight_total = weight_list + weight_tuple + weight_bool + weight_int_double # Select the type according to the weight - if(layer == 0): + if layer == 0: weight_bool = 1 weight_int_double = 1 - if(index < weight_list / weight_total): - chosed_type = 'list' - elif(index < (weight_list + weight_tuple) / weight_total): - chosed_type = 'tuple' - elif(index < (weight_list + weight_tuple + weight_bool) / weight_total): - chosed_type = 'bool' - elif(index < (weight_list + weight_tuple + weight_bool + 0.25*weight_int_double) / weight_total): - chosed_type = 'double' + if index < weight_list / weight_total: + chosed_type = "list" + elif index < (weight_list + weight_tuple) / weight_total: + chosed_type = "tuple" + elif index < (weight_list + weight_tuple + weight_bool) / weight_total: + chosed_type = "bool" + elif ( + index + < (weight_list + weight_tuple + weight_bool + 0.25 * weight_int_double) + / weight_total + ): + chosed_type = "double" else: - chosed_type = 'int' + chosed_type = "int" weight_bool = tmp_bool weight_int_double = tmp_int_double # If the tuple is chosen, the function is called to add several elements to the tuple. - if(chosed_type == 'tuple'): + if chosed_type == "tuple": res = "(" num = random.randint(2, max_num) cnt = 0 for i in range(num): - tmp = gen_type_str(weight_list, weight_tuple-1, - weight_bool, weight_int_double, layer+1, max_num) - if(tmp): + tmp = gen_type_str( + weight_list, + weight_tuple - 1, + weight_bool, + weight_int_double, + layer + 1, + max_num, + ) + if tmp: cnt += 1 - res += tmp + ',' - if(cnt < 2): + res += tmp + "," + if cnt < 2: return - return res[:-1]+')' + return res[:-1] + ")" # If the list is chosen, the function is called to genetate a type for the list. - elif(chosed_type == 'list'): - tmp = gen_type_str(weight_list-1, weight_tuple, - weight_bool, weight_int_double, layer+1, max_num) - if(tmp): - return tmp + '[]' + elif chosed_type == "list": + tmp = gen_type_str( + weight_list - 1, + weight_tuple, + weight_bool, + weight_int_double, + layer + 1, + max_num, + ) + if tmp: + return tmp + "[]" return - elif(chosed_type == 'bool'): - return 'bool' - elif(chosed_type == 'int'): - return 'int' - elif(chosed_type == 'double'): - return 'double' + elif chosed_type == "bool": + return "bool" + elif chosed_type == "int": + return "int" + elif chosed_type == "double": + return "double" else: print("Error: No such a type!") exit(0) @@ -165,39 +191,39 @@ def gen_ran_value(type_str, max_num=5): """ res = [] - type_str = type_str.replace(' ', '') - if(type_str[-2:] == '[]'): - num = random.randint(1, max_num+1) + type_str = type_str.replace(" ", "") + if type_str[-2:] == "[]": + num = random.randint(1, max_num + 1) for i in range(num): res.append(gen_ran_value(type_str[:-2], max_num)) return res - elif(type_str[0] == '(' and type_str[-1] == ')'): + elif type_str[0] == "(" and type_str[-1] == ")": sub = list(type_str[1:-1]) left = 0 for i in range(len(sub)): - if(sub[i] == '('): + if sub[i] == "(": left += 1 - elif(sub[i] == ')'): + elif sub[i] == ")": left -= 1 - elif(sub[i] == ','): - if(left == 0): - sub[i] = '$' - sub = (''.join(sub)).split('$') + elif sub[i] == ",": + if left == 0: + sub[i] = "$" + sub = ("".join(sub)).split("$") for i in sub: res.append(gen_ran_value(i, max_num)) return tuple(res) - elif(type_str == 'int'): + elif type_str == "int": return random.randint(-10000, 10000) - elif(type_str == 'double'): + elif type_str == "double": return random.uniform(-10000, 10000) - elif(type_str == 'bool'): - if(random.uniform(0, 1) < 0.5): + elif type_str == "bool": + if random.uniform(0, 1) < 0.5: return True return False else: @@ -205,5 +231,5 @@ def gen_ran_value(type_str, max_num=5): exit(0) -if __name__ == '__main__': +if __name__ == "__main__": test_roundtrip()