Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ waitForService redis 6379 50
waitForService beanstalkd 11300 50
waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create --if-not-exists
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"enqueue/fs": "*@dev",
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
"enqueue/mongodb": "*@dev",
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
Expand Down Expand Up @@ -143,6 +144,10 @@
{
"type": "path",
"url": "pkg/async-event-dispatcher"
},
{
"type": "path",
"url": "pkg/mongodb"
}
]
}
9 changes: 8 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- zookeeper
- google-pubsub
- rabbitmqssl
- mongo
volumes:
- './:/mqdev'
environment:
Expand All @@ -24,7 +25,7 @@ services:
- RABBITMQ_PASSWORD=guest
- RABBITMQ_VHOST=mqdev
- RABBITMQ_AMQP__PORT=5672
- RABBITMQ_STOMP_PORT=61613
- RABBITMQ_STOMP_PORT=61613
- DOCTRINE_DRIVER=pdo_mysql
- DOCTRINE_HOST=mysql
- DOCTRINE_PORT=3306
Expand All @@ -44,6 +45,7 @@ services:
- RDKAFKA_PORT=9092
- PUBSUB_EMULATOR_HOST=http://google-pubsub:8085
- GCLOUD_PROJECT=mqdev
- MONGO_DSN=mongodb://mongo

rabbitmq:
image: 'enqueue/rabbitmq:latest'
Expand Down Expand Up @@ -102,6 +104,11 @@ services:
image: 'google/cloud-sdk:latest'
entrypoint: 'gcloud beta emulators pubsub start --host-port=0.0.0.0:8085'

mongo:
image: mongo
ports:
- "27017:27017"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an empty line after the service definition.


volumes:
mysql-data:
driver: local
7 changes: 7 additions & 0 deletions pkg/mongodb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
/examples/
186 changes: 186 additions & 0 deletions pkg/mongodb/Client/MongodbDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
<?php

namespace Enqueue\Mongodb\Client;

