1 Star 2 Fork 0

王梁/task-management-system

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
database.py 8.22 KB
一键复制 编辑 原始数据 按行查看 历史
wangliang 提交于 2024-07-31 10:03 +08:00 . 初始化
import sqlite3
from sqlite3 import Error
from datetime import datetime
from taskEnum import TaskType, TaskStatus
def create_connection(db_file):
"""Create a database connection to the SQLite database specified by db_file."""
conn = None
try:
conn = sqlite3.connect(db_file)
return conn
except Error as e:
print(e)
return conn
def create_table(conn, create_table_sql):
"""Create a table from the create_table_sql statement."""
try:
c = conn.cursor()
c.execute(create_table_sql)
except Error as e:
print(e)
def insert_work_record(conn, work_record):
"""
Create a new work record into the work_records table.
"""
sql = ''' INSERT INTO work_records(title, description, start_time, end_time, task_type, task_status)
VALUES(?,?,?,?,?,?) '''
cur = conn.cursor()
cur.execute(sql, work_record)
result = cur.lastrowid
cur.close()
return result
def update_work_record(conn, updated_record):
"""
更新指定ID的工作记录
:param conn: 数据库连接
:param id: 工作记录的ID
:param title: 新的标题
:param description: 新的描述
:param start_time: 新的开始时间
:param end_time: 新的结束时间
:return: None
"""
sql = ''' UPDATE work_records
SET title = ?, description = ?, start_time = ?, end_time = ?, task_type = ?, task_status = ?
WHERE id = ?'''
cur = conn.cursor()
cur.execute(sql, updated_record)
conn.commit()
cur.close()
def delete_work_record(conn, id):
"""
Create a new work record into the work_records table.
"""
sql = ''' delete from work_records where id = ?
'''
cur = conn.cursor()
cur.execute(sql, (id,))
result = cur.lastrowid
cur.close()
return result
def select_one_work_record(conn, id):
sql = ''' select * from work_records where id = ?
'''
cur = conn.cursor()
cur.execute(sql, (id,))
result = cur.fetchone()
cur.close()
return result
def select_all_work_records(conn):
sql = """
SELECT * FROM work_records
order by task_status, start_time desc
"""
cur = conn.cursor()
cur.execute(sql)
result = cur.fetchall()
cur.close()
return result
def select_all_work_records(conn, condition):
title, start_time, task_type_name, task_status_name, tag_name = condition
print(start_time)
print(type(start_time))
# 准备参数列表
params = []
# 构建 SQL 查询
sql = """
SELECT * FROM work_records wr
WHERE 1=1 -- 始终为真,用于动态添加条件
"""
# 添加 title 的条件
if title:
sql += " AND wr.title LIKE ?"
params.append(f'%{title}%')
# 添加 start_time 的条件
if start_time:
sql += " AND wr.start_time >= ?"
params.append(start_time)
# 获取任务类型的编号
task_type_id = next((task_type.id for task_type in TaskType if task_type.desc == task_type_name), None)
if task_type_id is not None:
sql += " AND wr.task_type = ?"
params.append(task_type_id)
# 获取任务状态的编号
task_status_id = next((task_status.id for task_status in TaskStatus if task_status.desc == task_status_name), None)
if task_status_id is not None:
sql += " AND wr.task_status = ?"
params.append(task_status_id)
# 添加 tag_name 的子查询
if tag_name:
sql += """
AND EXISTS (
SELECT 1
FROM record_tags rt
JOIN tag t ON rt.tag_id = t.id
WHERE t.name = ?
AND rt.record_id = wr.id
)
"""
params.append(tag_name)
sql += " ORDER BY task_status, start_time DESC"
print(sql)
cur = conn.cursor()
cur.execute(sql, params)
result = cur.fetchall()
return result
# 更新标签
def update_tag(conn, record_id, tag_name_join):
#查询任务的所有标签
select_record_tag_sql = """
select tag_id from record_tags where record_id = ?
"""
cur = conn.cursor()
cur.execute(select_record_tag_sql, (record_id, ))
record_tag_id_list = cur.fetchall()
tag_name_list = tag_name_join.split(",")
insert_tag(conn, tag_name_list)
placeholders = ', '.join('?' for _ in tag_name_list) # 生成多个占位符
select_tag_sql = f"""
select id from tag where name in ({placeholders})
"""
cur.execute(select_tag_sql, tag_name_list)
tag_id_list = cur.fetchall()
delete_tag_id_set = set(record_tag_id_list) - set(tag_id_list)
if delete_tag_id_set:
delete_placeholders = ', '.join('?' for _ in delete_tag_id_set) # 生成多个占位符
delete_record_tag_sql = f"""
delete from record_tags where record_id = ? and tag_id in ({delete_placeholders})
"""
cur.execute(delete_record_tag_sql, (record_id,) + tuple(delete_tag_id_set))
add_tag_id_set = set(tag_id_list) - set(record_tag_id_list)
if add_tag_id_set:
for add_tag_id in add_tag_id_set:
tag_id = add_tag_id[0]
add_record_tag_sql = """
INSERT INTO record_tags(record_id, tag_id)
VALUES(?,?)
"""
cur.execute(add_record_tag_sql, (record_id, tag_id))
cur.close()
# 默认添加新的标签
def insert_tag(conn, tag_name_list):
if tag_name_list:
for tag_name in tag_name_list:
tag_name = tag_name.strip()
tag_name_placeholders = ", ".join("?" for _ in tag_name_list)
select_sql = f"""
select name from tag where name in ({tag_name_placeholders})
"""
cur = conn.cursor()
cur.execute(select_sql, tag_name_list)
tag_name_set_have = set()
for tag_name_tuple in cur.fetchall():
tag_name_set_have.add(tag_name_tuple[0])
new_tag_name_set = set(tag_name_list) - tag_name_set_have
current_time = datetime.now()
# 格式化时间
formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
if new_tag_name_set:
for new_tag_name in new_tag_name_set:
add_sql = """
insert into tag(name, create_time)
values(?, ?)
"""
cur.execute(add_sql, (new_tag_name.strip(), formatted_time))
cur.close()
def add_tag(conn, record_id, tag_name_join):
tag_name_list = tag_name_join.split(",")
for tag_name in tag_name_list:
add_one_tag(conn, record_id, tag_name)
def add_one_tag(conn, record_id, tag_name):
select_tag_sql = "select id from tag where name = ?"
cur = conn.cursor()
cur.execute(select_tag_sql, (tag_name.strip(), ))
tag_id_tuple = cur.fetchone()
tag_id = tag_id_tuple[0]
if tag_id is None:
# 获取当前时间
current_time = datetime.now()
# 格式化时间
formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
sql = ''' INSERT INTO tag(name, create_time)
VALUES(?,?) '''
cur.execute(sql, (tag_name.strip(), formatted_time))
new_tag_id = cur.lastrowid
add_record_tag_sql = """
INSERT INTO record_tags(record_id, tag_id)
VALUES(?,?)
"""
cur.execute(add_record_tag_sql, (record_id, new_tag_id))
else:
add_record_tag_sql = """
INSERT INTO record_tags(record_id, tag_id)
VALUES(?,?)
"""
cur.execute(add_record_tag_sql, (record_id, tag_id))
cur.close()
def get_record_all_tags(conn, record_ids):
record_id_placeholders = ",".join("?" for _ in record_ids)
sql = f"""
select rt.record_id, group_concat(t.name, ',') as tags from record_tags rt left join tag t on rt.tag_id = t.id
where rt.record_id in ({record_id_placeholders})
group by rt.record_id
"""
cur = conn.cursor()
cur.execute(sql, record_ids)
result = cur.fetchall()
cur.close()
return result
def get_all_tags(conn):
cur = conn.cursor()
cur.execute("select id, name from tag")
return cur.fetchall()
def delete_tag(conn, tag_id):
sql = """
delete from tag where id = ?
"""
cur = conn.cursor()
cur.execute(sql, (tag_id,))
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/lwang05858/task-management-system.git
git@gitee.com:lwang05858/task-management-system.git
lwang05858
task-management-system
task-management-system
master

搜索帮助