Skip to content
Snippets Groups Projects
Unverified Commit b7ed080f authored by Marko Ivancic's avatar Marko Ivancic Committed by GitHub
Browse files

Enable running jobs using SSP cron module (#7)

parent 3b8073c3
No related branches found
No related tags found
1 merge request!1Relate histories
Showing
with 1215 additions and 3 deletions
...@@ -5,7 +5,9 @@ SimpleSAMLphp module providing user accounting functionality. ...@@ -5,7 +5,9 @@ SimpleSAMLphp module providing user accounting functionality.
## TODO ## TODO
- [x] Data stores - [x] Data stores
- [ ] Cron hooks - [ ] Cron hook
- [ ] explain specific cron tag for accounting (it should have its own tag which should be
added to cron config
- [ ] Profile page UI - [ ] Profile page UI
- [ ] Translation - [ ] Translation
......
...@@ -30,7 +30,8 @@ ...@@ -30,7 +30,8 @@
"ext-pdo_sqlite": "*", "ext-pdo_sqlite": "*",
"doctrine/dbal": "^3", "doctrine/dbal": "^3",
"psr/log": "^1|^2|^3", "psr/log": "^1|^2|^3",
"simplesamlphp/composer-module-installer": "^1" "simplesamlphp/composer-module-installer": "^1",
"cicnavi/simple-file-cache-php": "^2.0"
}, },
"require-dev": { "require-dev": {
"vimeo/psalm": "^4", "vimeo/psalm": "^4",
...@@ -39,6 +40,9 @@ ...@@ -39,6 +40,9 @@
"simplesamlphp/simplesamlphp": "^2@beta", "simplesamlphp/simplesamlphp": "^2@beta",
"simplesamlphp/simplesamlphp-test-framework": "^1" "simplesamlphp/simplesamlphp-test-framework": "^1"
}, },
"suggest": {
"ext-pcntl": "*"
},
"scripts": { "scripts": {
"pre-commit": [ "pre-commit": [
"vendor/bin/phpcs -p", "vendor/bin/phpcs -p",
......
...@@ -4,8 +4,61 @@ ...@@ -4,8 +4,61 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically" "This file is @generated automatically"
], ],
"content-hash": "dcca72582b835b41450dff9d223d5b82", "content-hash": "e1bdcb1340cc34e659c651695992a6b8",
"packages": [ "packages": [
{
"name": "cicnavi/simple-file-cache-php",
"version": "v2.0.0",
"source": {
"type": "git",
"url": "https://github.com/cicnavi/simple-file-cache-php.git",
"reference": "372b48b5ff364e514da80005b2367ecf1950dde4"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/cicnavi/simple-file-cache-php/zipball/372b48b5ff364e514da80005b2367ecf1950dde4",
"reference": "372b48b5ff364e514da80005b2367ecf1950dde4",
"shasum": ""
},
"require": {
"ext-gmp": "*",
"ext-json": "*",
"ext-openssl": "*",
"php": ">=7.4",
"psr/simple-cache": "^1.0"
},
"provide": {
"psr/simple-cache-implementation": "1.0"
},
"require-dev": {
"ext-xdebug": "*",
"phpunit/phpunit": "^9.4",
"squizlabs/php_codesniffer": "^3.5",
"vimeo/psalm": "^3.14"
},
"type": "library",
"autoload": {
"psr-4": {
"Cicnavi\\SimpleFileCache\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Marko Ivancic",
"email": "marko.ivancic@srce.hr"
}
],
"description": "PSR-16 simple cache provider based on files.",
"support": {
"issues": "https://github.com/cicnavi/simple-file-cache-php/issues",
"source": "https://github.com/cicnavi/simple-file-cache-php/tree/v2.0.0"
},
"time": "2021-08-18T11:28:21+00:00"
},
{ {
"name": "composer/ca-bundle", "name": "composer/ca-bundle",
"version": "1.3.3", "version": "1.3.3",
...@@ -1539,6 +1592,57 @@ ...@@ -1539,6 +1592,57 @@
}, },
"time": "2021-05-03T11:20:27+00:00" "time": "2021-05-03T11:20:27+00:00"
}, },
{
"name": "psr/simple-cache",
"version": "1.0.1",
"source": {
"type": "git",
"url": "https://github.com/php-fig/simple-cache.git",
"reference": "408d5eafb83c57f6365a3ca330ff23aa4a5fa39b"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/php-fig/simple-cache/zipball/408d5eafb83c57f6365a3ca330ff23aa4a5fa39b",
"reference": "408d5eafb83c57f6365a3ca330ff23aa4a5fa39b",
"shasum": ""
},
"require": {
"php": ">=5.3.0"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "1.0.x-dev"
}
},
"autoload": {
"psr-4": {
"Psr\\SimpleCache\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "PHP-FIG",
"homepage": "http://www.php-fig.org/"
}
],
"description": "Common interfaces for simple caching",
"keywords": [
"cache",
"caching",
"psr",
"psr-16",
"simple-cache"
],
"support": {
"source": "https://github.com/php-fig/simple-cache/tree/master"
},
"time": "2017-10-23T01:57:42+00:00"
},
{ {
"name": "react/promise", "name": "react/promise",
"version": "v2.9.0", "version": "v2.9.0",
......
...@@ -40,6 +40,14 @@ $config = [ ...@@ -40,6 +40,14 @@ $config = [
*/ */
//ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS, //ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS,
/**
* Cron tags.
*
* Job runner tag designates the cron tag to use when running accounting jobs. Make sure to add this tag to
* the cron module configuration in case of the 'asynchronous' accounting processing type.
*/
ModuleConfiguration::OPTION_CRON_TAG_FOR_JOB_RUNNER => 'accounting_job_runner',
/** /**
* Jobs store class. In case of the 'asynchronous' accounting processing type, this determines which class * Jobs store class. In case of the 'asynchronous' accounting processing type, this determines which class
* will be used to store jobs. The class must implement Stores\Interfaces\JobsStoreInterface. * will be used to store jobs. The class must implement Stores\Interfaces\JobsStoreInterface.
...@@ -135,4 +143,32 @@ $config = [ ...@@ -135,4 +143,32 @@ $config = [
'table_prefix' => '', // (string): Prefix for each table. 'table_prefix' => '', // (string): Prefix for each table.
], ],
], ],
/**
* Job runner fine-grained configuration options.
*/
/**
* Maximum execution time for the job runner.
*
* You can use this option to limit job runner activity by combining when the job runner will run (using
* cron configuration) and how long the job runner will be active (execution time). This can be null,
* meaning it will run indefinitely, or can be set as a duration for DateInterval, examples being
* below. Note that when the job runner is run using Cron user interface in SimpleSAMLphp, the
* duration will be taken from the 'max_execution_time' ini setting, and will override this
* setting if ini setting is shorter.
* @see https://www.php.net/manual/en/dateinterval.construct.php
*/
ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => null,
//ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'PT9M', // 9 minutes
//ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'PT59M', // 59 minutes
//ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'P1D', // 1 day
/**
* Number of processed jobs after which the job runner should take a 1-second pause.
*
* This option was introduced so that the job runner can act in a more resource friendly fashion when facing
* backend store. If the value is null, there will be no pause.
*/
ModuleConfiguration::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED => 10,
]; ];
<?php
declare(strict_types=1);
use SimpleSAML\Module\accounting\Services\JobRunner;
use SimpleSAML\Module\accounting\ModuleConfiguration;
function accounting_hook_cron(array &$cronInfo): void
{
$moduleConfiguration = new ModuleConfiguration();
$currentCronTag = $cronInfo['tag'] ?? null;
$cronTagForJobRunner = $moduleConfiguration->getCronTagForJobRunner();
try {
if ($currentCronTag === $cronTagForJobRunner) {
$state = (new JobRunner($moduleConfiguration, \SimpleSAML\Configuration::getConfig()))->run();
foreach ($state->getStatusMessages() as $statusMessage) {
$cronInfo['summary'][] = $statusMessage;
}
$message = sprintf(
'Job processing finished with %s successful jobs, %s failed jobs; total: %s.',
$state->getSuccessfulJobsProcessed(),
$state->getFailedJobsProcessed(),
$state->getTotalJobsProcessed()
);
$cronInfo['summary'][] = $message;
}
} catch (Throwable $exception) {
$message = 'Job runner error: ' . $exception->getMessage();
$cronInfo['summary'][] = $message;
}
}
\ No newline at end of file
...@@ -4,6 +4,7 @@ declare(strict_types=1); ...@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers; namespace SimpleSAML\Module\accounting\Helpers;
// TODO mivanci move to HelpersManager
class ArrayHelper class ArrayHelper
{ {
public static function recursivelySortByKey(array &$array): void public static function recursivelySortByKey(array &$array): void
......
...@@ -4,6 +4,8 @@ declare(strict_types=1); ...@@ -4,6 +4,8 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers; namespace SimpleSAML\Module\accounting\Helpers;
// TODO mivanci move to HelpersManager
class AttributesHelper class AttributesHelper
{ {
/** /**
......
<?php
declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers;
class DateTimeHelper
{
/**
* Convert date interval to seconds, interval being minimum 1 second.
* @param \DateInterval $dateInterval Minimum is 1 second.
* @return int
*/
public function convertDateIntervalToSeconds(\DateInterval $dateInterval): int
{
$reference = new \DateTimeImmutable();
$endTime = $reference->add($dateInterval);
$duration = $endTime->getTimestamp() - $reference->getTimestamp();
if ($duration < 1) {
$duration = 1;
}
return $duration;
}
}
<?php
namespace SimpleSAML\Module\accounting\Helpers;
class EnvironmentHelper
{
public function isCli(): bool
{
return http_response_code() === false;
}
}
...@@ -6,6 +6,7 @@ namespace SimpleSAML\Module\accounting\Helpers; ...@@ -6,6 +6,7 @@ namespace SimpleSAML\Module\accounting\Helpers;
use SimpleSAML\Module\accounting\Exceptions\InvalidValueException; use SimpleSAML\Module\accounting\Exceptions\InvalidValueException;
// TODO mivanci move to HelpersManager
class FilesystemHelper class FilesystemHelper
{ {
public static function getRealPath(string $path): string public static function getRealPath(string $path): string
......
...@@ -4,6 +4,7 @@ declare(strict_types=1); ...@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers; namespace SimpleSAML\Module\accounting\Helpers;
// TODO mivanci move to HelpersManager
class HashHelper class HashHelper
{ {
public static function getSha256(string $data): string public static function getSha256(string $data): string
......
...@@ -9,6 +9,7 @@ use SimpleSAML\Module\accounting\Exceptions\UnexpectedValueException; ...@@ -9,6 +9,7 @@ use SimpleSAML\Module\accounting\Exceptions\UnexpectedValueException;
use SimpleSAML\Module\accounting\Interfaces\BuildableUsingModuleConfigurationInterface; use SimpleSAML\Module\accounting\Interfaces\BuildableUsingModuleConfigurationInterface;
use SimpleSAML\Module\accounting\ModuleConfiguration; use SimpleSAML\Module\accounting\ModuleConfiguration;
// TODO mivanci move to HelpersManager
class InstanceBuilderUsingModuleConfigurationHelper class InstanceBuilderUsingModuleConfigurationHelper
{ {
/** /**
......
...@@ -4,6 +4,7 @@ declare(strict_types=1); ...@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers; namespace SimpleSAML\Module\accounting\Helpers;
// TODO mivanci move to HelpersManager
class NetworkHelper class NetworkHelper
{ {
public static function resolveClientIpAddress(string $clientIpAddress = null): ?string public static function resolveClientIpAddress(string $clientIpAddress = null): ?string
......
<?php
namespace SimpleSAML\Module\accounting\Helpers;
class RandomHelper
{
public function getRandomInt(int $minimum = PHP_INT_MIN, int $maximum = PHP_INT_MAX): int
{
try {
return random_int($minimum, $maximum);
} catch (\Throwable $exception) {
return mt_rand($minimum, $maximum);
}
}
}
...@@ -29,6 +29,10 @@ class ModuleConfiguration ...@@ -29,6 +29,10 @@ class ModuleConfiguration
public const OPTION_ADDITIONAL_TRACKERS = 'additional_trackers'; public const OPTION_ADDITIONAL_TRACKERS = 'additional_trackers';
public const OPTION_CONNECTIONS_AND_PARAMETERS = 'connections_and_parameters'; public const OPTION_CONNECTIONS_AND_PARAMETERS = 'connections_and_parameters';
public const OPTION_CLASS_TO_CONNECTION_MAP = 'class_to_connection_map'; public const OPTION_CLASS_TO_CONNECTION_MAP = 'class_to_connection_map';
public const OPTION_CRON_TAG_FOR_JOB_RUNNER = 'cron_tag_for_job_runner';
public const OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME = 'job_runner_maximum_execution_time';
public const OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED =
'job_runner_should_pause_after_number_of_jobs_processed';
/** /**
* Contains configuration from module configuration file. * Contains configuration from module configuration file.
...@@ -52,6 +56,11 @@ class ModuleConfiguration ...@@ -52,6 +56,11 @@ class ModuleConfiguration
return $this->getConfiguration()->getString(self::OPTION_ACCOUNTING_PROCESSING_TYPE); return $this->getConfiguration()->getString(self::OPTION_ACCOUNTING_PROCESSING_TYPE);
} }
public function getCronTagForJobRunner(): string
{
return $this->getConfiguration()->getString(self::OPTION_CRON_TAG_FOR_JOB_RUNNER);
}
/** /**
* Get underlying SimpleSAMLphp Configuration instance. * Get underlying SimpleSAMLphp Configuration instance.
* *
...@@ -67,6 +76,55 @@ class ModuleConfiguration ...@@ -67,6 +76,55 @@ class ModuleConfiguration
return $this->getConfiguration()->getString(self::OPTION_JOBS_STORE); return $this->getConfiguration()->getString(self::OPTION_JOBS_STORE);
} }
public function getJobRunnerMaximumExecutionTime(): ?\DateInterval
{
$value = $this->get(self::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME);
if (is_null($value)) {
return null;
}
if (! is_string($value)) {
$message = sprintf('Job runner maximum activity must be defined either as null, or DateInterval' .
'duration (string).');
throw new InvalidConfigurationException($message);
}
try {
return new \DateInterval($value);
} catch (Throwable $exception) {
$message = sprintf('Can not create DateInterval instance using value %s as parameter.', $value);
throw new InvalidConfigurationException($message);
}
}
public function getJobRunnerShouldPauseAfterNumberOfJobsProcessed(): ?int
{
$value = $this->get(self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED);
if (is_null($value)) {
return null;
}
if (! is_int($value)) {
$message = sprintf(
'Option \'%s\' must be defined either as null, or positive integer.',
self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED
);
throw new InvalidConfigurationException($message);
}
if ($value < 1) {
$message = sprintf(
'Option \'%s\' must positive integer.',
self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED
);
throw new InvalidConfigurationException($message);
}
return $value;
}
public function getDefaultDataTrackerAndProviderClass(): string public function getDefaultDataTrackerAndProviderClass(): string
{ {
return $this->getConfiguration()->getString(self::OPTION_DEFAULT_DATA_TRACKER_AND_PROVIDER); return $this->getConfiguration()->getString(self::OPTION_DEFAULT_DATA_TRACKER_AND_PROVIDER);
...@@ -212,6 +270,7 @@ class ModuleConfiguration ...@@ -212,6 +270,7 @@ class ModuleConfiguration
if ($this->getAccountingProcessingType() === AccountingProcessingType::VALUE_ASYNCHRONOUS) { if ($this->getAccountingProcessingType() === AccountingProcessingType::VALUE_ASYNCHRONOUS) {
try { try {
$this->validateJobsStoreClass(); $this->validateJobsStoreClass();
$this->validateCronTagForJobRunner();
} catch (Throwable $exception) { } catch (Throwable $exception) {
$errors[] = $exception->getMessage(); $errors[] = $exception->getMessage();
} }
...@@ -376,4 +435,9 @@ class ModuleConfiguration ...@@ -376,4 +435,9 @@ class ModuleConfiguration
throw new InvalidConfigurationException(implode(' ', $errors)); throw new InvalidConfigurationException(implode(' ', $errors));
} }
} }
protected function validateCronTagForJobRunner(): void
{
$this->getCronTagForJobRunner();
}
} }
<?php
declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Services;
use SimpleSAML\Module\accounting\Helpers\DateTimeHelper;
use SimpleSAML\Module\accounting\Helpers\EnvironmentHelper;
use SimpleSAML\Module\accounting\Helpers\RandomHelper;
class HelpersManager
{
protected static ?DateTimeHelper $dateTimeHelper;
protected static ?EnvironmentHelper $environmentHelper;
protected static ?RandomHelper $randomHelper;
public function getDateTimeHelper(): DateTimeHelper
{
return self::$dateTimeHelper ??= new DateTimeHelper();
}
public function getEnvironmentHelper(): EnvironmentHelper
{
return self::$environmentHelper ??= new EnvironmentHelper();
}
public function getRandomHelper(): RandomHelper
{
return self::$randomHelper ??= new RandomHelper();
}
}
<?php
declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Services;
use Cicnavi\SimpleFileCache\SimpleFileCache;
use Psr\Log\LoggerInterface;
use Psr\SimpleCache\CacheInterface;
use Psr\SimpleCache\InvalidArgumentException;
use SimpleSAML\Configuration as SspConfiguration;
use SimpleSAML\Module\accounting\Entities\Authentication\Event\Job;
use SimpleSAML\Module\accounting\Exceptions\Exception;
use SimpleSAML\Module\accounting\Exceptions\StoreException;
use SimpleSAML\Module\accounting\Exceptions\UnexpectedValueException;
use SimpleSAML\Module\accounting\ModuleConfiguration;
use SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter;
use SimpleSAML\Module\accounting\Services\JobRunner\State;
use SimpleSAML\Module\accounting\Stores\Builders\JobsStoreBuilder;
use SimpleSAML\Module\accounting\Trackers\Builders\AuthenticationDataTrackerBuilder;
use SimpleSAML\Module\accounting\Trackers\Interfaces\AuthenticationDataTrackerInterface;
class JobRunner
{
protected ModuleConfiguration $moduleConfiguration;
protected SspConfiguration $sspConfiguration;
protected LoggerInterface $logger;
protected AuthenticationDataTrackerBuilder $authenticationDataTrackerBuilder;
protected JobsStoreBuilder $jobsStoreBuilder;
protected CacheInterface $cache;
protected State $state;
protected const CACHE_NAME = 'accounting-job-runner-cache';
protected const CACHE_KEY_STATE = 'state';
/**
* Interval after which the state will be considered stale.
*/
public const STATE_STALE_THRESHOLD_INTERVAL = 'PT5M';
/**
* @var int $jobRunnerId ID of the current job runner instance.
*/
protected int $jobRunnerId;
protected array $trackers;
protected \DateInterval $stateStaleThresholdInterval;
protected RateLimiter $rateLimiter;
protected HelpersManager $helpersManager;
protected ?\DateInterval $maximumExecutionTime;
protected ?int $shouldPauseAfterNumberOfJobsProcessed;
public function __construct(
ModuleConfiguration $moduleConfiguration,
SspConfiguration $sspConfiguration,
LoggerInterface $logger = null,
AuthenticationDataTrackerBuilder $authenticationDataTrackerBuilder = null,
JobsStoreBuilder $jobsStoreBuilder = null,
CacheInterface $cache = null,
State $state = null,
RateLimiter $rateLimiter = null,
HelpersManager $helpersManager = null
) {
$this->moduleConfiguration = $moduleConfiguration;
$this->sspConfiguration = $sspConfiguration;
$this->logger = $logger ?? new Logger();
$this->authenticationDataTrackerBuilder = $authenticationDataTrackerBuilder ??
new AuthenticationDataTrackerBuilder($this->moduleConfiguration, $this->logger);
$this->jobsStoreBuilder = $jobsStoreBuilder ?? new JobsStoreBuilder($this->moduleConfiguration, $this->logger);
$this->cache = $cache ?? $this->resolveCache();
$this->helpersManager = $helpersManager ?? new HelpersManager();
$this->jobRunnerId = $this->helpersManager->getRandomHelper()->getRandomInt();
$this->state = $state ?? new State($this->jobRunnerId);
$this->trackers = $this->resolveTrackers();
$this->stateStaleThresholdInterval = new \DateInterval(self::STATE_STALE_THRESHOLD_INTERVAL);
$this->rateLimiter = $rateLimiter ?? new RateLimiter();
$this->maximumExecutionTime = $this->resolveMaximumExecutionTime();
$this->shouldPauseAfterNumberOfJobsProcessed =
$this->moduleConfiguration->getJobRunnerShouldPauseAfterNumberOfJobsProcessed();
$this->registerInterruptHandler();
}
/**
* @throws Exception|StoreException
*/
public function run(): State
{
try {
$this->validatePreRunState();
} catch (\Throwable $exception) {
$message = sprintf(
'Pre-run state validation failed. Clearing cached state and continuing. Error was %s',
$exception->getMessage()
);
$this->logger->warning($message);
$this->state->addStatusMessage($message);
$this->clearCachedState();
}
try {
$this->validateRunConditions();
} catch (\Throwable $exception) {
$message = sprintf('Run conditions are not met, stopping. Reason was: %s', $exception->getMessage());
$this->logger->info($message);
$this->state->addStatusMessage($message);
return $this->state;
}
$this->logger->debug('Run conditions validated.');
$this->initializeCachedState();
$jobsStore = $this->jobsStoreBuilder->build($this->moduleConfiguration->getJobsStoreClass());
$jobsProcessedSincePause = 0;
// We have a clean state, we can start processing.
while ($this->shouldRun()) {
try {
/** @var ?Job $job */
$job = $jobsStore->dequeue(Job::class);
$this->updateCachedState($this->state);
declare(ticks=1) {
// No new jobs at the moment....
if ($job === null) {
$this->state->addStatusMessage('No (more) jobs to process.');
// If in CLI, do the backoff pause, so we can continue working later.
if ($this->isCli()) {
$message = sprintf(
'Doing a backoff pause for %s seconds.',
$this->rateLimiter->getCurrentBackoffPauseInSeconds()
);
$this->logger->debug($message);
$this->state->addStatusMessage($message);
$this->rateLimiter->doBackoffPause();
$jobsProcessedSincePause = 0;
continue;
} else {
// Since this is a web run, we will break immediately, so we can return HTTP response.
break;
}
}
// We have a job...
$this->rateLimiter->resetBackoffPause();
}
/** @var AuthenticationDataTrackerInterface $tracker */
foreach ($this->trackers as $tracker) {
/** @var Job $job */
$tracker->process($job->getPayload());
}
$this->state->incrementSuccessfulJobsProcessed();
/** @var Job $job */
$successMessage = sprintf(
'Successfully processed job with ID %s.',
$job->getId() ?? '(N/A)'
);
$this->logger->debug($successMessage);
$this->state->addStatusMessage($successMessage);
// If the job runner friendly pausing is enabled, and if the number of jobs processed since the last
// pause is greater than the configured value, do the pause.
if (
$this->shouldPauseAfterNumberOfJobsProcessed !== null &&
$jobsProcessedSincePause > $this->shouldPauseAfterNumberOfJobsProcessed
) {
$this->rateLimiter->doPause();
$jobsProcessedSincePause = 0;
} else {
$jobsProcessedSincePause++;
}
} catch (\Throwable $exception) {
$message = sprintf('Error while processing jobs. Error was: %', $exception->getMessage());
$context = [];
if (isset($job)) {
$context = ['job' => $job];
$jobsStore->markFailedJob($job);
}
$this->logger->error($message, $context);
$this->state->incrementFailedJobsProcessed();
$this->state->addStatusMessage($message);
}
}
$this->clearCachedState();
$this->state->setEndedAt(new \DateTimeImmutable());
return $this->state;
}
/**
*/
protected function shouldRun(): bool
{
// Enable this code to tick, which will enable it to catch CTRL-C signals and stop gracefully.
declare(ticks=1) {
if ($this->isMaximumExecutionTimeReached()) {
$message = 'Maximum job runner execution time reached.';
$this->logger->debug($message);
$this->state->addStatusMessage($message);
return false;
}
if ($this->state->getTotalJobsProcessed() > (PHP_INT_MAX - 1)) {
$message = 'Maximum number of processed jobs reached.';
$this->logger->debug($message);
$this->state->addStatusMessage($message);
return false;
}
try {
$this->validateSelfState();
} catch (\Throwable $exception) {
$message = sprintf(
'Job runner state is not valid. Message was: %s',
$exception->getMessage()
);
$this->logger->warning($message);
$this->state->addStatusMessage($message);
return false;
}
}
return true;
}
/**
* @throws Exception
*/
protected function initializeCachedState(): void
{
// Make sure that the state does not exist in the cache.
try {
if ($this->getCachedState() !== null) {
throw new UnexpectedValueException('Job runner state already initialized.');
}
} catch (\Throwable $exception) {
$message = sprintf('Error initializing job runner state. Error was: %s.', $exception->getMessage());
$this->logger->error($message);
throw new Exception($message, (int)$exception->getCode(), $exception);
}
$startedAt = new \DateTimeImmutable();
$this->state->setStartedAt($startedAt);
$this->updateCachedState($this->state, $startedAt);
}
/**
* @throws Exception
*/
protected function validatePreRunState(): void
{
$cachedState = $this->getCachedState();
// Empty state means that no other job runner is active.
if ($cachedState === null) {
return;
}
if ($cachedState->getJobRunnerId() === $this->jobRunnerId) {
$message = 'Job runner ID in cached state same as new ID.';
$this->logger->error($message);
throw new Exception($message);
}
if ($cachedState->isStale($this->stateStaleThresholdInterval)) {
$message = 'Stale state encountered.';
$this->logger->warning($message);
throw new Exception($message);
}
}
/**
* @throws Exception
*/
protected function validateSelfState(): void
{
$cachedState = $this->getCachedState();
// Validate state before start.
if ($this->state->hasRunStarted() === false) {
if ($cachedState !== null) {
$message = 'Job run has not started, however cached state has already been initialized.';
throw new Exception($message);
}
}
// Validate state after start.
if ($this->state->hasRunStarted() === true) {
if ($cachedState === null) {
$message = 'Job run has started, however cached state has not been initialized.';
throw new Exception($message);
}
if ($cachedState->getJobRunnerId() !== $this->jobRunnerId) {
$message = 'Current job runner ID differs from the ID in the cached state.';
throw new Exception($message);
}
if ($cachedState->isStale($this->stateStaleThresholdInterval)) {
$message = 'Job runner cached state is stale, which means possible job runner process shutdown' .
' without cached state clearing.';
throw new Exception($message);
}
if ($cachedState->getIsGracefulInterruptInitiated()) {
$message = 'Graceful job processing interrupt initiated.';
throw new Exception($message);
}
}
}
protected function isAnotherJobRunnerActive(): bool
{
try {
$cachedState = $this->getCachedState();
if ($cachedState === null) {
return false;
}
// There is cached state, which would indicate that a job runner is active. However, make sure that the
// state is not stale (which indicates that the runner was shutdown without state clearing). If stale,
// this means that the job runner is not active.
if ($cachedState->isStale($this->stateStaleThresholdInterval)) {
$this->logger->warning('Stale cache encountered. Assuming no job runner is active.');
return false;
}
return $cachedState->getJobRunnerId() !== $this->jobRunnerId;
} catch (\Throwable $exception) {
$message = sprintf(
'Error checking if another job runner is active. To play safe, we will assume true. ' .
'Error was: %s',
$exception->getMessage()
);
$this->logger->error($message);
return true;
}
}
/**
* @throws Exception
*/
protected function resolveCache(): SimpleFileCache
{
try {
$this->logger->debug('Trying to initialize job runner cache using SSP datadir.');
$cache = new SimpleFileCache(
self::CACHE_NAME,
$this->sspConfiguration->getPathValue('datadir')
);
$this->logger->debug('Successfully initialized cache using SSP datadir.');
return $cache;
} catch (\Throwable $exception) {
$message = sprintf(
'Error initializing job runner cache using datadir. Error was: %s',
$exception->getMessage()
);
$this->logger->debug($message);
}
try {
$this->logger->debug('Trying to initialize job runner cache using SSP tempdir.');
$cache = new SimpleFileCache(
self::CACHE_NAME,
$this->sspConfiguration->getPathValue('tempdir')
);
$this->logger->debug('Successfully initialized job runner cache using SSP tempdir.');
return $cache;
} catch (\Throwable $exception) {
$message = sprintf(
'Error initializing job runner cache using tempdir. Error was: %s.',
$exception->getMessage()
);
$this->logger->debug($message);
}
try {
$this->logger->debug('Trying to initialize job runner cache using system tmp dir.');
$cache = new SimpleFileCache(self::CACHE_NAME);
$this->logger->debug('Successfully initialized cache using system tmp dir.');
return $cache;
} catch (\Throwable $exception) {
$message = sprintf(
'Error initializing job runner cache. Error was: %s.',
$exception->getMessage()
);
$this->logger->debug($message);
throw new Exception($message, (int)$exception->getCode(), $exception);
}
}
/**
* @throws Exception
*/
protected function clearCachedState(): void
{
/** @psalm-suppress InvalidCatch */
try {
$this->cache->delete(self::CACHE_KEY_STATE);
} catch (\Throwable | InvalidArgumentException $exception) {
$message = sprintf(
'Error clearing job runner cache. Error was: %s.',
$exception->getMessage()
);
$this->logger->error($message);
throw new Exception($message, (int)$exception->getCode(), $exception);
}
}
/**
* @throws Exception
*/
protected function getCachedState(): ?State
{
/** @psalm-suppress InvalidCatch */
try {
/** @var ?State $state */
$state = $this->cache->get(self::CACHE_KEY_STATE);
if ($state instanceof State) {
return $state;
} else {
return null;
}
} catch (\Throwable | InvalidArgumentException $exception) {
$message = sprintf('Error getting job runner state from cache. Error was: %s', $exception->getMessage());
throw new Exception($message, (int)$exception->getCode(), $exception);
}
}
/**
* @throws Exception
*/
protected function updateCachedState(State $state, \DateTimeImmutable $updatedAt = null): void
{
$updatedAt = $updatedAt ?? new \DateTimeImmutable();
$state->setUpdatedAt($updatedAt);
/** @psalm-suppress InvalidCatch */
try {
$this->cache->set(self::CACHE_KEY_STATE, $state);
} catch (\Throwable | InvalidArgumentException $exception) {
$message = sprintf('Error setting job runner state. Error was: %s.', $exception->getMessage());
$this->logger->error($message);
throw new Exception($message, (int)$exception->getCode(), $exception);
}
}
/**
* @throws Exception
*/
protected function validateRunConditions(): void
{
if (
$this->moduleConfiguration->getAccountingProcessingType() !==
ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS
) {
$message = 'Job runner called, however accounting mode is not ' .
ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS;
$this->logger->warning($message);
throw new Exception($message);
}
if ($this->isAnotherJobRunnerActive()) {
$message = 'Another job runner is active.';
$this->logger->debug($message);
throw new Exception($message);
}
}
/**
* @throws Exception
*/
protected function resolveTrackers(): array
{
$trackers = [];
$configuredTrackerClasses = array_merge(
[$this->moduleConfiguration->getDefaultDataTrackerAndProviderClass()],
$this->moduleConfiguration->getAdditionalTrackers()
);
/** @var string $trackerClass */
foreach ($configuredTrackerClasses as $trackerClass) {
$trackers[$trackerClass] = $this->authenticationDataTrackerBuilder->build($trackerClass);
}
return $trackers;
}
protected function isCli(): bool
{
return $this->helpersManager->getEnvironmentHelper()->isCli();
}
/**
* Register interrupt handler. This makes it possible to stop job processing gracefully by
* clearing the current state. It relies on pcntl extension, so to use this feature,
* that extension has to be enabled.
* @see https://www.php.net/manual/en/pcntl.installation.php
* @return void
*/
protected function registerInterruptHandler(): void
{
// pcntl won't be available in web server environment, so skip immediately.
if (! $this->isCli()) {
return;
}
// Extension pcntl doesn't come with PHP by default, so check if the proper function is available.
if (! function_exists('pcntl_signal')) {
$message = 'pcntl related functions not available, skipping registering interrupt handler.';
$this->logger->info($message);
$this->state->addStatusMessage($message);
return;
}
pcntl_signal(SIGINT, [$this, 'handleInterrupt']);
pcntl_signal(SIGTERM, [$this, 'handleInterrupt']);
}
/**
* @throws Exception
*/
protected function handleInterrupt(int $signal): void
{
$message = sprintf('Gracefully stopping job processing. Interrupt signal was %s.', $signal);
$this->state->addStatusMessage($message);
$this->logger->info($message);
$this->state->setIsGracefulInterruptInitiated(true);
$this->updateCachedState($this->state);
}
protected function resolveMaximumExecutionTime(): ?\DateInterval
{
$maximumExecutionTime = $this->moduleConfiguration->getJobRunnerMaximumExecutionTime();
// If we are in CLI environment, we can safely use module configuration setting.
if ($this->isCli()) {
return $maximumExecutionTime;
}
// We are in a "web" environment, so take max execution time ini setting into account.
$iniMaximumExecutionTimeSeconds = (int)floor((int)ini_get('max_execution_time') * 0.8);
$iniMaximumExecutionTime = new \DateInterval('PT' . $iniMaximumExecutionTimeSeconds . 'S');
// If the module setting is null (meaning infinite), use the ini setting.
if ($maximumExecutionTime === null) {
return $iniMaximumExecutionTime;
}
// Use the shorter interval from the two...
$maximumExecutionTimeSeconds = $this->helpersManager
->getDateTimeHelper()
->convertDateIntervalToSeconds($maximumExecutionTime);
if ($iniMaximumExecutionTimeSeconds < $maximumExecutionTimeSeconds) {
$this->logger->debug('Using maximum execution time from INI setting since it is shorter.');
return $iniMaximumExecutionTime;
}
return $maximumExecutionTime;
}
protected function isMaximumExecutionTimeReached(): bool
{
if ($this->maximumExecutionTime === null) {
// Execution time is infinite.
return false;
}
$startedAt = $this->state->getStartedAt();
if ($startedAt === null) {
// Processing has not even started yet.
return false;
}
$maxDateTime = $startedAt->add($this->maximumExecutionTime);
if ($maxDateTime > (new \DateTimeImmutable())) {
// Maximum has not been reached yet.
return false;
}
// Maximum has been reached.
return true;
}
}
<?php
namespace SimpleSAML\Module\accounting\Services\JobRunner;
use SimpleSAML\Module\accounting\Services\HelpersManager;
class RateLimiter
{
public const DEFAULT_MAX_PAUSE_DURATION = 'PT10M';
public const DEFAULT_MAX_BACKOFF_PAUSE_DURATION = 'PT1M';
protected HelpersManager $helpersManager;
protected int $maxPauseInSeconds;
protected int $maxBackoffPauseInSeconds;
protected int $currentBackoffPauseInSeconds = 1;
public function __construct(
\DateInterval $maxPauseInterval = null,
\DateInterval $maxBackoffInterval = null,
HelpersManager $helpersManager = null
) {
$this->helpersManager = $helpersManager ?? new HelpersManager();
$this->maxPauseInSeconds = $this->helpersManager->getDateTimeHelper()->convertDateIntervalToSeconds(
$maxPauseInterval ?? new \DateInterval(self::DEFAULT_MAX_PAUSE_DURATION)
);
$this->maxBackoffPauseInSeconds = $this->helpersManager->getDateTimeHelper()->convertDateIntervalToSeconds(
$maxBackoffInterval ?? new \DateInterval(self::DEFAULT_MAX_BACKOFF_PAUSE_DURATION)
);
}
public function doBackoffPause(): void
{
/** @psalm-suppress ArgumentTypeCoercion */
sleep($this->currentBackoffPauseInSeconds);
$newBackoffPauseInSeconds = $this->currentBackoffPauseInSeconds + $this->currentBackoffPauseInSeconds;
$this->currentBackoffPauseInSeconds = min($newBackoffPauseInSeconds, $this->maxBackoffPauseInSeconds);
}
public function doPause(int $seconds = 1): void
{
$seconds = $seconds > 0 ? $seconds : 1;
sleep($seconds);
}
public function resetBackoffPause(): void
{
$this->currentBackoffPauseInSeconds = 1;
}
/**
* @return int
*/
public function getMaxPauseInSeconds(): int
{
return $this->maxPauseInSeconds;
}
/**
* @return int
*/
public function getMaxBackoffPauseInSeconds(): int
{
return $this->maxBackoffPauseInSeconds;
}
/**
* @return int
*/
public function getCurrentBackoffPauseInSeconds(): int
{
return $this->currentBackoffPauseInSeconds;
}
}
<?php
declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Services\JobRunner;
class State
{
public const MAX_NUMBER_OF_MESSAGES_TO_KEEP = 100;
public const DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP = 10;
protected int $jobRunnerId;
protected ?\DateTimeImmutable $startedAt;
protected \DateTimeImmutable $updatedAt;
protected ?\DateTimeImmutable $endedAt = null;
protected int $successfulJobsProcessed = 0;
protected int $failedJobsProcessed = 0;
protected array $statusMessages = [];
protected int $numberOfStatusMessagesToKeep = 10;
protected bool $isGracefulInterruptInitiated = false;
public function __construct(
int $jobRunnerId,
\DateTimeImmutable $startedAt = null,
\DateTimeImmutable $updatedAt = null,
int $numberOfStatusMessagesToKeep = self::DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP
) {
$this->jobRunnerId = $jobRunnerId;
$this->startedAt = $startedAt;
$this->updatedAt = $updatedAt ?? new \DateTimeImmutable();
$this->numberOfStatusMessagesToKeep =
$numberOfStatusMessagesToKeep > 0 && $numberOfStatusMessagesToKeep <= self::MAX_NUMBER_OF_MESSAGES_TO_KEEP ?
$numberOfStatusMessagesToKeep :
self::DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP;
}
/**
* @return int
*/
public function getJobRunnerId(): int
{
return $this->jobRunnerId;
}
/**
* @return ?\DateTimeImmutable
*/
public function getStartedAt(): ?\DateTimeImmutable
{
return $this->startedAt;
}
/**
* Set startedAt if not already set.
* @param \DateTimeImmutable $startedAt
* @return bool True if set, false otherwise.
*/
public function setStartedAt(\DateTimeImmutable $startedAt): bool
{
if ($this->startedAt === null) {
$this->startedAt = $startedAt;
return true;
}
return false;
}
/**
* @return \DateTimeImmutable
*/
public function getUpdatedAt(): \DateTimeImmutable
{
return $this->updatedAt;
}
/**
* @param \DateTimeImmutable $updatedAt
*/
public function setUpdatedAt(\DateTimeImmutable $updatedAt): void
{
$this->updatedAt = $updatedAt;
}
/**
* Set endedAt if not already set.
* @param \DateTimeImmutable $endedAt
* @return bool True if set, false otherwise.
*/
public function setEndedAt(\DateTimeImmutable $endedAt): bool
{
if ($this->endedAt === null) {
$this->endedAt = $endedAt;
return true;
}
return false;
}
/**
* @return ?\DateTimeImmutable
*/
public function getEndedAt(): ?\DateTimeImmutable
{
return $this->endedAt;
}
public function hasRunStarted(): bool
{
return $this->startedAt !== null;
}
public function incrementSuccessfulJobsProcessed(): void
{
$this->successfulJobsProcessed++;
}
public function incrementFailedJobsProcessed(): void
{
$this->failedJobsProcessed++;
}
/**
* @return int
*/
public function getSuccessfulJobsProcessed(): int
{
return $this->successfulJobsProcessed;
}
/**
* @return int
*/
public function getFailedJobsProcessed(): int
{
return $this->failedJobsProcessed;
}
public function isStale(\DateInterval $threshold): bool
{
$minDateTime = (new \DateTimeImmutable())->sub($threshold);
if ($this->getUpdatedAt() < $minDateTime) {
return true;
}
return false;
}
public function getTotalJobsProcessed(): int
{
return $this->getSuccessfulJobsProcessed() + $this->getFailedJobsProcessed();
}
public function addStatusMessage(string $message): void
{
$this->statusMessages[] = $message;
if (count($this->statusMessages) > $this->numberOfStatusMessagesToKeep) {
array_shift($this->statusMessages);
}
}
public function getStatusMessages(): array
{
return $this->statusMessages;
}
public function getLastStatusMessage(): ?string
{
if (empty($this->statusMessages)) {
return null;
}
$message = (string)end($this->statusMessages);
reset($this->statusMessages);
return $message;
}
/**
* @return bool
*/
public function getIsGracefulInterruptInitiated(): bool
{
return $this->isGracefulInterruptInitiated;
}
/**
* @param bool $isGracefulInterruptInitiated
*/
public function setIsGracefulInterruptInitiated(bool $isGracefulInterruptInitiated): void
{
$this->isGracefulInterruptInitiated = $isGracefulInterruptInitiated;
}
}
...@@ -24,6 +24,12 @@ interface JobsStoreInterface extends StoreInterface ...@@ -24,6 +24,12 @@ interface JobsStoreInterface extends StoreInterface
*/ */
public function dequeue(string $type = null): ?JobInterface; public function dequeue(string $type = null): ?JobInterface;
/**
* @param JobInterface $job
* @return void
*/
public function markFailedJob(JobInterface $job): void;
public static function build( public static function build(
ModuleConfiguration $moduleConfiguration, ModuleConfiguration $moduleConfiguration,
LoggerInterface $logger, LoggerInterface $logger,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment