1 Star 0 Fork 0

正版水民/tunasync-scripts

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
nix-channels.py 14.85 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python3
import hashlib
import json
import logging
import lzma
import minio
import os
import pytz
import re
import requests
import subprocess
import sys
from pyquery import PyQuery as pq
from datetime import datetime, timedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from minio.credentials import Credentials, Static
from urllib3.util.retry import Retry
### Config
if len(sys.argv) > 1 and sys.argv[1] == '--ustc':
# Mode for https://github.com/ustclug/ustcmirror-images
UPSTREAM_URL = os.getenv("NIX_MIRROR_UPSTREAM", 'https://nixos.org/channels')
MIRROR_BASE_URL = os.getenv("NIX_MIRROR_BASE_URL", 'https://mirrors.ustc.edu.cn/nix-channels')
WORKING_DIR = os.getenv("TO", 'working-channels')
else:
UPSTREAM_URL = os.getenv("TUNASYNC_UPSTREAM_URL", 'https://nixos.org/channels')
MIRROR_BASE_URL = os.getenv("MIRROR_BASE_URL", 'https://mirrors.tuna.tsinghua.edu.cn/nix-channels')
WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR", 'working-channels')
PATH_BATCH = int(os.getenv('NIX_MIRROR_PATH_BATCH', 8192))
THREADS = int(os.getenv('NIX_MIRROR_THREADS', 10))
DELETE_OLD = os.getenv('NIX_MIRROR_DELETE_OLD', '1') == '1'
RETAIN_DAYS = float(os.getenv('NIX_MIRROR_RETAIN_DAYS', 30))
STORE_DIR = 'store'
RELEASES_DIR = 'releases'
# Channels that have not updated since migration to Netlify [1] are assumed to
# be too old and defunct.
#
# [1]: https://discourse.nixos.org/t/announcement-moving-nixos-org-to-netlify/6212
CLONE_SINCE = datetime(2020, 3, 6, tzinfo=pytz.utc)
TIMEOUT = 60
working_dir = Path(WORKING_DIR)
# `nix copy` uses a cache database
# TODO Should we expose this directory?
os.environ['XDG_CACHE_HOME'] = str((working_dir / '.cache').resolve())
nix_store_dest = f'file://{(working_dir / STORE_DIR).resolve()}'
binary_cache_url = f'{MIRROR_BASE_URL}/{STORE_DIR}'
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[ 502, 503, 504 ])
retry_adapter = requests.adapters.HTTPAdapter(max_retries=retries)
session.mount('http://', retry_adapter)
session.mount('https://', retry_adapter)
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)-8s %(message)s'
)
# Set this to True if some sub-process failed
# Don't forget 'global failure'
failure = False
def http_get(*args, **kwargs):
return session.get(*args, timeout=TIMEOUT, **kwargs)
# Adapted from anaconda.py
def file_sha256(dest):
m = hashlib.sha256()
with dest.open('rb') as f:
while True:
buf = f.read(1*1024*1024)
if not buf:
break
m.update(buf)
return m.hexdigest()
def atomic_write_file(dest, contents):
tmp_dest = dest.parent / f'.{dest.name}.tmp'
with tmp_dest.open('w') as f:
f.write(contents)
tmp_dest.rename(dest)
class WrongSize(RuntimeError):
def __init__(self, expected, actual):
super().__init__(f'Wrong file size: expected {expected}, actual {actual}')
self.actual = actual
self.expected = expected
def download(url, dest):
dest.parent.mkdir(parents=True, exist_ok=True)
download_dest = dest.parent / f'.{dest.name}.tmp'
retry = retries
while True:
with http_get(url, stream=True) as res:
res.raise_for_status()
try:
with download_dest.open('wb') as f:
for chunk in res.iter_content(chunk_size=1024 * 1024):
if chunk:
f.write(chunk)
actual_size = download_dest.stat().st_size
if 'Content-Length' in res.headers:
expected_size = int(res.headers['Content-Length'])
if actual_size != expected_size:
raise WrongSize(expected=expected_size, actual=actual_size)
break
except (requests.exceptions.ConnectionError, WrongSize) as e:
logging.warning(e)
next_retry = retry.increment(
method='GET',
url=url,
error=e
)
if next_retry is None:
global failure
failure = True
raise e
else:
retry = next_retry
logging.warning(f'Retrying download: {retry}')
download_dest.rename(dest)
credentials = Credentials(provider=Static())
client = minio.Minio('s3.amazonaws.com', credentials=credentials)
def get_channels():
return [
(x.object_name, x.last_modified)
for x in client.list_objects_v2('nix-channels')
if re.fullmatch(r'(nixos|nixpkgs)-.+[^/]', x.object_name)
]
def clone_channels():
logging.info(f'- Fetching channels')
channels_to_update = []
working_dir.mkdir(parents=True, exist_ok=True)
for channel, chan_updated in get_channels():
chan_path = working_dir / channel
# Old channels, little value in cloning and format changes
if chan_updated < CLONE_SINCE:
continue
chan_obj = client.get_object('nix-channels', channel)
chan_location = chan_obj.headers['x-amz-website-redirect-location']
chan_release = chan_location.split('/')[-1]
release_target = f'{RELEASES_DIR}/{channel}@{chan_release}'
release_path = working_dir / release_target
if chan_path.is_symlink() \
and os.readlink(str(chan_path)) == release_target:
continue
chan_path_update = working_dir / f'.{channel}.update'
if chan_path_update.is_symlink() \
and os.readlink(str(chan_path_update)) == release_target:
channels_to_update.append(channel)
logging.info(f' - {channel} ready to update to {chan_release}')
continue
logging.info(f' - {channel} -> {chan_release}')
release_res = http_get(chan_location)
if release_res.status_code == 404:
logging.warning(f' - Not found')
continue
release_res.raise_for_status()
node = pq(release_res.text)
tagline = node('p').text()
tagline_res = re.match(r'^Released on (.+) from', tagline)
if tagline_res is None:
logging.warning(f' - Invalid tagline: {tagline}')
continue
released_time = tagline_res[1]
release_path.mkdir(parents=True, exist_ok=True)
with (release_path / '.released-time').open('w') as f:
f.write(released_time)
logging.info(f' - Downloading files')
has_hash_fail = False
for row in node('tr'):
td = pq(row)('td')
if len(td) != 3:
continue
file_name, _file_size, file_hash = (pq(x).text() for x in td)
if file_name.endswith('.ova') or file_name.endswith('.iso'):
# Skip images
pass
elif (release_path / file_name).exists() \
and file_sha256(release_path / file_name) == file_hash:
logging.info(f' - {file_name} (existing)')
else:
if file_name == 'binary-cache-url':
logging.info(f' - binary-cache-url (redirected)')
dest = '.original-binary-cache-url'
else:
logging.info(f' - {file_name}')
dest = file_name
download(f'{chan_location}/{file_name}', release_path / dest)
if file_sha256(release_path / dest) != file_hash:
global failure
failure = True
has_hash_fail = True
logging.error(f' Wrong hash!')
logging.error(f' - expected {file_hash}')
logging.error(f' - got {hash}')
logging.info(' - Writing binary-cache-url')
(release_path / 'binary-cache-url').write_text(binary_cache_url)
if has_hash_fail:
logging.warning(' - Found bad files. Not updating symlink.')
else:
channels_to_update.append(channel)
if chan_path_update.exists():
chan_path_update.unlink()
chan_path_update.symlink_to(release_target)
logging.info(f' - Symlink updated')
return channels_to_update
def hash_part(path):
return path.split('/')[-1].split('-', 1)[0]
def update_channels(channels):
logging.info(f'- Updating binary cache')
has_cache_info = False
for channel in channels:
logging.info(f' - {channel}')
chan_path = working_dir / channel
chan_path_update = working_dir / f'.{channel}.update'
upstream_binary_cache = (chan_path_update / '.original-binary-cache-url').read_text()
upstream_binary_cache = upstream_binary_cache.rstrip('/')
# All the channels should have https://cache.nixos.org as binary cache
# URL. We download nix-cache-info here (once per sync) to avoid
# hard-coding it, and in case it changes.
if not has_cache_info:
info_file = 'nix-cache-info'
logging.info(f' - Downloading {info_file}')
download(
f'{upstream_binary_cache}/{info_file}',
working_dir / STORE_DIR / info_file
)
has_cache_info = True
with lzma.open(str(chan_path_update / 'store-paths.xz')) as f:
paths = [ path.rstrip() for path in f ]
logging.info(f' - {len(paths)} paths listed')
todo = []
seen_paths = set()
channel_failure = False
# Workaround to temporarily fix https://github.com/tuna/issues/issues/1855
paths = [
path
for path in paths
if b'texlive-2022-env-man' not in path
and b'texlive-2022-env-info' not in path
]
# Batch paths to avoid E2BIG
for i in range(0, len(paths), PATH_BATCH):
batch = paths[i : i + PATH_BATCH]
process = subprocess.run(
[
'nix', 'path-info',
'--store', upstream_binary_cache,
'--recursive', '--json'
] + batch,
stdout=subprocess.PIPE
)
if process.returncode != 0:
channel_failure = True
logging.info(f' - Error status: {process.returncode}')
break
else:
infos = json.loads(process.stdout)
for info in infos:
ha = hash_part(info['path'])
one_todo = [
name
for name in [info['url'], f'{ha}.narinfo']
if name not in seen_paths
]
seen_paths.update(one_todo)
if one_todo:
todo.append(one_todo)
else:
logging.info(f' - {len(todo)} paths to download')
digits = len(str(len(todo)))
def try_mirror(index, paths):
index += 1
prefix = f'[{str(index).rjust(digits)}/{len(todo)}]'
try:
for path in paths:
url = f'{upstream_binary_cache}/{path}'
dest = working_dir / STORE_DIR / path
if dest.exists(): continue
download(url, dest)
logging.info(f' - {prefix} {path}')
return True
except (requests.exceptions.ConnectionError, WrongSize):
return False
with ThreadPoolExecutor(max_workers=THREADS) as executor:
results = executor.map(
lambda job: try_mirror(*job),
enumerate(todo)
)
if not all(results):
channel_failure = True
if channel_failure:
logging.info(f' - Finished with errors, not updating symlink')
else:
chan_path_update.rename(chan_path)
logging.info(f' - Finished with success, symlink updated')
def parse_narinfo(narinfo):
res = {}
for line in narinfo.splitlines():
key, value = line.split(': ', 1)
res[key] = value
return res
def garbage_collect():
logging.info(f'- Collecting garbage')
time_threshold = datetime.now() - timedelta(days=RETAIN_DAYS)
last_updated = {}
latest = {}
alive = set()
for release in (working_dir / RELEASES_DIR).iterdir():
# This release never finished downloading
if not (release / 'binary-cache-url').exists(): continue
channel = release.name.split('@')[0]
date_str = (release / '.released-time').read_text()
released_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
if released_date >= time_threshold:
alive.add(release)
if channel not in last_updated \
or last_updated[channel] < released_date:
last_updated[channel] = released_date
latest[channel] = release
alive.update(latest.values())
logging.info(f' - {len(alive)} releases alive')
closure = set()
for release in alive:
with lzma.open(str(release / 'store-paths.xz')) as f:
paths = [ path.rstrip() for path in f ]
# Workaround to temporarily fix https://github.com/tuna/issues/issues/1855
paths = [
path
for path in paths
if b'texlive-2022-env-man' not in path
and b'texlive-2022-env-info' not in path
]
for i in range(0, len(paths), PATH_BATCH):
batch = paths[i : i + PATH_BATCH]
process = subprocess.run(
[
'nix', 'path-info',
'--store', nix_store_dest,
'--recursive'
] + batch,
stdout=subprocess.PIPE
)
for path in process.stdout.decode().splitlines():
closure.add(hash_part(path))
logging.info(f' - {len(closure)} paths in closure')
deleted = 0
for path in (working_dir / STORE_DIR).iterdir():
if not path.name.endswith('.narinfo'):
continue
hash = path.name.split('.narinfo', 1)[0]
if hash in closure:
continue
deleted += 1
if DELETE_OLD:
narinfo = parse_narinfo(path.read_text())
try:
path.unlink()
except:
pass
try:
(working_dir / STORE_DIR / narinfo['URL']).unlink()
except:
pass
if DELETE_OLD:
logging.info(f' - {deleted} paths deleted')
else:
logging.info(f' - {deleted} paths now unreachable')
if __name__ == '__main__':
channels = clone_channels()
update_channels(channels)
garbage_collect()
if failure:
sys.exit(1)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wyk0930/tunasync-scripts.git
[email protected]:wyk0930/tunasync-scripts.git
wyk0930
tunasync-scripts
tunasync-scripts
master

搜索帮助