mirror of
https://github.com/elyby/chrly.git
synced 2024-11-17 02:32:59 +05:30
Merge pull request #25 from elyby/24_batch_uuids_provider_strategies
FullBus stategy
This commit is contained in:
commit
d8f6786c69
@ -5,6 +5,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
|||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
## [Unreleased] - xxxx-xx-xx
|
## [Unreleased] - xxxx-xx-xx
|
||||||
|
### Added
|
||||||
|
- [#24](https://github.com/elyby/chrly/issues/24): Implemented a new strategy for the queue in the batch provider of
|
||||||
|
Mojang UUIDs: `full-bus`.
|
||||||
|
- A new configuration param `QUEUE_STRATEGY` with the default value `periodic`.
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
- `ely.skinsystem.{hostname}.app.mojang_textures.usernames.round_time` timer will not be recorded if the iteration was
|
||||||
|
empty.
|
||||||
|
|
||||||
## [4.4.1] - 2020-04-24
|
## [4.4.1] - 2020-04-24
|
||||||
### Added
|
### Added
|
||||||
|
@ -97,6 +97,14 @@ docker-compose up -d app
|
|||||||
<td>Sentry can be used to collect app errors</td>
|
<td>Sentry can be used to collect app errors</td>
|
||||||
<td><code>https://public:private@your.sentry.io/1</code></td>
|
<td><code>https://public:private@your.sentry.io/1</code></td>
|
||||||
</tr>
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>QUEUE_STRATEGY</td>
|
||||||
|
<td>
|
||||||
|
Sets the strategy for the queue in the batch provider of Mojang UUIDs. Allowed values are <code>periodic</code>
|
||||||
|
and <code>full-bus</code> (see <a href="https://github.com/elyby/chrly/issues/24">#24</a>).
|
||||||
|
</td>
|
||||||
|
<td><code>periodic</code></td>
|
||||||
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>QUEUE_LOOP_DELAY</td>
|
<td>QUEUE_LOOP_DELAY</td>
|
||||||
<td>
|
<td>
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package di
|
package di
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
@ -18,6 +19,9 @@ var mojangTextures = di.Options(
|
|||||||
di.Provide(newMojangTexturesProvider),
|
di.Provide(newMojangTexturesProvider),
|
||||||
di.Provide(newMojangTexturesUuidsProviderFactory),
|
di.Provide(newMojangTexturesUuidsProviderFactory),
|
||||||
di.Provide(newMojangTexturesBatchUUIDsProvider),
|
di.Provide(newMojangTexturesBatchUUIDsProvider),
|
||||||
|
di.Provide(newMojangTexturesBatchUUIDsProviderStrategyFactory),
|
||||||
|
di.Provide(newMojangTexturesBatchUUIDsProviderDelayedStrategy),
|
||||||
|
di.Provide(newMojangTexturesBatchUUIDsProviderFullBusStrategy),
|
||||||
di.Provide(newMojangTexturesRemoteUUIDsProvider),
|
di.Provide(newMojangTexturesRemoteUUIDsProvider),
|
||||||
di.Provide(newMojangSignedTexturesProvider),
|
di.Provide(newMojangSignedTexturesProvider),
|
||||||
di.Provide(newMojangTexturesStorageFactory),
|
di.Provide(newMojangTexturesStorageFactory),
|
||||||
@ -75,7 +79,7 @@ func newMojangTexturesUuidsProviderFactory(
|
|||||||
|
|
||||||
func newMojangTexturesBatchUUIDsProvider(
|
func newMojangTexturesBatchUUIDsProvider(
|
||||||
container *di.Container,
|
container *di.Container,
|
||||||
config *viper.Viper,
|
strategy mojangtextures.BatchUuidsProviderStrategy,
|
||||||
emitter mojangtextures.Emitter,
|
emitter mojangtextures.Emitter,
|
||||||
) (*mojangtextures.BatchUuidsProvider, error) {
|
) (*mojangtextures.BatchUuidsProvider, error) {
|
||||||
if err := container.Provide(func(emitter es.Subscriber, config *viper.Viper) *namedHealthChecker {
|
if err := container.Provide(func(emitter es.Subscriber, config *viper.Viper) *namedHealthChecker {
|
||||||
@ -106,14 +110,56 @@ func newMojangTexturesBatchUUIDsProvider(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return mojangtextures.NewBatchUuidsProvider(context.Background(), strategy, emitter), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMojangTexturesBatchUUIDsProviderStrategyFactory(
|
||||||
|
container *di.Container,
|
||||||
|
config *viper.Viper,
|
||||||
|
) (mojangtextures.BatchUuidsProviderStrategy, error) {
|
||||||
|
config.SetDefault("queue.strategy", "periodic")
|
||||||
|
|
||||||
|
strategyName := config.GetString("queue.strategy")
|
||||||
|
switch strategyName {
|
||||||
|
case "periodic":
|
||||||
|
var strategy *mojangtextures.PeriodicStrategy
|
||||||
|
err := container.Resolve(&strategy)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return strategy, nil
|
||||||
|
case "full-bus":
|
||||||
|
var strategy *mojangtextures.FullBusStrategy
|
||||||
|
err := container.Resolve(&strategy)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return strategy, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown queue strategy \"%s\"", strategyName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMojangTexturesBatchUUIDsProviderDelayedStrategy(config *viper.Viper) *mojangtextures.PeriodicStrategy {
|
||||||
config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond)
|
config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond)
|
||||||
config.SetDefault("queue.batch_size", 10)
|
config.SetDefault("queue.batch_size", 10)
|
||||||
|
|
||||||
return &mojangtextures.BatchUuidsProvider{
|
return mojangtextures.NewPeriodicStrategy(
|
||||||
Emitter: emitter,
|
config.GetDuration("queue.loop_delay"),
|
||||||
IterationDelay: config.GetDuration("queue.loop_delay"),
|
config.GetInt("queue.batch_size"),
|
||||||
IterationSize: config.GetInt("queue.batch_size"),
|
)
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
func newMojangTexturesBatchUUIDsProviderFullBusStrategy(config *viper.Viper) *mojangtextures.FullBusStrategy {
|
||||||
|
config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond)
|
||||||
|
config.SetDefault("queue.batch_size", 10)
|
||||||
|
|
||||||
|
return mojangtextures.NewFullBusStrategy(
|
||||||
|
config.GetDuration("queue.loop_delay"),
|
||||||
|
config.GetInt("queue.batch_size"),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMojangTexturesRemoteUUIDsProvider(
|
func newMojangTexturesRemoteUUIDsProvider(
|
||||||
|
@ -96,12 +96,12 @@ func (s *StatsReporter) ConfigureWithDispatcher(d Subscriber) {
|
|||||||
d.Subscribe("mojang_textures:batch_uuids_provider:round", func(usernames []string, queueSize int) {
|
d.Subscribe("mojang_textures:batch_uuids_provider:round", func(usernames []string, queueSize int) {
|
||||||
s.UpdateGauge("mojang_textures.usernames.iteration_size", int64(len(usernames)))
|
s.UpdateGauge("mojang_textures.usernames.iteration_size", int64(len(usernames)))
|
||||||
s.UpdateGauge("mojang_textures.usernames.queue_size", int64(queueSize))
|
s.UpdateGauge("mojang_textures.usernames.queue_size", int64(queueSize))
|
||||||
|
if len(usernames) != 0 {
|
||||||
|
s.startTimeRecording("batch_uuids_provider_round_time_" + strings.Join(usernames, "|"))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
d.Subscribe("mojang_textures:batch_uuids_provider:before_round", func() {
|
d.Subscribe("mojang_textures:batch_uuids_provider:result", func(usernames []string, profiles []*mojang.ProfileInfo, err error) {
|
||||||
s.startTimeRecording("batch_uuids_provider_round_time")
|
s.finalizeTimeRecording("batch_uuids_provider_round_time_"+strings.Join(usernames, "|"), "mojang_textures.usernames.round_time")
|
||||||
})
|
|
||||||
d.Subscribe("mojang_textures:batch_uuids_provider:after_round", func() {
|
|
||||||
s.finalizeTimeRecording("batch_uuids_provider_round_time", "mojang_textures.usernames.round_time")
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,19 +337,24 @@ var statsReporterTestCases = []*StatsReporterTestCase{
|
|||||||
{
|
{
|
||||||
Events: [][]interface{}{
|
Events: [][]interface{}{
|
||||||
{"mojang_textures:batch_uuids_provider:round", []string{"username1", "username2"}, 5},
|
{"mojang_textures:batch_uuids_provider:round", []string{"username1", "username2"}, 5},
|
||||||
|
{"mojang_textures:batch_uuids_provider:result", []string{"username1", "username2"}, []*mojang.ProfileInfo{}, nil},
|
||||||
},
|
},
|
||||||
ExpectedCalls: [][]interface{}{
|
ExpectedCalls: [][]interface{}{
|
||||||
{"UpdateGauge", "mojang_textures.usernames.iteration_size", int64(2)},
|
{"UpdateGauge", "mojang_textures.usernames.iteration_size", int64(2)},
|
||||||
{"UpdateGauge", "mojang_textures.usernames.queue_size", int64(5)},
|
{"UpdateGauge", "mojang_textures.usernames.queue_size", int64(5)},
|
||||||
|
{"RecordTimer", "mojang_textures.usernames.round_time", mock.AnythingOfType("time.Duration")},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Events: [][]interface{}{
|
Events: [][]interface{}{
|
||||||
{"mojang_textures:batch_uuids_provider:before_round"},
|
{"mojang_textures:batch_uuids_provider:round", []string{}, 0},
|
||||||
{"mojang_textures:batch_uuids_provider:after_round"},
|
// This event will be not emitted, but we emit it to ensure, that RecordTimer will not be called
|
||||||
|
{"mojang_textures:batch_uuids_provider:result", []string{}, []*mojang.ProfileInfo{}, nil},
|
||||||
},
|
},
|
||||||
ExpectedCalls: [][]interface{}{
|
ExpectedCalls: [][]interface{}{
|
||||||
{"RecordTimer", "mojang_textures.usernames.round_time", mock.AnythingOfType("time.Duration")},
|
{"UpdateGauge", "mojang_textures.usernames.iteration_size", int64(0)},
|
||||||
|
{"UpdateGauge", "mojang_textures.usernames.queue_size", int64(0)},
|
||||||
|
// Should not call RecordTimer
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package mojangtextures
|
package mojangtextures
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -9,131 +10,234 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type jobResult struct {
|
type jobResult struct {
|
||||||
profile *mojang.ProfileInfo
|
Profile *mojang.ProfileInfo
|
||||||
error error
|
Error error
|
||||||
}
|
}
|
||||||
|
|
||||||
type jobItem struct {
|
type job struct {
|
||||||
username string
|
Username string
|
||||||
respondChan chan *jobResult
|
RespondChan chan *jobResult
|
||||||
}
|
}
|
||||||
|
|
||||||
type jobsQueue struct {
|
type jobsQueue struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
items []*jobItem
|
items []*job
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *jobsQueue) New() *jobsQueue {
|
func newJobsQueue() *jobsQueue {
|
||||||
s.items = []*jobItem{}
|
return &jobsQueue{
|
||||||
return s
|
items: []*job{},
|
||||||
}
|
|
||||||
|
|
||||||
func (s *jobsQueue) Enqueue(t *jobItem) {
|
|
||||||
s.lock.Lock()
|
|
||||||
defer s.lock.Unlock()
|
|
||||||
|
|
||||||
s.items = append(s.items, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *jobsQueue) Dequeue(n int) []*jobItem {
|
|
||||||
s.lock.Lock()
|
|
||||||
defer s.lock.Unlock()
|
|
||||||
|
|
||||||
if n > s.size() {
|
|
||||||
n = s.size()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
items := s.items[0:n]
|
|
||||||
s.items = s.items[n:len(s.items)]
|
|
||||||
|
|
||||||
return items
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *jobsQueue) Size() int {
|
func (s *jobsQueue) Enqueue(job *job) int {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
return s.size()
|
s.items = append(s.items, job)
|
||||||
}
|
|
||||||
|
|
||||||
func (s *jobsQueue) size() int {
|
|
||||||
return len(s.items)
|
return len(s.items)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *jobsQueue) Dequeue(n int) ([]*job, int) {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
l := len(s.items)
|
||||||
|
if n > l {
|
||||||
|
n = l
|
||||||
|
}
|
||||||
|
|
||||||
|
items := s.items[0:n]
|
||||||
|
s.items = s.items[n:l]
|
||||||
|
|
||||||
|
return items, l - n
|
||||||
|
}
|
||||||
|
|
||||||
var usernamesToUuids = mojang.UsernamesToUuids
|
var usernamesToUuids = mojang.UsernamesToUuids
|
||||||
var forever = func() bool {
|
|
||||||
return true
|
type JobsIteration struct {
|
||||||
|
Jobs []*job
|
||||||
|
Queue int
|
||||||
|
c chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *JobsIteration) Done() {
|
||||||
|
if j.c != nil {
|
||||||
|
close(j.c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type BatchUuidsProviderStrategy interface {
|
||||||
|
Queue(job *job)
|
||||||
|
GetJobs(abort context.Context) <-chan *JobsIteration
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeriodicStrategy struct {
|
||||||
|
Delay time.Duration
|
||||||
|
Batch int
|
||||||
|
queue *jobsQueue
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy {
|
||||||
|
return &PeriodicStrategy{
|
||||||
|
Delay: delay,
|
||||||
|
Batch: batch,
|
||||||
|
queue: newJobsQueue(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *PeriodicStrategy) Queue(job *job) {
|
||||||
|
ctx.queue.Enqueue(job)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
|
||||||
|
ch := make(chan *JobsIteration)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-abort.Done():
|
||||||
|
close(ch)
|
||||||
|
return
|
||||||
|
case <-time.After(ctx.Delay):
|
||||||
|
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
|
||||||
|
jobDoneChan := make(chan struct{})
|
||||||
|
ch <- &JobsIteration{jobs, queueLen, jobDoneChan}
|
||||||
|
<-jobDoneChan
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
type FullBusStrategy struct {
|
||||||
|
Delay time.Duration
|
||||||
|
Batch int
|
||||||
|
queue *jobsQueue
|
||||||
|
busIsFull chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy {
|
||||||
|
return &FullBusStrategy{
|
||||||
|
Delay: delay,
|
||||||
|
Batch: batch,
|
||||||
|
queue: newJobsQueue(),
|
||||||
|
busIsFull: make(chan bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *FullBusStrategy) Queue(job *job) {
|
||||||
|
n := ctx.queue.Enqueue(job)
|
||||||
|
if n % ctx.Batch == 0 {
|
||||||
|
ctx.busIsFull <- true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Формально, это описание логики водителя маршрутки xD
|
||||||
|
func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
|
||||||
|
ch := make(chan *JobsIteration)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
t := time.NewTimer(ctx.Delay)
|
||||||
|
select {
|
||||||
|
case <-abort.Done():
|
||||||
|
close(ch)
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
ctx.sendJobs(ch)
|
||||||
|
case <-ctx.busIsFull:
|
||||||
|
t.Stop()
|
||||||
|
ctx.sendJobs(ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) {
|
||||||
|
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
|
||||||
|
ch <- &JobsIteration{jobs, queueLen, nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
type BatchUuidsProvider struct {
|
type BatchUuidsProvider struct {
|
||||||
Emitter
|
context context.Context
|
||||||
|
emitter Emitter
|
||||||
IterationDelay time.Duration
|
strategy BatchUuidsProviderStrategy
|
||||||
IterationSize int
|
|
||||||
|
|
||||||
onFirstCall sync.Once
|
onFirstCall sync.Once
|
||||||
queue jobsQueue
|
}
|
||||||
|
|
||||||
|
func NewBatchUuidsProvider(
|
||||||
|
context context.Context,
|
||||||
|
strategy BatchUuidsProviderStrategy,
|
||||||
|
emitter Emitter,
|
||||||
|
) *BatchUuidsProvider {
|
||||||
|
return &BatchUuidsProvider{
|
||||||
|
context: context,
|
||||||
|
emitter: emitter,
|
||||||
|
strategy: strategy,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) {
|
func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) {
|
||||||
ctx.onFirstCall.Do(func() {
|
ctx.onFirstCall.Do(ctx.startQueue)
|
||||||
ctx.queue.New()
|
|
||||||
ctx.startQueue()
|
|
||||||
})
|
|
||||||
|
|
||||||
resultChan := make(chan *jobResult)
|
resultChan := make(chan *jobResult)
|
||||||
ctx.queue.Enqueue(&jobItem{username, resultChan})
|
ctx.strategy.Queue(&job{username, resultChan})
|
||||||
ctx.Emit("mojang_textures:batch_uuids_provider:queued", username)
|
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:queued", username)
|
||||||
|
|
||||||
result := <-resultChan
|
result := <-resultChan
|
||||||
|
|
||||||
return result.profile, result.error
|
return result.Profile, result.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *BatchUuidsProvider) startQueue() {
|
func (ctx *BatchUuidsProvider) startQueue() {
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(ctx.IterationDelay)
|
jobsChan := ctx.strategy.GetJobs(ctx.context)
|
||||||
for forever() {
|
for {
|
||||||
ctx.Emit("mojang_textures:batch_uuids_provider:before_round")
|
select {
|
||||||
ctx.queueRound()
|
case <-ctx.context.Done():
|
||||||
ctx.Emit("mojang_textures:batch_uuids_provider:after_round")
|
return
|
||||||
time.Sleep(ctx.IterationDelay)
|
case iteration := <-jobsChan:
|
||||||
|
go func() {
|
||||||
|
ctx.performRequest(iteration)
|
||||||
|
iteration.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *BatchUuidsProvider) queueRound() {
|
func (ctx *BatchUuidsProvider) performRequest(iteration *JobsIteration) {
|
||||||
queueSize := ctx.queue.Size()
|
usernames := make([]string, len(iteration.Jobs))
|
||||||
jobs := ctx.queue.Dequeue(ctx.IterationSize)
|
for i, job := range iteration.Jobs {
|
||||||
|
usernames[i] = job.Username
|
||||||
var usernames []string
|
|
||||||
for _, job := range jobs {
|
|
||||||
usernames = append(usernames, job.username)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.Emit("mojang_textures:batch_uuids_provider:round", usernames, queueSize-len(jobs))
|
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:round", usernames, iteration.Queue)
|
||||||
if len(usernames) == 0 {
|
if len(usernames) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
profiles, err := usernamesToUuids(usernames)
|
profiles, err := usernamesToUuids(usernames)
|
||||||
ctx.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err)
|
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err)
|
||||||
for _, job := range jobs {
|
for _, job := range iteration.Jobs {
|
||||||
go func(job *jobItem) {
|
|
||||||
response := &jobResult{}
|
response := &jobResult{}
|
||||||
if err != nil {
|
if err == nil {
|
||||||
response.error = err
|
|
||||||
} else {
|
|
||||||
// The profiles in the response aren't ordered, so we must search each username over full array
|
// The profiles in the response aren't ordered, so we must search each username over full array
|
||||||
for _, profile := range profiles {
|
for _, profile := range profiles {
|
||||||
if strings.EqualFold(job.username, profile.Name) {
|
if strings.EqualFold(job.Username, profile.Name) {
|
||||||
response.profile = profile
|
response.Profile = profile
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
response.Error = err
|
||||||
}
|
}
|
||||||
|
|
||||||
job.respondChan <- response
|
job.RespondChan <- response
|
||||||
}(job)
|
close(job.RespondChan)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,64 +1,50 @@
|
|||||||
package mojangtextures
|
package mojangtextures
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/rand"
|
"context"
|
||||||
"encoding/base64"
|
"strconv"
|
||||||
"strings"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
testify "github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
"github.com/elyby/chrly/api/mojang"
|
"github.com/elyby/chrly/api/mojang"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestJobsQueue(t *testing.T) {
|
func TestJobsQueue(t *testing.T) {
|
||||||
createQueue := func() *jobsQueue {
|
|
||||||
queue := &jobsQueue{}
|
|
||||||
queue.New()
|
|
||||||
|
|
||||||
return queue
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("Enqueue", func(t *testing.T) {
|
t.Run("Enqueue", func(t *testing.T) {
|
||||||
assert := testify.New(t)
|
s := newJobsQueue()
|
||||||
|
require.Equal(t, 1, s.Enqueue(&job{Username: "username1"}))
|
||||||
s := createQueue()
|
require.Equal(t, 2, s.Enqueue(&job{Username: "username2"}))
|
||||||
s.Enqueue(&jobItem{username: "username1"})
|
require.Equal(t, 3, s.Enqueue(&job{Username: "username3"}))
|
||||||
s.Enqueue(&jobItem{username: "username2"})
|
|
||||||
s.Enqueue(&jobItem{username: "username3"})
|
|
||||||
|
|
||||||
assert.Equal(3, s.Size())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Dequeue", func(t *testing.T) {
|
t.Run("Dequeue", func(t *testing.T) {
|
||||||
assert := testify.New(t)
|
s := newJobsQueue()
|
||||||
|
s.Enqueue(&job{Username: "username1"})
|
||||||
|
s.Enqueue(&job{Username: "username2"})
|
||||||
|
s.Enqueue(&job{Username: "username3"})
|
||||||
|
s.Enqueue(&job{Username: "username4"})
|
||||||
|
s.Enqueue(&job{Username: "username5"})
|
||||||
|
|
||||||
s := createQueue()
|
items, queueLen := s.Dequeue(2)
|
||||||
s.Enqueue(&jobItem{username: "username1"})
|
require.Len(t, items, 2)
|
||||||
s.Enqueue(&jobItem{username: "username2"})
|
require.Equal(t, 3, queueLen)
|
||||||
s.Enqueue(&jobItem{username: "username3"})
|
require.Equal(t, "username1", items[0].Username)
|
||||||
s.Enqueue(&jobItem{username: "username4"})
|
require.Equal(t, "username2", items[1].Username)
|
||||||
|
|
||||||
items := s.Dequeue(2)
|
items, queueLen = s.Dequeue(40)
|
||||||
assert.Len(items, 2)
|
require.Len(t, items, 3)
|
||||||
assert.Equal("username1", items[0].username)
|
require.Equal(t, 0, queueLen)
|
||||||
assert.Equal("username2", items[1].username)
|
require.Equal(t, "username3", items[0].Username)
|
||||||
assert.Equal(2, s.Size())
|
require.Equal(t, "username4", items[1].Username)
|
||||||
|
require.Equal(t, "username5", items[2].Username)
|
||||||
items = s.Dequeue(40)
|
|
||||||
assert.Len(items, 2)
|
|
||||||
assert.Equal("username3", items[0].username)
|
|
||||||
assert.Equal("username4", items[1].username)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is really stupid test just to get 100% coverage on this package :)
|
|
||||||
func TestBatchUuidsProvider_forever(t *testing.T) {
|
|
||||||
testify.True(t, forever())
|
|
||||||
}
|
|
||||||
|
|
||||||
type mojangUsernamesToUuidsRequestMock struct {
|
type mojangUsernamesToUuidsRequestMock struct {
|
||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
@ -73,6 +59,37 @@ func (o *mojangUsernamesToUuidsRequestMock) UsernamesToUuids(usernames []string)
|
|||||||
return result, args.Error(1)
|
return result, args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type manualStrategy struct {
|
||||||
|
ch chan *JobsIteration
|
||||||
|
once sync.Once
|
||||||
|
lock sync.Mutex
|
||||||
|
jobs []*job
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manualStrategy) Queue(job *job) {
|
||||||
|
m.lock.Lock()
|
||||||
|
m.jobs = append(m.jobs, job)
|
||||||
|
m.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manualStrategy) GetJobs(_ context.Context) <-chan *JobsIteration {
|
||||||
|
m.lock.Lock()
|
||||||
|
defer m.lock.Unlock()
|
||||||
|
m.ch = make(chan *JobsIteration)
|
||||||
|
|
||||||
|
return m.ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manualStrategy) Iterate(countJobsToReturn int, countLeftJobsInQueue int) {
|
||||||
|
m.lock.Lock()
|
||||||
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
|
m.ch <- &JobsIteration{
|
||||||
|
Jobs: m.jobs[0:countJobsToReturn],
|
||||||
|
Queue: countLeftJobsInQueue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type batchUuidsProviderGetUuidResult struct {
|
type batchUuidsProviderGetUuidResult struct {
|
||||||
Result *mojang.ProfileInfo
|
Result *mojang.ProfileInfo
|
||||||
Error error
|
Error error
|
||||||
@ -82,48 +99,35 @@ type batchUuidsProviderTestSuite struct {
|
|||||||
suite.Suite
|
suite.Suite
|
||||||
|
|
||||||
Provider *BatchUuidsProvider
|
Provider *BatchUuidsProvider
|
||||||
GetUuidAsync func(username string) chan *batchUuidsProviderGetUuidResult
|
|
||||||
|
|
||||||
Emitter *mockEmitter
|
Emitter *mockEmitter
|
||||||
|
Strategy *manualStrategy
|
||||||
MojangApi *mojangUsernamesToUuidsRequestMock
|
MojangApi *mojangUsernamesToUuidsRequestMock
|
||||||
|
|
||||||
Iterate func()
|
GetUuidAsync func(username string) <- chan *batchUuidsProviderGetUuidResult
|
||||||
done func()
|
stop context.CancelFunc
|
||||||
iterateChan chan bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *batchUuidsProviderTestSuite) SetupTest() {
|
func (suite *batchUuidsProviderTestSuite) SetupTest() {
|
||||||
suite.Emitter = &mockEmitter{}
|
suite.Emitter = &mockEmitter{}
|
||||||
|
suite.Strategy = &manualStrategy{}
|
||||||
|
ctx, stop := context.WithCancel(context.Background())
|
||||||
|
suite.stop = stop
|
||||||
|
suite.MojangApi = &mojangUsernamesToUuidsRequestMock{}
|
||||||
|
usernamesToUuids = suite.MojangApi.UsernamesToUuids
|
||||||
|
|
||||||
suite.Provider = &BatchUuidsProvider{
|
suite.Provider = NewBatchUuidsProvider(ctx, suite.Strategy, suite.Emitter)
|
||||||
Emitter: suite.Emitter,
|
|
||||||
IterationDelay: 0,
|
|
||||||
IterationSize: 10,
|
|
||||||
}
|
|
||||||
|
|
||||||
suite.iterateChan = make(chan bool)
|
suite.GetUuidAsync = func(username string) <- chan *batchUuidsProviderGetUuidResult {
|
||||||
forever = func() bool {
|
s := make(chan struct{})
|
||||||
return <-suite.iterateChan
|
|
||||||
}
|
|
||||||
|
|
||||||
suite.Iterate = func() {
|
|
||||||
suite.iterateChan <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
suite.done = func() {
|
|
||||||
suite.iterateChan <- false
|
|
||||||
}
|
|
||||||
|
|
||||||
suite.GetUuidAsync = func(username string) chan *batchUuidsProviderGetUuidResult {
|
|
||||||
s := make(chan bool)
|
|
||||||
// This dirty hack ensures, that the username will be queued before we return control to the caller.
|
// This dirty hack ensures, that the username will be queued before we return control to the caller.
|
||||||
// It's needed to keep expected calls order and prevent cases when iteration happens before all usernames
|
// It's needed to keep expected calls order and prevent cases when iteration happens before
|
||||||
// will be queued.
|
// all usernames will be queued.
|
||||||
suite.Emitter.On("Emit",
|
suite.Emitter.On("Emit",
|
||||||
"mojang_textures:batch_uuids_provider:queued",
|
"mojang_textures:batch_uuids_provider:queued",
|
||||||
username,
|
username,
|
||||||
).Once().Run(func(args mock.Arguments) {
|
).Once().Run(func(args mock.Arguments) {
|
||||||
s <- true
|
close(s)
|
||||||
})
|
})
|
||||||
|
|
||||||
c := make(chan *batchUuidsProviderGetUuidResult)
|
c := make(chan *batchUuidsProviderGetUuidResult)
|
||||||
@ -139,13 +143,10 @@ func (suite *batchUuidsProviderTestSuite) SetupTest() {
|
|||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
suite.MojangApi = &mojangUsernamesToUuidsRequestMock{}
|
|
||||||
usernamesToUuids = suite.MojangApi.UsernamesToUuids
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *batchUuidsProviderTestSuite) TearDownTest() {
|
func (suite *batchUuidsProviderTestSuite) TearDownTest() {
|
||||||
suite.done()
|
suite.stop()
|
||||||
suite.Emitter.AssertExpectations(suite.T())
|
suite.Emitter.AssertExpectations(suite.T())
|
||||||
suite.MojangApi.AssertExpectations(suite.T())
|
suite.MojangApi.AssertExpectations(suite.T())
|
||||||
}
|
}
|
||||||
@ -154,37 +155,14 @@ func TestBatchUuidsProvider(t *testing.T) {
|
|||||||
suite.Run(t, new(batchUuidsProviderTestSuite))
|
suite.Run(t, new(batchUuidsProviderTestSuite))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *batchUuidsProviderTestSuite) TestGetUuidForOneUsername() {
|
func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernames() {
|
||||||
expectedUsernames := []string{"username"}
|
|
||||||
expectedResult := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username"}
|
|
||||||
expectedResponse := []*mojang.ProfileInfo{expectedResult}
|
|
||||||
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once()
|
|
||||||
|
|
||||||
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{expectedResult}, nil)
|
|
||||||
|
|
||||||
resultChan := suite.GetUuidAsync("username")
|
|
||||||
|
|
||||||
suite.Iterate()
|
|
||||||
|
|
||||||
result := <-resultChan
|
|
||||||
suite.Assert().Equal(expectedResult, result.Result)
|
|
||||||
suite.Assert().Nil(result.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() {
|
|
||||||
expectedUsernames := []string{"username1", "username2"}
|
expectedUsernames := []string{"username1", "username2"}
|
||||||
expectedResult1 := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username1"}
|
expectedResult1 := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username1"}
|
||||||
expectedResult2 := &mojang.ProfileInfo{Id: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", Name: "username2"}
|
expectedResult2 := &mojang.ProfileInfo{Id: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", Name: "username2"}
|
||||||
expectedResponse := []*mojang.ProfileInfo{expectedResult1, expectedResult2}
|
expectedResponse := []*mojang.ProfileInfo{expectedResult1, expectedResult2}
|
||||||
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
|
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once()
|
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once()
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once()
|
|
||||||
|
|
||||||
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{
|
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{
|
||||||
expectedResult1,
|
expectedResult1,
|
||||||
@ -194,7 +172,7 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() {
|
|||||||
resultChan1 := suite.GetUuidAsync("username1")
|
resultChan1 := suite.GetUuidAsync("username1")
|
||||||
resultChan2 := suite.GetUuidAsync("username2")
|
resultChan2 := suite.GetUuidAsync("username2")
|
||||||
|
|
||||||
suite.Iterate()
|
suite.Strategy.Iterate(2, 0)
|
||||||
|
|
||||||
result1 := <-resultChan1
|
result1 := <-resultChan1
|
||||||
suite.Assert().Equal(expectedResult1, result1.Result)
|
suite.Assert().Equal(expectedResult1, result1.Result)
|
||||||
@ -205,78 +183,40 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() {
|
|||||||
suite.Assert().Nil(result2.Error)
|
suite.Assert().Nil(result2.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *batchUuidsProviderTestSuite) TestGetUuidForMoreThan10Usernames() {
|
func (suite *batchUuidsProviderTestSuite) TestShouldNotSendRequestWhenNoJobsAreReturned() {
|
||||||
usernames := make([]string, 12)
|
//noinspection GoPreferNilSlice
|
||||||
for i := 0; i < cap(usernames); i++ {
|
emptyUsernames := []string{}
|
||||||
usernames[i] = randStr(8)
|
done := make(chan struct{})
|
||||||
}
|
suite.Emitter.On("Emit",
|
||||||
|
"mojang_textures:batch_uuids_provider:round",
|
||||||
|
emptyUsernames,
|
||||||
|
1,
|
||||||
|
).Once().Run(func(args mock.Arguments) {
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
// In this test we're not testing response, so always return an empty resultset
|
_ = suite.GetUuidAsync("username") // Schedule one username to run the queue
|
||||||
expectedResponse := []*mojang.ProfileInfo{}
|
|
||||||
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Twice()
|
suite.Strategy.Iterate(0, 1) // Return no jobs and indicate that there is one job in queue
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[0:10], 2).Once()
|
<- done
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[0:10], expectedResponse, nil).Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[10:12], 0).Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[10:12], expectedResponse, nil).Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Twice()
|
|
||||||
|
|
||||||
suite.MojangApi.On("UsernamesToUuids", usernames[0:10]).Once().Return(expectedResponse, nil)
|
|
||||||
suite.MojangApi.On("UsernamesToUuids", usernames[10:12]).Once().Return(expectedResponse, nil)
|
|
||||||
|
|
||||||
channels := make([]chan *batchUuidsProviderGetUuidResult, len(usernames))
|
|
||||||
for i, username := range usernames {
|
|
||||||
channels[i] = suite.GetUuidAsync(username)
|
|
||||||
}
|
|
||||||
|
|
||||||
suite.Iterate()
|
|
||||||
suite.Iterate()
|
|
||||||
|
|
||||||
for _, channel := range channels {
|
|
||||||
<-channel
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *batchUuidsProviderTestSuite) TestDoNothingWhenNoTasks() {
|
// Test written for multiple usernames to ensure that the error
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Times(3)
|
// will be returned for each iteration group
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", []string{"username"}, 0).Once()
|
func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernamesWithAnError() {
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", []string{"username"}, mock.Anything, nil).Once()
|
|
||||||
var nilStringSlice []string
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", nilStringSlice, 0).Twice()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Times(3)
|
|
||||||
|
|
||||||
suite.MojangApi.On("UsernamesToUuids", []string{"username"}).Once().Return([]*mojang.ProfileInfo{}, nil)
|
|
||||||
|
|
||||||
// Perform first iteration and await it finishes
|
|
||||||
resultChan := suite.GetUuidAsync("username")
|
|
||||||
|
|
||||||
suite.Iterate()
|
|
||||||
|
|
||||||
result := <-resultChan
|
|
||||||
suite.Assert().Nil(result.Result)
|
|
||||||
suite.Assert().Nil(result.Error)
|
|
||||||
|
|
||||||
// Let it to perform a few more iterations to ensure, that there are no calls to external APIs
|
|
||||||
suite.Iterate()
|
|
||||||
suite.Iterate()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError() {
|
|
||||||
expectedUsernames := []string{"username1", "username2"}
|
expectedUsernames := []string{"username1", "username2"}
|
||||||
expectedError := &mojang.TooManyRequestsError{}
|
expectedError := &mojang.TooManyRequestsError{}
|
||||||
var nilProfilesResponse []*mojang.ProfileInfo
|
var nilProfilesResponse []*mojang.ProfileInfo
|
||||||
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once()
|
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
|
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, nilProfilesResponse, expectedError).Once()
|
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, nilProfilesResponse, expectedError).Once()
|
||||||
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once()
|
|
||||||
|
|
||||||
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return(nil, expectedError)
|
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return(nil, expectedError)
|
||||||
|
|
||||||
resultChan1 := suite.GetUuidAsync("username1")
|
resultChan1 := suite.GetUuidAsync("username1")
|
||||||
resultChan2 := suite.GetUuidAsync("username2")
|
resultChan2 := suite.GetUuidAsync("username2")
|
||||||
|
|
||||||
suite.Iterate()
|
suite.Strategy.Iterate(2, 0)
|
||||||
|
|
||||||
result1 := <-resultChan1
|
result1 := <-resultChan1
|
||||||
suite.Assert().Nil(result1.Result)
|
suite.Assert().Nil(result1.Result)
|
||||||
@ -287,14 +227,213 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError(
|
|||||||
suite.Assert().Equal(expectedError, result2.Error)
|
suite.Assert().Equal(expectedError, result2.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var replacer = strings.NewReplacer("-", "_", "=", "")
|
func TestPeriodicStrategy(t *testing.T) {
|
||||||
|
t.Run("should return first job only after duration", func(t *testing.T) {
|
||||||
|
d := 20 * time.Millisecond
|
||||||
|
strategy := NewPeriodicStrategy(d, 10)
|
||||||
|
j := &job{}
|
||||||
|
strategy.Queue(j)
|
||||||
|
|
||||||
// https://stackoverflow.com/a/50581165
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
func randStr(len int) string {
|
ch := strategy.GetJobs(ctx)
|
||||||
buff := make([]byte, len)
|
startedAt := time.Now()
|
||||||
_, _ = rand.Read(buff)
|
iteration := <-ch
|
||||||
str := replacer.Replace(base64.URLEncoding.EncodeToString(buff))
|
durationBeforeResult := time.Now().Sub(startedAt)
|
||||||
|
require.True(t, durationBeforeResult >= d)
|
||||||
|
require.True(t, durationBeforeResult < d * 2)
|
||||||
|
|
||||||
// Base 64 can be longer than len
|
require.Equal(t, []*job{j}, iteration.Jobs)
|
||||||
return str[:len]
|
require.Equal(t, 0, iteration.Queue)
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should return the configured batch size", func(t *testing.T) {
|
||||||
|
strategy := NewPeriodicStrategy(0, 10)
|
||||||
|
jobs := make([]*job, 15)
|
||||||
|
for i := 0; i < 15; i++ {
|
||||||
|
jobs[i] = &job{Username: strconv.Itoa(i)}
|
||||||
|
strategy.Queue(jobs[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ch := strategy.GetJobs(ctx)
|
||||||
|
iteration := <-ch
|
||||||
|
require.Len(t, iteration.Jobs, 10)
|
||||||
|
require.Equal(t, jobs[0:10], iteration.Jobs)
|
||||||
|
require.Equal(t, 5, iteration.Queue)
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should not return the next iteration until the previous one is finished", func(t *testing.T) {
|
||||||
|
strategy := NewPeriodicStrategy(0, 10)
|
||||||
|
strategy.Queue(&job{})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ch := strategy.GetJobs(ctx)
|
||||||
|
iteration := <-ch
|
||||||
|
require.Len(t, iteration.Jobs, 1)
|
||||||
|
require.Equal(t, 0, iteration.Queue)
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond) // Let strategy's internal loop to work (if the implementation is broken)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
require.Fail(t, "the previous iteration isn't marked as done")
|
||||||
|
default:
|
||||||
|
// ok
|
||||||
|
}
|
||||||
|
|
||||||
|
iteration.Done()
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond) // Let strategy's internal loop to work
|
||||||
|
|
||||||
|
select {
|
||||||
|
case iteration = <-ch:
|
||||||
|
// ok
|
||||||
|
default:
|
||||||
|
require.Fail(t, "iteration should be provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Empty(t, iteration.Jobs)
|
||||||
|
require.Equal(t, 0, iteration.Queue)
|
||||||
|
iteration.Done()
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("each iteration should be returned only after the configured duration", func(t *testing.T) {
|
||||||
|
d := 5 * time.Millisecond
|
||||||
|
strategy := NewPeriodicStrategy(d, 10)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ch := strategy.GetJobs(ctx)
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
startedAt := time.Now()
|
||||||
|
iteration := <-ch
|
||||||
|
durationBeforeResult := time.Now().Sub(startedAt)
|
||||||
|
require.True(t, durationBeforeResult >= d)
|
||||||
|
require.True(t, durationBeforeResult < d * 2)
|
||||||
|
|
||||||
|
require.Empty(t, iteration.Jobs)
|
||||||
|
require.Equal(t, 0, iteration.Queue)
|
||||||
|
|
||||||
|
// Sleep for at least doubled duration before calling Done() to check,
|
||||||
|
// that this duration isn't included into the next iteration time
|
||||||
|
time.Sleep(d * 2)
|
||||||
|
iteration.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func TestFullBusStrategy(t *testing.T) {
|
||||||
|
t.Run("should provide iteration immediately when the batch size exceeded", func(t *testing.T) {
|
||||||
|
jobs := make([]*job, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
jobs[i] = &job{}
|
||||||
|
}
|
||||||
|
|
||||||
|
d := 20 * time.Millisecond
|
||||||
|
strategy := NewFullBusStrategy(d, 10)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ch := strategy.GetJobs(ctx)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
select {
|
||||||
|
case iteration := <-ch:
|
||||||
|
require.Len(t, iteration.Jobs, 10)
|
||||||
|
require.Equal(t, 0, iteration.Queue)
|
||||||
|
case <-time.After(d):
|
||||||
|
require.Fail(t, "iteration should be provided immediately")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, j := range jobs {
|
||||||
|
strategy.Queue(j)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-done
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should provide iteration after duration if batch size isn't exceeded", func(t *testing.T) {
|
||||||
|
jobs := make([]*job, 9)
|
||||||
|
for i := 0; i < 9; i++ {
|
||||||
|
jobs[i] = &job{}
|
||||||
|
}
|
||||||
|
|
||||||
|
d := 20 * time.Millisecond
|
||||||
|
strategy := NewFullBusStrategy(d, 10)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ch := strategy.GetJobs(ctx)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
startedAt := time.Now()
|
||||||
|
iteration := <-ch
|
||||||
|
duration := time.Now().Sub(startedAt)
|
||||||
|
require.True(t, duration >= d)
|
||||||
|
require.True(t, duration < d * 2)
|
||||||
|
require.Equal(t, jobs, iteration.Jobs)
|
||||||
|
require.Equal(t, 0, iteration.Queue)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, j := range jobs {
|
||||||
|
strategy.Queue(j)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-done
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("should provide iteration as soon as the bus is full, without waiting for the previous iteration to finish", func(t *testing.T) {
|
||||||
|
d := 20 * time.Millisecond
|
||||||
|
strategy := NewFullBusStrategy(d, 10)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ch := strategy.GetJobs(ctx)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
time.Sleep(5 * time.Millisecond) // See comment below
|
||||||
|
select {
|
||||||
|
case iteration := <- ch:
|
||||||
|
require.Len(t, iteration.Jobs, 10)
|
||||||
|
// Don't assert iteration.Queue length since it might be unstable
|
||||||
|
// Don't call iteration.Done()
|
||||||
|
case <-time.After(d):
|
||||||
|
t.Fatalf("iteration should be provided as soon as the bus is full")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scheduled 31 tasks. 3 iterations should be performed immediately
|
||||||
|
// and should be executed only after timeout. The timeout above is used
|
||||||
|
// to increase overall time to ensure, that timer resets on every iteration
|
||||||
|
|
||||||
|
startedAt := time.Now()
|
||||||
|
iteration := <-ch
|
||||||
|
duration := time.Now().Sub(startedAt)
|
||||||
|
require.True(t, duration >= d)
|
||||||
|
require.True(t, duration < d * 2)
|
||||||
|
require.Len(t, iteration.Jobs, 1)
|
||||||
|
require.Equal(t, 0, iteration.Queue)
|
||||||
|
}()
|
||||||
|
|
||||||
|
for i := 0; i < 31; i++ {
|
||||||
|
strategy.Queue(&job{})
|
||||||
|
}
|
||||||
|
|
||||||
|
<-done
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user