4 Star 18 Fork 184

Fengguang/lkp-tests

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
submit 9.29 KB
一键复制 编辑 原始数据 按行查看 历史
Fengguang 提交于 10个月前 . job: simplify add_monitors
#! /usr/bin/env ruby
# frozen_string_literal: true
LKP_SRC = ENV['LKP_SRC'] || File.dirname(File.dirname(File.realpath($PROGRAM_NAME)))
require "#{LKP_SRC}/lib/job2sh"
require 'optparse'
require 'yaml'
# Containers filled in while the command line is parsed.
opt_set_key_value = {}
opt_cmdline_defaults = {}
opt_monitor_query = {}
opt_include_yamls = []
result_roots = []
actions = ['output', 'stop']

# Switches and counters with their defaults; the option handlers below flip them.
opt_auto_define_files = false
opt_monitor = false
opt_web_monitor = false
opt_my_queue = false
manual_install_cmdline = false
any_job = false
do_pack = true
nr_run = 1

# Invoked under the name "run": jobs are written below RESULT_ROOT_DIR_PREFIX
# and the packing / scheduler client libraries are not loaded.
# Invoked under any other name: no default output directory, and those
# libraries are required.
opt_run = File.basename($PROGRAM_NAME) == 'run'
opt_output_dir = opt_run ? RESULT_ROOT_DIR_PREFIX + '/lkp/tmp' : nil
unless opt_run
  %w[local_pack upload_field_pack scheduler_client].each do |lib|
    require "#{LKP_SRC}/lib/#{lib}"
  end
end
# Split a "KEY: VALUE" command-line argument at its first colon and strip the
# whitespace around both halves.
#
# Returns [key, value]; value is nil when the text contains no colon, and the
# result is empty for an empty string. Only surrounding whitespace is removed,
# so spaces inside the key or the value are preserved.
def parse_key_value_option(text)
  text.split(':', 2).map(&:strip)
end

# Command-line option definitions. Each handler records its result in one of
# the option variables declared above this block.
options = OptionParser.new do |opts|
  opts.banner = 'Usage: submit [options] job1.yaml job2.yaml ...'
  opts.separator ' submit test jobs to the scheduler'
  opts.separator ''
  opts.separator 'options:'

  opts.on("-s 'KEY: VALUE'", "--set 'KEY: VALUE'", 'add key-value to the hash of the submitted jobs') do |key_value|
    k, v = parse_key_value_option(key_value)
    opt_set_key_value[k] = v
  end

  opts.on('-o DIR', '--output DIR', 'save job yaml to DIR/, jobs are not submitted to the scheduler') do |dir|
    if File.file? dir
      puts "Please input directory for job save yaml after '-o'"
      exit 1
    end
    opt_output_dir = dir
    Dir.mkdir(dir) unless File.directory? dir
  end

  opts.on('-a', '--auto-define-files', 'auto add define_files') do
    opt_auto_define_files = true
  end

  opts.on('--no-pack', "don't do pack, just use the last one") do
    do_pack = false
  end

  opts.on('-i include.yaml', '--include include.yaml', 'include other yamls') do |yaml|
    opt_include_yamls << yaml
  end

  opts.on('-c', '--connect', 'automatic ssh connection to the testbox') do
    actions << 'connect'
    opt_include_yamls << 'ssh.yaml'
  end

  opts.on('-r', '--result', 'mirror job result to local directory') do
    actions << 'mirror_result'
  end

  opts.on('-n job_number', '--number job_number', 'set the number of repeated submissions for each job, default 1') do |number|
    nr_run = number.to_i
    nr_run = 1 if nr_run < 1
  end

  opts.on('-m', '--monitor', 'capture and display job execution logs in real time') do
    opt_monitor = true
    # -m takes no declared argument; instead peek at the next unparsed word
    # and consume it only when it looks like a non-empty "key: value" query.
    k, v = parse_key_value_option(ARGV[0]) if ARGV[0]
    if (k && !k.empty?) && (v && !v.empty?)
      opt_monitor_query[k] = v
      ARGV.shift
    end
  end

  opts.on('-w', '--web-monitor', 'get the query of monitor and disable origin monitor') do
    opt_web_monitor = true
  end

  opts.on('--my-queue', "submit jobs to the user's own queue") do
    opt_my_queue = true
  end

  opts.on('--cmdline', 'use manual install cmdline') do
    manual_install_cmdline = true
  end

  opts.on('--any', "submit any one auto job from job") do
    any_job = true
  end
end
# Keep an untouched copy of the command line: prioritize_include_yaml compares
# argument positions against it after ARGV has been modified by parsing.
ARGV_CLONE = ARGV.clone
options.parse!(ARGV)