use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\MessagePriority;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Mongodb\MongodbContext;
use Enqueue\Mongodb\MongodbMessage;
use Interop\Queue\PsrMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class MongodbDriver implements DriverInterface
{
/**
* @var MongodbContext
*/
private $context;

/**
* @var Config
*/
private $config;

/**
* @var QueueMetaRegistry
*/
private $queueMetaRegistry;

/**
* @var array
*/
private static $priorityMap = [
MessagePriority::VERY_LOW => 0,
MessagePriority::LOW => 1,
MessagePriority::NORMAL => 2,
MessagePriority::HIGH => 3,
MessagePriority::VERY_HIGH => 4,
];

/**
* @param MongodbContext $context
* @param Config $config
* @param QueueMetaRegistry $queueMetaRegistry
*/
public function __construct(MongodbContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
{
$this->context = $context;
$this->config = $config;
$this->queueMetaRegistry = $queueMetaRegistry;
}

/**
* {@inheritdoc}
*
* @return MongodbMessage
*/
public function createTransportMessage(Message $message)
{
$properties = $message->getProperties();

$headers = $message->getHeaders();
$headers['content_type'] = $message->getContentType();

$transportMessage = $this->context->createMessage();
$transportMessage->setBody($message->getBody());
$transportMessage->setHeaders($headers);
$transportMessage->setProperties($properties);
$transportMessage->setMessageId($message->getMessageId());
$transportMessage->setTimestamp($message->getTimestamp());
$transportMessage->setDeliveryDelay($message->getDelay());
$transportMessage->setReplyTo($message->getReplyTo());
$transportMessage->setCorrelationId($message->getCorrelationId());
if (array_key_exists($message->getPriority(), self::$priorityMap)) {
$transportMessage->setPriority(self::$priorityMap[$message->getPriority()]);
}

return $transportMessage;
}

/**
* @param MongodbMessage $message
*
* {@inheritdoc}
*/
public function createClientMessage(PsrMessage $message)
{
$clientMessage = new Message();

$clientMessage->setBody($message->getBody());
$clientMessage->setHeaders($message->getHeaders());
$clientMessage->setProperties($message->getProperties());

$clientMessage->setContentType($message->getHeader('content_type'));
$clientMessage->setMessageId($message->getMessageId());
$clientMessage->setTimestamp($message->getTimestamp());
$clientMessage->setDelay($message->getDeliveryDelay());
$clientMessage->setReplyTo($message->getReplyTo());
$clientMessage->setCorrelationId($message->getCorrelationId());

$priorityMap = array_flip(self::$priorityMap);
$priority = array_key_exists($message->getPriority(), $priorityMap) ?
$priorityMap[$message->getPriority()] :
MessagePriority::NORMAL;
$clientMessage->setPriority($priority);

return $clientMessage;
}

/**
* {@inheritdoc}
*/
public function sendToRouter(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
throw new \LogicException('Topic name parameter is required but is not set');
}

$queue = $this->createQueue($this->config->getRouterQueueName());
$transportMessage = $this->createTransportMessage($message);

$this->context->createProducer()->send($queue, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function sendToProcessor(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
throw new \LogicException('Processor name parameter is required but is not set');
}

if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
throw new \LogicException('Queue name parameter is required but is not set');
}

$transportMessage = $this->createTransportMessage($message);
$destination = $this->createQueue($queueName);

$this->context->createProducer()->send($destination, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function createQueue($queueName)
{
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();

return $this->context->createQueue($transportName);
}

/**
* {@inheritdoc}
*/
public function setupBroker(LoggerInterface $logger = null)
{
$logger = $logger ?: new NullLogger();
$log = function ($text, ...$args) use ($logger) {
$logger->debug(sprintf('[MongodbDriver] '.$text, ...$args));
};
$contextConfig = $this->context->getConfig();
$log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']);
$this->context->createCollection();
}

/**
* {@inheritdoc}
*/
public function getConfig()
{
return $this->config;
}

/**
* @return array
*/
public static function getPriorityMap()
{
return self::$priorityMap;
}
}
103 changes: 103 additions & 0 deletions pkg/mongodb/MongodbConnectionFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

namespace Enqueue\Mongodb;

use Interop\Queue\PsrConnectionFactory;
use MongoDB\Client;

class MongodbConnectionFactory implements PsrConnectionFactory
{
/**
* @var array
*/
private $config;

/**
* The config could be an array, string DSN or null. In case of null it will attempt to connect to Mongodb localhost with default credentials.
*
* $config = [
* 'uri' => 'mongodb://127.0.0.1/' - Mongodb connection string. see http://docs.mongodb.org/manual/reference/connection-string/
* 'dbname' => 'enqueue', - database name.
* 'collection_name' => 'enqueue' - collection name
* 'polling_interval' => '1000', - How often query for new messages (milliseconds)
* ]
*
* or
*
* mongodb://127.0.0.1:27017/dbname?polling_interval=1000&enqueue_collection=enqueue
*
* @param array|string|null $config
*/
public function __construct($config = 'mongodb:')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please add a docblcok with the information on possible configuration options

{
if (empty($config)) {
$config = $this->parseDsn('mongodb:');
} elseif (is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}
$config = array_replace([
'uri' => 'mongodb://127.0.0.1/',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add dbname and a collection here as well

'dbname' => 'enqueue',
'collection_name' => 'enqueue',
], $config);

$this->config = $config;
}

public function createContext()
{
$client = new Client($this->config['uri']);

return new MongodbContext($client, $this->config);
}

public static function parseDsn($dsn)
{
$parsedUrl = parse_url($dsn);
if (false === $parsedUrl) {
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}
if (empty($parsedUrl['scheme'])) {
throw new \LogicException('Schema is empty');
}
$supported = [
'mongodb' => true,
];
if (false == isset($parsedUrl['scheme'])) {
throw new \LogicException(sprintf(
'The given DSN schema "%s" is not supported. There are supported schemes: "%s".',
$parsedUrl['scheme'],
implode('", "', array_keys($supported))
));
}
if ('mongodb:' === $dsn) {
return [
'uri' => 'mongodb://127.0.0.1/',
];
}
$config['uri'] = $dsn;
if (isset($parsedUrl['path']) && '/' !== $parsedUrl['path']) {
$pathParts = explode('/', $parsedUrl['path']);
//DB name
if ($pathParts[1]) {
$config['dbname'] = $pathParts[1];
}
}
if (isset($parsedUrl['query'])) {
$queryParts = null;
parse_str($parsedUrl['query'], $queryParts);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use of undefined var, define $queryParts = null just before this line

//get enqueue attributes values
if (!empty($queryParts['polling_interval'])) {
$config['polling_interval'] = $queryParts['polling_interval'];
}
if (!empty($queryParts['enqueue_collection'])) {
$config['collection_name'] = $queryParts['enqueue_collection'];
}
}

return $config;
}
}
Loading