Fixed strategies implementations, added tests

This commit is contained in:
ErickSkrauch 2020-04-26 03:48:23 +03:00
parent 29b6bc89b3
commit 1b9e943c0e
No known key found for this signature in database
GPG Key ID: 669339FCBB30EE0E
2 changed files with 304 additions and 147 deletions

View File

@ -59,6 +59,13 @@ var usernamesToUuids = mojang.UsernamesToUuids
type JobsIteration struct {
Jobs []*job
Queue int
c chan struct{}
}
func (j *JobsIteration) Done() {
if j.c != nil {
close(j.c)
}
}
type BatchUuidsProviderStrategy interface {
@ -70,6 +77,7 @@ type PeriodicStrategy struct {
Delay time.Duration
Batch int
queue *jobsQueue
done chan struct{}
}
func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy {
@ -90,10 +98,13 @@ func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteratio
for {
select {
case <-abort.Done():
close(ch)
return
case <-time.After(ctx.Delay):
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
ch <- &JobsIteration{jobs, queueLen}
jobDoneChan := make(chan struct{})
ch <- &JobsIteration{jobs, queueLen, jobDoneChan}
<-jobDoneChan
}
}
}()
@ -102,28 +113,29 @@ func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteratio
}
type FullBusStrategy struct {
Delay time.Duration
Batch int
queue *jobsQueue
ready chan bool
Delay time.Duration
Batch int
queue *jobsQueue
busIsFull chan bool
}
func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy {
return &FullBusStrategy{
Delay: delay,
Batch: batch,
queue: newJobsQueue(),
ready: make(chan bool),
Delay: delay,
Batch: batch,
queue: newJobsQueue(),
busIsFull: make(chan bool),
}
}
func (ctx *FullBusStrategy) Queue(job *job) {
n := ctx.queue.Enqueue(job)
if n == ctx.Batch {
ctx.ready <- true
if n % ctx.Batch == 0 {
ctx.busIsFull <- true
}
}
// Формально, это описание логики водителя маршрутки xD
func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
ch := make(chan *JobsIteration)
go func() {
@ -131,10 +143,11 @@ func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration
t := time.NewTimer(ctx.Delay)
select {
case <-abort.Done():
close(ch)
return
case <-t.C:
ctx.sendJobs(ch)
case <-ctx.ready:
case <-ctx.busIsFull:
t.Stop()
ctx.sendJobs(ch)
}
@ -146,7 +159,7 @@ func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration
func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) {
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
ch <- &JobsIteration{jobs, queueLen} // TODO: should not wait for iteration result
ch <- &JobsIteration{jobs, queueLen, nil}
}
type BatchUuidsProvider struct {
@ -156,7 +169,11 @@ type BatchUuidsProvider struct {
onFirstCall sync.Once
}
func NewBatchUuidsProvider(context context.Context, strategy BatchUuidsProviderStrategy, emitter Emitter) *BatchUuidsProvider {
func NewBatchUuidsProvider(
context context.Context,
strategy BatchUuidsProviderStrategy,
emitter Emitter,
) *BatchUuidsProvider {
return &BatchUuidsProvider{
context: context,
emitter: emitter,
@ -184,9 +201,10 @@ func (ctx *BatchUuidsProvider) startQueue() {
case <-ctx.context.Done():
return
case iteration := <-jobsChan:
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:before_round") // TODO: where should I move this events?
ctx.performRequest(iteration)
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:after_round")
go func() {
ctx.performRequest(iteration)
iteration.Done()
}()
}
}
}()

View File

@ -2,10 +2,10 @@ package mojangtextures
import (
"context"
"crypto/rand"
"encoding/base64"
"strings"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -59,22 +59,35 @@ func (o *mojangUsernamesToUuidsRequestMock) UsernamesToUuids(usernames []string)
return result, args.Error(1)
}
type queueStrategyMock struct {
mock.Mock
ch chan *JobsIteration
type manualStrategy struct {
ch chan *JobsIteration
once sync.Once
lock sync.Mutex
jobs []*job
}
func (m *queueStrategyMock) Queue(job *job) {
m.Called(job)
func (m *manualStrategy) Queue(job *job) {
m.lock.Lock()
m.jobs = append(m.jobs, job)
m.lock.Unlock()
}
func (m *queueStrategyMock) GetJobs(abort context.Context) <-chan *JobsIteration {
m.Called(abort)
func (m *manualStrategy) GetJobs(_ context.Context) <-chan *JobsIteration {
m.lock.Lock()
defer m.lock.Unlock()
m.ch = make(chan *JobsIteration)
return m.ch
}
func (m *queueStrategyMock) PushIteration(iteration *JobsIteration) {
m.ch <- iteration
func (m *manualStrategy) Iterate(countJobsToReturn int, countLeftJobsInQueue int) {
m.lock.Lock()
defer m.lock.Unlock()
m.ch <- &JobsIteration{
Jobs: m.jobs[0:countJobsToReturn],
Queue: countLeftJobsInQueue,
}
}
type batchUuidsProviderGetUuidResult struct {
@ -85,45 +98,36 @@ type batchUuidsProviderGetUuidResult struct {
type batchUuidsProviderTestSuite struct {
suite.Suite
Provider *BatchUuidsProvider
GetUuidAsync func(username string) chan *batchUuidsProviderGetUuidResult
Provider *BatchUuidsProvider
Emitter *mockEmitter
Strategy *manualStrategy
MojangApi *mojangUsernamesToUuidsRequestMock
GetUuidAsync func(username string) <- chan *batchUuidsProviderGetUuidResult
stop context.CancelFunc
}
func (suite *batchUuidsProviderTestSuite) SetupTest() {
suite.Emitter = &mockEmitter{}
suite.Strategy = &manualStrategy{}
ctx, stop := context.WithCancel(context.Background())
suite.stop = stop
suite.MojangApi = &mojangUsernamesToUuidsRequestMock{}
usernamesToUuids = suite.MojangApi.UsernamesToUuids
suite.Provider = &BatchUuidsProvider{
// Emitter: suite.Emitter,
// IterationDelay: 0,
// IterationSize: 10,
}
suite.Provider = NewBatchUuidsProvider(ctx, suite.Strategy, suite.Emitter)
suite.iterateChan = make(chan bool)
// forever = func() bool {
// return <-suite.iterateChan
// }
suite.Iterate = func() {
suite.iterateChan <- true
}
suite.done = func() {
suite.iterateChan <- false
}
suite.GetUuidAsync = func(username string) chan *batchUuidsProviderGetUuidResult {
s := make(chan bool)
suite.GetUuidAsync = func(username string) <- chan *batchUuidsProviderGetUuidResult {
s := make(chan struct{})
// This dirty hack ensures, that the username will be queued before we return control to the caller.
// It's needed to keep expected calls order and prevent cases when iteration happens before all usernames
// will be queued.
// It's needed to keep expected calls order and prevent cases when iteration happens before
// all usernames will be queued.
suite.Emitter.On("Emit",
"mojang_textures:batch_uuids_provider:queued",
username,
).Once().Run(func(args mock.Arguments) {
s <- true
close(s)
})
c := make(chan *batchUuidsProviderGetUuidResult)
@ -139,13 +143,10 @@ func (suite *batchUuidsProviderTestSuite) SetupTest() {
return c
}
suite.MojangApi = &mojangUsernamesToUuidsRequestMock{}
usernamesToUuids = suite.MojangApi.UsernamesToUuids
}
func (suite *batchUuidsProviderTestSuite) TearDownTest() {
suite.done()
suite.stop()
suite.Emitter.AssertExpectations(suite.T())
suite.MojangApi.AssertExpectations(suite.T())
}
@ -154,37 +155,14 @@ func TestBatchUuidsProvider(t *testing.T) {
suite.Run(t, new(batchUuidsProviderTestSuite))
}
func (suite *batchUuidsProviderTestSuite) TestGetUuidForOneUsername() {
expectedUsernames := []string{"username"}
expectedResult := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username"}
expectedResponse := []*mojang.ProfileInfo{expectedResult}
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once()
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{expectedResult}, nil)
resultChan := suite.GetUuidAsync("username")
suite.Iterate()
result := <-resultChan
suite.Assert().Equal(expectedResult, result.Result)
suite.Assert().Nil(result.Error)
}
func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() {
func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernames() {
expectedUsernames := []string{"username1", "username2"}
expectedResult1 := &mojang.ProfileInfo{Id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", Name: "username1"}
expectedResult2 := &mojang.ProfileInfo{Id: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", Name: "username2"}
expectedResponse := []*mojang.ProfileInfo{expectedResult1, expectedResult2}
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, expectedResponse, nil).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once()
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return([]*mojang.ProfileInfo{
expectedResult1,
@ -194,7 +172,7 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() {
resultChan1 := suite.GetUuidAsync("username1")
resultChan2 := suite.GetUuidAsync("username2")
suite.Iterate()
suite.Strategy.Iterate(2, 0)
result1 := <-resultChan1
suite.Assert().Equal(expectedResult1, result1.Result)
@ -205,78 +183,40 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernames() {
suite.Assert().Nil(result2.Error)
}
func (suite *batchUuidsProviderTestSuite) TestGetUuidForMoreThan10Usernames() {
usernames := make([]string, 12)
for i := 0; i < cap(usernames); i++ {
usernames[i] = randStr(8)
}
func (suite *batchUuidsProviderTestSuite) TestShouldNotSendRequestWhenNoJobsAreReturned() {
//noinspection GoPreferNilSlice
emptyUsernames := []string{}
done := make(chan struct{})
suite.Emitter.On("Emit",
"mojang_textures:batch_uuids_provider:round",
emptyUsernames,
1,
).Once().Run(func(args mock.Arguments) {
close(done)
})
// In this test we're not testing response, so always return an empty resultset
expectedResponse := []*mojang.ProfileInfo{}
_ = suite.GetUuidAsync("username") // Schedule one username to run the queue
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Twice()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[0:10], 2).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[0:10], expectedResponse, nil).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", usernames[10:12], 0).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", usernames[10:12], expectedResponse, nil).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Twice()
suite.MojangApi.On("UsernamesToUuids", usernames[0:10]).Once().Return(expectedResponse, nil)
suite.MojangApi.On("UsernamesToUuids", usernames[10:12]).Once().Return(expectedResponse, nil)
channels := make([]chan *batchUuidsProviderGetUuidResult, len(usernames))
for i, username := range usernames {
channels[i] = suite.GetUuidAsync(username)
}
suite.Iterate()
suite.Iterate()
for _, channel := range channels {
<-channel
}
suite.Strategy.Iterate(0, 1) // Return no jobs and indicate that there is one job in queue
<- done
}
func (suite *batchUuidsProviderTestSuite) TestDoNothingWhenNoTasks() {
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Times(3)
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", []string{"username"}, 0).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", []string{"username"}, mock.Anything, nil).Once()
var nilStringSlice []string
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", nilStringSlice, 0).Twice()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Times(3)
suite.MojangApi.On("UsernamesToUuids", []string{"username"}).Once().Return([]*mojang.ProfileInfo{}, nil)
// Perform first iteration and await it finishes
resultChan := suite.GetUuidAsync("username")
suite.Iterate()
result := <-resultChan
suite.Assert().Nil(result.Result)
suite.Assert().Nil(result.Error)
// Let it to perform a few more iterations to ensure, that there are no calls to external APIs
suite.Iterate()
suite.Iterate()
}
func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError() {
// Test written for multiple usernames to ensure that the error
// will be returned for each iteration group
func (suite *batchUuidsProviderTestSuite) TestGetUuidForFewUsernamesWithAnError() {
expectedUsernames := []string{"username1", "username2"}
expectedError := &mojang.TooManyRequestsError{}
var nilProfilesResponse []*mojang.ProfileInfo
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:before_round").Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:round", expectedUsernames, 0).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:result", expectedUsernames, nilProfilesResponse, expectedError).Once()
suite.Emitter.On("Emit", "mojang_textures:batch_uuids_provider:after_round").Once()
suite.MojangApi.On("UsernamesToUuids", expectedUsernames).Once().Return(nil, expectedError)
resultChan1 := suite.GetUuidAsync("username1")
resultChan2 := suite.GetUuidAsync("username2")
suite.Iterate()
suite.Strategy.Iterate(2, 0)
result1 := <-resultChan1
suite.Assert().Nil(result1.Result)
@ -287,14 +227,213 @@ func (suite *batchUuidsProviderTestSuite) TestGetUuidForTwoUsernamesWithAnError(
suite.Assert().Equal(expectedError, result2.Error)
}
var replacer = strings.NewReplacer("-", "_", "=", "")
func TestPeriodicStrategy(t *testing.T) {
t.Run("should return first job only after duration", func(t *testing.T) {
d := 20 * time.Millisecond
strategy := NewPeriodicStrategy(d, 10)
j := &job{}
strategy.Queue(j)
// https://stackoverflow.com/a/50581165
func randStr(len int) string {
buff := make([]byte, len)
_, _ = rand.Read(buff)
str := replacer.Replace(base64.URLEncoding.EncodeToString(buff))
ctx, cancel := context.WithCancel(context.Background())
ch := strategy.GetJobs(ctx)
startedAt := time.Now()
iteration := <-ch
durationBeforeResult := time.Now().Sub(startedAt)
require.True(t, durationBeforeResult >= d)
require.True(t, durationBeforeResult < d * 2)
// Base 64 can be longer than len
return str[:len]
require.Equal(t, []*job{j}, iteration.Jobs)
require.Equal(t, 0, iteration.Queue)
cancel()
})
t.Run("should return the configured batch size", func(t *testing.T) {
strategy := NewPeriodicStrategy(0, 10)
jobs := make([]*job, 15)
for i := 0; i < 15; i++ {
jobs[i] = &job{Username: strconv.Itoa(i)}
strategy.Queue(jobs[i])
}
ctx, cancel := context.WithCancel(context.Background())
ch := strategy.GetJobs(ctx)
iteration := <-ch
require.Len(t, iteration.Jobs, 10)
require.Equal(t, jobs[0:10], iteration.Jobs)
require.Equal(t, 5, iteration.Queue)
cancel()
})
t.Run("should not return the next iteration until the previous one is finished", func(t *testing.T) {
strategy := NewPeriodicStrategy(0, 10)
strategy.Queue(&job{})
ctx, cancel := context.WithCancel(context.Background())
ch := strategy.GetJobs(ctx)
iteration := <-ch
require.Len(t, iteration.Jobs, 1)
require.Equal(t, 0, iteration.Queue)
time.Sleep(time.Millisecond) // Let strategy's internal loop to work (if the implementation is broken)
select {
case <-ch:
require.Fail(t, "the previous iteration isn't marked as done")
default:
// ok
}
iteration.Done()
time.Sleep(time.Millisecond) // Let strategy's internal loop to work
select {
case iteration = <-ch:
// ok
default:
require.Fail(t, "iteration should be provided")
}
require.Empty(t, iteration.Jobs)
require.Equal(t, 0, iteration.Queue)
iteration.Done()
cancel()
})
t.Run("each iteration should be returned only after the configured duration", func(t *testing.T) {
d := 5 * time.Millisecond
strategy := NewPeriodicStrategy(d, 10)
ctx, cancel := context.WithCancel(context.Background())
ch := strategy.GetJobs(ctx)
for i := 0; i < 3; i++ {
startedAt := time.Now()
iteration := <-ch
durationBeforeResult := time.Now().Sub(startedAt)
require.True(t, durationBeforeResult >= d)
require.True(t, durationBeforeResult < d * 2)
require.Empty(t, iteration.Jobs)
require.Equal(t, 0, iteration.Queue)
// Sleep for at least doubled duration before calling Done() to check,
// that this duration isn't included into the next iteration time
time.Sleep(d * 2)
iteration.Done()
}
cancel()
})
}
func TestFullBusStrategy(t *testing.T) {
t.Run("should provide iteration immediately when the batch size exceeded", func(t *testing.T) {
jobs := make([]*job, 10)
for i := 0; i < 10; i++ {
jobs[i] = &job{}
}
d := 20 * time.Millisecond
strategy := NewFullBusStrategy(d, 10)
ctx, cancel := context.WithCancel(context.Background())
ch := strategy.GetJobs(ctx)
done := make(chan struct{})
go func() {
defer close(done)
select {
case iteration := <-ch:
require.Len(t, iteration.Jobs, 10)
require.Equal(t, 0, iteration.Queue)
case <-time.After(d):
require.Fail(t, "iteration should be provided immediately")
}
}()
for _, j := range jobs {
strategy.Queue(j)
}
<-done
cancel()
})
t.Run("should provide iteration after duration if batch size isn't exceeded", func(t *testing.T) {
jobs := make([]*job, 9)
for i := 0; i < 9; i++ {
jobs[i] = &job{}
}
d := 20 * time.Millisecond
strategy := NewFullBusStrategy(d, 10)
ctx, cancel := context.WithCancel(context.Background())
ch := strategy.GetJobs(ctx)
done := make(chan struct{})
go func() {
defer close(done)
startedAt := time.Now()
iteration := <-ch
duration := time.Now().Sub(startedAt)
require.True(t, duration >= d)
require.True(t, duration < d * 2)
require.Equal(t, jobs, iteration.Jobs)
require.Equal(t, 0, iteration.Queue)
}()
for _, j := range jobs {
strategy.Queue(j)
}
<-done
cancel()
})
t.Run("should provide iteration as soon as the bus is full, without waiting for the previous iteration to finish", func(t *testing.T) {
d := 20 * time.Millisecond
strategy := NewFullBusStrategy(d, 10)
ctx, cancel := context.WithCancel(context.Background())
ch := strategy.GetJobs(ctx)
done := make(chan struct{})
go func() {
defer close(done)
for i := 0; i < 3; i++ {
time.Sleep(5 * time.Millisecond) // See comment below
select {
case iteration := <- ch:
require.Len(t, iteration.Jobs, 10)
// Don't assert iteration.Queue length since it might be unstable
// Don't call iteration.Done()
case <-time.After(d):
t.Fatalf("iteration should be provided as soon as the bus is full")
}
}
// Scheduled 31 tasks. 3 iterations should be performed immediately
// and should be executed only after timeout. The timeout above is used
// to increase overall time to ensure, that timer resets on every iteration
startedAt := time.Now()
iteration := <-ch
duration := time.Now().Sub(startedAt)
require.True(t, duration >= d)
require.True(t, duration < d * 2)
require.Len(t, iteration.Jobs, 1)
require.Equal(t, 0, iteration.Queue)
}()
for i := 0; i < 31; i++ {
strategy.Queue(&job{})
}
<-done
cancel()
})
}