From 989317e5d399b275f4ebf28878a8a558fa23bd55 Mon Sep 17 00:00:00 2001 From: Matthew McGarvey Date: Thu, 15 Oct 2020 09:22:41 -0500 Subject: [PATCH] Extract RefreshChannelsJob (#1403) --- src/invidious.cr | 2 +- src/invidious/helpers/jobs.cr | 57 --------------------- src/invidious/jobs/refresh_channels_job.cr | 59 ++++++++++++++++++++++ 3 files changed, 60 insertions(+), 58 deletions(-) create mode 100644 src/invidious/jobs/refresh_channels_job.cr diff --git a/src/invidious.cr b/src/invidious.cr index ad63fcada..3e03628f1 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -159,7 +159,7 @@ end # Start jobs -refresh_channels(PG_DB, logger, config) +Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB, logger, config) refresh_feeds(PG_DB, logger, config) subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config) diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr index ca3d44d05..11eb7defd 100644 --- a/src/invidious/helpers/jobs.cr +++ b/src/invidious/helpers/jobs.cr @@ -1,60 +1,3 @@ -def refresh_channels(db, logger, config) - max_channel = Channel(Int32).new - - spawn do - max_threads = max_channel.receive - lim_threads = max_threads - active_threads = 0 - active_channel = Channel(Bool).new - backoff = 1.seconds - - loop do - db.query("SELECT id FROM channels ORDER BY updated") do |rs| - rs.each do - id = rs.read(String) - - if active_threads >= lim_threads - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - spawn do - begin - channel = fetch_channel(id, db, config.full_refresh) - - lim_threads = max_threads - db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.utc, channel.author, id) - rescue ex - logger.puts("#{id} : #{ex.message}") - if ex.message == "Deleted or invalid channel" - db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.utc, id) - else - lim_threads = 1 - logger.puts("#{id} : backing off for #{backoff}s") - sleep backoff - if backoff < 1.days - backoff += backoff - else - backoff = 1.days - end - end - end - - active_channel.send(true) - end - end - end - - sleep 1.minute - Fiber.yield - end - end - - max_channel.send(config.channel_threads) -end - def refresh_feeds(db, logger, config) max_channel = Channel(Int32).new spawn do diff --git a/src/invidious/jobs/refresh_channels_job.cr b/src/invidious/jobs/refresh_channels_job.cr new file mode 100644 index 000000000..75fc474d1 --- /dev/null +++ b/src/invidious/jobs/refresh_channels_job.cr @@ -0,0 +1,59 @@ +class Invidious::Jobs::RefreshChannelsJob < Invidious::Jobs::BaseJob + private getter db : DB::Database + private getter logger : Invidious::LogHandler + private getter config : Config + + def initialize(@db, @logger, @config) + end + + def begin + max_threads = config.channel_threads + lim_threads = max_threads + active_threads = 0 + active_channel = Channel(Bool).new + backoff = 1.seconds + + loop do + db.query("SELECT id FROM channels ORDER BY updated") do |rs| + rs.each do + id = rs.read(String) + + if active_threads >= lim_threads + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + spawn do + begin + channel = fetch_channel(id, db, config.full_refresh) + + lim_threads = max_threads + db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.utc, channel.author, id) + rescue ex + logger.puts("#{id} : #{ex.message}") + if ex.message == "Deleted or invalid channel" + db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.utc, id) + else + lim_threads = 1 + logger.puts("#{id} : backing off for #{backoff}s") + sleep backoff + if backoff < 1.days + backoff += backoff + else + backoff = 1.days + end + end + end + + active_channel.send(true) + end + end + end + + sleep 1.minute + Fiber.yield + end + end +end