Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Command/JobShowCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -26,7 +26,7 @@ class JobShowCommand extends Command
Job::STATUS_COMPLETED => 'Completed',
];

public function __construct(private JobRepository $jobRepository, private ConnectionFactory $connectionFactory)
public function __construct(private JobGateway $jobGateway, private ConnectionFactory $connectionFactory)
{
parent::__construct();
}
Expand Down Expand Up @@ -58,9 +58,9 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}

if ($scheduleId) {
$job = $this->jobRepository->findLastForDataflowId($scheduleId);
$job = $this->jobGateway->findLastForDataflowId($scheduleId);
} elseif ($jobId) {
$job = $this->jobRepository->find($jobId);
$job = $this->jobGateway->find($jobId);
} else {
$io->error('You must pass `job-id` or `schedule-id` option.');

Expand Down
8 changes: 7 additions & 1 deletion src/DependencyInjection/CodeRhapsodieDataflowExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
class CodeRhapsodieDataflowExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container)
public function load(array $configs, ContainerBuilder $container): void
{
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yaml');
Expand All @@ -29,6 +29,12 @@ public function load(array $configs, ContainerBuilder $container)

$container->setParameter('coderhapsodie.dataflow.dbal_default_connection', $config['dbal_default_connection']);
$container->setParameter('coderhapsodie.dataflow.default_logger', $config['default_logger']);
$container->setParameter('coderhapsodie.dataflow.exceptions_mode.type', $config['exceptions_mode']['type']);

if ($config['exceptions_mode']['type'] === 'file') {
$container->setParameter('coderhapsodie.dataflow.flysystem_service', $config['exceptions_mode']['flysystem_service']);
$loader->load('exceptions_services.yaml');
}

if ($config['messenger_mode']['enabled']) {
$container->setParameter('coderhapsodie.dataflow.bus', $config['messenger_mode']['bus']);
Expand Down
2 changes: 1 addition & 1 deletion src/DependencyInjection/Compiler/BusCompilerPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class BusCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
if (!$container->hasParameter('coderhapsodie.dataflow.bus')) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
class DataflowTypeCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
if (!$container->has(DataflowTypeRegistry::class)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class DefaultLoggerCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
$defaultLogger = $container->getParameter('coderhapsodie.dataflow.default_logger');
if (!$container->has($defaultLogger)) {
Expand Down
29 changes: 29 additions & 0 deletions src/DependencyInjection/Compiler/ExceptionCompilerPass.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\DependencyInjection\Compiler;

use CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Reference;

class ExceptionCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (!$container->hasParameter('coderhapsodie.dataflow.flysystem_service')) {
return;
}

$flysystem = $container->getParameter('coderhapsodie.dataflow.flysystem_service');
if (!$container->has($flysystem)) {
throw new InvalidArgumentException(\sprintf('Service "%s" not found', $flysystem));
}

$definition = $container->findDefinition(FilesystemExceptionHandler::class);
$definition->setArgument('$filesystem', new Reference($flysystem));
}
}
14 changes: 14 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ public function getConfigTreeBuilder(): TreeBuilder
->thenInvalid('You need "symfony/messenger" in order to use Dataflow messenger mode.')
->end()
->end()
->arrayNode('exceptions_mode')
->addDefaultsIfNotSet()
->children()
->scalarNode('type')
->defaultValue('database')
->end()
->scalarNode('flysystem_service')
->end()
->validate()
->ifTrue(static fn ($v): bool => $v['type'] === 'file' && !interface_exists('\League\Flysystem\Filesystem'))
->thenInvalid('You need "league/flysystem" to use Dataflow file exception mode.')
->end()
->end()
->end()
->end()
;

Expand Down
12 changes: 12 additions & 0 deletions src/ExceptionsHandler/ExceptionHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;

interface ExceptionHandlerInterface
{
public function save(?int $jobId, ?array $exceptions): void;

public function find(int $jobId): ?array;
}
37 changes: 37 additions & 0 deletions src/ExceptionsHandler/FilesystemExceptionHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;

use League\Flysystem\Filesystem;
use League\Flysystem\FilesystemException;

class FilesystemExceptionHandler implements ExceptionHandlerInterface
{
public function __construct(private Filesystem $filesystem)
{
}

public function save(?int $jobId, ?array $exceptions): void
{
if ($jobId === null || empty($exceptions)) {
return;
}

$this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions));
}

public function find(int $jobId): ?array
{
try {
if (!$this->filesystem->has(\sprintf('dataflow-job-%s.log', $jobId))) {
return [];
}

return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true);
} catch (FilesystemException) {
return [];
}
}
}
17 changes: 17 additions & 0 deletions src/ExceptionsHandler/NullExceptionHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\ExceptionsHandler;

