代码拉取完成,页面将自动刷新
#!/usr/bin/env python3
"""
A analyzer help to check the package similarity among given distro,
by comparing the Name and Version of package in rpm format.
Copyright (C) 2022 OpenAnolis.org
"""
import os
import sys
import argparse
import subprocess
# globals
tmpdir = "/tmp/pkg-analyzer"
mountpoint = tmpdir + "/iso"
logfile = tmpdir + "/repoquery.log"
repoes = []
def do_prepare():
# create workspace
_ = not os.path.exists(tmpdir) and os.mkdir(tmpdir)
_ = not os.path.exists(mountpoint) and os.mkdir(mountpoint)
if os.path.exists(logfile):
os.remove(logfile)
def run_cmd(cmd):
# helper to run shell command
p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = p.communicate()
return p.returncode, output.decode("utf-8", errors="ignore"), error.decode("utf-8", errors="ignore")
def mount_iso(iso):
# mount the iso image
if iso:
cmd = "sudo mount -o loop {} {}".format(iso, mountpoint)
print(cmd)
ret, output, error = run_cmd(cmd)
if ret:
print("Mount %s failed: %s" % (iso, error))
def umount_iso():
# umount the iso image
cmd = "sudo umount {}".format(mountpoint)
print(cmd)
ret, output, error = run_cmd(cmd)
if ret:
print("Unmount %s failed: %s" % (mountpoint, error))
def search_repo(path, repoes):
# recursive searching all the repodata path
repodata = os.path.join(path, "repodata")
if os.path.isdir(repodata):
repoes.append(path)
else:
files = os.listdir(path)
for file in files:
repo = os.path.join(path, file)
repodata = os.path.join(path, file, "repodata")
if os.path.isdir(repo):
if os.path.isdir(repodata):
repoes.append(repo)
else:
search_repo(repo, repoes)
def query_nvr_from_repo(repoes):
# get the NVR list by repoquery
print("Clean up repo cache...")
cmd = "dnf clean all"
ret, output, error = run_cmd(cmd)
if ret:
print("run '%s' failed: %s" % (cmd, error))
id = 0
cmd ="repoquery --quiet --qf '%{name} %{version}'"
for repo in repoes:
id = id + 1
cmd = cmd + " --repoid=repo{} --repofrompath=repo{},{}".format(id, id, repo)
print("Query the package list from repository...")
#print(cmd)
ret, output, error = run_cmd(cmd)
if ret:
print("run '%s' failed: %s" % (cmd, error))
else:
with open(logfile, "a") as f:
f.write(cmd)
f.write("\n===>8===\n")
f.write(output)
f.close()
pkg = {}
for line in output.splitlines():
(n, v) = line.split()
pkg[n] = v
return pkg
def read_nvrs(file):
# read NVR list from file
pkg = {}
with open(file, "r") as f:
for line in f:
(n, v, r) = line.split()
pkg[n] = (v, r)
return pkg
def check_similarity(pkgs):
# check the similarity
print("Checking the package similarity...")
sn = [0] * (len(pkgs) - 1)
snv = [0] * (len(pkgs) - 1)
for an, av in pkgs[0].items():
i = 0
n_found = False
v_found = False
for pkg in pkgs[1:]:
bv = pkg.get(an)
if bv:
if not n_found:
n_found = True
sn[i] = sn[i] + 1
if bv == av:
if not v_found:
v_found = True
snv[i] = snv[i] + 1
break
i = i + 1
return (sn, snv)
def output(pkgs, sn, snv):
# output the similarity report
total = []
for pkg in pkgs:
total.append(len(pkg))
print("===============================================================")
print(" PACKAGES : TOTAL SAME_NAME SAME_NAME_&_VERSION")
print("----------------------:----------------------------------------")
print("%s : %5d -- --" % ("distro A".rjust(21), total[0]))
if len(sn) <= 1:
print("%s : %5d %11d %21d" % ("distro B".rjust(21), total[1], sn[0], snv[0]))
print("[SIMILARITY: A vs B ] : -- %10.2f%% %20.2f%%" % (sn[0]/total[0]*100, snv[0]/total[0]*100))
else:
print("%s : %5d %11d %21d" % ("distro B0".rjust(21), total[1], sn[0], snv[0]))
print("[SIMILARITY: A vs B0] : -- %10.2f%% %20.2f%%" % (sn[0]/total[0]*100, snv[0]/total[0]*100))
print("----------------------:----------------------------------------")
for i in range(1, len(sn)):
print("%s : %5d %11d %21d" % ("distro B{}".format(i).rjust(21), total[i+1], sn[i], snv[i]))
print("[SIMILARITY: A vs B*] : -- %10.2f%% %20.2f%%" % (sum(sn)/total[0]*100, sum(snv)/total[0]*100))
print("===============================================================")
def main(args):
print("Begin to test the similarity between:")
do_prepare()
pkgs = []
# distro A
print("[-] distro A: %s" % args.distroA)
if args.distroA.endswith((".iso", ".ISO")):
mount_iso(args.distroA)
repoes = []
search_repo(mountpoint, repoes)
repoes = list("file://" + i for i in repoes)
pkgs.append(query_nvr_from_repo(repoes))
umount_iso()
elif args.distroA.startswith(("http:", "https:")):
repoes = [args.distroA]
pkgs.append(query_nvr_from_repo(repoes))
else:
print("Invalid format, only support ISO or URL.")
return
# distro B(s)
i = 0
for distro in args.distroB:
if len(args.distroB) > 1:
print("[-] distro B%d: %s" % (i, distro))
else:
print("[-] distro B: %s" % distro)
if distro.endswith((".iso", ".ISO")):
mount_iso(distro)
repoes = []
search_repo(mountpoint, repoes)
repoes = list("file://" + i for i in repoes)
pkgs.append(query_nvr_from_repo(repoes))
umount_iso()
elif distro.startswith(("http:", "https:")):
repoes = [distro]
pkgs.append(query_nvr_from_repo(repoes))
else:
print("Invalid format, only support ISO or URL.")
continue
i = i + 1
if len(pkgs) < 2:
print("Something goes wrong, please check.")
return
(sn, snv) = check_similarity(pkgs)
output(pkgs, sn, snv)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Package Similarity Analyzer')
parser.add_argument('-a', '--distroA', help='distro A source repo, ISO or URL', required=True)
parser.add_argument('-b', '--distroB', nargs='+', help='distro B source repo, ISO or URL', required=True)
args = parser.parse_args()
main(args)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。