diff --git a/README.md b/README.md
index 02d784ad7a734ad17dc8638a8859a243eb425971..3f04cb539784310ee94c597b88747879bf258b5c 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,9 @@ SimpleSAMLphp module providing user accounting functionality.
## TODO
- [x] Data stores
-- [ ] Cron hooks
+- [ ] Cron hook
+ - [ ] explain specific cron tag for accounting (it should have its own tag which should be
+added to cron config
- [ ] Profile page UI
- [ ] Translation
diff --git a/composer.json b/composer.json
index 2cf885fc27c7c9a890989f6db58350cf9fafeae6..81c480ef97cfc9d3aacf4a939cef436c1e8a444c 100644
--- a/composer.json
+++ b/composer.json
@@ -30,7 +30,8 @@
"ext-pdo_sqlite": "*",
"doctrine/dbal": "^3",
"psr/log": "^1|^2|^3",
- "simplesamlphp/composer-module-installer": "^1"
+ "simplesamlphp/composer-module-installer": "^1",
+ "cicnavi/simple-file-cache-php": "^2.0"
},
"require-dev": {
"vimeo/psalm": "^4",
@@ -39,6 +40,9 @@
"simplesamlphp/simplesamlphp": "^2@beta",
"simplesamlphp/simplesamlphp-test-framework": "^1"
},
+ "suggest": {
+ "ext-pcntl": "*"
+ },
"scripts": {
"pre-commit": [
"vendor/bin/phpcs -p",
diff --git a/composer.lock b/composer.lock
index ec09766340b3a8aab59046a8e4088e1e52fc0480..40bbe7f6024d4aedca1e0dfbcd47e86d3386d135 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,8 +4,61 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
- "content-hash": "dcca72582b835b41450dff9d223d5b82",
+ "content-hash": "e1bdcb1340cc34e659c651695992a6b8",
"packages": [
+ {
+ "name": "cicnavi/simple-file-cache-php",
+ "version": "v2.0.0",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/cicnavi/simple-file-cache-php.git",
+ "reference": "372b48b5ff364e514da80005b2367ecf1950dde4"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/cicnavi/simple-file-cache-php/zipball/372b48b5ff364e514da80005b2367ecf1950dde4",
+ "reference": "372b48b5ff364e514da80005b2367ecf1950dde4",
+ "shasum": ""
+ },
+ "require": {
+ "ext-gmp": "*",
+ "ext-json": "*",
+ "ext-openssl": "*",
+ "php": ">=7.4",
+ "psr/simple-cache": "^1.0"
+ },
+ "provide": {
+ "psr/simple-cache-implementation": "1.0"
+ },
+ "require-dev": {
+ "ext-xdebug": "*",
+ "phpunit/phpunit": "^9.4",
+ "squizlabs/php_codesniffer": "^3.5",
+ "vimeo/psalm": "^3.14"
+ },
+ "type": "library",
+ "autoload": {
+ "psr-4": {
+ "Cicnavi\\SimpleFileCache\\": "src/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "MIT"
+ ],
+ "authors": [
+ {
+ "name": "Marko Ivancic",
+ "email": "marko.ivancic@srce.hr"
+ }
+ ],
+ "description": "PSR-16 simple cache provider based on files.",
+ "support": {
+ "issues": "https://github.com/cicnavi/simple-file-cache-php/issues",
+ "source": "https://github.com/cicnavi/simple-file-cache-php/tree/v2.0.0"
+ },
+ "time": "2021-08-18T11:28:21+00:00"
+ },
{
"name": "composer/ca-bundle",
"version": "1.3.3",
@@ -1539,6 +1592,57 @@
},
"time": "2021-05-03T11:20:27+00:00"
},
+ {
+ "name": "psr/simple-cache",
+ "version": "1.0.1",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/php-fig/simple-cache.git",
+ "reference": "408d5eafb83c57f6365a3ca330ff23aa4a5fa39b"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/php-fig/simple-cache/zipball/408d5eafb83c57f6365a3ca330ff23aa4a5fa39b",
+ "reference": "408d5eafb83c57f6365a3ca330ff23aa4a5fa39b",
+ "shasum": ""
+ },
+ "require": {
+ "php": ">=5.3.0"
+ },
+ "type": "library",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "1.0.x-dev"
+ }
+ },
+ "autoload": {
+ "psr-4": {
+ "Psr\\SimpleCache\\": "src/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "MIT"
+ ],
+ "authors": [
+ {
+ "name": "PHP-FIG",
+ "homepage": "http://www.php-fig.org/"
+ }
+ ],
+ "description": "Common interfaces for simple caching",
+ "keywords": [
+ "cache",
+ "caching",
+ "psr",
+ "psr-16",
+ "simple-cache"
+ ],
+ "support": {
+ "source": "https://github.com/php-fig/simple-cache/tree/master"
+ },
+ "time": "2017-10-23T01:57:42+00:00"
+ },
{
"name": "react/promise",
"version": "v2.9.0",
diff --git a/config-templates/module_accounting.php b/config-templates/module_accounting.php
index 4a3538ae0f7aea1d06bf52bea6b2405553e2c0c4..17f8a52be6fbe71436486f64220f632e670da21b 100644
--- a/config-templates/module_accounting.php
+++ b/config-templates/module_accounting.php
@@ -40,6 +40,14 @@ $config = [
*/
//ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS,
+ /**
+ * Cron tags.
+ *
+ * Job runner tag designates the cron tag to use when running accounting jobs. Make sure to add this tag to
+ * the cron module configuration in case of the 'asynchronous' accounting processing type.
+ */
+ ModuleConfiguration::OPTION_CRON_TAG_FOR_JOB_RUNNER => 'accounting_job_runner',
+
/**
* Jobs store class. In case of the 'asynchronous' accounting processing type, this determines which class
* will be used to store jobs. The class must implement Stores\Interfaces\JobsStoreInterface.
@@ -135,4 +143,32 @@ $config = [
'table_prefix' => '', // (string): Prefix for each table.
],
],
+
+ /**
+ * Job runner fine-grained configuration options.
+ */
+
+ /**
+ * Maximum execution time for the job runner.
+ *
+ * You can use this option to limit job runner activity by combining when the job runner will run (using
+ * cron configuration) and how long the job runner will be active (execution time). This can be null,
+ * meaning it will run indefinitely, or can be set as a duration for DateInterval, examples being
+ * below. Note that when the job runner is run using Cron user interface in SimpleSAMLphp, the
+ * duration will be taken from the 'max_execution_time' ini setting, and will override this
+ * setting if ini setting is shorter.
+ * @see https://www.php.net/manual/en/dateinterval.construct.php
+ */
+ ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => null,
+ //ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'PT9M', // 9 minutes
+ //ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'PT59M', // 59 minutes
+ //ModuleConfiguration::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME => 'P1D', // 1 day
+
+ /**
+ * Number of processed jobs after which the job runner should take a 1-second pause.
+ *
+ * This option was introduced so that the job runner can act in a more resource friendly fashion when facing
+ * backend store. If the value is null, there will be no pause.
+ */
+ ModuleConfiguration::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED => 10,
];
diff --git a/hooks/hook_cron.php b/hooks/hook_cron.php
new file mode 100644
index 0000000000000000000000000000000000000000..fbc57163510c4c1ba486893958e20c4bcf76efe0
--- /dev/null
+++ b/hooks/hook_cron.php
@@ -0,0 +1,34 @@
+<?php
+
+declare(strict_types=1);
+
+use SimpleSAML\Module\accounting\Services\JobRunner;
+use SimpleSAML\Module\accounting\ModuleConfiguration;
+
+function accounting_hook_cron(array &$cronInfo): void
+{
+ $moduleConfiguration = new ModuleConfiguration();
+
+ $currentCronTag = $cronInfo['tag'] ?? null;
+
+ $cronTagForJobRunner = $moduleConfiguration->getCronTagForJobRunner();
+
+ try {
+ if ($currentCronTag === $cronTagForJobRunner) {
+ $state = (new JobRunner($moduleConfiguration, \SimpleSAML\Configuration::getConfig()))->run();
+ foreach ($state->getStatusMessages() as $statusMessage) {
+ $cronInfo['summary'][] = $statusMessage;
+ }
+ $message = sprintf(
+ 'Job processing finished with %s successful jobs, %s failed jobs; total: %s.',
+ $state->getSuccessfulJobsProcessed(),
+ $state->getFailedJobsProcessed(),
+ $state->getTotalJobsProcessed()
+ );
+ $cronInfo['summary'][] = $message;
+ }
+ } catch (Throwable $exception) {
+ $message = 'Job runner error: ' . $exception->getMessage();
+ $cronInfo['summary'][] = $message;
+ }
+}
\ No newline at end of file
diff --git a/src/Helpers/ArrayHelper.php b/src/Helpers/ArrayHelper.php
index a87f81df2bac5e6653608e3cfac7e42c77064e41..0b57b663bcd3158113578f9a0741b599d70caf16 100644
--- a/src/Helpers/ArrayHelper.php
+++ b/src/Helpers/ArrayHelper.php
@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers;
+// TODO mivanci move to HelpersManager
class ArrayHelper
{
public static function recursivelySortByKey(array &$array): void
diff --git a/src/Helpers/AttributesHelper.php b/src/Helpers/AttributesHelper.php
index 8fc6806d02e6ddc926c12f2ec8a9594543009289..e3aedf3ca9f7445e77459ca6cbebc95ac5ac3f8d 100644
--- a/src/Helpers/AttributesHelper.php
+++ b/src/Helpers/AttributesHelper.php
@@ -4,6 +4,8 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers;
+// TODO mivanci move to HelpersManager
+
class AttributesHelper
{
/**
diff --git a/src/Helpers/DateTimeHelper.php b/src/Helpers/DateTimeHelper.php
new file mode 100644
index 0000000000000000000000000000000000000000..0fc5f25242105b35e58ee10257340b8dd7dcfdf2
--- /dev/null
+++ b/src/Helpers/DateTimeHelper.php
@@ -0,0 +1,27 @@
+<?php
+
+declare(strict_types=1);
+
+namespace SimpleSAML\Module\accounting\Helpers;
+
+class DateTimeHelper
+{
+ /**
+ * Convert date interval to seconds, interval being minimum 1 second.
+ * @param \DateInterval $dateInterval Minimum is 1 second.
+ * @return int
+ */
+ public function convertDateIntervalToSeconds(\DateInterval $dateInterval): int
+ {
+ $reference = new \DateTimeImmutable();
+ $endTime = $reference->add($dateInterval);
+
+ $duration = $endTime->getTimestamp() - $reference->getTimestamp();
+
+ if ($duration < 1) {
+ $duration = 1;
+ }
+
+ return $duration;
+ }
+}
diff --git a/src/Helpers/EnvironmentHelper.php b/src/Helpers/EnvironmentHelper.php
new file mode 100644
index 0000000000000000000000000000000000000000..b67f33c1b7e50ef6dbe8589a3a70d7c1b2d368ce
--- /dev/null
+++ b/src/Helpers/EnvironmentHelper.php
@@ -0,0 +1,11 @@
+<?php
+
+namespace SimpleSAML\Module\accounting\Helpers;
+
+class EnvironmentHelper
+{
+ public function isCli(): bool
+ {
+ return http_response_code() === false;
+ }
+}
diff --git a/src/Helpers/FilesystemHelper.php b/src/Helpers/FilesystemHelper.php
index 074b461b287011cdfb8308351458811ad66e4b2f..f82b41c595d3cc2980cee6ceb5303c0947fcc1fb 100644
--- a/src/Helpers/FilesystemHelper.php
+++ b/src/Helpers/FilesystemHelper.php
@@ -6,6 +6,7 @@ namespace SimpleSAML\Module\accounting\Helpers;
use SimpleSAML\Module\accounting\Exceptions\InvalidValueException;
+// TODO mivanci move to HelpersManager
class FilesystemHelper
{
public static function getRealPath(string $path): string
diff --git a/src/Helpers/HashHelper.php b/src/Helpers/HashHelper.php
index 568b8268425511e180e9863ab82e13eb34e89a80..7421125e4bd5aaab937074a0d30d9f9f9c1dcd0e 100644
--- a/src/Helpers/HashHelper.php
+++ b/src/Helpers/HashHelper.php
@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers;
+// TODO mivanci move to HelpersManager
class HashHelper
{
public static function getSha256(string $data): string
diff --git a/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php b/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php
index fdecc9a5718995d12a72ad811e38eeda3101e091..ffdba87b2026680708a3aca7fa8ddc2917f62c5c 100644
--- a/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php
+++ b/src/Helpers/InstanceBuilderUsingModuleConfigurationHelper.php
@@ -9,6 +9,7 @@ use SimpleSAML\Module\accounting\Exceptions\UnexpectedValueException;
use SimpleSAML\Module\accounting\Interfaces\BuildableUsingModuleConfigurationInterface;
use SimpleSAML\Module\accounting\ModuleConfiguration;
+// TODO mivanci move to HelpersManager
class InstanceBuilderUsingModuleConfigurationHelper
{
/**
diff --git a/src/Helpers/NetworkHelper.php b/src/Helpers/NetworkHelper.php
index 4d033e7fa97e8281e84fc20c9a77e689643dde08..7b245491326d1ad570d66041217a5dccb621855c 100644
--- a/src/Helpers/NetworkHelper.php
+++ b/src/Helpers/NetworkHelper.php
@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace SimpleSAML\Module\accounting\Helpers;
+// TODO mivanci move to HelpersManager
class NetworkHelper
{
public static function resolveClientIpAddress(string $clientIpAddress = null): ?string
diff --git a/src/Helpers/RandomHelper.php b/src/Helpers/RandomHelper.php
new file mode 100644
index 0000000000000000000000000000000000000000..30256837765db4ebe4806e951d31ae66974af799
--- /dev/null
+++ b/src/Helpers/RandomHelper.php
@@ -0,0 +1,15 @@
+<?php
+
+namespace SimpleSAML\Module\accounting\Helpers;
+
+class RandomHelper
+{
+ public function getRandomInt(int $minimum = PHP_INT_MIN, int $maximum = PHP_INT_MAX): int
+ {
+ try {
+ return random_int($minimum, $maximum);
+ } catch (\Throwable $exception) {
+ return mt_rand($minimum, $maximum);
+ }
+ }
+}
diff --git a/src/ModuleConfiguration.php b/src/ModuleConfiguration.php
index 6f4cce1671cd10baae8d89beb12bea8e5a1a1a89..71b33fd5600851385f5102b6be977a6163e85449 100644
--- a/src/ModuleConfiguration.php
+++ b/src/ModuleConfiguration.php
@@ -29,6 +29,10 @@ class ModuleConfiguration
public const OPTION_ADDITIONAL_TRACKERS = 'additional_trackers';
public const OPTION_CONNECTIONS_AND_PARAMETERS = 'connections_and_parameters';
public const OPTION_CLASS_TO_CONNECTION_MAP = 'class_to_connection_map';
+ public const OPTION_CRON_TAG_FOR_JOB_RUNNER = 'cron_tag_for_job_runner';
+ public const OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME = 'job_runner_maximum_execution_time';
+ public const OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED =
+ 'job_runner_should_pause_after_number_of_jobs_processed';
/**
* Contains configuration from module configuration file.
@@ -52,6 +56,11 @@ class ModuleConfiguration
return $this->getConfiguration()->getString(self::OPTION_ACCOUNTING_PROCESSING_TYPE);
}
+ public function getCronTagForJobRunner(): string
+ {
+ return $this->getConfiguration()->getString(self::OPTION_CRON_TAG_FOR_JOB_RUNNER);
+ }
+
/**
* Get underlying SimpleSAMLphp Configuration instance.
*
@@ -67,6 +76,55 @@ class ModuleConfiguration
return $this->getConfiguration()->getString(self::OPTION_JOBS_STORE);
}
+ public function getJobRunnerMaximumExecutionTime(): ?\DateInterval
+ {
+ $value = $this->get(self::OPTION_JOB_RUNNER_MAXIMUM_EXECUTION_TIME);
+
+ if (is_null($value)) {
+ return null;
+ }
+
+ if (! is_string($value)) {
+ $message = sprintf('Job runner maximum activity must be defined either as null, or DateInterval' .
+ 'duration (string).');
+ throw new InvalidConfigurationException($message);
+ }
+
+ try {
+ return new \DateInterval($value);
+ } catch (Throwable $exception) {
+ $message = sprintf('Can not create DateInterval instance using value %s as parameter.', $value);
+ throw new InvalidConfigurationException($message);
+ }
+ }
+
+ public function getJobRunnerShouldPauseAfterNumberOfJobsProcessed(): ?int
+ {
+ $value = $this->get(self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED);
+
+ if (is_null($value)) {
+ return null;
+ }
+
+ if (! is_int($value)) {
+ $message = sprintf(
+ 'Option \'%s\' must be defined either as null, or positive integer.',
+ self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED
+ );
+ throw new InvalidConfigurationException($message);
+ }
+
+ if ($value < 1) {
+ $message = sprintf(
+ 'Option \'%s\' must positive integer.',
+ self::OPTION_JOB_RUNNER_SHOULD_PAUSE_AFTER_NUMBER_OF_JOBS_PROCESSED
+ );
+ throw new InvalidConfigurationException($message);
+ }
+
+ return $value;
+ }
+
public function getDefaultDataTrackerAndProviderClass(): string
{
return $this->getConfiguration()->getString(self::OPTION_DEFAULT_DATA_TRACKER_AND_PROVIDER);
@@ -212,6 +270,7 @@ class ModuleConfiguration
if ($this->getAccountingProcessingType() === AccountingProcessingType::VALUE_ASYNCHRONOUS) {
try {
$this->validateJobsStoreClass();
+ $this->validateCronTagForJobRunner();
} catch (Throwable $exception) {
$errors[] = $exception->getMessage();
}
@@ -376,4 +435,9 @@ class ModuleConfiguration
throw new InvalidConfigurationException(implode(' ', $errors));
}
}
+
+ protected function validateCronTagForJobRunner(): void
+ {
+ $this->getCronTagForJobRunner();
+ }
}
diff --git a/src/Services/HelpersManager.php b/src/Services/HelpersManager.php
new file mode 100644
index 0000000000000000000000000000000000000000..f20ddc10612e584f09e496cd51995019b2395c03
--- /dev/null
+++ b/src/Services/HelpersManager.php
@@ -0,0 +1,31 @@
+<?php
+
+declare(strict_types=1);
+
+namespace SimpleSAML\Module\accounting\Services;
+
+use SimpleSAML\Module\accounting\Helpers\DateTimeHelper;
+use SimpleSAML\Module\accounting\Helpers\EnvironmentHelper;
+use SimpleSAML\Module\accounting\Helpers\RandomHelper;
+
+class HelpersManager
+{
+ protected static ?DateTimeHelper $dateTimeHelper;
+ protected static ?EnvironmentHelper $environmentHelper;
+ protected static ?RandomHelper $randomHelper;
+
+ public function getDateTimeHelper(): DateTimeHelper
+ {
+ return self::$dateTimeHelper ??= new DateTimeHelper();
+ }
+
+ public function getEnvironmentHelper(): EnvironmentHelper
+ {
+ return self::$environmentHelper ??= new EnvironmentHelper();
+ }
+
+ public function getRandomHelper(): RandomHelper
+ {
+ return self::$randomHelper ??= new RandomHelper();
+ }
+}
diff --git a/src/Services/JobRunner.php b/src/Services/JobRunner.php
new file mode 100644
index 0000000000000000000000000000000000000000..9735bfa3ce3c65c124b09265bdec2c60d4f24d3a
--- /dev/null
+++ b/src/Services/JobRunner.php
@@ -0,0 +1,599 @@
+<?php
+
+declare(strict_types=1);
+
+namespace SimpleSAML\Module\accounting\Services;
+
+use Cicnavi\SimpleFileCache\SimpleFileCache;
+use Psr\Log\LoggerInterface;
+use Psr\SimpleCache\CacheInterface;
+use Psr\SimpleCache\InvalidArgumentException;
+use SimpleSAML\Configuration as SspConfiguration;
+use SimpleSAML\Module\accounting\Entities\Authentication\Event\Job;
+use SimpleSAML\Module\accounting\Exceptions\Exception;
+use SimpleSAML\Module\accounting\Exceptions\StoreException;
+use SimpleSAML\Module\accounting\Exceptions\UnexpectedValueException;
+use SimpleSAML\Module\accounting\ModuleConfiguration;
+use SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter;
+use SimpleSAML\Module\accounting\Services\JobRunner\State;
+use SimpleSAML\Module\accounting\Stores\Builders\JobsStoreBuilder;
+use SimpleSAML\Module\accounting\Trackers\Builders\AuthenticationDataTrackerBuilder;
+use SimpleSAML\Module\accounting\Trackers\Interfaces\AuthenticationDataTrackerInterface;
+
+class JobRunner
+{
+ protected ModuleConfiguration $moduleConfiguration;
+ protected SspConfiguration $sspConfiguration;
+ protected LoggerInterface $logger;
+ protected AuthenticationDataTrackerBuilder $authenticationDataTrackerBuilder;
+ protected JobsStoreBuilder $jobsStoreBuilder;
+ protected CacheInterface $cache;
+ protected State $state;
+
+ protected const CACHE_NAME = 'accounting-job-runner-cache';
+ protected const CACHE_KEY_STATE = 'state';
+
+ /**
+ * Interval after which the state will be considered stale.
+ */
+ public const STATE_STALE_THRESHOLD_INTERVAL = 'PT5M';
+
+ /**
+ * @var int $jobRunnerId ID of the current job runner instance.
+ */
+ protected int $jobRunnerId;
+ protected array $trackers;
+ protected \DateInterval $stateStaleThresholdInterval;
+ protected RateLimiter $rateLimiter;
+ protected HelpersManager $helpersManager;
+ protected ?\DateInterval $maximumExecutionTime;
+ protected ?int $shouldPauseAfterNumberOfJobsProcessed;
+
+ public function __construct(
+ ModuleConfiguration $moduleConfiguration,
+ SspConfiguration $sspConfiguration,
+ LoggerInterface $logger = null,
+ AuthenticationDataTrackerBuilder $authenticationDataTrackerBuilder = null,
+ JobsStoreBuilder $jobsStoreBuilder = null,
+ CacheInterface $cache = null,
+ State $state = null,
+ RateLimiter $rateLimiter = null,
+ HelpersManager $helpersManager = null
+ ) {
+ $this->moduleConfiguration = $moduleConfiguration;
+ $this->sspConfiguration = $sspConfiguration;
+ $this->logger = $logger ?? new Logger();
+ $this->authenticationDataTrackerBuilder = $authenticationDataTrackerBuilder ??
+ new AuthenticationDataTrackerBuilder($this->moduleConfiguration, $this->logger);
+ $this->jobsStoreBuilder = $jobsStoreBuilder ?? new JobsStoreBuilder($this->moduleConfiguration, $this->logger);
+
+ $this->cache = $cache ?? $this->resolveCache();
+
+ $this->helpersManager = $helpersManager ?? new HelpersManager();
+
+ $this->jobRunnerId = $this->helpersManager->getRandomHelper()->getRandomInt();
+
+ $this->state = $state ?? new State($this->jobRunnerId);
+
+ $this->trackers = $this->resolveTrackers();
+ $this->stateStaleThresholdInterval = new \DateInterval(self::STATE_STALE_THRESHOLD_INTERVAL);
+ $this->rateLimiter = $rateLimiter ?? new RateLimiter();
+
+ $this->maximumExecutionTime = $this->resolveMaximumExecutionTime();
+ $this->shouldPauseAfterNumberOfJobsProcessed =
+ $this->moduleConfiguration->getJobRunnerShouldPauseAfterNumberOfJobsProcessed();
+
+ $this->registerInterruptHandler();
+ }
+
+ /**
+ * @throws Exception|StoreException
+ */
+ public function run(): State
+ {
+ try {
+ $this->validatePreRunState();
+ } catch (\Throwable $exception) {
+ $message = sprintf(
+ 'Pre-run state validation failed. Clearing cached state and continuing. Error was %s',
+ $exception->getMessage()
+ );
+ $this->logger->warning($message);
+ $this->state->addStatusMessage($message);
+ $this->clearCachedState();
+ }
+
+ try {
+ $this->validateRunConditions();
+ } catch (\Throwable $exception) {
+ $message = sprintf('Run conditions are not met, stopping. Reason was: %s', $exception->getMessage());
+ $this->logger->info($message);
+ $this->state->addStatusMessage($message);
+ return $this->state;
+ }
+
+ $this->logger->debug('Run conditions validated.');
+
+ $this->initializeCachedState();
+
+ $jobsStore = $this->jobsStoreBuilder->build($this->moduleConfiguration->getJobsStoreClass());
+
+ $jobsProcessedSincePause = 0;
+
+ // We have a clean state, we can start processing.
+ while ($this->shouldRun()) {
+ try {
+ /** @var ?Job $job */
+ $job = $jobsStore->dequeue(Job::class);
+
+ $this->updateCachedState($this->state);
+
+ declare(ticks=1) {
+ // No new jobs at the moment....
+ if ($job === null) {
+ $this->state->addStatusMessage('No (more) jobs to process.');
+ // If in CLI, do the backoff pause, so we can continue working later.
+ if ($this->isCli()) {
+ $message = sprintf(
+ 'Doing a backoff pause for %s seconds.',
+ $this->rateLimiter->getCurrentBackoffPauseInSeconds()
+ );
+ $this->logger->debug($message);
+ $this->state->addStatusMessage($message);
+ $this->rateLimiter->doBackoffPause();
+ $jobsProcessedSincePause = 0;
+ continue;
+ } else {
+ // Since this is a web run, we will break immediately, so we can return HTTP response.
+ break;
+ }
+ }
+
+ // We have a job...
+ $this->rateLimiter->resetBackoffPause();
+ }
+
+ /** @var AuthenticationDataTrackerInterface $tracker */
+ foreach ($this->trackers as $tracker) {
+ /** @var Job $job */
+ $tracker->process($job->getPayload());
+ }
+
+ $this->state->incrementSuccessfulJobsProcessed();
+
+ /** @var Job $job */
+ $successMessage = sprintf(
+ 'Successfully processed job with ID %s.',
+ $job->getId() ?? '(N/A)'
+ );
+ $this->logger->debug($successMessage);
+ $this->state->addStatusMessage($successMessage);
+
+ // If the job runner friendly pausing is enabled, and if the number of jobs processed since the last
+ // pause is greater than the configured value, do the pause.
+ if (
+ $this->shouldPauseAfterNumberOfJobsProcessed !== null &&
+ $jobsProcessedSincePause > $this->shouldPauseAfterNumberOfJobsProcessed
+ ) {
+ $this->rateLimiter->doPause();
+ $jobsProcessedSincePause = 0;
+ } else {
+ $jobsProcessedSincePause++;
+ }
+ } catch (\Throwable $exception) {
+ $message = sprintf('Error while processing jobs. Error was: %', $exception->getMessage());
+ $context = [];
+ if (isset($job)) {
+ $context = ['job' => $job];
+ $jobsStore->markFailedJob($job);
+ }
+ $this->logger->error($message, $context);
+ $this->state->incrementFailedJobsProcessed();
+ $this->state->addStatusMessage($message);
+ }
+ }
+
+ $this->clearCachedState();
+
+ $this->state->setEndedAt(new \DateTimeImmutable());
+ return $this->state;
+ }
+
+ /**
+ */
+ protected function shouldRun(): bool
+ {
+ // Enable this code to tick, which will enable it to catch CTRL-C signals and stop gracefully.
+ declare(ticks=1) {
+ if ($this->isMaximumExecutionTimeReached()) {
+ $message = 'Maximum job runner execution time reached.';
+ $this->logger->debug($message);
+ $this->state->addStatusMessage($message);
+ return false;
+ }
+
+ if ($this->state->getTotalJobsProcessed() > (PHP_INT_MAX - 1)) {
+ $message = 'Maximum number of processed jobs reached.';
+ $this->logger->debug($message);
+ $this->state->addStatusMessage($message);
+ return false;
+ }
+
+ try {
+ $this->validateSelfState();
+ } catch (\Throwable $exception) {
+ $message = sprintf(
+ 'Job runner state is not valid. Message was: %s',
+ $exception->getMessage()
+ );
+ $this->logger->warning($message);
+ $this->state->addStatusMessage($message);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function initializeCachedState(): void
+ {
+ // Make sure that the state does not exist in the cache.
+ try {
+ if ($this->getCachedState() !== null) {
+ throw new UnexpectedValueException('Job runner state already initialized.');
+ }
+ } catch (\Throwable $exception) {
+ $message = sprintf('Error initializing job runner state. Error was: %s.', $exception->getMessage());
+ $this->logger->error($message);
+ throw new Exception($message, (int)$exception->getCode(), $exception);
+ }
+
+ $startedAt = new \DateTimeImmutable();
+ $this->state->setStartedAt($startedAt);
+ $this->updateCachedState($this->state, $startedAt);
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function validatePreRunState(): void
+ {
+ $cachedState = $this->getCachedState();
+
+ // Empty state means that no other job runner is active.
+ if ($cachedState === null) {
+ return;
+ }
+
+ if ($cachedState->getJobRunnerId() === $this->jobRunnerId) {
+ $message = 'Job runner ID in cached state same as new ID.';
+ $this->logger->error($message);
+ throw new Exception($message);
+ }
+
+ if ($cachedState->isStale($this->stateStaleThresholdInterval)) {
+ $message = 'Stale state encountered.';
+ $this->logger->warning($message);
+ throw new Exception($message);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function validateSelfState(): void
+ {
+ $cachedState = $this->getCachedState();
+
+ // Validate state before start.
+ if ($this->state->hasRunStarted() === false) {
+ if ($cachedState !== null) {
+ $message = 'Job run has not started, however cached state has already been initialized.';
+ throw new Exception($message);
+ }
+ }
+
+ // Validate state after start.
+ if ($this->state->hasRunStarted() === true) {
+ if ($cachedState === null) {
+ $message = 'Job run has started, however cached state has not been initialized.';
+ throw new Exception($message);
+ }
+
+ if ($cachedState->getJobRunnerId() !== $this->jobRunnerId) {
+ $message = 'Current job runner ID differs from the ID in the cached state.';
+ throw new Exception($message);
+ }
+
+ if ($cachedState->isStale($this->stateStaleThresholdInterval)) {
+ $message = 'Job runner cached state is stale, which means possible job runner process shutdown' .
+ ' without cached state clearing.';
+ throw new Exception($message);
+ }
+
+ if ($cachedState->getIsGracefulInterruptInitiated()) {
+ $message = 'Graceful job processing interrupt initiated.';
+ throw new Exception($message);
+ }
+ }
+ }
+
+ protected function isAnotherJobRunnerActive(): bool
+ {
+ try {
+ $cachedState = $this->getCachedState();
+
+ if ($cachedState === null) {
+ return false;
+ }
+
+ // There is cached state, which would indicate that a job runner is active. However, make sure that the
+ // state is not stale (which indicates that the runner was shutdown without state clearing). If stale,
+ // this means that the job runner is not active.
+ if ($cachedState->isStale($this->stateStaleThresholdInterval)) {
+ $this->logger->warning('Stale cache encountered. Assuming no job runner is active.');
+ return false;
+ }
+
+ return $cachedState->getJobRunnerId() !== $this->jobRunnerId;
+ } catch (\Throwable $exception) {
+ $message = sprintf(
+ 'Error checking if another job runner is active. To play safe, we will assume true. ' .
+ 'Error was: %s',
+ $exception->getMessage()
+ );
+ $this->logger->error($message);
+ return true;
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function resolveCache(): SimpleFileCache
+ {
+ try {
+ $this->logger->debug('Trying to initialize job runner cache using SSP datadir.');
+ $cache = new SimpleFileCache(
+ self::CACHE_NAME,
+ $this->sspConfiguration->getPathValue('datadir')
+ );
+ $this->logger->debug('Successfully initialized cache using SSP datadir.');
+ return $cache;
+ } catch (\Throwable $exception) {
+ $message = sprintf(
+ 'Error initializing job runner cache using datadir. Error was: %s',
+ $exception->getMessage()
+ );
+ $this->logger->debug($message);
+ }
+
+ try {
+ $this->logger->debug('Trying to initialize job runner cache using SSP tempdir.');
+ $cache = new SimpleFileCache(
+ self::CACHE_NAME,
+ $this->sspConfiguration->getPathValue('tempdir')
+ );
+ $this->logger->debug('Successfully initialized job runner cache using SSP tempdir.');
+ return $cache;
+ } catch (\Throwable $exception) {
+ $message = sprintf(
+ 'Error initializing job runner cache using tempdir. Error was: %s.',
+ $exception->getMessage()
+ );
+ $this->logger->debug($message);
+ }
+
+ try {
+ $this->logger->debug('Trying to initialize job runner cache using system tmp dir.');
+ $cache = new SimpleFileCache(self::CACHE_NAME);
+ $this->logger->debug('Successfully initialized cache using system tmp dir.');
+ return $cache;
+ } catch (\Throwable $exception) {
+ $message = sprintf(
+ 'Error initializing job runner cache. Error was: %s.',
+ $exception->getMessage()
+ );
+ $this->logger->debug($message);
+ throw new Exception($message, (int)$exception->getCode(), $exception);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function clearCachedState(): void
+ {
+ /** @psalm-suppress InvalidCatch */
+ try {
+ $this->cache->delete(self::CACHE_KEY_STATE);
+ } catch (\Throwable | InvalidArgumentException $exception) {
+ $message = sprintf(
+ 'Error clearing job runner cache. Error was: %s.',
+ $exception->getMessage()
+ );
+ $this->logger->error($message);
+ throw new Exception($message, (int)$exception->getCode(), $exception);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function getCachedState(): ?State
+ {
+ /** @psalm-suppress InvalidCatch */
+ try {
+ /** @var ?State $state */
+ $state = $this->cache->get(self::CACHE_KEY_STATE);
+ if ($state instanceof State) {
+ return $state;
+ } else {
+ return null;
+ }
+ } catch (\Throwable | InvalidArgumentException $exception) {
+ $message = sprintf('Error getting job runner state from cache. Error was: %s', $exception->getMessage());
+ throw new Exception($message, (int)$exception->getCode(), $exception);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function updateCachedState(State $state, \DateTimeImmutable $updatedAt = null): void
+ {
+ $updatedAt = $updatedAt ?? new \DateTimeImmutable();
+ $state->setUpdatedAt($updatedAt);
+
+ /** @psalm-suppress InvalidCatch */
+ try {
+ $this->cache->set(self::CACHE_KEY_STATE, $state);
+ } catch (\Throwable | InvalidArgumentException $exception) {
+ $message = sprintf('Error setting job runner state. Error was: %s.', $exception->getMessage());
+ $this->logger->error($message);
+ throw new Exception($message, (int)$exception->getCode(), $exception);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function validateRunConditions(): void
+ {
+ if (
+ $this->moduleConfiguration->getAccountingProcessingType() !==
+ ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS
+ ) {
+ $message = 'Job runner called, however accounting mode is not ' .
+ ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS;
+ $this->logger->warning($message);
+ throw new Exception($message);
+ }
+
+ if ($this->isAnotherJobRunnerActive()) {
+ $message = 'Another job runner is active.';
+ $this->logger->debug($message);
+ throw new Exception($message);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function resolveTrackers(): array
+ {
+ $trackers = [];
+
+ $configuredTrackerClasses = array_merge(
+ [$this->moduleConfiguration->getDefaultDataTrackerAndProviderClass()],
+ $this->moduleConfiguration->getAdditionalTrackers()
+ );
+
+ /** @var string $trackerClass */
+ foreach ($configuredTrackerClasses as $trackerClass) {
+ $trackers[$trackerClass] = $this->authenticationDataTrackerBuilder->build($trackerClass);
+ }
+
+ return $trackers;
+ }
+
+ protected function isCli(): bool
+ {
+ return $this->helpersManager->getEnvironmentHelper()->isCli();
+ }
+
+ /**
+ * Register interrupt handler. This makes it possible to stop job processing gracefully by
+ * clearing the current state. It relies on pcntl extension, so to use this feature,
+ * that extension has to be enabled.
+ * @see https://www.php.net/manual/en/pcntl.installation.php
+ * @return void
+ */
+ protected function registerInterruptHandler(): void
+ {
+ // pcntl won't be available in web server environment, so skip immediately.
+ if (! $this->isCli()) {
+ return;
+ }
+
+ // Extension pcntl doesn't come with PHP by default, so check if the proper function is available.
+ if (! function_exists('pcntl_signal')) {
+ $message = 'pcntl related functions not available, skipping registering interrupt handler.';
+ $this->logger->info($message);
+ $this->state->addStatusMessage($message);
+ return;
+ }
+
+ pcntl_signal(SIGINT, [$this, 'handleInterrupt']);
+ pcntl_signal(SIGTERM, [$this, 'handleInterrupt']);
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected function handleInterrupt(int $signal): void
+ {
+ $message = sprintf('Gracefully stopping job processing. Interrupt signal was %s.', $signal);
+ $this->state->addStatusMessage($message);
+ $this->logger->info($message);
+ $this->state->setIsGracefulInterruptInitiated(true);
+ $this->updateCachedState($this->state);
+ }
+
+ protected function resolveMaximumExecutionTime(): ?\DateInterval
+ {
+ $maximumExecutionTime = $this->moduleConfiguration->getJobRunnerMaximumExecutionTime();
+
+ // If we are in CLI environment, we can safely use module configuration setting.
+ if ($this->isCli()) {
+ return $maximumExecutionTime;
+ }
+
+ // We are in a "web" environment, so take max execution time ini setting into account.
+ $iniMaximumExecutionTimeSeconds = (int)floor((int)ini_get('max_execution_time') * 0.8);
+ $iniMaximumExecutionTime = new \DateInterval('PT' . $iniMaximumExecutionTimeSeconds . 'S');
+
+ // If the module setting is null (meaning infinite), use the ini setting.
+ if ($maximumExecutionTime === null) {
+ return $iniMaximumExecutionTime;
+ }
+
+ // Use the shorter interval from the two...
+ $maximumExecutionTimeSeconds = $this->helpersManager
+ ->getDateTimeHelper()
+ ->convertDateIntervalToSeconds($maximumExecutionTime);
+
+ if ($iniMaximumExecutionTimeSeconds < $maximumExecutionTimeSeconds) {
+ $this->logger->debug('Using maximum execution time from INI setting since it is shorter.');
+ return $iniMaximumExecutionTime;
+ }
+
+ return $maximumExecutionTime;
+ }
+
+ protected function isMaximumExecutionTimeReached(): bool
+ {
+ if ($this->maximumExecutionTime === null) {
+ // Execution time is infinite.
+ return false;
+ }
+
+ $startedAt = $this->state->getStartedAt();
+ if ($startedAt === null) {
+ // Processing has not even started yet.
+ return false;
+ }
+
+ $maxDateTime = $startedAt->add($this->maximumExecutionTime);
+ if ($maxDateTime > (new \DateTimeImmutable())) {
+ // Maximum has not been reached yet.
+ return false;
+ }
+
+ // Maximum has been reached.
+ return true;
+ }
+}
diff --git a/src/Services/JobRunner/RateLimiter.php b/src/Services/JobRunner/RateLimiter.php
new file mode 100644
index 0000000000000000000000000000000000000000..bc8dc55b7845250c45753cd71cf647536ed97263
--- /dev/null
+++ b/src/Services/JobRunner/RateLimiter.php
@@ -0,0 +1,76 @@
+<?php
+
+namespace SimpleSAML\Module\accounting\Services\JobRunner;
+
+use SimpleSAML\Module\accounting\Services\HelpersManager;
+
+class RateLimiter
+{
+ public const DEFAULT_MAX_PAUSE_DURATION = 'PT10M';
+ public const DEFAULT_MAX_BACKOFF_PAUSE_DURATION = 'PT1M';
+
+ protected HelpersManager $helpersManager;
+
+ protected int $maxPauseInSeconds;
+ protected int $maxBackoffPauseInSeconds;
+ protected int $currentBackoffPauseInSeconds = 1;
+
+ public function __construct(
+ \DateInterval $maxPauseInterval = null,
+ \DateInterval $maxBackoffInterval = null,
+ HelpersManager $helpersManager = null
+ ) {
+ $this->helpersManager = $helpersManager ?? new HelpersManager();
+
+ $this->maxPauseInSeconds = $this->helpersManager->getDateTimeHelper()->convertDateIntervalToSeconds(
+ $maxPauseInterval ?? new \DateInterval(self::DEFAULT_MAX_PAUSE_DURATION)
+ );
+ $this->maxBackoffPauseInSeconds = $this->helpersManager->getDateTimeHelper()->convertDateIntervalToSeconds(
+ $maxBackoffInterval ?? new \DateInterval(self::DEFAULT_MAX_BACKOFF_PAUSE_DURATION)
+ );
+ }
+
+ public function doBackoffPause(): void
+ {
+ /** @psalm-suppress ArgumentTypeCoercion */
+ sleep($this->currentBackoffPauseInSeconds);
+
+ $newBackoffPauseInSeconds = $this->currentBackoffPauseInSeconds + $this->currentBackoffPauseInSeconds;
+ $this->currentBackoffPauseInSeconds = min($newBackoffPauseInSeconds, $this->maxBackoffPauseInSeconds);
+ }
+
+ public function doPause(int $seconds = 1): void
+ {
+ $seconds = $seconds > 0 ? $seconds : 1;
+ sleep($seconds);
+ }
+
+ public function resetBackoffPause(): void
+ {
+ $this->currentBackoffPauseInSeconds = 1;
+ }
+
+ /**
+ * @return int
+ */
+ public function getMaxPauseInSeconds(): int
+ {
+ return $this->maxPauseInSeconds;
+ }
+
+ /**
+ * @return int
+ */
+ public function getMaxBackoffPauseInSeconds(): int
+ {
+ return $this->maxBackoffPauseInSeconds;
+ }
+
+ /**
+ * @return int
+ */
+ public function getCurrentBackoffPauseInSeconds(): int
+ {
+ return $this->currentBackoffPauseInSeconds;
+ }
+}
diff --git a/src/Services/JobRunner/State.php b/src/Services/JobRunner/State.php
new file mode 100644
index 0000000000000000000000000000000000000000..74eb2e4afd714e5eb4e209213ded6fe9d9a4a8ac
--- /dev/null
+++ b/src/Services/JobRunner/State.php
@@ -0,0 +1,196 @@
+<?php
+
+declare(strict_types=1);
+
+namespace SimpleSAML\Module\accounting\Services\JobRunner;
+
+class State
+{
+ public const MAX_NUMBER_OF_MESSAGES_TO_KEEP = 100;
+ public const DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP = 10;
+
+ protected int $jobRunnerId;
+ protected ?\DateTimeImmutable $startedAt;
+ protected \DateTimeImmutable $updatedAt;
+ protected ?\DateTimeImmutable $endedAt = null;
+ protected int $successfulJobsProcessed = 0;
+ protected int $failedJobsProcessed = 0;
+ protected array $statusMessages = [];
+ protected int $numberOfStatusMessagesToKeep = 10;
+ protected bool $isGracefulInterruptInitiated = false;
+
+ public function __construct(
+ int $jobRunnerId,
+ \DateTimeImmutable $startedAt = null,
+ \DateTimeImmutable $updatedAt = null,
+ int $numberOfStatusMessagesToKeep = self::DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP
+ ) {
+ $this->jobRunnerId = $jobRunnerId;
+ $this->startedAt = $startedAt;
+ $this->updatedAt = $updatedAt ?? new \DateTimeImmutable();
+
+ $this->numberOfStatusMessagesToKeep =
+ $numberOfStatusMessagesToKeep > 0 && $numberOfStatusMessagesToKeep <= self::MAX_NUMBER_OF_MESSAGES_TO_KEEP ?
+ $numberOfStatusMessagesToKeep :
+ self::DEFAULT_NUMBER_OF_MESSAGES_TO_KEEP;
+ }
+
+ /**
+ * @return int
+ */
+ public function getJobRunnerId(): int
+ {
+ return $this->jobRunnerId;
+ }
+
+ /**
+ * @return ?\DateTimeImmutable
+ */
+ public function getStartedAt(): ?\DateTimeImmutable
+ {
+ return $this->startedAt;
+ }
+
+ /**
+ * Set startedAt if not already set.
+ * @param \DateTimeImmutable $startedAt
+ * @return bool True if set, false otherwise.
+ */
+ public function setStartedAt(\DateTimeImmutable $startedAt): bool
+ {
+ if ($this->startedAt === null) {
+ $this->startedAt = $startedAt;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return \DateTimeImmutable
+ */
+ public function getUpdatedAt(): \DateTimeImmutable
+ {
+ return $this->updatedAt;
+ }
+
+ /**
+ * @param \DateTimeImmutable $updatedAt
+ */
+ public function setUpdatedAt(\DateTimeImmutable $updatedAt): void
+ {
+ $this->updatedAt = $updatedAt;
+ }
+
+ /**
+ * Set endedAt if not already set.
+ * @param \DateTimeImmutable $endedAt
+ * @return bool True if set, false otherwise.
+ */
+ public function setEndedAt(\DateTimeImmutable $endedAt): bool
+ {
+ if ($this->endedAt === null) {
+ $this->endedAt = $endedAt;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return ?\DateTimeImmutable
+ */
+ public function getEndedAt(): ?\DateTimeImmutable
+ {
+ return $this->endedAt;
+ }
+
+ public function hasRunStarted(): bool
+ {
+ return $this->startedAt !== null;
+ }
+
+ public function incrementSuccessfulJobsProcessed(): void
+ {
+ $this->successfulJobsProcessed++;
+ }
+
+ public function incrementFailedJobsProcessed(): void
+ {
+ $this->failedJobsProcessed++;
+ }
+
+ /**
+ * @return int
+ */
+ public function getSuccessfulJobsProcessed(): int
+ {
+ return $this->successfulJobsProcessed;
+ }
+
+ /**
+ * @return int
+ */
+ public function getFailedJobsProcessed(): int
+ {
+ return $this->failedJobsProcessed;
+ }
+
+ public function isStale(\DateInterval $threshold): bool
+ {
+ $minDateTime = (new \DateTimeImmutable())->sub($threshold);
+
+ if ($this->getUpdatedAt() < $minDateTime) {
+ return true;
+ }
+
+ return false;
+ }
+
+ public function getTotalJobsProcessed(): int
+ {
+ return $this->getSuccessfulJobsProcessed() + $this->getFailedJobsProcessed();
+ }
+
+ public function addStatusMessage(string $message): void
+ {
+ $this->statusMessages[] = $message;
+
+ if (count($this->statusMessages) > $this->numberOfStatusMessagesToKeep) {
+ array_shift($this->statusMessages);
+ }
+ }
+
+ public function getStatusMessages(): array
+ {
+ return $this->statusMessages;
+ }
+
+ public function getLastStatusMessage(): ?string
+ {
+ if (empty($this->statusMessages)) {
+ return null;
+ }
+
+ $message = (string)end($this->statusMessages);
+ reset($this->statusMessages);
+
+ return $message;
+ }
+
+ /**
+ * @return bool
+ */
+ public function getIsGracefulInterruptInitiated(): bool
+ {
+ return $this->isGracefulInterruptInitiated;
+ }
+
+ /**
+ * @param bool $isGracefulInterruptInitiated
+ */
+ public function setIsGracefulInterruptInitiated(bool $isGracefulInterruptInitiated): void
+ {
+ $this->isGracefulInterruptInitiated = $isGracefulInterruptInitiated;
+ }
+}
diff --git a/src/Stores/Interfaces/JobsStoreInterface.php b/src/Stores/Interfaces/JobsStoreInterface.php
index 1ff238d66b921fc0005587f66fdb17972475b1b7..90d05c4572c07a7d019f52dc7ba79eea09fb5f96 100644
--- a/src/Stores/Interfaces/JobsStoreInterface.php
+++ b/src/Stores/Interfaces/JobsStoreInterface.php
@@ -24,6 +24,12 @@ interface JobsStoreInterface extends StoreInterface
*/
public function dequeue(string $type = null): ?JobInterface;
+ /**
+ * @param JobInterface $job
+ * @return void
+ */
+ public function markFailedJob(JobInterface $job): void;
+
public static function build(
ModuleConfiguration $moduleConfiguration,
LoggerInterface $logger,
diff --git a/src/Stores/Jobs/DoctrineDbal/Store.php b/src/Stores/Jobs/DoctrineDbal/Store.php
index 5e81988ffc71ab00496969bf0316c1a2ecad1527..7bdbeebb01e87d5ad24695765c3a917aa916f2d6 100644
--- a/src/Stores/Jobs/DoctrineDbal/Store.php
+++ b/src/Stores/Jobs/DoctrineDbal/Store.php
@@ -21,6 +21,7 @@ class Store extends AbstractStore implements JobsStoreInterface
protected string $prefixedTableNameJobs;
protected string $prefixedTableNameFailedJobs;
protected Repository $jobsRepository;
+ protected Repository $failedJobsRepository;
/**
* @throws StoreException
@@ -30,7 +31,8 @@ class Store extends AbstractStore implements JobsStoreInterface
LoggerInterface $logger,
Factory $connectionFactory,
string $connectionKey = null,
- Repository $jobsRepository = null
+ Repository $jobsRepository = null,
+ Repository $failedJobsRepository = null
) {
parent::__construct($moduleConfiguration, $logger, $connectionFactory, $connectionKey);
@@ -40,6 +42,9 @@ class Store extends AbstractStore implements JobsStoreInterface
$this->jobsRepository = $jobsRepository ??
new Repository($this->connection, $this->prefixedTableNameJobs, $this->logger);
+
+ $this->failedJobsRepository = $failedJobsRepository ??
+ new Repository($this->connection, $this->prefixedTableNameFailedJobs, $this->logger);
}
/**
@@ -136,4 +141,12 @@ class Store extends AbstractStore implements JobsStoreInterface
$connectionKey
);
}
+
+ /**
+ * @throws StoreException
+ */
+ public function markFailedJob(JobInterface $job): void
+ {
+ $this->failedJobsRepository->insert($job);
+ }
}
diff --git a/tests/config-templates/module_accounting.php b/tests/config-templates/module_accounting.php
index 4eec5cf688732bbad540a467bffcf384fbcdfeb3..60da309e0f79caa532a4c0f0b9d16d4972d62ad2 100644
--- a/tests/config-templates/module_accounting.php
+++ b/tests/config-templates/module_accounting.php
@@ -16,6 +16,8 @@ $config = [
ModuleConfiguration::OPTION_ACCOUNTING_PROCESSING_TYPE =>
ModuleConfiguration\AccountingProcessingType::VALUE_SYNCHRONOUS,
+ ModuleConfiguration::OPTION_CRON_TAG_FOR_JOB_RUNNER => 'accounting_job_runner',
+
ModuleConfiguration::OPTION_JOBS_STORE => Stores\Jobs\DoctrineDbal\Store::class,
ModuleConfiguration::OPTION_DEFAULT_DATA_TRACKER_AND_PROVIDER =>
diff --git a/tests/src/Services/JobRunner/RateLimiterTest.php b/tests/src/Services/JobRunner/RateLimiterTest.php
new file mode 100644
index 0000000000000000000000000000000000000000..19a98c64aa60f6ef32a6e0bf6a4c8855c3be0762
--- /dev/null
+++ b/tests/src/Services/JobRunner/RateLimiterTest.php
@@ -0,0 +1,67 @@
+<?php
+
+declare(strict_types=1);
+
+namespace SimpleSAML\Test\Module\accounting\Services\JobRunner;
+
+use PHPUnit\Framework\TestCase;
+use SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter;
+
+/**
+ * @covers \SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter
+ * @uses \SimpleSAML\Module\accounting\Helpers\DateTimeHelper
+ * @uses \SimpleSAML\Module\accounting\Services\HelpersManager
+ */
+class RateLimiterTest extends TestCase
+{
+ protected function setUp(): void
+ {
+ }
+
+ public function testCanCreateInstance(): void
+ {
+ $this->assertInstanceOf(RateLimiter::class, new RateLimiter());
+ }
+
+ public function testCanDoPause(): void
+ {
+ $rateLimiter = new RateLimiter();
+ $startTimeInSeconds = (new \DateTimeImmutable())->getTimestamp();
+ $rateLimiter->doPause();
+ $endTimeInSeconds = (new \DateTimeImmutable())->getTimestamp();
+
+ $this->assertTrue(($endTimeInSeconds - $startTimeInSeconds) >= 1);
+ }
+
+ public function testCanSetMaxPause(): void
+ {
+ $rateLimiter = new RateLimiter(new \DateInterval('PT1S'));
+ $this->assertSame(1, $rateLimiter->getMaxPauseInSeconds());
+ $splitSecondInterval = \DateInterval::createFromDateString('10000 microsecond'); // 10 milliseconds
+ $rateLimiter = new RateLimiter($splitSecondInterval);
+ $this->assertSame(1, $rateLimiter->getMaxPauseInSeconds());
+ $rateLimiter->doPause();
+ }
+
+ public function testCanDoBackoffPause(): void
+ {
+ $rateLimiter = new RateLimiter();
+ $startTimeInSeconds = (new \DateTimeImmutable())->getTimestamp();
+ $rateLimiter->doBackoffPause();
+ $endTimeInSeconds = (new \DateTimeImmutable())->getTimestamp();
+ $this->assertTrue(($endTimeInSeconds - $startTimeInSeconds) >= 1);
+ $this->assertTrue($rateLimiter->getCurrentBackoffPauseInSeconds() > 1);
+ $rateLimiter->resetBackoffPause();
+ $this->assertTrue($rateLimiter->getCurrentBackoffPauseInSeconds() === 1);
+ }
+
+ public function testCanSetMaxBackoffPause(): void
+ {
+ $rateLimiter = new RateLimiter(null, new \DateInterval('PT1S'));
+ $this->assertSame(1, $rateLimiter->getMaxBackoffPauseInSeconds());
+ $splitSecondInterval = \DateInterval::createFromDateString('10000 microsecond'); // 10 milliseconds
+ $rateLimiter = new \SimpleSAML\Module\accounting\Services\JobRunner\RateLimiter(null, $splitSecondInterval);
+ $this->assertSame(1, $rateLimiter->getMaxBackoffPauseInSeconds());
+ $rateLimiter->doBackoffPause();
+ }
+}
diff --git a/tests/src/Services/JobRunner/StateTest.php b/tests/src/Services/JobRunner/StateTest.php
new file mode 100644
index 0000000000000000000000000000000000000000..061b40b4d992fd0107aa8440a62ecbcdf4ed226e
--- /dev/null
+++ b/tests/src/Services/JobRunner/StateTest.php
@@ -0,0 +1,108 @@
+<?php
+
+namespace SimpleSAML\Test\Module\accounting\Services\JobRunner;
+
+use SimpleSAML\Module\accounting\Services\JobRunner\State;
+use PHPUnit\Framework\TestCase;
+
+/**
+ * @covers \SimpleSAML\Module\accounting\Services\JobRunner\State
+ */
+class StateTest extends TestCase
+{
+ protected int $jobRunnerId;
+
+ protected function setUp(): void
+ {
+ $this->jobRunnerId = 1;
+ }
+
+ public function testCanCreateInstance(): void
+ {
+ $startedAt = $updatedAt = new \DateTimeImmutable();
+
+ $state = new State($this->jobRunnerId);
+ $this->assertInstanceOf(State::class, $state);
+ $this->assertSame($this->jobRunnerId, $state->getJobRunnerId());
+
+ $state = new State($this->jobRunnerId, $startedAt, null);
+ $this->assertInstanceOf(State::class, $state);
+
+ $state = new State($this->jobRunnerId, $startedAt, $updatedAt);
+ $this->assertInstanceOf(State::class, $state);
+
+ $state = new State($this->jobRunnerId, $startedAt, $updatedAt, 1000);
+ $this->assertInstanceOf(State::class, $state);
+ }
+
+ public function testCanWorkWithTimestamps(): void
+ {
+ $startedAt = $updatedAt = $endedAt = new \DateTimeImmutable();
+
+ $state = new State($this->jobRunnerId);
+ $this->assertNull($state->getStartedAt());
+ $this->assertInstanceOf(\DateTimeImmutable::class, $state->getUpdatedAt());
+ $this->assertNull($state->getEndedAt());
+
+ $this->assertTrue($state->setStartedAt($startedAt));
+ $this->assertTrue($state->hasRunStarted());
+ $state->setUpdatedAt($updatedAt);
+ $this->assertTrue($state->setEndedAt($endedAt));
+
+ $this->assertSame($startedAt, $state->getStartedAt());
+ $this->assertSame($updatedAt, $state->getUpdatedAt());
+ $this->assertSame($endedAt, $state->getEndedAt());
+
+ $this->assertFalse($state->setStartedAt($startedAt));
+ $this->assertFalse($state->setEndedAt($endedAt));
+ }
+
+ public function testCanCountProcessedJobs(): void
+ {
+ $state = new State($this->jobRunnerId);
+
+ $this->assertSame(0, $state->getTotalJobsProcessed());
+ $state->incrementSuccessfulJobsProcessed();
+
+ $this->assertSame(1, $state->getTotalJobsProcessed());
+ $this->assertSame(1, $state->getSuccessfulJobsProcessed());
+ $this->assertSame(0, $state->getFailedJobsProcessed());
+
+ $state->incrementFailedJobsProcessed();
+
+ $this->assertSame(2, $state->getTotalJobsProcessed());
+ $this->assertSame(1, $state->getSuccessfulJobsProcessed());
+ $this->assertSame(1, $state->getFailedJobsProcessed());
+ }
+
+ public function testCanCheckIfStateIsStale(): void
+ {
+ $state = new State($this->jobRunnerId);
+ $freshnessDuration = new \DateInterval('PT5M');
+
+ $this->assertFalse($state->isStale($freshnessDuration));
+
+ $dateTimeInHistory = new \DateTimeImmutable('-9 minutes');
+ $state->setUpdatedAt($dateTimeInHistory);
+
+ $this->assertTrue($state->isStale($freshnessDuration));
+ }
+
+ public function testCanWorkWithStatusMessages(): void
+ {
+ $state = new State($this->jobRunnerId, null, null, 2);
+ $this->assertEmpty($state->getStatusMessages());
+ $this->assertNull($state->getLastStatusMessage());
+
+ $state->addStatusMessage('test');
+ $this->assertSame(1, count($state->getStatusMessages()));
+ $this->assertSame('test', $state->getLastStatusMessage());
+ $state->addStatusMessage('test2');
+ $this->assertSame('test2', $state->getLastStatusMessage());
+ $this->assertSame(2, count($state->getStatusMessages()));
+ $state->addStatusMessage('test3');
+ $this->assertSame('test3', $state->getLastStatusMessage());
+ $this->assertSame(2, count($state->getStatusMessages()));
+ $this->assertSame('test3', $state->getLastStatusMessage());
+ }
+}
diff --git a/tests/src/Services/JobRunnerTest.php b/tests/src/Services/JobRunnerTest.php
new file mode 100644
index 0000000000000000000000000000000000000000..8e1fbc94c3f44e15c7fb662a87b064e83d0b6d9b
--- /dev/null
+++ b/tests/src/Services/JobRunnerTest.php
@@ -0,0 +1,1085 @@
+<?php
+
+declare(strict_types=1);
+
+namespace SimpleSAML\Test\Module\accounting\Services;
+
+use PHPUnit\Framework\MockObject\MockObject;
+use PHPUnit\Framework\MockObject\Stub;
+use Psr\Log\LoggerInterface;
+use Psr\SimpleCache\CacheInterface;
+use SimpleSAML\Configuration;
+use SimpleSAML\Module\accounting\Entities\Authentication\Event;
+use SimpleSAML\Module\accounting\Entities\Bases\AbstractPayload;
+use SimpleSAML\Module\accounting\Entities\Interfaces\JobInterface;
+use SimpleSAML\Module\accounting\Exceptions\Exception;
+use SimpleSAML\Module\accounting\Helpers\DateTimeHelper;
+use SimpleSAML\Module\accounting\Helpers\EnvironmentHelper;
+use SimpleSAML\Module\accounting\Helpers\RandomHelper;
+use SimpleSAML\Module\accounting\ModuleConfiguration;
+use SimpleSAML\Module\accounting\Services\HelpersManager;
+use SimpleSAML\Module\accounting\Services\JobRunner;
+use PHPUnit\Framework\TestCase;
+use SimpleSAML\Module\accounting\Stores\Builders\JobsStoreBuilder;
+use SimpleSAML\Module\accounting\Stores\Interfaces\JobsStoreInterface;
+use SimpleSAML\Module\accounting\Trackers\Builders\AuthenticationDataTrackerBuilder;
+use SimpleSAML\Module\accounting\Trackers\Interfaces\AuthenticationDataTrackerInterface;
+
+/**
+ * @covers \SimpleSAML\Module\accounting\Services\JobRunner
+ *
+ * @psalm-suppress all
+ */
+class JobRunnerTest extends TestCase
+{
+ /**
+ * @var Stub|ModuleConfiguration
+ */
+ protected $moduleConfigurationStub;
+ /**
+ * @var Stub|Configuration
+ */
+ protected $sspConfigurationStub;
+ /**
+ * @var MockObject|LoggerInterface
+ */
+ protected $loggerMock;
+ /**
+ * @var MockObject|CacheInterface
+ */
+ protected $cacheMock;
+ /**
+ * @var Stub|JobRunner\State
+ */
+ protected $stateStub;
+ /**
+ * @var Stub|JobRunner\RateLimiter
+ */
+ protected $rateLimiterMock;
+ /**
+ * @var Stub|AuthenticationDataTrackerBuilder
+ */
+ protected $authenticationDataTrackerBuilderStub;
+ /**
+ * @var MockObject|AuthenticationDataTrackerInterface
+ */
+ protected $authenticationDataTrackerMock;
+ /**
+ * @var Stub|JobsStoreBuilder
+ */
+ protected $jobsStoreBuilderStub;
+ /**
+ * @var Stub|RandomHelper
+ */
+ protected $randomHelperStub;
+ /**
+ * @var Stub|EnvironmentHelper
+ */
+ protected $environmentHelperStub;
+ /**
+ * @var Stub|DateTimeHelper
+ */
+ protected $dateTimeHelperStub;
+ /**
+ * @var Stub|HelpersManager
+ */
+ protected $helpersManagerStub;
+ /**
+ * @var Stub|JobsStoreInterface
+ */
+ protected $jobsStoreMock;
+ /**
+ * @var Stub|JobInterface
+ */
+ protected $jobStub;
+ /**
+ * @var Stub|AbstractPayload
+ */
+ protected $payloadStub;
+
+ protected function setUp(): void
+ {
+ $this->moduleConfigurationStub = $this->createStub(ModuleConfiguration::class);
+ $this->sspConfigurationStub = $this->createStub(Configuration::class);
+ $this->loggerMock = $this->createMock(LoggerInterface::class);
+ $this->authenticationDataTrackerBuilderStub = $this->createStub(AuthenticationDataTrackerBuilder::class);
+ $this->authenticationDataTrackerMock = $this->createMock(AuthenticationDataTrackerInterface::class);
+ $this->jobsStoreBuilderStub = $this->createStub(JobsStoreBuilder::class);
+ $this->cacheMock = $this->createMock(CacheInterface::class);
+ $this->stateStub = $this->createStub(JobRunner\State::class);
+ $this->rateLimiterMock = $this->createMock(JobRunner\RateLimiter::class);
+ $this->randomHelperStub = $this->createStub(RandomHelper::class);
+ $this->environmentHelperStub = $this->createStub(EnvironmentHelper::class);
+ $this->dateTimeHelperStub = $this->createStub(DateTimeHelper::class);
+ $this->helpersManagerStub = $this->createStub(HelpersManager::class);
+ $this->jobsStoreMock = $this->createMock(JobsStoreInterface::class);
+ $this->jobStub = $this->createStub(JobInterface::class);
+ $this->payloadStub = $this->createStub(Event::class);
+ }
+
+ public function testCanCreateInstance(): void
+ {
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->assertInstanceOf(
+ JobRunner::class,
+ new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ )
+ );
+ }
+
+ public function testPreRunValidationFailsForSameJobRunnerId(): void
+ {
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->cacheMock->method('get')->willReturn($this->stateStub);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('error')
+ ->with('Job runner ID in cached state same as new ID.');
+ $this->loggerMock->expects($this->atLeast(2))->method('warning')
+ ->withConsecutive(
+ [$this->stringContains('Pre-run state validation failed. Clearing cached state and continuing.')],
+ [$this->stringContains('Job runner called, however accounting mode is not')]
+ );
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testPreRunValidationFailsForStaleState(): void
+ {
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(321);
+ $this->stateStub->method('isStale')->willReturn(true);
+ $this->cacheMock->method('get')->willReturn($this->stateStub);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(3))->method('warning')
+ ->withConsecutive(
+ [$this->stringContains('Stale state encountered.')],
+ [$this->stringContains('Pre-run state validation failed. Clearing cached state and continuing.')],
+ [$this->stringContains('Job runner called, however accounting mode is not')]
+ );
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testPreRunValidationPassesWhenStateIsNull(): void
+ {
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->cacheMock->method('get')->willReturn(null);
+
+ $this->cacheMock->expects($this->never())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(1))->method('warning')
+ ->withConsecutive(
+ [$this->stringContains('Job runner called, however accounting mode is not')]
+ );
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testValidateRunConditionsFailsIfAnotherJobRunnerIsActive(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(321);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->cacheMock->method('get')->willReturn($this->stateStub);
+
+ $this->cacheMock->expects($this->never())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('debug')
+ ->with($this->stringContains('Another job runner is active.'));
+ $this->loggerMock->expects($this->once())->method('info')
+ ->with($this->stringContains('Run conditions are not met, stopping.'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testAssumeTrueOnJobRunnerActivityIfThrown(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(321);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ $this->stateStub,
+ $this->throwException(new Exception('test'))
+ );
+
+ $this->cacheMock->expects($this->never())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('error')
+ ->with($this->stringContains('Error checking if another job runner is active.'));
+ $this->loggerMock->expects($this->once())->method('info')
+ ->with($this->stringContains('Run conditions are not met, stopping.'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanLogCacheClearingError(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(321);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->cacheMock->method('get')->willThrowException(new Exception('test'));
+ $this->cacheMock->method('delete')->willThrowException(new Exception('test'));
+
+ $this->expectException(Exception::class);
+
+ $this->loggerMock->expects($this->once())->method('error')
+ ->with($this->stringContains('Error clearing job runner cache.'));
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testValidateRunConditionsSuccessIfStaleStateEncountered(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(321);
+ $this->stateStub->method('isStale')->willReturn(true);
+ $this->cacheMock->method('get')
+ ->willReturnOnConsecutiveCalls(
+ null,
+ $this->stateStub
+ );
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Assuming no job runner is active.'));
+ $this->loggerMock->expects($this->once())->method('debug')
+ ->with($this->stringContains('Run conditions validated.'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testShouldRunCheckFailsIfMaximumExecutionTimeIsReached(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->moduleConfigurationStub->method('getJobRunnerMaximumExecutionTime')
+ ->willReturn(new \DateInterval('PT1S'));
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->cacheMock->method('get')->willReturn(null);
+
+ $this->stateStub->method('getStartedAt')->willReturn(new \DateTimeImmutable('-2 seconds'));
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(2))->method('debug')
+ ->withConsecutive(
+ [$this->stringContains('Run conditions validated.')],
+ [$this->stringContains('Maximum job runner execution time reached.')]
+ );
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanUseIniSettingForMaximumExecutionTime(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->moduleConfigurationStub->method('getJobRunnerMaximumExecutionTime')
+ ->willReturn(new \DateInterval('PT20S'));
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(false);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+ $this->dateTimeHelperStub->method('convertDateIntervalToSeconds')->willReturn(20);
+ $this->helpersManagerStub->method('getDateTimeHelper')->willReturn($this->dateTimeHelperStub);
+
+ ini_set('max_execution_time', '10');
+
+ $this->cacheMock->method('get')->willReturn(null);
+
+ $this->stateStub->method('getStartedAt')->willReturn(new \DateTimeImmutable('-30 seconds'));
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(3))->method('debug')
+ ->withConsecutive(
+ [$this->stringContains('Using maximum execution time from INI setting since it is shorter.')],
+ [$this->stringContains('Run conditions validated.')],
+ [$this->stringContains('Maximum job runner execution time reached.')]
+ );
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testShouldRunCheckFailsIfMaximumNumberOfProcessedJobsIsReached(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->cacheMock->method('get')->willReturn(null);
+
+ $this->stateStub->method('getTotalJobsProcessed')->willReturn(PHP_INT_MAX);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(2))->method('debug')
+ ->withConsecutive(
+ [$this->stringContains('Run conditions validated.')],
+ [$this->stringContains('Maximum number of processed jobs reached.')]
+ );
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testValidateSelfStateFailsIfRunHasNotStartedButCachedStateExists(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(false);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('cached state has already been initialized.'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testValidateSelfStateFailsIfRunHasStartedButCachedStateDoesNotExist(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ null,
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('cached state has not been initialized.'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testValidateSelfStateFailsIfRunHasStartedButDifferentJobRunnerIdEncountered(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(321);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Current job runner ID differs from the ID in the cached state.'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testValidateSelfStateFailsIfRunHasStartedButStaleCachedStateEncountered(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Job runner cached state is stale'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testValidateSelfStateFailsIfRunHasStartedButGracefulInterruptIsInitiated(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')->willReturn(true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Graceful job processing interrupt initiated'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanDoBackoffPauseIfNoJobsInCli(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(true);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')
+ ->willReturnOnConsecutiveCalls(false, true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(2))->method('debug')
+ ->withConsecutive(
+ [$this->stringContains('Run conditions validated.')],
+ [$this->stringContains('Doing a backoff pause')]
+ );
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Graceful job processing interrupt initiated'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanBreakImmediatelyIfNoJobsInWeb(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(false);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')
+ ->willReturnOnConsecutiveCalls(false, true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('debug')
+ ->with($this->stringContains('Run conditions validated'));
+
+ $this->loggerMock->expects($this->never())->method('warning')
+ ->with($this->stringContains('Graceful job processing interrupt initiated'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanProcessJob(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+ $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass')
+ ->willReturn('mock');
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(false);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')
+ ->willReturnOnConsecutiveCalls(false, true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->rateLimiterMock->expects($this->once())->method('resetBackoffPause');
+
+ $this->authenticationDataTrackerMock->expects($this->once())
+ ->method('process');
+ $this->authenticationDataTrackerBuilderStub->method('build')
+ ->willReturn($this->authenticationDataTrackerMock);
+
+ $this->jobStub->method('getPayload')->willReturn($this->payloadStub);
+ $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub);
+ $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(2))->method('debug')
+ ->withConsecutive(
+ [$this->stringContains('Run conditions validated.')],
+ [$this->stringContains('Successfully processed job with ID')]
+ );
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Graceful job processing interrupt initiated'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanLogCacheUpdateError(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+ $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass')
+ ->willReturn('mock');
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(false);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')
+ ->willReturnOnConsecutiveCalls(false, true);
+
+ $this->cacheMock->method('set')->willThrowException(new Exception('test'));
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->jobStub->method('getPayload')->willReturn($this->payloadStub);
+ $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub);
+ $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock);
+
+ $this->loggerMock->expects($this->atLeast(1))->method('debug')
+ ->withConsecutive(
+ [$this->stringContains('Run conditions validated.')],
+ );
+
+ $this->loggerMock->expects($this->once())->method('error')
+ ->with($this->stringContains('Error setting job runner state'));
+
+ $this->expectException(Exception::class);
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanPauseProcessingBasedOnConfiguration(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+ $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass')
+ ->willReturn('mock');
+ $this->moduleConfigurationStub->method('getJobRunnerShouldPauseAfterNumberOfJobsProcessed')
+ ->willReturn(0);
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(false);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')
+ ->willReturnOnConsecutiveCalls(false, false, true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub,
+ $this->stateStub,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->rateLimiterMock->expects($this->exactly(2))->method('resetBackoffPause');
+ $this->rateLimiterMock->expects($this->once())->method('doPause');
+
+ $this->authenticationDataTrackerMock->expects($this->exactly(2))
+ ->method('process');
+ $this->authenticationDataTrackerBuilderStub->method('build')
+ ->willReturn($this->authenticationDataTrackerMock);
+
+ $this->jobStub->method('getPayload')->willReturn($this->payloadStub);
+ $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub);
+ $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->atLeast(3))->method('debug')
+ ->withConsecutive(
+ [$this->stringContains('Run conditions validated.')],
+ [$this->stringContains('Successfully processed job with ID')],
+ [$this->stringContains('Successfully processed job with ID')]
+ );
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Graceful job processing interrupt initiated'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testCanMarkFailedJobOnError(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+ $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass')
+ ->willReturn('mock');
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(false);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')
+ ->willReturnOnConsecutiveCalls(false, true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ null,
+ $this->stateStub,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->rateLimiterMock->expects($this->once())->method('resetBackoffPause');
+
+ $this->authenticationDataTrackerMock->expects($this->once())
+ ->method('process')
+ ->willThrowException(new Exception('test'));
+ $this->authenticationDataTrackerBuilderStub->method('build')
+ ->willReturn($this->authenticationDataTrackerMock);
+
+ $this->jobStub->method('getPayload')->willReturn($this->payloadStub);
+ $this->jobsStoreMock->method('dequeue')->willReturn($this->jobStub);
+ $this->jobsStoreMock->expects($this->once())->method('markFailedJob')->with($this->jobStub);
+ $this->jobsStoreBuilderStub->method('build')->willReturn($this->jobsStoreMock);
+
+ $this->cacheMock->expects($this->once())->method('delete');
+
+ $this->loggerMock->expects($this->once())->method('error')
+ ->with($this->stringContains('Error while processing jobs.'));
+
+ $this->loggerMock->expects($this->once())->method('warning')
+ ->with($this->stringContains('Graceful job processing interrupt initiated'));
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+
+ public function testThrowsOnAlreadyInitializedState(): void
+ {
+ $this->moduleConfigurationStub->method('getAccountingProcessingType')
+ ->willReturn(ModuleConfiguration\AccountingProcessingType::VALUE_ASYNCHRONOUS);
+ $this->moduleConfigurationStub->method('getDefaultDataTrackerAndProviderClass')
+ ->willReturn('mock');
+
+ $this->randomHelperStub->method('getRandomInt')->willReturn(123);
+ $this->helpersManagerStub->method('getRandomHelper')->willReturn($this->randomHelperStub);
+ $this->environmentHelperStub->method('isCli')->willReturn(false);
+ $this->helpersManagerStub->method('getEnvironmentHelper')->willReturn($this->environmentHelperStub);
+
+ $this->stateStub->method('getJobRunnerId')->willReturn(123);
+ $this->stateStub->method('isStale')->willReturn(false);
+ $this->stateStub->method('getIsGracefulInterruptInitiated')
+ ->willReturnOnConsecutiveCalls(false, true);
+
+ $this->cacheMock->method('get')->willReturnOnConsecutiveCalls(
+ null,
+ null,
+ $this->stateStub
+ );
+
+ $this->stateStub->method('hasRunStarted')->willReturn(true);
+
+ $this->loggerMock->expects($this->once())->method('error')
+ ->with($this->stringContains('Job runner state already initialized'));
+ $this->expectException(Exception::class);
+
+ $jobRunner = new JobRunner(
+ $this->moduleConfigurationStub,
+ $this->sspConfigurationStub,
+ $this->loggerMock,
+ $this->authenticationDataTrackerBuilderStub,
+ $this->jobsStoreBuilderStub,
+ $this->cacheMock,
+ $this->stateStub,
+ $this->rateLimiterMock,
+ $this->helpersManagerStub
+ );
+
+ $jobRunner->run();
+ }
+}