diff --git a/kinit-api/utils/excel/excel_manage.py b/kinit-api/utils/excel/excel_manage.py index 749fe99..7f30e67 100644 --- a/kinit-api/utils/excel/excel_manage.py +++ b/kinit-api/utils/excel/excel_manage.py @@ -33,25 +33,37 @@ class ExcelManage: def open_workbook(self, file: str, read_only: bool = False, data_only: bool = False) -> None: """ 初始化 excel 文件 - :param file: 文件名称或者对象 :param read_only: 是否只读,优化读取速度 :param data_only: 是否加载文件对象 + :return: """ # 加载excel文件,获取表单 self.wb = load_workbook(file, read_only=read_only, data_only=data_only) - def open_sheet(self, sheet_name: str = None, **kwargs) -> None: + def open_sheet( + self, + sheet_name: str = None, + file: str = None, + read_only: bool = False, + data_only: bool = False + ) -> None: """ 初始化 excel 文件 - :param sheet_name: 表单名称,为空则默认第一个 + :param file: + :param read_only: + :param data_only: + :return: """ # 加载excel文件,获取表单 if not self.wb: - self.open_workbook(kwargs.get("file"), kwargs.get("read_only", False), kwargs.get("data_only", False)) + self.open_workbook(file, read_only, data_only) if sheet_name: - self.sheet = self.wb[sheet_name] + if sheet_name in self.get_sheets(): + self.sheet = self.wb[sheet_name] + else: + self.sheet = self.wb.create_sheet(sheet_name) else: self.sheet = self.wb.active @@ -65,8 +77,8 @@ class ExcelManage: def create_excel(self, sheet_name: str = None) -> None: """ 创建 excel 文件 - :param sheet_name: 表单名称,为空则默认第一个 + :return: """ # 加载excel文件,获取表单 self.wb = Workbook() @@ -77,7 +89,10 @@ class ExcelManage: def readlines(self, min_row: int = 1, min_col: int = 1, max_row: int = None, max_col: int = None) -> list: """ 读取指定表单所有数据 - + :param min_row: 最小行 + :param min_col: 最小列 + :param max_row: 最大行 + :param max_col: 最大列 :return: 二维数组 """ rows = self.sheet.iter_rows(min_row=min_row, min_col=min_col, max_row=max_row, max_col=max_col) @@ -93,7 +108,6 @@ class ExcelManage: def get_header(self, row: int = 1, col: int = None, asterisk: bool = False) -> list: """ 读取指定表单的表头(第一行数据) - :param row: 指定行 :param col: 最大列 :param asterisk: 是否去除 * 号 @@ -111,9 +125,9 @@ class ExcelManage: def write_list(self, rows: list, header: list = None) -> None: """ 写入 excel文件 - :param rows: 行数据集 :param header: 表头 + :return: """ if header: self.sheet.append(header) @@ -126,11 +140,13 @@ class ExcelManage: } for i in range(0, len(data)): if isinstance(data[i], datetime.datetime): - data[i] = data[i].strftime("%Y/%m/%d %H:%M:%S") + data[i] = data[i].strftime("%Y-%m-%d %H:%M:%S") format_columns["date_columns"].append(i + 1) + elif isinstance(data[i], bool): + data[i] = 1 if data[i] else 0 self.sheet.append(data) - self.__set_row_style(index + 2, len(data)) - self.__set_row_format(index + 2, format_columns) + self.__set_row_style(index + 2 if header else index + 1, len(data)) + self.__set_row_format(index + 2 if header else index + 1, format_columns) self.__auto_width() self.__set_row_border() @@ -138,6 +154,8 @@ class ExcelManage: """ 保存 excel 文件到本地 默认保存到临时目录中 + :param filename: 保存的文件名称 + :return: """ date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") file_dir = os.path.join(TEMP_DIR, date) @@ -163,12 +181,12 @@ class ExcelManage: ): """ 设置行样式 - :param row: 行 :param max_column: 最大列 :param alignment_style: 单元格内容的对齐设置 :param font_style: 单元格内容的字体样式设置 :param pattern_fill_style: 单元格的填充模式设置 + :return: """ for index in range(0, max_column): alignment = Alignment(**alignment_style.model_dump()) @@ -181,16 +199,17 @@ class ExcelManage: def __set_row_format(self, row: int, columns: dict): """ 格式化行数据类型 - :param row: 行 :param columns: 列数据 + :return: """ for index in columns.get("date_columns", []): - self.sheet.cell(row=row, column=index).number_format = "yyyy/mm/dd h:mm:ss" + self.sheet.cell(row=row, column=index).number_format = "yyyy-mm-dd h:mm:ss" def __set_row_border(self): """ 设置行边框 + :return: """ # 创建 Border 对象并设置边框样式 border = Border( @@ -207,6 +226,7 @@ class ExcelManage: def __auto_width(self): """ 设置自适应列宽 + :return: """ # 设置一个字典用于保存列宽数据 dims = {} @@ -226,6 +246,7 @@ class ExcelManage: def close(self): """ 关闭文件 + :return: """ self.wb.close() diff --git a/kinit-api/utils/excel/import_manage.py b/kinit-api/utils/excel/import_manage.py index 0bc6e10..60ae406 100644 --- a/kinit-api/utils/excel/import_manage.py +++ b/kinit-api/utils/excel/import_manage.py @@ -21,7 +21,7 @@ class FieldType(Enum): str = "str" -class ImportManage: +class ImportManage(ExcelManage): """ 数据导入管理 @@ -38,9 +38,9 @@ class ImportManage: file_type = ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"] def __init__(self, file: UploadFile, headers: List[dict]): + super().__init__() self.__table_data = None self.__table_header = None - self.__filename = None self.errors = [] self.success = [] self.success_number = 0 @@ -53,27 +53,40 @@ class ImportManage: def check_file_type(cls, file: UploadFile) -> None: """ 验证文件类型 + :param file: 上传文件 + :return: """ if file.content_type not in cls.file_type: raise CustomException(msg="文件类型必须为xlsx类型", code=status.HTTP_ERROR) - async def get_table_data(self, header_row: int = 1, data_row: int = 2) -> None: + async def get_table_data( + self, + file_path: str = None, + sheet_name: str = None, + header_row: int = 1, + data_row: int = 2 + ) -> None: """ 获取表格数据与表头 - + :param file_path: + :param sheet_name: :param header_row: 表头在第几行 :param data_row: 数据开始行 + :return: """ - self.__filename = await FileManage.save_tmp_file(self.file) - es = ExcelManage() - es.open_sheet(file=self.__filename, read_only=True) - self.__table_header = es.get_header(header_row, len(self.headers), asterisk=True) - self.__table_data = es.readlines(min_row=data_row, max_col=len(self.headers)) - es.close() + if file_path: + __filename = file_path + else: + __filename = await FileManage.async_save_temp_file(self.file) + self.open_sheet(sheet_name=sheet_name, file=__filename, read_only=True) + self.__table_header = self.get_header(header_row, len(self.headers), asterisk=True) + self.__table_data = self.readlines(min_row=data_row, max_col=len(self.headers)) + self.close() def check_table_data(self) -> None: """ 检查表格数据 + :return: """ for row in self.__table_data: result = self.__check_row(row) @@ -93,6 +106,8 @@ class ImportManage: 1. 检查是否为必填项 2. 检查是否为选项列表 3. 检查是否符合规则 + :param row: 数据行 + :return: """ data = {} for index, cell in enumerate(row): @@ -128,18 +143,22 @@ class ImportManage: def generate_error_url(self) -> str: """ 成功错误数据的文件链接 + :return: """ if self.error_number <= 0: return "" - em = WriteXlsx(sheet_name="用户导入失败数据") + em = WriteXlsx() + em.create_excel(sheet_name="用户导入失败数据", save_static=True) em.generate_template(self.headers, max_row=self.error_number) em.write_list(self.errors) em.close() - return em.file_url + return em.get_file_url() def add_error_data(self, row: dict) -> None: """ 增加错误数据 + :param row: 错误的数据行 + :return: """ self.errors.append(row) self.error_number += 1 diff --git a/kinit-api/utils/excel/write_xlsx.py b/kinit-api/utils/excel/write_xlsx.py index 4ef7f35..7e7b05f 100644 --- a/kinit-api/utils/excel/write_xlsx.py +++ b/kinit-api/utils/excel/write_xlsx.py @@ -10,12 +10,13 @@ XlsxWriter:https://github.com/jmcnamara/XlsxWriter 博客教程:https://blog.csdn.net/lemonbit/article/details/113855768 """ -import datetime -import hashlib + import os.path -import random import xlsxwriter -from application.settings import TEMP_DIR, TEMP_URL +from typing import List +from application.settings import STATIC_ROOT, STATIC_URL +from utils.file.file_base import FileBase +from utils.tools import generate_string class WriteXlsx: @@ -23,35 +24,38 @@ class WriteXlsx: 写入xlsx文件 """ - def __init__(self, filename: str = None, sheet_name: str = "sheet1"): - date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") - file_dir = os.path.join(TEMP_DIR, date) - if not os.path.exists(file_dir): - os.mkdir(file_dir) - if not filename: - filename = hashlib.md5(str(random.random()).encode()).hexdigest() - self.file_url = f"{TEMP_URL}/{date}/{filename}.xlsx" - self.filename = os.path.join(TEMP_DIR, date, filename + ".xlsx") - self.sheet_name = sheet_name + def __init__(self): + self.file_path = None + self.sheet_name = None self.wb = None self.sheet = None - def create_excel(self) -> None: + def create_excel(self, file_path: str = None, sheet_name: str = "sheet1", save_static: bool = False) -> None: """ 创建 excel 文件 + :param file_path: 文件绝对路径或相对路径 + :param sheet_name: sheet 名称 + :param save_static: 保存方式 static 静态资源或者临时文件 + :return: """ + if not file_path or (file_path and not os.path.abspath(file_path)): + if save_static: + self.file_path = FileBase.generate_static_file_path("write_xlsx", file_path) + else: + self.file_path = FileBase.generate_temp_file_path(f"{generate_string(8)}.xlsx") + else: + self.file_path = file_path + self.sheet_name = sheet_name + self.wb = xlsxwriter.Workbook(self.file_path) + self.sheet = self.wb.add_worksheet(sheet_name) - self.wb = xlsxwriter.Workbook(self.filename) - self.sheet = self.wb.add_worksheet(self.sheet_name) - - def generate_template(self, headers: list[dict] = None, max_row: int = 101) -> None: + def generate_template(self, headers: List[dict] = None, max_row: int = 101) -> None: """ 生成模板 :param headers: 表头 :param max_row: 设置下拉列表至最大行 :return: 文件链接地址 """ - self.create_excel() max_row = max_row + 100 for index, field in enumerate(headers): font_format = { @@ -95,7 +99,6 @@ class WriteXlsx: # 设置列宽 self.sheet.set_column(0, len(rows[0]) - 1, 22) # 设置行高 - # self.sheet.set_row(row_number, 25) self.sheet.set_default_row(25) def close(self) -> None: @@ -103,3 +106,19 @@ class WriteXlsx: 关闭文件 """ self.wb.close() + + def get_file_url(self) -> str: + """ + 获取访问文件的 url + :return: + """ + if not self.file_path: + raise ValueError("还未创建文件,请先创建 excel 文件!") + assert isinstance(self.file_path, str) + if self.file_path.startswith(STATIC_ROOT): + return self.file_path.replace(STATIC_ROOT, STATIC_URL) + else: + print("write_xlsx 生成文件:", self.file_path) + raise ValueError("生成文件为临时文件,无法访问!") + + diff --git a/kinit-api/utils/file/file_base.py b/kinit-api/utils/file/file_base.py index 264dc84..7442b9f 100644 --- a/kinit-api/utils/file/file_base.py +++ b/kinit-api/utils/file/file_base.py @@ -8,10 +8,14 @@ import datetime import os -import uuid +from pathlib import Path + +from aiopathlib import AsyncPath from fastapi import UploadFile +from application.settings import TEMP_DIR from core.exception import CustomException from utils import status +from utils.tools import generate_string class FileBase: @@ -22,18 +26,66 @@ class FileBase: ALL_ACCEPT = [*IMAGE_ACCEPT, *VIDEO_ACCEPT, *AUDIO_ACCEPT] @classmethod - def generate_path(cls, path: str, filename): + def generate_static_file_path(cls, path: str, filename: str) -> str: """ 生成文件路径 + :param path: static 指定目录类别 + :param filename: 文件名称 + :return: """ + path = path.replace("\\", "/") if path[0] == "/": path = path[1:] if path[-1] == "/": path = path[:-1] today = str(int((datetime.datetime.now().replace(hour=0, minute=0, second=0)).timestamp())) - _filename = str(int(datetime.datetime.now().timestamp())) + str(uuid.uuid4())[:8] + _filename = str(int(datetime.datetime.now().timestamp())) + str(generate_string(8)) return f"{path}/{today}/{_filename}{os.path.splitext(filename)[-1]}" + @classmethod + def generate_temp_file_path(cls, filename: str) -> str: + """ + 生成临时文件路径 + :param filename: 文件名称 + :return: + """ + return f"{cls.generate_temp_dir_path()}/{filename}" + + @classmethod + def generate_temp_dir_path(cls) -> str: + """ + 生成临时目录路径 + :return: + """ + date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") + file_dir = Path(TEMP_DIR) / date + path = file_dir / (generate_string(4) + str(int(datetime.datetime.now().timestamp()))) + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + return str(path).replace("\\", "/") + + @classmethod + async def async_generate_temp_file_path(cls, filename: str) -> str: + """ + 生成临时文件路径 + :param filename: 文件名称 + :return: + """ + return f"{await cls.async_generate_temp_dir_path()}/{filename}" + + @classmethod + async def async_generate_temp_dir_path(cls) -> str: + """ + 生成临时目录路径 + :return: + """ + date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") + file_dir = AsyncPath(TEMP_DIR) / date + path = file_dir / (generate_string(4) + str(int(datetime.datetime.now().timestamp()))) + if not await path.exists(): + await path.mkdir(parents=True, exist_ok=True) + return str(path).replace("\\", "/") + @classmethod async def validate_file(cls, file: UploadFile, max_size: int = None, mime_types: list = None) -> bool: """ @@ -52,18 +104,3 @@ class FileBase: if file.content_type not in mime_types: raise CustomException(f"上传文件格式错误,只支持 {'/'.join(mime_types)} 格式!", status.HTTP_ERROR) return True - - @classmethod - def get_file_type(cls, content_type: str) -> str | None: - """ - 获取文件类型 - - 0: 图片 - 1:视频 - """ - if content_type in cls.IMAGE_ACCEPT: - return "0" - elif content_type in cls.VIDEO_ACCEPT: - return "1" - else: - return None diff --git a/kinit-api/utils/file/file_manage.py b/kinit-api/utils/file/file_manage.py index 933d4b4..498452c 100644 --- a/kinit-api/utils/file/file_manage.py +++ b/kinit-api/utils/file/file_manage.py @@ -4,11 +4,13 @@ # @File : file_manage.py # @IDE : PyCharm # @desc : 保存图片到本地 + import asyncio -import datetime +import io import os import shutil -from application.settings import TEMP_DIR, STATIC_ROOT, BASE_DIR, STATIC_URL, STATIC_DIR +import zipfile +from application.settings import STATIC_ROOT, BASE_DIR, STATIC_URL from fastapi import UploadFile import sys from pathlib import Path @@ -24,21 +26,46 @@ class FileManage(FileBase): """ def __init__(self, file: UploadFile, path: str): - self.path = self.generate_path(path, file.filename) + self.path = self.generate_static_file_path(path, file.filename) self.file = file async def save_image_local(self, accept: list = None) -> dict: """ 保存图片文件到本地 + :param accept: + :return: """ if accept is None: accept = self.IMAGE_ACCEPT await self.validate_file(self.file, max_size=5, mime_types=accept) - return await self.save_local() + return await self.async_save_local() - async def save_local(self) -> dict: + async def save_audio_local(self, accept: list = None) -> dict: + """ + 保存音频文件到本地 + :param accept: + :return: + """ + if accept is None: + accept = self.AUDIO_ACCEPT + await self.validate_file(self.file, max_size=50, mime_types=accept) + return await self.async_save_local() + + async def save_video_local(self, accept: list = None) -> dict: + """ + 保存视频文件到本地 + :param accept: + :return: + """ + if accept is None: + accept = self.VIDEO_ACCEPT + await self.validate_file(self.file, max_size=100, mime_types=accept) + return await self.async_save_local() + + async def async_save_local(self) -> dict: """ 保存文件到本地 + :return: """ path = self.path if sys.platform == "win32": @@ -48,70 +75,72 @@ class FileManage(FileBase): await save_path.parent.mkdir(parents=True, exist_ok=True) await save_path.write_bytes(await self.file.read()) return { - "local_path": f"{STATIC_DIR}/{self.path}", + "local_path": f"{STATIC_ROOT}/{self.path}", "remote_path": f"{STATIC_URL}/{self.path}" } - @staticmethod - async def save_tmp_file(file: UploadFile) -> str: + @classmethod + async def async_save_temp_file(cls, file: UploadFile) -> str: """ 保存临时文件 + :param file: + :return: """ - date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") - file_dir = AsyncPath(TEMP_DIR) / date - if not await file_dir.exists(): - await file_dir.mkdir(parents=True, exist_ok=True) - filename = file_dir / (str(int(datetime.datetime.now().timestamp())) + file.filename) - await filename.write_bytes(await file.read()) - return str(filename) + temp_file_path = await cls.async_generate_temp_file_path(file.filename) + await AsyncPath(temp_file_path).write_bytes(await file.read()) + return temp_file_path + + @classmethod + async def unzip(cls, file: UploadFile, dir_path: str) -> str: + """ + 解压 zip 压缩包 + :param file: + :param dir_path: 解压路径 + :return: + """ + if file.content_type != "application/x-zip-compressed": + raise CustomException("上传文件类型错误,必须是 zip 压缩包格式!") + # 读取上传的文件内容 + contents = await file.read() + # 将文件内容转换为字节流 + zip_stream = io.BytesIO(contents) + # 使用zipfile库解压字节流 + with zipfile.ZipFile(zip_stream, "r") as zip_ref: + zip_ref.extractall(dir_path) + return dir_path @staticmethod - def copy(src: str, dst: str) -> None: - """ - 复制文件 - 根目录为项目根目录,传过来的文件路径均为相对路径 - - :param src: 原始文件 - :param dst: 目标路径。绝对路径 - """ - if src[0] == "/": - src = src.lstrip("/") - if sys.platform == "win32": - src = src.replace("/", "\\") - dst = dst.replace("/", "\\") - src = Path(BASE_DIR) / src - dst = Path(dst) - if not src.exists(): - raise CustomException("源文件不存在!") - if not dst.parent.exists(): - dst.parent.mkdir(parents=True, exist_ok=True) - shutil.copyfile(src, dst) - - @staticmethod - async def async_copy(src: str, dst: str) -> None: + async def async_copy_file(src: str, dst: str) -> None: """ 异步复制文件 根目录为项目根目录,传过来的文件路径均为相对路径 - :param src: 原始文件 :param dst: 目标路径。绝对路径 """ if src[0] == "/": src = src.lstrip("/") - if sys.platform == "win32": - src = src.replace("/", "\\") - dst = dst.replace("/", "\\") src = AsyncPath(BASE_DIR) / src if not await src.exists(): - raise CustomException("源文件不存在!") + raise CustomException(f"{src} 源文件不存在!") dst = AsyncPath(dst) if not await dst.parent.exists(): await dst.parent.mkdir(parents=True, exist_ok=True) await aioshutil.copyfile(src, dst) + @staticmethod + async def async_copy_dir(src: str, dst: str, dirs_exist_ok: bool = True) -> None: + """ + 复制目录 + :param src: 源目录 + :param dst: 目标目录 + :param dirs_exist_ok: 是否覆盖 + """ + if not os.path.exists(dst): + raise CustomException("目标目录不存在!") + await aioshutil.copytree(src, dst, dirs_exist_ok=dirs_exist_ok) + if __name__ == '__main__': - _src = r"D:\programming\ktianc\project\kinit-pro\kinit-api\static\system\favicon.ico" - _dst = r"D:\programming\ktianc\project\kinit-pro\kinit-api\static\system\2022-12-07\16703958210ab33912.ico" - asyncio.run(FileManage.async_copy(_src, _dst)) - # FileManage.copy(_src, _dst) + _src = r"E:\ktianc\linfeng\project\leadership\temp\20231212\3nCx1702349951\template" + _dst = r"E:\ktianc\linfeng\project\leadership\static\template" + asyncio.run(FileManage.async_copy_dir(_src, _dst)) diff --git a/kinit-api/utils/tools.py b/kinit-api/utils/tools.py index df20501..dfc49c1 100644 --- a/kinit-api/utils/tools.py +++ b/kinit-api/utils/tools.py @@ -5,7 +5,9 @@ # @File : tools.py # @IDE : PyCharm # @desc : 工具类 -from asyncio.exceptions import TimeoutError + +from win32com.client import gencache +import comtypes.client import datetime import random import re @@ -103,6 +105,42 @@ async def import_modules_async(modules: list, desc: str, **kwargs): logger.error(f"ModuleNotFoundError:导入{desc}失败,未找到该模块下的方法:{module}") +def ppt_to_pdf_1(ppt_path: str, pdf_path: str): + """ + ppt 转 pdf,会弹出 office 软件 + :param ppt_path: + :param pdf_path: + :return: + """ + # 创建PDF + powerpoint = comtypes.client.CreateObject("Powerpoint.Application") + powerpoint.Visible = 1 + slide = powerpoint.Presentations.Open(ppt_path) + # 保存PDF + slide.SaveAs(pdf_path, 32) + slide.Close() + # 退出 office 软件 + powerpoint.Quit() + + +def ppt_to_pdf_2(ppt_path: str, pdf_path: str): + """ + 完美办法,PPT 转 PDF + :param ppt_path: + :param pdf_path: + :return: + """ + p = gencache.EnsureDispatch("PowerPoint.Application") + try: + ppt = p.Presentations.Open(ppt_path, False, False, False) + ppt.ExportAsFixedFormat(pdf_path, 2, PrintRange=None) + ppt.Close() + p.Quit() + except Exception as e: + print(os.path.split(ppt_path)[1], "转化失败,失败原因%s" % e) + logger.info(os.path.split(ppt_path)[1], "转化失败,失败原因%s" % e) + + if __name__ == '__main__': # print(generate_invitation_code()) # print(int(datetime.datetime.now().timestamp()))