1 Star 1 Fork 1

新年/php进程操作

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
Process.php 26.73 KB
一键复制 编辑 原始数据 按行查看 历史
NewYear_yc 提交于 2019-02-28 17:49 . 统计子进程修改
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814
<?php
/**
* @Date 2019-02-28
* @Desc PHP多进程管理,多任务多进程,类似nginx的master worker进程模型
* posix_kill($master_pid, 0) 测试进程是否存在 返回1存在
* - start : 启动进程
* - restart: 暴力关闭所有进程重启
* - reload : 平滑关闭所有进程重启
* - stop : 平滑停止所有进程,slaver进程循环中,会处理完正在处理的逻辑然后退出
* - quit : 暴力退出进程,直接退出slaver所有进程
* - status : 展示master、slaver进程的信息
*/
class Process
{
/**
* 根据配置建立多个独立的任务
*
* @var array
*/
public $config = [];
/**
* 保存主进程id(master pid)的文件
* @var string
*/
public $pid_file = "";
/**
* 标准输出、标准错误定向位置
* @var string
*/
public $std_file = "/dev/null";
/**
* 所有子进程id列表(slaver pid)
* @var array
*/
protected $_workers_pid = [];
/**
* 主进程的状态,STATUS_RUNNING 或者 STATUS_STOP
* @var integer
*/
protected $_status = 0;
/**
* 运行状态
*/
const STATUS_RUNNING = 1;
/**
* 结束状态
*/
const STATUS_STOP = 2;
/**
* 状态统计信息存储文件
*/
public $statistics_file = "";
/**
* 进程额外的信息
* @var array
*/
protected $_extend_info = [
"start_timestamp" => 0,
"end_timestamp" => 0,
];
/**
* 日志输出文件
* @var string
*/
public $log_file = "";
/*************************** slaver进程用到的变量 ***************************/
/**
* 进程是否准备停止
* @var boolean
*/
protected $_stoping = false;
/**
* 子进程循环次数
* @var integer
*/
protected $_run_times = 0;
/**
* 最大运行次数 ,子进程最大循环次数
* @var integer
*/
public $max_run_times = 100000;
/**
* 子进程最大运行秒数,默认一小时
*
* @var integer
*/
public $max_run_seconds = 3600;
/**
* slaver进程处理的时候暂停时间,防止写的时候漏掉 默认1/10秒
* @var integer
*/
public $sleep_micro_seconds = 10000;
/**
* 当前slaver进程的任务分组名称
* @var string
*/
protected $_task_name = "";
/**
* 主进程id
* @var int $master_pid
*/
protected $master_pid = null;
/**
* 当前子进程索引
* 第几个子进程
* @var int $slaver_index
*/
protected $slaver_index = 0;
/**
* 当前主进程中所产生的子进程数量
* @var array $slaver_count
*/
protected $slaver_count = array();
/**
* 构造函数
*/
public function __construct()
{
}
/**
* 初始化
*/
protected function init()
{
if (empty($this->statistics_file)) {
$this->log("请配置状态统计文件的位置", true);
exit(250);
}
if (!is_dir(dirname($this->statistics_file))) {
mkdir(dirname($this->statistics_file), 0755, true);
}
if (empty($this->pid_file)) {
$this->log("请配置pid_file,来存储master进程的pid", true);
exit(250);
}
if (!is_dir(dirname($this->pid_file))) {
mkdir(dirname($this->pid_file), 0755, true);
}
$this->_extend_info["start_timestamp"] = time();
}
/**
* 检查运行脚本的环境是否在cli下面,不在抛出异常
*
* - 脚本仅支持在CLI模式下运行
*/
protected function checkSapiEnv()
{
if (php_sapi_name() != "cli") {
$this->log("仅在CLI模式下运行", true);
exit(250);
}
}
/**
* 解析命令,根据命令做出不同的操作
* - start : 启动进程
* - restart: 暴力关闭所有进程重启
* - reload : 平滑关闭所有进程重启
* - stop : 平滑停止所有进程,slaver进程循环中,会处理完正在处理的逻辑然后退出
* - quit : 暴力退出进程,直接退出slaver所有进程
* - status : 展示master、slaver进程的信息
*/
protected function parseCommand()
{
global $argv;
$commands = [
"start",
"restart",
"stop",
"reload",
"quit",
"status"
];
$command = @trim($argv[1]);
if (!in_array($command, $commands)) {
$this->log("请输入正确的命令", true);
$this->log("使用方法:/path/to/php /path/to/script [start | restart | stop | reload | quit | status]", true);
exit(250);
}
switch ($command) {
case "stop":
{
$master_pid = $this->getMasterPid();
if ($master_pid <= 0) {
$this->log("获取主进程号失败,请确认主进程已启动,配置文件读取未知正确", true);
exit(250);
}
$this->sendSig($master_pid,SIGINT);
while (1) {
$master_is_alive = $this->sendSig($master_pid);
if ($master_is_alive) {
usleep(100000);
continue;
}
break;
}
$this->log("平滑停止成功", true);
exit(0);
}
break;
case "quit":
{
$master_pid = $this->getMasterPid();
if ($master_pid <= 0) {
$this->log("获取主进程号失败,请确认主进程已启动,配置文件读取未知正确", true);
exit(250);
}
$this->sendSig($master_pid,SIGTERM);
while (1) {
$master_is_alive = $this->sendSig($master_pid);
if ($master_is_alive) {
usleep(100000);
continue;
}
break;
}
$this->log("暴力停止成功", true);
exit(0);
}
break;
case "status":
{
$master_pid = $this->getMasterPid();
if ($master_pid <= 0) {
$this->log("获取主进程号失败,请确认主进程已启动,配置文件读取未知正确", true);
exit(250);
}
// 向主进程发送查看进程信息信号
$this->sendSig($master_pid, SIGUSR1);
// 默认的7行数据(主进程)加每个子进程的数据
$total_line = 6 + $this->getWorkerPidCount(false);
while (!is_file($this->statistics_file) || count(file($this->statistics_file)) < $total_line) {
sleep(1);
}
// 格式化处理数据
$lines = file($this->statistics_file);
$master_lines = array_slice($lines, 0, 5);
echo implode("", $master_lines);
$this->log("-------------------------------- slaver进程状态 --------------------------------", true);
$_workers_pid = trim($lines[5]);
$_workers_pid = json_decode($_workers_pid, true);
$slaver_lines = array_slice($lines, - $this->getWorkerPidCount(false));
$slaver_lines_map = [];
foreach ($slaver_lines as $slaver_line) {
$pid = intval($slaver_line);
$slaver_lines_map[$pid] = $slaver_line;
}
foreach ($_workers_pid as $task_name => $worker_pids) {
$this->log("任务{$task_name}: ", true);
foreach ($worker_pids as $worker_pid) {
echo $slaver_lines_map[$worker_pid];
}
$this->log("--------------------------------------------------------------------------------", true);
}
unlink($this->statistics_file);
exit(0);
}
break;
case "start":
{
$master_pid = $this->getMasterPid();
$master_is_alive = $master_pid && $this->sendSig($master_pid, 0);
if ($master_is_alive) {
$this->log("进程[{$master_pid}]已经在运行了", true);
exit(250);
}
}
break;
case "restart":
{
$master_pid = $this->getMasterPid();
if ($master_pid <= 0) {
$this->log("获取主进程号失败,请确认主进程已启动,配置文件读取未知正确", true);
exit(250);
}
$this->sendSig($master_pid, SIGTERM);
while (1) {
$master_is_alive = $this->sendSig($master_pid, 0);
if ($master_is_alive) {
usleep(100000);
continue;
}
break;
}
$this->log("暴力停止成功", true);
$this->log("重启中...", true);
}
break;
case "reload":
{
$master_pid = $this->getMasterPid();
if ($master_pid <= 0) {
$this->log("获取主进程号失败,请确认主进程已启动,配置文件读取未知正确", true);
exit(250);
}
$this->sendSig($master_pid, SIGINT);
while (1) {
$master_is_alive = $this->sendSig($master_pid, 0);
if ($master_is_alive) {
usleep(100000);
continue;
}
break;
}
$this->log("平滑停止成功", true);
$this->log("重启中...", true);
}
break;
default:
# code...
break;
}
}
/**
* 根据配置的slaver进程数量和已有slaver进程,fork新的slaver进程
*/
protected function forkWorkers()
{
foreach ($this->config as $task_name => $task_config) {
if(empty($task_config["handle"])){
$this->log($task_name." - 没有回调函数",true);
continue;
}
$count = empty($task_config["count"])?1:$task_config["count"];
$handle = $task_config["handle"];
$args = isset($task_config["args"])?$task_config["args"]:[];
if (!isset($this->_workers_pid[$task_name])) {
$this->_workers_pid[$task_name] = [];
}
$this->slaver_index = 0;
while (
count($this->_workers_pid[$task_name]) < $count &&
(!isset($this->slaver_count[$task_name]) || $this->slaver_count[$task_name] < $count )
) {
$this->forkOneWorker($task_name, $handle,$args);
}
}
}
/**
* fork一个新的进程
* @param string $task_name 任务名称
* @param array|\Closure $handle 执行函数
* @param array|null $args 参数
*/
protected function forkOneWorker($task_name, $handle,$args)
{
$this->slaver_index++;
$this->slaver_count[$task_name] = $this->slaver_index;
$pid = pcntl_fork();
if (-1 == $pid) {
$this->log("fork进程失败");
exit(250);
}
if (0 != $pid) {// 主进程获取到子进程的pid 保存至缓存中
$this->_workers_pid[$task_name][$pid] = $pid;
} elseif( 0 == $pid) {
$this->_task_name = $task_name;
$this->run($handle,$args);
}
}
/**
* 子进程逻辑,子进程运行开始
* @param $handle
* @param $args
*/
public function run($handle, $args)
{
$this->_extend_info["start_timestamp"] = time();
$this->installSlaverSignal(); // 当前运行的进程(子进程)中安装进程信号器
$this->_stoping = false;
$this->_run_times = 0;
while (1) {
pcntl_signal_dispatch();
// 信号停止
if (true == $this->_stoping) {
$this->_extend_info["end_timestamp"] = time();
break;
}
// 达到最大运行次数停止
if ($this->_run_times >= $this->max_run_times) {
$this->_extend_info["end_timestamp"] = time();
break;
}
// 达到最大运行时间推出
if (time() - $this->_extend_info["start_timestamp"] >= $this->max_run_seconds) {
$this->_extend_info["end_timestamp"] = time();
break;
}
call_user_func_array($handle,$args);
$this->_run_times++;
usleep($this->sleep_micro_seconds);
}
exit(250);
}
/**
* daemon化 新打开主进程
* 将当前启动入口进程脱离终端,建立新的会话、进程组
* 启动进程创建完任务进程后终止,任务进程的父进程会继承系统进程(1)
*/
protected function daemonize()
{
// 建立新的session,不受当前session控制
// fork是因为posix_setsid执行的进程不能是进程组组长(group leader)
$pid = pcntl_fork();
if (-1 == $pid) {
$this->log("fork进程失败", true);
exit(250);
} elseif ($pid != 0) {
exit(0);
}
// 创建新的会话,当前进程即是会话组长(sid=pid),也是进程组组长(gpid=pid)
if (-1 == posix_setsid()) {
$this->log("新建立session会话失败", true);
exit(250);
}
// 再次fork,使用非会话组长进程,因为打开一个终端的前提是该进程必须是会话组长
$pid = pcntl_fork();
if (-1 == $pid) {
$this->log("fork进程失败", true);
exit(0);
} else if($pid != 0) {
exit(0);
}
// 恢复默认的文件掩码
umask(0);
// 修改工作目录,让进程脱离原来的目录
chdir("/");
}
/**
* master(执行命令的程序)开始进程监控slaver进程
*/
public function monitorWorkers()
{
// master进程状态标记为运行中
$this->_status = static::STATUS_RUNNING;
while (1) {
$status = 0;
// 等待子进程退出信号,或者触发信号,触发信号的时候$pid == -1,进程退出pid == 退出进程ID
// 子进程结束必须使用$this->kill() 标示程序结束,不然回调函数会一直循环再次执行
$pid = pcntl_wait($status);
$this->log($this->getMasterTaskName($pid)." - 子进程退出:【{$pid}】");
// 分发执行信号动作
pcntl_signal_dispatch();
if ($pid > 0) {
$this->removeOneWorkerPid($pid);
if ($this->_status == static::STATUS_RUNNING) {
$this->forkWorkers();
}
} else {// 所有进程退出
if ($this->_status == static::STATUS_STOP && $this->getWorkerPidCount() == 0) {
unlink($this->pid_file);
exit(0);
}
$this->log("所有进程执行完毕,程序结束!");
}
}
}
/**
* 保存进程号文件到制定文件中
*/
protected function saveMasterPid()
{
$master_pid = posix_getpid();
$this->master_pid = $master_pid;
if (false === @file_put_contents($this->pid_file, $master_pid)) {
$this->log("主进程id(master pid)写入文件[".$this->pid_file."]失败.", true);
exit(250);
}
}
/**
* 获取写入文件中的master进程号
* @return int 进程号
*/
protected function getMasterPid()
{
$master_pid = intval(@file_get_contents($this->pid_file));
return $master_pid;
}
/**
* 给进程发送信号
* @param int $process_pid 进程id
* @param int $sig 信号 0:检测进程石村存在
* @return bool
*/
public function sendSig($process_pid,$sig = 0){
return posix_kill($process_pid, $sig);
}
/**
* 关掉当前进程
* 平滑停止所有进程,slaver进程循环中,会处理完正在处理的逻辑然后退出
*/
public function kill(){
$this->_stoping = true; // 标记当前进程关闭状态
posix_kill(posix_getpid(), SIGINT);
exit(0); // 两者效果相同
}
/**
* 程序终止
*/
public function killMaster(){
$pid = posix_getppid();
if ($pid > 1) {
$this->_stoping = true; // 标记当前进程关闭状态
posix_kill(posix_getppid(), SIGTERM);
exit(0); // 两者效果相同
}
}
/**
* 标准输出、标准错误出定向
* @see https://stackoverflow.com/questions/937627/how-to-redirect-stdout-to-a-file-in-php
* 这里关闭标准输出、标准错误之后,新建两个文件描述符将分别代替他们,取名为`$STDOUT`、`$STDERR`
* 无特别意义,其他名字也可以
*
* 关闭标准输入、输出、错误后,未关闭的跳过,重新打开的顺序:标准输入->标准输出->标准错误
*
* 如果设置了日志文件,标准输出,标准错误定位到文件中
*/
protected function resetStd()
{
global $STDOUT, $STDERR, $STDIN;
fclose(STDIN);
fclose(STDOUT);
fclose(STDERR);
$STDIN = fopen($this->std_file, "r");
if ($this->log_file && file_exists($this->log_file)) {
$STDOUT = fopen($this->log_file, "a");
$STDERR = fopen($this->log_file, "a");
} else {
$STDOUT = fopen($this->std_file, "a");
$STDERR = fopen($this->std_file, "a");
}
}
/**
* 删除work pid列表中的一个pid
*
* @param $pid
*/
protected function removeOneWorkerPid($pid)
{
foreach ($this->_workers_pid as $task_name => $worker_pids) {
foreach ($worker_pids as $key => $worker_pid) {
if ($worker_pid == $pid) {
unset($this->_workers_pid[$task_name][$key]);
break;
}
}
}
}
/**
* 获取子进程pid的数量
* @param $real
* @return int
*/
protected function getWorkerPidCount($real = true)
{
$count = 0;
if ($real) {
foreach ($this->_workers_pid as $task_name => $worker_pids) {
$count += count($worker_pids);
}
} else {
foreach ($this->config as $task_name) {
$count += intval($task_name["count"]);
}
}
return $count;
}
/**
* 根据进程id获取主进程名称
* @param $pid
* @return int
*/
protected function getMasterTaskName($pid){
foreach ($this->_workers_pid as $task_name => $worker_pids) {
if(in_array($pid,$worker_pids)){
return $task_name;
}
}
return "启动进程";
}
/**
* 记录日志
* @param $info string 记录的信息
* @param bool $echo bool 是否直接显示不拼接额外信息
*/
protected function log($info, $echo = false)
{
if (!$echo) {
$pid = posix_getpid();
$ms = explode(" ", microtime());
$ms = $ms[0];
$ms = substr($ms, 2);
$task_name = $this->_task_name;
$log_info = sprintf("[%s.%d][task: %s][pid: %d] %s\n", date("Y-m-d H:i:s"), $ms, $task_name, $pid, $info);
file_put_contents($this->log_file, $log_info, FILE_APPEND);
} else {
$log_info = sprintf("%s\n", $info);
}
echo $log_info;
}
/**
* 主进程安装信号
* 每个主进程
*/
protected function installMasterSignal()
{
// 平滑退出
pcntl_signal(SIGINT, [__CLASS__, "handleMasterSignal"], false);
// 暴力退出
pcntl_signal(SIGTERM, [__CLASS__, "handleMasterSignal"], false);
// 查看进程状态
pcntl_signal(SIGUSR1, [__CLASS__, "handleMasterSignal"], false);
}
/**
* 主进程的信号处理
* @param int 信号触发回调的时候传递过来的信号标示值
*/
public function handleMasterSignal($signal)
{
switch ($signal) {
case SIGINT:
{
$this->_status = static::STATUS_STOP;
// 给所有slaver进程发送平滑退出信号
foreach ($this->_workers_pid as $task_name => $worker_pids) {
foreach ($worker_pids as $worker_pid) {
$this->sendSig($worker_pid, SIGINT);
}
}
}
break;
case SIGTERM:
{
$this->_status = static::STATUS_STOP;
// 给所有slaver进程发送退出信号
foreach ($this->_workers_pid as $task_name => $worker_pids) {
foreach ($worker_pids as $worker_pid) {
$this->sendSig($worker_pid, SIGKILL);
}
}
}
break;
case SIGUSR1:
{
// 获取master进程状态
$pid = posix_getpid();
$memory = round(memory_get_usage(true) / (1024 * 1024), 2) . "M";
$time = time();
$class_name = get_class($this);
$start_time = date("Y-m-d H:i:s", $this->_extend_info["start_timestamp"]);
$run_day = floor(($time - $this->_extend_info["start_timestamp"]) / (24 * 60 * 60));
$run_hour = floor((($time - $this->_extend_info["start_timestamp"]) % (24 * 60 * 60)) / (60 * 60));
$run_min = floor(((($time - $this->_extend_info["start_timestamp"]) % (24 * 60 * 60)) % (60 * 60)) / 60);
$status = "Daemon [{$class_name}] 信息: \n"
."-------------------------------- master进程状态 --------------------------------\n"
.str_pad("pid", 10)
.str_pad("占用内存", 19)
.str_pad("处理次数", 19)
.str_pad("开始时间", 29)
.str_pad("运行时间", 34)
."\n"
.str_pad($pid, 10)
.str_pad($memory, 15)
.str_pad("--", 15)
.str_pad($start_time, 25)
.str_pad("{$run_day}{$run_hour}{$run_min} 分", 30)
."\n"
. $this->getWorkerPidCount(false) . " slaver\n";
file_put_contents($this->statistics_file, $status);
// 写入slaver task进程关系
$json_workers_pid = json_encode($this->_workers_pid);
file_put_contents($this->statistics_file, $json_workers_pid."\n", FILE_APPEND);
foreach ($this->_workers_pid as $task_name => $worker_pids) {
foreach ($worker_pids as $worker_pid) { // 依次向子进程发送信号
posix_kill($worker_pid, SIGUSR1);
}
}
}
break;
default:
# code...
break;
}
}
/**
* 子进程安装信号
*/
protected function installSlaverSignal()
{
// slaver进程平滑退出信号
pcntl_signal(SIGINT, [__CLASS__, "handleSlaverSignal"], false);
// 查看进程状态
pcntl_signal(SIGUSR1, [__CLASS__, "handleSlaverSignal"], false);
}
/**
* 子进程信号处理
* @param int $signal 信号触发回调的时候传递过来的信号标示值
*/
public function handleSlaverSignal($signal)
{
switch ($signal) {
case SIGINT:
{
$this->_stoping = true;
}
break;
case SIGUSR1:
{
// 获取master进程状态
$pid = posix_getpid();
$memory = round(memory_get_usage(true) / (1024 * 1024), 2) . "M";
$run_times = $this->_run_times;
$time = time();
$start_time = date("Y-m-d H:i:s", $this->_extend_info["start_timestamp"]);
$run_day = floor(($time - $this->_extend_info["start_timestamp"]) / (24 * 60 * 60));
$run_hour = floor((($time - $this->_extend_info["start_timestamp"]) % (24 * 60 * 60)) / (60 * 60));
$run_min = floor(((($time - $this->_extend_info["start_timestamp"]) % (24 * 60 * 60)) % (60 * 60)) / 60);
$status = str_pad($pid, 10)
.str_pad($memory, 15)
.str_pad($run_times, 15)
.str_pad($start_time, 25)
.str_pad("{$run_day}{$run_hour}{$run_min} 分", 30)
."\n";
file_put_contents($this->statistics_file, $status, FILE_APPEND);
}
break;
default:
break;
}
}
/**
* 程序执行入口
*/
public function execute()
{
$this->checkSapiEnv();
$this->init();
$this->parseCommand();
$this->daemonize();
$this->saveMasterPid();
$this->resetStd();
$this->forkWorkers();
$this->installMasterSignal();
$this->monitorWorkers();
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
PHP
1
https://gitee.com/newyear_xn/php_pcntl.git
[email protected]:newyear_xn/php_pcntl.git
newyear_xn
php_pcntl
php进程操作
master

搜索帮助