Merge branch 'rabbit_mq'

This commit is contained in:
ErickSkrauch 2016-04-15 01:45:52 +03:00
commit 085869f2bc
12 changed files with 459 additions and 1 deletions

View File

@ -0,0 +1,177 @@
<?php
namespace common\components\RabbitMQ;
use yii\base\Exception;
use yii\helpers\Json;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
* Не гибкий компонент для работы с RabbitMQ, заточенный под нужны текущего проекта
*
* Компонент основан на расширении Alexey Kuznetsov <mirakuru@webtoucher.ru>
*
* @property AMQPStreamConnection $connection AMQP connection.
* @property AMQPChannel $channel AMQP channel.
*/
class Component extends \yii\base\Component {
const TYPE_TOPIC = 'topic';
const TYPE_DIRECT = 'direct';
const TYPE_HEADERS = 'headers';
const TYPE_FANOUT = 'fanout';
/**
* @var AMQPStreamConnection
*/
protected $amqpConnection;
/**
* @var AMQPChannel[]
*/
protected $channels = [];
/**
* @var string
*/
public $host = '127.0.0.1';
/**
* @var integer
*/
public $port = 5672;
/**
* @var string
*/
public $user;
/**
* @var string
*/
public $password;
/**
* @var string
*/
public $vhost = '/';
/**
* @inheritdoc
*/
public function init() {
parent::init();
if (empty($this->user)) {
throw new Exception("Parameter 'user' was not set for AMQP connection.");
}
}
/**
* @return AMQPStreamConnection
*/
public function getConnection() {
if (!$this->amqpConnection) {
$this->amqpConnection = new AMQPStreamConnection(
$this->host,
$this->port,
$this->user,
$this->password,
$this->vhost
);
}
return $this->amqpConnection;
}
/**
* @param string $channel_id
* @return AMQPChannel
*/
public function getChannel($channel_id = null) {
$index = $channel_id ?: 'default';
if (!array_key_exists($index, $this->channels)) {
$this->channels[$index] = $this->getConnection()->channel($channel_id);
}
return $this->channels[$index];
}
// TODO: метод sendToQueue
/**
* Sends message to the exchange.
*
* @param string $exchangeName
* @param string $routingKey
* @param string|array $message
* @param array $exchangeArgs
* @param array $publishArgs
*/
public function sendToExchange($exchangeName, $routingKey, $message, $exchangeArgs = [], $publishArgs = []) {
$message = $this->prepareMessage($message);
$channel = $this->getChannel();
call_user_func_array([$channel, 'exchange_declare'], $this->prepareExchangeArgs($exchangeName, $exchangeArgs));
call_user_func_array([$channel, 'basic_publish'], $this->preparePublishArgs($message, $exchangeName, $routingKey, $publishArgs));
}
/**
* Объединяет переданный набор аргументов с поведением по умолчанию
*
* @param string $exchangeName
* @param array $args
* @return array
*/
protected function prepareExchangeArgs($exchangeName, array $args) {
return array_replace([
$exchangeName,
self::TYPE_FANOUT,
false,
false,
false,
], $args);
}
/**
* Объединяет переданный набор аргументов с поведением по умолчанию
*
* @param AMQPMessage $message
* @param string $exchangeName
* @param string $routeKey
* @param array $args
*
* @return array
*/
protected function preparePublishArgs($message, $exchangeName, $routeKey, array $args) {
return array_replace([
$message,
$exchangeName,
$routeKey,
], $args);
}
/**
* Returns prepaired AMQP message.
*
* @param string|array|object $message
* @param array $properties
* @return AMQPMessage
* @throws Exception If message is empty.
*/
public function prepareMessage($message, $properties = null) {
if ($message instanceof AMQPMessage) {
return $message;
}
if (empty($message)) {
throw new Exception('AMQP message can not be empty');
}
if (is_array($message) || is_object($message)) {
$message = Json::encode($message);
}
return new AMQPMessage($message, $properties);
}
}

View File

