kinit/kinit-api/utils/file_manage.py
2022-11-16 15:41:24 +08:00

97 lines
3.0 KiB
Python

# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2021/12/5 8:45
# @File : file_manage.py
# @IDE : PyCharm
# @desc : Save uploaded files (images and other files) to local storage
import datetime
import os
import shutil
from application.settings import TEMP_DIR, STATIC_ROOT, BASE_DIR
from fastapi import UploadFile
import sys
from core.exception import CustomException
from utils import status
import uuid
class FileManage:
    """
    Upload file management.

    Wraps one uploaded file together with a generated relative storage path,
    and offers helpers to persist it under ``STATIC_ROOT``, to store a
    temporary copy under ``TEMP_DIR``, and to copy files relative to
    ``BASE_DIR``.
    """

    # Content types accepted by save_image_local() when no list is supplied.
    image_accept = ["image/png", "image/jpeg", "image/gif", "image/x-icon"]

    def __init__(self, file: UploadFile, path: str):
        """
        Build the relative storage path for an uploaded file.

        The resulting ``self.path`` has the form
        ``<path>/<YYYY-MM-DD>/<unix-timestamp><8 uuid chars><original extension>``
        and always uses ``/`` as separator. When ``path`` is empty (or consists
        only of slashes) the ``<path>/`` prefix is omitted.

        @param file: the uploaded file; its ``filename`` supplies the extension
        @param path: sub-directory to store the file in; leading and trailing
                     slashes are ignored
        """
        # strip() instead of indexing path[0] / path[-1]: "" and "/" no longer
        # raise IndexError, and repeated slashes cannot leave a leading "/"
        # that would make os.path.join() discard STATIC_ROOT later on.
        path = path.strip("/")
        now = datetime.datetime.now()
        filename = str(int(now.timestamp())) + str(uuid.uuid4())[:8]
        # filename may be missing on the upload; fall back to "no extension".
        suffix = os.path.splitext(file.filename or "")[-1]
        relative = f"{now.date()}/{filename}{suffix}"
        self.path = f"{path}/{relative}" if path else relative
        self.file = file

    async def save_image_local(self, accept: list = None):
        """
        Save an image file locally after checking its content type.

        @param accept: allowed content types; defaults to ``image_accept``
        @return: whatever ``save_local`` returns
        @raise CustomException: when the file's content type is not accepted
        """
        if accept is None:
            accept = self.image_accept
        if self.file.content_type not in accept:
            raise CustomException(f"上传图片必须是 {'/'.join(accept)} 格式!", status.HTTP_ERROR)
        return await self.save_local()

    async def save_local(self):
        """
        Save the file locally under ``STATIC_ROOT``.

        @return: the string ``/static/<self.path>``
        """
        path = self.path
        if sys.platform == "win32":
            path = self.path.replace("/", "\\")
        filename = os.path.join(STATIC_ROOT, path)
        # makedirs + exist_ok: the target is nested (<path>/<date>), so several
        # levels may be missing, and a concurrent upload may create the
        # directory between a check and the creation.
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, "wb") as f:
            f.write(await self.file.read())
        return f"/static/{self.path}"

    @staticmethod
    async def save_tmp_file(file: UploadFile):
        """
        Save a temporary copy of an uploaded file under ``TEMP_DIR/<YYYYMMDD>``.

        @param file: the uploaded file
        @return: full path of the written file
        """
        now = datetime.datetime.now()
        file_dir = os.path.join(TEMP_DIR, now.strftime("%Y%m%d"))
        os.makedirs(file_dir, exist_ok=True)
        # The client controls file.filename: keep only its last component so a
        # name such as "../../x" cannot escape the temporary directory.
        safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
        filename = os.path.join(file_dir, str(int(now.timestamp())) + safe_name)
        with open(filename, "wb") as f:
            f.write(await file.read())
        return filename

    @staticmethod
    def copy(src: str, dst: str):
        """
        Copy a file.

        Paths are resolved against the project root (``BASE_DIR``); callers are
        expected to pass relative paths. Leading slashes on ``src`` are dropped.

        @param src: source file
        @param dst: destination path
        """
        # Unconditional lstrip: same result as the former src[0] check, but an
        # empty string no longer raises IndexError.
        src = src.lstrip("/")
        if sys.platform == "win32":
            src = src.replace("/", "\\")
            dst = dst.replace("/", "\\")
        src = os.path.join(BASE_DIR, src)
        dst = os.path.join(BASE_DIR, dst)
        dst_dir = os.path.dirname(dst)
        if dst_dir:
            # Create every missing level of the destination directory.
            os.makedirs(dst_dir, exist_ok=True)
        shutil.copyfile(src, dst)
if __name__ == "__main__":
    # Placeholder entry point: only emits an empty line when run as a script.
    print()