Automatically migrate database

This commit is contained in:
Omar Roth
2019-04-10 16:23:37 -05:00
parent b8c87632e6
commit 5dc45c35e6
5 changed files with 123 additions and 9 deletions

View File

@@ -1,5 +1,20 @@
require "./macros"
struct Nonce
db_mapping({
nonce: String,
expire: Time,
})
end
struct SessionId
db_mapping({
id: String,
email: String,
issued: String,
})
end
struct ConfigPreferences
module StringToArray
def self.to_yaml(value : Array(String), yaml : YAML::Nodes::Builder)
@@ -483,3 +498,92 @@ def extract_shelf_items(nodeset, ucid = nil, author_name = nil)
return items
end
def analyze_table(db, logger, table_name, struct_type = nil)
# Create table if it doesn't exist
if !db.query_one?("SELECT true FROM information_schema.tables WHERE table_name = $1", table_name, as: Bool)
db.using_connection do |conn|
conn.as(PG::Connection).exec_all(File.read("config/sql/#{table_name}.sql"))
end
logger.write("CREATE TABLE #{table_name}\n")
end
if !struct_type
return
end
struct_array = struct_type.to_type_tuple
column_array = get_column_array(db, table_name)
column_types = File.read("config/sql/#{table_name}.sql").match(/CREATE TABLE public\.#{table_name}\n\((?<types>[\d\D]*?)\);/)
.try &.["types"].split(",").map { |line| line.strip }
if !column_types
return
end
struct_array.each_with_index do |name, i|
if name != column_array[i]?
if !column_array[i]?
new_column = column_types.select { |line| line.starts_with? name }[0]
db.exec("ALTER TABLE #{table_name} ADD COLUMN #{new_column}")
logger.write("ALTER TABLE #{table_name} ADD COLUMN #{new_column}\n")
next
end
# Column doesn't exist
if !column_array.includes? name
new_column = column_types.select { |line| line.starts_with? name }[0]
db.exec("ALTER TABLE #{table_name} ADD COLUMN #{new_column}")
end
# Column exists but in the wrong position, rotate
if struct_array.includes? column_array[i]
until name == column_array[i]
new_column = column_types.select { |line| line.starts_with? column_array[i] }[0]?.try &.gsub("#{column_array[i]}", "#{column_array[i]}_new")
# There's a column we didn't expect
if !new_column
db.exec("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE")
logger.write("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]}\n")
column_array = get_column_array(db, table_name)
next
end
db.exec("ALTER TABLE #{table_name} ADD COLUMN #{new_column}")
logger.write("ALTER TABLE #{table_name} ADD COLUMN #{new_column}\n")
db.exec("UPDATE #{table_name} SET #{column_array[i]}_new=#{column_array[i]}")
logger.write("UPDATE #{table_name} SET #{column_array[i]}_new=#{column_array[i]}\n")
db.exec("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE")
logger.write("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE\n")
db.exec("ALTER TABLE #{table_name} RENAME COLUMN #{column_array[i]}_new TO #{column_array[i]}")
logger.write("ALTER TABLE #{table_name} RENAME COLUMN #{column_array[i]}_new TO #{column_array[i]}\n")
column_array = get_column_array(db, table_name)
end
else
db.exec("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE")
logger.write("ALTER TABLE #{table_name} DROP COLUMN #{column_array[i]} CASCADE\n")
end
end
end
end
class PG::ResultSet
def field(index = @column_index)
@fields.not_nil![index]
end
end
def get_column_array(db, table_name)
column_array = [] of String
db.query("SELECT * FROM #{table_name} LIMIT 0") do |rs|
rs.column_count.times do |i|
column = rs.as(PG::ResultSet).field(i)
column_array << column.name
end
end
return column_array
end

View File

@@ -0,0 +1,225 @@
def refresh_channels(db, logger, max_threads = 1, full_refresh = false)
max_channel = Channel(Int32).new
spawn do
max_threads = max_channel.receive
active_threads = 0
active_channel = Channel(Bool).new
loop do
db.query("SELECT id FROM channels ORDER BY updated") do |rs|
rs.each do
id = rs.read(String)
if active_threads >= max_threads
if active_channel.receive
active_threads -= 1
end
end
active_threads += 1
spawn do
begin
channel = fetch_channel(id, db, full_refresh)
db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.now, channel.author, id)
rescue ex
if ex.message == "Deleted or invalid channel"
db.exec("UPDATE channels SET updated = $1, deleted = true WHERE id = $2", Time.now, id)
end
logger.write("#{id} : #{ex.message}\n")
end
active_channel.send(true)
end
end
end
sleep 1.minute
end
end
max_channel.send(max_threads)
end
def refresh_feeds(db, logger, max_threads = 1)
max_channel = Channel(Int32).new
spawn do
max_threads = max_channel.receive
active_threads = 0
active_channel = Channel(Bool).new
loop do
db.query("SELECT email FROM users") do |rs|
rs.each do
email = rs.read(String)
view_name = "subscriptions_#{sha256(email)[0..7]}"
if active_threads >= max_threads
if active_channel.receive
active_threads -= 1
end
end
active_threads += 1
spawn do
begin
db.query("SELECT * FROM #{view_name} LIMIT 1") do |rs|
# Drop view that doesn't contain same number of rows as ChannelVideo
if ChannelVideo.from_rs(rs)[0]?.try &.to_a.size.try &.!= rs.column_count
db.exec("DROP MATERIALIZED VIEW #{view_name}")
raise "valid schema does not exist"
end
end
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
rescue ex
# Create view if it doesn't exist
if ex.message.try &.ends_with?("does not exist")
# While iterating through, we may have an email stored from a deleted account
if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool)
db.exec("CREATE MATERIALIZED VIEW #{view_name} AS \
SELECT * FROM channel_videos WHERE \
ucid = ANY ((SELECT subscriptions FROM users WHERE email = E'#{email.gsub("'", "\\'")}')::text[]) \
ORDER BY published DESC;")
logger.write("CREATE #{view_name}\n")
end
else
logger.write("REFRESH #{email} : #{ex.message}\n")
end
end
active_channel.send(true)
end
end
end
sleep 1.minute
end
end
max_channel.send(max_threads)
end
def subscribe_to_feeds(db, logger, key, config)
if config.use_pubsub_feeds
case config.use_pubsub_feeds
when Bool
max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe
when Int32
max_threads = config.use_pubsub_feeds.as(Int32)
end
max_channel = Channel(Int32).new
spawn do
max_threads = max_channel.receive
active_threads = 0
active_channel = Channel(Bool).new
loop do
db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs|
rs.each do
ucid = rs.read(String)
if active_threads >= max_threads.as(Int32)
if active_channel.receive
active_threads -= 1
end
end
active_threads += 1
spawn do
begin
response = subscribe_pubsub(ucid, key, config)
if response.status_code >= 400
logger.write("#{ucid} : #{response.body}\n")
end
rescue ex
end
active_channel.send(true)
end
end
end
sleep 1.minute
end
end
max_channel.send(max_threads.as(Int32))
end
end
def pull_top_videos(config, db)
loop do
begin
top = rank_videos(db, 40)
rescue ex
next
end
if top.size > 0
args = arg_array(top)
else
next
end
videos = [] of Video
top.each do |id|
begin
videos << get_video(id, db)
rescue ex
next
end
end
yield videos
sleep 1.minute
end
end
def pull_popular_videos(db)
loop do
subscriptions = db.query_all("SELECT channel FROM \
(SELECT UNNEST(subscriptions) AS channel FROM users) AS d \
GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40", as: String)
videos = db.query_all("SELECT DISTINCT ON (ucid) * FROM \
channel_videos WHERE ucid IN (#{arg_array(subscriptions)}) \
ORDER BY ucid, published DESC", subscriptions, as: ChannelVideo).sort_by { |video| video.published }.reverse
yield videos
sleep 1.minute
end
end
def update_decrypt_function
loop do
begin
decrypt_function = fetch_decrypt_function
rescue ex
next
end
yield decrypt_function
sleep 1.minute
end
end
def find_working_proxies(regions)
loop do
regions.each do |region|
proxies = get_proxies(region).first(20)
proxies = proxies.map { |proxy| {ip: proxy[:ip], port: proxy[:port]} }
# proxies = filter_proxies(proxies)
yield region, proxies
end
sleep 1.minute
end
end

