$type,
            'period' => $period,
            'period_type' => $period_type,
            'name' => $name,
            'event' => $event,
            'execute_time' => $execute_time,
            'relate_id' => $relate_id,
            'create_time' => time()
        ];
        $res = model('cron')->add($data);
        return $this->success($res);
    }

    /**
     * Delete scheduled tasks matching a condition.
     *
     * @param array $condition Query condition forwarded unchanged to the cron model's delete().
     * @return array Success envelope wrapping the model's delete() result.
     */
    public function deleteCron($condition)
    {
        $res = model('cron')->delete($condition);
        return $this->success($res);
    }

    /**
     * Run every scheduled task that is due.
     *
     * Loads the cron rows whose execute_time has been reached (with a 60 second
     * look-ahead when the message queue is enabled), hands each row to
     * checkQueue() together with a "queue" callback and a "run inline" callback,
     * then either reschedules the row (type == 2, recurring) or deletes it
     * (any other type). Finally refreshes the cron heartbeat through setCron().
     *
     * A task whose dispatch result carries a negative 'code' is logged and left
     * untouched in the table.
     */
    public function execute()
    {
        $system_config_model = new SystemConfig();
        $config = $system_config_model->getSystemConfig()[ 'data' ] ?? [];
        $is_open_queue = $config[ 'is_open_queue' ] ?? 0;
        // With the queue enabled, tasks due within the next minute are picked up
        // early and pushed as delayed jobs (see Queue::later below).
        $query_execute_time = $is_open_queue == 1 ? time() + 60 : time();
        $list = model('cron')->getList([ [ 'execute_time', '<=', $query_execute_time ] ]);
        if (!empty($list)) {
            foreach ($list as $v) {
                // NOTE(review): checkQueue() is defined elsewhere; presumably it invokes the
                // first callback when the queue is enabled and the second otherwise — confirm.
                $event_res = checkQueue($v, function($params) {
                    // Hand the task over to the message queue.
                    $job_handler_classname = 'Cronexecute';
                    try {
                        if ($params[ 'execute_time' ] <= time()) {
                            Queue::push($job_handler_classname, $params);
                        } else {
                            Queue::later($params[ 'execute_time' ] - time(), $job_handler_classname, $params);
                        }
                    } catch (\Throwable $e) {
                        // \Throwable rather than \Exception: an \Error raised while queueing
                        // must not abort the remaining tasks in this run.
                        $res = $this->error($e->getMessage(), $e->getMessage());
                    }
                    return $res ?? $this->success();
                }, function($params) {
                    try {
                        $res = event($params[ 'event' ], [ 'relate_id' => $params[ 'relate_id' ] ]);
                    } catch (\Throwable $e) {
                        // A listener raising an \Error (e.g. TypeError) is turned into an error
                        // result so that one broken task cannot block every task after it.
                        $res = $this->error($e->getMessage(), $e->getMessage());
                    }

                    $data_log = [
                        'name' => $params[ 'name' ],
                        'event' => $params[ 'event' ],
                        'relate_id' => $params[ 'relate_id' ],
                        'message' => json_encode($res)
                    ];
                    $this->addCronLog($data_log);
                    return $res;
                });

                // A negative code marks a failed dispatch: log it and keep the row as is.
                $event_code = $event_res[ 'code' ] ?? 0;
                if ($event_code < 0) {
                    Log::write($event_res);
                    continue;
                }

                // Recurring task: move execute_time forward by one period.
                if ($v[ 'type' ] == 2) {
                    $period = $v[ 'period' ] == 0 ? 1 : $v[ 'period' ];
                    // Reset per iteration so a value computed for a previous task can never leak in.
                    $execute_time = null;
                    switch ( $v[ 'period_type' ] ) {
                        case 0: // minutes
                            $execute_time = $v[ 'execute_time' ] + $period * 60;
                            break;
                        case 1: // days
                            $execute_time = strtotime('+' . $period . 'day', $v[ 'execute_time' ]);
                            break;
                        case 2: // weeks
                            $execute_time = strtotime('+' . $period . 'week', $v[ 'execute_time' ]);
                            break;
                        case 3: // months
                            $execute_time = strtotime('+' . $period . 'month', $v[ 'execute_time' ]);
                            break;
                        default:
                            // Unknown period_type: previously $execute_time was undefined (or stale
                            // from the previous task). Fall back to the smallest unit so the task
                            // neither re-runs on every tick nor receives an unrelated timestamp.
                            Log::write('cron task ' . $v[ 'id' ] . ' has an unknown period_type, treating the period as minutes');
                            $execute_time = $v[ 'execute_time' ] + $period * 60;
                            break;
                    }
                    model('cron')->update([ 'execute_time' => $execute_time ], [ [ 'id', '=', $v[ 'id' ] ] ]);
                } else {
                    // Any other type is treated as one-off: remove it once dispatched.
                    model('cron')->delete([ [ 'id', '=', $v[ 'id' ] ] ]);
                }
            }
        }
        $this->setCron();
    }

    /**
     * Record a cron execution log entry.
     *
     * Currently a no-op that always reports success; the persistence code is
     * kept commented out and is meant to be re-enabled for debugging only.
     *
     * @param array $data Log fields (name, event, relate_id, message).
     * @return array Success envelope.
     */
    public function addCronLog($data)
    {
        // Not needed in normal operation; enable for debugging.
//        $data[ 'execute_time' ] = time();
//        model('cron_log')->add($data);
        return $this->success();
    }

    /**
     * Check whether the cron heartbeat stored in the 'cron_cache' cache entry is stale.
     *
     * - No cache entry: one is created with the current time and success is returned.
     * - Entry carries an error, or is older than $this->time_diff seconds: the cron
     *   is reported as stopped. When there is no recorded error, the queue is
     *   disabled and no retry has been made yet, a single request to the execute
     *   URL is issued and success is returned instead.
     *
     * @return array Success envelope, or an error envelope carrying the stop message.
     */
    public function checkCron()
    {
        $diff = $this->time_diff;
        $now_time = time();
        $cron_cache = Cache::get('cron_cache');
        if (empty($cron_cache)) {
            // todo: a missing heartbeat entry is not treated as "cron stopped".
            // Create the entry, stamped with the current time.
            Cache::set('cron_cache', [ 'time' => $now_time, 'error' => '' ]);
        } else {
            $time = $cron_cache[ 'time' ];
            $error = $cron_cache[ 'error' ] ?? '';
            $attempts = $cron_cache[ 'attempts' ] ?? 0; // number of restart attempts so far
            if (!empty($error) || ( $now_time - $time ) > $diff) {
                $message = '自动任务已停止';
                if (!empty($error)) {
                    $message .= ',停止原因:' . $error;
                } else {
                    $system_config_model = new \app\model\system\SystemConfig();
                    $config = $system_config_model->getSystemConfig()[ 'data' ] ?? [];
                    $is_open_queue = $config[ 'is_open_queue' ] ?? 0;
                    if (!$is_open_queue) {
                        // Without the message queue, try once to kick the cron off over HTTP.
                        if ($attempts < 1) {
                            Cache::set('cron_cache', [ 'time' => $now_time, 'error' => '', 'attempts' => 1 ]);
                            $url = url('cron/task/execute');
                            // NOTE(review): http() is defined elsewhere; the second argument is
                            // presumably a short timeout so this call does not block — confirm.
                            http($url, 1);
                            return $this->success();
                        }
                    } else {
                        // The message queue could not be started; the front end should point
                        // the user to the official manual.
                    }
                }
                // Distinguish between a queue-driven cron and the default sleep-driven cron.
                return $this->error([], $message);
            }
        }
        return $this->success();
    }

    /**
     * Refresh the cron heartbeat.
     *
     * Stamps the 'cron_cache' entry with the current time and resets the restart
     * attempt counter, keeping any other keys already stored in the entry.
     *
     * @param array $params Unused; kept for interface compatibility.
     * @return array Success envelope.
     */
    public function setCron($params = [])
    {
        $cron_cache = Cache::get('cron_cache');
        if (empty($cron_cache)) {
            $cron_cache = [];
        }
        $cron_cache[ 'time' ] = time();
        $cron_cache[ 'attempts' ] = 0;
        Cache::set('cron_cache', $cron_cache);
        return $this->success();
    }
}