Реализовано автоматическое восстановление соединения с AMQP

This commit is contained in:
ErickSkrauch 2017-09-03 22:45:38 +03:00
parent 9cb6502f9c
commit 8007b082d6
5 changed files with 86 additions and 89 deletions

20
Gopkg.lock generated
View File

@ -1,6 +1,12 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
name = "github.com/assembla/cony"
packages = ["."]
revision = "dd62697b0adb9adfda8589520cb85f4cbc2361f1"
version = "v0.3.2"
[[projects]] [[projects]]
name = "github.com/certifi/gocertifi" name = "github.com/certifi/gocertifi"
packages = ["."] packages = ["."]
@ -65,7 +71,7 @@
branch = "master" branch = "master"
name = "github.com/mediocregopher/radix.v2" name = "github.com/mediocregopher/radix.v2"
packages = ["cluster","pool","redis","util"] packages = ["cluster","pool","redis","util"]
revision = "0fe87e4a1bde3bb29991ea5ee1d9d536b77877d5" revision = "d234cfb904a91daafa4e1f92599a893b349cc0c2"
[[projects]] [[projects]]
branch = "master" branch = "master"
@ -107,7 +113,7 @@
branch = "master" branch = "master"
name = "github.com/spf13/afero" name = "github.com/spf13/afero"
packages = [".","mem"] packages = [".","mem"]
revision = "36f8810e2e3d7eeac4ac05b57f65690fbfba62a2" revision = "ee1bd8ee15a1306d1f9201acc41ef39cd9f99a1b"
[[projects]] [[projects]]
name = "github.com/spf13/cast" name = "github.com/spf13/cast"
@ -119,13 +125,13 @@
branch = "master" branch = "master"
name = "github.com/spf13/cobra" name = "github.com/spf13/cobra"
packages = ["."] packages = ["."]
revision = "2df9a531813370438a4d79bfc33e21f58063ed87" revision = "3c0b56b677e04926dfa835a1b3f11cd4f62f076e"
[[projects]] [[projects]]
branch = "master" branch = "master"
name = "github.com/spf13/jwalterweatherman" name = "github.com/spf13/jwalterweatherman"
packages = ["."] packages = ["."]
revision = "0efa5202c04663c757d84f90f5219c1250baf94f" revision = "12bd96e66386c1960ab0f74ced1362f66f552f7b"
[[projects]] [[projects]]
name = "github.com/spf13/pflag" name = "github.com/spf13/pflag"
@ -155,13 +161,13 @@
branch = "master" branch = "master"
name = "golang.org/x/sys" name = "golang.org/x/sys"
packages = ["unix"] packages = ["unix"]
revision = "2d6f6f883a06fc0d5f4b14a81e4c28705ea64c15" revision = "7ddbeae9ae08c6a06a59597f0c9edbc5ff2444ce"
[[projects]] [[projects]]
branch = "master" branch = "master"
name = "golang.org/x/text" name = "golang.org/x/text"
packages = ["internal/gen","internal/triegen","internal/ucd","transform","unicode/cldr","unicode/norm"] packages = ["internal/gen","internal/triegen","internal/ucd","transform","unicode/cldr","unicode/norm"]
revision = "ac87088df8ef557f1e32cd00ed0b6fbc3f7ddafb" revision = "bd91bbf73e9a4a801adbfb97133c992678533126"
[[projects]] [[projects]]
name = "gopkg.in/h2non/gock.v1" name = "gopkg.in/h2non/gock.v1"
@ -178,6 +184,6 @@
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "fe0c38f46a10e79ddfd7449f98bcde02dbac62ec20cb9d957f661c6da079013f" inputs-digest = "dd545fafc23f9b6429b5b679ad5c213c14c819f1e4ea381823acf338651122e1"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View File

@ -17,10 +17,11 @@ ignored = ["elyby/minecraft-skinsystem"]
name = "github.com/spf13/viper" name = "github.com/spf13/viper"
[[constraint]] [[constraint]]
name = "github.com/streadway/amqp" name = "github.com/getsentry/raven-go"
[[constraint]] [[constraint]]
name = "github.com/getsentry/raven-go" name = "github.com/assembla/cony"
version = "^0.3.2"
# Testing dependencies # Testing dependencies

View File

