import re

import requests

# requests performs HTTPS calls with verify=False in this module, which makes
# urllib3 emit InsecureRequestWarning on every request.  Suppress ONLY that
# warning category, instead of the previous approach of monkey-patching
# warnings.warn with a no-op (which silenced every warning in the process).
requests.packages.urllib3.disable_warnings(
    requests.packages.urllib3.exceptions.InsecureRequestWarning
)


class ParseIsNotDoneException(Exception):
    """Raised when results are requested before parsing was performed."""

    def __init__(self):
        super().__init__("you should perform parsing first")


class FailedToParseException(Exception):
    """Raised when a web site's markup cannot be parsed."""

    def __init__(self, url, errtext):
        super().__init__(f"cant parse web site \"{url}\"; error message: {errtext}")


class CantRequestPageException(Exception):
    """Raised when an HTTP request for a page fails or is blocked."""

    def __init__(self, url, errtext=None):
        if errtext:
            super().__init__(f"cant request page \"{url}\"; reason: {errtext}")
        else:
            super().__init__(f"cant request page \"{url}\"")


class WebSiteParser:
    """Base helper for scraping phone-number listing web sites.

    Provides HTTP fetching (with Cloudflare-block detection) and regex-based
    extraction.  Subclasses are expected to fill ``self.Output`` and set
    ``self.ParseDone`` before callers use ``GetResult``.
    """

    def __init__(self, url):
        """Prepare parser state for the site at ``url`` (bare host name)."""
        self.EMPTY = str(None)
        # Marker text that Cloudflare's JS-challenge page contains.
        self.CFBlockText = "Enable JavaScript and cookies to continue".lower()
        # Browser-like headers to reduce the chance of being blocked.
        self.DefaultHeaders = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            #"Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1"
        }
        # Single result number format:
        # {
        #     "number": "88005553535",
        #     "country": "USSR",
        #     "last_use": "11 milliseconds ago",
        #     "born": "9 eras ago",
        #     "url": "https://example.com/number/..."
        # }
        # Single number internal format:
        # {
        #     "number": "...",
        #     "country": "...",
        #     "times_used": 100500,
        #     "last_use_raw": "log(12451 ^ 76346 mod 54) * 420000 / 146 seconds ago",
        #     "born_raw": "2 << 2 | 1 eRaS aGo ayo dude"
        #     "uri_raw": "/numbers/..."
        # }
        self.Output = {
            "heavily_used": [],
            "almost_not_used": [],
            "not_used": []
        }
        self.ParseDone = False
        # If number is used less or equal {} times, than it counted as almost not used
        self.AlmostNotUsedThreshold = 5
        self.ErrorsWhileParsing = 0
        self.WebSiteURL = url
        self.WebSiteFullURL = f"https://{url}/"

    def Cut(self, text, l=64):
        """Cut trailing symbols: truncate ``text`` to ``l`` chars plus an ellipsis."""
        if not isinstance(text, str):
            text = str(text)
        if len(text) > l:
            text = text[:l-1] + "..."
        return text

    def Log(self, text, src=None):
        """Write text to stdout, optionally tagged with the source location ``src``."""
        if not src:
            print(f"{self.WebSiteURL} parser: {text}")
        else:
            print(f"{self.WebSiteURL} parser at {src}: {text}")

    def RequestPage(self, location=""):
        """Request page at given location.

        Returns the ``requests`` response on HTTP 200/302; raises
        CantRequestPageException on connection failure, Cloudflare blocks,
        or any other status code.
        """
        url = f"{self.WebSiteFullURL}{location}"
        try:
            # verify=False: target sites frequently have broken certificates;
            # the matching urllib3 warning is disabled at module import.
            resp = requests.get(
                url,
                headers=self.DefaultHeaders,
                verify=False
            )
        except requests.exceptions.ConnectionError:
            raise CantRequestPageException(url, "cant connect, retries limit exceeded")
        if resp.status_code not in (200, 302):
            # Distinguish Cloudflare blocks from other HTTP errors.
            if (resp.status_code == 403) and (self.CFBlockText in resp.text.lower()):
                raise CantRequestPageException(url, "blocked by cloudflare")
            elif (resp.status_code == 403) and (resp.headers.get("Server") == "cloudflare"):
                raise CantRequestPageException(url, "seems like blocked by cloudflare")
            raise CantRequestPageException(url, f"status code is {resp.status_code}")
        else:
            return resp

    def ParseAllFromPage(self, exp, to_remove="", location=""):
        """Parse all things which fit regular expression "exp" and remove from
        them all things that fit RE "to_remove"
        """
        markup = self.RequestPage(location).text
        pieces_of_markup = re.findall(exp, markup)
        result = []
        for piece in pieces_of_markup:
            # re.findall returns tuples when "exp" has multiple groups;
            # such results are logged and skipped rather than processed.
            if not isinstance(piece, str):
                e = self.Cut(str(piece))
                self.Log(f"warning: unexpected result while parsing: {e}",
                         f"ParseAllFromPage(self, \"{self.Cut(exp,32)}\", \"{self.Cut(to_remove,32)}\", \"{self.Cut(location,32)}\")")
                continue
            result.append(
                re.sub(to_remove, "", piece) if to_remove else piece
            )
        return result

    def GetResult(self):
        """Return the collected Output; raise if parsing has not been done yet."""
        if not self.ParseDone:
            raise ParseIsNotDoneException
        return self.Output