代码拉取完成,页面将自动刷新
同步操作将从 mldong/mldong-python 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import glob
import importlib
import json
import os
from flask import Blueprint, Flask, jsonify, request
from flask_migrate import Migrate
from config import getConfig
from controllers import R
from enums.load_enum import LoadEnum
from mauth import HasPerm
from mauth.login_user_holder import LoginUserHolder
from mauth.mtoken import TokenStrategyFactory, redis_client
from mexception import AssertTool, ExceptionConfig, GlobalErrorEnum
from mlogging import LoggingConfig
from mlogging.request_log import RequestLog
from models import db
from models.dict import Dict
from models.user import User
from packages.engine import FlowEngineConfig
from services import LowCodeServiceUtil
from datetime import datetime
from werkzeug.middleware.proxy_fix import ProxyFix
from services.dict_service import DictService
from services.user_service import UserService
from tools import ant_matcher, generate_id
from validators import BasePageForm
app = Flask(__name__)
# 应用 ProxyFix 中间件
# x_for=1 表示信任代理服务器的 X-Forwarded-For 头中的第一个 IP 地址
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)
# 获取controllers目录下所有的.py文件路径
controllers_dir = "controllers"
controller_files = glob.glob(os.path.join(controllers_dir, "*.py"))
# 过滤掉__init__.py文件
controller_files = [f for f in controller_files if "__init__" not in f]
# 动态导入每个控制器模块并注册蓝图
for file_path in controller_files:
module_name = os.path.splitext(os.path.basename(file_path))[0] # 去掉'.py'后缀
blueprint_name = module_name.replace("_controller", "")
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# 检查模块是否有对应的blueprint属性,如果有,则注册它
if hasattr(module, blueprint_name) and isinstance(
getattr(module, blueprint_name), Blueprint
):
app.register_blueprint(getattr(module, blueprint_name))
# 从配置对象中加载
app.config.from_object(getConfig())
# 初始化db
db.init_app(app)
# 初始化redis
redis_client.init_app(app)
app.before_request(LoginUserHolder.before_request)
# 自定义日期转换器,格式化日期
class CustomJSONEncoder(json.JSONEncoder):
def default(self, obj):
try:
if isinstance(obj, datetime):
return obj.strftime("%Y-%m-%d %H:%M:%S") # 自定义日期时间格式
iterable = iter(obj)
except TypeError:
pass
else:
return list(iterable)
return json.JSONEncoder.default(self, obj)
app.json_encoder = CustomJSONEncoder
# 配置字典
LoadEnum(app)
# 配置日志
LoggingConfig(app)
# 配置请求日志
RequestLog(app)
# 配置异常处理
ExceptionConfig(app)
# 配置工作流引擎----start
def check_permission(access, mode=None):
if not TokenStrategyFactory.check_token(access, mode=mode):
AssertTool.raise_biz(GlobalErrorEnum.GL99990403)
dictService = DictService(model=Dict)
userService = UserService(model=User)
def dict_api(dictType):
return dictService.get_by_dict_type(dictType)
def user_api(userId):
if isinstance(userId, dict):
return userService.page(BasePageForm(data=userId))
return userService.get_wf_user_by_id(userId=userId)
FlowEngineConfig(
app,
db=db,
func_generate_id=generate_id,
func_get_current_user_id=LoginUserHolder.get_user_id,
func_check_permission=check_permission,
func_dict_api=dict_api,
func_user_api=user_api
)
# 配置工作流引擎----end
# 执行数据库迁移相关操作。flask db指令不能直接使用,需要获取Migrate实例,这里注册Migrate实例
migrate = Migrate(app, db)
# 配置代码生成器-仅开发环境配置
if app.config.get("ENV") == "dev":
from generator import CodeGenerator
CodeGenerator(app)
# 低代码-通用下拉接口
@app.route("/<moduleName>/<tableName>/select", methods=["POST"])
@HasPerm(name="通过下拉")
def lowcode_select(moduleName, tableName):
return R.data(LowCodeServiceUtil.select(moduleName=moduleName, tableName=tableName))
# 拦截器-demo环境拦截
if app.config.get("ENV") == "demo":
@app.before_request
def check_blacklist():
path = request.path
BLACK_LIST = app.config.get("BLACK_LIST")
if any(ant_matcher(path, blacklisted_path) for blacklisted_path in BLACK_LIST):
response = jsonify(
{
"code": 99990406,
"msg": "您没有资源访问权限,请联系管理员!",
"data": None,
}
)
return response
if __name__ == "__main__":
app.run(
host="0.0.0.0",
port=app.config.get("PORT", 5000),
debug=app.config.get("DEBUG", True),
)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。