@ -5,12 +5,12 @@ import (
"net/url" "net/url"
"os" "os"
"github.com/assembla/cony"
"github.com/getsentry/raven-go" "github.com/getsentry/raven-go"
"github.com/mono83/slf/rays" "github.com/mono83/slf/rays"
"github.com/mono83/slf/recievers/ansi" "github.com/mono83/slf/recievers/ansi"
"github.com/mono83/slf/recievers/statsd" "github.com/mono83/slf/recievers/statsd"
"github.com/mono83/slf/wd" "github.com/mono83/slf/wd"
"github.com/streadway/amqp"
"elyby/minecraft-skinsystem/logger/receivers/sentry" "elyby/minecraft-skinsystem/logger/receivers/sentry"
) )
@ -72,7 +72,7 @@ type RabbitMQConfig struct {
Vhost string Vhost string
} }
func CreateRabbitMQChannel(config *RabbitMQConfig) (*amqp.Channel, error) { func CreateRabbitMQClient(config *RabbitMQConfig) *cony.Client {
addr := fmt.Sprintf( addr := fmt.Sprintf(
"amqp://%s:%s@%s:%d/%s", "amqp://%s:%s@%s:%d/%s",
config.Username, config.Username,
@ -82,15 +82,7 @@ func CreateRabbitMQChannel(config *RabbitMQConfig) (*amqp.Channel, error) {
url.PathEscape(config.Vhost), url.PathEscape(config.Vhost),
) )
rabbitConnection, err := amqp.Dial(addr) client := cony.NewClient(cony.URL(addr), cony.Backoff(cony.DefaultBackoff))
if err != nil {
return nil, err
}
rabbitChannel, err := rabbitConnection.Channel() return client
if err != nil {
return nil, err
}
return rabbitChannel, nil
} }

View File

@ -33,19 +33,14 @@ var amqpWorkerCmd = &cobra.Command{
} }
logger.Info("Skins repository successfully initialized") logger.Info("Skins repository successfully initialized")
logger.Info("Initializing AMQP connection") logger.Info("Creating AMQP client")
amqpChannel, err := bootstrap.CreateRabbitMQChannel(&bootstrap.RabbitMQConfig{ amqpClient := bootstrap.CreateRabbitMQClient(&bootstrap.RabbitMQConfig{
Host: viper.GetString("amqp.host"), Host: viper.GetString("amqp.host"),
Port: viper.GetInt("amqp.port"), Port: viper.GetInt("amqp.port"),
Username: viper.GetString("amqp.username"), Username: viper.GetString("amqp.username"),
Password: viper.GetString("amqp.password"), Password: viper.GetString("amqp.password"),
Vhost: viper.GetString("amqp.vhost"), Vhost: viper.GetString("amqp.vhost"),
}) })
if err != nil {
logger.Emergency(fmt.Sprintf("Error on connecting AMQP: %+v", err))
return
}
logger.Info("AMQP connection successfully initialized")
accountsApi := (&accounts.Config{ accountsApi := (&accounts.Config{
Addr: viper.GetString("api.accounts.host"), Addr: viper.GetString("api.accounts.host"),
@ -56,7 +51,7 @@ var amqpWorkerCmd = &cobra.Command{
services := &worker.Services{ services := &worker.Services{
Logger: logger, Logger: logger,
Channel: amqpChannel, AmqpClient: amqpClient,
SkinsRepo: skinsRepo, SkinsRepo: skinsRepo,
AccountsAPI: accountsApi, AccountsAPI: accountsApi,
} }

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"strconv" "strconv"
"github.com/assembla/cony"
"github.com/mono83/slf/wd" "github.com/mono83/slf/wd"
"github.com/streadway/amqp" "github.com/streadway/amqp"
@ -13,7 +14,7 @@ import (
) )
type Services struct { type Services struct {
Channel *amqp.Channel AmqpClient *cony.Client
SkinsRepo interfaces.SkinsRepository SkinsRepo interfaces.SkinsRepository
AccountsAPI interfaces.AccountsAPI AccountsAPI interfaces.AccountsAPI
Logger wd.Watchdog Logger wd.Watchdog
@ -42,18 +43,28 @@ const exchangeName string = "events"
const queueName string = "skinsystem-accounts-events" const queueName string = "skinsystem-accounts-events"
func (service *Services) Run() error { func (service *Services) Run() error {
deliveryChannel, err := setupConsume(service.Channel) clientErrs, consumerErrs, deliveryChannel := setupClient(service.AmqpClient)
if err != nil { shouldReturnError := true
for service.AmqpClient.Loop() {
select {
case msg := <-deliveryChannel:
shouldReturnError = false
service.HandleDelivery(&msg)
case err := <-consumerErrs:
if shouldReturnError {
return err return err
} }
forever := make(chan bool) service.Logger.Error("Consume error: :err", wd.ErrParam(err))
go func() { case err := <-clientErrs:
for d := range deliveryChannel { if shouldReturnError {
service.HandleDelivery(&d) return err
}
service.Logger.Error("Client error: :err", wd.ErrParam(err))
}
} }
}()
<-forever
return nil return nil
} }
@ -163,55 +174,47 @@ func (service *Services) HandleSkinChanged(event *SkinChanged) bool {
return true return true
} }
func setupConsume(channel *amqp.Channel) (<-chan amqp.Delivery, error) { func setupClient(client *cony.Client) (<-chan error, <-chan error, <-chan amqp.Delivery ) {
var err error exchange := cony.Exchange{
err = channel.ExchangeDeclare( Name: exchangeName,
exchangeName, // name Kind: "topic",
"topic", // type Durable: true,
true, // durable AutoDelete: false,
false, // auto-deleted }
false, // internal
false, // no-wait queue := &cony.Queue{
nil, // arguments Name: queueName,
Durable: true,
AutoDelete: false,
Exclusive: false,
}
usernameEventBinding := cony.Binding{
Exchange: exchange,
Queue: queue,
Key: "accounts.username-changed",
}
skinEventBinding := cony.Binding{
Exchange: exchange,
Queue: queue,
Key: "accounts.skin-changed",
}
declarations := []cony.Declaration{
cony.DeclareExchange(exchange),
cony.DeclareQueue(queue),
cony.DeclareBinding(usernameEventBinding),
cony.DeclareBinding(skinEventBinding),
}
client.Declare(declarations)
consumer := cony.NewConsumer(queue,
cony.Qos(10),
cony.AutoTag(),
) )
if err != nil { client.Consume(consumer)
return nil, err
}
_, err = channel.QueueDeclare( return client.Errors(), consumer.Errors(), consumer.Deliveries()
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
err = channel.QueueBind(queueName, "accounts.username-changed", exchangeName, false, nil)
if err != nil {
return nil, err
}
err = channel.QueueBind(queueName, "accounts.skin-changed", exchangeName, false, nil)
if err != nil {
return nil, err
}
deliveryChannel, err := channel.Consume(
queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
return deliveryChannel, nil
} }