# Pull "key=value" words out of ARGV. Those seen before the first *.yaml
# argument become command-line defaults; those seen after it become overrides.
seen_job_yaml = false
ARGV.delete_if do |arg|
  unless arg.include?('=')
    seen_job_yaml = true if arg.match?(/\.yaml$/)
    next false
  end

  key, value = arg.split('=', 2)
  target = seen_job_yaml ? opt_set_key_value : opt_cmdline_defaults
  target[key] = value
  true
end

# Nothing left to submit: show the usage text and stop.
if ARGV.empty?
  puts(options)
  exit
end

job_ids = []
job_hash_list = []
# Divide the include yamls by where they appear on the original command line
# (ARGV_CLONE) relative to +jobfile+.
#
# Returns [default_yamls, override_yamls]: yamls positioned before the job file,
# or not present on the command line at all (e.g. the auto included ssh.yaml),
# are defaults; the remaining ones are overrides.
def prioritize_include_yaml(include_yamls, jobfile)
  jobfile_pos = ARGV_CLONE.index(jobfile)
  include_yamls.partition do |yaml|
    yaml_pos = ARGV_CLONE.index(yaml)
    yaml_pos.nil? || yaml_pos < jobfile_pos
  end
end
# Resolve every entry of +jobfile_list+ through Job.find_jobfile and return the
# results as a new array, in the same order.
def find_jobfiles(jobfile_list)
  jobfile_list.map { |jobfile| Job.find_jobfile(jobfile) }
end
# Submit one job to the scheduler and process the reply.
#
# job_json          - JSON text handed to scheduler_client.submit_job
# scheduler_client  - object responding to submit_job(json) with a JSON array of reply messages
# jobfile           - job file name, used only in the printed messages
# job               - the job; read only to print account fields after an account verification failure
# opt_monitor_query - hash; 'job_id' is merged into it (in place) when monitoring
# job_hash_list     - list of job hashes; 'define_files' is deleted from the first one when monitoring
# actions           - monitor actions, joined with ',' for the monitor's -a option
# job_ids           - array; the id of every accepted job is appended as a string
# result_roots      - array; the result_root of every accepted job is appended
# opt_monitor       - whether to start monitoring after submission; treated as
#                     false for this call as soon as any reply reports a
#                     failure or asks for a file upload
# opt_web_monitor   - when monitoring, print the monitor query instead of starting sbin/monitor
#
# Returns the upload fields collected from RETRY_UPLOAD replies (empty when the
# scheduler requested none). Does not return when sbin/monitor is started,
# because exec replaces the current process.
def begin_submit(job_json, scheduler_client, jobfile,
                 job, opt_monitor_query, job_hash_list, actions, job_ids, result_roots, opt_monitor, opt_web_monitor)
  upload_fields = []
  messages = scheduler_client.submit_job(job_json)

  JSON.parse(messages).each do |msg|
    # The scheduler wants files uploaded first: remember which ones so the
    # caller can pack them and resubmit.
    if msg['errcode']&.include?('RETRY_UPLOAD')
      opt_monitor = false
      puts "submit #{jobfile} need upload files, retrying...."
      upload_fields += JSON.parse(msg['message'])
      next
    end

    if msg['message'].empty?
      result_roots << msg['result_root']
      job_ids << msg['job_id'].to_s
      puts("submit #{jobfile}, got job id=#{msg['job_id']}\nresult_root #{msg['result_root']}")
    else
      opt_monitor = false
      puts("submit #{jobfile} failed, got job id=#{msg['job_id']}, error: #{msg['message']}")
      puts("#{msg['error_message']}") if msg.include? 'error_message'
      if msg['message'].include?('Failed to verify the account.')
        # NOTE(review): this echoes my_token to stdout -- confirm that is intended.
        puts 'The submitted account information is as follows:'
        puts "my_name: #{job['my_name']}"
        puts "my_email: #{job['my_email']}"
        puts "my_account: #{job['my_account']}"
        puts "my_token: #{job['my_token']}"
        puts "lab: #{job['lab']}"
      end
    end
  end

  return upload_fields unless opt_monitor

  job_hash_list[0].delete('define_files')
  opt_monitor_query.merge!({'job_id' => job_ids})
  if opt_web_monitor
    puts("for jobs #{job_ids}, the monitor query=#{opt_monitor_query}")
  else
    # Pass the arguments as a list so no shell is involved: the JSON payloads
    # may contain quotes or other shell metacharacters.
    exec("#{LKP_SRC}/sbin/monitor",
         '-f', opt_monitor_query.to_json,
         '-j', job_hash_list[0].to_json,
         '-a', actions.join(','),
         '-r', result_roots.join(','),
         '-s', 'job_state: extract_finished')
  end

  upload_fields