class NullExceptionHandler implements ExceptionHandlerInterface
{
public function save(?int $jobId, ?array $exceptions): void
{
}

public function find(int $jobId): ?array
{
return null;
}
}
52 changes: 52 additions & 0 deletions src/Gateway/JobGateway.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

declare(strict_types=1);

namespace CodeRhapsodie\DataflowBundle\Gateway;

use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler;
use CodeRhapsodie\DataflowBundle\Repository\JobRepository;

class JobGateway
{
public function __construct(private JobRepository $repository, private ExceptionHandlerInterface $exceptionHandler)
{
}

public function find(int $jobId): ?Job
{
$job = $this->repository->find($jobId);

return $this->loadExceptions($job);
}

public function save(Job $job): void
{
if (!$this->exceptionHandler instanceof NullExceptionHandler) {
$this->exceptionHandler->save($job->getId(), $job->getExceptions());
$job->setExceptions([]);
}

$this->repository->save($job);
}

public function findLastForDataflowId(int $scheduleId): ?Job
{
$job = $this->repository->findLastForDataflowId($scheduleId);

return $this->loadExceptions($job);
}

private function loadExceptions(?Job $job): ?Job
{
if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) {
return $job;
}

$this->exceptionHandler->save($job->getId(), $job->getExceptions());

return $job->setExceptions($this->exceptionHandler->find($job->getId()));
}
}
14 changes: 10 additions & 4 deletions src/Processor/JobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use CodeRhapsodie\DataflowBundle\Logger\BufferHandler;
use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
Expand All @@ -22,8 +23,12 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface
{
use LoggerAwareTrait;

public function __construct(private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher)
{
public function __construct(
private JobRepository $repository,
private DataflowTypeRegistryInterface $registry,
private EventDispatcherInterface $dispatcher,
private JobGateway $jobGateway,
) {
}

public function process(Job $job): void
Expand Down Expand Up @@ -64,7 +69,7 @@ private function beforeProcessing(Job $job): void
->setStatus(Job::STATUS_RUNNING)
->setStartTime(new \DateTime())
;
$this->repository->save($job);
$this->jobGateway->save($job);
}

private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void
Expand All @@ -75,7 +80,8 @@ private function afterProcessing(Job $job, Result $result, BufferHandler $buffer
->setCount($result->getSuccessCount())
->setExceptions($bufferLogger->clearBuffer())
;
$this->repository->save($job);

$this->jobGateway->save($job);

$this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING);
}
Expand Down
5 changes: 5 additions & 0 deletions src/Resources/config/exceptions_services.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
services:
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\FilesystemExceptionHandler:
arguments:
$filesystem: ~ # Filled in compiler pass
9 changes: 8 additions & 1 deletion src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ services:

CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
arguments:
$jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
$connectionFactory: '@CodeRhapsodie\DataflowBundle\Factory\ConnectionFactory'
tags: ['console.command']

Expand Down Expand Up @@ -93,3 +93,10 @@ services:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
$dispatcher: '@event_dispatcher'
$jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler'
CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler:
CodeRhapsodie\DataflowBundle\Gateway\JobGateway:
arguments:
$repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
$exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface'
Loading