View File

@@ -3,10 +3,14 @@ macro db_mapping(mapping)
end
def to_a
return [{{*mapping.keys.map { |id| "@#{id}".id }}}]
return [ {{*mapping.keys.map { |id| "@#{id}".id }}} ]
end
DB.mapping({{mapping}})
def self.to_type_tuple
return { {{*mapping.keys.map { |id| "#{id}" }}} }
end
DB.mapping( {{mapping}} )
end
macro json_mapping(mapping)
@@ -14,11 +18,11 @@ macro json_mapping(mapping)
end
def to_a
return [{{*mapping.keys.map { |id| "@#{id}".id }}}]
return [ {{*mapping.keys.map { |id| "@#{id}".id }}} ]
end
JSON.mapping({{mapping}})
YAML.mapping({{mapping}})
JSON.mapping( {{mapping}} )
YAML.mapping( {{mapping}} )
end
macro yaml_mapping(mapping)
@@ -26,7 +30,7 @@ macro yaml_mapping(mapping)
end
def to_a
return [{{*mapping.keys.map { |id| "@#{id}".id }}}]
return [ {{*mapping.keys.map { |id| "@#{id}".id }}} ]
end
def to_tuple

View File

@@ -0,0 +1,70 @@
def fetch_decrypt_function(id = "CvFH_6DNRCY")
client = make_client(YT_URL)
document = client.get("/watch?v=#{id}&gl=US&hl=en&disable_polymer=1").body
url = document.match(/src="(?<url>\/yts\/jsbin\/player_ias-.{9}\/en_US\/base.js)"/).not_nil!["url"]
player = client.get(url).body
function_name = player.match(/^(?<name>[^=]+)=function\(a\){a=a\.split\(""\)/m).not_nil!["name"]
function_body = player.match(/^#{Regex.escape(function_name)}=function\(a\){(?<body>[^}]+)}/m).not_nil!["body"]
function_body = function_body.split(";")[1..-2]
var_name = function_body[0][0, 2]
var_body = player.delete("\n").match(/var #{Regex.escape(var_name)}={(?<body>(.*?))};/).not_nil!["body"]
operations = {} of String => String
var_body.split("},").each do |operation|
op_name = operation.match(/^[^:]+/).not_nil![0]
op_body = operation.match(/\{[^}]+/).not_nil![0]
case op_body
when "{a.reverse()"
operations[op_name] = "a"
when "{a.splice(0,b)"
operations[op_name] = "b"
else
operations[op_name] = "c"
end
end
decrypt_function = [] of {name: String, value: Int32}
function_body.each do |function|
function = function.lchop(var_name).delete("[].")
op_name = function.match(/[^\(]+/).not_nil![0]
value = function.match(/\(a,(?<value>[\d]+)\)/).not_nil!["value"].to_i
decrypt_function << {name: operations[op_name], value: value}
end
return decrypt_function
end
def decrypt_signature(fmt, code)
if !fmt["s"]?
return ""
end
a = fmt["s"]
a = a.split("")
code.each do |item|
case item[:name]
when "a"
a.reverse!
when "b"
a.delete_at(0..(item[:value] - 1))
when "c"
a = splice(a, item[:value])
end
end
signature = a.join("")
return "&#{fmt["sp"]?}=#{signature}"
end
def splice(a, b)
c = a[0]
a[0] = a[b % a.size]
a[b % a.size] = c
return a
end