优化工具类

This commit is contained in:
ktianc 2023-12-16 21:16:01 +08:00
parent 713f4bdcda
commit d9ffa98d13
6 changed files with 277 additions and 114 deletions

View File

@ -33,25 +33,37 @@ class ExcelManage:
def open_workbook(self, file: str, read_only: bool = False, data_only: bool = False) -> None: def open_workbook(self, file: str, read_only: bool = False, data_only: bool = False) -> None:
""" """
初始化 excel 文件 初始化 excel 文件
:param file: 文件名称或者对象 :param file: 文件名称或者对象
:param read_only: 是否只读优化读取速度 :param read_only: 是否只读优化读取速度
:param data_only: 是否加载文件对象 :param data_only: 是否加载文件对象
:return:
""" """
# 加载excel文件获取表单 # 加载excel文件获取表单
self.wb = load_workbook(file, read_only=read_only, data_only=data_only) self.wb = load_workbook(file, read_only=read_only, data_only=data_only)
def open_sheet(self, sheet_name: str = None, **kwargs) -> None: def open_sheet(
self,
sheet_name: str = None,
file: str = None,
read_only: bool = False,
data_only: bool = False
) -> None:
""" """
初始化 excel 文件 初始化 excel 文件
:param sheet_name: 表单名称为空则默认第一个 :param sheet_name: 表单名称为空则默认第一个
:param file:
:param read_only:
:param data_only:
:return:
""" """
# 加载excel文件获取表单 # 加载excel文件获取表单
if not self.wb: if not self.wb:
self.open_workbook(kwargs.get("file"), kwargs.get("read_only", False), kwargs.get("data_only", False)) self.open_workbook(file, read_only, data_only)
if sheet_name: if sheet_name:
self.sheet = self.wb[sheet_name] if sheet_name in self.get_sheets():
self.sheet = self.wb[sheet_name]
else:
self.sheet = self.wb.create_sheet(sheet_name)
else: else:
self.sheet = self.wb.active self.sheet = self.wb.active
@ -65,8 +77,8 @@ class ExcelManage:
def create_excel(self, sheet_name: str = None) -> None: def create_excel(self, sheet_name: str = None) -> None:
""" """
创建 excel 文件 创建 excel 文件
:param sheet_name: 表单名称为空则默认第一个 :param sheet_name: 表单名称为空则默认第一个
:return:
""" """
# 加载excel文件获取表单 # 加载excel文件获取表单
self.wb = Workbook() self.wb = Workbook()
@ -77,7 +89,10 @@ class ExcelManage:
def readlines(self, min_row: int = 1, min_col: int = 1, max_row: int = None, max_col: int = None) -> list: def readlines(self, min_row: int = 1, min_col: int = 1, max_row: int = None, max_col: int = None) -> list:
""" """
读取指定表单所有数据 读取指定表单所有数据
:param min_row: 最小行
:param min_col: 最小列
:param max_row: 最大行
:param max_col: 最大列
:return: 二维数组 :return: 二维数组
""" """
rows = self.sheet.iter_rows(min_row=min_row, min_col=min_col, max_row=max_row, max_col=max_col) rows = self.sheet.iter_rows(min_row=min_row, min_col=min_col, max_row=max_row, max_col=max_col)
@ -93,7 +108,6 @@ class ExcelManage:
def get_header(self, row: int = 1, col: int = None, asterisk: bool = False) -> list: def get_header(self, row: int = 1, col: int = None, asterisk: bool = False) -> list:
""" """
读取指定表单的表头第一行数据 读取指定表单的表头第一行数据
:param row: 指定行 :param row: 指定行
:param col: 最大列 :param col: 最大列
:param asterisk: 是否去除 * :param asterisk: 是否去除 *
@ -111,9 +125,9 @@ class ExcelManage:
def write_list(self, rows: list, header: list = None) -> None: def write_list(self, rows: list, header: list = None) -> None:
""" """
写入 excel文件 写入 excel文件
:param rows: 行数据集 :param rows: 行数据集
:param header: 表头 :param header: 表头
:return:
""" """
if header: if header:
self.sheet.append(header) self.sheet.append(header)
@ -126,11 +140,13 @@ class ExcelManage:
} }
for i in range(0, len(data)): for i in range(0, len(data)):
if isinstance(data[i], datetime.datetime): if isinstance(data[i], datetime.datetime):
data[i] = data[i].strftime("%Y/%m/%d %H:%M:%S") data[i] = data[i].strftime("%Y-%m-%d %H:%M:%S")
format_columns["date_columns"].append(i + 1) format_columns["date_columns"].append(i + 1)
elif isinstance(data[i], bool):
data[i] = 1 if data[i] else 0
self.sheet.append(data) self.sheet.append(data)
self.__set_row_style(index + 2, len(data)) self.__set_row_style(index + 2 if header else index + 1, len(data))
self.__set_row_format(index + 2, format_columns) self.__set_row_format(index + 2 if header else index + 1, format_columns)
self.__auto_width() self.__auto_width()
self.__set_row_border() self.__set_row_border()
@ -138,6 +154,8 @@ class ExcelManage:
""" """
保存 excel 文件到本地 保存 excel 文件到本地
默认保存到临时目录中 默认保存到临时目录中
:param filename: 保存的文件名称
:return:
""" """
date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d")
file_dir = os.path.join(TEMP_DIR, date) file_dir = os.path.join(TEMP_DIR, date)
@ -163,12 +181,12 @@ class ExcelManage:
): ):
""" """
设置行样式 设置行样式
:param row: :param row:
:param max_column: 最大列 :param max_column: 最大列
:param alignment_style: 单元格内容的对齐设置 :param alignment_style: 单元格内容的对齐设置
:param font_style: 单元格内容的字体样式设置 :param font_style: 单元格内容的字体样式设置
:param pattern_fill_style: 单元格的填充模式设置 :param pattern_fill_style: 单元格的填充模式设置
:return:
""" """
for index in range(0, max_column): for index in range(0, max_column):
alignment = Alignment(**alignment_style.model_dump()) alignment = Alignment(**alignment_style.model_dump())
@ -181,16 +199,17 @@ class ExcelManage:
def __set_row_format(self, row: int, columns: dict): def __set_row_format(self, row: int, columns: dict):
""" """
格式化行数据类型 格式化行数据类型
:param row: :param row:
:param columns: 列数据 :param columns: 列数据
:return:
""" """
for index in columns.get("date_columns", []): for index in columns.get("date_columns", []):
self.sheet.cell(row=row, column=index).number_format = "yyyy/mm/dd h:mm:ss" self.sheet.cell(row=row, column=index).number_format = "yyyy-mm-dd h:mm:ss"
def __set_row_border(self): def __set_row_border(self):
""" """
设置行边框 设置行边框
:return:
""" """
# 创建 Border 对象并设置边框样式 # 创建 Border 对象并设置边框样式
border = Border( border = Border(
@ -207,6 +226,7 @@ class ExcelManage:
def __auto_width(self): def __auto_width(self):
""" """
设置自适应列宽 设置自适应列宽
:return:
""" """
# 设置一个字典用于保存列宽数据 # 设置一个字典用于保存列宽数据
dims = {} dims = {}
@ -226,6 +246,7 @@ class ExcelManage:
def close(self): def close(self):
""" """
关闭文件 关闭文件
:return:
""" """
self.wb.close() self.wb.close()

View File

@ -21,7 +21,7 @@ class FieldType(Enum):
str = "str" str = "str"
class ImportManage: class ImportManage(ExcelManage):
""" """
数据导入管理 数据导入管理
@ -38,9 +38,9 @@ class ImportManage:
file_type = ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"] file_type = ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"]
def __init__(self, file: UploadFile, headers: List[dict]): def __init__(self, file: UploadFile, headers: List[dict]):
super().__init__()
self.__table_data = None self.__table_data = None
self.__table_header = None self.__table_header = None
self.__filename = None
self.errors = [] self.errors = []
self.success = [] self.success = []
self.success_number = 0 self.success_number = 0
@ -53,27 +53,40 @@ class ImportManage:
def check_file_type(cls, file: UploadFile) -> None: def check_file_type(cls, file: UploadFile) -> None:
""" """
验证文件类型 验证文件类型
:param file: 上传文件
:return:
""" """
if file.content_type not in cls.file_type: if file.content_type not in cls.file_type:
raise CustomException(msg="文件类型必须为xlsx类型", code=status.HTTP_ERROR) raise CustomException(msg="文件类型必须为xlsx类型", code=status.HTTP_ERROR)
async def get_table_data(self, header_row: int = 1, data_row: int = 2) -> None: async def get_table_data(
self,
file_path: str = None,
sheet_name: str = None,
header_row: int = 1,
data_row: int = 2
) -> None:
""" """
获取表格数据与表头 获取表格数据与表头
:param file_path:
:param sheet_name:
:param header_row: 表头在第几行 :param header_row: 表头在第几行
:param data_row: 数据开始行 :param data_row: 数据开始行
:return:
""" """
self.__filename = await FileManage.save_tmp_file(self.file) if file_path:
es = ExcelManage() __filename = file_path
es.open_sheet(file=self.__filename, read_only=True) else:
self.__table_header = es.get_header(header_row, len(self.headers), asterisk=True) __filename = await FileManage.async_save_temp_file(self.file)
self.__table_data = es.readlines(min_row=data_row, max_col=len(self.headers)) self.open_sheet(sheet_name=sheet_name, file=__filename, read_only=True)
es.close() self.__table_header = self.get_header(header_row, len(self.headers), asterisk=True)
self.__table_data = self.readlines(min_row=data_row, max_col=len(self.headers))
self.close()
def check_table_data(self) -> None: def check_table_data(self) -> None:
""" """
检查表格数据 检查表格数据
:return:
""" """
for row in self.__table_data: for row in self.__table_data:
result = self.__check_row(row) result = self.__check_row(row)
@ -93,6 +106,8 @@ class ImportManage:
1. 检查是否为必填项 1. 检查是否为必填项
2. 检查是否为选项列表 2. 检查是否为选项列表
3. 检查是否符合规则 3. 检查是否符合规则
:param row: 数据行
:return:
""" """
data = {} data = {}
for index, cell in enumerate(row): for index, cell in enumerate(row):
@ -128,18 +143,22 @@ class ImportManage:
def generate_error_url(self) -> str: def generate_error_url(self) -> str:
""" """
成功错误数据的文件链接 成功错误数据的文件链接
:return:
""" """
if self.error_number <= 0: if self.error_number <= 0:
return "" return ""
em = WriteXlsx(sheet_name="用户导入失败数据") em = WriteXlsx()
em.create_excel(sheet_name="用户导入失败数据", save_static=True)
em.generate_template(self.headers, max_row=self.error_number) em.generate_template(self.headers, max_row=self.error_number)
em.write_list(self.errors) em.write_list(self.errors)
em.close() em.close()
return em.file_url return em.get_file_url()
def add_error_data(self, row: dict) -> None: def add_error_data(self, row: dict) -> None:
""" """
增加错误数据 增加错误数据
:param row: 错误的数据行
:return:
""" """
self.errors.append(row) self.errors.append(row)
self.error_number += 1 self.error_number += 1

View File

@ -10,12 +10,13 @@
XlsxWriterhttps://github.com/jmcnamara/XlsxWriter XlsxWriterhttps://github.com/jmcnamara/XlsxWriter
博客教程https://blog.csdn.net/lemonbit/article/details/113855768 博客教程https://blog.csdn.net/lemonbit/article/details/113855768
""" """
import datetime
import hashlib
import os.path import os.path
import random
import xlsxwriter import xlsxwriter
from application.settings import TEMP_DIR, TEMP_URL from typing import List
from application.settings import STATIC_ROOT, STATIC_URL
from utils.file.file_base import FileBase
from utils.tools import generate_string
class WriteXlsx: class WriteXlsx:
@ -23,35 +24,38 @@ class WriteXlsx:
写入xlsx文件 写入xlsx文件
""" """
def __init__(self, filename: str = None, sheet_name: str = "sheet1"): def __init__(self):
date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") self.file_path = None
file_dir = os.path.join(TEMP_DIR, date) self.sheet_name = None
if not os.path.exists(file_dir):
os.mkdir(file_dir)
if not filename:
filename = hashlib.md5(str(random.random()).encode()).hexdigest()
self.file_url = f"{TEMP_URL}/{date}/{filename}.xlsx"
self.filename = os.path.join(TEMP_DIR, date, filename + ".xlsx")
self.sheet_name = sheet_name
self.wb = None self.wb = None
self.sheet = None self.sheet = None
def create_excel(self) -> None: def create_excel(self, file_path: str = None, sheet_name: str = "sheet1", save_static: bool = False) -> None:
""" """
创建 excel 文件 创建 excel 文件
:param file_path: 文件绝对路径或相对路径
:param sheet_name: sheet 名称
:param save_static: 保存方式 static 静态资源或者临时文件
:return:
""" """
if not file_path or (file_path and not os.path.abspath(file_path)):
if save_static:
self.file_path = FileBase.generate_static_file_path("write_xlsx", file_path)
else:
self.file_path = FileBase.generate_temp_file_path(f"{generate_string(8)}.xlsx")
else:
self.file_path = file_path
self.sheet_name = sheet_name
self.wb = xlsxwriter.Workbook(self.file_path)
self.sheet = self.wb.add_worksheet(sheet_name)
self.wb = xlsxwriter.Workbook(self.filename) def generate_template(self, headers: List[dict] = None, max_row: int = 101) -> None:
self.sheet = self.wb.add_worksheet(self.sheet_name)
def generate_template(self, headers: list[dict] = None, max_row: int = 101) -> None:
""" """
生成模板 生成模板
:param headers: 表头 :param headers: 表头
:param max_row: 设置下拉列表至最大行 :param max_row: 设置下拉列表至最大行
:return: 文件链接地址 :return: 文件链接地址
""" """
self.create_excel()
max_row = max_row + 100 max_row = max_row + 100
for index, field in enumerate(headers): for index, field in enumerate(headers):
font_format = { font_format = {
@ -95,7 +99,6 @@ class WriteXlsx:
# 设置列宽 # 设置列宽
self.sheet.set_column(0, len(rows[0]) - 1, 22) self.sheet.set_column(0, len(rows[0]) - 1, 22)
# 设置行高 # 设置行高
# self.sheet.set_row(row_number, 25)
self.sheet.set_default_row(25) self.sheet.set_default_row(25)
def close(self) -> None: def close(self) -> None:
@ -103,3 +106,19 @@ class WriteXlsx:
关闭文件 关闭文件
""" """
self.wb.close() self.wb.close()
def get_file_url(self) -> str:
"""
获取访问文件的 url
:return:
"""
if not self.file_path:
raise ValueError("还未创建文件,请先创建 excel 文件!")
assert isinstance(self.file_path, str)
if self.file_path.startswith(STATIC_ROOT):
return self.file_path.replace(STATIC_ROOT, STATIC_URL)
else:
print("write_xlsx 生成文件:", self.file_path)
raise ValueError("生成文件为临时文件,无法访问!")

View File

@ -8,10 +8,14 @@
import datetime import datetime
import os import os
import uuid from pathlib import Path
from aiopathlib import AsyncPath
from fastapi import UploadFile from fastapi import UploadFile
from application.settings import TEMP_DIR
from core.exception import CustomException from core.exception import CustomException
from utils import status from utils import status
from utils.tools import generate_string
class FileBase: class FileBase:
@ -22,18 +26,66 @@ class FileBase:
ALL_ACCEPT = [*IMAGE_ACCEPT, *VIDEO_ACCEPT, *AUDIO_ACCEPT] ALL_ACCEPT = [*IMAGE_ACCEPT, *VIDEO_ACCEPT, *AUDIO_ACCEPT]
@classmethod @classmethod
def generate_path(cls, path: str, filename): def generate_static_file_path(cls, path: str, filename: str) -> str:
""" """
生成文件路径 生成文件路径
:param path: static 指定目录类别
:param filename: 文件名称
:return:
""" """
path = path.replace("\\", "/")
if path[0] == "/": if path[0] == "/":
path = path[1:] path = path[1:]
if path[-1] == "/": if path[-1] == "/":
path = path[:-1] path = path[:-1]
today = str(int((datetime.datetime.now().replace(hour=0, minute=0, second=0)).timestamp())) today = str(int((datetime.datetime.now().replace(hour=0, minute=0, second=0)).timestamp()))
_filename = str(int(datetime.datetime.now().timestamp())) + str(uuid.uuid4())[:8] _filename = str(int(datetime.datetime.now().timestamp())) + str(generate_string(8))
return f"{path}/{today}/{_filename}{os.path.splitext(filename)[-1]}" return f"{path}/{today}/{_filename}{os.path.splitext(filename)[-1]}"
@classmethod
def generate_temp_file_path(cls, filename: str) -> str:
"""
生成临时文件路径
:param filename: 文件名称
:return:
"""
return f"{cls.generate_temp_dir_path()}/{filename}"
@classmethod
def generate_temp_dir_path(cls) -> str:
"""
生成临时目录路径
:return:
"""
date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d")
file_dir = Path(TEMP_DIR) / date
path = file_dir / (generate_string(4) + str(int(datetime.datetime.now().timestamp())))
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
return str(path).replace("\\", "/")
@classmethod
async def async_generate_temp_file_path(cls, filename: str) -> str:
"""
生成临时文件路径
:param filename: 文件名称
:return:
"""
return f"{await cls.async_generate_temp_dir_path()}/{filename}"
@classmethod
async def async_generate_temp_dir_path(cls) -> str:
"""
生成临时目录路径
:return:
"""
date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d")
file_dir = AsyncPath(TEMP_DIR) / date
path = file_dir / (generate_string(4) + str(int(datetime.datetime.now().timestamp())))
if not await path.exists():
await path.mkdir(parents=True, exist_ok=True)
return str(path).replace("\\", "/")
@classmethod @classmethod
async def validate_file(cls, file: UploadFile, max_size: int = None, mime_types: list = None) -> bool: async def validate_file(cls, file: UploadFile, max_size: int = None, mime_types: list = None) -> bool:
""" """
@ -52,18 +104,3 @@ class FileBase:
if file.content_type not in mime_types: if file.content_type not in mime_types:
raise CustomException(f"上传文件格式错误,只支持 {'/'.join(mime_types)} 格式!", status.HTTP_ERROR) raise CustomException(f"上传文件格式错误,只支持 {'/'.join(mime_types)} 格式!", status.HTTP_ERROR)
return True return True
@classmethod
def get_file_type(cls, content_type: str) -> str | None:
"""
获取文件类型
0: 图片
1视频
"""
if content_type in cls.IMAGE_ACCEPT:
return "0"
elif content_type in cls.VIDEO_ACCEPT:
return "1"
else:
return None

View File

@ -4,11 +4,13 @@
# @File : file_manage.py # @File : file_manage.py
# @IDE : PyCharm # @IDE : PyCharm
# @desc : 保存图片到本地 # @desc : 保存图片到本地
import asyncio import asyncio
import datetime import io
import os import os
import shutil import shutil
from application.settings import TEMP_DIR, STATIC_ROOT, BASE_DIR, STATIC_URL, STATIC_DIR import zipfile
from application.settings import STATIC_ROOT, BASE_DIR, STATIC_URL
from fastapi import UploadFile from fastapi import UploadFile
import sys import sys
from pathlib import Path from pathlib import Path
@ -24,21 +26,46 @@ class FileManage(FileBase):
""" """
def __init__(self, file: UploadFile, path: str): def __init__(self, file: UploadFile, path: str):
self.path = self.generate_path(path, file.filename) self.path = self.generate_static_file_path(path, file.filename)
self.file = file self.file = file
async def save_image_local(self, accept: list = None) -> dict: async def save_image_local(self, accept: list = None) -> dict:
""" """
保存图片文件到本地 保存图片文件到本地
:param accept:
:return:
""" """
if accept is None: if accept is None:
accept = self.IMAGE_ACCEPT accept = self.IMAGE_ACCEPT
await self.validate_file(self.file, max_size=5, mime_types=accept) await self.validate_file(self.file, max_size=5, mime_types=accept)
return await self.save_local() return await self.async_save_local()
async def save_local(self) -> dict: async def save_audio_local(self, accept: list = None) -> dict:
"""
保存音频文件到本地
:param accept:
:return:
"""
if accept is None:
accept = self.AUDIO_ACCEPT
await self.validate_file(self.file, max_size=50, mime_types=accept)
return await self.async_save_local()
async def save_video_local(self, accept: list = None) -> dict:
"""
保存视频文件到本地
:param accept:
:return:
"""
if accept is None:
accept = self.VIDEO_ACCEPT
await self.validate_file(self.file, max_size=100, mime_types=accept)
return await self.async_save_local()
async def async_save_local(self) -> dict:
""" """
保存文件到本地 保存文件到本地
:return:
""" """
path = self.path path = self.path
if sys.platform == "win32": if sys.platform == "win32":
@ -48,70 +75,72 @@ class FileManage(FileBase):
await save_path.parent.mkdir(parents=True, exist_ok=True) await save_path.parent.mkdir(parents=True, exist_ok=True)
await save_path.write_bytes(await self.file.read()) await save_path.write_bytes(await self.file.read())
return { return {
"local_path": f"{STATIC_DIR}/{self.path}", "local_path": f"{STATIC_ROOT}/{self.path}",
"remote_path": f"{STATIC_URL}/{self.path}" "remote_path": f"{STATIC_URL}/{self.path}"
} }
@staticmethod @classmethod
async def save_tmp_file(file: UploadFile) -> str: async def async_save_temp_file(cls, file: UploadFile) -> str:
""" """
保存临时文件 保存临时文件
:param file:
:return:
""" """
date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d") temp_file_path = await cls.async_generate_temp_file_path(file.filename)
file_dir = AsyncPath(TEMP_DIR) / date await AsyncPath(temp_file_path).write_bytes(await file.read())
if not await file_dir.exists(): return temp_file_path
await file_dir.mkdir(parents=True, exist_ok=True)
filename = file_dir / (str(int(datetime.datetime.now().timestamp())) + file.filename) @classmethod
await filename.write_bytes(await file.read()) async def unzip(cls, file: UploadFile, dir_path: str) -> str:
return str(filename) """
解压 zip 压缩包
:param file:
:param dir_path: 解压路径
:return:
"""
if file.content_type != "application/x-zip-compressed":
raise CustomException("上传文件类型错误,必须是 zip 压缩包格式!")
# 读取上传的文件内容
contents = await file.read()
# 将文件内容转换为字节流
zip_stream = io.BytesIO(contents)
# 使用zipfile库解压字节流
with zipfile.ZipFile(zip_stream, "r") as zip_ref:
zip_ref.extractall(dir_path)
return dir_path
@staticmethod @staticmethod
def copy(src: str, dst: str) -> None: async def async_copy_file(src: str, dst: str) -> None:
"""
复制文件
根目录为项目根目录传过来的文件路径均为相对路径
:param src: 原始文件
:param dst: 目标路径绝对路径
"""
if src[0] == "/":
src = src.lstrip("/")
if sys.platform == "win32":
src = src.replace("/", "\\")
dst = dst.replace("/", "\\")
src = Path(BASE_DIR) / src
dst = Path(dst)
if not src.exists():
raise CustomException("源文件不存在!")
if not dst.parent.exists():
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(src, dst)
@staticmethod
async def async_copy(src: str, dst: str) -> None:
""" """
异步复制文件 异步复制文件
根目录为项目根目录传过来的文件路径均为相对路径 根目录为项目根目录传过来的文件路径均为相对路径
:param src: 原始文件 :param src: 原始文件
:param dst: 目标路径绝对路径 :param dst: 目标路径绝对路径
""" """
if src[0] == "/": if src[0] == "/":
src = src.lstrip("/") src = src.lstrip("/")
if sys.platform == "win32":
src = src.replace("/", "\\")
dst = dst.replace("/", "\\")
src = AsyncPath(BASE_DIR) / src src = AsyncPath(BASE_DIR) / src
if not await src.exists(): if not await src.exists():
raise CustomException("源文件不存在!") raise CustomException(f"{src} 源文件不存在!")
dst = AsyncPath(dst) dst = AsyncPath(dst)
if not await dst.parent.exists(): if not await dst.parent.exists():
await dst.parent.mkdir(parents=True, exist_ok=True) await dst.parent.mkdir(parents=True, exist_ok=True)
await aioshutil.copyfile(src, dst) await aioshutil.copyfile(src, dst)
@staticmethod
async def async_copy_dir(src: str, dst: str, dirs_exist_ok: bool = True) -> None:
"""
复制目录
:param src: 源目录
:param dst: 目标目录
:param dirs_exist_ok: 是否覆盖
"""
if not os.path.exists(dst):
raise CustomException("目标目录不存在!")
await aioshutil.copytree(src, dst, dirs_exist_ok=dirs_exist_ok)
if __name__ == '__main__': if __name__ == '__main__':
_src = r"D:\programming\ktianc\project\kinit-pro\kinit-api\static\system\favicon.ico" _src = r"E:\ktianc\linfeng\project\leadership\temp\20231212\3nCx1702349951\template"
_dst = r"D:\programming\ktianc\project\kinit-pro\kinit-api\static\system\2022-12-07\16703958210ab33912.ico" _dst = r"E:\ktianc\linfeng\project\leadership\static\template"
asyncio.run(FileManage.async_copy(_src, _dst)) asyncio.run(FileManage.async_copy_dir(_src, _dst))
# FileManage.copy(_src, _dst)

View File

@ -5,7 +5,9 @@
# @File : tools.py # @File : tools.py
# @IDE : PyCharm # @IDE : PyCharm
# @desc : 工具类 # @desc : 工具类
from asyncio.exceptions import TimeoutError
from win32com.client import gencache
import comtypes.client
import datetime import datetime
import random import random
import re import re
@ -103,6 +105,42 @@ async def import_modules_async(modules: list, desc: str, **kwargs):
logger.error(f"ModuleNotFoundError导入{desc}失败,未找到该模块下的方法:{module}") logger.error(f"ModuleNotFoundError导入{desc}失败,未找到该模块下的方法:{module}")
def ppt_to_pdf_1(ppt_path: str, pdf_path: str):
"""
ppt pdf会弹出 office 软件
:param ppt_path:
:param pdf_path:
:return:
"""
# 创建PDF
powerpoint = comtypes.client.CreateObject("Powerpoint.Application")
powerpoint.Visible = 1
slide = powerpoint.Presentations.Open(ppt_path)
# 保存PDF
slide.SaveAs(pdf_path, 32)
slide.Close()
# 退出 office 软件
powerpoint.Quit()
def ppt_to_pdf_2(ppt_path: str, pdf_path: str):
"""
完美办法PPT PDF
:param ppt_path:
:param pdf_path:
:return:
"""
p = gencache.EnsureDispatch("PowerPoint.Application")
try:
ppt = p.Presentations.Open(ppt_path, False, False, False)
ppt.ExportAsFixedFormat(pdf_path, 2, PrintRange=None)
ppt.Close()
p.Quit()
except Exception as e:
print(os.path.split(ppt_path)[1], "转化失败,失败原因%s" % e)
logger.info(os.path.split(ppt_path)[1], "转化失败,失败原因%s" % e)
if __name__ == '__main__': if __name__ == '__main__':
# print(generate_invitation_code()) # print(generate_invitation_code())
# print(int(datetime.datetime.now().timestamp())) # print(int(datetime.datetime.now().timestamp()))