diff --git a/Gopkg.lock b/Gopkg.lock index f974ee9..f2630e1 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,6 +1,12 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. +[[projects]] + name = "github.com/assembla/cony" + packages = ["."] + revision = "dd62697b0adb9adfda8589520cb85f4cbc2361f1" + version = "v0.3.2" + [[projects]] name = "github.com/certifi/gocertifi" packages = ["."] @@ -65,7 +71,7 @@ branch = "master" name = "github.com/mediocregopher/radix.v2" packages = ["cluster","pool","redis","util"] - revision = "0fe87e4a1bde3bb29991ea5ee1d9d536b77877d5" + revision = "d234cfb904a91daafa4e1f92599a893b349cc0c2" [[projects]] branch = "master" @@ -107,7 +113,7 @@ branch = "master" name = "github.com/spf13/afero" packages = [".","mem"] - revision = "36f8810e2e3d7eeac4ac05b57f65690fbfba62a2" + revision = "ee1bd8ee15a1306d1f9201acc41ef39cd9f99a1b" [[projects]] name = "github.com/spf13/cast" @@ -119,13 +125,13 @@ branch = "master" name = "github.com/spf13/cobra" packages = ["."] - revision = "2df9a531813370438a4d79bfc33e21f58063ed87" + revision = "3c0b56b677e04926dfa835a1b3f11cd4f62f076e" [[projects]] branch = "master" name = "github.com/spf13/jwalterweatherman" packages = ["."] - revision = "0efa5202c04663c757d84f90f5219c1250baf94f" + revision = "12bd96e66386c1960ab0f74ced1362f66f552f7b" [[projects]] name = "github.com/spf13/pflag" @@ -155,13 +161,13 @@ branch = "master" name = "golang.org/x/sys" packages = ["unix"] - revision = "2d6f6f883a06fc0d5f4b14a81e4c28705ea64c15" + revision = "7ddbeae9ae08c6a06a59597f0c9edbc5ff2444ce" [[projects]] branch = "master" name = "golang.org/x/text" packages = ["internal/gen","internal/triegen","internal/ucd","transform","unicode/cldr","unicode/norm"] - revision = "ac87088df8ef557f1e32cd00ed0b6fbc3f7ddafb" + revision = "bd91bbf73e9a4a801adbfb97133c992678533126" [[projects]] name = "gopkg.in/h2non/gock.v1" @@ -178,6 +184,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "fe0c38f46a10e79ddfd7449f98bcde02dbac62ec20cb9d957f661c6da079013f" + inputs-digest = "dd545fafc23f9b6429b5b679ad5c213c14c819f1e4ea381823acf338651122e1" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index b169acf..da94f32 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -17,10 +17,11 @@ ignored = ["elyby/minecraft-skinsystem"] name = "github.com/spf13/viper" [[constraint]] - name = "github.com/streadway/amqp" + name = "github.com/getsentry/raven-go" [[constraint]] - name = "github.com/getsentry/raven-go" + name = "github.com/assembla/cony" + version = "^0.3.2" # Testing dependencies diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index e0048a1..fd6a7c3 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -5,12 +5,12 @@ import ( "net/url" "os" + "github.com/assembla/cony" "github.com/getsentry/raven-go" "github.com/mono83/slf/rays" "github.com/mono83/slf/recievers/ansi" "github.com/mono83/slf/recievers/statsd" "github.com/mono83/slf/wd" - "github.com/streadway/amqp" "elyby/minecraft-skinsystem/logger/receivers/sentry" ) @@ -72,7 +72,7 @@ type RabbitMQConfig struct { Vhost string } -func CreateRabbitMQChannel(config *RabbitMQConfig) (*amqp.Channel, error) { +func CreateRabbitMQClient(config *RabbitMQConfig) *cony.Client { addr := fmt.Sprintf( "amqp://%s:%s@%s:%d/%s", config.Username, @@ -82,15 +82,7 @@ func CreateRabbitMQChannel(config *RabbitMQConfig) (*amqp.Channel, error) { url.PathEscape(config.Vhost), ) - rabbitConnection, err := amqp.Dial(addr) - if err != nil { - return nil, err - } + client := cony.NewClient(cony.URL(addr), cony.Backoff(cony.DefaultBackoff)) - rabbitChannel, err := rabbitConnection.Channel() - if err != nil { - return nil, err - } - - return rabbitChannel, nil + return client } diff --git a/cmd/amqpWorker.go b/cmd/amqpWorker.go index 282c7ea..ab42f69 100644 --- a/cmd/amqpWorker.go +++ b/cmd/amqpWorker.go @@ -33,19 +33,14 @@ var amqpWorkerCmd = &cobra.Command{ } logger.Info("Skins repository successfully initialized") - logger.Info("Initializing AMQP connection") - amqpChannel, err := bootstrap.CreateRabbitMQChannel(&bootstrap.RabbitMQConfig{ + logger.Info("Creating AMQP client") + amqpClient := bootstrap.CreateRabbitMQClient(&bootstrap.RabbitMQConfig{ Host: viper.GetString("amqp.host"), Port: viper.GetInt("amqp.port"), Username: viper.GetString("amqp.username"), Password: viper.GetString("amqp.password"), Vhost: viper.GetString("amqp.vhost"), }) - if err != nil { - logger.Emergency(fmt.Sprintf("Error on connecting AMQP: %+v", err)) - return - } - logger.Info("AMQP connection successfully initialized") accountsApi := (&accounts.Config{ Addr: viper.GetString("api.accounts.host"), @@ -56,7 +51,7 @@ var amqpWorkerCmd = &cobra.Command{ services := &worker.Services{ Logger: logger, - Channel: amqpChannel, + AmqpClient: amqpClient, SkinsRepo: skinsRepo, AccountsAPI: accountsApi, } diff --git a/worker/worker.go b/worker/worker.go index 4467553..19702ad 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -4,6 +4,7 @@ import ( "encoding/json" "strconv" + "github.com/assembla/cony" "github.com/mono83/slf/wd" "github.com/streadway/amqp" @@ -13,7 +14,7 @@ import ( ) type Services struct { - Channel *amqp.Channel + AmqpClient *cony.Client SkinsRepo interfaces.SkinsRepository AccountsAPI interfaces.AccountsAPI Logger wd.Watchdog @@ -42,18 +43,28 @@ const exchangeName string = "events" const queueName string = "skinsystem-accounts-events" func (service *Services) Run() error { - deliveryChannel, err := setupConsume(service.Channel) - if err != nil { - return err - } + clientErrs, consumerErrs, deliveryChannel := setupClient(service.AmqpClient) + shouldReturnError := true - forever := make(chan bool) - go func() { - for d := range deliveryChannel { - service.HandleDelivery(&d) + for service.AmqpClient.Loop() { + select { + case msg := <-deliveryChannel: + shouldReturnError = false + service.HandleDelivery(&msg) + case err := <-consumerErrs: + if shouldReturnError { + return err + } + + service.Logger.Error("Consume error: :err", wd.ErrParam(err)) + case err := <-clientErrs: + if shouldReturnError { + return err + } + + service.Logger.Error("Client error: :err", wd.ErrParam(err)) } - }() - <-forever + } return nil } @@ -163,55 +174,47 @@ func (service *Services) HandleSkinChanged(event *SkinChanged) bool { return true } -func setupConsume(channel *amqp.Channel) (<-chan amqp.Delivery, error) { - var err error - err = channel.ExchangeDeclare( - exchangeName, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments +func setupClient(client *cony.Client) (<-chan error, <-chan error, <-chan amqp.Delivery ) { + exchange := cony.Exchange{ + Name: exchangeName, + Kind: "topic", + Durable: true, + AutoDelete: false, + } + + queue := &cony.Queue{ + Name: queueName, + Durable: true, + AutoDelete: false, + Exclusive: false, + } + + usernameEventBinding := cony.Binding{ + Exchange: exchange, + Queue: queue, + Key: "accounts.username-changed", + } + + skinEventBinding := cony.Binding{ + Exchange: exchange, + Queue: queue, + Key: "accounts.skin-changed", + } + + declarations := []cony.Declaration{ + cony.DeclareExchange(exchange), + cony.DeclareQueue(queue), + cony.DeclareBinding(usernameEventBinding), + cony.DeclareBinding(skinEventBinding), + } + + client.Declare(declarations) + + consumer := cony.NewConsumer(queue, + cony.Qos(10), + cony.AutoTag(), ) - if err != nil { - return nil, err - } + client.Consume(consumer) - _, err = channel.QueueDeclare( - queueName, // name - true, // durable - false, // delete when usused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - return nil, err - } - - err = channel.QueueBind(queueName, "accounts.username-changed", exchangeName, false, nil) - if err != nil { - return nil, err - } - - err = channel.QueueBind(queueName, "accounts.skin-changed", exchangeName, false, nil) - if err != nil { - return nil, err - } - - deliveryChannel, err := channel.Consume( - queueName, // queue - "", // consumer - false, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - return nil, err - } - - return deliveryChannel, nil + return client.Errors(), consumer.Errors(), consumer.Deliveries() }