From c3d0490d4a043a7f1fe7ee896533b6e78b413837 Mon Sep 17 00:00:00 2001 From: ErickSkrauch Date: Tue, 16 Aug 2016 01:27:18 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B5=D0=BE=D1=80=D0=B3=D0=B0=D0=BD?= =?UTF-8?q?=D0=B8=D0=B7=D0=BE=D0=B2=D0=B0=D0=BD=20=D0=BA=D0=BE=D0=BC=D0=BF?= =?UTF-8?q?=D0=BE=D0=BD=D0=B5=D0=BD=D1=82=20=D0=B4=D0=BB=D1=8F=20=D0=BA?= =?UTF-8?q?=D0=BE=D0=BD=D1=82=D1=80=D0=BE=D0=BB=D0=BB=D0=B5=D1=80=D0=B0=20?= =?UTF-8?q?amqp=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D0=B5=D0=B9=20?= =?UTF-8?q?=D0=B2=20=D0=BD=D0=B0=D1=88=20=D0=B2=D0=BD=D0=B5=D1=88=D0=BD?= =?UTF-8?q?=D0=B8=D0=B9=20=D0=BF=D0=B0=D0=BA=D0=B5=D1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/components/RabbitMQ/Controller.php | 228 -------------------- common/components/RabbitMQ/MessageTrait.php | 16 -- composer.json | 7 +- console/controllers/base/AmqpController.php | 29 ++- 4 files changed, 34 insertions(+), 246 deletions(-) delete mode 100644 common/components/RabbitMQ/Controller.php delete mode 100644 common/components/RabbitMQ/MessageTrait.php diff --git a/common/components/RabbitMQ/Controller.php b/common/components/RabbitMQ/Controller.php deleted file mode 100644 index 143bae7..0000000 --- a/common/components/RabbitMQ/Controller.php +++ /dev/null @@ -1,228 +0,0 @@ -configureListen(); - } - - /** - * Имя exchange, который будет прослушивать этот интерпретатор - * - * @return string - */ - abstract public function getExchangeName(); - - /** - * Есть метод вернёт null, то будет создана временная очередь, которая будет автоматически удалена - * после завершения процесса её обработчика - * - * @return null|string - */ - public function getQueueName() { - return null; - } - - /** - * @return Component - */ - protected function getAmqp() { - return Yii::$app->amqp; - } - - /** - * Позволяет задать карту route из amqp к обработчику внутри класса: - * return [ - * 'accounts.change-username' => 'routeChangeUsername', - * 'accounts.delete-account' => 'routeAccountDeleted', - * ]; - * - * @return array - */ - public function getRoutesMap() { - return []; - } - - protected function configureListen() { - $exchangeName = $this->getExchangeName(); - $connection = $this->getAmqp()->getConnection(); - $channel = $this->getAmqp()->getChannel(); - call_user_func_array([$channel, 'exchange_declare'], $this->getExchangeDeclareArgs()); - list($queueName) = call_user_func_array([$channel, 'queue_declare'], $this->getQueueDeclareArgs()); - // TODO: нужно продумать механизм для подписки на множество роутов - call_user_func_array([$channel, 'queue_bind'], $this->getQueueBindArgs($exchangeName, $queueName)); - call_user_func_array([$channel, 'basic_consume'], $this->getBasicConsumeArgs($queueName)); - $channel->basic_qos(null, 1, true); - - while(count($channel->callbacks)) { - $channel->wait(); - } - - $channel->close(); - $connection->close(); - } - - public function callback(AMQPMessage $msg) { - $body = Json::decode($msg->body, true); - /** @var string $routingKey */ - $routingKey = $msg->delivery_info['routing_key']; - $map = $this->getRoutesMap(); - if (isset($map[$routingKey])) { - $method = $map[$routingKey]; - } else { - $method = 'route' . Inflector::camelize($routingKey); - } - - if (!method_exists($this, $method)) { - $this->log( - sprintf('Unknown routing key "%s" for exchange "%s".', $routingKey, $this->getExchangeName()), - static::MESSAGE_ERROR - ); - - $this->log( - print_r($body, true), - static::MESSAGE_INFO - ); - } - - // Инверсия значения, т.к. параметр называется no_ack, то есть уже инвертирован - $isAckRequired = !ArrayHelper::getValue($this->getBasicConsumeArgs($this->getQueueName()), 3, true); - $result = $this->getResult($method, $body, $msg); - if ($isAckRequired) { - if ($result === false) { - $this->reject($msg, true); - } else { - $this->ack($msg); - } - } - } - - private function getResult($method, $body, $msg) { - try { - $result = $this->$method($this->prepareArguments($method, $body), $msg); - } catch(Exception $e) { - if (strstr($e->getMessage(), '2006 MySQL server has gone away') !== false) { - Console::output(Console::ansiFormat('Server gone away, try to reconnect', [Console::FG_GREY])); - Yii::$app->db->close(); - Yii::$app->db->open(); - Console::output(Console::ansiFormat('recall method', [Console::FG_GREY])); - $result = $this->$method($body, $msg); - } else { - throw $e; - } - } - - return $result; - } - - private function prepareArguments($methodName, $body) { - $method = new ReflectionMethod($this, $methodName); - $parameters = $method->getParameters(); - if (!isset($parameters[0])) { - return $body; - } - - $bodyParam = $parameters[0]; - if (PHP_MAJOR_VERSION === 7) { - // TODO: логика для php7 не тестировалась, так то не факт, что оно взлетит на php7 - if (!$bodyParam->hasType() || $bodyParam->isArray()) { - return $body; - } - - $type = (string)$bodyParam->getType(); - $object = new $type; - } else { - $class = $bodyParam->getClass(); - if ($class === null) { - return $body; - } - - $type = $class->name; - $object = new $type; - } - - return Yii::configure($object, $body); - } - - /** - * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::exchange_declare() - * По умолчанию создаётся очередь с типом fanout. Кроме того, в отличие от стандартных аргументов, - * здесь указано, что auto_delete в false состоянии - * - * @return array - */ - protected function getExchangeDeclareArgs() { - return [$this->getExchangeName(), Component::TYPE_FANOUT, false, false, false]; - } - - /** - * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::queue_declare() - * - * Если метод getQueueName() не переопределён и в нём не задано имя очереди, то будет создана - * временная очередь, которая будет автоматически удалена после завершения работы всех Consumer'ов - * Если же есть фиксированное имя очереди, то она будет создана с аргументом - * auto_delete в false (4 индекс массива) - * - * @return array - */ - protected function getQueueDeclareArgs() { - $queueName = $this->getQueueName(); - if ($queueName === null) { - return []; - } else { - return [$queueName, false, false, false, false]; - } - } - - /** - * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::queue_bind() - * - * @param string $exchangeName - * @param string $queueName - * @return array - */ - protected function getQueueBindArgs($exchangeName, $queueName) { - return [$queueName, $exchangeName]; - } - - /** - * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::basic_consume() - * По умолчанию здесь находятся стандартные аргументы для этого метода - * - * @param string $queueName - * @return array - */ - protected function getBasicConsumeArgs($queueName) { - return [$queueName, '', false, false, false, false, [$this, 'callback']]; - } - - /** - * Logs info and error messages. - * - * TODO: что-то мне подсказывает, что ему тут не место - * - * @param $message - * @param $type - */ - protected function log($message, $type = self::MESSAGE_INFO) { - $format = [$type == self::MESSAGE_ERROR ? Console::FG_RED : Console::FG_BLUE]; - Console::output(Console::ansiFormat($message, $format)); - } - -} diff --git a/common/components/RabbitMQ/MessageTrait.php b/common/components/RabbitMQ/MessageTrait.php deleted file mode 100644 index d39808e..0000000 --- a/common/components/RabbitMQ/MessageTrait.php +++ /dev/null @@ -1,16 +0,0 @@ -delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); - } - - public function reject(AMQPMessage $msg, $requeue = true) { - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue); - } - -} diff --git a/composer.json b/composer.json index 51435db..a962b3b 100644 --- a/composer.json +++ b/composer.json @@ -23,7 +23,8 @@ "guzzlehttp/guzzle": "^6.0.0", "php-amqplib/php-amqplib": "~2.6.2", "ely/yii2-tempmail-validator": "~1.0.0", - "emarref/jwt": "dev-master#1e4fdf731f9fdfbc5906659ef5384715197fd90b" + "emarref/jwt": "dev-master#1e4fdf731f9fdfbc5906659ef5384715197fd90b", + "ely/amqp-controller": "^0.1.0" }, "require-dev": { "yiisoft/yii2-codeception": "*", @@ -47,6 +48,10 @@ { "type": "git", "url": "git@github.com:erickskrauch/jwt.git" + }, + { + "type": "git", + "url": "git@gitlab.com:elyby/amqp-controller.git" } ], "scripts": { diff --git a/console/controllers/base/AmqpController.php b/console/controllers/base/AmqpController.php index 32527f6..9828298 100644 --- a/console/controllers/base/AmqpController.php +++ b/console/controllers/base/AmqpController.php @@ -1,8 +1,35 @@ start(); + } + + public function getRoutesMap() { + return []; + } + + /** + * @inheritdoc + */ + protected function getConnection() { + return Yii::$app->amqp->getConnection(); + } + + /** + * @inheritdoc + */ + protected function buildRouteActionName($route) { + return ArrayHelper::getValue($this->getRoutesMap(), $route, 'route' . Inflector::camelize($route)); + } }