Merge branch 'rabbitmq_integration' into develop

This commit is contained in:
ErickSkrauch 2016-09-21 20:29:13 +03:00
commit 0d6ca356d1
8 changed files with 221 additions and 21 deletions

View File

@ -2,9 +2,16 @@ version: '2'
services: services:
app: app:
ports: ports:
- "80:80" - "81:80"
redis: redis:
image: redis:3.0 image: redis:3.0
volumes: volumes:
- ./data/redis:/data - ./data/redis:/data
rabbitmq:
image: rabbitmq:3.6
environment:
RABBITMQ_DEFAULT_USER: "ely-skinsystem-app"
RABBITMQ_DEFAULT_PASS: "ely-skinsystem-app-password"
RABBITMQ_DEFAULT_VHOST: "/ely"

View File

@ -10,8 +10,14 @@ services:
command: ["go", "run", "minecraft-skinsystem.go"] command: ["go", "run", "minecraft-skinsystem.go"]
links: links:
- redis - redis
- rabbitmq
redis: redis:
extends: extends:
file: docker-compose.base.yml file: docker-compose.base.yml
service: redis service: redis
rabbitmq:
extends:
file: docker-compose.base.yml
service: rabbitmq

View File

@ -7,6 +7,7 @@ services:
image: erickskrauch/ely-by-skinsystem:master image: erickskrauch/ely-by-skinsystem:master
links: links:
- redis - redis
- rabbitmq
restart: always restart: always
redis: redis:
@ -14,3 +15,9 @@ services:
file: docker-compose.base.yml file: docker-compose.base.yml
service: redis service: redis
restart: always restart: always
rabbitmq:
extends:
file: docker-compose.base.yml
service: rabbitmq
restart: always

View File

@ -2,9 +2,9 @@ package services
import ( import (
"github.com/mediocregopher/radix.v2/pool" "github.com/mediocregopher/radix.v2/pool"
"github.com/gorilla/mux" "github.com/streadway/amqp"
) )
var RedisPool *pool.Pool var RedisPool *pool.Pool
var Router *mux.Router var RabbitMQChannel *amqp.Channel

52
lib/worker/handlers.go Normal file
View File

@ -0,0 +1,52 @@
package worker
import (
"elyby/minecraft-skinsystem/lib/data"
"log"
)
func handleChangeUsername(model usernameChanged) (bool) {
if (model.OldUsername == "") {
record := data.SkinItem{
UserId: model.AccountId,
Username: model.NewUsername,
}
record.Save()
return true
}
record, err := data.FindByUsername(model.OldUsername)
if (err != nil) {
log.Println("Exit by not found record")
// TODO: я не уверен, что это валидное поведение
// Суть в том, что здесь может возникнуть ошибка в том случае, если записи в базе нету
// а значит его нужно, как минимум, зарегистрировать
return true
}
record.Username = model.NewUsername
record.Save()
log.Println("all saved!")
return true
}
func handleSkinChanged(model skinChanged) (bool) {
record, err := data.FindById(model.AccountId)
if (err != nil) {
return true
}
record.SkinId = model.SkinId
record.Hash = model.Hash
record.Is1_8 = model.Is1_8
record.IsSlim = model.IsSlim
record.Url = model.Url
record.Save()
return true
}

17
lib/worker/models.go Normal file
View File

@ -0,0 +1,17 @@
package worker
type usernameChanged struct {
AccountId int `json:"accountId"`
OldUsername string `json:"oldUsername"`
NewUsername string `json:"newUsername"`
}
type skinChanged struct {
AccountId int `json:"userId"`
SkinId int `json:"skinId"`
OldSkinId int `json:"oldSkinId"`
Hash string `json:"hash"`
Is1_8 bool `json:"is1_8"`
IsSlim bool `json:"isSlim"`
Url string `json:"url"`
}

88
lib/worker/worker.go Normal file
View File

@ -0,0 +1,88 @@
package worker
import (
"log"
"encoding/json"
"elyby/minecraft-skinsystem/lib/services"
)
const exchangeName string = "events"
const queueName string = "skinsystem-accounts-events"
func Listen() {
var err error
ch := services.RabbitMQChannel
err = ch.ExchangeDeclare(
exchangeName, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
_, err = ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(queueName, "accounts.username-changed", exchangeName, false, nil)
failOnError(err, "Failed to bind a queue")
err = ch.QueueBind(queueName, "accounts.skin-changed", exchangeName, false, nil)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Println("Incoming message with routing key " + d.RoutingKey)
var result bool = true;
switch d.RoutingKey {
case "accounts.username-changed":
var model usernameChanged
json.Unmarshal(d.Body, &model)
result = handleChangeUsername(model)
case "accounts.skin-changed":
var model skinChanged
json.Unmarshal(d.Body, &model)
result = handleSkinChanged(model)
}
if (result) {
d.Ack(false)
} else {
d.Reject(true)
}
}
}()
<-forever
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