@ -0,0 +1,179 @@
<?php
namespace common\components\RabbitMQ;
use PhpAmqpLib\Message\AMQPMessage;
use Yii;
use yii\db\Exception;
use yii\helpers\ArrayHelper;
use yii\helpers\Console;
use yii\helpers\Inflector;
use yii\helpers\Json;
abstract class Controller extends \yii\console\Controller {
use MessageTrait;
const MESSAGE_INFO = 0;
const MESSAGE_ERROR = 1;
public $defaultAction = 'run';
public function actionRun() {
$this->configureListen();
}
/**
* Имя exchange, который будет прослушивать этот интерпретатор
*
* @return string
*/
abstract public function getExchangeName();
/**
* Есть метод вернёт null, то будет создана временная очередь, которая будет автоматически удалена
* после завершения процесса её обработчика
*
* @return null|string
*/
public function getQueueName() {
return null;
}
/**
* @return Component
*/
protected function getAmqp() {
return Yii::$app->get('amqp');
}
protected function configureListen() {
$exchangeName = $this->getExchangeName();
$connection = $this->getAmqp()->getConnection();
$channel = $this->getAmqp()->getChannel();
call_user_func_array([$channel, 'exchange_declare'], $this->getExchangeDeclareArgs());
list($queueName) = call_user_func_array([$channel, 'queue_declare'], $this->getQueueDeclareArgs());
// TODO: нужно продумать механизм для подписки на множество роутов
call_user_func_array([$channel, 'queue_bind'], $this->getQueueBindArgs($exchangeName, $queueName));
call_user_func_array([$channel, 'basic_consume'], $this->getBasicConsumeArgs($queueName));
$channel->basic_qos(null, 1, true);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
public function callback(AMQPMessage $msg) {
$routingKey = $msg->delivery_info['routing_key'];
$method = 'route' . Inflector::camelize($routingKey);
$body = Json::decode($msg->body, true);
if (!method_exists($this, $method)) {
$this->log(
sprintf('Unknown routing key "%s" for exchange "%s".', $routingKey, $this->getExchangeName()),
static::MESSAGE_ERROR
);
$this->log(
print_r($body, true),
static::MESSAGE_INFO
);
}
// Инверсия значения, т.к. параметр называется no_ack, то есть уже инвертирован
$isAckRequired = !ArrayHelper::getValue($this->getBasicConsumeArgs($this->getQueueName()), 3, true);
$result = $this->getResult($method, $body, $msg);
if ($isAckRequired) {
if ($result === false) {
$this->reject($msg, true);
} else {
$this->ack($msg);
}
}
}
private function getResult($method, $body, $msg) {
try {
$result = $this->$method($body, $msg);
} catch(Exception $e) {
if (strstr($e->getMessage(), '2006 MySQL server has gone away') !== false) {
Console::output(Console::ansiFormat('Server gone away, try to reconnect', [Console::FG_GREY]));
Yii::$app->db->close();
Yii::$app->db->open();
Console::output(Console::ansiFormat('recall method', [Console::FG_GREY]));
$result = $this->$method($body, $msg);
} else {
throw $e;
}
}
return $result;
}
/**
* Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::exchange_declare()
* По умолчанию создаётся очередь с типом fanout. Кроме того, в отличие от стандартных аргументов,
* здесь указано, что auto_delete в false состоянии
*
* @return array
*/
protected function getExchangeDeclareArgs() {
return [$this->getExchangeName(), Component::TYPE_FANOUT, false, false, false];
}
/**
* Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::queue_declare()
*
* Если метод getQueueName() не переопределён и в нём не задано имя очереди, то будет создана
* временная очередь, которая будет автоматически удалена после завершения работы всех Consumer'ов
* Если же есть фиксированное имя очереди, то она будет создана с аргументом
* auto_delete в false (4 индекс массива)
*
* @return array
*/
protected function getQueueDeclareArgs() {
$queueName = $this->getQueueName();
if ($queueName === null) {
return [];
} else {
return [$queueName, false, false, false, false];
}
}
/**
* Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::queue_bind()
*
* @param string $exchangeName
* @param string $queueName
* @return array
*/
protected function getQueueBindArgs($exchangeName, $queueName) {
return [$queueName, $exchangeName];
}
/**
* Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::basic_consume()
* По умолчанию здесь находятся стандартные аргументы для этого метода
*
* @param string $queueName
* @return array
*/
protected function getBasicConsumeArgs($queueName) {
return [$queueName, '', false, false, false, false, [$this, 'callback']];
}
/**
* Logs info and error messages.
*
* TODO: что-то мне подсказывает, что ему тут не место
*
* @param $message
* @param $type
*/
protected function log($message, $type = self::MESSAGE_INFO) {
$format = [$type == self::MESSAGE_ERROR ? Console::FG_RED : Console::FG_BLUE];
Console::output(Console::ansiFormat($message, $format));
}
}

View File

@ -0,0 +1,19 @@
<?php
namespace common\components\RabbitMQ;
use Yii;
class Helper {
/**
* @return Component $amqp
*/
public static function getInstance() {
return Yii::$app->get('amqp');
}
public static function sendToExchange($exchange, $routingKey, $message, $exchangeArgs = []) {
static::getInstance()->sendToExchange($exchange, $routingKey, $message, $exchangeArgs);
}
}

View File

@ -0,0 +1,16 @@
<?php
namespace common\components\RabbitMQ;
use PhpAmqpLib\Message\AMQPMessage;
trait MessageTrait {
public function ack(AMQPMessage $msg) {
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
public function reject(AMQPMessage $msg, $requeue = true) {
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue);
}
}

View File

@ -19,5 +19,8 @@ return [
'redis' => [
'class' => 'yii\redis\Connection',
],
'amqp' => [
'class' => \common\components\RabbitMQ\Component::class,
],
],
];

