Ai
2 Star 0 Fork 0

陌路微尘/CustomDataProcess

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
demo.py 13.37 KB
一键复制 编辑 原始数据 按行查看 历史
陌路微尘 提交于 2022-09-04 18:58 +08:00 . 日志转换为可配置
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""
@version: 0.1
@author: 张宇洁
@contact: a_ff1984@163.com
@site: https://gitee.com/InsignificantDust
@software: PyCharm
@file: demo.py
@time: 2021/7/2 11:10
"""
import sys, os
import pandas as pd
from PyQt5.QtCore import QCoreApplication, Qt
from PyQt5.QtWidgets import (QWidget,
QPushButton,
QHBoxLayout,
QVBoxLayout,
QFileDialog,
QLineEdit,
QApplication,
QLabel,
QMessageBox,
QComboBox)
from PyQt5.QtGui import QFont, QIcon
from exceptions import FileTypeException, SheetIndexError
from tools import Logger
import numpy as np
import utils
from configuration import configuration
# Make sure the configured log file exists before the logger is created.
log_file = configuration.log.file
log_dir = os.path.dirname(log_file)
if not os.path.exists(log_file):
    # dirname() is "" when the log file sits in the working directory; there
    # is nothing to create then. makedirs also covers nested missing
    # directories and tolerates a directory that already exists.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    with open(log_file, 'w') as f:
        f.write('')
# NOTE(review): eval() executes whatever expression the configuration holds
# for maxBytes (presumably something like "10 * 1024 * 1024" - confirm).
# Only safe while the configuration file is trusted.
log = Logger()(configuration.log.name,
               log_file,
               eval(configuration.log.maxBytes),
               configuration.log.backupCount,
               configuration.log.fmt,
               configuration.log.level)
class Graph(QWidget):
    """Main window of the data processing tool.

    The window collects a source workbook, a worksheet name, a processing
    mode and an output directory. ``execute`` then reads the sheet, runs the
    selected processing method and writes ``dataProcessResult.xlsx``.
    """

    # Combo-box label -> name of the method implementing that mode.
    PROCESS_TYPES = {
        "包税": "in_vax_process",
        "不包税": "not_in_vax_process",
        "关税调整": "pcs_process",
    }
    # Mode matching the first combo-box entry; used until the user picks one.
    DEFAULT_PROCESS_TYPE = "in_vax_process"
    # Columns that are joined as text while grouping and must hold strings.
    TEXT_COLUMNS = ["运单号", "英文品名", "申报单价", "材质"]

    def __init__(self):
        super().__init__()
        # on_activated only runs after a user selection, so start from the
        # mode that corresponds to the initially displayed entry.
        self.process_type = self.DEFAULT_PROCESS_TYPE
        self.init_ui()
        pd.set_option("display.precision", 2)

    @staticmethod
    def _make_row(*widgets, leading_stretch=False):
        """Return a horizontal layout holding ``widgets`` in order."""
        row = QHBoxLayout()
        if leading_stretch:
            row.addStretch(1)
        for widget in widgets:
            row.addWidget(widget)
        return row

    def init_ui(self):
        """Create all widgets, wire their signals, lay them out and show the window."""
        log.info("开始渲染界面")

        # Source workbook selection.
        self.input_in_file = QLineEdit()
        self.input_in_file_label = QLabel("源文件")
        self.input_in_file_btn = QPushButton("请选择分析文件")
        self.input_in_file_btn.clicked.connect(self.in_file_dialog)

        # Worksheet name.
        self.input_sheet_name = QLineEdit()
        self.input_sheet_name_label = QLabel("请输入工作表名称")

        # Output directory selection.
        self.input_out_file = QLineEdit()
        self.input_out_file_label = QLabel("目标位置")
        self.input_out_file_btn = QPushButton("请选择输出目录")
        self.input_out_file_btn.clicked.connect(self.out_file_dialog)

        # Processing mode.
        self.input_combox = QComboBox()
        self.input_combox_label = QLabel("包税类型")
        self.input_combox.addItems(list(self.PROCESS_TYPES))
        self.input_combox.activated[str].connect(self.on_activated)

        exec_button = QPushButton("执行")
        exit_button = QPushButton("退出")
        exec_button.clicked.connect(self.execute)
        exit_button.clicked.connect(QCoreApplication.instance().quit)

        # Centered headline; weight 75 is the bold weight in Qt 5.
        label = QLabel(configuration.config.label)
        label.setAlignment(Qt.AlignCenter)
        label.setFont(QFont("Arial", 20, 75))

        in_file_hbox = self._make_row(self.input_in_file_label,
                                      self.input_in_file,
                                      self.input_in_file_btn,
                                      leading_stretch=True)
        in_sheet_name_hbox = self._make_row(self.input_sheet_name_label,
                                            self.input_sheet_name)
        out_file_hbox = self._make_row(self.input_out_file_label,
                                       self.input_out_file,
                                       self.input_out_file_btn,
                                       leading_stretch=True)
        in_combo_hbox = self._make_row(self.input_combox_label,
                                       self.input_combox)
        btn_hbox = self._make_row(exit_button, exec_button,
                                  leading_stretch=True)

        vbox = QVBoxLayout()
        vbox.addStretch(1)
        vbox.addWidget(label)
        vbox.addStretch(1)
        for row in (in_file_hbox, in_sheet_name_hbox, in_combo_hbox,
                    out_file_hbox, btn_hbox):
            vbox.addLayout(row)

        self.setLayout(vbox)
        self.setGeometry(300, 300, 350, 120)
        self.setWindowTitle(configuration.config.title)
        self.setWindowIcon(QIcon("logo.ico"))
        self.show()

    def in_file_dialog(self):
        """Let the user pick the source file and copy its path into the input field."""
        path = QFileDialog.getOpenFileName(self, "选择文件")[0]
        if not path:
            # Dialog cancelled: keep whatever was selected before.
            return
        self.input_in_file.setText(path)
        log.info("您选择要处理的文件:%s" % path)

    def out_file_dialog(self):
        """Let the user pick the output directory and remember it in ``save_folder``."""
        # getExistingDirectory is the directory picker; getOpenFileName is a
        # static function that ignores any file mode set on an instance.
        folder = QFileDialog.getExistingDirectory(self, "选择输出位置")
        if not folder:
            # Dialog cancelled: keep the previous output directory.
            return
        self.save_folder = folder
        self.input_out_file.setText(self.save_folder)
        log.info("您选择将处理结果文件存放在'%s'" % self.save_folder)

    def show_message_box(self, level, text):
        """Show a modal message box.

        :param level: QMessageBox icon (e.g. ``QMessageBox.Critical``)
        :param text: message to display
        """
        msg = QMessageBox()
        msg.setIcon(level)
        msg.setText(text)
        msg.setWindowTitle("message")
        msg.exec_()

    def on_activated(self, text):
        """Store the processing method that belongs to the selected combo-box label.

        :param text: label of the activated combo-box entry
        :raises TypeError: if ``text`` is not one of the supported labels
        """
        log.info("选择的包税类型为:%s" % (text))
        try:
            self.process_type = self.PROCESS_TYPES[text]
        except KeyError:
            raise TypeError("提供的数据处理类型'%s'不支持, 支持的数据处理类型为'包税', '不包税', '关税调整'" % (text)) from None

    def execute(self):
        """Run read -> process -> save and report any failure in a message box."""
        try:
            log.info("开始处理数据")
            self.read_file()
            self.process_data()
            self.save_file()
            log.info("数据处理完成")
        except (FileTypeException, SheetIndexError, Exception) as e:
            # Validation errors and unexpected errors are reported the same
            # way: full traceback in the log, message shown to the user.
            log.exception(str(e))
            self.show_message_box(QMessageBox.Critical, str(e))

    def get_sheet_name(self):
        """Copy the worksheet name from the input field into ``sheet_name``."""
        self.sheet_name = self.input_sheet_name.text()

    def is_valid_sheet_name(self):
        """Return True when the stored sheet name is empty or missing.

        NOTE: despite the method name, a True result means the sheet name is
        NOT usable; ``read_file`` raises when this returns True.
        """
        sheet = getattr(self, "sheet_name", "")
        return sheet == ""

    def read_file(self):
        """Load the selected worksheet into ``self._data``.

        :raises SheetIndexError: if no worksheet name was entered
        :raises FileTypeException: if the file is not an .xls/.xlsx file
        """
        file = self.input_in_file.text()
        log.info("读取文件内容%s" % file)
        self.get_sheet_name()
        if self.is_valid_sheet_name():
            raise SheetIndexError("工作表错误:工作表名称不能为空!")
        if not self.is_xlsx_subfix(file):
            raise FileTypeException("文件错误:期望的文件后缀是xls, xlsx, 实际的得到的文件是:%s" % file)
        self._data = pd.read_excel(file, sheet_name=self.sheet_name, header=0)

    def is_xlsx_subfix(self, file):
        """Return True if ``file`` ends with .xls or .xlsx (case-insensitive)."""
        return file.lower().endswith((".xls", ".xlsx"))

    def save_file(self):
        """Write ``self._final_data`` to dataProcessResult.xlsx and notify the user.

        The file goes into ``save_folder`` when one was chosen, otherwise into
        the current working directory. Write errors are logged and shown in a
        message box instead of being raised.
        """
        folder = getattr(self, "save_folder", "")
        if folder == "":
            file = "dataProcessResult.xlsx"
        else:
            file = "{}/dataProcessResult.xlsx".format(folder)
        log.info("正在保存结果到文件 '%s'" % file)
        try:
            self._final_data.to_excel(file)
        except Exception as e:
            log.exception(str(e))
            self.show_message_box(QMessageBox.Critical, str(e))
        else:
            self.show_message_box(QMessageBox.Information, "数据处理成功,已保存到文件:%s" % file)

    def process_data(self):
        """Dispatch to the method named by ``process_type``.

        For the two grouping modes the text columns are converted to the
        pandas "string" dtype first, because they are joined with ``str.join``
        later on.
        """
        if not hasattr(self, "process_type"):
            self.process_type = self.DEFAULT_PROCESS_TYPE
        if self.process_type in ("in_vax_process", "not_in_vax_process"):
            # astype() returns new columns with the requested dtype. Assigning
            # through .loc[:, cols] writes into the existing columns and may
            # keep their old dtype.
            self._data = self._data.astype(
                {column: "string" for column in self.TEXT_COLUMNS})
        getattr(self, self.process_type)()

    def in_vax_process(self):
        """Aggregate the data using the English product name as the item key."""
        log.info("正在处理包税数据,请耐心等待……")
        deduplicated = self.box_duplicate("英文品名")
        self.aggrate(deduplicated, "英文品名")
        self.data_reduce()

    def not_in_vax_process(self):
        """Aggregate the data using product name, material and declared unit price as the item key."""
        log.info("正在处理不包税数据,请耐心等待……")
        deduplicated = self.box_duplicate("英文品名", "材质", "申报单价")
        self.aggrate(deduplicated, "英文品名", "材质", "申报单价")
        self.data_reduce()

    def pcs_process(self):
        """Adjust the PCs column row by row and store the result in ``_final_data``.

        Rows containing missing values are dropped. Rows whose "关税" value is
        below ``configuration.config.tax_ignore_limit`` are left unchanged.
        For the remaining rows ``utils.anliquates`` is asked for a number
        (presumably a pack size that divides the piece count - confirm against
        utils); when it returns one, it is appended to the Chinese product
        name and PCs is replaced by ``PCs // number``.
        """
        log.info("正在计算pcs, 请耐心等待……")
        frame = self._data.dropna()
        # astype() with a mapping really changes the column dtypes; an
        # in-place .loc assignment may leave float columns as float.
        frame = frame.astype({"PCs": "int", "重量": "int"})
        frame = frame.set_index("S/N")
        for index in frame.index:
            tax = frame.loc[index, "关税"]
            cn_name = frame.loc[index, "中文品名"]
            if tax < configuration.config.tax_ignore_limit:
                continue
            pcs = frame.loc[index, "PCs"]
            weight = frame.loc[index, "重量"]
            price = frame.loc[index, "Unit Price"]
            tax_rate = frame.loc[index, "税率"]
            log.info("品名:%s,关税:%s, pcs: %s, 重量:%s,开始计算" % (cn_name, tax, pcs, weight))
            pcs_num = utils.anliquates(pcs, weight, price, tax_rate)
            if pcs_num is None:
                log.info("计算结果:无合适数据")
                continue
            cal_pcs = pcs // pcs_num
            frame.loc[index, "中文品名"] = "%s %d pcs" % (cn_name, pcs_num)
            frame.loc[index, "PCs"] = cal_pcs
            log.info("计算结果: 品名:%s %s pcs,关税:%s, pcs: %s" % (cn_name, pcs_num, tax, cal_pcs))
        self._final_data = frame

    def mk_help_column(self, *help_columns):
        """
        Create the helper column ``help_name``.

        Each row first gets the comma-joined values of ``help_columns``. The
        frame is then indexed by box number ("箱号") and every row of a box
        receives the comma-joined distinct row keys found in that box.

        :param help_columns: columns that decide whether two items in a box
            count as the same item, e.g. the English product name alone, or
            product name, material and declared unit price together
        :return: copy of ``self._data`` indexed by "箱号" with ``help_name`` added
        """
        key_columns = list(help_columns)
        frame = self._data.copy()
        for index in frame.index:
            frame.loc[index, "help_name"] = ",".join(frame.loc[index, key_columns])
        frame = frame.set_index("箱号")
        # One pass per distinct box is enough: the update writes the same
        # value to every row of the box.
        for box in frame.index.unique():
            row_keys = frame.loc[box, "help_name"]
            if isinstance(row_keys, str):
                # Box with a single row: .loc returned the scalar itself.
                box_key = row_keys
            else:
                box_key = ",".join(row_keys.drop_duplicates())
            frame.loc[box, "help_name"] = box_key
        return frame

    def box_duplicate(self, *help_columns):
        """
        Remove repeated weights inside each box.

        Within a box that spans several rows, every repeated weight value
        after its first occurrence is set to 0.

        :param help_columns: see detail in mk_help_column
        :return: pd.DataFrame with the repeated weights zeroed, sorted by
            ``help_columns``
        """
        frame = self.mk_help_column(*help_columns)
        frame = frame.sort_values(list(help_columns))
        for box in frame.index.unique():
            weights = frame.loc[box, "重量"]
            # .loc yields a Series only when the box has several rows; a
            # single-row box has nothing to de-duplicate.
            if isinstance(weights, pd.Series):
                frame.loc[box, "重量"] = weights.mask(weights.duplicated(), 0)
        return frame

    def aggrate(self, data, *help_name):
        """
        Aggregate the rows per item group and store the result in ``_final_data``.

        Groups are formed by ``help_name`` plus the given key columns. Per
        group the number of distinct boxes, the summed quantity and the summed
        weight are computed; every other source column takes the first value
        of the group, and the waybill numbers of the group are merged.

        :param data: pd.DataFrame returned by box_duplicate
        :param help_name: see detail in mk_help_column
        :return: None; the result is written to ``self._final_data``
        """
        key_columns = list(help_name)
        category_columns = ["help_name"] + key_columns
        skip_columns = key_columns + ["箱号", "重量", "数量", "运单号"]

        groups = data.reset_index().groupby(category_columns)
        box_num = groups["箱号"].nunique()
        weight_num = groups["重量"].sum()
        pqw_num = groups["数量"].sum()
        self._final_data = pd.concat([box_num, pqw_num, weight_num], axis=1)

        for column in self._data.columns:
            if column in skip_columns:
                continue
            self._final_data[column] = groups[column].first()

        for group in groups.groups:
            waybills = set(groups.get_group(group)["运单号"])
            if len(waybills) == 1:
                # Store the bare value rather than a one-element set.
                merged = next(iter(waybills))
            else:
                # Sorted so the joined text does not depend on set ordering.
                merged = "、".join(sorted(waybills))
            self._final_data.loc[group, "运单号"] = merged

    def data_reduce(self):
        """Fill zeroed weights from the preceding row and build the final index.

        Weights equal to 0 are replaced by the last non-zero weight above
        them. The result is then indexed by waybill number, help_name, box
        count, weight, English product name and quantity, and sorted by the
        first index level.
        """
        self._final_data = self._final_data.reset_index()
        self._final_data.set_index(["运单号", "help_name"], inplace=True)
        weights = self._final_data["重量"]
        self._final_data["重量"] = weights.mask(weights == 0, np.nan).ffill()
        self._final_data = self._final_data.reset_index()
        self._final_data.set_index(["运单号", "help_name", "箱号", "重量", "英文品名", "数量"], inplace=True)
        self._final_data.sort_index(axis=0, level=0, inplace=True)
if __name__ == "__main__":
    # Script entry point: create the Qt application, build the main window
    # (Graph.__init__ lays it out and shows it), then hand control to the
    # event loop and exit with its return code.
    app = QApplication(sys.argv)
    ex = Graph()
    exit_code = app.exec_()
    sys.exit(exit_code)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/InsignificantDust/custom-data-process.git
git@gitee.com:InsignificantDust/custom-data-process.git
InsignificantDust
custom-data-process
CustomDataProcess
master

搜索帮助