diff --git a/CHANGELOG.md b/CHANGELOG.md index 28984b1..4c6ced5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - xxxx-xx-xx +### Added +- [#24](https://github.com/elyby/chrly/issues/24): Implemented a new strategy for the queue in the batch provider of + Mojang UUIDs: `full-bus`. +- A new configuration param `QUEUE_STRATEGY` with the default value `periodic`. + +### Changed +- `ely.skinsystem.{hostname}.app.mojang_textures.usernames.round_time` timer will not be recorded if the iteration was + empty. ## [4.4.1] - 2020-04-24 ### Added diff --git a/README.md b/README.md index 3fc740a..95e7dcc 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,14 @@ docker-compose up -d app Sentry can be used to collect app errors https://public:private@your.sentry.io/1 + + QUEUE_STRATEGY + + Sets the strategy for the queue in the batch provider of Mojang UUIDs. Allowed values are periodic + and full-bus (see #24). + + periodic + QUEUE_LOOP_DELAY diff --git a/di/mojang_textures.go b/di/mojang_textures.go index dee1bec..42f385c 100644 --- a/di/mojang_textures.go +++ b/di/mojang_textures.go @@ -1,6 +1,7 @@ package di import ( + "context" "fmt" "net/url" "time" @@ -18,6 +19,9 @@ var mojangTextures = di.Options( di.Provide(newMojangTexturesProvider), di.Provide(newMojangTexturesUuidsProviderFactory), di.Provide(newMojangTexturesBatchUUIDsProvider), + di.Provide(newMojangTexturesBatchUUIDsProviderStrategyFactory), + di.Provide(newMojangTexturesBatchUUIDsProviderDelayedStrategy), + di.Provide(newMojangTexturesBatchUUIDsProviderFullBusStrategy), di.Provide(newMojangTexturesRemoteUUIDsProvider), di.Provide(newMojangSignedTexturesProvider), di.Provide(newMojangTexturesStorageFactory), @@ -75,7 +79,7 @@ func newMojangTexturesUuidsProviderFactory( func newMojangTexturesBatchUUIDsProvider( container *di.Container, - config *viper.Viper, + strategy mojangtextures.BatchUuidsProviderStrategy, emitter mojangtextures.Emitter, ) (*mojangtextures.BatchUuidsProvider, error) { if err := container.Provide(func(emitter es.Subscriber, config *viper.Viper) *namedHealthChecker { @@ -106,14 +110,56 @@ func newMojangTexturesBatchUUIDsProvider( return nil, err } + return mojangtextures.NewBatchUuidsProvider(context.Background(), strategy, emitter), nil +} + +func newMojangTexturesBatchUUIDsProviderStrategyFactory( + container *di.Container, + config *viper.Viper, +) (mojangtextures.BatchUuidsProviderStrategy, error) { + config.SetDefault("queue.strategy", "periodic") + + strategyName := config.GetString("queue.strategy") + switch strategyName { + case "periodic": + var strategy *mojangtextures.PeriodicStrategy + err := container.Resolve(&strategy) + if err != nil { + return nil, err + } + + return strategy, nil + case "full-bus": + var strategy *mojangtextures.FullBusStrategy + err := container.Resolve(&strategy) + if err != nil { + return nil, err + } + + return strategy, nil + default: + return nil, fmt.Errorf("unknown queue strategy \"%s\"", strategyName) + } +} + +func newMojangTexturesBatchUUIDsProviderDelayedStrategy(config *viper.Viper) *mojangtextures.PeriodicStrategy { config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond) config.SetDefault("queue.batch_size", 10) - return &mojangtextures.BatchUuidsProvider{ - Emitter: emitter, - IterationDelay: config.GetDuration("queue.loop_delay"), - IterationSize: config.GetInt("queue.batch_size"), - }, nil + return mojangtextures.NewPeriodicStrategy( + config.GetDuration("queue.loop_delay"), + config.GetInt("queue.batch_size"), + ) +} + +func newMojangTexturesBatchUUIDsProviderFullBusStrategy(config *viper.Viper) *mojangtextures.FullBusStrategy { + config.SetDefault("queue.loop_delay", 2*time.Second+500*time.Millisecond) + config.SetDefault("queue.batch_size", 10) + + return mojangtextures.NewFullBusStrategy( + config.GetDuration("queue.loop_delay"), + config.GetInt("queue.batch_size"), + ) } func newMojangTexturesRemoteUUIDsProvider( diff --git a/eventsubscribers/stats_reporter.go b/eventsubscribers/stats_reporter.go index 849c644..3c535d2 100644 --- a/eventsubscribers/stats_reporter.go +++ b/eventsubscribers/stats_reporter.go @@ -96,12 +96,12 @@ func (s *StatsReporter) ConfigureWithDispatcher(d Subscriber) { d.Subscribe("mojang_textures:batch_uuids_provider:round", func(usernames []string, queueSize int) { s.UpdateGauge("mojang_textures.usernames.iteration_size", int64(len(usernames))) s.UpdateGauge("mojang_textures.usernames.queue_size", int64(queueSize)) + if len(usernames) != 0 { + s.startTimeRecording("batch_uuids_provider_round_time_" + strings.Join(usernames, "|")) + } }) - d.Subscribe("mojang_textures:batch_uuids_provider:before_round", func() { - s.startTimeRecording("batch_uuids_provider_round_time") - }) - d.Subscribe("mojang_textures:batch_uuids_provider:after_round", func() { - s.finalizeTimeRecording("batch_uuids_provider_round_time", "mojang_textures.usernames.round_time") + d.Subscribe("mojang_textures:batch_uuids_provider:result", func(usernames []string, profiles []*mojang.ProfileInfo, err error) { + s.finalizeTimeRecording("batch_uuids_provider_round_time_"+strings.Join(usernames, "|"), "mojang_textures.usernames.round_time") }) } diff --git a/eventsubscribers/stats_reporter_test.go b/eventsubscribers/stats_reporter_test.go index 448bafc..33eb4ed 100644 --- a/eventsubscribers/stats_reporter_test.go +++ b/eventsubscribers/stats_reporter_test.go @@ -337,19 +337,24 @@ var statsReporterTestCases = []*StatsReporterTestCase{ { Events: [][]interface{}{ {"mojang_textures:batch_uuids_provider:round", []string{"username1", "username2"}, 5}, + {"mojang_textures:batch_uuids_provider:result", []string{"username1", "username2"}, []*mojang.ProfileInfo{}, nil}, }, ExpectedCalls: [][]interface{}{ {"UpdateGauge", "mojang_textures.usernames.iteration_size", int64(2)}, {"UpdateGauge", "mojang_textures.usernames.queue_size", int64(5)}, + {"RecordTimer", "mojang_textures.usernames.round_time", mock.AnythingOfType("time.Duration")}, }, }, { Events: [][]interface{}{ - {"mojang_textures:batch_uuids_provider:before_round"}, - {"mojang_textures:batch_uuids_provider:after_round"}, + {"mojang_textures:batch_uuids_provider:round", []string{}, 0}, + // This event will be not emitted, but we emit it to ensure, that RecordTimer will not be called + {"mojang_textures:batch_uuids_provider:result", []string{}, []*mojang.ProfileInfo{}, nil}, }, ExpectedCalls: [][]interface{}{ - {"RecordTimer", "mojang_textures.usernames.round_time", mock.AnythingOfType("time.Duration")}, + {"UpdateGauge", "mojang_textures.usernames.iteration_size", int64(0)}, + {"UpdateGauge", "mojang_textures.usernames.queue_size", int64(0)}, + // Should not call RecordTimer }, }, } diff --git a/mojangtextures/batch_uuids_provider.go b/mojangtextures/batch_uuids_provider.go index 2cac5b9..d8de3fb 100644 --- a/mojangtextures/batch_uuids_provider.go +++ b/mojangtextures/batch_uuids_provider.go @@ -1,6 +1,7 @@ package mojangtextures import ( + "context" "strings" "sync" "time" @@ -9,131 +10,234 @@ import ( ) type jobResult struct { - profile *mojang.ProfileInfo - error error + Profile *mojang.ProfileInfo + Error error } -type jobItem struct { - username string - respondChan chan *jobResult +type job struct { + Username string + RespondChan chan *jobResult } type jobsQueue struct { lock sync.Mutex - items []*jobItem + items []*job } -func (s *jobsQueue) New() *jobsQueue { - s.items = []*jobItem{} - return s -} - -func (s *jobsQueue) Enqueue(t *jobItem) { - s.lock.Lock() - defer s.lock.Unlock() - - s.items = append(s.items, t) -} - -func (s *jobsQueue) Dequeue(n int) []*jobItem { - s.lock.Lock() - defer s.lock.Unlock() - - if n > s.size() { - n = s.size() +func newJobsQueue() *jobsQueue { + return &jobsQueue{ + items: []*job{}, } - - items := s.items[0:n] - s.items = s.items[n:len(s.items)] - - return items } -func (s *jobsQueue) Size() int { +func (s *jobsQueue) Enqueue(job *job) int { s.lock.Lock() defer s.lock.Unlock() - return s.size() -} + s.items = append(s.items, job) -func (s *jobsQueue) size() int { return len(s.items) } +func (s *jobsQueue) Dequeue(n int) ([]*job, int) { + s.lock.Lock() + defer s.lock.Unlock() + + l := len(s.items) + if n > l { + n = l + } + + items := s.items[0:n] + s.items = s.items[n:l] + + return items, l - n +} + var usernamesToUuids = mojang.UsernamesToUuids -var forever = func() bool { - return true + +type JobsIteration struct { + Jobs []*job + Queue int + c chan struct{} +} + +func (j *JobsIteration) Done() { + if j.c != nil { + close(j.c) + } +} + +type BatchUuidsProviderStrategy interface { + Queue(job *job) + GetJobs(abort context.Context) <-chan *JobsIteration +} + +type PeriodicStrategy struct { + Delay time.Duration + Batch int + queue *jobsQueue + done chan struct{} +} + +func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy { + return &PeriodicStrategy{ + Delay: delay, + Batch: batch, + queue: newJobsQueue(), + } +} + +func (ctx *PeriodicStrategy) Queue(job *job) { + ctx.queue.Enqueue(job) +} + +func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteration { + ch := make(chan *JobsIteration) + go func() { + for { + select { + case <-abort.Done(): + close(ch) + return + case <-time.After(ctx.Delay): + jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) + jobDoneChan := make(chan struct{}) + ch <- &JobsIteration{jobs, queueLen, jobDoneChan} + <-jobDoneChan + } + } + }() + + return ch +} + +type FullBusStrategy struct { + Delay time.Duration + Batch int + queue *jobsQueue + busIsFull chan bool +} + +func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy { + return &FullBusStrategy{ + Delay: delay, + Batch: batch, + queue: newJobsQueue(), + busIsFull: make(chan bool), + } +} + +func (ctx *FullBusStrategy) Queue(job *job) { + n := ctx.queue.Enqueue(job) + if n % ctx.Batch == 0 { + ctx.busIsFull <- true + } +} + +// Формально, это описание логики водителя маршрутки xD +func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration { + ch := make(chan *JobsIteration) + go func() { + for { + t := time.NewTimer(ctx.Delay) + select { + case <-abort.Done(): + close(ch) + return + case <-t.C: + ctx.sendJobs(ch) + case <-ctx.busIsFull: + t.Stop() + ctx.sendJobs(ch) + } + } + }() + + return ch +} + +func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) { + jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) + ch <- &JobsIteration{jobs, queueLen, nil} } type BatchUuidsProvider struct { - Emitter - - IterationDelay time.Duration - IterationSize int - + context context.Context + emitter Emitter + strategy BatchUuidsProviderStrategy onFirstCall sync.Once - queue jobsQueue +} + +func NewBatchUuidsProvider( + context context.Context, + strategy BatchUuidsProviderStrategy, + emitter Emitter, +) *BatchUuidsProvider { + return &BatchUuidsProvider{ + context: context, + emitter: emitter, + strategy: strategy, + } } func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) { - ctx.onFirstCall.Do(func() { - ctx.queue.New() - ctx.startQueue() - }) + ctx.onFirstCall.Do(ctx.startQueue) resultChan := make(chan *jobResult) - ctx.queue.Enqueue(&jobItem{username, resultChan}) - ctx.Emit("mojang_textures:batch_uuids_provider:queued", username) + ctx.strategy.Queue(&job{username, resultChan}) + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:queued", username) result := <-resultChan - return result.profile, result.error + return result.Profile, result.Error } func (ctx *BatchUuidsProvider) startQueue() { go func() { - time.Sleep(ctx.IterationDelay) - for forever() { - ctx.Emit("mojang_textures:batch_uuids_provider:before_round") - ctx.queueRound() - ctx.Emit("mojang_textures:batch_uuids_provider:after_round") - time.Sleep(ctx.IterationDelay) + jobsChan := ctx.strategy.GetJobs(ctx.context) + for { + select { + case <-ctx.context.Done(): + return + case iteration := <-jobsChan: + go func() { + ctx.performRequest(iteration) + iteration.Done() + }() + } } }() } -func (ctx *BatchUuidsProvider) queueRound() { - queueSize := ctx.queue.Size() - jobs := ctx.queue.Dequeue(ctx.IterationSize) - - var usernames []string - for _, job := range jobs { - usernames = append(usernames, job.username) +func (ctx *BatchUuidsProvider) performRequest(iteration *JobsIteration) { + usernames := make([]string, len(iteration.Jobs)) + for i, job := range iteration.Jobs { + usernames[i] = job.Username } - ctx.Emit("mojang_textures:batch_uuids_provider:round", usernames, queueSize-len(jobs)) + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:round", usernames, iteration.Queue) if len(usernames) == 0 { return } profiles, err := usernamesToUuids(usernames) - ctx.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err) - for _, job := range jobs { - go func(job *jobItem) { - response := &jobResult{} - if err != nil { - response.error = err - } else { - // The profiles in the response aren't ordered, so we must search each username over full array - for _, profile := range profiles { - if strings.EqualFold(job.username, profile.Name) { - response.profile = profile - break - } + ctx.emitter.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err) + for _, job := range iteration.Jobs { + response := &jobResult{} + if err == nil { + // The profiles in the response aren't ordered, so we must search each username over full array + for _, profile := range profiles { + if strings.EqualFold(job.Username, profile.Name) { + response.Profile = profile + break } } + } else { + response.Error = err + } - job.respondChan <- response - }(job) + job.RespondChan <- response + close(job.RespondChan) } } diff --git a/mojangtextures/batch_uuids_provider_test.go b/mojangtextures/batch_uuids_provider_test.go index 55e785a..1165a1c 100644 --- a/mojangtextures/batch_uuids_provider_test.go +++ b/mojangtextures/batch_uuids_provider_test.go @@ -1,64 +1,50 @@ package mojangtextures import ( - "crypto/rand" - "encoding/base64" - "strings" + "context" + "strconv" + "sync" "testing" + "time" - testify "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/elyby/chrly/api/mojang" ) func TestJobsQueue(t *testing.T) { - createQueue := func() *jobsQueue { - queue := &jobsQueue{} - queue.New() - - return queue - } - t.Run("Enqueue", func(t *testing.T) { - assert := testify.New(t) - - s := createQueue() - s.Enqueue(&jobItem{username: "username1"}) - s.Enqueue(&jobItem{username: "username2"}) - s.Enqueue(&jobItem{username: "username3"}) - - assert.Equal(3, s.Size()) + s := newJobsQueue() + require.Equal(t, 1, s.Enqueue(&job{Username: "username1"})) + require.Equal(t, 2, s.Enqueue(&job{Username: "username2"})) + require.Equal(t, 3, s.Enqueue(&job{Username: "username3"})) }) t.Run("Dequeue", func(t *testing.T) { - assert := testify.New(t) + s := newJobsQueue() + s.Enqueue(&job{Username: "username1"}) + s.Enqueue(&job{Username: "username2"}) + s.Enqueue(&job{Username: "username3"}) + s.Enqueue(&job{Username: "username4"}) + s.Enqueue(&job{Username: "username5"}) - s := createQueue() - s.Enqueue(&jobItem{username: "username1"}) - s.Enqueue(&jobItem{username: "username2"}) - s.Enqueue(&jobItem{username: "username3"}) - s.Enqueue(&jobItem{username: "username4"}) + items, queueLen := s.Dequeue(2) + require.Len(t, items, 2) + require.Equal(t, 3, queueLen) + require.Equal(t, "username1", items[0].Username) + require.Equal(t, "username2", items[1].Username) - items := s.Dequeue(2) - assert.Len(items, 2) - assert.Equal("username1", items[0].username) - assert.Equal("username2", items[1].username) - assert.Equal(2, s.Size()) - - items = s.Dequeue(40) - assert.Len(items, 2) - assert.Equal("username3", items[0].username) - assert.Equal("username4", items[1].username) + items, queueLen = s.Dequeue(40) + require.Len(t, items, 3) + require.Equal(t, 0, queueLen) + require.Equal(t, "username3", items[0].Username) + require.Equal(t, "username4", items[1].Username) + require.Equal(t, "username5", items[2].Username) }) } -// This is really stupid test just to get 100% coverage on this package :) -func TestBatchUuidsProvider_forever(t *testing.T) { - testify.True(t, forever()) -} - type mojangUsernamesToUuidsRequestMock struct { mock.Mock } @@ -73,6 +59,37 @@ func (o *mojangUsernamesToUuidsRequestMock) UsernamesToUuids(usernames []string) return result, args.Error(1) } +type manualStrategy struct { + ch chan *JobsIteration + once sync.Once + lock sync.Mutex + jobs []*job +} + +func (m *manualStrategy) Queue(job *job) { + m.lock.Lock() + m.jobs = append(m.jobs, job) + m.lock.Unlock() +} + +func (m *manualStrategy) GetJobs(_ context.Context) <-chan *JobsIteration { + m.lock.Lock() + defer m.lock.Unlock() + m.ch = make(chan *JobsIteration) + + return m.ch +} + +func (m *manualStrategy) Iterate(countJobsToReturn int, countLeftJobsInQueue int) { + m.lock.Lock() + defer m.lock.Unlock() + + m.ch <- &JobsIteration{ + Jobs: m.jobs[0:countJobsToReturn], + Queue: countLeftJobsInQueue, + } +} + type batchUuidsProviderGetUuidResult struct { Result *mojang.ProfileInfo Error error @@ -81,49 +98,36 @@ type batchUuidsProviderGetUuidResult struct { type batchUuidsProviderTestSuite struct { suite.Suite - Provider *BatchUuidsProvider - GetUuidAsync func(username string) chan *batchUuidsProviderGetUuidResult + Provider *BatchUuidsProvider Emitter *mockEmitter + Strategy *manualStrategy MojangApi *mojangUsernamesToUuidsRequestMock - Iterate func() - done func() - iterateChan chan bool + GetUuidAsync func(username string) <- chan *batchUuidsProviderGetUuidResult + stop context.CancelFunc } func (suite *batchUuidsProviderTestSuite) SetupTest() { suite.Emitter = &mockEmitter{} + suite.Strategy = &manualStrategy{} + ctx, stop := context.WithCancel(context.Background()) + suite.stop = stop + suite.MojangApi = &mojangUsernamesToUuidsRequestMock{} + usernamesToUuids = suite.MojangApi.UsernamesToUuids - suite.Provider = &BatchUuidsProvider{ - Emitter: suite.Emitter, - IterationDelay: 0, - IterationSize: 10, - } + suite.Provider = NewBatchUuidsProvider(ctx, suite.Strategy, suite.Emitter) - suite.iterateChan = make(chan bool) - forever = func() bool { - return <-suite.iterateChan - } - - suite.Iterate = func() { - suite.iterateChan <- true - } - - suite.done = func() { - suite.iterateChan <- false - } - - suite.GetUuidAsync = func(username string) chan *batchUuidsProviderGetUuidResult { - s := make(chan bool) + suite.GetUuidAsync = func(username string) <- chan *batchUuidsProviderGetUuidResult { + s := make(chan struct{}) // This dirty hack ensures, that the username will be queued before we return control to the caller. - // It's needed to keep expected calls order and prevent cases when iteration happens before all usernames - // will be queued. + // It's needed to keep expected calls order and prevent cases when iteration happens before + // all usernames will be queued. suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:queued", username, ).Once().Run(func(args mock.Arguments) { - s <- true + close(s) }) c := make(chan *batchUuidsProviderGetUuidResult) @@ -139,13 +143,10 @@ func (suite *batchUuidsProviderTestSuite) SetupTest() { return c } - - suite.MojangApi = &mojangUsernamesToUuidsRequestMock{} - usernamesToUuids = suite.MojangApi.UsernamesToUuids } func (suite *batchUuidsProviderTestSuite) TearDownTest() { - suite.done() + suite.stop() suite.Emitter.AssertExpectations(suite.T()) suite.MojangApi.AssertExpectations(suite.T()) } @@ -154,37 +155,14 @@ func TestBatchUuidsProvider(t *testing.T) { suite.Run(t, new(batchUuidsProviderTestSuite)) } -func (suite *batchUuidsProviderTestSuite) TestGetUuidForOneUsername() { - expectedUsernames := []string{"username"} - expectedResult := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username"} - expectedResponse := []*mojang.ProfileInfo{expectedResult} - - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() - - suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{expectedResult}, nil) - - resultChan := suite.GetUuidAsync("username") - - suite.Iterate() - - result := <-resultChan - suite.Assert().Equal(expectedResult, result.Result) - suite.Assert().Nil(result.Error) -} - -func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { +func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernames() { expectedUsernames := []string{"username1", "username2"} expectedResult1 := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username1"} expectedResult2 := &mojang.ProfileInfo{Id: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", Name: "username2"} expectedResponse := []*mojang.ProfileInfo{expectedResult1, expectedResult2} - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{ expectedResult1, @@ -194,7 +172,7 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { resultChan1 := suite.GetUuidAsync("username1") resultChan2 := suite.GetUuidAsync("username2") - suite.Iterate() + suite.Strategy.Iterate(2, 0) result1 := <-resultChan1 suite.Assert().Equal(expectedResult1, result1.Result) @@ -205,78 +183,40 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { suite.Assert().Nil(result2.Error) } -func (suite *batchUuidsProviderTestSuite) TestGetUuidForMoreThan10Usernames() { - usernames := make([]string, 12) - for i := 0; i < cap(usernames); i++ { - usernames[i] = randStr(8) - } +func (suite *batchUuidsProviderTestSuite) TestShouldNotSendRequestWhenNoJobsAreReturned() { + //noinspection GoPreferNilSlice + emptyUsernames := []string{} + done := make(chan struct{}) + suite.Emitter.On("Emit", + "mojang_textures:batch_uuids_provider:round", + emptyUsernames, + 1, + ).Once().Run(func(args mock.Arguments) { + close(done) + }) - // In this test we're not testing response, so always return an empty resultset - expectedResponse := []*mojang.ProfileInfo{} + _ = suite.GetUuidAsync("username") // Schedule one username to run the queue - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Twice() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[0:10], 2).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[0:10], expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[10:12], 0).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[10:12], expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Twice() - - suite.MojangApi.On("UsernamesToUuids", usernames[0:10]).Once().Return(expectedResponse, nil) - suite.MojangApi.On("UsernamesToUuids", usernames[10:12]).Once().Return(expectedResponse, nil) - - channels := make([]chan *batchUuidsProviderGetUuidResult, len(usernames)) - for i, username := range usernames { - channels[i] = suite.GetUuidAsync(username) - } - - suite.Iterate() - suite.Iterate() - - for _, channel := range channels { - <-channel - } + suite.Strategy.Iterate(0, 1) // Return no jobs and indicate that there is one job in queue + <- done } -func (suite *batchUuidsProviderTestSuite) TestDoNothingWhenNoTasks() { - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Times(3) - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", []string{"username"}, 0).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", []string{"username"}, mock.Anything, nil).Once() - var nilStringSlice []string - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", nilStringSlice, 0).Twice() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Times(3) - - suite.MojangApi.On("UsernamesToUuids", []string{"username"}).Once().Return([]*mojang.ProfileInfo{}, nil) - - // Perform first iteration and await it finishes - resultChan := suite.GetUuidAsync("username") - - suite.Iterate() - - result := <-resultChan - suite.Assert().Nil(result.Result) - suite.Assert().Nil(result.Error) - - // Let it to perform a few more iterations to ensure, that there are no calls to external APIs - suite.Iterate() - suite.Iterate() -} - -func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError() { +// Test written for multiple usernames to ensure that the error +// will be returned for each iteration group +func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernamesWithAnError() { expectedUsernames := []string{"username1", "username2"} expectedError := &mojang.TooManyRequestsError{} var nilProfilesResponse []*mojang.ProfileInfo - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, nilProfilesResponse, expectedError).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return(nil, expectedError) resultChan1 := suite.GetUuidAsync("username1") resultChan2 := suite.GetUuidAsync("username2") - suite.Iterate() + suite.Strategy.Iterate(2, 0) result1 := <-resultChan1 suite.Assert().Nil(result1.Result) @@ -287,14 +227,213 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError( suite.Assert().Equal(expectedError, result2.Error) } -var replacer = strings.NewReplacer("-", "_", "=", "") +func TestPeriodicStrategy(t *testing.T) { + t.Run("should return first job only after duration", func(t *testing.T) { + d := 20 * time.Millisecond + strategy := NewPeriodicStrategy(d, 10) + j := &job{} + strategy.Queue(j) -// https://stackoverflow.com/a/50581165 -func randStr(len int) string { - buff := make([]byte, len) - _, _ = rand.Read(buff) - str := replacer.Replace(base64.URLEncoding.EncodeToString(buff)) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + startedAt := time.Now() + iteration := <-ch + durationBeforeResult := time.Now().Sub(startedAt) + require.True(t, durationBeforeResult >= d) + require.True(t, durationBeforeResult < d * 2) - // Base 64 can be longer than len - return str[:len] + require.Equal(t, []*job{j}, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + + cancel() + }) + + t.Run("should return the configured batch size", func(t *testing.T) { + strategy := NewPeriodicStrategy(0, 10) + jobs := make([]*job, 15) + for i := 0; i < 15; i++ { + jobs[i] = &job{Username: strconv.Itoa(i)} + strategy.Queue(jobs[i]) + } + + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + iteration := <-ch + require.Len(t, iteration.Jobs, 10) + require.Equal(t, jobs[0:10], iteration.Jobs) + require.Equal(t, 5, iteration.Queue) + + cancel() + }) + + t.Run("should not return the next iteration until the previous one is finished", func(t *testing.T) { + strategy := NewPeriodicStrategy(0, 10) + strategy.Queue(&job{}) + + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + iteration := <-ch + require.Len(t, iteration.Jobs, 1) + require.Equal(t, 0, iteration.Queue) + + time.Sleep(time.Millisecond) // Let strategy's internal loop to work (if the implementation is broken) + + select { + case <-ch: + require.Fail(t, "the previous iteration isn't marked as done") + default: + // ok + } + + iteration.Done() + + time.Sleep(time.Millisecond) // Let strategy's internal loop to work + + select { + case iteration = <-ch: + // ok + default: + require.Fail(t, "iteration should be provided") + } + + require.Empty(t, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + iteration.Done() + + cancel() + }) + + t.Run("each iteration should be returned only after the configured duration", func(t *testing.T) { + d := 5 * time.Millisecond + strategy := NewPeriodicStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + for i := 0; i < 3; i++ { + startedAt := time.Now() + iteration := <-ch + durationBeforeResult := time.Now().Sub(startedAt) + require.True(t, durationBeforeResult >= d) + require.True(t, durationBeforeResult < d * 2) + + require.Empty(t, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + + // Sleep for at least doubled duration before calling Done() to check, + // that this duration isn't included into the next iteration time + time.Sleep(d * 2) + iteration.Done() + } + + cancel() + }) +} + + +func TestFullBusStrategy(t *testing.T) { + t.Run("should provide iteration immediately when the batch size exceeded", func(t *testing.T) { + jobs := make([]*job, 10) + for i := 0; i < 10; i++ { + jobs[i] = &job{} + } + + d := 20 * time.Millisecond + strategy := NewFullBusStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + + done := make(chan struct{}) + go func() { + defer close(done) + select { + case iteration := <-ch: + require.Len(t, iteration.Jobs, 10) + require.Equal(t, 0, iteration.Queue) + case <-time.After(d): + require.Fail(t, "iteration should be provided immediately") + } + }() + + for _, j := range jobs { + strategy.Queue(j) + } + + <-done + + cancel() + }) + + t.Run("should provide iteration after duration if batch size isn't exceeded", func(t *testing.T) { + jobs := make([]*job, 9) + for i := 0; i < 9; i++ { + jobs[i] = &job{} + } + + d := 20 * time.Millisecond + strategy := NewFullBusStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + + done := make(chan struct{}) + go func() { + defer close(done) + startedAt := time.Now() + iteration := <-ch + duration := time.Now().Sub(startedAt) + require.True(t, duration >= d) + require.True(t, duration < d * 2) + require.Equal(t, jobs, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + }() + + for _, j := range jobs { + strategy.Queue(j) + } + + <-done + + cancel() + }) + + t.Run("should provide iteration as soon as the bus is full, without waiting for the previous iteration to finish", func(t *testing.T) { + d := 20 * time.Millisecond + strategy := NewFullBusStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < 3; i++ { + time.Sleep(5 * time.Millisecond) // See comment below + select { + case iteration := <- ch: + require.Len(t, iteration.Jobs, 10) + // Don't assert iteration.Queue length since it might be unstable + // Don't call iteration.Done() + case <-time.After(d): + t.Fatalf("iteration should be provided as soon as the bus is full") + } + } + + // Scheduled 31 tasks. 3 iterations should be performed immediately + // and should be executed only after timeout. The timeout above is used + // to increase overall time to ensure, that timer resets on every iteration + + startedAt := time.Now() + iteration := <-ch + duration := time.Now().Sub(startedAt) + require.True(t, duration >= d) + require.True(t, duration < d * 2) + require.Len(t, iteration.Jobs, 1) + require.Equal(t, 0, iteration.Queue) + }() + + for i := 0; i < 31; i++ { + strategy.Queue(&job{}) + } + + <-done + + cancel() + }) }