9
common/helpers/Amqp.php Normal file
View File

@ -0,0 +1,9 @@
<?php
namespace common\helpers;
use common\components\RabbitMQ\Helper;
use Yii;
class Amqp extends Helper {
}

View File

@ -22,7 +22,8 @@
"league/oauth2-server": "~4.1.5",
"yiisoft/yii2-redis": "~2.0.0",
"damirka/yii2-jwt": "dev-master#c3fdbf2efc7e547e92d884aa7f71a7880a1e6a8e",
"guzzlehttp/guzzle": "~5.3.0"
"guzzlehttp/guzzle": "~5.3.0",
"php-amqplib/php-amqplib": "~2.6.2"
},
"require-dev": {
"yiisoft/yii2-codeception": "*",

View File

@ -0,0 +1,32 @@
<?php
namespace console\controllers;
use common\components\RabbitMQ\Component as RabbitMQComponent;
use console\controllers\base\AmqpController;
class AccountQueueController extends AmqpController {
public function getExchangeName() {
return 'account';
}
public function getQueueName() {
return 'account-operations';
}
public function getExchangeDeclareArgs() {
return array_replace(parent::getExchangeDeclareArgs(), [
1 => RabbitMQComponent::TYPE_DIRECT, // exchange-type -> direct
3 => false, // no-ack -> false
]);
}
public function getQueueBindArgs($exchangeName, $queueName) {
return [$exchangeName, $queueName, '#']; // Мы хотим получать сюда все события по аккаунту
}
public function routeChangeUsername($body) {
// TODO: implement this
}
}

View File

@ -0,0 +1,8 @@
<?php
namespace console\controllers\base;
use common\components\RabbitMQ\Controller;
abstract class AmqpController extends Controller {
}

View File

@ -18,5 +18,12 @@ return [
'port' => 6379,
'database' => 0,
],
'amqp' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'root',
'password' => '',
'vhost' => '/',
],
],
];

View File

@ -12,5 +12,12 @@ return [
'port' => 6379,
'database' => 0,
],
'amqp' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'root',
'password' => '',
'vhost' => '/',
],
],
];