diff --git a/mojangtextures/batch_uuids_provider.go b/mojangtextures/batch_uuids_provider.go index cb99a20..d8de3fb 100644 --- a/mojangtextures/batch_uuids_provider.go +++ b/mojangtextures/batch_uuids_provider.go @@ -59,6 +59,13 @@ var usernamesToUuids = mojang.UsernamesToUuids type JobsIteration struct { Jobs []*job Queue int + c chan struct{} +} + +func (j *JobsIteration) Done() { + if j.c != nil { + close(j.c) + } } type BatchUuidsProviderStrategy interface { @@ -70,6 +77,7 @@ type PeriodicStrategy struct { Delay time.Duration Batch int queue *jobsQueue + done chan struct{} } func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy { @@ -90,10 +98,13 @@ func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteratio for { select { case <-abort.Done(): + close(ch) return case <-time.After(ctx.Delay): jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) - ch <- &JobsIteration{jobs, queueLen} + jobDoneChan := make(chan struct{}) + ch <- &JobsIteration{jobs, queueLen, jobDoneChan} + <-jobDoneChan } } }() @@ -102,28 +113,29 @@ func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteratio } type FullBusStrategy struct { - Delay time.Duration - Batch int - queue *jobsQueue - ready chan bool + Delay time.Duration + Batch int + queue *jobsQueue + busIsFull chan bool } func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy { return &FullBusStrategy{ - Delay: delay, - Batch: batch, - queue: newJobsQueue(), - ready: make(chan bool), + Delay: delay, + Batch: batch, + queue: newJobsQueue(), + busIsFull: make(chan bool), } } func (ctx *FullBusStrategy) Queue(job *job) { n := ctx.queue.Enqueue(job) - if n == ctx.Batch { - ctx.ready <- true + if n % ctx.Batch == 0 { + ctx.busIsFull <- true } } +// Формально, это описание логики водителя маршрутки xD func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration { ch := make(chan *JobsIteration) go func() { @@ -131,10 +143,11 @@ func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration t := time.NewTimer(ctx.Delay) select { case <-abort.Done(): + close(ch) return case <-t.C: ctx.sendJobs(ch) - case <-ctx.ready: + case <-ctx.busIsFull: t.Stop() ctx.sendJobs(ch) } @@ -146,7 +159,7 @@ func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) { jobs, queueLen := ctx.queue.Dequeue(ctx.Batch) - ch <- &JobsIteration{jobs, queueLen} // TODO: should not wait for iteration result + ch <- &JobsIteration{jobs, queueLen, nil} } type BatchUuidsProvider struct { @@ -156,7 +169,11 @@ type BatchUuidsProvider struct { onFirstCall sync.Once } -func NewBatchUuidsProvider(context context.Context, strategy BatchUuidsProviderStrategy, emitter Emitter) *BatchUuidsProvider { +func NewBatchUuidsProvider( + context context.Context, + strategy BatchUuidsProviderStrategy, + emitter Emitter, +) *BatchUuidsProvider { return &BatchUuidsProvider{ context: context, emitter: emitter, @@ -184,9 +201,10 @@ func (ctx *BatchUuidsProvider) startQueue() { case <-ctx.context.Done(): return case iteration := <-jobsChan: - ctx.emitter.Emit("mojang_textures:batch_uuids_provider:before_round") // TODO: where should I move this events? - ctx.performRequest(iteration) - ctx.emitter.Emit("mojang_textures:batch_uuids_provider:after_round") + go func() { + ctx.performRequest(iteration) + iteration.Done() + }() } } }() diff --git a/mojangtextures/batch_uuids_provider_test.go b/mojangtextures/batch_uuids_provider_test.go index 820aa8b..1165a1c 100644 --- a/mojangtextures/batch_uuids_provider_test.go +++ b/mojangtextures/batch_uuids_provider_test.go @@ -2,10 +2,10 @@ package mojangtextures import ( "context" - "crypto/rand" - "encoding/base64" - "strings" + "strconv" + "sync" "testing" + "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -59,22 +59,35 @@ func (o *mojangUsernamesToUuidsRequestMock) UsernamesToUuids(usernames []string) return result, args.Error(1) } -type queueStrategyMock struct { - mock.Mock - ch chan *JobsIteration +type manualStrategy struct { + ch chan *JobsIteration + once sync.Once + lock sync.Mutex + jobs []*job } -func (m *queueStrategyMock) Queue(job *job) { - m.Called(job) +func (m *manualStrategy) Queue(job *job) { + m.lock.Lock() + m.jobs = append(m.jobs, job) + m.lock.Unlock() } -func (m *queueStrategyMock) GetJobs(abort context.Context) <-chan *JobsIteration { - m.Called(abort) +func (m *manualStrategy) GetJobs(_ context.Context) <-chan *JobsIteration { + m.lock.Lock() + defer m.lock.Unlock() + m.ch = make(chan *JobsIteration) + return m.ch } -func (m *queueStrategyMock) PushIteration(iteration *JobsIteration) { - m.ch <- iteration +func (m *manualStrategy) Iterate(countJobsToReturn int, countLeftJobsInQueue int) { + m.lock.Lock() + defer m.lock.Unlock() + + m.ch <- &JobsIteration{ + Jobs: m.jobs[0:countJobsToReturn], + Queue: countLeftJobsInQueue, + } } type batchUuidsProviderGetUuidResult struct { @@ -85,45 +98,36 @@ type batchUuidsProviderGetUuidResult struct { type batchUuidsProviderTestSuite struct { suite.Suite - Provider *BatchUuidsProvider - GetUuidAsync func(username string) chan *batchUuidsProviderGetUuidResult + Provider *BatchUuidsProvider Emitter *mockEmitter + Strategy *manualStrategy MojangApi *mojangUsernamesToUuidsRequestMock + + GetUuidAsync func(username string) <- chan *batchUuidsProviderGetUuidResult + stop context.CancelFunc } func (suite *batchUuidsProviderTestSuite) SetupTest() { suite.Emitter = &mockEmitter{} + suite.Strategy = &manualStrategy{} + ctx, stop := context.WithCancel(context.Background()) + suite.stop = stop + suite.MojangApi = &mojangUsernamesToUuidsRequestMock{} + usernamesToUuids = suite.MojangApi.UsernamesToUuids - suite.Provider = &BatchUuidsProvider{ - // Emitter: suite.Emitter, - // IterationDelay: 0, - // IterationSize: 10, - } + suite.Provider = NewBatchUuidsProvider(ctx, suite.Strategy, suite.Emitter) - suite.iterateChan = make(chan bool) - // forever = func() bool { - // return <-suite.iterateChan - // } - - suite.Iterate = func() { - suite.iterateChan <- true - } - - suite.done = func() { - suite.iterateChan <- false - } - - suite.GetUuidAsync = func(username string) chan *batchUuidsProviderGetUuidResult { - s := make(chan bool) + suite.GetUuidAsync = func(username string) <- chan *batchUuidsProviderGetUuidResult { + s := make(chan struct{}) // This dirty hack ensures, that the username will be queued before we return control to the caller. - // It's needed to keep expected calls order and prevent cases when iteration happens before all usernames - // will be queued. + // It's needed to keep expected calls order and prevent cases when iteration happens before + // all usernames will be queued. suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:queued", username, ).Once().Run(func(args mock.Arguments) { - s <- true + close(s) }) c := make(chan *batchUuidsProviderGetUuidResult) @@ -139,13 +143,10 @@ func (suite *batchUuidsProviderTestSuite) SetupTest() { return c } - - suite.MojangApi = &mojangUsernamesToUuidsRequestMock{} - usernamesToUuids = suite.MojangApi.UsernamesToUuids } func (suite *batchUuidsProviderTestSuite) TearDownTest() { - suite.done() + suite.stop() suite.Emitter.AssertExpectations(suite.T()) suite.MojangApi.AssertExpectations(suite.T()) } @@ -154,37 +155,14 @@ func TestBatchUuidsProvider(t *testing.T) { suite.Run(t, new(batchUuidsProviderTestSuite)) } -func (suite *batchUuidsProviderTestSuite) TestGetUuidForOneUsername() { - expectedUsernames := []string{"username"} - expectedResult := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username"} - expectedResponse := []*mojang.ProfileInfo{expectedResult} - - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() - - suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{expectedResult}, nil) - - resultChan := suite.GetUuidAsync("username") - - suite.Iterate() - - result := <-resultChan - suite.Assert().Equal(expectedResult, result.Result) - suite.Assert().Nil(result.Error) -} - -func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { +func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernames() { expectedUsernames := []string{"username1", "username2"} expectedResult1 := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username1"} expectedResult2 := &mojang.ProfileInfo{Id: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", Name: "username2"} expectedResponse := []*mojang.ProfileInfo{expectedResult1, expectedResult2} - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{ expectedResult1, @@ -194,7 +172,7 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { resultChan1 := suite.GetUuidAsync("username1") resultChan2 := suite.GetUuidAsync("username2") - suite.Iterate() + suite.Strategy.Iterate(2, 0) result1 := <-resultChan1 suite.Assert().Equal(expectedResult1, result1.Result) @@ -205,78 +183,40 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() { suite.Assert().Nil(result2.Error) } -func (suite *batchUuidsProviderTestSuite) TestGetUuidForMoreThan10Usernames() { - usernames := make([]string, 12) - for i := 0; i < cap(usernames); i++ { - usernames[i] = randStr(8) - } +func (suite *batchUuidsProviderTestSuite) TestShouldNotSendRequestWhenNoJobsAreReturned() { + //noinspection GoPreferNilSlice + emptyUsernames := []string{} + done := make(chan struct{}) + suite.Emitter.On("Emit", + "mojang_textures:batch_uuids_provider:round", + emptyUsernames, + 1, + ).Once().Run(func(args mock.Arguments) { + close(done) + }) - // In this test we're not testing response, so always return an empty resultset - expectedResponse := []*mojang.ProfileInfo{} + _ = suite.GetUuidAsync("username") // Schedule one username to run the queue - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Twice() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[0:10], 2).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[0:10], expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[10:12], 0).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[10:12], expectedResponse, nil).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Twice() - - suite.MojangApi.On("UsernamesToUuids", usernames[0:10]).Once().Return(expectedResponse, nil) - suite.MojangApi.On("UsernamesToUuids", usernames[10:12]).Once().Return(expectedResponse, nil) - - channels := make([]chan *batchUuidsProviderGetUuidResult, len(usernames)) - for i, username := range usernames { - channels[i] = suite.GetUuidAsync(username) - } - - suite.Iterate() - suite.Iterate() - - for _, channel := range channels { - <-channel - } + suite.Strategy.Iterate(0, 1) // Return no jobs and indicate that there is one job in queue + <- done } -func (suite *batchUuidsProviderTestSuite) TestDoNothingWhenNoTasks() { - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Times(3) - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", []string{"username"}, 0).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", []string{"username"}, mock.Anything, nil).Once() - var nilStringSlice []string - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", nilStringSlice, 0).Twice() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Times(3) - - suite.MojangApi.On("UsernamesToUuids", []string{"username"}).Once().Return([]*mojang.ProfileInfo{}, nil) - - // Perform first iteration and await it finishes - resultChan := suite.GetUuidAsync("username") - - suite.Iterate() - - result := <-resultChan - suite.Assert().Nil(result.Result) - suite.Assert().Nil(result.Error) - - // Let it to perform a few more iterations to ensure, that there are no calls to external APIs - suite.Iterate() - suite.Iterate() -} - -func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError() { +// Test written for multiple usernames to ensure that the error +// will be returned for each iteration group +func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernamesWithAnError() { expectedUsernames := []string{"username1", "username2"} expectedError := &mojang.TooManyRequestsError{} var nilProfilesResponse []*mojang.ProfileInfo - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once() suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, nilProfilesResponse, expectedError).Once() - suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once() suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return(nil, expectedError) resultChan1 := suite.GetUuidAsync("username1") resultChan2 := suite.GetUuidAsync("username2") - suite.Iterate() + suite.Strategy.Iterate(2, 0) result1 := <-resultChan1 suite.Assert().Nil(result1.Result) @@ -287,14 +227,213 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError( suite.Assert().Equal(expectedError, result2.Error) } -var replacer = strings.NewReplacer("-", "_", "=", "") +func TestPeriodicStrategy(t *testing.T) { + t.Run("should return first job only after duration", func(t *testing.T) { + d := 20 * time.Millisecond + strategy := NewPeriodicStrategy(d, 10) + j := &job{} + strategy.Queue(j) -// https://stackoverflow.com/a/50581165 -func randStr(len int) string { - buff := make([]byte, len) - _, _ = rand.Read(buff) - str := replacer.Replace(base64.URLEncoding.EncodeToString(buff)) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + startedAt := time.Now() + iteration := <-ch + durationBeforeResult := time.Now().Sub(startedAt) + require.True(t, durationBeforeResult >= d) + require.True(t, durationBeforeResult < d * 2) - // Base 64 can be longer than len - return str[:len] + require.Equal(t, []*job{j}, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + + cancel() + }) + + t.Run("should return the configured batch size", func(t *testing.T) { + strategy := NewPeriodicStrategy(0, 10) + jobs := make([]*job, 15) + for i := 0; i < 15; i++ { + jobs[i] = &job{Username: strconv.Itoa(i)} + strategy.Queue(jobs[i]) + } + + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + iteration := <-ch + require.Len(t, iteration.Jobs, 10) + require.Equal(t, jobs[0:10], iteration.Jobs) + require.Equal(t, 5, iteration.Queue) + + cancel() + }) + + t.Run("should not return the next iteration until the previous one is finished", func(t *testing.T) { + strategy := NewPeriodicStrategy(0, 10) + strategy.Queue(&job{}) + + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + iteration := <-ch + require.Len(t, iteration.Jobs, 1) + require.Equal(t, 0, iteration.Queue) + + time.Sleep(time.Millisecond) // Let strategy's internal loop to work (if the implementation is broken) + + select { + case <-ch: + require.Fail(t, "the previous iteration isn't marked as done") + default: + // ok + } + + iteration.Done() + + time.Sleep(time.Millisecond) // Let strategy's internal loop to work + + select { + case iteration = <-ch: + // ok + default: + require.Fail(t, "iteration should be provided") + } + + require.Empty(t, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + iteration.Done() + + cancel() + }) + + t.Run("each iteration should be returned only after the configured duration", func(t *testing.T) { + d := 5 * time.Millisecond + strategy := NewPeriodicStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + for i := 0; i < 3; i++ { + startedAt := time.Now() + iteration := <-ch + durationBeforeResult := time.Now().Sub(startedAt) + require.True(t, durationBeforeResult >= d) + require.True(t, durationBeforeResult < d * 2) + + require.Empty(t, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + + // Sleep for at least doubled duration before calling Done() to check, + // that this duration isn't included into the next iteration time + time.Sleep(d * 2) + iteration.Done() + } + + cancel() + }) +} + + +func TestFullBusStrategy(t *testing.T) { + t.Run("should provide iteration immediately when the batch size exceeded", func(t *testing.T) { + jobs := make([]*job, 10) + for i := 0; i < 10; i++ { + jobs[i] = &job{} + } + + d := 20 * time.Millisecond + strategy := NewFullBusStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + + done := make(chan struct{}) + go func() { + defer close(done) + select { + case iteration := <-ch: + require.Len(t, iteration.Jobs, 10) + require.Equal(t, 0, iteration.Queue) + case <-time.After(d): + require.Fail(t, "iteration should be provided immediately") + } + }() + + for _, j := range jobs { + strategy.Queue(j) + } + + <-done + + cancel() + }) + + t.Run("should provide iteration after duration if batch size isn't exceeded", func(t *testing.T) { + jobs := make([]*job, 9) + for i := 0; i < 9; i++ { + jobs[i] = &job{} + } + + d := 20 * time.Millisecond + strategy := NewFullBusStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + + done := make(chan struct{}) + go func() { + defer close(done) + startedAt := time.Now() + iteration := <-ch + duration := time.Now().Sub(startedAt) + require.True(t, duration >= d) + require.True(t, duration < d * 2) + require.Equal(t, jobs, iteration.Jobs) + require.Equal(t, 0, iteration.Queue) + }() + + for _, j := range jobs { + strategy.Queue(j) + } + + <-done + + cancel() + }) + + t.Run("should provide iteration as soon as the bus is full, without waiting for the previous iteration to finish", func(t *testing.T) { + d := 20 * time.Millisecond + strategy := NewFullBusStrategy(d, 10) + ctx, cancel := context.WithCancel(context.Background()) + ch := strategy.GetJobs(ctx) + + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < 3; i++ { + time.Sleep(5 * time.Millisecond) // See comment below + select { + case iteration := <- ch: + require.Len(t, iteration.Jobs, 10) + // Don't assert iteration.Queue length since it might be unstable + // Don't call iteration.Done() + case <-time.After(d): + t.Fatalf("iteration should be provided as soon as the bus is full") + } + } + + // Scheduled 31 tasks. 3 iterations should be performed immediately + // and should be executed only after timeout. The timeout above is used + // to increase overall time to ensure, that timer resets on every iteration + + startedAt := time.Now() + iteration := <-ch + duration := time.Now().Sub(startedAt) + require.True(t, duration >= d) + require.True(t, duration < d * 2) + require.Len(t, iteration.Jobs, 1) + require.Equal(t, 0, iteration.Queue) + }() + + for i := 0; i < 31; i++ { + strategy.Queue(&job{}) + } + + <-done + + cancel() + }) }