Implemented WebHooks delivery queue.

Completely removed usage of the RabbitMQ. Queue now based on Redis channels.
Worker process now extracted as separate docker container.
Base image upgraded to the 1.8.0 version (PHP 7.2.7 and pcntl extension).
This commit is contained in:
ErickSkrauch
2018-07-08 18:20:19 +03:00
parent 6751eb6591
commit c0aa78d156
55 changed files with 933 additions and 1684 deletions

View File

@@ -0,0 +1,64 @@
<?php
declare(strict_types=1);
namespace common\tasks;
use common\models\Account;
use Yii;
use yii\queue\RetryableJobInterface;
class ClearAccountSessions implements RetryableJobInterface {
public $accountId;
public static function createFromAccount(Account $account): self {
$result = new static();
$result->accountId = $account->id;
return $result;
}
/**
* @return int time to reserve in seconds
*/
public function getTtr(): int {
return 5 * 60;
}
/**
* @param int $attempt number
* @param \Exception|\Throwable $error from last execute of the job
*
* @return bool
*/
public function canRetry($attempt, $error): bool {
return true;
}
/**
* @param \yii\queue\Queue $queue which pushed and is handling the job
* @throws \Exception
*/
public function execute($queue): void {
$account = Account::findOne($this->accountId);
if ($account === null) {
return;
}
foreach ($account->getSessions()->each(100, Yii::$app->unbufferedDb) as $authSession) {
/** @var \common\models\AccountSession $authSession */
$authSession->delete();
}
foreach ($account->getMinecraftAccessKeys()->each(100, Yii::$app->unbufferedDb) as $key) {
/** @var \common\models\MinecraftAccessKey $key */
$key->delete();
}
foreach ($account->getOauthSessions()->each(100, Yii::$app->unbufferedDb) as $oauthSession) {
/** @var \common\models\OauthSession $oauthSession */
$oauthSession->delete();
}
}
}

View File

@@ -0,0 +1,76 @@
<?php
declare(strict_types=1);
namespace common\tasks;
use common\models\Account;
use common\models\WebHook;
use Yii;
use yii\queue\RetryableJobInterface;
class CreateWebHooksDeliveries implements RetryableJobInterface {
/**
* @var string
*/
public $type;
/**
* @var array
*/
public $payloads;
public static function createAccountEdit(Account $account, array $changedAttributes): self {
$result = new static();
$result->type = 'account.edit';
$result->payloads = [
'id' => $account->id,
'uuid' => $account->uuid,
'username' => $account->username,
'email' => $account->email,
'lang' => $account->lang,
'isActive' => $account->status === Account::STATUS_ACTIVE,
'registered' => date('c', (int)$account->created_at),
'changedAttributes' => $changedAttributes,
];
return $result;
}
/**
* @return int time to reserve in seconds
*/
public function getTtr() {
return 10;
}
/**
* @param int $attempt number
* @param \Exception|\Throwable $error from last execute of the job
*
* @return bool
*/
public function canRetry($attempt, $error) {
return true;
}
/**
* @param \yii\queue\Queue $queue which pushed and is handling the job
*/
public function execute($queue) {
/** @var WebHook[] $targets */
$targets = WebHook::find()
->joinWith('events e', false)
->andWhere(['e.event_type' => $this->type])
->all();
foreach ($targets as $target) {
$job = new DeliveryWebHook();
$job->type = $this->type;
$job->url = $target->url;
$job->secret = $target->secret;
$job->payloads = $this->payloads;
Yii::$app->queue->push($job);
}
}
}

View File

