admin/vendor/topthink/think-queue/src/queue/Worker.php

420 lines
10 KiB
PHP

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use Carbon\Carbon;
use Exception;
use RuntimeException;
use think\Cache;
use think\Event;
use think\exception\Handle;
use think\Queue;
use think\queue\event\JobExceptionOccurred;
use think\queue\event\JobFailed;
use think\queue\event\JobProcessed;
use think\queue\event\JobProcessing;
use think\queue\event\WorkerStopping;
use think\queue\exception\MaxAttemptsExceededException;
use Throwable;
/**
 * Queue worker.
 *
 * Pops jobs from a queue connection, fires them, and takes care of the
 * surrounding lifecycle: retry/release on failure, max-attempt and deadline
 * enforcement, per-job timeouts via SIGALRM, and daemon shutdown on signal,
 * restart marker change or memory limit.
 */
class Worker
{
    /** @var Event */
    protected $event;

    /** @var Handle */
    protected $handle;

    /** @var Queue */
    protected $queue;

    /** @var Cache|null */
    protected $cache;

    /**
     * Indicates if the worker should exit.
     *
     * @var bool
     */
    public $shouldQuit = false;

    /**
     * Indicates if the worker is paused.
     *
     * @var bool
     */
    public $paused = false;

    /**
     * @param Queue      $queue  Queue manager used to resolve connections.
     * @param Event      $event  Event dispatcher for worker/job lifecycle events.
     * @param Handle     $handle Exception handler used to report job errors.
     * @param Cache|null $cache  Optional cache holding the restart marker.
     */
    public function __construct(Queue $queue, Event $event, Handle $handle, ?Cache $cache = null)
    {
        $this->queue  = $queue;
        $this->event  = $event;
        $this->handle = $handle;
        $this->cache  = $cache;
    }

    /**
     * Run the worker as a long-lived loop.
     *
     * The loop never returns; the process ends through stop() or kill().
     *
     * @param string $connection Connection name.
     * @param string $queue      Comma-separated queue names, checked in order.
     * @param int    $delay      Seconds to delay a job released after failure.
     * @param int    $sleep      Seconds to sleep when no job is available.
     * @param int    $maxTries   Default attempt limit (0 = unlimited).
     * @param int    $memory     Memory limit in megabytes.
     * @param int    $timeout    Default per-job timeout in seconds.
     */
    public function daemon($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0, $memory = 128, $timeout = 60)
    {
        $supportsAsyncSignals = $this->supportsAsyncSignals();

        if ($supportsAsyncSignals) {
            $this->listenForSignals();
        }

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {
            // Honour the pause flag toggled by SIGUSR2 / SIGCONT: take no new
            // jobs, but keep evaluating the stop conditions so a paused worker
            // can still be terminated or restarted. Never spin with a zero sleep.
            if ($this->paused) {
                $this->sleep($sleep > 0 ? $sleep : 1);
                $this->stopIfNecessary(null, $lastRestart, $memory);
                continue;
            }

            $job = $this->getNextJob(
                $this->queue->connection($connection),
                $queue
            );

            if ($supportsAsyncSignals) {
                $this->registerTimeoutHandler($job, $timeout);
            }

            if ($job) {
                $this->runJob($job, $connection, $maxTries, $delay);
            } else {
                $this->sleep($sleep);
            }

            // Cancel the alarm armed above so it cannot fire while the worker
            // is between jobs (e.g. during the next poll of the queue).
            if ($supportsAsyncSignals) {
                $this->resetTimeoutHandler();
            }

            $this->stopIfNecessary($job, $lastRestart, $memory);
        }
    }

    /**
     * Stop the worker if a quit was requested, the restart marker changed,
     * or the memory limit was reached (exit status 12 in the latter case).
     *
     * @param Job|null $job         The job handled in this iteration (unused here).
     * @param mixed    $lastRestart Restart marker captured when the daemon started.
     * @param int      $memory      Memory limit in megabytes.
     * @return void
     */
    protected function stopIfNecessary($job, $lastRestart, $memory)
    {
        if ($this->shouldQuit || $this->queueShouldRestart($lastRestart)) {
            $this->stop();
        } elseif ($this->memoryExceeded($memory)) {
            $this->stop(12);
        }
    }

