代码拉取完成,页面将自动刷新
同步操作将从 Daz/auto-Api 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
"""
web 控制台
"""
import json
from flask import Flask, render_template, request
from flask import current_app, url_for, session, redirect
from flask_oauthlib.client import OAuth
from build_model import build
from common.config import OAUTH2_key, OAUTH2_secret, OAUTH2_URL, SECRET_KEY, WEB_CONSOLE_ADDRESS, WEB_CONSOLE_PORT
from common.decorate import need_login
from common.documents import APIModel, IgnoreApi, Question
from common.tools import get_model_struct
from solvers import solve_problem
app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY
oauth = OAuth(current_app)
gitlab = oauth.remote_app('gitlab',
consumer_key=OAUTH2_key,
consumer_secret=OAUTH2_secret,
base_url=OAUTH2_URL,
request_token_url=None,
request_token_params={'scope': 'read_user'},
access_token_url='/oauth/token',
authorize_url='/oauth/authorize', )
@app.route('/')
@need_login
def index():
urls = get_model_struct()
from common.documents import QuestionState
from common.documents import AskProxySetting
from common.documents import Ask
state = AskProxySetting.get_state()
return render_template('blank.html', urls=urls, que_state=QuestionState.objects(), state=state,
askCount=Ask.objects().count())
@app.route("/clean_model")
@need_login
def clean_model():
APIModel.drop_collection()
from common.documents import RawValue
RawValue.drop_collection()
return redirect(url_for("index"))
@app.route("/build_model")
@need_login
def build_model():
from common.state import build_state
# 开始构建模型
with build_state():
build()
return "true"
@app.route('/auto_solved_one', methods=['POST'])
@need_login
def auto_solved_one():
url = request.form['model_url']
solve_problem(url)
return "true"
@app.route("/auto_solve")
@need_login
def auto_solve():
solve_problem()
return "true"
@app.route("/update_by_answers")
@need_login
def update_doc():
for ans in Question.objects(solved=True):
ans.update_model()
return redirect(url_for("index"))
@app.route('/doc', methods=['POST'])
@need_login
def doc():
url = request.form['model_url']
model = APIModel.objects(url=url).first()
return render_template('DocModel.html', model=model)
@app.route('/set_ignore', methods=['POST'])
@need_login
def set_ignore():
url = request.form['model_url']
model = APIModel.objects(url=url).first()
if model:
ignore = IgnoreApi.objects(url=url).first()
if ignore:
ignore.delete()
else:
IgnoreApi(url=url).save()
return "True"
@app.route('/doc/<select>')
@need_login
def get_model_args(select):
url = request.args['model_url']
model = APIModel.objects(url=url).first()
if select == 'args':
return json.dumps(model.args.get_json(), ensure_ascii=False)
else:
return json.dumps(model.result.get_json(), ensure_ascii=False)
@app.route('/switch_proxy_state/<goal_state>')
@need_login
def switch_proxy_state(goal_state):
from common.documents import AskProxySetting
state = AskProxySetting.get_state()
state[goal_state] = not state[goal_state]
state.save()
return redirect(url_for('index'))
@app.route('/switch_que_state/<que_type>')
@need_login
def switch_que_state(que_type):
from common.documents import QuestionState
que_state = QuestionState.objects(que_type=que_type).first()
if que_state.auto_solve_available:
que_state.auto_solve_available = False
elif que_state.can_auto_solve():
que_state.auto_solve_available = True
que_state.save()
return redirect(url_for("index"))
@app.route("/remove_ans", methods=['POST'])
@need_login
def remove_ans():
que = request.form['que_key']
question = Question.objects(key=que).first()
if question.solved:
question.solved = False
question.result = ""
question.save()
return question.about
@app.route('/set_most_api_count', methods=['POST'])
@need_login
def set_most_api_count():
from common.documents import Ask
for ask in Ask.objects():
key = ask.make_key()
ask.delete()
ask.hash_code = key
ask.is_Available = False
ask.save()
if request.form['count']:
count = int(request.form['count'])
for model in APIModel.objects():
for ask in Ask.objects(url=model.url).order_by('-date')[:count]:
ask.is_Available = True
ask.save()
for ask in Ask.objects(is_Available=False):
ask.delete()
return "True"
@app.route("/save_answer", methods=['POST'])
@need_login
def save_answer():
result = request.form['result']
que_key = request.form['que_key']
que = Question.objects(key=que_key).first()
que.result = result
que.solved = True
que.save()
return que.about
@app.route('/solve_one', methods=['POST'])
@need_login
def solve_one():
que_key = request.form['que_key']
que = Question.objects(key=que_key).first()
if que:
from solvers import solve_one_problem
solve_one_problem(que)
return que.about
@app.route("/search_results", methods=['POST'])
@need_login
def search_results():
from itertools import chain
text = request.form['q']
return render_template("search_results.html", models=set(
chain(APIModel.objects(url__icontains=text), APIModel.objects(comment__icontains=text))), query_key=text)
@app.route('/login')
def login():
return gitlab.authorize(callback=url_for('authorized', _external=True))
@app.route('/authorized')
def authorized():
resp = gitlab.authorized_response()
if resp is None:
return "授权失败"
session['gitlab_token'] = resp.get('access_token'), ''
return redirect(url_for('index'))
@app.route('/logout')
def logout():
if 'gitlab_token' in session:
session.pop('gitlab_token')
return "已退出,请同时退出gitlab登陆或取消授权<br>点击链接前往<a href='{}/profile/applications'>gitlab</a>".format(OAUTH2_URL)
@gitlab.tokengetter
def get_gitlab_token(token=None):
return session.get('gitlab_token')
if __name__ == '__main__':
app.run(debug=True, port=WEB_CONSOLE_PORT, host=WEB_CONSOLE_ADDRESS)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。