代码拉取完成,页面将自动刷新
同步操作将从 Daz/auto-Api 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
try:
    from collections.abc import Iterable
except ImportError:  # very old interpreters only; this alias is gone in Python 3.10+
    from collections import Iterable
import json

from common.constant import QuestionType
from common.compute_unit import *
from common.documents import Question, APIModel
# Conditions handed to verify_success(); the mapping is currently empty.
# Disabled example entry kept from the original source:
#     ('retcode',): lambda x: int(x) >= 0
success_condition = {}
success_c = verify_success(success_condition)

# Registry populated by the solvers_register(...) blocks below and read by
# solve_one_problem(), which looks solvers up by question type.
solvers = {}
# Solver for QuestionType.IS_LOGIN.
# The pipeline steps (my_filter, update, summary, ...) and the helpers passed
# to them come from common.compute_unit; their semantics are defined there.
with solvers_register(QuestionType.IS_LOGIN, solvers) as s:
    s.cal_flow = (
        Source(url='url')
        .my_filter(success_c)
        .update(remove_headers)
        .update(resend)
        .summary(my_all(my_not(success_c)))
    )
    # The flow result is stored as its JSON serialisation; the model is unused.
    s.res_builder = lambda md, res: json.dumps(res)
# Solver for QuestionType.ABOUT_SQL.
with solvers_register(QuestionType.ABOUT_SQL, solvers) as s:
    s.cal_flow = (
        Source(url='url')
        .my_filter(success_c)
        .update(resend)
        .listen(remember_sql)
    )
    # The flow result itself is ignored; the answer is read from the model's
    # aboutSql attribute (presumably filled in by remember_sql -- TODO confirm).
    s.res_builder = lambda md, res: json.dumps(md.aboutSql)
# Solver for QuestionType.PARAM_IS_NECESSARY.
with solvers_register(QuestionType.PARAM_IS_NECESSARY, solvers) as s:
    # solve_one_problem() skips this solver unless model.only_selected() is truthy.
    s.condition = lambda model: model.only_selected()
    s.cal_flow = (
        Source(url='url')
        .my_filter(success_c)
        .expand(remove_one_required)
        .my_filter(no_checked_param)
        .update(resend)
        .my_filter(success_c)
        .expand(update_required)
    )
    # The flow result is stored as its JSON serialisation; the model is unused.
    s.res_builder = lambda md, res: json.dumps(res)
# Solver for QuestionType.PARAM_HAS_GROUP.
with solvers_register(QuestionType.PARAM_HAS_GROUP, solvers) as s:
    # solve_one_problem() skips this solver unless model.only_selected() is truthy.
    s.condition = lambda model: model.only_selected()
    s.cal_flow = (
        Source(url='url')
        .my_filter(success_c)
        .model_update(remove_all_group)
        .update(remove_not_required)
        .update(resend)
        .summary(my_all(my_not(success_c)))
        # A second Source-based flow is supplied as the ``success`` branch.
        .IF(success=Source(url='url').expand(one_not_required))
        .update(resend)
        .summary(summary_group(success_c))
    )
    # The result is turned into a list before being JSON-encoded.
    s.res_builder = lambda md, res: json.dumps(list(res))
# Solver for QuestionType.PARAM_TYPE.
with solvers_register(QuestionType.PARAM_TYPE, solvers) as s:
    # solve_one_problem() skips this solver unless model.only_selected() is truthy.
    s.condition = lambda model: model.only_selected()
    s.cal_flow = (
        Source(url='url')
        .my_filter(success_c)
        .gate(get_all_routes_and_values)
        .update(summarize_param_type)
    )
    # The result is turned into a list before being JSON-encoded.
    s.res_builder = lambda md, res: json.dumps(list(res))
# Order in which solve_problem() walks the question types.
sol_order = [
    QuestionType.PARAM_TYPE,
    QuestionType.ABOUT_SQL,
    QuestionType.IS_LOGIN,
    QuestionType.PARAM_IS_NECESSARY,
    QuestionType.PARAM_HAS_GROUP,
]
def solve_one_problem(question):
    """Solve one question with the first registered solver that accepts its model.

    The APIModel whose ``url`` equals ``question.about`` is looked up. Nothing
    happens when no such model exists, when ``model.is_ignored()`` is true, or
    when ``solvers`` has no entry for ``question.que_type``. Otherwise the
    solvers stored under that type are tried in order; the first one whose
    ``condition(model)`` is truthy runs its ``cal_flow``, and the value built
    by its ``res_builder`` is written to ``question.result``. The question is
    then flagged as solved and saved, and no further solver is tried.

    :param question: question document; ``about`` and ``que_type`` are read,
        ``result`` and ``solved`` are written, and ``save()`` is called.
    :return: None
    """
    # Local import keeps this check working on Python 3.10+, where the old
    # ``collections.Iterable`` alias no longer exists.
    from collections.abc import Iterable

    model = APIModel.objects(url=question.about).first()
    if not model or model.is_ignored():
        return
    if question.que_type not in solvers:
        return

    for solv in solvers[question.que_type]:
        if not solv.condition(model):
            continue
        res = solv.cal_flow(model)
        # Materialise iterable results so res_builder receives a concrete list.
        # NOTE(review): str/bytes/dict are Iterable too and would become lists
        # of characters/keys here -- confirm cal_flow never returns those.
        if isinstance(res, Iterable):
            res = list(res)
        question.result = solv.res_builder(model, res)
        question.solved = True
        question.save()
        # Only the first applicable solver is used.
        break
def solve_problem(url=None):
    """Automatically solve the outstanding questions.

    Question types are visited in ``sol_order``. A type is skipped unless a
    QuestionState document exists for it and its ``auto_solve_available`` is
    truthy. Every unsolved question of an enabled type is passed to
    ``solve_one_problem``.

    :param url: when given, only questions whose ``about`` equals this value
        are processed; when ``None``, all unsolved questions are.
    :return: None
    """
    from common.documents import QuestionState

    for que_type in sol_order:
        state = QuestionState.objects(que_type=que_type).first()
        if not state or not state.auto_solve_available:
            continue

        # Build the query once; the ``about`` filter is only added for a given url.
        filters = {'que_type': que_type, 'solved': False}
        if url is not None:
            filters['about'] = url

        for pending in Question.objects(**filters):
            solve_one_problem(pending)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。