@@ -0,0 +1,110 @@
<?php
declare(strict_types=1);
namespace common\tasks;
use GuzzleHttp\Client as GuzzleClient;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Exception\ClientException;
use GuzzleHttp\Exception\ConnectException;
use GuzzleHttp\Exception\ServerException;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Middleware;
use Psr\Http\Message\RequestInterface;
use Yii;
use yii\queue\RetryableJobInterface;
class DeliveryWebHook implements RetryableJobInterface {
/**
* @var string
*/
public $type;
/**
* @var string
*/
public $url;
/**
* @var string|null
*/
public $secret;
/**
* @var array
*/
public $payloads;
/**
* @return int time to reserve in seconds
*/
public function getTtr(): int {
return 65;
}
/**
* @param int $attempt number
* @param \Exception|\Throwable $error from last execute of the job
*
* @return bool
*/
public function canRetry($attempt, $error): bool {
if ($attempt >= 5) {
return false;
}
if ($error instanceof ServerException || $error instanceof ConnectException) {
return true;
}
return false;
}
/**
* @param \yii\queue\Queue $queue which pushed and is handling the job
*
* @throws \GuzzleHttp\Exception\GuzzleException
*/
public function execute($queue): void {
$client = $this->createClient();
try {
$client->request('POST', $this->url, [
'headers' => [
'User-Agent' => 'Account-Ely-Hookshot/' . Yii::$app->version,
'X-Ely-Accounts-Event' => $this->type,
],
'form_params' => $this->payloads,
]);
} catch (ClientException $e) {
Yii::info("Delivery for {$this->url} has failed with {$e->getResponse()->getStatusCode()} status.");
return;
}
}
protected function createClient(): ClientInterface {
return new GuzzleClient([
'handler' => $this->createStack(),
'timeout' => 60,
'connect_timeout' => 10,
]);
}
protected function createStack(): HandlerStack {
$stack = HandlerStack::create();
$stack->push(Middleware::mapRequest(function(RequestInterface $request): RequestInterface {
if (empty($this->secret)) {
return $request;
}
$payload = (string)$request->getBody();
$signature = hash_hmac('sha1', $payload, $this->secret);
/** @noinspection ExceptionsAnnotatingAndHandlingInspection */
return $request->withHeader('X-Hub-Signature', 'sha1=' . $signature);
}));
return $stack;
}
}

View File

@@ -0,0 +1,72 @@
<?php
declare(strict_types=1);
namespace common\tasks;
use api\exceptions\ThisShouldNotHappenException;
use common\components\Mojang\Api as MojangApi;
use common\components\Mojang\exceptions\MojangApiException;
use common\components\Mojang\exceptions\NoContentException;
use common\models\Account;
use common\models\MojangUsername;
use GuzzleHttp\Exception\RequestException;
use Yii;
use yii\queue\JobInterface;
class PullMojangUsername implements JobInterface {
public $username;
public static function createFromAccount(Account $account): self {
$result = new static();
$result->username = $account->username;
return $result;
}
/**
* @param \yii\queue\Queue $queue which pushed and is handling the job
*
* @throws \Exception
*/
public function execute($queue) {
Yii::$app->statsd->inc('queue.pullMojangUsername.attempt');
$mojangApi = $this->createMojangApi();
try {
$response = $mojangApi->usernameToUUID($this->username);
Yii::$app->statsd->inc('queue.pullMojangUsername.found');
} catch (NoContentException $e) {
$response = false;
Yii::$app->statsd->inc('queue.pullMojangUsername.not_found');
} catch (RequestException | MojangApiException $e) {
Yii::$app->statsd->inc('queue.pullMojangUsername.error');
return;
}
/** @var MojangUsername|null $mojangUsername */
$mojangUsername = MojangUsername::findOne($this->username);
if ($response === false) {
if ($mojangUsername !== null) {
$mojangUsername->delete();
}
} else {
if ($mojangUsername === null) {
$mojangUsername = new MojangUsername();
$mojangUsername->username = $response->name;
$mojangUsername->uuid = $response->id;
} else {
$mojangUsername->uuid = $response->id;
$mojangUsername->touch('last_pulled_at');
}
if (!$mojangUsername->save()) {
throw new ThisShouldNotHappenException('Cannot save mojang username');
}
}
}
protected function createMojangApi(): MojangApi {
return new MojangApi();
}
}