    /**
     * Determine if the queue worker should restart.
     *
     * @param int|null $lastRestart
     * @return bool
     */
    protected function queueShouldRestart($lastRestart)
    {
        // Loose comparison is kept on purpose: the cached marker's type is
        // not visible from here.
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;
    }

    /**
     * Determine if the memory limit has been exceeded.
     *
     * @param int $memoryLimit Limit in megabytes.
     * @return bool
     */
    public function memoryExceeded($memoryLimit)
    {
        return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
    }

    /**
     * Get the queue restart marker stored under the "think:queue:restart" cache key.
     *
     * @return mixed The cached value, or null when no cache is configured.
     */
    protected function getTimestampOfLastQueueRestart()
    {
        if ($this->cache) {
            return $this->cache->get('think:queue:restart');
        }

        return null;
    }

    /**
     * Register the worker timeout handler.
     *
     * Installs a SIGALRM handler that kills the process with status 1 and
     * arms the alarm with the job's timeout. A timeout of 0 disarms the alarm.
     *
     * @param Job|null $job
     * @param int      $timeout
     * @return void
     */
    protected function registerTimeoutHandler($job, $timeout)
    {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });

        pcntl_alarm(
            max($this->timeoutForJob($job, $timeout), 0)
        );
    }

    /**
     * Cancel any pending timeout alarm.
     *
     * @return void
     */
    protected function resetTimeoutHandler()
    {
        pcntl_alarm(0);
    }

    /**
     * Stop listening and bail out of the script.
     *
     * @param int $status
     * @return void
     */
    public function stop($status = 0)
    {
        $this->event->trigger(new WorkerStopping($status));

        exit($status);
    }

    /**
     * Kill the process.
     *
     * @param int $status
     * @return void
     */
    public function kill($status = 0)
    {
        $this->event->trigger(new WorkerStopping($status));

        if (extension_loaded('posix')) {
            // SIGKILL is defined by the pcntl extension, not posix; fall back
            // to its conventional number when pcntl is not loaded.
            posix_kill(getmypid(), defined('SIGKILL') ? SIGKILL : 9);
        }

        exit($status);
    }

    /**
     * Get the appropriate timeout for the given job.
     *
     * @param Job|null $job
     * @param int      $timeout Fallback used when there is no job or the job has no timeout.
     * @return int
     */
    protected function timeoutForJob($job, $timeout)
    {
        if (!$job) {
            return $timeout;
        }

        return $job->timeout() ?? $timeout;
    }

    /**
     * Determine if "async" signals are supported.
     *
     * @return bool
     */
    protected function supportsAsyncSignals()
    {
        return extension_loaded('pcntl');
    }

    /**
     * Enable async signals for the process.
     *
     * SIGTERM requests a quit, SIGUSR2 pauses the worker, SIGCONT resumes it.
     *
     * @return void
     */
    protected function listenForSignals()
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true;
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false;
        });
    }

    /**
     * Run the next job on the queue, or sleep if there is none.
     *
     * @param string $connection
     * @param string $queue
     * @param int    $delay
     * @param int    $sleep
     * @param int    $maxTries
     * @return void
     * @throws Exception
     */
    public function runNextJob($connection, $queue, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        $job = $this->getNextJob($this->queue->connection($connection), $queue);

        if ($job) {
            $this->runJob($job, $connection, $maxTries, $delay);
        } else {
            $this->sleep($sleep);
        }
    }

    /**
     * Run a job, reporting (not rethrowing) anything it throws.
     *
     * @param Job    $job
     * @param string $connection
     * @param int    $maxTries
     * @param int    $delay
     * @return void
     */
    protected function runJob($job, $connection, $maxTries, $delay)
    {
        try {
            $this->process($connection, $job, $maxTries, $delay);
        } catch (Throwable $e) {
            $this->handle->report($e);
        }
    }

    /**
     * Get the next job from the first queue that has one.
     *
     * @param Connector $connector
     * @param string    $queue Comma-separated queue names, checked in order.
     * @return Job|null Null when every queue is empty or popping failed.
     */
    protected function getNextJob($connector, $queue)
    {
        try {
            foreach (explode(',', $queue) as $queueName) {
                $job = $connector->pop($queueName);

                if (!is_null($job)) {
                    return $job;
                }
            }
        } catch (Throwable $e) {
            $this->handle->report($e);
            // Back off briefly so a failing connection is not hammered.
            $this->sleep(1);
        }

        return null;
    }

    /**
     * Process a given job from the queue.
     *
     * @param string $connection
     * @param Job    $job
     * @param int    $maxTries
     * @param int    $delay Seconds to delay the job when it is released after a failure.
     * @return void
     * @throws Exception
     */
    public function process($connection, $job, $maxTries = 0, $delay = 0)
    {
        try {
            $this->event->trigger(new JobProcessing($connection, $job));

            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connection,
                $job,
                (int) $maxTries
            );

            $job->fire();

            $this->event->trigger(new JobProcessed($connection, $job));
        } catch (Throwable $e) {
            try {
                if (!$job->hasFailed()) {
                    $this->markJobAsFailedIfWillExceedMaxAttempts($connection, $job, (int) $maxTries, $e);
                }

                $this->event->trigger(new JobExceptionOccurred($connection, $job, $e));
            } finally {
                // Put the job back on the queue unless it was already
                // deleted, released or marked as failed.
                if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) {
                    $job->release($delay);
                }
            }

            throw $e;
        }
    }

    /**
     * Fail the job before it runs if it is already past its deadline or
     * attempt limit.
     *
     * @param string $connection
     * @param Job    $job
     * @param int    $maxTries Default limit, used when the job defines none (0 = unlimited).
     * @return void
     * @throws MaxAttemptsExceededException
     */
    protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connection, $job, $maxTries)
    {
        $maxTries = $job->maxTries() ?? $maxTries;

        $timeoutAt = $job->timeoutAt();

        // A deadline, when present, takes precedence over the attempt count.
        if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
            return;
        }

        if (!$timeoutAt && (0 === $maxTries || $job->attempts() <= $maxTries)) {
            return;
        }

        $this->failJob($connection, $job, $e = new MaxAttemptsExceededException(
            $job->getName() . ' has been attempted too many times or run too long. The job may have previously timed out.'
        ));

        throw $e;
    }

    /**
     * Fail the job after an exception if its deadline has passed or this
     * attempt reached the attempt limit.
     *
     * @param string              $connection
     * @param Job                 $job
     * @param int                 $maxTries Default limit, used when the job defines none.
     * @param Throwable|Exception $e        The exception raised while processing.
     * @return void
     */
    protected function markJobAsFailedIfWillExceedMaxAttempts($connection, $job, $maxTries, $e)
    {
        $maxTries = $job->maxTries() ?? $maxTries;

        $timeoutAt  = $job->timeoutAt();
        $timedOut   = $timeoutAt && $timeoutAt <= Carbon::now()->getTimestamp();
        $outOfTries = $maxTries > 0 && $job->attempts() >= $maxTries;

        // Fail at most once even when both conditions hold.
        if ($timedOut || $outOfTries) {
            $this->failJob($connection, $job, $e);
        }
    }

    /**
     * Mark the job as failed, delete it, run its failed() hook and trigger
     * a JobFailed event.
     *
     * @param string                   $connection
     * @param Job                      $job
     * @param Throwable|Exception|null $e
     * @return void
     */
    protected function failJob($connection, $job, $e)
    {
        $job->markAsFailed();

        if ($job->isDeleted()) {
            return;
        }

        try {
            $job->delete();

            $job->failed($e);
        } finally {
            // The event is triggered even if delete() or failed() throws.
            $this->event->trigger(new JobFailed(
                $connection,
                $job,
                $e ?: new RuntimeException('ManuallyFailed')
            ));
        }
    }

    /**
     * Sleep the script for a given number of seconds.
     *
     * Fractional values are honoured with microsecond resolution; zero or
     * negative values return immediately.
     *
     * @param int|float $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        $seconds = (float) $seconds;

        if ($seconds <= 0) {
            return;
        }

        $wholeSeconds = (int) floor($seconds);
        // Keep the remainder strictly below one second: some platforms
        // reject usleep() arguments of 1,000,000 or more.
        $microseconds = min(999999, (int) round(($seconds - $wholeSeconds) * 1000000));

        if ($wholeSeconds > 0) {
            sleep($wholeSeconds);
        }

        if ($microseconds > 0) {
            usleep($microseconds);
        }
    }
}