View File

@ -3,18 +3,22 @@ package main
import ( import (
"log" "log"
"runtime" "runtime"
//"time" "time"
"net/http" "net/http"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/streadway/amqp"
"github.com/mediocregopher/radix.v2/pool" "github.com/mediocregopher/radix.v2/pool"
"elyby/minecraft-skinsystem/lib/routes" "elyby/minecraft-skinsystem/lib/routes"
"elyby/minecraft-skinsystem/lib/services" "elyby/minecraft-skinsystem/lib/services"
//"github.com/mediocregopher/radix.v2/redis" "elyby/minecraft-skinsystem/lib/worker"
) )
const redisString string = "redis:6379" const redisString string = "redis:6379"
const redisPoolSize int = 10
const rabbitmqString string = "amqp://ely-skinsystem-app:ely-skinsystem-app-password@rabbitmq:5672/%2fely"
func main() { func main() {
log.Println("Starting...") log.Println("Starting...")
@ -22,12 +26,25 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
log.Println("Connecting to redis") log.Println("Connecting to redis")
redisPool, redisErr := pool.New("tcp", redisString, 10) redisPool, redisErr := pool.New("tcp", redisString, redisPoolSize)
if redisErr != nil { if (redisErr != nil) {
log.Fatal("Redis unavailable") log.Fatal("Redis unavailable")
} }
log.Println("Connected to redis") log.Println("Connected to redis")
log.Println("Connecting to rabbitmq")
// TODO: rabbitmq становится доступен не сразу. Нужно дождаться, пока он станет доступен, периодически повторяя запросы
rabbitConnection, rabbitmqErr := amqp.Dial(rabbitmqString)
if (rabbitmqErr != nil) {
log.Fatalf("%s", rabbitmqErr)
}
log.Println("Connected to rabbitmq. Trying to open a channel")
rabbitChannel, rabbitmqErr := rabbitConnection.Channel()
if (rabbitmqErr != nil) {
log.Fatalf("%s", rabbitmqErr)
}
log.Println("Connected to rabbitmq channel")
router := mux.NewRouter().StrictSlash(true) router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/skins/{username}", routes.Skin).Methods("GET").Name("skins") router.HandleFunc("/skins/{username}", routes.Skin).Methods("GET").Name("skins")
router.HandleFunc("/cloaks/{username}", routes.Cape).Methods("GET").Name("cloaks") router.HandleFunc("/cloaks/{username}", routes.Cape).Methods("GET").Name("cloaks")
@ -43,25 +60,31 @@ func main() {
apiRouter.HandleFunc("/user/{username}/skin", routes.SetSkin).Methods("POST") apiRouter.HandleFunc("/user/{username}/skin", routes.SetSkin).Methods("POST")
services.RedisPool = redisPool services.RedisPool = redisPool
services.Router = router services.RabbitMQChannel = rabbitChannel
/*go func() { go func() {
period := 5
for { for {
time.Sleep(5 * time.Second) time.Sleep(time.Duration(period) * time.Second)
resp := services.RedisPool.Cmd("PING")
if (resp.Err == nil) {
// Если редис успешно пинганулся, значит всё хорошо
continue
}
resp := services.Redis.Cmd("PING")
if (resp.Err != nil) {
log.Println("Redis not pinged. Try to reconnect") log.Println("Redis not pinged. Try to reconnect")
newClient, redisErr := redis.Dial("tcp", redisString) newPool, redisErr := pool.New("tcp", redisString, redisPoolSize)
if (redisErr != nil) { if (redisErr != nil) {
log.Println("Cannot reconnect to redis") log.Printf("Cannot reconnect to redis, waiting %d seconds\n", period)
} else { } else {
services.Redis = newClient services.RedisPool = newPool
log.Println("Reconnected") log.Println("Reconnected")
} }
} }
} }()
}()*/
go worker.Listen()
log.Println("Started"); log.Println("Started");
log.Fatal(http.ListenAndServe(":80", router)) log.Fatal(http.ListenAndServe(":80", router))