From 189e3231df6879f0908a02b6f83793c30e4cfb5c Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 15:21:17 +0200 Subject: [PATCH 1/8] * Add possibility to save exceptions in file --- .../CodeRhapsodieDataflowExtension.php | 7 ++++- .../Compiler/BusCompilerPass.php | 2 +- .../Compiler/DataflowTypeCompilerPass.php | 2 +- .../Compiler/DefaultLoggerCompilerPass.php | 2 +- .../Compiler/ExceptionCompilerPass.php | 29 +++++++++++++++++++ src/DependencyInjection/Configuration.php | 14 +++++++++ src/Processor/JobProcessor.php | 17 +++++++++-- 7 files changed, 67 insertions(+), 6 deletions(-) create mode 100644 src/DependencyInjection/Compiler/ExceptionCompilerPass.php diff --git a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php index 7c00a85..a10a643 100644 --- a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php +++ b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php @@ -15,7 +15,7 @@ */ class CodeRhapsodieDataflowExtension extends Extension { - public function load(array $configs, ContainerBuilder $container) + public function load(array $configs, ContainerBuilder $container): void { $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.yaml'); @@ -29,6 +29,11 @@ public function load(array $configs, ContainerBuilder $container) $container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']); $container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']); + $container->setParameter('coderhapsodie.dataflow.exception_mode.type', $config['exception_mode']['type']); + + if ($config['exception_mode']['type'] === 'file') { + $container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exception_mode']['flysystem_service']); + } if ($config['messenger_mode']['enabled']) { $container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']); diff --git a/src/DependencyInjection/Compiler/BusCompilerPass.php b/src/DependencyInjection/Compiler/BusCompilerPass.php index 5a8114f..9c95169 100644 --- a/src/DependencyInjection/Compiler/BusCompilerPass.php +++ b/src/DependencyInjection/Compiler/BusCompilerPass.php @@ -12,7 +12,7 @@ class BusCompilerPass implements CompilerPassInterface { - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { if (!$container->hasParameter('coderhapsodie.dataflow.bus')) { return; diff --git a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php index 2c0ba24..d2ad94d 100644 --- a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php +++ b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php @@ -16,7 +16,7 @@ */ class DataflowTypeCompilerPass implements CompilerPassInterface { - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { if (!$container->has(DataflowTypeRegistry::class)) { return; diff --git a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php index e3c3a41..fe5fdfb 100644 --- a/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php +++ b/src/DependencyInjection/Compiler/DefaultLoggerCompilerPass.php @@ -12,7 +12,7 @@ class DefaultLoggerCompilerPass implements CompilerPassInterface { - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { $defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger'); if (!$container->has($defaultLogger)) { diff --git a/src/DependencyInjection/Compiler/ExceptionCompilerPass.php b/src/DependencyInjection/Compiler/ExceptionCompilerPass.php new file mode 100644 index 0000000..f201b9a --- /dev/null +++ b/src/DependencyInjection/Compiler/ExceptionCompilerPass.php @@ -0,0 +1,29 @@ +hasParameter('coderhapsodie.dataflow.flysystem_service')) { + return; + } + + $flysystem = $container->getParameter('coderhapsodie.dataflow.flysystem_service'); + if (!$container->has($flysystem)) { + throw new InvalidArgumentException(\sprintf('Service "%s" not found', $flysystem)); + } + + $definition = $container->findDefinition(JobProcessor::class); + $definition->setArgument('$filesystem', new Reference($flysystem)); + } +} diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index a4bf333..d21fb15 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -38,6 +38,20 @@ public function getConfigTreeBuilder(): TreeBuilder ->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.') ->end() ->end() + ->arrayNode('exceptions_mode') + ->addDefaultsIfNotSet() + ->children() + ->scalarNode('type') + ->defaultValue('database') + ->end() + ->scalarNode('flysystem_service') + ->end() + ->validate() + ->ifTrue(static fn($v): bool => 'file' === $v['type'] && !interface_exists('\League\Flysystem\Filesystem')) + ->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.') + ->end() + ->end() + ->end() ->end() ; diff --git a/src/Processor/JobProcessor.php b/src/Processor/JobProcessor.php index b59d995..00ac52c 100644 --- a/src/Processor/JobProcessor.php +++ b/src/Processor/JobProcessor.php @@ -13,6 +13,7 @@ use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; use CodeRhapsodie\DataflowBundle\Repository\JobRepository; +use League\Flysystem\Filesystem; use Monolog\Logger; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; @@ -22,7 +23,12 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface { use LoggerAwareTrait; - public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher) + public function __construct( + private JobRepository $repository, + private DataflowTypeRegistryInterface $registry, + private EventDispatcherInterface $dispatcher, + private ?Filesystem $filesystem = null + ) { } @@ -69,12 +75,19 @@ private function beforeProcessing(Job $job): void private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void { + $exceptions = $bufferLogger->clearBuffer(); + if ($this->filesystem) { + $this->filesystem->write(sprintf('dataflow-job-%s.log',$job->getId()), json_encode($exceptions)); + $exceptions = []; + } + $job ->setEndTime($result->getEndTime()) ->setStatus(Job::STATUS_COMPLETED) ->setCount($result->getSuccessCount()) - ->setExceptions($bufferLogger->clearBuffer()) + ->setExceptions($exceptions) ; + $this->repository->save($job); $this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING); From 1b08498691f4f8d0b46d0aa36c52ad7bb67fa02e Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 15:27:59 +0200 Subject: [PATCH 2/8] * Add possibility to save exceptions in file --- src/DependencyInjection/CodeRhapsodieDataflowExtension.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php index a10a643..72267bd 100644 --- a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php +++ b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php @@ -29,10 +29,10 @@ public function load(array $configs, ContainerBuilder $container): void $container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']); $container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']); - $container->setParameter('coderhapsodie.dataflow.exception_mode.type', $config['exception_mode']['type']); + $container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']); - if ($config['exception_mode']['type'] === 'file') { - $container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exception_mode']['flysystem_service']); + if ($config['exceptions_mode']['type'] === 'file') { + $container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']); } if ($config['messenger_mode']['enabled']) { From bcd77f99af7a61ca01f73d0788062249aa764a2a Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 16:19:45 +0200 Subject: [PATCH 3/8] * Add possibility to save exceptions in file --- src/Command/JobShowCommand.php | 7 +-- .../CodeRhapsodieDataflowExtension.php | 1 + .../Compiler/ExceptionCompilerPass.php | 4 +- .../ExceptionHandlerInterface.php | 12 +++++ .../FilesystemExceptionHandler.php | 37 +++++++++++++ .../NullExceptionHandler.php | 18 +++++++ src/Gateway/JobGateway.php | 52 +++++++++++++++++++ src/Processor/JobProcessor.php | 15 ++---- src/Resources/config/exceptions_services.yaml | 5 ++ src/Resources/config/services.yaml | 9 +++- 10 files changed, 144 insertions(+), 16 deletions(-) create mode 100644 src/ExceptionsHandler/ExceptionHandlerInterface.php create mode 100644 src/ExceptionsHandler/FilesystemExceptionHandler.php create mode 100644 src/ExceptionsHandler/NullExceptionHandler.php create mode 100644 src/Gateway/JobGateway.php create mode 100644 src/Resources/config/exceptions_services.yaml diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php index 67e5412..5a9b35a 100644 --- a/src/Command/JobShowCommand.php +++ b/src/Command/JobShowCommand.php @@ -6,6 +6,7 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; +use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use CodeRhapsodie\DataflowBundle\Repository\JobRepository; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; @@ -26,7 +27,7 @@ class JobShowCommand extends Command Job::STATUS_COMPLETED => 'Completed', ]; - public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory) + public function __construct(private JobGateway $jobGateway, private ConnectionFactory $connectionFactory) { parent::__construct(); } @@ -58,9 +59,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int } if ($scheduleId) { - $job = $this->jobRepository->findLastForDataflowId($scheduleId); + $job = $this->jobGateway->findLastForDataflowId($scheduleId); } elseif ($jobId) { - $job = $this->jobRepository->find($jobId); + $job = $this->jobGateway->find($jobId); } else { $io->error('You must pass `job-id` or `schedule-id` option.'); diff --git a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php index 72267bd..ca4bb9c 100644 --- a/src/DependencyInjection/CodeRhapsodieDataflowExtension.php +++ b/src/DependencyInjection/CodeRhapsodieDataflowExtension.php @@ -33,6 +33,7 @@ public function load(array $configs, ContainerBuilder $container): void if ($config['exceptions_mode']['type'] === 'file') { $container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']); + $loader->load('exceptions_services.yaml'); } if ($config['messenger_mode']['enabled']) { diff --git a/src/DependencyInjection/Compiler/ExceptionCompilerPass.php b/src/DependencyInjection/Compiler/ExceptionCompilerPass.php index f201b9a..105f56f 100644 --- a/src/DependencyInjection/Compiler/ExceptionCompilerPass.php +++ b/src/DependencyInjection/Compiler/ExceptionCompilerPass.php @@ -4,7 +4,7 @@ namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler; -use CodeRhapsodie\DataflowBundle\Processor\JobProcessor; +use CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException; @@ -23,7 +23,7 @@ public function process(ContainerBuilder $container): void throw new InvalidArgumentException(\sprintf('Service "%s" not found', $flysystem)); } - $definition = $container->findDefinition(JobProcessor::class); + $definition = $container->findDefinition(FilesystemExceptionHandler::class); $definition->setArgument('$filesystem', new Reference($flysystem)); } } diff --git a/src/ExceptionsHandler/ExceptionHandlerInterface.php b/src/ExceptionsHandler/ExceptionHandlerInterface.php new file mode 100644 index 0000000..81c12f5 --- /dev/null +++ b/src/ExceptionsHandler/ExceptionHandlerInterface.php @@ -0,0 +1,12 @@ +filesystem->write(sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions)); + } + + public function find(int $jobId): ?array + { + try { + if (!$this->filesystem->has(sprintf('dataflow-job-%s.log', $jobId))) { + return []; + } + + return json_decode($this->filesystem->read(sprintf('dataflow-job-%s.log', $jobId)), true); + } catch (FilesystemException) { + return []; + } + } +} \ No newline at end of file diff --git a/src/ExceptionsHandler/NullExceptionHandler.php b/src/ExceptionsHandler/NullExceptionHandler.php new file mode 100644 index 0000000..9a8ad55 --- /dev/null +++ b/src/ExceptionsHandler/NullExceptionHandler.php @@ -0,0 +1,18 @@ +repository->find($jobId); + + return $this->loadExceptions($job); + } + + public function save(Job $job): void + { + if (!$this->exceptionHandler instanceof NullExceptionHandler) { + $this->exceptionHandler->save($job->getId(), $job->getExceptions()); + $job->setExceptions([]); + } + + $this->repository->save($job); + } + + public function findLastForDataflowId(int $scheduleId): ?Job + { + $job = $this->repository->findLastForDataflowId($scheduleId); + + return $this->loadExceptions($job); + } + + private function loadExceptions(?Job $job): ?Job + { + if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) { + return $job; + } + + $this->exceptionHandler->save($job->getId(), $job->getExceptions()); + + return $job->setExceptions($this->exceptionHandler->find($job->getId())); + } +} \ No newline at end of file diff --git a/src/Processor/JobProcessor.php b/src/Processor/JobProcessor.php index 00ac52c..dbe658a 100644 --- a/src/Processor/JobProcessor.php +++ b/src/Processor/JobProcessor.php @@ -9,6 +9,7 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Event\Events; use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent; +use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use CodeRhapsodie\DataflowBundle\Logger\BufferHandler; use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; @@ -27,7 +28,7 @@ public function __construct( private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher, - private ?Filesystem $filesystem = null + private JobGateway $jobGateway, ) { } @@ -70,25 +71,19 @@ private function beforeProcessing(Job $job): void ->setStatus(Job::STATUS_RUNNING) ->setStartTime(new \DateTime()) ; - $this->repository->save($job); + $this->jobGateway->save($job); } private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void { - $exceptions = $bufferLogger->clearBuffer(); - if ($this->filesystem) { - $this->filesystem->write(sprintf('dataflow-job-%s.log',$job->getId()), json_encode($exceptions)); - $exceptions = []; - } - $job ->setEndTime($result->getEndTime()) ->setStatus(Job::STATUS_COMPLETED) ->setCount($result->getSuccessCount()) - ->setExceptions($exceptions) + ->setExceptions($bufferLogger->clearBuffer()) ; - $this->repository->save($job); + $this->jobGateway->save($job); $this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING); } diff --git a/src/Resources/config/exceptions_services.yaml b/src/Resources/config/exceptions_services.yaml new file mode 100644 index 0000000..44b142a --- /dev/null +++ b/src/Resources/config/exceptions_services.yaml @@ -0,0 +1,5 @@ +services: + CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler' + CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler: + arguments: + $filesystem: ~ # Filled in compiler pass diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index 3233846..dce5b62 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -28,7 +28,7 @@ services: CodeRhapsodie\DataflowBundle\Command\JobShowCommand: arguments: - $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway' $connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory' tags: ['console.command'] @@ -93,3 +93,10 @@ services: $repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' $registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface' $dispatcher: '@event_dispatcher' + $jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway' + CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler' + CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler: + CodeRhapsodie\DataflowBundle\Gateway\JobGateway: + arguments: + $repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository' + $exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface' From 484ed6db7beb6e8ebca3fb76dd7e068532941853 Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 16:20:30 +0200 Subject: [PATCH 4/8] * Add possibility to save exceptions in file --- src/Command/JobShowCommand.php | 1 - src/DependencyInjection/Configuration.php | 2 +- src/ExceptionsHandler/ExceptionHandlerInterface.php | 2 +- src/ExceptionsHandler/FilesystemExceptionHandler.php | 8 ++++---- src/ExceptionsHandler/NullExceptionHandler.php | 3 +-- src/Gateway/JobGateway.php | 2 +- src/Processor/JobProcessor.php | 4 +--- 7 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php index 5a9b35a..2863b94 100644 --- a/src/Command/JobShowCommand.php +++ b/src/Command/JobShowCommand.php @@ -7,7 +7,6 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory; use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; -use CodeRhapsodie\DataflowBundle\Repository\JobRepository; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index d21fb15..bd0a67b 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -47,7 +47,7 @@ public function getConfigTreeBuilder(): TreeBuilder ->scalarNode('flysystem_service') ->end() ->validate() - ->ifTrue(static fn($v): bool => 'file' === $v['type'] && !interface_exists('\League\Flysystem\Filesystem')) + ->ifTrue(static fn ($v): bool => $v['type'] === 'file' && !interface_exists('\League\Flysystem\Filesystem')) ->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.') ->end() ->end() diff --git a/src/ExceptionsHandler/ExceptionHandlerInterface.php b/src/ExceptionsHandler/ExceptionHandlerInterface.php index 81c12f5..a950c85 100644 --- a/src/ExceptionsHandler/ExceptionHandlerInterface.php +++ b/src/ExceptionsHandler/ExceptionHandlerInterface.php @@ -9,4 +9,4 @@ interface ExceptionHandlerInterface public function save(?int $jobId, ?array $exceptions): void; public function find(int $jobId): ?array; -} \ No newline at end of file +} diff --git a/src/ExceptionsHandler/FilesystemExceptionHandler.php b/src/ExceptionsHandler/FilesystemExceptionHandler.php index b08eead..1fd5bac 100644 --- a/src/ExceptionsHandler/FilesystemExceptionHandler.php +++ b/src/ExceptionsHandler/FilesystemExceptionHandler.php @@ -19,19 +19,19 @@ public function save(?int $jobId, ?array $exceptions): void return; } - $this->filesystem->write(sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions)); + $this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions)); } public function find(int $jobId): ?array { try { - if (!$this->filesystem->has(sprintf('dataflow-job-%s.log', $jobId))) { + if (!$this->filesystem->has(\sprintf('dataflow-job-%s.log', $jobId))) { return []; } - return json_decode($this->filesystem->read(sprintf('dataflow-job-%s.log', $jobId)), true); + return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true); } catch (FilesystemException) { return []; } } -} \ No newline at end of file +} diff --git a/src/ExceptionsHandler/NullExceptionHandler.php b/src/ExceptionsHandler/NullExceptionHandler.php index 9a8ad55..c3a0d2d 100644 --- a/src/ExceptionsHandler/NullExceptionHandler.php +++ b/src/ExceptionsHandler/NullExceptionHandler.php @@ -6,7 +6,6 @@ class NullExceptionHandler implements ExceptionHandlerInterface { - public function save(?int $jobId, ?array $exceptions): void { } @@ -15,4 +14,4 @@ public function find(int $jobId): ?array { return null; } -} \ No newline at end of file +} diff --git a/src/Gateway/JobGateway.php b/src/Gateway/JobGateway.php index 8e3c1d3..fa24e41 100644 --- a/src/Gateway/JobGateway.php +++ b/src/Gateway/JobGateway.php @@ -49,4 +49,4 @@ private function loadExceptions(?Job $job): ?Job return $job->setExceptions($this->exceptionHandler->find($job->getId())); } -} \ No newline at end of file +} diff --git a/src/Processor/JobProcessor.php b/src/Processor/JobProcessor.php index dbe658a..e7019db 100644 --- a/src/Processor/JobProcessor.php +++ b/src/Processor/JobProcessor.php @@ -14,7 +14,6 @@ use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; use CodeRhapsodie\DataflowBundle\Repository\JobRepository; -use League\Flysystem\Filesystem; use Monolog\Logger; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; @@ -29,8 +28,7 @@ public function __construct( private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher, private JobGateway $jobGateway, - ) - { + ) { } public function process(Job $job): void From eef5e6f3a0730d04cbe752a4dc5da5df4a8be8af Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 16:23:45 +0200 Subject: [PATCH 5/8] * Add possibility to save exceptions in file --- Tests/Processor/JobProcessorTest.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Tests/Processor/JobProcessorTest.php b/Tests/Processor/JobProcessorTest.php index f1879ee..e9b4c89 100644 --- a/Tests/Processor/JobProcessorTest.php +++ b/Tests/Processor/JobProcessorTest.php @@ -7,6 +7,7 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Event\Events; use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent; +use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use CodeRhapsodie\DataflowBundle\Processor\JobProcessor; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; use CodeRhapsodie\DataflowBundle\Repository\JobRepository; @@ -20,14 +21,16 @@ class JobProcessorTest extends TestCase private JobRepository|MockObject $repository; private DataflowTypeRegistryInterface|MockObject $registry; private EventDispatcherInterface|MockObject $dispatcher; + private JobGateway|MockObject $jobGateway; protected function setUp(): void { $this->repository = $this->createMock(JobRepository::class); $this->registry = $this->createMock(DataflowTypeRegistryInterface::class); $this->dispatcher = $this->createMock(EventDispatcherInterface::class); + $this->jobGateway = $this->createMock(JobGateway::class); - $this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher); + $this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway); } public function testProcess() From 4996128bd0ed80aa912bea4b8994779c8926c947 Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 16:24:48 +0200 Subject: [PATCH 6/8] * Add possibility to save exceptions in file --- Tests/Processor/JobProcessorTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/Processor/JobProcessorTest.php b/Tests/Processor/JobProcessorTest.php index e9b4c89..bd37342 100644 --- a/Tests/Processor/JobProcessorTest.php +++ b/Tests/Processor/JobProcessorTest.php @@ -75,7 +75,7 @@ public function testProcess() ->willReturn($result) ; - $this->repository + $this->jobGateway ->expects($this->exactly(2)) ->method('save') ; From 605a2bd6f3a126eb0ae5cc960fbc36b506774efa Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 16:34:26 +0200 Subject: [PATCH 7/8] * Add possibility to save exceptions in file --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 85a4aa5..84638b5 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,20 @@ framework: CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async ``` +### Exceptions mode +Dataflow can save exceptions in any filesystem you want. +This allows dataflow to save exceptions in filesystem instead of the database +You have to install `league/flysystem`. + +To enable exceptions mode: + +```yaml +code_rhapsodie_dataflow: + exceptions_mode: + type: 'file' + flysystem_service: 'app.filesystem' #The name of the \League\Flysystem\Filesystem service +``` + ## Define a dataflow type This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of From b35372ab269ab4680869d27492fca549ebd36eeb Mon Sep 17 00:00:00 2001 From: loic Date: Tue, 7 Oct 2025 16:35:12 +0200 Subject: [PATCH 8/8] * Add possibility to save exceptions in file --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e63eef..5e8db49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# Version 5.4.0 +* Add possibility to save exceptions in file + # Version 5.3.1 * Fix interface naming