From b7ed080fd0bf744b13d663ea86c5ac21b28ba113 Mon Sep 17 00:00:00 2001 From: Marko Ivancic <cicnavi@gmail.com> Date: Fri, 14 Oct 2022 12:20:00 +0200 Subject: [PATCH] Enable running jobs using SSP cron module (#7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Marko Ivančić <marko.ivancic@srce.hr> --- README.md | 4 +- composer.json | 6 +- composer.lock | 106 +- config-templates/module_accounting.php | 36 + hooks/hook_cron.php | 34 + src/Helpers/ArrayHelper.php | 1 + src/Helpers/AttributesHelper.php | 2 + src/Helpers/DateTimeHelper.php | 27 + src/Helpers/EnvironmentHelper.php | 11 + src/Helpers/FilesystemHelper.php | 1 + src/Helpers/HashHelper.php | 1 + ...eBuilderUsingModuleConfigurationHelper.php | 1 + src/Helpers/NetworkHelper.php | 1 + src/Helpers/RandomHelper.php | 15 + src/ModuleConfiguration.php | 64 + src/Services/HelpersManager.php | 31 + src/Services/JobRunner.php | 599 +++++++++ src/Services/JobRunner/RateLimiter.php | 76 ++ src/Services/JobRunner/State.php | 196 +++ src/Stores/Interfaces/JobsStoreInterface.php | 6 + src/Stores/Jobs/DoctrineDbal/Store.php | 15 +- tests/config-templates/module_accounting.php | 2 + .../Services/JobRunner/RateLimiterTest.php | 67 + tests/src/Services/JobRunner/StateTest.php | 108 ++ tests/src/Services/JobRunnerTest.php | 1085 +++++++++++++++++ 25 files changed, 2491 insertions(+), 4 deletions(-) create mode 100644 hooks/hook_cron.php create mode 100644 src/Helpers/DateTimeHelper.php create mode 100644 src/Helpers/EnvironmentHelper.php create mode 100644 src/Helpers/RandomHelper.php create mode 100644 src/Services/HelpersManager.php create mode 100644 src/Services/JobRunner.php create mode 100644 src/Services/JobRunner/RateLimiter.php create mode 100644 src/Services/JobRunner/State.php create mode 100644 tests/src/Services/JobRunner/RateLimiterTest.php create mode 100644 tests/src/Services/JobRunner/StateTest.php create mode 100644 tests/src/Services/JobRunnerTest.php diff --git a/README.md b/README.md index 02d784a..3f04cb5 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,9 @@ SimpleSAMLphp module providing user accounting functionality. ## TODO - [x] Data stores -- [ ] Cron hooks +- [ ] Cron hook + - [ ] explain specific cron tag for accounting (it should have its own tag which should be +added to cron config - [ ] Profile page UI - [ ] Translation diff --git a/composer.json b/composer.json index 2cf885f..81c480e 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,8 @@ "ext-pdo_sqlite": "*", "doctrine/dbal": "^3", "psr/log": "^1|^2|^3", - "simplesamlphp/composer-module-installer": "^1" + "simplesamlphp/composer-module-installer": "^1", + "cicnavi/simple-file-cache-php": "^2.0" }, "require-dev": { "vimeo/psalm": "^4", @@ -39,6 +40,9 @@ "simplesamlphp/simplesamlphp": "^2@beta", "simplesamlphp/simplesamlphp-test-framework": "^1" }, + "suggest": { + "ext-pcntl": "*" + }, "scripts": { "pre-commit": [ "vendor/bin/phpcs -p", diff --git a/composer.lock b/composer.lock index ec09766..40bbe7f 100644 --- a/composer.lock +++ b/composer.lock @@ -4,8 +4,61 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "dcca72582b835b41450dff9d223d5b82", + "content-hash": "e1bdcb1340cc34e659c651695992a6b8", "packages": [ + { + "name": "cicnavi/simple-file-cache-php", + "version": "v2.0.0", + "source": { + "type": "git", + "url": "https://github.com/cicnavi/simple-file-cache-php.git", + "reference": "372b48b5ff364e514da80005b2367ecf1950dde4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/cicnavi/simple-file-cache-php/zipball/372b48b5ff364e514da80005b2367ecf1950dde4", + "reference": "372b48b5ff364e514da80005b2367ecf1950dde4", + "shasum": "" + }, + "require": { + "ext-gmp": "*", + "ext-json": "*", + "ext-openssl": "*", + "php": ">=7.4", + "psr/simple-cache": "^1.0" + }, + "provide": { + "psr/simple-cache-implementation": "1.0" + }, + "require-dev": { + "ext-xdebug": "*", + "phpunit/phpunit": "^9.4", + "squizlabs/php_codesniffer": "^3.5", + "vimeo/psalm": "^3.14" + }, + "type": "library", + "autoload": { + "psr-4": { + "Cicnavi\\SimpleFileCache\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Marko Ivancic", + "email": "marko.ivancic@srce.hr" + } + ], + "description": "PSR-16 simple cache provider based on files.", + "support": { + "issues": "https://github.com/cicnavi/simple-file-cache-php/issues", + "source": "https://github.com/cicnavi/simple-file-cache-php/tree/v2.0.0" + }, + "time": "2021-08-18T11:28:21+00:00" + }, { "name": "composer/ca-bundle", "version": "1.3.3", @@ -1539,6 +1592,57 @@ }, "time": "2021-05-03T11:20:27+00:00" }, + { + "name": "psr/simple-cache", + "version": "1.0.1", + "source": { + "type": "git", + "url": "https://github.com/php-fig/simple-cache.git", + "reference": "408d5eafb83c57f6365a3ca330ff23aa4a5fa39b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/simple-cache/zipball/408d5eafb83c57f6365a3ca330ff23aa4a5fa39b", + "reference": "408d5eafb83c57f6365a3ca330ff23aa4a5fa39b", + "shasum": "" + }, + "require": { + "php": ">=5.3.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\SimpleCache\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interfaces for simple caching", + "keywords": [ + "cache", + "caching", + "psr", + "psr-16", + "simple-cache" + ], + "support": { + "source": "https://github.com/php-fig/simple-cache/tree/master" + }, + "time": "2017-10-23T01:57:42+00:00" + }, { "name": "react/promise", "version": "v2.9.0", diff --git a/config-templates/module_accounting.php b/config-templates/module_accounting.php index 4a3538a..17f8a52 100644 --- a/config-templates/module_accounting.php +++ b/config-templates/module_accounting.php @@ -40,6 +40,14 @@ $config = [ */ //ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS, + /** + * Cron tags. + * + * Job runner tag designates the cron tag to use when running accounting jobs. Make sure to add this tag to + * the cron module configuration in case of the 'asynchronous' accounting processing type. + */ + ModuleConfiguration::OPTION_CRON_TAG_FOR_JOB_RUNNER => 'accounting_job_runner', + /** * Jobs store class. In case of the 'asynchronous' accounting processing type, this determines which class * will be used to store jobs. The class must implement Stores\Interfaces\JobsStoreInterface. @@ -135,4 +143,32 @@ $config = [ 'table_prefix' => '', // (string): Prefix for each table. ], ], + + /** + * Job runner fine-grained configuration options. + */ + + /** + * Maximum execution time for the job runner. + * + * You can use this option to limit job runner activity by combining when the job runner will run (using + * cron configuration) and how long the job runner will be active (execution time). This can be null, + * meaning it will run indefinitely, or can be set as a duration for DateInterval, examples being + * below. Note that when the job runner is run using Cron user interface in SimpleSAMLphp, the + * duration will be taken from the 'max_execution_time' ini setting, and will override this + * setting if ini setting is shorter. + * @see https://www.php.net/manual/en/dateinterval.construct.php + */ + ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => null, + //ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'PT9M', // 9 minutes + //ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'PT59M', // 59 minutes + //ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'P1D', // 1 day + + /** + * Number of processed jobs after which the job runner should take a 1-second pause. + * + * This option was introduced so that the job runner can act in a more resource friendly fashion when facing + * backend store. If the value is null, there will be no pause. + */ + ModuleConfiguration::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED => 10, ]; diff --git a/hooks/hook_cron.php b/hooks/hook_cron.php new file mode 100644 index 0000000..fbc5716 --- /dev/null +++ b/hooks/hook_cron.php @@ -0,0 +1,34 @@ +<?php + +declare(strict_types=1); + +use SimpleSAML\Module\accounting\Services\JobRunner; +use SimpleSAML\Module\accounting\ModuleConfiguration; + +function accounting_hook_cron(array &$cronInfo): void +{ + $moduleConfiguration = new ModuleConfiguration(); + + $currentCronTag = $cronInfo['tag'] ?? null; + + $cronTagForJobRunner = $moduleConfiguration->getCronTagForJobRunner(); + + try { + if ($currentCronTag === $cronTagForJobRunner) { + $state = (new JobRunner($moduleConfiguration, \SimpleSAML\Configuration::getConfig()))->run(); + foreach ($state->getStatusMessages() as $statusMessage) { + $cronInfo['summary'][] = $statusMessage; + } + $message = sprintf( + 'Job processing finished with %s successful jobs, %s failed jobs; total: %s.', + $state->getSuccessfulJobsProcessed(), + $state->getFailedJobsProcessed(), + $state->getTotalJobsProcessed() + ); + $cronInfo['summary'][] = $message; + } + } catch (Throwable $exception) { + $message = 'Job runner error: ' . $exception->getMessage(); + $cronInfo['summary'][] = $message; + } +} \ No newline at end of file diff --git a/src/Helpers/ArrayHelper.php b/src/Helpers/ArrayHelper.php index a87f81d..0b57b66 100644 --- a/src/Helpers/ArrayHelper.php +++ b/src/Helpers/ArrayHelper.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace SimpleSAML\Module\accounting\Helpers; +// TODO mivanci move to HelpersManager class ArrayHelper { public static function recursivelySortByKey(array &$array): void diff --git a/src/Helpers/AttributesHelper.php b/src/Helpers/AttributesHelper.php index 8fc6806..e3aedf3 100644 --- a/src/Helpers/AttributesHelper.php +++ b/src/Helpers/AttributesHelper.php @@ -4,6 +4,8 @@ declare(strict_types=1); namespace SimpleSAML\Module\accounting\Helpers; +// TODO mivanci move to HelpersManager + class AttributesHelper { /** diff --git a/src/Helpers/DateTimeHelper.php b/src/Helpers/DateTimeHelper.php new file mode 100644 index 0000000..0fc5f25 --- /dev/null +++ b/src/Helpers/DateTimeHelper.php @@ -0,0 +1,27 @@ +<?php + +declare(strict_types=1); + +namespace SimpleSAML\Module\accounting\Helpers; + +class DateTimeHelper +{ + /** + * Convert date interval to seconds, interval being minimum 1 second. + * @param \DateInterval $dateInterval Minimum is 1 second. + * @return int + */ + public function convertDateIntervalToSeconds(\DateInterval $dateInterval): int + { + $reference = new \DateTimeImmutable(); + $endTime = $reference->add($dateInterval); + + $duration = $endTime->getTimestamp() - $reference->getTimestamp(); + + if ($duration < 1) { + $duration = 1; + } + + return $duration; + } +} diff --git a/src/Helpers/EnvironmentHelper.php b/src/Helpers/EnvironmentHelper.php new file mode 100644 index 0000000..b67f33c --- /dev/null +++ b/src/Helpers/EnvironmentHelper.php @@ -0,0 +1,11 @@ +<?php + +namespace SimpleSAML\Module\accounting\Helpers; + +class EnvironmentHelper +{ + public function isCli(): bool + { + return http_response_code() === false; + } +} diff --git a/src/Helpers/FilesystemHelper.php b/src/Helpers/FilesystemHelper.php index 074b461..f82b41c 100644 --- a/src/Helpers/FilesystemHelper.php +++ b/src/Helpers/FilesystemHelper.php @@ -6,6 +6,7 @@ namespace SimpleSAML\Module\accounting\Helpers; use SimpleSAML\Module\accounting\Exceptions\InvalidValueException; +// TODO mivanci move to HelpersManager class FilesystemHelper { public static function getRealPath(string $path): string diff --git a/src/Helpers/HashHelper.php b/src/Helpers/HashHelper.php index 568b826..7421125 100644 --- a/src/Helpers/HashHelper.php +++ b/src/Helpers/HashHelper.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace SimpleSAML\Module\accounting\Helpers; +// TODO mivanci move to HelpersManager class HashHelper { public static function getSha256(string $data): string diff --git a/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php b/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php index fdecc9a..ffdba87 100644 --- a/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php +++ b/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php @@ -9,6 +9,7 @@ use SimpleSAML\Module\accounting\Exceptions\UnexpectedValueException; use SimpleSAML\Module\accounting\Interfaces\BuildableUsingModuleConfigurationInterface; use SimpleSAML\Module\accounting\ModuleConfiguration; +// TODO mivanci move to HelpersManager class InstanceBuilderUsingModuleConfigurationHelper { /** diff --git a/src/Helpers/NetworkHelper.php b/src/Helpers/NetworkHelper.php index 4d033e7..7b24549 100644 --- a/src/Helpers/NetworkHelper.php +++ b/src/Helpers/NetworkHelper.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace SimpleSAML\Module\accounting\Helpers; +// TODO mivanci move to HelpersManager class NetworkHelper { public static function resolveClientIpAddress(string $clientIpAddress = null): ?string diff --git a/src/Helpers/RandomHelper.php b/src/Helpers/RandomHelper.php new file mode 100644 index 0000000..3025683 --- /dev/null +++ b/src/Helpers/RandomHelper.php @@ -0,0 +1,15 @@ +<?php + +namespace SimpleSAML\Module\accounting\Helpers; + +class RandomHelper +{ + public function getRandomInt(int $minimum = PHP_INT_MIN, int $maximum = PHP_INT_MAX): int + { + try { + return random_int($minimum, $maximum); + } catch (\Throwable $exception) { + return mt_rand($minimum, $maximum); + } + } +} diff --git a/src/ModuleConfiguration.php b/src/ModuleConfiguration.php index 6f4cce1..71b33fd 100644 --- a/src/ModuleConfiguration.php +++ b/src/ModuleConfiguration.php @@ -29,6 +29,10 @@ class ModuleConfiguration public const OPTION_ADDITIONAL_TRACKERS = 'additional_trackers'; public const OPTION_CONNECTIONS_AND_PARAMETERS = 'connections_and_parameters'; public const OPTION_CLASS_TO_CONNECTION_MAP = 'class_to_connection_map'; + public const OPTION_CRON_TAG_FOR_JOB_RUNNER = 'cron_tag_for_job_runner'; + public const OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME = 'job_runner_maximum_execution_time'; + public const OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED = + 'job_runner_should_pause_after_number_of_jobs_processed'; /** * Contains configuration from module configuration file. @@ -52,6 +56,11 @@ class ModuleConfiguration return $this->getConfiguration()->getString(self::OPTION_ACCOUNTING_PROCESSING_TYPE); } + public function getCronTagForJobRunner(): string + { + return $this->getConfiguration()->getString(self::OPTION_CRON_TAG_FOR_JOB_RUNNER); + } + /** * Get underlying SimpleSAMLphp Configuration instance. * @@ -67,6 +76,55 @@ class ModuleConfiguration return $this->getConfiguration()->getString(self::OPTION_JOBS_STORE); } + public function getJobRunnerMaximumExecutionTime(): ?\DateInterval + { + $value = $this->get(self::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME); + + if (is_null($value)) { + return null; + } + + if (! is_string($value)) { + $message = sprintf('Job runner maximum activity must be defined either as null, or DateInterval' . + 'duration (string).'); + throw new InvalidConfigurationException($message); + } + + try { + return new \DateInterval($value); + } catch (Throwable $exception) { + $message = sprintf('Can not create DateInterval instance using value %s as parameter.', $value); + throw new InvalidConfigurationException($message); + } + } + + public function getJobRunnerShouldPauseAfterNumberOfJobsProcessed(): ?int + { + $value = $this->get(self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED); + + if (is_null($value)) { + return null; + } + + if (! is_int($value)) { + $message = sprintf( + 'Option \'%s\' must be defined either as null, or positive integer.', + self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED + ); + throw new InvalidConfigurationException($message); + } + + if ($value < 1) { + $message = sprintf( + 'Option \'%s\' must positive integer.', + self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED + ); + throw new InvalidConfigurationException($message); + } + + return $value; + } + public function getDefaultDataTrackerAndProviderClass(): string { return $this->getConfiguration()->getString(self::OPTION_DEFAULT_DATA_TRACKER_AND_PROVIDER); @@ -212,6 +270,7 @@ class ModuleConfiguration if ($this->getAccountingProcessingType() === AccountingProcessingType::VALUE_ASYNCHRONOUS) { try { $this->validateJobsStoreClass(); + $this->validateCronTagForJobRunner(); } catch (Throwable $exception) { $errors[] = $exception->getMessage(); } @@ -376,4 +435,9 @@ class ModuleConfiguration throw new InvalidConfigurationException(implode(' ', $errors)); } } + + protected function validateCronTagForJobRunner(): void + { + $this->getCronTagForJobRunner(); + } } diff --git a/src/Services/HelpersManager.php b/src/Services/HelpersManager.php new file mode 100644 index 0000000..f20ddc1 --- /dev/null +++ b/src/Services/HelpersManager.php @@ -0,0 +1,31 @@ +<?php + +declare(strict_types=1); + +namespace SimpleSAML\Module\accounting\Services; + +use SimpleSAML\Module\accounting\Helpers\DateTimeHelper; +use SimpleSAML\Module\accounting\Helpers\EnvironmentHelper; +use SimpleSAML\Module\accounting\Helpers\RandomHelper; + +class HelpersManager +{ + protected static ?DateTimeHelper $dateTimeHelper; + protected static ?EnvironmentHelper $environmentHelper; + protected static ?RandomHelper $randomHelper; + + public function getDateTimeHelper(): DateTimeHelper + { + return self::$dateTimeHelper ??= new DateTimeHelper(); + } + + public function getEnvironmentHelper(): EnvironmentHelper + { + return self::$environmentHelper ??= new EnvironmentHelper(); + } + + public function getRandomHelper(): RandomHelper + { + return self::$randomHelper ??= new RandomHelper(); + } +} diff --git a/src/Services/JobRunner.php b/src/Services/JobRunner.php new file mode 100644 index 0000000..9735bfa --- /dev/null +++ b/src/Services/JobRunner.php @@ -0,0 +1,599 @@ +<?php + +declare(strict_types=1); + +namespace SimpleSAML\Module\accounting\Services; + +use Cicnavi\SimpleFileCache\SimpleFileCache; +use Psr\Log\LoggerInterface; +use Psr\SimpleCache\CacheInterface; +use Psr\SimpleCache\InvalidArgumentException; +use SimpleSAML\Configuration as SspConfiguration; +use SimpleSAML\Module\accounting\Entities\Authentication\Event\Job; +use SimpleSAML\Module\accounting\Exceptions\Exception; +use SimpleSAML\Module\accounting\Exceptions\StoreException; +use SimpleSAML\Module\accounting\Exceptions\UnexpectedValueException; +use SimpleSAML\Module\accounting\ModuleConfiguration; +use SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter; +use SimpleSAML\Module\accounting\Services\JobRunner\State; +use SimpleSAML\Module\accounting\Stores\Builders\JobsStoreBuilder; +use SimpleSAML\Module\accounting\Trackers\Builders\AuthenticationDataTrackerBuilder; +use SimpleSAML\Module\accounting\Trackers\Interfaces\AuthenticationDataTrackerInterface; + +class JobRunner +{ + protected ModuleConfiguration $moduleConfiguration; + protected SspConfiguration $sspConfiguration; + protected LoggerInterface $logger; + protected AuthenticationDataTrackerBuilder $authenticationDataTrackerBuilder; + protected JobsStoreBuilder $jobsStoreBuilder; + protected CacheInterface $cache; + protected State $state; + + protected const CACHE_NAME = 'accounting-job-runner-cache'; + protected const CACHE_KEY_STATE = 'state'; + + /** + * Interval after which the state will be considered stale. + */ + public const STATE_STALE_THRESHOLD_INTERVAL = 'PT5M'; + + /** + * @var int $jobRunnerId ID of the current job runner instance. + */ + protected int $jobRunnerId; + protected array $trackers; + protected \DateInterval $stateStaleThresholdInterval; + protected RateLimiter $rateLimiter; + protected HelpersManager $helpersManager; + protected ?\DateInterval $maximumExecutionTime; + protected ?int $shouldPauseAfterNumberOfJobsProcessed; + + public function __construct( + ModuleConfiguration $moduleConfiguration, + SspConfiguration $sspConfiguration, + LoggerInterface $logger = null, + AuthenticationDataTrackerBuilder $authenticationDataTrackerBuilder = null, + JobsStoreBuilder $jobsStoreBuilder = null, + CacheInterface $cache = null, + State $state = null, + RateLimiter $rateLimiter = null, + HelpersManager $helpersManager = null + ) { + $this->moduleConfiguration = $moduleConfiguration; + $this->sspConfiguration = $sspConfiguration; + $this->logger = $logger ?? new Logger(); + $this->authenticationDataTrackerBuilder = $authenticationDataTrackerBuilder ?? + new AuthenticationDataTrackerBuilder($this->moduleConfiguration, $this->logger); + $this->jobsStoreBuilder = $jobsStoreBuilder ?? new JobsStoreBuilder($this->moduleConfiguration, $this->logger); + + $this->cache = $cache ?? $this->resolveCache(); + + $this->helpersManager = $helpersManager ?? new HelpersManager(); + + $this->jobRunnerId = $this->helpersManager->getRandomHelper()->getRandomInt(); + + $this->state = $state ?? new State($this->jobRunnerId); + + $this->trackers = $this->resolveTrackers(); + $this->stateStaleThresholdInterval = new \DateInterval(self::STATE_STALE_THRESHOLD_INTERVAL); + $this->rateLimiter = $rateLimiter ?? new RateLimiter(); + + $this->maximumExecutionTime = $this->resolveMaximumExecutionTime(); + $this->shouldPauseAfterNumberOfJobsProcessed = + $this->moduleConfiguration->getJobRunnerShouldPauseAfterNumberOfJobsProcessed(); + + $this->registerInterruptHandler(); + } + + /** + * @throws Exception|StoreException + */ + public function run(): State + { + try { + $this->validatePreRunState(); + } catch (\Throwable $exception) { + $message = sprintf( + 'Pre-run state validation failed. Clearing cached state and continuing. Error was %s', + $exception->getMessage() + ); + $this->logger->warning($message); + $this->state->addStatusMessage($message); + $this->clearCachedState(); + } + + try { + $this->validateRunConditions(); + } catch (\Throwable $exception) { + $message = sprintf('Run conditions are not met, stopping. Reason was: %s', $exception->getMessage()); + $this->logger->info($message); + $this->state->addStatusMessage($message); + return $this->state; + } + + $this->logger->debug('Run conditions validated.'); + + $this->initializeCachedState(); + + $jobsStore = $this->jobsStoreBuilder->build($this->moduleConfiguration->getJobsStoreClass()); + + $jobsProcessedSincePause = 0; + + // We have a clean state, we can start processing. + while ($this->shouldRun()) { + try { + /** @var ?Job $job */ + $job = $jobsStore->dequeue(Job::class); + + $this->updateCachedState($this->state); + + declare(ticks=1) { + // No new jobs at the moment.... + if ($job === null) { + $this->state->addStatusMessage('No (more) jobs to process.'); + // If in CLI, do the backoff pause, so we can continue working later. + if ($this->isCli()) { + $message = sprintf( + 'Doing a backoff pause for %s seconds.', + $this->rateLimiter->getCurrentBackoffPauseInSeconds() + ); + $this->logger->debug($message); + $this->state->addStatusMessage($message); + $this->rateLimiter->doBackoffPause(); + $jobsProcessedSincePause = 0; + continue; + } else { + // Since this is a web run, we will break immediately, so we can return HTTP response. + break; + } + } + + // We have a job... + $this->rateLimiter->resetBackoffPause(); + } + + /** @var AuthenticationDataTrackerInterface $tracker */ + foreach ($this->trackers as $tracker) { + /** @var Job $job */ + $tracker->process($job->getPayload()); + } + + $this->state->incrementSuccessfulJobsProcessed(); + + /** @var Job $job */ + $successMessage = sprintf( + 'Successfully processed job with ID %s.', + $job->getId() ?? '(N/A)' + ); + $this->logger->debug($successMessage); + $this->state->addStatusMessage($successMessage); + + // If the job runner friendly pausing is enabled, and if the number of jobs processed since the last + // pause is greater than the configured value, do the pause. + if ( + $this->shouldPauseAfterNumberOfJobsProcessed !== null && + $jobsProcessedSincePause > $this->shouldPauseAfterNumberOfJobsProcessed + ) { + $this->rateLimiter->doPause(); + $jobsProcessedSincePause = 0; + } else { + $jobsProcessedSincePause++; + } + } catch (\Throwable $exception) { + $message = sprintf('Error while processing jobs. Error was: %', $exception->getMessage()); + $context = []; + if (isset($job)) { + $context = ['job' => $job]; + $jobsStore->markFailedJob($job); + } + $this->logger->error($message, $context); + $this->state->incrementFailedJobsProcessed(); + $this->state->addStatusMessage($message); + } + } + + $this->clearCachedState(); + + $this->state->setEndedAt(new \DateTimeImmutable()); + return $this->state; + } + + /** + */ + protected function shouldRun(): bool + { + // Enable this code to tick, which will enable it to catch CTRL-C signals and stop gracefully. + declare(ticks=1) { + if ($this->isMaximumExecutionTimeReached()) { + $message = 'Maximum job runner execution time reached.'; + $this->logger->debug($message); + $this->state->addStatusMessage($message); + return false; + } + + if ($this->state->getTotalJobsProcessed() > (PHP_INT_MAX - 1)) { + $message = 'Maximum number of processed jobs reached.'; + $this->logger->debug($message); + $this->state->addStatusMessage($message); + return false; + } + + try { + $this->validateSelfState(); + } catch (\Throwable $exception) { + $message = sprintf( + 'Job runner state is not valid. Message was: %s', + $exception->getMessage() + ); + $this->logger->warning($message); + $this->state->addStatusMessage($message); + return false; + } + } + + return true; + } + + /** + * @throws Exception + */ + protected function initializeCachedState(): void + { + // Make sure that the state does not exist in the cache. + try { + if ($this->getCachedState() !== null) { + throw new UnexpectedValueException('Job runner state already initialized.'); + } + } catch (\Throwable $exception) { + $message = sprintf('Error initializing job runner state. Error was: %s.', $exception->getMessage()); + $this->logger->error($message); + throw new Exception($message, (int)$exception->getCode(), $exception); + } + + $startedAt = new \DateTimeImmutable(); + $this->state->setStartedAt($startedAt); + $this->updateCachedState($this->state, $startedAt); + } + + /** + * @throws Exception + */ + protected function validatePreRunState(): void + { + $cachedState = $this->getCachedState(); + + // Empty state means that no other job runner is active. + if ($cachedState === null) { + return; + } + + if ($cachedState->getJobRunnerId() === $this->jobRunnerId) { + $message = 'Job runner ID in cached state same as new ID.'; + $this->logger->error($message); + throw new Exception($message); + } + + if ($cachedState->isStale($this->stateStaleThresholdInterval)) { + $message = 'Stale state encountered.'; + $this->logger->warning($message); + throw new Exception($message); + } + } + + /** + * @throws Exception + */ + protected function validateSelfState(): void + { + $cachedState = $this->getCachedState(); + + // Validate state before start. + if ($this->state->hasRunStarted() === false) { + if ($cachedState !== null) { + $message = 'Job run has not started, however cached state has already been initialized.'; + throw new Exception($message); + } + } + + // Validate state after start. + if ($this->state->hasRunStarted() === true) { + if ($cachedState === null) { + $message = 'Job run has started, however cached state has not been initialized.'; + throw new Exception($message); + } + + if ($cachedState->getJobRunnerId() !== $this->jobRunnerId) { + $message = 'Current job runner ID differs from the ID in the cached state.'; + throw new Exception($message); + } + + if ($cachedState->isStale($this->stateStaleThresholdInterval)) { + $message = 'Job runner cached state is stale, which means possible job runner process shutdown' . + ' without cached state clearing.'; + throw new Exception($message); + } + + if ($cachedState->getIsGracefulInterruptInitiated()) { + $message = 'Graceful job processing interrupt initiated.'; + throw new Exception($message); + } + } + } + + protected function isAnotherJobRunnerActive(): bool + { + try { + $cachedState = $this->getCachedState(); + + if ($cachedState === null) { + return false; + } + + // There is cached state, which would indicate that a job runner is active. However, make sure that the + // state is not stale (which indicates that the runner was shutdown without state clearing). If stale, + // this means that the job runner is not active. + if ($cachedState->isStale($this->stateStaleThresholdInterval)) { + $this->logger->warning('Stale cache encountered. Assuming no job runner is active.'); + return false; + } + + return $cachedState->getJobRunnerId() !== $this->jobRunnerId; + } catch (\Throwable $exception) { + $message = sprintf( + 'Error checking if another job runner is active. To play safe, we will assume true. ' . + 'Error was: %s', + $exception->getMessage() + ); + $this->logger->error($message); + return true; + } + } + + /** + * @throws Exception + */ + protected function resolveCache(): SimpleFileCache + { + try { + $this->logger->debug('Trying to initialize job runner cache using SSP datadir.'); + $cache = new SimpleFileCache( + self::CACHE_NAME, + $this->sspConfiguration->getPathValue('datadir') + ); + $this->logger->debug('Successfully initialized cache using SSP datadir.'); + return $cache; + } catch (\Throwable $exception) { + $message = sprintf( + 'Error initializing job runner cache using datadir. Error was: %s', + $exception->getMessage() + ); + $this->logger->debug($message); + } + + try { + $this->logger->debug('Trying to initialize job runner cache using SSP tempdir.'); + $cache = new SimpleFileCache( + self::CACHE_NAME, + $this->sspConfiguration->getPathValue('tempdir') + ); + $this->logger->debug('Successfully initialized job runner cache using SSP tempdir.'); + return $cache; + } catch (\Throwable $exception) { + $message = sprintf( + 'Error initializing job runner cache using tempdir. Error was: %s.', + $exception->getMessage() + ); + $this->logger->debug($message); + } + + try { + $this->logger->debug('Trying to initialize job runner cache using system tmp dir.'); + $cache = new SimpleFileCache(self::CACHE_NAME); + $this->logger->debug('Successfully initialized cache using system tmp dir.'); + return $cache; + } catch (\Throwable $exception) { + $message = sprintf( + 'Error initializing job runner cache. Error was: %s.', + $exception->getMessage() + ); + $this->logger->debug($message); + throw new Exception($message, (int)$exception->getCode(), $exception); + } + } + + /** + * @throws Exception + */ + protected function clearCachedState(): void + { + /** @psalm-suppress InvalidCatch */ + try { + $this->cache->delete(self::CACHE_KEY_STATE); + } catch (\Throwable | InvalidArgumentException $exception) { + $message = sprintf( + 'Error clearing job runner cache. Error was: %s.', + $exception->getMessage() + ); + $this->logger->error($message); + throw new Exception($message, (int)$exception->getCode(), $exception); + } + } + + /** + * @throws Exception + */ + protected function getCachedState(): ?State + { + /** @psalm-suppress InvalidCatch */ + try { + /** @var ?State $state */ + $state = $this->cache->get(self::CACHE_KEY_STATE); + if ($state instanceof State) { + return $state; + } else { + return null; + } + } catch (\Throwable | InvalidArgumentException $exception) { + $message = sprintf('Error getting job runner state from cache. Error was: %s', $exception->getMessage()); + throw new Exception($message, (int)$exception->getCode(), $exception); + } + } + + /** + * @throws Exception + */ + protected function updateCachedState(State $state, \DateTimeImmutable $updatedAt = null): void + { + $updatedAt = $updatedAt ?? new \DateTimeImmutable(); + $state->setUpdatedAt($updatedAt); + + /** @psalm-suppress InvalidCatch */ + try { + $this->cache->set(self::CACHE_KEY_STATE, $state); + } catch (\Throwable | InvalidArgumentException $exception) { + $message = sprintf('Error setting job runner state. Error was: %s.', $exception->getMessage()); + $this->logger->error($message); + throw new Exception($message, (int)$exception->getCode(), $exception); + } + } + + /** + * @throws Exception + */ + protected function validateRunConditions(): void + { + if ( + $this->moduleConfiguration->getAccountingProcessingType() !== + ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS + ) { + $message = 'Job runner called, however accounting mode is not ' . + ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS; + $this->logger->warning($message); + throw new Exception($message); + } + + if ($this->isAnotherJobRunnerActive()) { + $message = 'Another job runner is active.'; + $this->logger->debug($message); + throw new Exception($message); + } + } + + /** + * @throws Exception + */ + protected function resolveTrackers(): array + { + $trackers = []; + + $configuredTrackerClasses = array_merge( + [$this->moduleConfiguration->getDefaultDataTrackerAndProviderClass()], + $this->moduleConfiguration->getAdditionalTrackers() + ); + + /** @var string $trackerClass */ + foreach ($configuredTrackerClasses as $trackerClass) { + $trackers[$trackerClass] = $this->authenticationDataTrackerBuilder->build($trackerClass); + } + + return $trackers; + } + + protected function isCli(): bool + { + return $this->helpersManager->getEnvironmentHelper()->isCli(); + } + + /** + * Register interrupt handler. This makes it possible to stop job processing gracefully by + * clearing the current state. It relies on pcntl extension, so to use this feature, + * that extension has to be enabled. + * @see https://www.php.net/manual/en/pcntl.installation.php + * @return void + */ + protected function registerInterruptHandler(): void + { + // pcntl won't be available in web server environment, so skip immediately. + if (! $this->isCli()) { + return; + } + + // Extension pcntl doesn't come with PHP by default, so check if the proper function is available. + if (! function_exists('pcntl_signal')) { + $message = 'pcntl related functions not available, skipping registering interrupt handler.'; + $this->logger->info($message); + $this->state->addStatusMessage($message); + return; + } + + pcntl_signal(SIGINT, [$this, 'handleInterrupt']); + pcntl_signal(SIGTERM, [$this, 'handleInterrupt']); + } + + /** + * @throws Exception + */ + protected function handleInterrupt(int $signal): void + { + $message = sprintf('Gracefully stopping job processing. Interrupt signal was %s.', $signal); + $this->state->addStatusMessage($message); + $this->logger->info($message); + $this->state->setIsGracefulInterruptInitiated(true); + $this->updateCachedState($this->state); + } + + protected function resolveMaximumExecutionTime(): ?\DateInterval + { + $maximumExecutionTime = $this->moduleConfiguration->getJobRunnerMaximumExecutionTime(); + + // If we are in CLI environment, we can safely use module configuration setting. + if ($this->isCli()) { + return $maximumExecutionTime; + } + + // We are in a "web" environment, so take max execution time ini setting into account. + $iniMaximumExecutionTimeSeconds = (int)floor((int)ini_get('max_execution_time') * 0.8); + $iniMaximumExecutionTime = new \DateInterval('PT' . $iniMaximumExecutionTimeSeconds . 'S'); + + // If the module setting is null (meaning infinite), use the ini setting. + if ($maximumExecutionTime === null) { + return $iniMaximumExecutionTime; + } + + // Use the shorter interval from the two... + $maximumExecutionTimeSeconds = $this->helpersManager + ->getDateTimeHelper() + ->convertDateIntervalToSeconds($maximumExecutionTime); + + if ($iniMaximumExecutionTimeSeconds < $maximumExecutionTimeSeconds) { + $this->logger->debug('Using maximum execution time from INI setting since it is shorter.'); + return $iniMaximumExecutionTime; + } + + return $maximumExecutionTime; + } + + protected function isMaximumExecutionTimeReached(): bool + { + if ($this->maximumExecutionTime === null) { + // Execution time is infinite. + return false; + } + + $startedAt = $this->state->getStartedAt(); + if ($startedAt === null) { + // Processing has not even started yet. + return false; + } + + $maxDateTime = $startedAt->add($this->maximumExecutionTime); + if ($maxDateTime > (new \DateTimeImmutable())) { + // Maximum has not been reached yet. + return false; + } + + // Maximum has been reached. + return true; + } +} diff --git a/src/Services/JobRunner/RateLimiter.php b/src/Services/JobRunner/RateLimiter.php new file mode 100644 index 0000000..bc8dc55 --- /dev/null +++ b/src/Services/JobRunner/RateLimiter.php @@ -0,0 +1,76 @@ +<?php + +namespace SimpleSAML\Module\accounting\Services\JobRunner; + +use SimpleSAML\Module\accounting\Services\HelpersManager; + +class RateLimiter +{ + public const DEFAULT_MAX_PAUSE_DURATION = 'PT10M'; + public const DEFAULT_MAX_BACKOFF_PAUSE_DURATION = 'PT1M'; + + protected HelpersManager $helpersManager; + + protected int $maxPauseInSeconds; + protected int $maxBackoffPauseInSeconds; + protected int $currentBackoffPauseInSeconds = 1; + + public function __construct( + \DateInterval $maxPauseInterval = null, + \DateInterval $maxBackoffInterval = null, + HelpersManager $helpersManager = null + ) { + $this->helpersManager = $helpersManager ?? new HelpersManager(); + + $this->maxPauseInSeconds = $this->helpersManager->getDateTimeHelper()->convertDateIntervalToSeconds( + $maxPauseInterval ?? new \DateInterval(self::DEFAULT_MAX_PAUSE_DURATION) + ); + $this->maxBackoffPauseInSeconds = $this->helpersManager->getDateTimeHelper()->convertDateIntervalToSeconds( + $maxBackoffInterval ?? new \DateInterval(self::DEFAULT_MAX_BACKOFF_PAUSE_DURATION) + ); + } + + public function doBackoffPause(): void + { + /** @psalm-suppress ArgumentTypeCoercion */ + sleep($this->currentBackoffPauseInSeconds); + + $newBackoffPauseInSeconds = $this->currentBackoffPauseInSeconds + $this->currentBackoffPauseInSeconds; + $this->currentBackoffPauseInSeconds = min($newBackoffPauseInSeconds, $this->maxBackoffPauseInSeconds); + } + + public function doPause(int $seconds = 1): void + { + $seconds = $seconds > 0 ? $seconds : 1; + sleep($seconds); + } + + public function resetBackoffPause(): void + { + $this->currentBackoffPauseInSeconds = 1; + } + + /** + * @return int + */ + public function getMaxPauseInSeconds(): int + { + return $this->maxPauseInSeconds; + } + + /** + * @return int + */ + public function getMaxBackoffPauseInSeconds(): int + { + return $this->maxBackoffPauseInSeconds; + } + + /** + * @return int + */ + public function getCurrentBackoffPauseInSeconds(): int + { + return $this->currentBackoffPauseInSeconds; + } +} diff --git a/src/Services/JobRunner/State.php b/src/Services/JobRunner/State.php new file mode 100644 index 0000000..74eb2e4 --- /dev/null +++ b/src/Services/JobRunner/State.php @@ -0,0 +1,196 @@ +<?php + +declare(strict_types=1); + +namespace SimpleSAML\Module\accounting\Services\JobRunner; + +class State +{ + public const MAX_NUMBER_OF_MESSAGES_TO_KEEP = 100; + public const DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP = 10; + + protected int $jobRunnerId; + protected ?\DateTimeImmutable $startedAt; + protected \DateTimeImmutable $updatedAt; + protected ?\DateTimeImmutable $endedAt = null; + protected int $successfulJobsProcessed = 0; + protected int $failedJobsProcessed = 0; + protected array $statusMessages = []; + protected int $numberOfStatusMessagesToKeep = 10; + protected bool $isGracefulInterruptInitiated = false; + + public function __construct( + int $jobRunnerId, + \DateTimeImmutable $startedAt = null, + \DateTimeImmutable $updatedAt = null, + int $numberOfStatusMessagesToKeep = self::DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP + ) { + $this->jobRunnerId = $jobRunnerId; + $this->startedAt = $startedAt; + $this->updatedAt = $updatedAt ?? new \DateTimeImmutable(); + + $this->numberOfStatusMessagesToKeep = + $numberOfStatusMessagesToKeep > 0 && $numberOfStatusMessagesToKeep <= self::MAX_NUMBER_OF_MESSAGES_TO_KEEP ? + $numberOfStatusMessagesToKeep : + self::DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP; + } + + /** + * @return int + */ + public function getJobRunnerId(): int + { + return $this->jobRunnerId; + } + + /** + * @return ?\DateTimeImmutable + */ + public function getStartedAt(): ?\DateTimeImmutable + { + return $this->startedAt; + } + + /** + * Set startedAt if not already set. + * @param \DateTimeImmutable $startedAt + * @return bool True if set, false otherwise. + */ + public function setStartedAt(\DateTimeImmutable $startedAt): bool + { + if ($this->startedAt === null) { + $this->startedAt = $startedAt; + return true; + } + + return false; + } + + /** + * @return \DateTimeImmutable + */ + public function getUpdatedAt(): \DateTimeImmutable + { + return $this->updatedAt; + } + + /** + * @param \DateTimeImmutable $updatedAt + */ + public function setUpdatedAt(\DateTimeImmutable $updatedAt): void + { + $this->updatedAt = $updatedAt; + } + + /** + * Set endedAt if not already set. + * @param \DateTimeImmutable $endedAt + * @return bool True if set, false otherwise. + */ + public function setEndedAt(\DateTimeImmutable $endedAt): bool + { + if ($this->endedAt === null) { + $this->endedAt = $endedAt; + return true; + } + + return false; + } + + /** + * @return ?\DateTimeImmutable + */ + public function getEndedAt(): ?\DateTimeImmutable + { + return $this->endedAt; + } + + public function hasRunStarted(): bool + { + return $this->startedAt !== null; + } + + public function incrementSuccessfulJobsProcessed(): void + { + $this->successfulJobsProcessed++; + } + + public function incrementFailedJobsProcessed(): void + { + $this->failedJobsProcessed++; + } + + /** + * @return int + */ + public function getSuccessfulJobsProcessed(): int + { + return $this->successfulJobsProcessed; + } + + /** + * @return int + */ + public function getFailedJobsProcessed(): int + { + return $this->failedJobsProcessed; + } + + public function isStale(\DateInterval $threshold): bool + { + $minDateTime = (new \DateTimeImmutable())->sub($threshold); + + if ($this->getUpdatedAt() < $minDateTime) { + return true; + } + + return false; + } + + public function getTotalJobsProcessed(): int + { + return $this->getSuccessfulJobsProcessed() + $this->getFailedJobsProcessed(); + } + + public function addStatusMessage(string $message): void + { + $this->statusMessages[] = $message; + + if (count($this->statusMessages) > $this->numberOfStatusMessagesToKeep) { + array_shift($this->statusMessages); + } + } + + public function getStatusMessages(): array + { + return $this->statusMessages; + } + + public function getLastStatusMessage(): ?string + { + if (empty($this->statusMessages)) { + return null; + } + + $message = (string)end($this->statusMessages); + reset($this->statusMessages); + + return $message; + } + + /** + * @return bool + */ + public function getIsGracefulInterruptInitiated(): bool + { + return $this->isGracefulInterruptInitiated; + } + + /** + * @param bool $isGracefulInterruptInitiated + */ + public function setIsGracefulInterruptInitiated(bool $isGracefulInterruptInitiated): void + { + $this->isGracefulInterruptInitiated = $isGracefulInterruptInitiated; + } +} diff --git a/src/Stores/Interfaces/JobsStoreInterface.php b/src/Stores/Interfaces/JobsStoreInterface.php index 1ff238d..90d05c4 100644 --- a/src/Stores/Interfaces/JobsStoreInterface.php +++ b/src/Stores/Interfaces/JobsStoreInterface.php @@ -24,6 +24,12 @@ interface JobsStoreInterface extends StoreInterface */ public function dequeue(string $type = null): ?JobInterface; + /** + * @param JobInterface $job + * @return void + */ + public function markFailedJob(JobInterface $job): void; + public static function build( ModuleConfiguration $moduleConfiguration, LoggerInterface $logger, diff --git a/src/Stores/Jobs/DoctrineDbal/Store.php b/src/Stores/Jobs/DoctrineDbal/Store.php index 5e81988..7bdbeeb 100644 --- a/src/Stores/Jobs/DoctrineDbal/Store.php +++ b/src/Stores/Jobs/DoctrineDbal/Store.php @@ -21,6 +21,7 @@ class Store extends AbstractStore implements JobsStoreInterface protected string $prefixedTableNameJobs; protected string $prefixedTableNameFailedJobs; protected Repository $jobsRepository; + protected Repository $failedJobsRepository; /** * @throws StoreException @@ -30,7 +31,8 @@ class Store extends AbstractStore implements JobsStoreInterface LoggerInterface $logger, Factory $connectionFactory, string $connectionKey = null, - Repository $jobsRepository = null + Repository $jobsRepository = null, + Repository $failedJobsRepository = null ) { parent::__construct($moduleConfiguration, $logger, $connectionFactory, $connectionKey); @@ -40,6 +42,9 @@ class Store extends AbstractStore implements JobsStoreInterface $this->jobsRepository = $jobsRepository ?? new Repository($this->connection, $this->prefixedTableNameJobs, $this->logger); + + $this->failedJobsRepository = $failedJobsRepository ?? + new Repository($this->connection, $this->prefixedTableNameFailedJobs, $this->logger); } /** @@ -136,4 +141,12 @@ class Store extends AbstractStore implements JobsStoreInterface $connectionKey ); } + + /** + * @throws StoreException + */ + public function markFailedJob(JobInterface $job): void + { + $this->failedJobsRepository->insert($job); + } } diff --git a/tests/config-templates/module_accounting.php b/tests/config-templates/module_accounting.php index 4eec5cf..60da309 100644 --- a/tests/config-templates/module_accounting.php +++ b/tests/config-templates/module_accounting.php @@ -16,6 +16,8 @@ $config = [ ModuleConfiguration::OPTION_ACCOUNTING_PROCESSING_TYPE => ModuleConfiguration\AccountingProcessingType::VALUE_SYNCHRONOUS, + ModuleConfiguration::OPTION_CRON_TAG_FOR_JOB_RUNNER => 'accounting_job_runner', + ModuleConfiguration::OPTION_JOBS_STORE => Stores\Jobs\DoctrineDbal\Store::class, ModuleConfiguration::OPTION_DEFAULT_DATA_TRACKER_AND_PROVIDER => diff --git a/tests/src/Services/JobRunner/RateLimiterTest.php b/tests/src/Services/JobRunner/RateLimiterTest.php new file mode 100644 index 0000000..19a98c6 --- /dev/null +++ b/tests/src/Services/JobRunner/RateLimiterTest.php @@ -0,0 +1,67 @@ +<?php + +declare(strict_types=1); + +namespace SimpleSAML\Test\Module\accounting\Services\JobRunner; + +use PHPUnit\Framework\TestCase; +use SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter; + +/** + * @covers \SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter + * @uses \SimpleSAML\Module\accounting\Helpers\DateTimeHelper + * @uses \SimpleSAML\Module\accounting\Services\HelpersManager + */ +class RateLimiterTest extends TestCase +{ + protected function setUp(): void + { + } + + public function testCanCreateInstance(): void + { + $this->assertInstanceOf(RateLimiter::class, new RateLimiter()); + } + + public function testCanDoPause(): void + { + $rateLimiter = new RateLimiter(); + $startTimeInSeconds = (new \DateTimeImmutable())->getTimestamp(); + $rateLimiter->doPause(); + $endTimeInSeconds = (new \DateTimeImmutable())->getTimestamp(); + + $this->assertTrue(($endTimeInSeconds - $startTimeInSeconds) >= 1); + } + + public function testCanSetMaxPause(): void + { + $rateLimiter = new RateLimiter(new \DateInterval('PT1S')); + $this->assertSame(1, $rateLimiter->getMaxPauseInSeconds()); + $splitSecondInterval = \DateInterval::createFromDateString('10000 microsecond'); // 10 milliseconds + $rateLimiter = new RateLimiter($splitSecondInterval); + $this->assertSame(1, $rateLimiter->getMaxPauseInSeconds()); + $rateLimiter->doPause(); + } + + public function testCanDoBackoffPause(): void + { + $rateLimiter = new RateLimiter(); + $startTimeInSeconds = (new \DateTimeImmutable())->getTimestamp(); + $rateLimiter->doBackoffPause(); + $endTimeInSeconds = (new \DateTimeImmutable())->getTimestamp(); + $this->assertTrue(($endTimeInSeconds - $startTimeInSeconds) >= 1); + $this->assertTrue($rateLimiter->getCurrentBackoffPauseInSeconds() > 1); + $rateLimiter->resetBackoffPause(); + $this->assertTrue($rateLimiter->getCurrentBackoffPauseInSeconds() === 1); + } + + public function testCanSetMaxBackoffPause(): void + { + $rateLimiter = new RateLimiter(null, new \DateInterval('PT1S')); + $this->assertSame(1, $rateLimiter->getMaxBackoffPauseInSeconds()); + $splitSecondInterval = \DateInterval::createFromDateString('10000 microsecond'); // 10 milliseconds + $rateLimiter = new \SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter(null, $splitSecondInterval); + $this->assertSame(1, $rateLimiter->getMaxBackoffPauseInSeconds()); + $rateLimiter->doBackoffPause(); + } +} diff --git a/tests/src/Services/JobRunner/StateTest.php b/tests/src/Services/JobRunner/StateTest.php new file mode 100644 index 0000000..061b40b --- /dev/null +++ b/tests/src/Services/JobRunner/StateTest.php @@ -0,0 +1,108 @@ +<?php + +namespace SimpleSAML\Test\Module\accounting\Services\JobRunner; + +use SimpleSAML\Module\accounting\Services\JobRunner\State; +use PHPUnit\Framework\TestCase; + +/** + * @covers \SimpleSAML\Module\accounting\Services\JobRunner\State + */ +class StateTest extends TestCase +{ + protected int $jobRunnerId; + + protected function setUp(): void + { + $this->jobRunnerId = 1; + } + + public function testCanCreateInstance(): void + { + $startedAt = $updatedAt = new \DateTimeImmutable(); + + $state = new State($this->jobRunnerId); + $this->assertInstanceOf(State::class, $state); + $this->assertSame($this->jobRunnerId, $state->getJobRunnerId()); + + $state = new State($this->jobRunnerId, $startedAt, null); + $this->assertInstanceOf(State::class, $state); + + $state = new State($this->jobRunnerId, $startedAt, $updatedAt); + $this->assertInstanceOf(State::class, $state); + + $state = new State($this->jobRunnerId, $startedAt, $updatedAt, 1000); + $this->assertInstanceOf(State::class, $state); + } + + public function testCanWorkWithTimestamps(): void + { + $startedAt = $updatedAt = $endedAt = new \DateTimeImmutable(); + + $state = new State($this->jobRunnerId); + $this->assertNull($state->getStartedAt()); + $this->assertInstanceOf(\DateTimeImmutable::class, $state->getUpdatedAt()); + $this->assertNull($state->getEndedAt()); + + $this->assertTrue($state->setStartedAt($startedAt)); + $this->assertTrue($state->hasRunStarted()); + $state->setUpdatedAt($updatedAt); + $this->assertTrue($state->setEndedAt($endedAt)); + + $this->assertSame($startedAt, $state->getStartedAt()); + $this->assertSame($updatedAt, $state->getUpdatedAt()); + $this->assertSame($endedAt, $state->getEndedAt()); + + $this->assertFalse($state->setStartedAt($startedAt)); + $this->assertFalse($state->setEndedAt($endedAt)); + } + + public function testCanCountProcessedJobs(): void + { + $state = new State($this->jobRunnerId); + + $this->assertSame(0, $state->getTotalJobsProcessed()); + $state->incrementSuccessfulJobsProcessed(); + + $this->assertSame(1, $state->getTotalJobsProcessed()); + $this->assertSame(1, $state->getSuccessfulJobsProcessed()); + $this->assertSame(0, $state->getFailedJobsProcessed()); + + $state->incrementFailedJobsProcessed(); + + $this->assertSame(2, $state->getTotalJobsProcessed()); + $this->assertSame(1, $state->getSuccessfulJobsProcessed()); + $this->assertSame(1, $state->getFailedJobsProcessed()); + } + + public function testCanCheckIfStateIsStale(): void + { + $state = new State($this->jobRunnerId); + $freshnessDuration = new \DateInterval('PT5M'); + + $this->assertFalse($state->isStale($freshnessDuration)); + + $dateTimeInHistory = new \DateTimeImmutable('-9 minutes'); + $state->setUpdatedAt($dateTimeInHistory); + + $this->assertTrue($state->isStale($freshnessDuration)); + } + + public function testCanWorkWithStatusMessages(): void + { + $state = new State($this->jobRunnerId, null, null, 2); + $this->assertEmpty($state->getStatusMessages()); + $this->assertNull($state->getLastStatusMessage()); + + $state->addStatusMessage('test'); + $this->assertSame(1, count($state->getStatusMessages())); + $this->assertSame('test', $state->getLastStatusMessage()); + $state->addStatusMessage('test2'); + $this->assertSame('test2', $state->getLastStatusMessage()); + $this->assertSame(2, count($state->getStatusMessages())); + $state->addStatusMessage('test3'); + $this->assertSame('test3', $state->getLastStatusMessage()); + $this->assertSame(2, count($state->getStatusMessages())); + $this->assertSame('test3', $state->getLastStatusMessage()); + } +} diff --git a/tests/src/Services/JobRunnerTest.php b/tests/src/Services/JobRunnerTest.php new file mode 100644 index 0000000..8e1fbc9 --- /dev/null +++ b/tests/src/Services/JobRunnerTest.php @@ -0,0 +1,1085 @@ +<?php + +declare(strict_types=1); + +namespace SimpleSAML\Test\Module\accounting\Services; + +use PHPUnit\Framework\MockObject\MockObject; +use PHPUnit\Framework\MockObject\Stub; +use Psr\Log\LoggerInterface; +use Psr\SimpleCache\CacheInterface; +use SimpleSAML\Configuration; +use SimpleSAML\Module\accounting\Entities\Authentication\Event; +use SimpleSAML\Module\accounting\Entities\Bases\AbstractPayload; +use SimpleSAML\Module\accounting\Entities\Interfaces\JobInterface; +use SimpleSAML\Module\accounting\Exceptions\Exception; +use SimpleSAML\Module\accounting\Helpers\DateTimeHelper; +use SimpleSAML\Module\accounting\Helpers\EnvironmentHelper; +use SimpleSAML\Module\accounting\Helpers\RandomHelper; +use SimpleSAML\Module\accounting\ModuleConfiguration; +use SimpleSAML\Module\accounting\Services\HelpersManager; +use SimpleSAML\Module\accounting\Services\JobRunner; +use PHPUnit\Framework\TestCase; +use SimpleSAML\Module\accounting\Stores\Builders\JobsStoreBuilder; +use SimpleSAML\Module\accounting\Stores\Interfaces\JobsStoreInterface; +use SimpleSAML\Module\accounting\Trackers\Builders\AuthenticationDataTrackerBuilder; +use SimpleSAML\Module\accounting\Trackers\Interfaces\AuthenticationDataTrackerInterface; + +/** + * @covers \SimpleSAML\Module\accounting\Services\JobRunner + * + * @psalm-suppress all + */ +class JobRunnerTest extends TestCase +{ + /** + * @var Stub|ModuleConfiguration + */ + protected $moduleConfigurationStub; + /** + * @var Stub|Configuration + */ + protected $sspConfigurationStub; + /** + * @var MockObject|LoggerInterface + */ + protected $loggerMock; + /** + * @var MockObject|CacheInterface + */ + protected $cacheMock; + /** + * @var Stub|JobRunner\State + */ + protected $stateStub; + /** + * @var Stub|JobRunner\RateLimiter + */ + protected $rateLimiterMock; + /** + * @var Stub|AuthenticationDataTrackerBuilder + */ + protected $authenticationDataTrackerBuilderStub; + /** + * @var MockObject|AuthenticationDataTrackerInterface + */ + protected $authenticationDataTrackerMock; + /** + * @var Stub|JobsStoreBuilder + */ + protected $jobsStoreBuilderStub; + /** + * @var Stub|RandomHelper + */ + protected $randomHelperStub; + /** + * @var Stub|EnvironmentHelper + */ + protected $environmentHelperStub; + /** + * @var Stub|DateTimeHelper + */ + protected $dateTimeHelperStub; + /** + * @var Stub|HelpersManager + */ + protected $helpersManagerStub; + /** + * @var Stub|JobsStoreInterface + */ + protected $jobsStoreMock; + /** + * @var Stub|JobInterface + */ + protected $jobStub; + /** + * @var Stub|AbstractPayload + */ + protected $payloadStub; + + protected function setUp(): void + { + $this->moduleConfigurationStub = $this->createStub(ModuleConfiguration::class); + $this->sspConfigurationStub = $this->createStub(Configuration::class); + $this->loggerMock = $this->createMock(LoggerInterface::class); + $this->authenticationDataTrackerBuilderStub = $this->createStub(AuthenticationDataTrackerBuilder::class); + $this->authenticationDataTrackerMock = $this->createMock(AuthenticationDataTrackerInterface::class); + $this->jobsStoreBuilderStub = $this->createStub(JobsStoreBuilder::class); + $this->cacheMock = $this->createMock(CacheInterface::class); + $this->stateStub = $this->createStub(JobRunner\State::class); + $this->rateLimiterMock = $this->createMock(JobRunner\RateLimiter::class); + $this->randomHelperStub = $this->createStub(RandomHelper::class); + $this->environmentHelperStub = $this->createStub(EnvironmentHelper::class); + $this->dateTimeHelperStub = $this->createStub(DateTimeHelper::class); + $this->helpersManagerStub = $this->createStub(HelpersManager::class); + $this->jobsStoreMock = $this->createMock(JobsStoreInterface::class); + $this->jobStub = $this->createStub(JobInterface::class); + $this->payloadStub = $this->createStub(Event::class); + } + + public function testCanCreateInstance(): void + { + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->assertInstanceOf( + JobRunner::class, + new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ) + ); + } + + public function testPreRunValidationFailsForSameJobRunnerId(): void + { + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->cacheMock->method('get')->willReturn($this->stateStub); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('error') + ->with('Job runner ID in cached state same as new ID.'); + $this->loggerMock->expects($this->atLeast(2))->method('warning') + ->withConsecutive( + [$this->stringContains('Pre-run state validation failed. Clearing cached state and continuing.')], + [$this->stringContains('Job runner called, however accounting mode is not')] + ); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testPreRunValidationFailsForStaleState(): void + { + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(321); + $this->stateStub->method('isStale')->willReturn(true); + $this->cacheMock->method('get')->willReturn($this->stateStub); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->atLeast(3))->method('warning') + ->withConsecutive( + [$this->stringContains('Stale state encountered.')], + [$this->stringContains('Pre-run state validation failed. Clearing cached state and continuing.')], + [$this->stringContains('Job runner called, however accounting mode is not')] + ); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testPreRunValidationPassesWhenStateIsNull(): void + { + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->cacheMock->method('get')->willReturn(null); + + $this->cacheMock->expects($this->never())->method('delete'); + + $this->loggerMock->expects($this->atLeast(1))->method('warning') + ->withConsecutive( + [$this->stringContains('Job runner called, however accounting mode is not')] + ); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testValidateRunConditionsFailsIfAnotherJobRunnerIsActive(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(321); + $this->stateStub->method('isStale')->willReturn(false); + $this->cacheMock->method('get')->willReturn($this->stateStub); + + $this->cacheMock->expects($this->never())->method('delete'); + + $this->loggerMock->expects($this->once())->method('debug') + ->with($this->stringContains('Another job runner is active.')); + $this->loggerMock->expects($this->once())->method('info') + ->with($this->stringContains('Run conditions are not met, stopping.')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testAssumeTrueOnJobRunnerActivityIfThrown(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(321); + $this->stateStub->method('isStale')->willReturn(false); + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + $this->stateStub, + $this->throwException(new Exception('test')) + ); + + $this->cacheMock->expects($this->never())->method('delete'); + + $this->loggerMock->expects($this->once())->method('error') + ->with($this->stringContains('Error checking if another job runner is active.')); + $this->loggerMock->expects($this->once())->method('info') + ->with($this->stringContains('Run conditions are not met, stopping.')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanLogCacheClearingError(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(321); + $this->stateStub->method('isStale')->willReturn(false); + $this->cacheMock->method('get')->willThrowException(new Exception('test')); + $this->cacheMock->method('delete')->willThrowException(new Exception('test')); + + $this->expectException(Exception::class); + + $this->loggerMock->expects($this->once())->method('error') + ->with($this->stringContains('Error clearing job runner cache.')); + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testValidateRunConditionsSuccessIfStaleStateEncountered(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(321); + $this->stateStub->method('isStale')->willReturn(true); + $this->cacheMock->method('get') + ->willReturnOnConsecutiveCalls( + null, + $this->stateStub + ); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Assuming no job runner is active.')); + $this->loggerMock->expects($this->once())->method('debug') + ->with($this->stringContains('Run conditions validated.')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testShouldRunCheckFailsIfMaximumExecutionTimeIsReached(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->moduleConfigurationStub->method('getJobRunnerMaximumExecutionTime') + ->willReturn(new \DateInterval('PT1S')); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->cacheMock->method('get')->willReturn(null); + + $this->stateStub->method('getStartedAt')->willReturn(new \DateTimeImmutable('-2 seconds')); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->atLeast(2))->method('debug') + ->withConsecutive( + [$this->stringContains('Run conditions validated.')], + [$this->stringContains('Maximum job runner execution time reached.')] + ); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanUseIniSettingForMaximumExecutionTime(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->moduleConfigurationStub->method('getJobRunnerMaximumExecutionTime') + ->willReturn(new \DateInterval('PT20S')); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(false); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + $this->dateTimeHelperStub->method('convertDateIntervalToSeconds')->willReturn(20); + $this->helpersManagerStub->method('getDateTimeHelper')->willReturn($this->dateTimeHelperStub); + + ini_set('max_execution_time', '10'); + + $this->cacheMock->method('get')->willReturn(null); + + $this->stateStub->method('getStartedAt')->willReturn(new \DateTimeImmutable('-30 seconds')); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->atLeast(3))->method('debug') + ->withConsecutive( + [$this->stringContains('Using maximum execution time from INI setting since it is shorter.')], + [$this->stringContains('Run conditions validated.')], + [$this->stringContains('Maximum job runner execution time reached.')] + ); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testShouldRunCheckFailsIfMaximumNumberOfProcessedJobsIsReached(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->cacheMock->method('get')->willReturn(null); + + $this->stateStub->method('getTotalJobsProcessed')->willReturn(PHP_INT_MAX); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->atLeast(2))->method('debug') + ->withConsecutive( + [$this->stringContains('Run conditions validated.')], + [$this->stringContains('Maximum number of processed jobs reached.')] + ); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testValidateSelfStateFailsIfRunHasNotStartedButCachedStateExists(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(false); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('cached state has already been initialized.')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testValidateSelfStateFailsIfRunHasStartedButCachedStateDoesNotExist(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + null, + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('cached state has not been initialized.')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testValidateSelfStateFailsIfRunHasStartedButDifferentJobRunnerIdEncountered(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(321); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Current job runner ID differs from the ID in the cached state.')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testValidateSelfStateFailsIfRunHasStartedButStaleCachedStateEncountered(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Job runner cached state is stale')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testValidateSelfStateFailsIfRunHasStartedButGracefulInterruptIsInitiated(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated')->willReturn(true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Graceful job processing interrupt initiated')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanDoBackoffPauseIfNoJobsInCli(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(true); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated') + ->willReturnOnConsecutiveCalls(false, true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->atLeast(2))->method('debug') + ->withConsecutive( + [$this->stringContains('Run conditions validated.')], + [$this->stringContains('Doing a backoff pause')] + ); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Graceful job processing interrupt initiated')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanBreakImmediatelyIfNoJobsInWeb(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(false); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated') + ->willReturnOnConsecutiveCalls(false, true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('debug') + ->with($this->stringContains('Run conditions validated')); + + $this->loggerMock->expects($this->never())->method('warning') + ->with($this->stringContains('Graceful job processing interrupt initiated')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanProcessJob(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass') + ->willReturn('mock'); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(false); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated') + ->willReturnOnConsecutiveCalls(false, true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->rateLimiterMock->expects($this->once())->method('resetBackoffPause'); + + $this->authenticationDataTrackerMock->expects($this->once()) + ->method('process'); + $this->authenticationDataTrackerBuilderStub->method('build') + ->willReturn($this->authenticationDataTrackerMock); + + $this->jobStub->method('getPayload')->willReturn($this->payloadStub); + $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub); + $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->atLeast(2))->method('debug') + ->withConsecutive( + [$this->stringContains('Run conditions validated.')], + [$this->stringContains('Successfully processed job with ID')] + ); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Graceful job processing interrupt initiated')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanLogCacheUpdateError(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass') + ->willReturn('mock'); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(false); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated') + ->willReturnOnConsecutiveCalls(false, true); + + $this->cacheMock->method('set')->willThrowException(new Exception('test')); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->jobStub->method('getPayload')->willReturn($this->payloadStub); + $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub); + $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock); + + $this->loggerMock->expects($this->atLeast(1))->method('debug') + ->withConsecutive( + [$this->stringContains('Run conditions validated.')], + ); + + $this->loggerMock->expects($this->once())->method('error') + ->with($this->stringContains('Error setting job runner state')); + + $this->expectException(Exception::class); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanPauseProcessingBasedOnConfiguration(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass') + ->willReturn('mock'); + $this->moduleConfigurationStub->method('getJobRunnerShouldPauseAfterNumberOfJobsProcessed') + ->willReturn(0); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(false); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated') + ->willReturnOnConsecutiveCalls(false, false, true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub, + $this->stateStub, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->rateLimiterMock->expects($this->exactly(2))->method('resetBackoffPause'); + $this->rateLimiterMock->expects($this->once())->method('doPause'); + + $this->authenticationDataTrackerMock->expects($this->exactly(2)) + ->method('process'); + $this->authenticationDataTrackerBuilderStub->method('build') + ->willReturn($this->authenticationDataTrackerMock); + + $this->jobStub->method('getPayload')->willReturn($this->payloadStub); + $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub); + $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->atLeast(3))->method('debug') + ->withConsecutive( + [$this->stringContains('Run conditions validated.')], + [$this->stringContains('Successfully processed job with ID')], + [$this->stringContains('Successfully processed job with ID')] + ); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Graceful job processing interrupt initiated')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testCanMarkFailedJobOnError(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass') + ->willReturn('mock'); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(false); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated') + ->willReturnOnConsecutiveCalls(false, true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + null, + $this->stateStub, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->rateLimiterMock->expects($this->once())->method('resetBackoffPause'); + + $this->authenticationDataTrackerMock->expects($this->once()) + ->method('process') + ->willThrowException(new Exception('test')); + $this->authenticationDataTrackerBuilderStub->method('build') + ->willReturn($this->authenticationDataTrackerMock); + + $this->jobStub->method('getPayload')->willReturn($this->payloadStub); + $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub); + $this->jobsStoreMock->expects($this->once())->method('markFailedJob')->with($this->jobStub); + $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock); + + $this->cacheMock->expects($this->once())->method('delete'); + + $this->loggerMock->expects($this->once())->method('error') + ->with($this->stringContains('Error while processing jobs.')); + + $this->loggerMock->expects($this->once())->method('warning') + ->with($this->stringContains('Graceful job processing interrupt initiated')); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } + + public function testThrowsOnAlreadyInitializedState(): void + { + $this->moduleConfigurationStub->method('getAccountingProcessingType') + ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS); + $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass') + ->willReturn('mock'); + + $this->randomHelperStub->method('getRandomInt')->willReturn(123); + $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub); + $this->environmentHelperStub->method('isCli')->willReturn(false); + $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub); + + $this->stateStub->method('getJobRunnerId')->willReturn(123); + $this->stateStub->method('isStale')->willReturn(false); + $this->stateStub->method('getIsGracefulInterruptInitiated') + ->willReturnOnConsecutiveCalls(false, true); + + $this->cacheMock->method('get')->willReturnOnConsecutiveCalls( + null, + null, + $this->stateStub + ); + + $this->stateStub->method('hasRunStarted')->willReturn(true); + + $this->loggerMock->expects($this->once())->method('error') + ->with($this->stringContains('Job runner state already initialized')); + $this->expectException(Exception::class); + + $jobRunner = new JobRunner( + $this->moduleConfigurationStub, + $this->sspConfigurationStub, + $this->loggerMock, + $this->authenticationDataTrackerBuilderStub, + $this->jobsStoreBuilderStub, + $this->cacheMock, + $this->stateStub, + $this->rateLimiterMock, + $this->helpersManagerStub + ); + + $jobRunner->run(); + } +} -- GitLab