Extracted strategy from batch uuids provider implementation.

Reimplemented Periodic strategy.
Implemented FullBus strategy (#24).
Started working on tests.
This commit is contained in:
ErickSkrauch 2020-04-24 13:20:03 +03:00
parent e08bb23b3d
commit 29b6bc89b3
No known key found for this signature in database
GPG Key ID: 669339FCBB30EE0E
5 changed files with 273 additions and 130 deletions

View File

@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased] - xxxx-xx-xx ## [Unreleased] - xxxx-xx-xx
### Added
- [#24](https://github.com/elyby/chrly/issues/24): Added a new batch Mojang UUIDs provider strategy `full-bus` and
corresponding configuration param `QUEUE_STRATEGY` with the default value `periodic`.
## [4.4.1] - 2020-04-24 ## [4.4.1] - 2020-04-24
### Added ### Added

View File

@ -97,6 +97,14 @@ docker-compose up -d app
<td>Sentry can be used to collect app errors</td> <td>Sentry can be used to collect app errors</td>
<td><code>https://public:private@your.sentry.io/1</code></td> <td><code>https://public:private@your.sentry.io/1</code></td>
</tr> </tr>
<tr>
<td>QUEUE_STRATEGY</td>
<td>
Sets the strategy for batch Mojang UUIDs provider queue. Allowed values are <code>periodic</code> and
<code>full-bus</code> (see <a href="https://github.com/elyby/chrly/issues/24">#24</a>).
</td>
<td><code>periodic</code></td>
</tr>
<tr> <tr>
<td>QUEUE_LOOP_DELAY</td> <td>QUEUE_LOOP_DELAY</td>
<td> <td>

View File

@ -1,6 +1,7 @@
package di package di
import ( import (
"context"
"fmt" "fmt"
"net/url" "net/url"
"time" "time"
@ -18,6 +19,9 @@ var mojangTextures = di.Options(
di.Provide(newMojangTexturesProvider), di.Provide(newMojangTexturesProvider),
di.Provide(newMojangTexturesUuidsProviderFactory), di.Provide(newMojangTexturesUuidsProviderFactory),
di.Provide(newMojangTexturesBatchUUIDsProvider), di.Provide(newMojangTexturesBatchUUIDsProvider),
di.Provide(newMojangTexturesBatchUUIDsProviderStrategyFactory),
di.Provide(newMojangTexturesBatchUUIDsProviderDelayedStrategy),
di.Provide(newMojangTexturesBatchUUIDsProviderFullBusStrategy),
di.Provide(newMojangTexturesRemoteUUIDsProvider), di.Provide(newMojangTexturesRemoteUUIDsProvider),
di.Provide(newMojangSignedTexturesProvider), di.Provide(newMojangSignedTexturesProvider),
di.Provide(newMojangTexturesStorageFactory), di.Provide(newMojangTexturesStorageFactory),
@ -75,7 +79,7 @@ func newMojangTexturesUuidsProviderFactory(
func newMojangTexturesBatchUUIDsProvider( func newMojangTexturesBatchUUIDsProvider(
container *di.Container, container *di.Container,
config *viper.Viper, strategy mojangtextures.BatchUuidsProviderStrategy,
emitter mojangtextures.Emitter, emitter mojangtextures.Emitter,
) (*mojangtextures.BatchUuidsProvider, error) { ) (*mojangtextures.BatchUuidsProvider, error) {
if err := container.Provide(func(emitter es.Subscriber, config *viper.Viper) *namedHealthChecker { if err := container.Provide(func(emitter es.Subscriber, config *viper.Viper) *namedHealthChecker {
@ -106,14 +110,56 @@ func newMojangTexturesBatchUUIDsProvider(
return nil, err return nil, err
} }
return mojangtextures.NewBatchUuidsProvider(context.Background(), strategy, emitter), nil
}
func newMojangTexturesBatchUUIDsProviderStrategyFactory(
container *di.Container,
config *viper.Viper,
) (mojangtextures.BatchUuidsProviderStrategy, error) {
config.SetDefault("queue.strategy", "periodic")
strategyName := config.GetString("queue.strategy")
switch strategyName {
case "periodic":
var strategy *mojangtextures.PeriodicStrategy
err := container.Resolve(&strategy)
if err != nil {
return nil, err
}
return strategy, nil
case "full-bus":
var strategy *mojangtextures.FullBusStrategy
err := container.Resolve(&strategy)
if err != nil {
return nil, err
}
return strategy, nil
default:
return nil, fmt.Errorf("unknown queue strategy \"%s\"", strategyName)
}
}
func newMojangTexturesBatchUUIDsProviderDelayedStrategy(config *viper.Viper) *mojangtextures.PeriodicStrategy {
config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond) config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond)
config.SetDefault("queue.batch_size", 10) config.SetDefault("queue.batch_size", 10)
return &mojangtextures.BatchUuidsProvider{ return mojangtextures.NewPeriodicStrategy(
Emitter: emitter, config.GetDuration("queue.loop_delay"),
IterationDelay: config.GetDuration("queue.loop_delay"), config.GetInt("queue.batch_size"),
IterationSize: config.GetInt("queue.batch_size"), )
}, nil }
func newMojangTexturesBatchUUIDsProviderFullBusStrategy(config *viper.Viper) *mojangtextures.FullBusStrategy {
config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond)
config.SetDefault("queue.batch_size", 10)
return mojangtextures.NewFullBusStrategy(
config.GetDuration("queue.loop_delay"),
config.GetInt("queue.batch_size"),
)
} }
func newMojangTexturesRemoteUUIDsProvider( func newMojangTexturesRemoteUUIDsProvider(

View File

@ -1,6 +1,7 @@
package mojangtextures package mojangtextures
import ( import (
"context"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -9,131 +10,216 @@ import (
) )
type jobResult struct { type jobResult struct {
profile *mojang.ProfileInfo Profile *mojang.ProfileInfo
error error Error error
} }
type jobItem struct { type job struct {
username string Username string
respondChan chan *jobResult RespondChan chan *jobResult
} }
type jobsQueue struct { type jobsQueue struct {
lock sync.Mutex lock sync.Mutex
items []*jobItem items []*job
} }
func (s *jobsQueue) New() *jobsQueue { func newJobsQueue() *jobsQueue {
s.items = []*jobItem{} return &jobsQueue{
return s items: []*job{},
}
func (s *jobsQueue) Enqueue(t *jobItem) {
s.lock.Lock()
defer s.lock.Unlock()
s.items = append(s.items, t)
}
func (s *jobsQueue) Dequeue(n int) []*jobItem {
s.lock.Lock()
defer s.lock.Unlock()
if n > s.size() {
n = s.size()
} }
items := s.items[0:n]
s.items = s.items[n:len(s.items)]
return items
} }
func (s *jobsQueue) Size() int { func (s *jobsQueue) Enqueue(job *job) int {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
return s.size() s.items = append(s.items, job)
}
func (s *jobsQueue) size() int {
return len(s.items) return len(s.items)
} }
func (s *jobsQueue) Dequeue(n int) ([]*job, int) {
s.lock.Lock()
defer s.lock.Unlock()
l := len(s.items)
if n > l {
n = l
}
items := s.items[0:n]
s.items = s.items[n:l]
return items, l - n
}
var usernamesToUuids = mojang.UsernamesToUuids var usernamesToUuids = mojang.UsernamesToUuids
var forever = func() bool {
return true type JobsIteration struct {
Jobs []*job
Queue int
}
type BatchUuidsProviderStrategy interface {
Queue(job *job)
GetJobs(abort context.Context) <-chan *JobsIteration
}
type PeriodicStrategy struct {
Delay time.Duration
Batch int
queue *jobsQueue
}
func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy {
return &PeriodicStrategy{
Delay: delay,
Batch: batch,
queue: newJobsQueue(),
}
}
func (ctx *PeriodicStrategy) Queue(job *job) {
ctx.queue.Enqueue(job)
}
func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
ch := make(chan *JobsIteration)
go func() {
for {
select {
case <-abort.Done():
return
case <-time.After(ctx.Delay):
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
ch <- &JobsIteration{jobs, queueLen}
}
}
}()
return ch
}
type FullBusStrategy struct {
Delay time.Duration
Batch int
queue *jobsQueue
ready chan bool
}
func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy {
return &FullBusStrategy{
Delay: delay,
Batch: batch,
queue: newJobsQueue(),
ready: make(chan bool),
}
}
func (ctx *FullBusStrategy) Queue(job *job) {
n := ctx.queue.Enqueue(job)
if n == ctx.Batch {
ctx.ready <- true
}
}
func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
ch := make(chan *JobsIteration)
go func() {
for {
t := time.NewTimer(ctx.Delay)
select {
case <-abort.Done():
return
case <-t.C:
ctx.sendJobs(ch)
case <-ctx.ready:
t.Stop()
ctx.sendJobs(ch)
}
}
}()
return ch
}
func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) {
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
ch <- &JobsIteration{jobs, queueLen} // TODO: should not wait for iteration result
} }
type BatchUuidsProvider struct { type BatchUuidsProvider struct {
Emitter context context.Context
emitter Emitter
IterationDelay time.Duration strategy BatchUuidsProviderStrategy
IterationSize int
onFirstCall sync.Once onFirstCall sync.Once
queue jobsQueue }
func NewBatchUuidsProvider(context context.Context, strategy BatchUuidsProviderStrategy, emitter Emitter) *BatchUuidsProvider {
return &BatchUuidsProvider{
context: context,
emitter: emitter,
strategy: strategy,
}
} }
func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) { func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) {
ctx.onFirstCall.Do(func() { ctx.onFirstCall.Do(ctx.startQueue)
ctx.queue.New()
ctx.startQueue()
})
resultChan := make(chan *jobResult) resultChan := make(chan *jobResult)
ctx.queue.Enqueue(&jobItem{username, resultChan}) ctx.strategy.Queue(&job{username, resultChan})
ctx.Emit("mojang_textures:batch_uuids_provider:queued", username) ctx.emitter.Emit("mojang_textures:batch_uuids_provider:queued", username)
result := <-resultChan result := <-resultChan
return result.profile, result.error return result.Profile, result.Error
} }
func (ctx *BatchUuidsProvider) startQueue() { func (ctx *BatchUuidsProvider) startQueue() {
go func() { go func() {
time.Sleep(ctx.IterationDelay) jobsChan := ctx.strategy.GetJobs(ctx.context)
for forever() { for {
ctx.Emit("mojang_textures:batch_uuids_provider:before_round") select {
ctx.queueRound() case <-ctx.context.Done():
ctx.Emit("mojang_textures:batch_uuids_provider:after_round") return
time.Sleep(ctx.IterationDelay) case iteration := <-jobsChan:
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:before_round") // TODO: where should I move this events?
ctx.performRequest(iteration)
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:after_round")
}
} }
}() }()
} }
func (ctx *BatchUuidsProvider) queueRound() { func (ctx *BatchUuidsProvider) performRequest(iteration *JobsIteration) {
queueSize := ctx.queue.Size() usernames := make([]string, len(iteration.Jobs))
jobs := ctx.queue.Dequeue(ctx.IterationSize) for i, job := range iteration.Jobs {
usernames[i] = job.Username
var usernames []string
for _, job := range jobs {
usernames = append(usernames, job.username)
} }
ctx.Emit("mojang_textures:batch_uuids_provider:round", usernames, queueSize-len(jobs)) ctx.emitter.Emit("mojang_textures:batch_uuids_provider:round", usernames, iteration.Queue)
if len(usernames) == 0 { if len(usernames) == 0 {
return return
} }
profiles, err := usernamesToUuids(usernames) profiles, err := usernamesToUuids(usernames)
ctx.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err) ctx.emitter.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err)
for _, job := range jobs { for _, job := range iteration.Jobs {
go func(job *jobItem) { response := &jobResult{}
response := &jobResult{} if err == nil {
if err != nil { // The profiles in the response aren't ordered, so we must search each username over full array
response.error = err for _, profile := range profiles {
} else { if strings.EqualFold(job.Username, profile.Name) {
// The profiles in the response aren't ordered, so we must search each username over full array response.Profile = profile
for _, profile := range profiles { break
if strings.EqualFold(job.username, profile.Name) {
response.profile = profile
break
}
} }
} }
} else {
response.Error = err
}
job.respondChan <- response job.RespondChan <- response
}(job) close(job.RespondChan)
} }
} }

View File

@ -1,64 +1,50 @@
package mojangtextures package mojangtextures
import ( import (
"context"
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
"strings" "strings"
"testing" "testing"
testify "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/elyby/chrly/api/mojang" "github.com/elyby/chrly/api/mojang"
) )
func TestJobsQueue(t *testing.T) { func TestJobsQueue(t *testing.T) {
createQueue := func() *jobsQueue {
queue := &jobsQueue{}
queue.New()
return queue
}
t.Run("Enqueue", func(t *testing.T) { t.Run("Enqueue", func(t *testing.T) {
assert := testify.New(t) s := newJobsQueue()
require.Equal(t, 1, s.Enqueue(&job{Username: "username1"}))
s := createQueue() require.Equal(t, 2, s.Enqueue(&job{Username: "username2"}))
s.Enqueue(&jobItem{username: "username1"}) require.Equal(t, 3, s.Enqueue(&job{Username: "username3"}))
s.Enqueue(&jobItem{username: "username2"})
s.Enqueue(&jobItem{username: "username3"})
assert.Equal(3, s.Size())
}) })
t.Run("Dequeue", func(t *testing.T) { t.Run("Dequeue", func(t *testing.T) {
assert := testify.New(t) s := newJobsQueue()
s.Enqueue(&job{Username: "username1"})
s.Enqueue(&job{Username: "username2"})
s.Enqueue(&job{Username: "username3"})
s.Enqueue(&job{Username: "username4"})
s.Enqueue(&job{Username: "username5"})
s := createQueue() items, queueLen := s.Dequeue(2)
s.Enqueue(&jobItem{username: "username1"}) require.Len(t, items, 2)
s.Enqueue(&jobItem{username: "username2"}) require.Equal(t, 3, queueLen)
s.Enqueue(&jobItem{username: "username3"}) require.Equal(t, "username1", items[0].Username)
s.Enqueue(&jobItem{username: "username4"}) require.Equal(t, "username2", items[1].Username)
items := s.Dequeue(2) items, queueLen = s.Dequeue(40)
assert.Len(items, 2) require.Len(t, items, 3)
assert.Equal("username1", items[0].username) require.Equal(t, 0, queueLen)
assert.Equal("username2", items[1].username) require.Equal(t, "username3", items[0].Username)
assert.Equal(2, s.Size()) require.Equal(t, "username4", items[1].Username)
require.Equal(t, "username5", items[2].Username)
items = s.Dequeue(40)
assert.Len(items, 2)
assert.Equal("username3", items[0].username)
assert.Equal("username4", items[1].username)
}) })
} }
// This is really stupid test just to get 100% coverage on this package :)
func TestBatchUuidsProvider_forever(t *testing.T) {
testify.True(t, forever())
}
type mojangUsernamesToUuidsRequestMock struct { type mojangUsernamesToUuidsRequestMock struct {
mock.Mock mock.Mock
} }
@ -73,6 +59,24 @@ func (o *mojangUsernamesToUuidsRequestMock) UsernamesToUuids(usernames []string)
return result, args.Error(1) return result, args.Error(1)
} }
type queueStrategyMock struct {
mock.Mock
ch chan *JobsIteration
}
func (m *queueStrategyMock) Queue(job *job) {
m.Called(job)
}
func (m *queueStrategyMock) GetJobs(abort context.Context) <-chan *JobsIteration {
m.Called(abort)
return m.ch
}
func (m *queueStrategyMock) PushIteration(iteration *JobsIteration) {
m.ch <- iteration
}
type batchUuidsProviderGetUuidResult struct { type batchUuidsProviderGetUuidResult struct {
Result *mojang.ProfileInfo Result *mojang.ProfileInfo
Error error Error error
@ -86,25 +90,21 @@ type batchUuidsProviderTestSuite struct {
Emitter *mockEmitter Emitter *mockEmitter
MojangApi *mojangUsernamesToUuidsRequestMock MojangApi *mojangUsernamesToUuidsRequestMock
Iterate func()
done func()
iterateChan chan bool
} }
func (suite *batchUuidsProviderTestSuite) SetupTest() { func (suite *batchUuidsProviderTestSuite) SetupTest() {
suite.Emitter = &mockEmitter{} suite.Emitter = &mockEmitter{}
suite.Provider = &BatchUuidsProvider{ suite.Provider = &BatchUuidsProvider{
Emitter: suite.Emitter, // Emitter: suite.Emitter,
IterationDelay: 0, // IterationDelay: 0,
IterationSize: 10, // IterationSize: 10,
} }
suite.iterateChan = make(chan bool) suite.iterateChan = make(chan bool)
forever = func() bool { // forever = func() bool {
return <-suite.iterateChan // return <-suite.iterateChan
} // }
suite.Iterate = func() { suite.Iterate = func() {
suite.iterateChan <- true suite.iterateChan <- true