end
# prepare job shell function in ruby -- scheduler API runs crystal,
# and does not have latest lkp-tests code, so client side is more
# convenient to do some tasks
#
# Returns { 'job2sh' => { 'run_job' => ..., 'extract_stats' => ...,
# 'define_files' => ... } } built from the job's sh_* methods. An 'on_fail'
# entry is added only when job.sh_on_state('on_fail') returns a truthy value.
def prepare_sh_hash(job)
  job2sh = {
    'run_job' => job.sh_run_job,
    'extract_stats' => job.sh_extract_stats,
    'define_files' => job.sh_define_files
  }

  sh_on_fail = job.sh_on_state('on_fail')
  job2sh['on_fail'] = sh_on_fail if sh_on_fail

  { 'job2sh' => job2sh }
end
# Build the hash that is submitted for +job+: the job's own hash merged with
# the shell snippets from prepare_sh_hash.
#
# A truthy 'parse' entry inside the 'pp' section is reset to nil
# (XXX: removed for now, for ES compatibility). A job hash without a 'pp'
# section is returned unchanged instead of raising.
def prepare_job_hash(job)
  job_hash = job.to_hash.merge(prepare_sh_hash(job))

  pp_section = job_hash['pp']
  pp_section['parse'] = nil if pp_section && pp_section['parse']

  job_hash
end
# A timestamp makes a more meaningful id than a uuid, and is cheaper as an
# int64 index in the db:
# [8] pry(main)> 1<<63
# => 9223372036854775808
# [9] pry(main)> Time.now.strftime("%y%m%d%H%M%S%6N")
# => "240429090711811190"
submit_id = Time.now.strftime('%y%m%d%H%M%S%6N')
puts "submit_id=#{submit_id}" unless opt_run

# Expand every job file named on the command line into its unit jobs, then
# either save each one under opt_output_dir (running it locally in run mode)
# or submit it to the scheduler nr_run times.
ARGV.each do |jobfile|
  default_yamls, override_yamls = prioritize_include_yaml(opt_include_yamls, jobfile)
  jobfile = Job.find_jobfile(jobfile)

  jobs = Job2sh.new
  jobs.cmdline_defaults = opt_cmdline_defaults
  jobs.overrides = opt_set_key_value
  jobs.default_yamls = find_jobfiles(default_yamls)
  jobs.override_yamls = find_jobfiles(override_yamls)
  next unless jobs.load(jobfile, expand_template: true)

  jobs[:expand_params] = true
  testbox_override = opt_set_key_value['testbox']
  jobs['testbox'] = testbox_override if testbox_override
  jobs['tbox_group'] = tbox_group(jobs['testbox']) if jobs.include?('testbox')
  jobs['node_roles'] ||= 'server client' if jobs['cluster']
  jobs['submit_id'] = submit_id
  jobs['nr_run'] = nr_run

  jobs.each_jobs do |job|
    unless opt_run
      raise 'Please configure SCHED_HOST' unless job['SCHED_HOST']
      raise 'Please configure SCHED_PORT' unless job['SCHED_PORT']

      job['queue'] = "#{jobs['tbox_group']}~#{ENV['USER']}" if opt_my_queue
    end

    job.add_pp
    job.add_os_fields
    job.add_install_depend_packages
    job.add_define_files if opt_auto_define_files
    job.add_timeout
    job.use_manual_install_cmdline if manual_install_cmdline

    # Save the job as yaml instead of submitting it.
    if opt_output_dir
      prefix = File.join(opt_output_dir, File.basename(jobfile, '.yaml'))
      unit_jobfile = prefix + '-' + job.path_params[0..180] + '.yaml'
      job.save unit_jobfile
      puts "#{jobfile} => #{unit_jobfile}"
      if opt_run
        nr_run.times { system "#{LKP_SRC}/bin/run-local", unit_jobfile }
        FileUtils.rm_f(unit_jobfile)
      end
      next
    end

    # Submit the job to the scheduler.
    scheduler_client = SchedulerClient.new(job['SCHED_HOST'], job['SCHED_PORT'])
    job_hash = prepare_job_hash(job)
    job_hash_list << job_hash
    add_pkg_data(job_hash, do_pack)
    job_json = job_hash.to_json

    submit = lambda do |json|
      begin_submit(json, scheduler_client,
                   jobfile, job, opt_monitor_query, job_hash_list, actions, job_ids, result_roots, opt_monitor, opt_web_monitor)
    end

    nr_run.times do
      # If the scheduler returns upload fields, pack the requested file info
      # into the job hash and resubmit.
      upload_fields = submit.call(job_json)
      next if upload_fields.empty?

      job_hash['upload_fields'] = PackUploadFields.new(job).pack(upload_fields)
      submit.call(job_hash.to_json)
    end

    # --any: stop after the first unit job that was submitted.
    break if any_job
  end
end
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wu_fengguang/lkp-tests.git
git@gitee.com:wu_fengguang/lkp-tests.git
wu_fengguang
lkp-tests
lkp-tests
master

搜索帮助