Частично восстановлена логика AMQP воркера

This commit is contained in:
ErickSkrauch 2017-08-17 02:47:35 +03:00
parent 4bf146dd43
commit 78917a70d3
7 changed files with 382 additions and 23 deletions

View File

@ -36,3 +36,26 @@ docker-compose rm -f app # Удаляем конейтнер
docker-compose build app # Запускаем билд по новой
docker-compose up -d app # Поднимаем свежесобранный контейнер обратно
```
### Шорткаты для разработки
Потом это надо преобразовать в нормальные доки.
Run Redis:
```sh
docker run --rm \
-p 6379:6379 \
redis:3.0-alpine
```
Run RabbitMQ:
```sh
docker run --rm \
-p 5672:5672 \
-e RABBITMQ_DEFAULT_USER=ely-skinsystem-app \
-e RABBITMQ_DEFAULT_PASS=ely-skinsystem-app-password \
-e RABBITMQ_DEFAULT_VHOST=/ely \
rabbitmq:3.6
```

View File

@ -1,12 +1,15 @@
package bootstrap
import (
"fmt"
"net/url"
"os"
"github.com/mono83/slf/rays"
"github.com/mono83/slf/recievers/ansi"
"github.com/mono83/slf/recievers/statsd"
"github.com/mono83/slf/wd"
"github.com/streadway/amqp"
)
func CreateLogger(statsdAddr string) (wd.Watchdog, error) {
@ -28,3 +31,34 @@ func CreateLogger(statsdAddr string) (wd.Watchdog, error) {
return wd.New("", "").WithParams(rays.Host), nil
}
type RabbitMQConfig struct {
Username string
Password string
Host string
Port int
Vhost string
}
func CreateRabbitMQChannel(config *RabbitMQConfig) (*amqp.Channel, error) {
addr := fmt.Sprintf(
"amqp://%s:%s@%s:%d/%s",
config.Username,
config.Password,
config.Host,
config.Port,
url.PathEscape(config.Vhost),
)
rabbitConnection, err := amqp.Dial(addr)
if err != nil {
return nil, err
}
rabbitChannel, err := rabbitConnection.Channel()
if err != nil {
return nil, err
}
return rabbitChannel, nil
}

63
cmd/amqpWorker.go Normal file
View File

@ -0,0 +1,63 @@
package cmd
import (
"fmt"
"log"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"elyby/minecraft-skinsystem/bootstrap"
"elyby/minecraft-skinsystem/db"
"elyby/minecraft-skinsystem/worker"
)
var amqpWorkerCmd = &cobra.Command{
Use: "amqp-worker",
Short: "Launches a worker which listens to events and processes them",
Run: func(cmd *cobra.Command, args []string) {
logger, err := bootstrap.CreateLogger(viper.GetString("statsd.addr"))
if err != nil {
log.Fatal(fmt.Printf("Cannot initialize logger: %v", err))
}
logger.Info("Logger successfully initialized")
storageFactory := db.StorageFactory{Config: viper.GetViper()}
logger.Info("Initializing skins repository")
skinsRepo, err := storageFactory.CreateFactory("redis").CreateSkinsRepository()
if err != nil {
logger.Emergency(fmt.Sprintf("Error on creating skins repo: %+v", err))
return
}
logger.Info("Skins repository successfully initialized")
logger.Info("Initializing AMQP connection")
amqpChannel, err := bootstrap.CreateRabbitMQChannel(&bootstrap.RabbitMQConfig{
Host: viper.GetString("amqp.host"),
Port: viper.GetInt("amqp.port"),
Username: viper.GetString("amqp.username"),
Password: viper.GetString("amqp.password"),
Vhost: viper.GetString("amqp.vhost"),
})
if err != nil {
logger.Emergency(fmt.Sprintf("Error on connecting AMQP: %+v", err))
return
}
logger.Info("AMQP connection successfully initialized")
services := &worker.Services{
Logger: logger,
Channel: amqpChannel,
SkinsRepo: skinsRepo,
}
if err := services.Run(); err != nil {
logger.Error(fmt.Sprintf("Cannot initialize worker: %+v", err))
}
},
}
func init() {
RootCmd.AddCommand(amqpWorkerCmd)
}

View File

@ -96,42 +96,44 @@ type redisDb struct {
const accountIdToUsernameKey string = "hash:username-to-account-id"
func (db *redisDb) FindByUsername(username string) (model.Skin, error) {
var record model.Skin
func (db *redisDb) FindByUsername(username string) (*model.Skin, error) {
if username == "" {
return record, &SkinNotFoundError{username}
return nil, &SkinNotFoundError{username}
}
redisKey := buildKey(username)
response := db.conn.Cmd("GET", redisKey)
if response.IsType(redis.Nil) {
return record, &SkinNotFoundError{username}
return nil, &SkinNotFoundError{username}
}
encodedResult, err := response.Bytes()
if err == nil {
result, err := zlibDecode(encodedResult)
if err != nil {
log.Println("Cannot uncompress zlib for key " + redisKey) // TODO: replace with valid error
return record, err
}
err = json.Unmarshal(result, &record)
if err != nil {
log.Println("Cannot decode record data for key" + redisKey) // TODO: replace with valid error
return record, nil
}
record.OldUsername = record.Username
if err != nil {
return nil, err
}
return record, nil
result, err := zlibDecode(encodedResult)
if err != nil {
log.Println("Cannot uncompress zlib for key " + redisKey) // TODO: replace with valid error
return nil, err
}
var skin *model.Skin
err = json.Unmarshal(result, &skin)
if err != nil {
log.Println("Cannot decode record data for key" + redisKey) // TODO: replace with valid error
return nil, nil
}
skin.OldUsername = skin.Username
return skin, nil
}
func (db *redisDb) FindByUserId(id int) (model.Skin, error) {
func (db *redisDb) FindByUserId(id int) (*model.Skin, error) {
response := db.conn.Cmd("HGET", accountIdToUsernameKey, id)
if response.IsType(redis.Nil) {
return model.Skin{}, SkinNotFoundError{"unknown"}
return nil, SkinNotFoundError{"unknown"}
}
username, _ := response.Str()
@ -139,6 +141,34 @@ func (db *redisDb) FindByUserId(id int) (model.Skin, error) {
return db.FindByUsername(username)
}
func (db *redisDb) Save(skin *model.Skin) error {
conn := db.conn
if poolConn, isPool := conn.(*pool.Pool); isPool {
conn, _ = poolConn.Get()
}
conn.Cmd("MULTI")
// Если пользователь сменил ник, то мы должны удать его ключ
if skin.OldUsername != "" && skin.OldUsername != skin.Username {
conn.Cmd("DEL", buildKey(skin.OldUsername))
}
// Если это новая запись или если пользователь сменил ник, то обновляем значение в хэш-таблице
if skin.OldUsername != "" || skin.OldUsername != skin.Username {
conn.Cmd("HSET", accountIdToUsernameKey, skin.UserId, skin.Username)
}
str, _ := json.Marshal(skin)
conn.Cmd("SET", buildKey(skin.Username), zlibEncode(str))
conn.Cmd("EXEC")
skin.OldUsername = skin.Username
return nil
}
func buildKey(username string) string {
return "username:" + strings.ToLower(username)
}

20
model/events.go Normal file
View File

@ -0,0 +1,20 @@
package model
type UsernameChanged struct {
AccountId int `json:"accountId"`
OldUsername string `json:"oldUsername"`
NewUsername string `json:"newUsername"`
}
type SkinChanged struct {
AccountId int `json:"userId"`
Uuid string `json:"uuid"`
SkinId int `json:"skinId"`
OldSkinId int `json:"oldSkinId"`
Hash string `json:"hash"`
Is1_8 bool `json:"is1_8"`
IsSlim bool `json:"isSlim"`
Url string `json:"url"`
MojangTextures string `json:"mojangTextures"`
MojangSignature string `json:"mojangSignature"`
}

View File

@ -3,6 +3,7 @@ package repositories
import "elyby/minecraft-skinsystem/model"
type SkinsRepository interface {
FindByUsername(username string) (model.Skin, error)
FindByUserId(id int) (model.Skin, error)
FindByUsername(username string) (*model.Skin, error)
FindByUserId(id int) (*model.Skin, error)
Save(skin *model.Skin) error
}

188
worker/worker.go Normal file
View File

@ -0,0 +1,188 @@
package worker
import (
"encoding/json"
"github.com/mono83/slf/wd"
"github.com/streadway/amqp"
"elyby/minecraft-skinsystem/model"
"elyby/minecraft-skinsystem/repositories"
)
type Services struct {
Channel *amqp.Channel
SkinsRepo repositories.SkinsRepository
Logger wd.Watchdog
}
const exchangeName string = "events"
const queueName string = "skinsystem-accounts-events"
func (service *Services) Run() error {
deliveryChannel, err := setupConsume(service.Channel)
if err != nil {
return err
}
forever := make(chan bool)
go func() {
for d := range deliveryChannel {
service.Logger.Debug("Incoming message with routing key " + d.RoutingKey)
var result bool = true
switch d.RoutingKey {
case "accounts.username-changed":
var event *model.UsernameChanged
json.Unmarshal(d.Body, &event)
result = service.HandleChangeUsername(event)
case "accounts.skin-changed":
var event *model.SkinChanged
json.Unmarshal(d.Body, &event)
result = service.HandleSkinChanged(event)
}
if result {
d.Ack(false)
} else {
d.Reject(true)
}
}
}()
<-forever
return nil
}
func (service *Services) HandleChangeUsername(event *model.UsernameChanged) bool {
if event.OldUsername == "" {
service.Logger.IncCounter("worker.change_username.empty_old_username", 1)
record := &model.Skin{
UserId: event.AccountId,
Username: event.NewUsername,
}
service.SkinsRepo.Save(record)
return true
}
record, err := service.SkinsRepo.FindByUserId(event.AccountId)
if err != nil {
/*
// TODO: вернуть логику восстановления информации об аккаунте
service.Logger.IncCounter("worker.change_username.id_not_found", 1)
service.Logger.Warning("Cannot find user id. Trying to search.")
response, err := getById(event.AccountId)
if err != nil {
service.Logger.IncCounter("worker.change_username.id_not_restored", 1)
service.Logger.Error("Cannot restore user info. %T\n", err)
// TODO: логгировать в какой-нибудь Sentry, если там не 404
return true
}
service.Logger.IncCounter("worker.change_username.id_restored", 1)
fmt.Println("User info successfully restored.")
record = &event.Skin{
UserId: response.Id,
}
*/
}
record.Username = event.NewUsername
service.SkinsRepo.Save(record)
service.Logger.IncCounter("worker.change_username.processed", 1)
return true
}
func (service *Services) HandleSkinChanged(event *model.SkinChanged) bool {
record, err := service.SkinsRepo.FindByUserId(event.AccountId)
if err != nil {
service.Logger.IncCounter("worker.skin_changed.id_not_found", 1)
service.Logger.Warning("Cannot find user id. Trying to search.")
/*
// TODO: вернуть логику восстановления информации об аккаунте
response, err := getById(event.AccountId)
if err != nil {
services.Logger.IncCounter("worker.skin_changed.id_not_restored", 1)
fmt.Printf("Cannot restore user info. %T\n", err)
// TODO: логгировать в какой-нибудь Sentry, если там не 404
return true
}
services.Logger.IncCounter("worker.skin_changed.id_restored", 1)
fmt.Println("User info successfully restored.")
record.UserId = response.Id
record.Username = response.Username
*/
}
record.Uuid = event.Uuid
record.SkinId = event.SkinId
record.Hash = event.Hash
record.Is1_8 = event.Is1_8
record.IsSlim = event.IsSlim
record.Url = event.Url
record.MojangTextures = event.MojangTextures
record.MojangSignature = event.MojangSignature
service.SkinsRepo.Save(record)
service.Logger.IncCounter("worker.skin_changed.processed", 1)
return true
}
func setupConsume(channel *amqp.Channel) (<-chan amqp.Delivery, error) {
var err error
err = channel.ExchangeDeclare(
exchangeName, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
_, err = channel.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
err = channel.QueueBind(queueName, "accounts.username-changed", exchangeName, false, nil)
if err != nil {
return nil, err
}
err = channel.QueueBind(queueName, "accounts.skin-changed", exchangeName, false, nil)
if err != nil {
return nil, err
}
deliveryChannel, err := channel.Consume(
queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
return deliveryChannel, nil
}