1 Star 0 Fork 0

Yellowpea/my-tools

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
setup_crontab.py 3.02 KB
一键复制 编辑 原始数据 按行查看 历史
Yellowpea 提交于 2024-10-21 23:53 . Finish setup_crontab script
"""
Scan the Python files in the specified directory for embedded cron
configuration (the lines following the @function, @cron and @command markers)
and install the corresponding jobs into the current user's crontab.
"""
import logging
import os
import shlex
import subprocess
import sys
from pathlib import Path
# Configure the root logger: every record from DEBUG upwards goes to stdout,
# prefixed with its timestamp and level name.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='[%(asctime)s %(levelname)s] %(message)s',
    level=logging.DEBUG,
)
# Named logger used by the rest of this script.
logger = logging.getLogger('setup_crontab')
def parse_crontab_info(py_file_path):
    """Extract the cron metadata embedded in a Python file.

    The file is scanned for three marker lines -- ``@function``, ``@cron``
    and ``@command`` -- each of which must be the only content of its line
    (surrounding whitespace is ignored).  The value associated with a marker
    is the stripped content of the line immediately following it.  If a
    marker occurs more than once before all three values are known, the
    latest occurrence wins.

    Args:
        py_file_path: Path of the file to scan.

    Returns:
        A ``(function_desc, cron_schedule, cron_command)`` tuple as soon as
        all three values have been found and are non-empty, otherwise
        ``None``.
    """
    markers = ('@function', '@cron', '@command')
    found = {}

    # Python source files are UTF-8 by default, so do not depend on the
    # locale's preferred encoding when reading them.
    with open(py_file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    for i, line in enumerate(lines):
        marker = line.strip()
        if marker not in markers:
            continue
        # A marker on the very last line has no value line after it; stop
        # here instead of indexing past the end of the file.
        if i + 1 >= len(lines):
            break
        found[marker] = lines[i + 1].strip()
        # Empty values do not count as "found", so keep scanning until all
        # three markers carry a non-empty value.
        if all(found.get(name) for name in markers):
            return tuple(found[name] for name in markers)

    return None
def add_to_crontab(function_desc, cron_schedule, cron_command):
    """Append a job to the current user's crontab unless it is already there.

    The existing crontab is read with ``crontab -l``, the new job (preceded
    by a ``# <function_desc>`` comment line) is appended, and the result is
    installed by piping it to ``crontab -``.

    Args:
        function_desc: One-line description written as a comment above the job.
        cron_schedule: Schedule part of the cron entry.
        cron_command: Command part of the cron entry.

    Returns:
        ``True`` if the job was installed; ``False`` if an identical entry
        already exists or if ``crontab -`` reported a failure.
    """
    try:
        current_crontab = subprocess.check_output(['crontab', '-l'], text=True)
    except subprocess.CalledProcessError:
        # A non-zero exit is treated as "no crontab yet" -- presumably the
        # usual "no crontab for <user>" case; start from an empty table.
        current_crontab = ""

    new_cron_job_no_desc = f'{cron_schedule} {cron_command}'

    # Compare against whole lines rather than doing a substring search on the
    # full text: "0 1 * * * cmd" is a substring of "10 1 * * * cmd", which
    # would wrongly be reported as a duplicate.
    existing_entries = {entry.strip() for entry in current_crontab.splitlines()}
    if new_cron_job_no_desc.strip() in existing_entries:
        logger.info(f'crontab "{new_cron_job_no_desc}" already exists.')
        return False

    # Make sure the new job starts on its own line even if the existing
    # crontab text does not end with a newline.
    if current_crontab and not current_crontab.endswith('\n'):
        current_crontab += '\n'
    current_crontab += f'# {function_desc}\n{new_cron_job_no_desc}\n'

    # text=True keeps the encoding consistent with the read above, and the
    # return code tells us whether crontab actually accepted the new table.
    result = subprocess.run(['crontab', '-'], input=current_crontab, text=True)
    if result.returncode != 0:
        logger.error(
            f'Failed to install crontab "{new_cron_job_no_desc}" '
            f'(crontab exited with status {result.returncode}).'
        )
        return False
    return True
def setup_crontab_from_directory(directory):
    """Install a cron job for every annotated Python file under *directory*.

    The directory tree is walked recursively.  Each ``.py`` file is passed to
    ``parse_crontab_info``; files without complete cron information are
    logged and skipped.  For the others, the command is prefixed with a
    ``cd`` into the file's directory and handed to ``add_to_crontab``.

    Args:
        directory: Root directory to scan.
    """
    for root, _, files in os.walk(directory):
        # Quote the directory so paths containing spaces or shell
        # metacharacters still form a valid command line.  Paths made only of
        # shell-safe characters are returned unchanged by shlex.quote, so
        # entries previously installed for such paths still match.
        cd_prefix = f'cd {shlex.quote(root)} && '
        for file in files:
            if not file.endswith('.py'):
                continue
            py_file_path = os.path.join(root, file)
            crontab_info = parse_crontab_info(py_file_path)
            if crontab_info is None:
                logger.info(f"File does not have crontab info: '{py_file_path}'")
                continue
            function_desc, cron_schedule, cron_command = crontab_info
            if add_to_crontab(function_desc, cron_schedule, cd_prefix + cron_command):
                logger.info(f"Added crontab for '{py_file_path}'.")
# Script entry point: expects exactly one argument, the directory to scan.
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage:\npython3 setup_crontab.py <directory>")
        sys.exit(1)

    directory = sys.argv[1]
    # Expand "~" and resolve to an absolute path, since the path is embedded
    # in the "cd ... &&" prefix of every installed cron command.
    abs_directory = Path(directory).expanduser().resolve()
    # is_dir() rather than exists(): a path to a regular file would pass
    # exists(), and os.walk would then silently visit nothing.
    if not abs_directory.is_dir():
        logger.error(
            f"Error: The directory '{abs_directory}' does not exist "
            f"or is not a directory."
        )
        sys.exit(1)

    logger.debug(f"Your directory: {abs_directory}")
    setup_crontab_from_directory(str(abs_directory))
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/YellowPea/my-tools.git
[email protected]:YellowPea/my-tools.git
YellowPea
my-tools
my-tools
master

搜索帮助