359 lines
10 KiB
Python
Executable File
359 lines
10 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
"""TorSwitch configures the onion router to redirect all internet traffic through SOCKS5 Tor proxy. This is version for SystemD.
|
|
|
|
Licensed under GNU GPLv3+ terms.
|
|
(c) 2023, xxx_stroboscope_420_xxx
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import getopt
|
|
import requests
|
|
import subprocess
|
|
import time
|
|
import random
|
|
import signal
|
|
from stem import Signal
|
|
from stem.control import Controller
|
|
from packaging import version
|
|
|
|
|
|
|
|
VERSION = "1.2"
|
|
|
|
IP_API = "https://api.ipify.org/?format=json"
|
|
TOR_CHECK = "https://check.torproject.org"
|
|
UA = "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0"
|
|
|
|
class font:
|
|
# Colors of foreground
|
|
RED = "\033[31m"
|
|
LIGHTRED = "\033[91m"
|
|
YELLOW = "\033[93m"
|
|
GREEN = "\033[92m"
|
|
BLUE = "\033[34m"
|
|
LIGHTBLUE = "\033[94m"
|
|
GRAY = "\033[90m"
|
|
WHITE = "\033[97m"
|
|
# Colors of background
|
|
BG_RED = "\033[41m"
|
|
BG_BLUE = "\033[104m"
|
|
# Special
|
|
CRIT = BG_RED + WHITE # Critical error
|
|
ERR = RED # Just error
|
|
WARN = YELLOW # Warning
|
|
TIME = GRAY # Timestamp
|
|
EXEC = LIGHTBLUE # Executed command
|
|
# Not colors
|
|
BOLD = "\033[1m"
|
|
ENDC = "\033[0m"
|
|
|
|
LOGO = f"""{font.RED + font.BOLD}
|
|
|
|
_____
|
|
|_ _|__ _ __
|
|
| |/ _ \| '__|
|
|
| | (_) | |
|
|
|_|\___/|_| SWITCH v{VERSION}
|
|
|
|
|
|
{font.ENDC}"""
|
|
USAGE = f"""
|
|
{font.BOLD}Usage:{font.ENDC}
|
|
-s, --start
|
|
Setup Tor as system-wide proxy
|
|
-r, --switch
|
|
Request new Tor exit node
|
|
-x, --stop
|
|
Shut down onion router and restore system defaults
|
|
-i, --info
|
|
Show information about current connection
|
|
-h, --help
|
|
Show this text and exit
|
|
"""
|
|
|
|
TOR_TRANS_PORT = 9040
|
|
TOR_SOCKS_PORT = 9050
|
|
TOR_CONTROL_PORT = 9051
|
|
TOR_DNS_PORT = 9053
|
|
TOR_HTTP_PORT = 9080
|
|
NON_TOR_RNG = "192.168.1.0/24 192.168.0.0/24"
|
|
|
|
FileConfigTorrcTorswitcher = "/etc/tor/torswitcherrc"
|
|
FileConfigResolv = "/etc/resolv.conf"
|
|
FileLogTorswitcher = "./torswitcher.log"
|
|
FileLogTor = "/var/log/tor/notices.log"
|
|
DirTorData = "/var/lib/tor"
|
|
|
|
StringConfigTorrcTorSwitcher = f"""
|
|
SOCKSPort {TOR_SOCKS_PORT}
|
|
HTTPTunnelPort {TOR_HTTP_PORT}
|
|
Log notice file {FileLogTor}
|
|
DataDirectory {DirTorData}
|
|
VirtualAddrNetwork 10.0.0.0/10
|
|
AutomapHostsOnResolve 1
|
|
TransPort {TOR_TRANS_PORT}
|
|
DNSPort {TOR_DNS_PORT}
|
|
ControlPort {TOR_CONTROL_PORT}
|
|
RunAsDaemon 1
|
|
"""
|
|
StringConfigResolv = "nameserver 127.0.0.1"
|
|
|
|
LogToFile = False
|
|
|
|
|
|
|
|
# Remove any colors from text
|
|
def strip_colors(text):
|
|
return text.replace(font.RED,"").replace(font.LIGHTRED,"").replace(font.YELLOW,"").replace(font.GREEN,"").replace(font.BLUE,"")\
|
|
.replace(font.LIGHTBLUE,"").replace(font.GRAY,"").replace(font.WHITE,"").replace(font.BG_RED,"").replace(font.CRIT,"")\
|
|
.replace(font.ERR,"").replace(font.WARN,"").replace(font.TIME,"").replace(font.EXEC,"").replace(font.BOLD,"").replace(font.ENDC,"")\
|
|
.replace(font.BG_BLUE,"")
|
|
|
|
# Print log line
|
|
def log(text, endt="\n"):
|
|
global LogToFile
|
|
global FileLogTorswitcher
|
|
now = time.strftime("%H:%M:%S", time.localtime())
|
|
print(f"{font.TIME}[{now}]{font.ENDC} {text}", end=endt)
|
|
if LogToFile:
|
|
try:
|
|
with open(FileLogTorswitcher, "at") as fd:
|
|
fd.write(f"[{now}] {strip_colors(text)}{endt}")
|
|
except Exception as exc:
|
|
print(f"{font.ERR}Error: can not write log line to file '{FileLogTorswitcher}'{font.ENDC}")
|
|
|
|
# Print log line w/o time stamp
|
|
def logapp(text, endt="\n"):
|
|
global LogToFile
|
|
global FileLogTorswitcher
|
|
print(f"{text}", end=endt)
|
|
if LogToFile:
|
|
try:
|
|
with open(FileLogTorswitcher, "at") as fd:
|
|
fd.write(f"{strip_colors(text)}{endt}")
|
|
except Exception as exc:
|
|
print(f"{font.ERR}Error: can not write log line to file '{FileLogTorswitcher}'{font.ENDC}")
|
|
|
|
# Handler for interrupt signal
|
|
def sigint_handler(signum, frame):
|
|
log(f"{font.WARN}User interrupt ! shutting down{font.ENDC}")
|
|
stop_tor_proxy()
|
|
|
|
# Execute command with printing it at terminal
|
|
def execute(cmd):
|
|
log(f"{font.EXEC}Executing '{cmd}'...{font.ENDC}")
|
|
os.system(cmd)
|
|
|
|
# Get current public IP
|
|
def ip():
|
|
retries = 20
|
|
while retries:
|
|
retries -= 1
|
|
try:
|
|
jsonRes = requests.get(IP_API,headers={"User-Agent":UA}).json()
|
|
return jsonRes["ip"]
|
|
except:
|
|
log(f"{font.ERR}Error: cant fetch IP{font.ENDC}")
|
|
continue
|
|
return "cant fetch ip address"
|
|
|
|
# Check if we connected via Tor network
|
|
def check_tor():
|
|
retries = 20
|
|
while retries:
|
|
retries -= 1
|
|
try:
|
|
resp = requests.get(TOR_CHECK)
|
|
if resp.status_code != 200:
|
|
log(f"{font.ERR}Error: cant access check.torproject.org{font.ENDC}")
|
|
continue
|
|
if "Congratulations. This browser is configured to use Tor." in resp.text:
|
|
return True
|
|
else:
|
|
return False
|
|
except:
|
|
log(f"{font.ERR}Error: something went wrong while trying to access check.torproject.org{font.ENDC}")
|
|
continue
|
|
log(f"{font.ERR}Error: retries limit exceeded{font.ENDC}")
|
|
return False
|
|
|
|
# Check if we running as root
|
|
def check_root():
|
|
if os.geteuid() != 0:
|
|
log(f"{font.CRIT}CRITICAL: you must be root, say the magic word 'sudo'. Aborting...{font.ENDC}")
|
|
sys.exit(1)
|
|
|
|
# Check if file contains supplied string
|
|
def file_contains(path, text):
|
|
try:
|
|
with open(path, "rt") as fd:
|
|
buff = fd.read()
|
|
return (text in buff)
|
|
except Exception as exc:
|
|
log(f"{font.WARN}Warning: error occured while trying to read file '{path}': {exc}{font.ENDC}")
|
|
return False
|
|
|
|
# Print logo
|
|
def logo():
|
|
print(LOGO) # TODO: make this look better
|
|
|
|
# Print usage text
|
|
def usage():
|
|
logo()
|
|
print(USAGE)
|
|
sys.exit(0)
|
|
|
|
def setup_tor_proxy():
|
|
log(f"{font.GREEN}Trying to setup onion router as system-wide proxy{font.ENDC}")
|
|
check_root()
|
|
if os.path.exists(FileConfigTorrcTorswitcher) and file_contains(FileConfigTorrcTorswitcher, StringConfigTorrcTorSwitcher):
|
|
log(f"Torrc file ('{FileConfigTorrcTorswitcher}') already configured")
|
|
else:
|
|
log("Writing torcc file... ", "")
|
|
with open(FileConfigTorrcTorswitcher, "wt") as fd:
|
|
fd.write(StringConfigTorrcTorSwitcher)
|
|
logapp(f"{font.GREEN}[done]{font.ENDC}")
|
|
if file_contains(FileConfigResolv, StringConfigResolv):
|
|
log(f"DNS '{FileConfigResolv}' file already configured")
|
|
else:
|
|
log(f"Saving original DNS '{FileConfigResolv}' file")
|
|
execute(f"sudo cp '{FileConfigResolv}' '{FileConfigResolv}.bak'")
|
|
log("Now creating our new... ", "")
|
|
with open(FileConfigResolv, "wt") as fd:
|
|
fd.write(StringConfigResolv)
|
|
logapp(f"{font.GREEN}[done]{font.ENDC}")
|
|
|
|
log("Stopping tor service")
|
|
execute("sudo systemctl stop tor")
|
|
log("Freeing tor control port")
|
|
execute(f"sudo fuser -k {TOR_CONTROL_PORT}/tcp > /dev/null 2>&1")
|
|
log("Starting new tor daemon")
|
|
execute(f"sudo -u debian-tor tor -f {FileConfigTorrcTorswitcher} > /dev/null")
|
|
log("Setting up iptables rules")
|
|
|
|
iptables_rules = f"""
|
|
NON_TOR="{NON_TOR_RNG}"
|
|
TOR_UID={subprocess.getoutput('id -ur debian-tor')}
|
|
TRANS_PORT="{TOR_TRANS_PORT}"
|
|
|
|
iptables -F
|
|
iptables -t nat -F
|
|
|
|
iptables -t nat -A OUTPUT -m owner --uid-owner $TOR_UID -j RETURN
|
|
iptables -t nat -A OUTPUT -p udp --dport 53 -j REDIRECT --to-ports {TOR_DNS_PORT}
|
|
for NET in $NON_TOR 127.0.0.0/9 127.128.0.0/10; do
|
|
iptables -t nat -A OUTPUT -d $NET -j RETURN
|
|
done
|
|
iptables -t nat -A OUTPUT -p tcp --syn -j REDIRECT --to-ports $TRANS_PORT
|
|
|
|
iptables -A OUTPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
|
|
for NET in $NON_TOR 127.0.0.0/8; do
|
|
iptables -A OUTPUT -d $NET -j ACCEPT
|
|
done
|
|
iptables -A OUTPUT -m owner --uid-owner $TOR_UID -j ACCEPT
|
|
iptables -A OUTPUT -j REJECT
|
|
"""
|
|
|
|
execute(iptables_rules)
|
|
log("Are we connected to Tor?.. ", "")
|
|
if not check_tor():
|
|
logapp(f"{font.RED}[no]{font.ENDC}")
|
|
log(f"{font.CRIT}CRITICAL: we are NOT connected to Tor network! Reverting changes...{font.ENDC}")
|
|
stop_tor_proxy()
|
|
return False
|
|
logapp(f"{font.GREEN}[yes]{font.ENDC}")
|
|
log("Fetching current IP")
|
|
log(f"Current IP is {font.GREEN}{ip()}{font.ENDC}")
|
|
return True
|
|
|
|
def stop_tor_proxy():
|
|
log(f"{font.RED}Restoring system defaults and shutting down onion router{font.ENDC}")
|
|
check_root()
|
|
log(f"Restoring DNS '{FileConfigResolv}' file")
|
|
execute(f"mv '{FileConfigResolv}.bak' '{FileConfigResolv}'")
|
|
log(f"Flushing iptables, resetting to default")
|
|
IpFlush = """
|
|
iptables -P INPUT ACCEPT
|
|
iptables -P FORWARD ACCEPT
|
|
iptables -P OUTPUT ACCEPT
|
|
iptables -t nat -F
|
|
iptables -t mangle -F
|
|
iptables -F
|
|
iptables -X
|
|
"""
|
|
execute(IpFlush)
|
|
log("Freeing tor control port")
|
|
execute(f"sudo fuser -k {TOR_CONTROL_PORT}/tcp > /dev/null 2>&1")
|
|
# TODO: stop tor?
|
|
log("Restarting Network Manager")
|
|
execute('service network-manager restart')
|
|
time.sleep(3)
|
|
# R u rly want make request 2 some proprietary service without any proxying?
|
|
#log("Fetching current IP")
|
|
#log(f"Current IP is {font.GREEN}{ip()}{font.ENDC}")
|
|
|
|
def switch_exit_node():
|
|
log(f"{font.YELLOW}Requesting new Tor exit node{font.ENDC}")
|
|
check_root()
|
|
if not check_tor():
|
|
log(f"{font.CRIT}CRITICAL: you are not connected to Tor network{font.ENDC}")
|
|
return
|
|
log("Fetching current IP")
|
|
log(f"Current IP is {font.GREEN}{ip()}{font.ENDC}")
|
|
log("Checking tor pid... ", "")
|
|
if not subprocess.getoutput('id -ur debian-tor').isdigit():
|
|
log(f"{font.CRIT}seems like there is no tor process running! Aborting...{font.ENDC}")
|
|
sys.exit(2)
|
|
logapp(f"{font.GREEN}[OK]{font.ENDC}")
|
|
log("Please wait...")
|
|
time.sleep(7)
|
|
log("Requesting new circuit... ", "")
|
|
with Controller.from_port(port=TOR_CONTROL_PORT) as controller:
|
|
controller.authenticate()
|
|
controller.signal(Signal.NEWNYM)
|
|
logapp(f"{font.GREEN}[done]{font.ENDC}")
|
|
log("Fetching updated IP")
|
|
log(f"New IP is {font.GREEN}{ip()}{font.ENDC}")
|
|
|
|
def show_connection_info():
|
|
logo()
|
|
log(f"{font.BG_BLUE + font.WHITE}Tor status:{font.ENDC} ", "")
|
|
if not check_tor():
|
|
logapp(f"{font.RED}DISCONNECTED{font.ENDC}")
|
|
return
|
|
logapp(f"{font.GREEN}CONNECTED{font.ENDC}")
|
|
log(f"{font.BG_BLUE + font.WHITE}IP:{font.ENDC} {font.GREEN}{ip()}{font.ENDC}\n")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
signal.signal(signal.SIGINT, sigint_handler)
|
|
if len(sys.argv) <= 1:
|
|
usage()
|
|
try:
|
|
(opts, args) = getopt.getopt(sys.argv[1:], "sxrih", [
|
|
"start", "stop", "switch", "info", "help"])
|
|
except:
|
|
usage()
|
|
sys.exit(3)
|
|
if not opts:
|
|
usage()
|
|
sys.exit(3)
|
|
for (o, a) in opts:
|
|
if o in ("-s", "--start"):
|
|
if setup_tor_proxy():
|
|
log(f"{font.BLUE}>>>{font.ENDC} {font.GREEN}Now you have +100 anonimity points!{font.ENDC} {font.BLUE}<<<{font.ENDC}")
|
|
elif o in ("-x", "--stop"):
|
|
stop_tor_proxy()
|
|
log(f"{font.BLUE}>>>{font.ENDC} {font.RED}Bye, anonymity, bye!{font.ENDC} {font.BLUE}<<<{font.ENDC}")
|
|
elif o in ("-r", "--switch"):
|
|
switch_exit_node()
|
|
elif o in ("-i", "--info"):
|
|
show_connection_info()
|
|
elif o in ("-h", "--help"):
|
|
usage()
|
|
else:
|
|
usage()
|