Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Version 5.4.0
* Add possibility to save exceptions in file

# Version 5.3.1
* Fix interface naming

Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ framework:
CodeRhapsodie\DataflowBundle\MessengerMode\JobMessage: async
```

### Exceptions mode
Dataflow can save exceptions in any filesystem you want.
This allows dataflow to save exceptions in filesystem instead of the database
You have to install `league/flysystem`.

To enable exceptions mode:

```yaml
code_rhapsodie_dataflow:
exceptions_mode:
type: 'file'
flysystem_service: 'app.filesystem' #The name of the \League\Flysystem\Filesystem service
```

## Define a dataflow type

This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of
Expand Down
7 changes: 5 additions & 2 deletions Tests/Processor/JobProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
Expand All @@ -20,14 +21,16 @@ class JobProcessorTest extends TestCase
private JobRepository|MockObject $repository;
private DataflowTypeRegistryInterface|MockObject $registry;
private EventDispatcherInterface|MockObject $dispatcher;
private JobGateway|MockObject $jobGateway;

protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->jobGateway = $this->createMock(JobGateway::class);

$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher);
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway);
}

public function testProcess()
Expand Down Expand Up @@ -72,7 +75,7 @@ public function testProcess()
->willReturn($result)
;

$this->repository
$this->jobGateway
->expects($this->exactly(2))
->method('save')
;
Expand Down
8 changes: 4 additions & 4 deletions src/Command/JobShowCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -26,7 +26,7 @@ class JobShowCommand extends Command
Job::STATUS_COMPLETED => 'Completed',
];

public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory)
public function __construct(private JobGateway $jobGateway, private ConnectionFactory $connectionFactory)
{
parent::__construct();
}
Expand Down Expand Up @@ -58,9 +58,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}

if ($scheduleId) {
$job = $this->jobRepository->findLastForDataflowId($scheduleId);
$job = $this->jobGateway->findLastForDataflowId($scheduleId);
} elseif ($jobId) {
$job = $this->jobRepository->find($jobId);
$job = $this->jobGateway->find($jobId);
} else {
$io->error('You must pass `job-id` or `schedule-id` option.');

Expand Down
8 changes: 7 additions & 1 deletion src/DependencyInjection/CodeRhapsodieDataflowExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
class CodeRhapsodieDataflowExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container)
public function load(array $configs, ContainerBuilder $container): void
{
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yaml');
Expand All @@ -29,6 +29,12 @@ public function load(array $configs, ContainerBuilder $container)

$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);

if ($config['exceptions_mode']['type'] === 'file') {
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);
$loader->load('exceptions_services.yaml');
}

if ($config['messenger_mode']['enabled']) {
$container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']);
Expand Down
2 changes: 1 addition & 1 deletion src/DependencyInjection/Compiler/BusCompilerPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class BusCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
class DataflowTypeCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
if (!$container->has(DataflowTypeRegistry::class)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class DefaultLoggerCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
if (!$container->has($defaultLogger)) {
Expand Down
29 changes: 29 additions & 0 deletions src/DependencyInjection/Compiler/ExceptionCompilerPass.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;

use CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Reference;

class ExceptionCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (!$container->hasParameter('coderhapsodie.dataflow.flysystem_service')) {
return;
}

$flysystem = $container->getParameter('coderhapsodie.dataflow.flysystem_service');
if (!$container->has($flysystem)) {
throw new InvalidArgumentException(\sprintf('Service "%s" not found', $flysystem));
}

$definition = $container->findDefinition(FilesystemExceptionHandler::class);
$definition->setArgument('$filesystem', new Reference($flysystem));
}
}
14 changes: 14 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ public function getConfigTreeBuilder(): TreeBuilder
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()
->arrayNode('exceptions_mode')
->addDefaultsIfNotSet()
->children()
->scalarNode('type')
->defaultValue('database')
->end()
->scalarNode('flysystem_service')
->end()
->validate()
->ifTrue(static fn ($v): bool => $v['type'] === 'file' && !interface_exists('\League\Flysystem\Filesystem'))
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
->end()
->end()
->end()
->end()
;

Expand Down
12 changes: 12 additions & 0 deletions src/ExceptionsHandler/ExceptionHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;

interface ExceptionHandlerInterface
{
public function save(?int $jobId, ?array $exceptions): void;

public function find(int $jobId): ?array;
}
37 changes: 37 additions & 0 deletions src/ExceptionsHandler/FilesystemExceptionHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;

use League\Flysystem\Filesystem;
use League\Flysystem\FilesystemException;

class FilesystemExceptionHandler implements ExceptionHandlerInterface
{
public function __construct(private Filesystem $filesystem)
{
}

public function save(?int $jobId, ?array $exceptions): void
{
if ($jobId === null || empty($exceptions)) {
return;
}

$this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions));
}

public function find(int $jobId): ?array
{
try {
if (!$this->filesystem->has(\sprintf('dataflow-job-%s.log', $jobId))) {
return [];
}

return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true);
} catch (FilesystemException) {
return [];
}
}
}
17 changes: 17 additions & 0 deletions src/ExceptionsHandler/NullExceptionHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;

class NullExceptionHandler implements ExceptionHandlerInterface
{
public function save(?int $jobId, ?array $exceptions): void
{
}

public function find(int $jobId): ?array
{
return null;
}
}
52 changes: 52 additions & 0 deletions src/Gateway/JobGateway.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\Gateway;

use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;

class JobGateway
{
public function __construct(private JobRepository $repository, private ExceptionHandlerInterface $exceptionHandler)
{
}

public function find(int $jobId): ?Job
{
$job = $this->repository->find($jobId);

return $this->loadExceptions($job);
}

public function save(Job $job): void
{
if (!$this->exceptionHandler instanceof NullExceptionHandler) {
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
$job->setExceptions([]);
}

$this->repository->save($job);
}

public function findLastForDataflowId(int $scheduleId): ?Job
{
$job = $this->repository->findLastForDataflowId($scheduleId);

return $this->loadExceptions($job);
}

private function loadExceptions(?Job $job): ?Job
{
if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) {
return $job;
}

$this->exceptionHandler->save($job->getId(), $job->getExceptions());

return $job->setExceptions($this->exceptionHandler->find($job->getId()));
}
}
14 changes: 10 additions & 4 deletions src/Processor/JobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
Expand All @@ -22,8 +23,12 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;

public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher)
{
public function __construct(
private JobRepository $repository,
private DataflowTypeRegistryInterface $registry,
private EventDispatcherInterface $dispatcher,
private JobGateway $jobGateway,
) {
}

public function process(Job $job): void
Expand Down Expand Up @@ -64,7 +69,7 @@ private function beforeProcessing(Job $job): void
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
$this->jobGateway->save($job);
}

private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
Expand All @@ -75,7 +80,8 @@ private function afterProcessing(Job $job, Result $result, BufferHandler $buffer
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);

$this->jobGateway->save($job);

$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
Expand Down
5 changes: 5 additions & 0 deletions src/Resources/config/exceptions_services.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
services:
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler:
arguments:
$filesystem: ~ # Filled in compiler pass
9 changes: 8 additions & 1 deletion src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ services:

CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']

Expand Down Expand Up @@ -93,3 +93,10 @@ services:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$dispatcher: '@event_dispatcher'
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler:
CodeRhapsodie\DataflowBundle\Gateway\JobGateway:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'