1 Star 0 Fork 0

dingqiao/Flask_CaseCreate

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
app.py 37.75 KB
一键复制 编辑 原始数据 按行查看 历史
dingqiao 提交于 2025-01-10 17:33 . 优化生成逻辑
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
# -*- coding: UTF-8 -*-
import importlib.metadata
import json
import os
import re
import time
import traceback
from datetime import datetime
import pandas as pd
from flask import Flask, request, render_template, send_file, jsonify, redirect, url_for
from FunctionalCases import functional_cases, functional_cases_api, functional_cases_img
from api.GPT4 import GPT4
from blue import dataAnalysis, pdfUtils
from blue import demo
from commons.Basex64 import BSx64
from commons.GET_IP import GETIP
from commons.confDemo import CONFIG
from commons.data_cleaning import DataCleaning
from commons.excel import ExcelTools
from commons.regular_expression import RegularExpression
from commons.timeDemo import TIME
from commons.typewriter import Login
from commons.ulid_uuid import MakeULID
from commons.zip import ZIP
from config import log
from demo.BatchCreate import BatchCreate
from demo.Data_analysis import DataAnalysis
from demo.db import get_all, uid_db
app = Flask(__name__)
# 获取 Flask 版本号
flask_version = importlib.metadata.version("flask")
# 根据 Flask 版本设置 ensure_ascii
if flask_version >= "2.3.0":
app.json.ensure_ascii = False
else:
app.config['JSON_AS_ASCII'] = False
"""FLask==2.2.5及以下版本解决"""
# app.config['JSON_AS_ASCII'] = False # 确保中文字符能够正确显示
"""FLask==2.3.0及以上版本解决"""
# app.json.ensure_ascii = False
"""
注册蓝图
"""
app.register_blueprint(demo.B)
app.register_blueprint(dataAnalysis.DataAnalysis)
# app.register_blueprint(pdfUtils.FileGenerator)
@app.errorhandler(Exception)
# 全局异常处理器-仅用来记录日志
def error_handler_all(e):
err_msg = {"status": 500, "msg": str(e) + '\n下面是详细的错误信息:' + traceback.format_exc(), "success": False}
log.error(json.dumps(err_msg, ensure_ascii=False))
"""
拦截器部分
"""
@app.before_request
def before_req():
"""
在请求之前执行的函数。
Returns:
如果请求路径不在排除列表中且未通过身份验证,则返回登录页面。
否则,无返回值。
"""
# 排除特定的路径
# if request.path in ["/", "/submit"]:
# if not request.remote_addr.startswith("10.30"):
# return render_template("error_template.html", error_message={"error": "您无权访问此页面!!!"})
# else:
# pass
# else:
# pass
pass
# 获取客户端的 IP
ip = request.remote_addr
# print(request.headers)
# 构建键值以获取键名
if request.path not in ["/chage_password", "/login", "/messages"] and not (request.path.startswith("/static")):
value_redis = Login.login_get_info(ip=ip)
if value_redis is None:
return render_template("login.html")
if value_redis.get("token") == "SUPCON_5T":
pass
else:
return render_template("login.html")
else:
# print(request.path)
pass
"""登录的所有逻辑"""
@app.route("/login", methods=["POST", 'GET'])
def login():
ip = request.remote_addr
if request.method == "GET":
return render_template("login.html")
else:
data = request.get_json(force=True, silent=True)
username = data.get("username").strip()
password = data.get("password")
# 从数据库获取密码
try:
password_db = get_all(sql="SELECT password FROM user WHERE username='%s';" % username)[0][0]
except IndexError as e:
return jsonify({"status": 401, "msg": "用户名或密码错误", "success": False})
if password == password_db:
# 往redis中插入鉴权信息
Login.login_set_info(ip=ip, username=username)
return jsonify({"status": 200, "msg": "登录成功", "success": True})
else:
return jsonify({"status": 401, "msg": "用户名或密码错误", "success": False})
# 修改密码
@app.route("/chage_password", methods=["POST", 'GET'])
def update_password():
ip = request.remote_addr
if request.method == "GET":
return render_template("change_pwd.html")
else:
data = request.get_json(force=True, silent=True)
username = data.get("username").strip()
old_password = data.get("old_password")
new_password = data.get("new_password")
# 从数据库获取旧密码
try:
password_db = get_all(sql="SELECT password FROM user WHERE username='%s';" % username)[0][0]
except IndexError as e:
return jsonify({"status": 401, "msg": "用户名或密码错误", "success": False})
if old_password == password_db:
# 更新数据库
uid_db(sql="UPDATE user SET password='%s' WHERE username='%s';" % (new_password, username))
# 删除redis中的鉴权信息
Login.login_del(ip=ip)
return jsonify({"status": 200, "msg": "修改成功", "success": True})
else:
return jsonify({"status": 401, "msg": "原密码错误", "success": False})
# 假设的 A 方法,它接收表单数据并返回处理后的结果
def A(input1, input2, input3, input4, input5, ip, tag_uild):
# 实现生成的方法逻辑
res = functional_cases(case_requirements=input1,
module=input2, responsible_person=input3,
excel_name=input4, sheet_name=input5, remote_ip=ip, tag_uild=tag_uild)
return "文件生成情况:" + res
def A_img(input1, input2, input3, input4, input5, ip, image_dicts_list):
# 实现生成的方法逻辑
res = functional_cases_img(case_requirements=input1,
module=input2, responsible_person=input3,
excel_name=input4, sheet_name=input5, remote_ip=ip, image_dicts_list=image_dicts_list)
return "文件生成情况:" + res
def A_api(api_name, uri, req_header, req_body, response_body, responsible_person, req_method, response_code, ip):
res = functional_cases_api(api_name=api_name, req_method=req_method, uri=uri, req_header=req_header,
response_code=response_code, response_body=response_body,
responsible_person=responsible_person, remote_ip=ip, req_body=req_body)
return "文件生成情况:" + res
"""功能测试用例部分"""
# 查询累计生成数量-功能
def case_num():
sql = "SELECT sum(numbers) FROM case_num;"
res = get_all(sql=sql)[0][0]
return res
# 查询累计生成数量-接口
def case_num_api():
sql = "SELECT sum(numbers) FROM case_num_api;"
res = get_all(sql=sql)[0][0]
return res
# def B(excel_name):
# file_path = r"D:\用例存储\{}.xlsx".format(excel_name)
# if os.path.isfile(file_path):
# return send_file(file_path, as_attachment=True, download_name="{}.xlsx".format(excel_name))
# else:
# print("文件不存在")
"""上传文件"""
UPLOAD_FOLDER = r'D:\用例存储\uploads' # 请替换为实际的上传文件夹路径
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
"""统计报告下载部分--注意区分"""
@app.route('/MS_zen_tao/download', methods=['POST'])
def download_action():
action = request.form.get('action') # 获取action的值来区分按钮执行逻辑
if action == 'MS本周总结':
filename = TIME.ms_week_now() # 示例文件名
file_path = os.path.join(r"D:\a", filename + ".xlsx") # 示例文件路径
if os.path.exists(file_path):
return send_file(file_path, download_name="{}.xlsx".format(filename), as_attachment=True)
else:
return "文件不存在"
elif action == 'MS上周总结':
filename = TIME.ms_week_now_before() # 示例文件名
file_path = os.path.join(r"D:\a", filename + ".xlsx") # 示例文件路径
if os.path.exists(file_path):
return send_file(file_path, download_name="{}.xlsx".format(filename), as_attachment=True)
else:
return "文件不存在"
elif action == '禅道本周总结':
filename = TIME.zen_tao_week()
file_path = os.path.join(r"D:\aa", filename + ".xlsx")
if os.path.exists(file_path):
return send_file(file_path, download_name="{}.xlsx".format(filename), as_attachment=True)
else:
return "文件不存在"
elif action == '禅道上周总结':
filename = TIME.zen_tao_week_before()
file_path = os.path.join(r"D:\aa", filename + ".xlsx")
if os.path.exists(file_path):
return send_file(file_path, download_name="{}.xlsx".format(filename), as_attachment=True)
else:
return "文件不存在"
elif action == 'MS上月总结':
filename = TIME.ms_month_before()
file_path = os.path.join(r"D:\a", filename + ".xlsx")
if os.path.exists(file_path):
return send_file(file_path, download_name="{}.xlsx".format(filename), as_attachment=True)
else:
return "文件不存在"
elif action == '禅道上月总结':
filename = TIME.zen_tao_month_before()
file_path = os.path.join(r"D:\aa", filename + ".xlsx")
if os.path.exists(file_path):
return send_file(file_path, download_name="{}.xlsx".format(filename), as_attachment=True)
else:
return "文件不存在"
# 工具箱界面
@app.route('/toolbox', methods=['GET', 'POST'])
def tools_test():
if request.method == 'POST':
action = request.form.get('action')
return send_file(r"D:\tools\{}.zip".format(action), as_attachment=True)
return render_template('test_tools.html')
# 初始化界面home
@app.route('/')
def home():
# 初始时没有结果,所以返回 None
return render_template('home.html', result=None, case_count=case_num(),
initial_value=Login.get_redis_respon(request.remote_addr))
@app.route('/clearPage', methods=['GET', 'POST'])
def clear_page():
# 重定向到/的url
return redirect(url_for('home'))
# 提交按钮后端
@app.route('/submit', methods=['GET', 'POST'])
def submit_form():
client_ip = request.remote_addr
if client_ip == "10.30.71.192":
return render_template('error_template.html', error_message=f"被禁止的IP:{client_ip}!!!")
if request.method == 'POST':
case_requirements = request.form.get('input1').replace('\r', '')
module = request.form.get('input2')
if not module.startswith("/"):
module = "/" + module
# if module == '1':
# return render_template('error_template.html', error_message="请填写正确的模块名称")
# 从redis获取责任人
responsible_person = Login.get_redis_respon(request.remote_addr)
if not responsible_person:
responsible_person = request.form.get('input3')
# # 校验这个字段是否是人名
# content_type = WordSegmentation.cut(text=responsible_person)
# bool_type = any("nr" in v for d in content_type for v in d.values())
# if not bool_type:
# return render_template('error_template.html',
# error_message=f"error: {responsible_person} 不是人名,请重新输入")
excel_name = request.form.get('input4')
sheet_name = request.form.get('input5')
# 数据清洗部分的实现逻辑
DataCleaning.clean_data(requirement_msg=case_requirements, ip=client_ip)
# 生成UILD用来实现数据清洗
tag_ulid = MakeULID.new()
# 将内容写入redis以用来实现重新编辑
Login.set_last_edit(client_ip, case_requirements, module, responsible_person, excel_name, sheet_name, tag_ulid)
# 获取文件名字然后进行后面的操作
files = request.files.getlist('file')
files = [file for file in files if file.filename != '']
# print(files)
# 判断是否上传了文件
if not files:
print("未上传文件,开始执行无图片逻辑~~~")
# 执行未上传文件时的操作
result = A(case_requirements, module, responsible_person, excel_name, sheet_name, ip=str(client_ip),
tag_uild=tag_ulid)
# 需求入库
time13 = TIME.timestamp13()
requirement_warehousing_sql = "INSERT INTO case_requirements (RequirementDescription,Module,Owner,casefilename,Sheetpagenaming,Imagefilename,create_time,update_time,tag_uild) VALUES ('{}','{}','{}','{}','{}','',{},{},'{}');".format(
case_requirements, module, responsible_person, excel_name, sheet_name, time13, time13, tag_ulid)
print(uid_db(sql=requirement_warehousing_sql), "\t需求插入")
# 提取文件名--D:\用例存储\登录.xlsx
try:
down_name = \
RegularExpression.regular_expression(text=result, patternexpression=r"D:\\用例存储\\(.*?).xlsx")[0]
except Exception as e:
print(e)
down_name = ''
return render_template('home.html', result=result.replace('\n', '<br>'), case_count=case_num(),
file_name=down_name, initial_value=Login.get_redis_respon(request.remote_addr))
else:
print("上传了图片文件,开始执行有图片逻辑~~~")
list_base_data = []
image_name_list = []
# 执行上传文件时的操作
# 处理多个/单个文件
for file in files:
if file:
# 保存图片文件并bs64转换
image_name_list.append(file.filename)
file_path = os.path.join(r'D:\用例存储\uploads', file.filename)
file.save(file_path)
# return 'File uploaded and renamed successfully', 200
resul = BSx64.bs64_conversion(source_dir=r'D:\用例存储\uploads', file_name=file.filename)
image_dicts = {
'image_url': {
# 'detail': 'auto',
'url': f'data:image/{resul[1]};base64,{resul[0]}',
},
'type': 'image_url',
}
list_base_data.append(image_dicts)
# 获取长度
image_count = len(list_base_data)
# 转换为JSON字符串
json_string = json.dumps(list_base_data, ensure_ascii=False)
# 解析json_string为Python对象(列表)
image_dicts_list = json.loads(json_string)
# sql插入需求信息
time13 = TIME.timestamp13()
# print(image_dicts_list)
# 0.编码 1.格式
"""
修改逻辑,仅从gpt4接口得到图片分析结果以辅助生成测试用例,得到图片描述走普通生成逻辑
case_requirements = f"这{image_count}个图片是产品的前端界面,请结合这{image_count}个UI原型图片生成功能测试用例!" + case_requirements
# 传了图片写入数据库-为重新生成准备
print(case_requirements)
result = A_img(case_requirements, module, responsible_person, excel_name, sheet_name, ip=str(client_ip),
image_dicts_list=image_dicts_list)
"""
# 得到图片分析结果
log.info("开始调用GPT4接口分析图片内容")
print("开始调用GPT4接口分析图片内容")
question_imgs = f"这{image_count}个图片是产品的前端UI,请详细分析一下前端页面的所有内容!!!"
image_analysis_result = GPT4.gpt4_flask_imgs(image_dicts_list=image_dicts_list, question=question_imgs)
for i in range(3):
if 'An error occurred' in image_analysis_result:
print(f"第{i + 1}次调用GPT4接口分析图片内容失败,响应:{image_analysis_result}")
log.error(f"第{i + 1}次调用GPT4接口分析图片内容失败,响应:{image_analysis_result}")
time.sleep(1)
image_analysis_result = GPT4.gpt4_flask_imgs(image_dicts_list=image_dicts_list,
question=question_imgs)
else:
log.info("图片内容分析完毕!!!")
print("图片内容分析完毕!!!")
break
if 'An error occurred' in image_analysis_result:
image_analysis_result = "无描述内容,忽略"
case_requirements = case_requirements + "上述是用例需求,下面是有关的前端页面的描述信息。" + image_analysis_result + "结合前端页面的描述信息与用例需求"
# 生成测试用例
result = A(case_requirements, module, responsible_person, excel_name, sheet_name, ip=str(client_ip),
tag_uild=tag_ulid)
# print(result)
# 需求入库
requirement_warehousing_sql = "INSERT INTO case_requirements (RequirementDescription,Module,Owner,casefilename,Sheetpagenaming,Imagefilename,create_time,update_time,tag_uild) VALUES ('{}','{}','{}','{}','{}','{}',{},{},'{}');".format(
case_requirements, module, responsible_person, excel_name, sheet_name,
str(image_name_list).replace("'", '"'), time13,
time13, tag_ulid)
# print(requirement_warehousing_sql)
print(uid_db(sql=requirement_warehousing_sql), "\t需求插入-IMG")
# 提取文件名--D:\用例存储\登录.xlsx
try:
down_name = \
RegularExpression.regular_expression(text=result, patternexpression=r"D:\\用例存储\\(.*?).xlsx")[0]
except Exception as e:
print(e)
down_name = ''
return render_template('home.html', result=result.replace('\n', '<br>'), case_count=case_num(),
file_name=down_name, initial_value=Login.get_redis_respon(request.remote_addr))
return render_template('home.html', result=None, case_count=case_num(),
initial_value=Login.get_redis_respon(request.remote_addr))
# 生成文件下载后端
@app.route('/download', methods=['POST'])
def download_file():
# 从POST请求中获取文件名
filename = request.form.get('fileNameInput')
action = request.form.get('action') # 获取action的值来区分按钮执行逻辑
if action == '下载':
# 确保文件存在,并防止目录遍历等安全问题
print("传入的下载文件名:", filename)
if filename == '':
return jsonify({"error": '下载文件名未填写,请填写后再点击下载'}), 404
file_path = r"D:\用例存储\{}.xlsx".format(filename)
if os.path.isfile(file_path):
return send_file(file_path, as_attachment=True, download_name="{}.xlsx".format(filename))
else:
return "File not found", 404
elif action == '预览':
# 以下是预览功能的实现逻辑
print("传入的预览文件名:", filename)
if filename == '':
return jsonify({"error": '下载文件名未填写,请填写后再点击预览'}), 404
file_path = r"D:\用例存储\{}.xlsx".format(filename) # 这个是要预览的文件的绝对路径
if not os.path.exists(file_path):
return f"{filename} File not found", 404
if filename != '':
# df = pd.read_excel(file_path)
# df = df.replace('\n', '', regex=True)
# # 使用fillna方法替换NaN值为''
# df = df.fillna('')
# return render_template('preview.html', tables=[df.to_html(classes='data', index=False)],
# titles=df.columns.values)
return redirect(url_for('preview_file', filename=filename))
return redirect(request.url)
def process_cell(cell):
if isinstance(cell, str):
# 去掉最开始的\n和结尾的\n
cell = cell.strip('\n')
# 将其他\n替换为<br>
cell = re.sub(r'\n', '<br>', cell)
return cell
# 预览部分后端
@app.route('/preview/<filename>')
def preview_file(filename):
file_path = r"D:\用例存储\{}.xlsx".format(filename)
df = pd.read_excel(file_path)
# 处理每个单元格
df = df.applymap(process_cell)
df = df.replace('\n', '<br>', regex=True)
df = df.replace('\t', ' ', regex=True)
# 使用fillna方法替换NaN值为''
df = df.fillna('')
return render_template('preview.html', tables=[df.to_html(classes='data', index=False, escape=False)],
titles=df.columns.values, filename=filename)
# 预览界面下载后端
@app.route('/download/<filename>')
def download_file_preview(filename):
file_path = r"D:\用例存储\{}.xlsx".format(filename)
print("传入的下载文件名:", filename)
return send_file(file_path, as_attachment=True, download_name="{}.xlsx".format(filename))
# 批量文件下载
@app.route('/downloadZip', methods=['POST'])
def download_file_zip():
"""
处理客户端的POST请求,用于下载特定客户端IP对应的测试用例的压缩包。
客户端IP将用于查找并打包相应的测试用例,然后提供给客户端下载。
:return: 压缩包文件或文件未找到的错误消息。
"""
# 获取请求的客户端IP地址
client_ip = request.remote_addr
# 根据客户端IP读取对应的配置文件,列出需要打包的文件列表
# 读取配置文件,将配置文件中的所有文件(文件名)复制压缩成一个压缩包,存入D:\用例存储\zip下,如果存在同名压缩包就替换成最新的
lis = CONFIG.read_csv(filename=r'D:\用例存储\conf\{}.csv'.format(client_ip))
# 使用列出的文件列表,将指定目录下的文件打包成ZIP压缩包
ZIP.zipDemo(source_dir='D:\用例存储', files_to_zip=lis, target_zip_path=r'D:\用例存储\zip\{}.zip'.format(client_ip))
# 构建打包后压缩包的文件路径
file_path = r"D:\用例存储\zip\{}.zip".format(client_ip)
# 检查压缩包文件是否存在
if os.path.isfile(file_path):
# 如果文件存在,返回压缩包文件供下载
return send_file(file_path, as_attachment=True, download_name="{}.zip".format(client_ip))
else:
# 如果文件不存在,返回404错误消息
return "File not found", 404
# 合并文件下载
@app.route('/downloadMerge', methods=['POST'])
def download_file_merge():
client_ip = request.remote_addr
# 读取配置文件,将配置文件中的所有文件(文件名)复制压缩成一个压缩包,存入D:\用例存储\zip下,如果存在同名压缩包就替换成最新的
lis = CONFIG.read_csv(filename=r'D:\用例存储\conf\{}.csv'.format(client_ip))
# 根据配置文件内容合并多个excel文件
file_path = ExcelTools.mergeexcel(file_list=lis,
output_file='D:\用例存储\合并文件夹\合并用例_{}.xlsx'.format(client_ip))
if os.path.isfile(file_path):
return send_file(file_path, as_attachment=True, download_name="合并用例_{}.xlsx".format(client_ip))
else:
return "File not found", 404
# 模板文件下载部分
@app.route('/downloadBatch', methods=['GET'])
def download_file_batch():
file_path = r"D:\用例存储\相关文档\{}".format('批量生成模板文件.xlsx')
if os.path.isfile(file_path):
return send_file(file_path, as_attachment=True, download_name="{}.xlsx".format("批量生成模板文件"))
else:
return "File not found", 404
# 说明书查看内容部分
@app.route('/explanation_document', methods=['GET'])
def explanation_document():
pass
return render_template('ExplanationDocument.html')
# 上传文件批量生成部分
@app.route('/upload', methods=['POST'])
def upload_file():
batchcreateflag = Login.get_batch_create_flag()
# 获取客户端的IP地址,用于唯一标识客户端上传的文件
# 获取请求的IP,根据这个新建一个CSV文件,存储生成的文件名字-唯一
client_ip = request.remote_addr
try:
"""
处理文件上传请求。
该函数接收客户端上传的文件,并将其保存到指定目录。同时,根据上传文件的内容,
批量生成相应的用例文件。
返回值:
- 如果请求中没有文件部分,返回一个包含错误信息的JSON对象,并设置HTTP状态码为400。
- 如果上传的文件没有文件名,返回一个包含错误信息的JSON对象,并设置HTTP状态码为400。
- 如果文件上传成功,返回一个包含成功信息和生成的用例文件数量的JSON对象。
"""
# 校验这个请求是否被已经被一个ip持有--获取redis键值对
if batchcreateflag is None:
# 设置键值对
Login.set_batch_create_flag(ip=str(client_ip))
# 指定存储配置文件的目录
# 指定目录新建一个CSV文件,自定义名字作为config文件读取
directory = r'D:\用例存储\conf'
# 根据客户端IP创建一个CSV文件,用于存储配置信息
CONFIG.mkdir_csv(directory=directory, filename=str(client_ip))
# 检查请求中是否包含文件部分
# 其他逻辑
if 'file' not in request.files:
return jsonify({'error': 'No file part in the request'}), 400
# 获取上传的文件对象
file = request.files['file']
# 检查上传的文件是否没有文件名
if file.filename == '':
return jsonify({'error': '未选择文件'}), 400
# 如果文件对象存在,表示文件上传成功
if file:
# 获取上传文件的原始文件名
filename = file.filename
# 保存上传的文件到指定目录
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
# 批量创建用例文件
# 读取上传的文件并按要求批量生成用例
file_path = r"D:\用例存储\uploads" + os.sep + filename
# 校验传入的表中表格命名列是否有重复名称情况
duplicate_name = ExcelTools.check_excel(excel_file_path=file_path, column_to_check="表格命名")
if "中存在重复的值" in duplicate_name:
print(duplicate_name)
return jsonify({'error': f'{duplicate_name},表格命名该列列名不允许重复。'}), 400
else:
print(filename, '上传成功')
# 调用批量创建用例的函数,传入文件路径和配置文件路径
numbers = BatchCreate.batch_create(file_path=file_path, sheet_name="模板",
csvfile=directory + os.sep + str(client_ip), ip=str(client_ip))
# 构造返回的JSON对象,包含成功信息和生成的用例文件数量
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return jsonify(
{'message': f'{now}@上传成功并成功生成了 {numbers} 个用例文件,请点击下载文件/批量下载进行下载'})
else:
return jsonify({'error': '当前存在其他用户批量生成用例,请稍后再试!!!'}), 429
except Exception as e:
print(e)
log.error(e)
finally:
batchcreateflag = Login.get_batch_create_flag() # 在并发情况下,这个变量可能引用为0被自动回收掉,所以需要重新获取,
# client_ip被函数方法持有,不会被重新回收,所以无需重新获取
if batchcreateflag is not None:
if batchcreateflag.get("RemoteIP") == str(client_ip): # 持有者才能主动删除
Login.delete_batch_create_flag()
log.info(f"{client_ip} 已完成功能用例批量生成,删除redis中的键值对")
"""
if 'file' not in request.files:
return 'No file part in the request', 400
file = request.files['file']
# 如果用户没有选择文件,浏览器也会提交一个空文件部分没有文件名
if file.filename == '':
return 'No selected file', 400
if file:
filename = secure_filename(file.filename) # 可以使用secure_filename来避免文件名冲突
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return 'File uploaded successfully', 200
"""
"""接口测试用例api部分"""
# 接口用例生成初始化界面
@app.route('/api')
def api_home():
# 初始时没有结果,所以返回 None
return render_template('api.html', result=None, case_count=case_num_api())
# 接口用例生成按钮
@app.route('/api/submit', methods=['GET', 'POST'])
def submit_form_api():
client_ip = request.remote_addr
if request.method == 'POST':
api_name = request.form.get('input6')
uri = request.form.get('input2')
req_header = request.form.get('input3')
req_body = request.form.get('input4')
response_body = request.form.get('input5')
responsible_person = Login.get_redis_respon(request.remote_addr)
if responsible_person is None:
responsible_person = request.form.get('input7')
# 可选框内容
req_method = request.form.get('request_method')
response_code = request.form.get('request_code')
# 调用 A_api 方法并获取结果
result = A_api(api_name=api_name, req_method=req_method, uri=uri, req_header=req_header,
response_code=response_code, response_body=response_body,
responsible_person=responsible_person, ip=client_ip, req_body=req_body)
# 提取文件名--D:\用例存储\登录.xlsx
try:
down_name = \
RegularExpression.regular_expression(text=result, patternexpression=r"D:\\用例存储\\(.*?).xlsx")[0]
except Exception as e:
print(e)
down_name = ''
return render_template('api.html', result=result.replace('\n', '<br>'), case_count=case_num_api(),
file_name=down_name)
return render_template('api.html', result=None, case_count=case_num_api())
# def submit_form_api():
# client_ip = request.remote_addr
# if request.method == 'POST':
# api_name = request.form.get('input6')
# uri = request.form.get('input2')
# req_header = request.form.get('input3')
# req_body = request.form.get('input4')
# response_body = request.form.get('input5')
# responsible_person = request.form.get('input7')
# # 可选框内容
# req_method = request.form.get('request_method')
# response_code = request.form.get('response_code')
# print(response_code)
# time.sleep(5)
# return render_template('api.html', result="22222", case_count=None,
# file_name="111")
# return render_template('api.html', result=None, case_count=None)
# 下载文件复用上面的即可
# 批量生成下载--模板要重新定义,上传生成要重新定义,合并和批量下载可复用
# 批量模板下载
# 接口批量模板生成用例
@app.route('/api/downloadBatch', methods=['GET'])
def download_file_batch_api():
file_path = r"D:\用例存储\相关文档\{}".format('批量生成模板文件_接口.xlsx')
if os.path.isfile(file_path):
return send_file(file_path, as_attachment=True, download_name="{}.xlsx".format("批量生成模板文件_接口"))
else:
return "File not found", 404
# 查看接口说明文档
@app.route('/explanation_document_api', methods=['GET'])
def explanation_document_api():
pass
return render_template('ExplanationDocument_api.html')
# 上传文件批量生成部分--接口
@app.route('/api/upload', methods=['POST'])
def upload_file_api():
batchcreateflag = Login.get_batch_create_flag()
# 获取客户端的IP地址,用于唯一标识客户端上传的文件
# 获取请求的IP,根据这个新建一个CSV文件,存储生成的文件名字-唯一
client_ip = request.remote_addr
try:
if batchcreateflag is None:
# 设置键值对
Login.set_batch_create_flag(ip=str(client_ip))
# 指定存储配置文件的目录
# 指定目录新建一个CSV文件,自定义名字作为config文件读取
directory = r'D:\用例存储\conf'
# 根据客户端IP创建一个CSV文件,用于存储配置信息
CONFIG.mkdir_csv(directory=directory, filename=str(client_ip))
# 检查请求中是否包含文件部分
# 其他逻辑
if 'file' not in request.files:
return jsonify({'error': 'No file part in the request'}), 400
# 获取上传的文件对象
file = request.files['file']
# 检查上传的文件是否没有文件名
if file.filename == '':
return jsonify({'error': '未选择文件'}), 400
# 如果文件对象存在,表示文件上传成功
if file:
# 获取上传文件的原始文件名
filename = file.filename
# 保存上传的文件到指定目录
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
# 批量创建用例文件
# 读取上传的文件并按要求批量生成用例
file_path = r"D:\用例存储\uploads" + os.sep + filename
print(filename, '上传成功')
# 调用批量创建用例的函数,传入文件路径和配置文件路径
numbers = BatchCreate.batch_create_api(file_path=file_path, sheet_name="模板",
csvfile=directory + os.sep + str(client_ip), ip=str(client_ip))
# 构造返回的JSON对象,包含成功信息和生成的用例文件数量
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return jsonify(
{'message': f'{now}@上传成功并成功生成了 {numbers} 个用例文件,请点击下载文件/批量下载进行下载'})
else:
return jsonify({'error': '当前存在其他用户批量生成用例,请稍后再试!!!'}), 429
except Exception as e:
print(e)
log.error(e)
finally:
batchcreateflag = Login.get_batch_create_flag()
if batchcreateflag is not None:
if batchcreateflag.get("RemoteIP") == str(client_ip):
Login.delete_batch_create_flag()
log.info(f"{client_ip} 已完成接口用例批量生成,删除redis中的键值对")
"""数据统计部分"""
@app.route('/data/num')
def index():
return render_template('Linechart.html')
@app.route('/data')
def data():
dates, test_cases, failed_cases = DataAnalysis.generate_data()
return jsonify({
'dates': dates,
'test_cases': test_cases,
'failed_cases': failed_cases
})
@app.route('/favicon.ico')
def index_2():
return jsonify({'error': 'Not Found'})
@app.route('/static/js/chart.umd.js.map')
def index_3():
return jsonify({'error': 'Not Found'})
# 柱状图--折线图示例-暂时没用到
@app.route('/data/num/app')
def index_1():
return render_template('Chart.html')
# 悬浮消息后端-轮询展示
@app.route('/messages')
def get_messages():
messages = [
"小技巧:可以在需求描述中增加至少生成x条用例来指定生成的数量。",
"重要更新1:优化用例提问逻辑为Prompt模板,让生成用例更准确。",
"重要更新2:修改表格内容为空的BUG,增加条数检测及自动重新生成。",
"更新 3: 点击右侧GIF图片后支持下载周报/月报。",
"更新 4: 点击重新编辑,可重新编辑上一次填入的需求。",
"bug修复 1: 修复上一步返回界面/重新编辑后出现GIF图片的BUG。"
]
return jsonify(messages)
if __name__ == '__main__':
"""
pyinstaller --onefile --hidden-import=itsdangerous --hidden-import=click --hidden-import=Werkzeug --add-data="templates/;templates/" --add-data="static/;static/" app.py
打包时请保持debug为False状态
"""
IP = GETIP.get_ip()
app.run(host=IP, port=8848, debug=True)
"""tornado启动"""
# # 将tornadoFlask应用转换为WSGI应用
# wsgi_app = WSGIContainer(app)
#
# # 创建一个Tornado HTTP服务器,并使用WSGI应用作为处理器
# http_server = HTTPServer(wsgi_app)
#
# # 在端口8888上监听HTTP请求
# http_server.listen(8848)
#
# # 启动IOLoop
# IOLoop.instance().start()
"""from waitress import serve 托管启动"""
# serve(app, host=IP, port=5000, threads=6, expose_tracebacks=True)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/dingqiao1638105266/flask_-case-create.git
[email protected]:dingqiao1638105266/flask_-case-create.git
dingqiao1638105266
flask_-case-create
Flask_CaseCreate
master

搜索帮助