From 305d63621777a76af7cab25e900cef179e69070c Mon Sep 17 00:00:00 2001 From: Omar Roth Date: Thu, 4 Apr 2019 07:49:53 -0500 Subject: [PATCH] Add multithreading to pubsub job --- src/invidious.cr | 6 +++-- src/invidious/helpers/helpers.cr | 10 ++++----- src/invidious/jobs.cr | 38 ++++++++++++++++++++++++++++---- 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/invidious.cr b/src/invidious.cr index 8494f9d9..0cffc9af 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -2344,7 +2344,8 @@ get "/feed/webhook/:token" do |env| data = "#{time}" end - # The hub will sometimes check if we're still subscribed after delivery errors + # The hub will sometimes check if we're still subscribed after delivery errors, + # so we reply with a 200 as long as the request hasn't expired if Time.now.to_unix - time.to_i > 432000 env.response.status_code = 400 next @@ -2377,11 +2378,12 @@ post "/feed/webhook/:token" do |env| rss = XML.parse_html(body) rss.xpath_nodes("//feed/entry").each do |entry| id = entry.xpath_node("videoid").not_nil!.content + author = entry.xpath_node("author/name").not_nil!.content published = Time.parse_rfc3339(entry.xpath_node("published").not_nil!.content) updated = Time.parse_rfc3339(entry.xpath_node("updated").not_nil!.content) video = get_video(id, PG_DB, proxies, region: nil) - video = ChannelVideo.new(id, video.title, published, updated, video.ucid, video.author, video.length_seconds, video.live_now, video.premiere_timestamp) + video = ChannelVideo.new(id, video.title, published, updated, video.ucid, author, video.length_seconds, video.live_now, video.premiere_timestamp) PG_DB.exec("UPDATE users SET notifications = notifications || $1 \ WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, video.ucid) diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr index 1fe41f80..3b53a468 100644 --- a/src/invidious/helpers/helpers.cr +++ b/src/invidious/helpers/helpers.cr @@ -81,11 +81,11 @@ user: String, port: Int32, dbname: String, ), - full_refresh: Bool, # Used for crawling channels: threads should check all videos uploaded by a channel - https_only: Bool?, # Used to tell Invidious it is behind a proxy, so links to resources should be https:// - hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions - domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required - use_pubsub_feeds: {type: Bool, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key) + full_refresh: Bool, # Used for crawling channels: threads should check all videos uploaded by a channel + https_only: Bool?, # Used to tell Invidious it is behind a proxy, so links to resources should be https:// + hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions + domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required + use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key) default_home: {type: String, default: "Top"}, feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]}, top_enabled: {type: Bool, default: true}, diff --git a/src/invidious/jobs.cr b/src/invidious/jobs.cr index a217c1af..82e58c93 100644 --- a/src/invidious/jobs.cr +++ b/src/invidious/jobs.cr @@ -104,15 +104,43 @@ end def subscribe_to_feeds(db, logger, key, config) if config.use_pubsub_feeds + case config.use_pubsub_feeds + when Bool + max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe + when Int32 + max_threads = config.use_pubsub_feeds.as(Int32) + end + max_channel = Channel(Int32).new + spawn do + max_threads = max_channel.receive + active_threads = 0 + active_channel = Channel(Bool).new + loop do - db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > '4 days' OR subscribed IS NULL") do |rs| + db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs| rs.each do ucid = rs.read(String) - response = subscribe_pubsub(ucid, key, config) - if response.status_code >= 400 - logger.write("#{ucid} : #{response.body}\n") + if active_threads >= max_threads.as(Int32) + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + + spawn do + begin + response = subscribe_pubsub(ucid, key, config) + + if response.status_code >= 400 + logger.write("#{ucid} : #{response.body}\n") + end + rescue ex + end + + active_channel.send(true) end end end @@ -120,6 +148,8 @@ def subscribe_to_feeds(db, logger, key, config) sleep 1.minute end end + + max_channel.send(max_threads.as(Int32)) end end