#!/usr/bin/env python3 """ Testara Proxy Tunnel Client v1.1 Connects your local network to Testara cloud for testing internal apps. Usage: 1) Edit config.ini 2) Run: python testara-tunnel.py Requirements: pip install websockets requests """ import asyncio,json,sys,os,signal,configparser try: import websockets except ImportError: print("\n[ERROR] pip install websockets requests\n");sys.exit(1) try: import requests except ImportError: print("\n[ERROR] pip install websockets requests\n");sys.exit(1) CONFIG_FILE=os.path.join(os.path.dirname(os.path.abspath(__file__)),"config.ini") VERSION="1.1.0" def load_config(): if not os.path.exists(CONFIG_FILE):print(f"\n[ERROR] {CONFIG_FILE} not found");sys.exit(1) c=configparser.ConfigParser();c.read(CONFIG_FILE) if not c.has_section("server"):print("\n[ERROR] Missing [server]\n");sys.exit(1) if not c.has_section("tunnel"):print("\n[ERROR] Missing [tunnel]\n");sys.exit(1) url=c.get("server","url",fallback="").strip().rstrip("/") token=c.get("server","token",fallback="").strip() email=c.get("server","email",fallback="").strip() password=c.get("server","password",fallback="").strip() if not url:print("\n[ERROR] Missing 'url' in [server]\n");sys.exit(1) if not token and not (email and password):print("\n[ERROR] Set 'token' OR 'email'+'password' in [server]\n");sys.exit(1) name=c.get("tunnel","name",fallback="").strip() if not name:print("\n[ERROR] Missing 'name' in [tunnel]\n");sys.exit(1) return{"server_url":url,"token":token,"email":email,"password":password,"tunnel_name":name,"allowed_hosts":[h.strip() for h in c.get("tunnel","allowed_hosts",fallback="*").split(",")],"timeout":c.getint("tunnel","request_timeout",fallback=30),"verify_ssl":c.getboolean("tunnel","verify_ssl",fallback=True)} def authenticate(cfg): if cfg["token"]: print("[AUTH] Validating tunnel token...") try: r=requests.get(f"{cfg['server_url']}/api/v1/tunnel/token/validate",headers={"Authorization":f"Bearer {cfg['token']}"},timeout=15,verify=cfg["verify_ssl"]) if r.status_code==401:print("[ERROR] Invalid or revoked token. Generate a new one in Settings.");return None return cfg["token"] except requests.exceptions.ConnectionError:print(f"[ERROR] Cannot connect to {cfg['server_url']}");return None except Exception as e:print(f"[ERROR] {e}");return None else: print("[AUTH] Logging in with email/password...") try: r=requests.post(f"{cfg['server_url']}/api/v1/auth/login",json={"email":cfg["email"],"password":cfg["password"]},timeout=15,verify=cfg["verify_ssl"]) if r.status_code==200:return r.json().get("access_token") print(f"[ERROR] Login failed ({r.status_code}): {r.json().get('detail','')}");return None except requests.exceptions.ConnectionError:print(f"[ERROR] Cannot connect to {cfg['server_url']}");return None except Exception as e:print(f"[ERROR] {e}");return None def proxy(data,cfg): url=data.get("url","");method=data.get("method","GET");rid=data.get("request_id","?") if cfg["allowed_hosts"]!=["*"]: from urllib.parse import urlparse h=urlparse(url).hostname if h not in cfg["allowed_hosts"]:return{"request_id":rid,"status_code":403,"headers":{},"body":f"Host '{h}' blocked","error":"blocked"} try: r=requests.request(method,url,headers=data.get("headers",{}),data=data.get("body"),timeout=cfg["timeout"],verify=False,allow_redirects=True) return{"request_id":rid,"status_code":r.status_code,"headers":dict(r.headers),"body":r.text[:500000]} except requests.exceptions.Timeout:return{"request_id":rid,"status_code":504,"headers":{},"body":"Timed out","error":"timeout"} except requests.exceptions.ConnectionError as e:return{"request_id":rid,"status_code":502,"headers":{},"body":str(e)[:200],"error":"conn_error"} except Exception as e:return{"request_id":rid,"status_code":500,"headers":{},"body":str(e)[:200],"error":"error"} async def run(cfg,token): ws_url=cfg["server_url"].replace("https://","wss://").replace("http://","ws://")+"/api/v1/tunnel/connect" hdrs={"Authorization":f"Bearer {token}","X-Tunnel-Name":cfg["tunnel_name"],"X-Tunnel-Version":VERSION} retry=0 while retry<50: try: print(f"\n[CONNECTING] {cfg['server_url']} ...") async with websockets.connect(ws_url,additional_headers=hdrs,ping_interval=20,ping_timeout=10,max_size=2**22) as ws: retry=0;print(f"[CONNECTED] Tunnel '{cfg['tunnel_name']}' active\n[READY] Waiting for requests...\n") async for msg in ws: try: d=json.loads(msg);t=d.get("type","") if t=="ping":await ws.send(json.dumps({"type":"pong"})) elif t=="proxy_request": print(f" [{d.get('method','GET')}] {d.get('url','?')[:80]}",end="",flush=True) res=proxy(d,cfg);res["type"]="proxy_response";await ws.send(json.dumps(res)) print(f" -> {res['status_code']} [{'OK' if 200<=res['status_code']<400 else 'ERR'}]") elif t=="welcome":print(f" Server: {d.get('message','OK')}") elif t=="error":print(f"[ERROR] {d.get('message','?')}") except:pass except websockets.exceptions.ConnectionClosed as e: if e.code==4003:print("\n[REVOKED] Token was revoked. Generate a new one in Settings.");return print(f"\n[DISCONNECTED] {e.code}") except Exception as e: if "401" in str(e): if cfg["token"]:print("[ERROR] Token invalid or revoked.");return print("[AUTH] Token expired, re-authenticating...");token=authenticate(cfg) if not token:return hdrs["Authorization"]=f"Bearer {token}" else:print(f"\n[ERROR] {e}") retry+=1;delay=min(3*(2**min(retry-1,5)),60);print(f"[RETRY] {retry}/50 in {delay}s...");await asyncio.sleep(delay) def main(): print("="*56+f"\n Testara Proxy Tunnel Client v{VERSION}\n"+"="*56) cfg=load_config() print(f"\n Server: {cfg['server_url']}") if cfg["token"]:print(f" Auth: Token ({cfg['token'][:8]}...{cfg['token'][-4:]})") else:print(f" Auth: Email ({cfg['email']})") print(f" Tunnel: {cfg['tunnel_name']}\n Hosts: {', '.join(cfg['allowed_hosts'])}\n") token=authenticate(cfg) if not token:sys.exit(1) print("[AUTH] Success\n") signal.signal(signal.SIGINT,lambda s,f:(print("\n[SHUTDOWN] Goodbye!"),sys.exit(0))) try:asyncio.run(run(cfg,token)) except KeyboardInterrupt:print("\n[SHUTDOWN] Goodbye!") if __name__=="__main__":main()