diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 4632d92..a7db8a2 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -2,9 +2,16 @@ version: '2' services: app: ports: - - "80:80" + - "81:80" redis: image: redis:3.0 volumes: - ./data/redis:/data + + rabbitmq: + image: rabbitmq:3.6 + environment: + RABBITMQ_DEFAULT_USER: "ely-skinsystem-app" + RABBITMQ_DEFAULT_PASS: "ely-skinsystem-app-password" + RABBITMQ_DEFAULT_VHOST: "/ely" diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index a1547cc..087a4bc 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -10,8 +10,14 @@ services: command: ["go", "run", "minecraft-skinsystem.go"] links: - redis + - rabbitmq redis: extends: file: docker-compose.base.yml service: redis + + rabbitmq: + extends: + file: docker-compose.base.yml + service: rabbitmq diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 5663db1..a995d39 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -7,6 +7,7 @@ services: image: erickskrauch/ely-by-skinsystem:master links: - redis + - rabbitmq restart: always redis: @@ -14,3 +15,9 @@ services: file: docker-compose.base.yml service: redis restart: always + + rabbitmq: + extends: + file: docker-compose.base.yml + service: rabbitmq + restart: always diff --git a/lib/services/services.go b/lib/services/services.go index f0f612d..5f4e996 100644 --- a/lib/services/services.go +++ b/lib/services/services.go @@ -2,9 +2,9 @@ package services import ( "github.com/mediocregopher/radix.v2/pool" - "github.com/gorilla/mux" + "github.com/streadway/amqp" ) var RedisPool *pool.Pool -var Router *mux.Router +var RabbitMQChannel *amqp.Channel diff --git a/lib/worker/handlers.go b/lib/worker/handlers.go new file mode 100644 index 0000000..7986490 --- /dev/null +++ b/lib/worker/handlers.go @@ -0,0 +1,52 @@ +package worker + +import ( + "elyby/minecraft-skinsystem/lib/data" + "log" +) + +func handleChangeUsername(model usernameChanged) (bool) { + if (model.OldUsername == "") { + record := data.SkinItem{ + UserId: model.AccountId, + Username: model.NewUsername, + } + + record.Save() + + return true + } + + record, err := data.FindByUsername(model.OldUsername) + if (err != nil) { + log.Println("Exit by not found record") + // TODO: я не уверен, что это валидное поведение + // Суть в том, что здесь может возникнуть ошибка в том случае, если записи в базе нету + // а значит его нужно, как минимум, зарегистрировать + return true + } + + record.Username = model.NewUsername + record.Save() + + log.Println("all saved!") + + return true +} + +func handleSkinChanged(model skinChanged) (bool) { + record, err := data.FindById(model.AccountId) + if (err != nil) { + return true + } + + record.SkinId = model.SkinId + record.Hash = model.Hash + record.Is1_8 = model.Is1_8 + record.IsSlim = model.IsSlim + record.Url = model.Url + + record.Save() + + return true +} diff --git a/lib/worker/models.go b/lib/worker/models.go new file mode 100644 index 0000000..eede2c2 --- /dev/null +++ b/lib/worker/models.go @@ -0,0 +1,17 @@ +package worker + +type usernameChanged struct { + AccountId int `json:"accountId"` + OldUsername string `json:"oldUsername"` + NewUsername string `json:"newUsername"` +} + +type skinChanged struct { + AccountId int `json:"userId"` + SkinId int `json:"skinId"` + OldSkinId int `json:"oldSkinId"` + Hash string `json:"hash"` + Is1_8 bool `json:"is1_8"` + IsSlim bool `json:"isSlim"` + Url string `json:"url"` +} diff --git a/lib/worker/worker.go b/lib/worker/worker.go new file mode 100644 index 0000000..27a3407 --- /dev/null +++ b/lib/worker/worker.go @@ -0,0 +1,88 @@ +package worker + +import ( + "log" + + "encoding/json" + + "elyby/minecraft-skinsystem/lib/services" +) + +const exchangeName string = "events" +const queueName string = "skinsystem-accounts-events" + +func Listen() { + var err error + ch := services.RabbitMQChannel + + err = ch.ExchangeDeclare( + exchangeName, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare an exchange") + + _, err = ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when usused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + err = ch.QueueBind(queueName, "accounts.username-changed", exchangeName, false, nil) + failOnError(err, "Failed to bind a queue") + + err = ch.QueueBind(queueName, "accounts.skin-changed", exchangeName, false, nil) + failOnError(err, "Failed to bind a queue") + + msgs, err := ch.Consume( + queueName, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + forever := make(chan bool) + + go func() { + for d := range msgs { + log.Println("Incoming message with routing key " + d.RoutingKey) + var result bool = true; + switch d.RoutingKey { + case "accounts.username-changed": + var model usernameChanged + json.Unmarshal(d.Body, &model) + result = handleChangeUsername(model) + case "accounts.skin-changed": + var model skinChanged + json.Unmarshal(d.Body, &model) + result = handleSkinChanged(model) + } + + if (result) { + d.Ack(false) + } else { + d.Reject(true) + } + } + }() + + <-forever +} + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +} diff --git a/minecraft-skinsystem.go b/minecraft-skinsystem.go index 49df237..513aa30 100644 --- a/minecraft-skinsystem.go +++ b/minecraft-skinsystem.go @@ -3,18 +3,22 @@ package main import ( "log" "runtime" - //"time" + "time" "net/http" "github.com/gorilla/mux" + "github.com/streadway/amqp" "github.com/mediocregopher/radix.v2/pool" "elyby/minecraft-skinsystem/lib/routes" "elyby/minecraft-skinsystem/lib/services" - //"github.com/mediocregopher/radix.v2/redis" + "elyby/minecraft-skinsystem/lib/worker" ) const redisString string = "redis:6379" +const redisPoolSize int = 10 + +const rabbitmqString string = "amqp://ely-skinsystem-app:ely-skinsystem-app-password@rabbitmq:5672/%2fely" func main() { log.Println("Starting...") @@ -22,12 +26,25 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) log.Println("Connecting to redis") - redisPool, redisErr := pool.New("tcp", redisString, 10) - if redisErr != nil { + redisPool, redisErr := pool.New("tcp", redisString, redisPoolSize) + if (redisErr != nil) { log.Fatal("Redis unavailable") } log.Println("Connected to redis") + log.Println("Connecting to rabbitmq") + // TODO: rabbitmq становится доступен не сразу. Нужно дождаться, пока он станет доступен, периодически повторяя запросы + rabbitConnection, rabbitmqErr := amqp.Dial(rabbitmqString) + if (rabbitmqErr != nil) { + log.Fatalf("%s", rabbitmqErr) + } + log.Println("Connected to rabbitmq. Trying to open a channel") + rabbitChannel, rabbitmqErr := rabbitConnection.Channel() + if (rabbitmqErr != nil) { + log.Fatalf("%s", rabbitmqErr) + } + log.Println("Connected to rabbitmq channel") + router := mux.NewRouter().StrictSlash(true) router.HandleFunc("/skins/{username}", routes.Skin).Methods("GET").Name("skins") router.HandleFunc("/cloaks/{username}", routes.Cape).Methods("GET").Name("cloaks") @@ -43,25 +60,31 @@ func main() { apiRouter.HandleFunc("/user/{username}/skin", routes.SetSkin).Methods("POST") services.RedisPool = redisPool - services.Router = router + services.RabbitMQChannel = rabbitChannel - /*go func() { + go func() { + period := 5 for { - time.Sleep(5 * time.Second) + time.Sleep(time.Duration(period) * time.Second) - resp := services.Redis.Cmd("PING") - if (resp.Err != nil) { - log.Println("Redis not pinged. Try to reconnect") - newClient, redisErr := redis.Dial("tcp", redisString) - if (redisErr != nil) { - log.Println("Cannot reconnect to redis") - } else { - services.Redis = newClient - log.Println("Reconnected") - } + resp := services.RedisPool.Cmd("PING") + if (resp.Err == nil) { + // Если редис успешно пинганулся, значит всё хорошо + continue + } + + log.Println("Redis not pinged. Try to reconnect") + newPool, redisErr := pool.New("tcp", redisString, redisPoolSize) + if (redisErr != nil) { + log.Printf("Cannot reconnect to redis, waiting %d seconds\n", period) + } else { + services.RedisPool = newPool + log.Println("Reconnected") } } - }()*/ + }() + + go worker.Listen() log.Println("Started"); log.Fatal(http.ListenAndServe(":80", router))