2 Star 0 Fork 1

seraph2047/excel2json

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ExcelReader.py 16.43 KB
一键复制 编辑 原始数据 按行查看 历史
from datetime import datetime
import Logger
import xlrd
from xlrd import Book
from xlrd.sheet import Sheet
import Util
from ExcelData import ExcelData, TableData, FieldData, FieldType, EData, LoadConfig, IndexType, Platform, Union
from typing import List, Dict
def importFile(data: ExcelData):
log: Logger = Logger.Logger()
"""导入excel文件,基础分析CFG页"""
log.logNotice("开始导入Excel文件数据...")
lastImportTime = datetime.now()
result: bool = True
edata = EData()
workbook: Book = xlrd.open_workbook(data.filePath)
# 加载CFG配置页,获取基础配置
if not Util.isSheetExist(workbook, edata.setting.readerCfgSheetName):
log.logError("严重错误! [" + data.fileName + "]找不到配置页:" + edata.setting.readerCfgSheetName)
return False
cfgSheet: Sheet = workbook.sheet_by_name(edata.setting.readerCfgSheetName)
defineRow: int = -1
startRow: int = -1
for r in range(0, cfgSheet.nrows):
row = cfgSheet.row(r)
if (row[0].value == "导入表名"):
# 配置页“导入表名”右边单元格放着导入表名的表
log.logNotice("找到配置[导入表名]" + row[1].value)
defineRow = r
startRow = r + 1
if startRow <= -1:
# 如果找不到“导入表名”则CFG页格式错误
log.logError("严重错误! [" + data.fileName + "]找不到配置页找不到配置数据!")
return False
'''是否正常读入标记'''
tmpTables: Dict[str, TableData] = dict()
'''这里使用的临时表用SheetName做Key'''
# 找出配置页对应的列,即某些字段没了也能跑
headRow = cfgSheet.row(defineRow)
ordSheetName: int = -1 # 导入表名
ordIndexType: int = -1 # 索引方式
ordKey: int = -1 # 主键列
ordWeight: int = -1 # 加载权重
ordPlatform: int = -1 # 导出平台
ordUnion: int = -1 # 字段关联
ordGroup: int = -1 # 加载组别
ordLoadRelation: int = -1 # 关联加载
ordCustomField: int = -1 # 自定义索引名
for c in range(0, cfgSheet.ncols):
if (Util.hasSpecifyString(headRow[c].value, "导入表名")):
ordSheetName = c
elif (Util.hasSpecifyString(headRow[c].value, "索引方式")):
ordIndexType = c
elif (Util.hasSpecifyString(headRow[c].value, "主键列")):
ordKey = c
elif (Util.hasSpecifyString(headRow[c].value, "加载先后权重")):
ordWeight = c
elif (Util.hasSpecifyString(headRow[c].value, "导出平台")):
ordPlatform = c
elif (Util.hasSpecifyString(headRow[c].value, "字段关联")):
ordUnion = c
elif (Util.hasSpecifyString(headRow[c].value, "组别")):
ordGroup = c
elif (Util.hasSpecifyString(headRow[c].value, "关联加载")):
ordLoadRelation = c
elif (Util.hasSpecifyString(headRow[c].value, "自定义索引名")):
ordCustomField = c
for r in range(startRow, cfgSheet.nrows):
row = cfgSheet.row(r)
strSheetName: str = row[ordSheetName].value
sheetName = strSheetName.strip(" ").replace("\n", "")
if sheetName == "":
# 如果表名都没填写则不继续
continue
strIndexType: str = row[ordIndexType].value # 索引方式
strKey: str = row[ordKey].value # 主键列
key = strKey.strip(" ").replace("\n", "")
loadWeight: int = Util.parseInt(row[ordWeight].value, 0) # 加载权重
strPlatform: str = row[ordPlatform].value # 导出平台
strUnion: str = row[ordUnion].value # 字段关联
strGroup: str = row[ordGroup].value # 加载组别
strGroup = strGroup.strip(" ").replace("\n", "")
strLoadRelation: str = row[ordLoadRelation].value # 关联加载
strCustomField: str = ""
if (ordCustomField > -1):
strCustomField = row[ordCustomField].value # 自定义索引字段名
# 把信息读取加载配置内
loadConfig: LoadConfig = LoadConfig()
# 提取主键并写入
key = key.replace(",", ",").replace("\n", "")
arr = key.split(",")
loadConfig.keyFieldStrings = []
for i in range(0, len(arr)):
nKey: str = arr[i].strip(" ")
if nKey != "":
loadConfig.keyFieldStrings.append(nKey)
if len(loadConfig.keyFieldStrings) < 1:
log.logError("错误配置!! [" + strSheetName + "]的配置总表中,索引方式主键/非空键必须为1个以上:" + key)
result = False
continue
# 提取索引方式
loadConfig.type = IndexType.parseFromValue(strIndexType)
# 判断索引方式,并分析与记录主键
if loadConfig.type is IndexType.NULL:
log.logError("错误配置!! [" + strSheetName + "]的配置总表中,索引方式填写不正确:" + strIndexType)
result = False
continue
elif loadConfig.type is IndexType.MultKeyMap:
if len(loadConfig.keyFieldStrings) < 2:
log.logError("错误配置!! [" + strSheetName + "]的配置总表中,mkMap索引方式的主键应该为多个:" + key)
result = False
continue
# 加载权重
loadConfig.loadWeight = loadWeight
# 自定义字段名
loadConfig.customDataFullName = strCustomField.strip()
# 加载导出平台设定
loadConfig.exportPlatforms = dict()
strPlatform = strPlatform.replace("\n", "")
strPlatform = strPlatform.replace(" ", "")
strPlatform = strPlatform.replace(",", ",")
strPlatform = strPlatform.replace(u"/", ",")
arrPlatform = strPlatform.split(",")
for i in range(0, len(arrPlatform)):
platform: Platform = Platform.parseFromValue(arrPlatform[i])
if platform is Platform.NULL:
log.logError("错误配置!! [" + strSheetName + "]的配置总表中,导出平台填写不正确:" + strPlatform)
result = False
continue
loadConfig.exportPlatforms[platform.value] = platform
if len(loadConfig.exportPlatforms) < 1:
log.logError("错误配置!! [" + strSheetName + "]的配置总表中,导出平台不能为空:" + strPlatform)
result = False
continue
# 加载字段关联
arrUnion = strUnion.replace("\n", "").split("|")
if len(arrUnion) > 0:
loadConfig.unions = []
for i in range(0, len(arrUnion)):
strTmp = arrUnion[i].strip(" ")
if len(strTmp) > 0:
union: Union = Union.parseFromValue(arrUnion[i])
if union is None:
log.logError("错误配置!! [" + strSheetName + "]的配置总表中,字段关联配置格式错误:" + strUnion)
result = False
continue
loadConfig.unions.append(union)
# 加载组别
if strGroup == "":
# 如果不填组别就设置为default默认组
loadConfig.group = "default"
else:
loadConfig.group = strGroup
# 关联加载
strLoadRelation = strLoadRelation.strip(" ").replace("\n", "")
strLoadRelation = strLoadRelation.replace(",", ",")
strLoadRelation = strLoadRelation.replace(" ", "")
strLoadRelation = strLoadRelation.replace(u"/", ",")
arrLoadRelation = strLoadRelation.split(",")
loadConfig.loadRelations = []
for relation in arrLoadRelation:
if relation != "":
loadConfig.loadRelations.append(relation)
table: TableData
if sheetName in tmpTables.keys():
# 如果之前已有则不读取
table = tmpTables[sheetName]
else:
# 检查是否有对应的Sheet名
if not Util.isSheetExist(workbook, sheetName):
log.logError("错误配置!! 表名不存在:" + sheetName)
continue
# 还没有对应Table,需要创建
table = TableData()
table.sheetName = sheetName
table.lastExecuteTime = lastImportTime
table.loadConfigs = []
# 获得页签
sheet: Sheet = workbook.sheet_by_name(sheetName)
# logs.append(":::: 导入[" + sheetName + "]配置表..")
isRead: bool = readTableData(edata, table, sheet, log)
if isRead:
tmpTables[sheetName] = table
else:
log.logError("严重错误! 导入[" + sheetName + "]配置表,失败!")
result = False
# 把必须字段给标上不能缺,并且加入keyFields
loadConfig.keyFields = []
for i in range(0, len(loadConfig.keyFieldStrings)):
keyStr: str = loadConfig.keyFieldStrings[i]
isFound: bool = False
for field in table.fields:
if keyStr == field.fieldKey:
# 第一个键为主键
if i == 0:
if loadConfig.type != IndexType.List:
field.isMainKey = True
# 只要是填在'主键列/非空列'都视为不能为空
field.canNotEmpty = True
loadConfig.keyFields.append(field)
isFound = True
if isFound is False:
log.logError("严重错误! 导入[" + sheetName + "]配置表,CFG指定主键[" + keyStr + "]不存在!")
result = False
# 检查loadConfig有没有重复配置
for cfg in table.loadConfigs:
if cfg.type is loadConfig.type:
if loadConfig.type is IndexType.List:
log.logError(
"严重错误! [" + data.fileName + "]配置表中导入表[" + table.sheetName + "]的List索引方式超过1个!")
result = False
elif loadConfig.type is IndexType.MultKeyMap:
# 多键值表判断
if len(loadConfig.keyFields) != len(cfg.keyFields):
continue
samCount: int = 0
for f1 in loadConfig.keyFields:
for f2 in cfg.keyFields:
if f1.fieldKey == f2.fieldKey:
samCount += 1
if samCount >= len(cfg.keyFields):
log.logError(
"严重错误! [" + data.fileName + "]配置表中导入表[" + table.sheetName + "]的mkMap索引存在多个相同!")
result = False
else:
# 但键值表判断
if cfg.keyFields[0].fieldKey == loadConfig.keyFields[0].fieldKey:
log.logError(
"严重错误! [" + data.fileName + "]配置表中导入表[" + table.sheetName + "]中存在多个相同索引方式相同键值索引!")
result = False
# 把loadConfig添加到配置表中
table.loadConfigs.append(loadConfig)
table.syncExportPlatformsFromConfigs() #根据加载配置,算出本表应该对应导出的平台有什么
# 检测是否有重复数据表导入
for eFile in edata.excelDatas.selectedDatas.values():
for eTable in eFile.tables.values():
for tmpTable in tmpTables.values():
if eTable.jsonName == tmpTable.jsonName and eFile.fileName != data.fileName:
log.logError(
"严重错误! [" + data.fileName + "]配置表中,有数据表重复:" + tmpTable.sheetName + " ->" + tmpTable.jsonName)
result = False
# 最后把处理完的TableData以jsonname为键增加到表内
data.tables = dict()
for tmpTable in tmpTables.values():
data.tables[tmpTable.jsonName] = tmpTable
if len(data.tables) < 1:
log.logError("严重错误! [" + data.fileName + "]配置表,没有任何数据被导入!")
return False
data.lastExecuteTime = lastImportTime
return result
def readTableData(edata: EData, table: TableData, sheet: Sheet, log: Logger):
"""解析字段数据"""
# 提取json表名
tbStr: str = sheet.row(0)[0].value
if len(tbStr) < 1:
log.logError("错误配置!! 导出表Json名[" + tbStr + "]未找到,请在表格0行0列声明。格式: 表名+:+导出json名")
return False
tbStr = tbStr.replace(":", ":") # 对付策划填错全角逗号
arr = tbStr.split(":")
if len(arr) != 2:
log.logError("错误配置!! 导出表Json名[" + tbStr + "]未找到,请在表格0行0列声明。格式: 表名+:+导出json名")
return False
jsonName = arr[1].strip(" ")
if len(jsonName) < 1:
log.logError("错误配置!! 导出表Json名[" + tbStr + "]未找到,请在表格0行0列声明。格式: 表名+:+导出json名")
return False
table.jsonName = jsonName
# 获取控制字段在哪行
table.fieldDataRowIndex = -1
for r in range(0, sheet.nrows):
row = sheet.row(r)
if row[0].value == "名称":
table.fieldNameRowIndex = r
if r > table.fieldDataRowIndex:
table.fieldDataRowIndex = r
if row[0].value == "键名":
table.fieldKeyRowIndex = r
if r > table.fieldDataRowIndex:
table.fieldDataRowIndex = r
if row[0].value == "类型":
table.fieldTypeRowIndex = r
if r > table.fieldDataRowIndex:
table.fieldDataRowIndex = r
if row[0].value == "约束":
table.fieldRestrainRowIndex = r
if r > table.fieldDataRowIndex:
table.fieldDataRowIndex = r
# 获得数据行在哪里(最大行数的字段声明关键字+1则是数据开始行)
table.fieldDataRowIndex += 1
table.fields = []
for col in range(1, sheet.ncols): # 从第二列开始,因为第一列是行定义
field: FieldData = getFields(edata, table, sheet, col, log)
if field != None:
table.fields.append(field)
return True
def getFields(edata: EData, table: TableData, sheet: Sheet, col: int, log: Logger):
"""解析字段数据"""
# 获取字段定义属性
row = sheet.row(table.fieldNameRowIndex)
fName: str = row[col].value
fName = fName.strip(" ")
fName = fName.replace("\n", "") # 去掉字段命名中无谓的换行符
row = sheet.row(table.fieldKeyRowIndex)
fKey: str = row[col].value
fKey = fKey.strip(" ")
row = sheet.row(table.fieldTypeRowIndex)
fType: str = row[col].value
fType = fType.strip(" ")
# 约束行可选,可以没有此行
fRestrain: str = ""
if table.fieldRestrainRowIndex > 0:
row = sheet.row(table.fieldRestrainRowIndex)
fRestrain: str = row[col].value
fRestrain = fRestrain.strip(" ")
if fKey == "":
return None
if fName == "":
log.logWarning("提示: 表[" + table.jsonName + "]中, 键值[" + fKey + "]的中文名为空")
return None
if fType == "":
log.logError("错误配置!! 表[" + table.jsonName + "]中, 键值[" + fKey + "]的类型定义不能为空")
return None
field: FieldData = FieldData()
field.fieldName = fName
field.fieldKey = fKey
field.fieldType = fType
field.colunmOrd = col
# if fType == "int":
# field.fieldType = FieldType.Integer
# elif fType == "str":
# field.fieldType = FieldType.String
# elif fType == "long":
# field.fieldType = FieldType.Long
# elif fType == "float":
# field.fieldType = FieldType.Float
# elif fType == "double":
# field.fieldType = FieldType.Double
# elif (fType == "bool"):
# field.fieldType = FieldType.Boolean
# else:
# field.fieldType = FieldType.Unknow
# log.logError("错误配置!! 表[" + table.jsonName + "]中, 键值[" + fKey + "]的类型定义错误:" + fType)
# return None
field.fieldType = FieldType.getFieldType(fType)
if field.fieldType == FieldType.Unknow:
log.logError("错误配置!! 表[" + table.jsonName + "]中, 键值[" + fKey + "]的类型定义错误:" + fType)
return None
field.fieldRestrain = fRestrain
return field
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/seraph2047/excel2json.git
git@gitee.com:seraph2047/excel2json.git
seraph2047
excel2json
excel2json
master

搜索帮助