#! /usr/bin/env ruby # frozen_string_literal: true LKP_SRC = ENV['LKP_SRC'] || File.dirname(File.dirname(File.realpath($PROGRAM_NAME))) require "#{LKP_SRC}/lib/job2sh" require 'optparse' require 'yaml' opt_set_key_value = {} opt_cmdline_defaults = {} opt_output_dir = nil opt_auto_define_files = false opt_include_yamls = [] opt_monitor = false opt_web_monitor = false opt_monitor_query = {} opt_my_queue = false actions = ['output', 'stop'] result_roots = [] nr_run = 1 do_pack = true manual_install_cmdline = false any_job = false if File.basename($PROGRAM_NAME) == 'run' opt_run = true opt_output_dir = RESULT_ROOT_DIR_PREFIX + '/lkp/tmp' else opt_run = false require "#{LKP_SRC}/lib/local_pack" require "#{LKP_SRC}/lib/upload_field_pack" require "#{LKP_SRC}/lib/scheduler_client" end options = OptionParser.new do |opts| opts.banner = 'Usage: submit [options] job1.yaml job2.yaml ...' opts.separator ' submit test jobs to the scheduler' opts.separator '' opts.separator 'options:' opts.on("-s 'KEY: VALUE'", "--set 'KEY: VALUE'", 'add key-value to the hash of the submitted jobs') do |key_value| k, v = key_value.sub(' ', '').split(':', 2) opt_set_key_value[k] = v end opts.on('-o DIR', '--output DIR', 'save job yaml to DIR/, jobs are not submitted to the scheduler') do |dir| if File.file? dir puts "Please input directory for job save yaml after '-o'" exit 1 end opt_output_dir = dir Dir.mkdir(dir) unless File.directory? dir end opts.on('-a', '--auto-define-files', 'auto add define_files') do opt_auto_define_files = true end opts.on('--no-pack', "don't do pack, just use the last one") do do_pack = false end opts.on('-i include.yaml', '--include include.yaml', 'include other yamls') do |yaml| opt_include_yamls << yaml end opts.on('-c', '--connect', 'automatic ssh connection to the testbox') do actions << 'connect' opt_include_yamls << 'ssh.yaml' end opts.on('-r', '--result', 'mirror job result to local directory') do actions << 'mirror_result' end opts.on('-n job_number', '--number job_number', 'set the number of repeated submissions for each job, default 1') do |number| nr_run = number.to_i nr_run = 1 if nr_run < 1 end opts.on('-m', '--monitor', 'capture and display job execution logs in real time') do opt_monitor = true k, v = ARGV[0].sub(' ', '').split(':', 2) if ARGV[0] if (k && !k.empty?) && (v && !v.empty?) opt_monitor_query[k] = v ARGV.shift end end opts.on('-w', '--web-monitor', 'get the query of monitor and disable origin monitor') do opt_web_monitor = true end opts.on('--my-queue', "submit jobs to the user's own queue") do opt_my_queue = true end opts.on('--cmdline', 'use manual install cmdline') do manual_install_cmdline = true end opts.on('--any', "submit any one auto job from job") do any_job = true end end ARGV_CLONE = ARGV.clone options.parse!(ARGV) seen_job_yaml = false ARGV.delete_if do |arg| if arg.index '=' k, v = arg.split('=', 2) if seen_job_yaml opt_set_key_value[k] = v else opt_cmdline_defaults[k] = v end true else seen_job_yaml = true if arg =~ /\.yaml$/ false end end if ARGV.size.zero? puts(options) exit end job_ids = [] job_hash_list = [] def prioritize_include_yaml(include_yamls, jobfile) default_yamls, override_yamls = [], [] jobfile_index = ARGV_CLONE.index(jobfile) include_yamls.each do |yaml| i = ARGV_CLONE.index(yaml) if !i or i < jobfile_index # !i is for auto included ssh.yaml default_yamls << yaml else override_yamls << yaml end end return default_yamls, override_yamls end def find_jobfiles(jobfile_list) search_jobfile_list = [] jobfile_list.each do |jobfile| search_jobfile_list << Job.find_jobfile(jobfile) end return search_jobfile_list end #for process submit def begin_submit(job_json, scheduler_client, jobfile, job, opt_monitor_query, job_hash_list, actions, job_ids, result_roots, opt_monitor, opt_web_monitor) upload_fields = [] messages = scheduler_client.submit_job(job_json) JSON.parse(messages).each do |msg| if msg['errcode'] != nil && msg['errcode'].include?('RETRY_UPLOAD') opt_monitor = false puts "submit #{jobfile} need upload files, retrying...." err_msg_upload_hash = JSON.parse(msg['message']) upload_fields += err_msg_upload_hash next end if msg['message'].empty? result_roots << msg['result_root'] job_ids << msg['job_id'].to_s puts("submit #{jobfile}, got job id=#{msg['job_id']}\nresult_root #{msg['result_root']}") else opt_monitor = false puts("submit #{jobfile} failed, got job id=#{msg['job_id']}, error: #{msg['message']}") puts("#{msg['error_message']}") if msg.include? 'error_message' if msg['message'].include?('Failed to verify the account.') puts 'The submitted account information is as follows:' puts "my_name: #{job['my_name']}" puts "my_email: #{job['my_email']}" puts "my_account: #{job['my_account']}" puts "my_token: #{job['my_token']}" puts "lab: #{job['lab']}" end end end if opt_monitor job_hash_list[0].delete('define_files') opt_monitor_query.merge!({'job_id' => job_ids}) if !opt_web_monitor cmd = "#{LKP_SRC}/sbin/monitor -f '#{opt_monitor_query.to_json}' -j '#{job_hash_list[0].to_json}' -a #{actions.join(',')} -r #{result_roots.join(',')} -s 'job_state: extract_finished'" exec cmd else puts("for jobs #{job_ids}, the monitor query=#{opt_monitor_query}") end end return upload_fields end # prepare job shell function in ruby -- scheduler API runs crystal, # and does not have latest lkp-tests code, so client side is more # convenient to do some tasks def prepare_sh_hash(job) sh_run_job = job.sh_run_job sh_extract_stats = job.sh_extract_stats sh_define_files = job.sh_define_files sh_on_fail = job.sh_on_state(state='on_fail') sh_hash = { 'job2sh' => { 'run_job' => sh_run_job, 'extract_stats' => sh_extract_stats, 'define_files' => sh_define_files } } if sh_on_fail sh_hash['job2sh']['on_fail'] = sh_on_fail end sh_hash end def prepare_job_hash(job) # merge job info job_hash = job.to_hash job_hash = job_hash.merge(prepare_sh_hash(job)) job_hash['pp']['parse'] = nil if job_hash['pp']['parse'] # XXX: remove for now, for ES compatibility job_hash end # this time number is more meaningful than uuid, and less costly as int64 index in db # [8] pry(main)> 1<<63 # => 9223372036854775808 # [9] pry(main)> Time.now.strftime("%y%m%d%H%M%S%6N") # => "240429090711811190" submit_id = Time.now.strftime("%y%m%d%H%M%S%6N") puts "submit_id=#{submit_id}" unless opt_run ARGV.each do |jobfile| default_yamls, override_yamls = prioritize_include_yaml(opt_include_yamls, jobfile) jobfile = Job.find_jobfile(jobfile) jobs = Job2sh.new jobs.cmdline_defaults = opt_cmdline_defaults jobs.overrides = opt_set_key_value jobs.default_yamls = find_jobfiles(default_yamls) jobs.override_yamls = find_jobfiles(override_yamls) jobs.load(jobfile, expand_template: true) || next jobs[:expand_params] = true jobs['testbox'] = opt_set_key_value['testbox'] if opt_set_key_value['testbox'] jobs['tbox_group'] = tbox_group(jobs['testbox']) if jobs.include?('testbox') jobs['node_roles'] ||= 'server client' if jobs['cluster'] jobs['submit_id'] = submit_id jobs['nr_run'] = nr_run jobs.each_jobs do |job| unless opt_run raise 'Please configure SCHED_HOST' unless job['SCHED_HOST'] raise 'Please configure SCHED_PORT' unless job['SCHED_PORT'] job['queue'] = "#{jobs['tbox_group']}~#{ENV['USER']}" if opt_my_queue end job.add_pp job.add_os_fields job.add_install_depend_packages job.add_define_files if opt_auto_define_files job.add_timeout job.use_manual_install_cmdline if manual_install_cmdline # save job to yaml if opt_output_dir prefix = File.join(opt_output_dir, File.basename(jobfile, '.yaml')) unit_jobfile = prefix + '-' + job.path_params[0..180] + '.yaml' job.save unit_jobfile puts "#{jobfile} => #{unit_jobfile}" if opt_run nr_run.times do system "#{LKP_SRC}/bin/run-local", unit_jobfile end FileUtils.rm_f(unit_jobfile) end next end # submit job scheduler_client = SchedulerClient.new(job['SCHED_HOST'], job['SCHED_PORT']) job_hash = prepare_job_hash(job) job_hash_list << job_hash add_pkg_data(job_hash, do_pack) job_json = job_hash.to_json nr_run.times do #submit job, if sced return upload_fields, pack file info in it and resubmit upload_fields = begin_submit(job_json, scheduler_client, jobfile, job, opt_monitor_query, job_hash_list, actions, job_ids, result_roots, opt_monitor, opt_web_monitor) unless upload_fields.empty? job_hash["upload_fields"] = PackUploadFields.new(job).pack(upload_fields) new_job_json = job_hash.to_json #puts upload_fields.class #puts job_hash["upload_fields"] begin_submit(new_job_json, scheduler_client, jobfile, job, opt_monitor_query, job_hash_list, actions, job_ids, result_roots, opt_monitor, opt_web_monitor) end end if(any_job) break; end end end