mirror of
https://github.com/elyby/chrly.git
synced 2025-01-03 10:41:47 +05:30
250 lines
4.9 KiB
Go
250 lines
4.9 KiB
Go
package mojangtextures
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/elyby/chrly/api/mojang"
|
|
)
|
|
|
|
type jobResult struct {
|
|
Profile *mojang.ProfileInfo
|
|
Error error
|
|
}
|
|
|
|
type job struct {
|
|
Username string
|
|
RespondChan chan *jobResult
|
|
}
|
|
|
|
type jobsQueue struct {
|
|
lock sync.Mutex
|
|
items []*job
|
|
}
|
|
|
|
func newJobsQueue() *jobsQueue {
|
|
return &jobsQueue{
|
|
items: []*job{},
|
|
}
|
|
}
|
|
|
|
func (s *jobsQueue) Enqueue(job *job) int {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.items = append(s.items, job)
|
|
|
|
return len(s.items)
|
|
}
|
|
|
|
func (s *jobsQueue) Dequeue(n int) ([]*job, int) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
l := len(s.items)
|
|
if n > l {
|
|
n = l
|
|
}
|
|
|
|
items := s.items[0:n]
|
|
s.items = s.items[n:l]
|
|
|
|
return items, l - n
|
|
}
|
|
|
|
var usernamesToUuids = mojang.UsernamesToUuids
|
|
|
|
type JobsIteration struct {
|
|
Jobs []*job
|
|
Queue int
|
|
c chan struct{}
|
|
}
|
|
|
|
func (j *JobsIteration) Done() {
|
|
if j.c != nil {
|
|
close(j.c)
|
|
}
|
|
}
|
|
|
|
type BatchUuidsProviderStrategy interface {
|
|
Queue(job *job)
|
|
GetJobs(abort context.Context) <-chan *JobsIteration
|
|
}
|
|
|
|
type PeriodicStrategy struct {
|
|
Delay time.Duration
|
|
Batch int
|
|
queue *jobsQueue
|
|
done chan struct{}
|
|
}
|
|
|
|
func NewPeriodicStrategy(delay time.Duration, batch int) *PeriodicStrategy {
|
|
return &PeriodicStrategy{
|
|
Delay: delay,
|
|
Batch: batch,
|
|
queue: newJobsQueue(),
|
|
}
|
|
}
|
|
|
|
func (ctx *PeriodicStrategy) Queue(job *job) {
|
|
ctx.queue.Enqueue(job)
|
|
}
|
|
|
|
func (ctx *PeriodicStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
|
|
ch := make(chan *JobsIteration)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-abort.Done():
|
|
close(ch)
|
|
return
|
|
case <-time.After(ctx.Delay):
|
|
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
|
|
jobDoneChan := make(chan struct{})
|
|
ch <- &JobsIteration{jobs, queueLen, jobDoneChan}
|
|
<-jobDoneChan
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
type FullBusStrategy struct {
|
|
Delay time.Duration
|
|
Batch int
|
|
queue *jobsQueue
|
|
busIsFull chan bool
|
|
}
|
|
|
|
func NewFullBusStrategy(delay time.Duration, batch int) *FullBusStrategy {
|
|
return &FullBusStrategy{
|
|
Delay: delay,
|
|
Batch: batch,
|
|
queue: newJobsQueue(),
|
|
busIsFull: make(chan bool),
|
|
}
|
|
}
|
|
|
|
func (ctx *FullBusStrategy) Queue(job *job) {
|
|
n := ctx.queue.Enqueue(job)
|
|
if n%ctx.Batch == 0 {
|
|
ctx.busIsFull <- true
|
|
}
|
|
}
|
|
|
|
// Формально, это описание логики водителя маршрутки xD
|
|
func (ctx *FullBusStrategy) GetJobs(abort context.Context) <-chan *JobsIteration {
|
|
ch := make(chan *JobsIteration)
|
|
go func() {
|
|
for {
|
|
t := time.NewTimer(ctx.Delay)
|
|
select {
|
|
case <-abort.Done():
|
|
close(ch)
|
|
return
|
|
case <-t.C:
|
|
ctx.sendJobs(ch)
|
|
case <-ctx.busIsFull:
|
|
t.Stop()
|
|
ctx.sendJobs(ch)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
func (ctx *FullBusStrategy) sendJobs(ch chan *JobsIteration) {
|
|
jobs, queueLen := ctx.queue.Dequeue(ctx.Batch)
|
|
ch <- &JobsIteration{jobs, queueLen, nil}
|
|
}
|
|
|
|
type BatchUuidsProvider struct {
|
|
context context.Context
|
|
emitter Emitter
|
|
strategy BatchUuidsProviderStrategy
|
|
onFirstCall sync.Once
|
|
}
|
|
|
|
func NewBatchUuidsProvider(
|
|
context context.Context,
|
|
strategy BatchUuidsProviderStrategy,
|
|
emitter Emitter,
|
|
) *BatchUuidsProvider {
|
|
return &BatchUuidsProvider{
|
|
context: context,
|
|
emitter: emitter,
|
|
strategy: strategy,
|
|
}
|
|
}
|
|
|
|
func (ctx *BatchUuidsProvider) GetUuid(username string) (*mojang.ProfileInfo, error) {
|
|
ctx.onFirstCall.Do(ctx.startQueue)
|
|
|
|
resultChan := make(chan *jobResult)
|
|
ctx.strategy.Queue(&job{username, resultChan})
|
|
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:queued", username)
|
|
|
|
result := <-resultChan
|
|
|
|
return result.Profile, result.Error
|
|
}
|
|
|
|
func (ctx *BatchUuidsProvider) startQueue() {
|
|
// This synchronization chan is used to ensure that strategy's jobs provider
|
|
// will be initialized before any job will be scheduled
|
|
d := make(chan struct{})
|
|
go func() {
|
|
jobsChan := ctx.strategy.GetJobs(ctx.context)
|
|
close(d)
|
|
for {
|
|
select {
|
|
case <-ctx.context.Done():
|
|
return
|
|
case iteration := <-jobsChan:
|
|
go func() {
|
|
ctx.performRequest(iteration)
|
|
iteration.Done()
|
|
}()
|
|
}
|
|
}
|
|
}()
|
|
|
|
<-d
|
|
}
|
|
|
|
func (ctx *BatchUuidsProvider) performRequest(iteration *JobsIteration) {
|
|
usernames := make([]string, len(iteration.Jobs))
|
|
for i, job := range iteration.Jobs {
|
|
usernames[i] = job.Username
|
|
}
|
|
|
|
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:round", usernames, iteration.Queue)
|
|
if len(usernames) == 0 {
|
|
return
|
|
}
|
|
|
|
profiles, err := usernamesToUuids(usernames)
|
|
ctx.emitter.Emit("mojang_textures:batch_uuids_provider:result", usernames, profiles, err)
|
|
for _, job := range iteration.Jobs {
|
|
response := &jobResult{}
|
|
if err == nil {
|
|
// The profiles in the response aren't ordered, so we must search each username over full array
|
|
for _, profile := range profiles {
|
|
if strings.EqualFold(job.Username, profile.Name) {
|
|
response.Profile = profile
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
response.Error = err
|
|
}
|
|
|
|
job.RespondChan <- response
|
|
close(job.RespondChan)
|
|
}
|
|
}
|