661 lines
22 KiB
Python
661 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# @Time : 2026/3/1 19:48
|
|
# @Author :
|
|
# @Software : PyCharm
|
|
# @Desc :
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import uuid
|
|
import math
|
|
import random
|
|
import string
|
|
import secrets
|
|
import hashlib
|
|
import base64
|
|
import threading
|
|
import argparse
|
|
from datetime import datetime, timezone, timedelta
|
|
from urllib.parse import urlparse, parse_qs, urlencode, quote
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Optional
|
|
import urllib.parse
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
from curl_cffi import requests
|
|
|
|
# ==========================================
|
|
# Mail.tm 临时邮箱 API
|
|
# ==========================================
|
|
|
|
MAILTM_BASE = "https://api.mail.tm"
|
|
|
|
|
|
def _mailtm_headers(*, token: str = "", use_json: bool = False) -> Dict[str, str]:
|
|
headers = {"Accept": "application/json"}
|
|
if use_json:
|
|
headers["Content-Type"] = "application/json"
|
|
if token:
|
|
headers["Authorization"] = f"Bearer {token}"
|
|
return headers
|
|
|
|
|
|
def _mailtm_domains(proxies: Any = None) -> list[str]:
|
|
resp = requests.get(
|
|
f"{MAILTM_BASE}/domains",
|
|
headers=_mailtm_headers(),
|
|
proxies=proxies,
|
|
impersonate="chrome",
|
|
timeout=15,
|
|
)
|
|
if resp.status_code != 200:
|
|
raise RuntimeError(f"获取 Mail.tm 域名失败,状态码: {resp.status_code}")
|
|
|
|
data = resp.json()
|
|
domains = []
|
|
if isinstance(data, list):
|
|
items = data
|
|
elif isinstance(data, dict):
|
|
items = data.get("hydra:member") or data.get("items") or []
|
|
else:
|
|
items = []
|
|
|
|
for item in items:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
domain = str(item.get("domain") or "").strip()
|
|
is_active = item.get("isActive", True)
|
|
is_private = item.get("isPrivate", False)
|
|
if domain and is_active and not is_private:
|
|
domains.append(domain)
|
|
|
|
return domains
|
|
|
|
|
|
def get_email_and_token(proxies: Any = None) -> tuple[str, str]:
|
|
"""创建 Mail.tm 邮箱并获取 Bearer Token"""
|
|
try:
|
|
domains = _mailtm_domains(proxies)
|
|
if not domains:
|
|
print("[Error] Mail.tm 没有可用域名")
|
|
return "", ""
|
|
domain = random.choice(domains)
|
|
|
|
for _ in range(5):
|
|
local = f"oc{secrets.token_hex(5)}"
|
|
email = f"{local}@{domain}"
|
|
password = secrets.token_urlsafe(18)
|
|
|
|
create_resp = requests.post(
|
|
f"{MAILTM_BASE}/accounts",
|
|
headers=_mailtm_headers(use_json=True),
|
|
json={"address": email, "password": password},
|
|
proxies=proxies,
|
|
impersonate="chrome",
|
|
timeout=15,
|
|
)
|
|
|
|
if create_resp.status_code not in (200, 201):
|
|
continue
|
|
|
|
token_resp = requests.post(
|
|
f"{MAILTM_BASE}/token",
|
|
headers=_mailtm_headers(use_json=True),
|
|
json={"address": email, "password": password},
|
|
proxies=proxies,
|
|
impersonate="chrome",
|
|
timeout=15,
|
|
)
|
|
|
|
if token_resp.status_code == 200:
|
|
token = str(token_resp.json().get("token") or "").strip()
|
|
if token:
|
|
return email, token
|
|
|
|
print("[Error] Mail.tm 邮箱创建成功但获取 Token 失败")
|
|
return "", ""
|
|
except Exception as e:
|
|
print(f"[Error] 请求 Mail.tm API 出错: {e}")
|
|
return "", ""
|
|
|
|
|
|
def get_oai_code(token: str, email: str, proxies: Any = None) -> str:
|
|
"""使用 Mail.tm Token 轮询获取 OpenAI 验证码"""
|
|
url_list = f"{MAILTM_BASE}/messages"
|
|
regex = r"(?<!\d)(\d{6})(?!\d)"
|
|
seen_ids: set[str] = set()
|
|
|
|
print(f"[*] 正在等待邮箱 {email} 的验证码...", end="", flush=True)
|
|
|
|
for _ in range(40):
|
|
print(".", end="", flush=True)
|
|
try:
|
|
resp = requests.get(
|
|
url_list,
|
|
headers=_mailtm_headers(token=token),
|
|
proxies=proxies,
|
|
impersonate="chrome",
|
|
timeout=15,
|
|
)
|
|
if resp.status_code != 200:
|
|
time.sleep(3)
|
|
continue
|
|
|
|
data = resp.json()
|
|
if isinstance(data, list):
|
|
messages = data
|
|
elif isinstance(data, dict):
|
|
messages = data.get("hydra:member") or data.get("messages") or []
|
|
else:
|
|
messages = []
|
|
|
|
for msg in messages:
|
|
if not isinstance(msg, dict):
|
|
continue
|
|
msg_id = str(msg.get("id") or "").strip()
|
|
if not msg_id or msg_id in seen_ids:
|
|
continue
|
|
seen_ids.add(msg_id)
|
|
|
|
read_resp = requests.get(
|
|
f"{MAILTM_BASE}/messages/{msg_id}",
|
|
headers=_mailtm_headers(token=token),
|
|
proxies=proxies,
|
|
impersonate="chrome",
|
|
timeout=15,
|
|
)
|
|
if read_resp.status_code != 200:
|
|
continue
|
|
|
|
mail_data = read_resp.json()
|
|
sender = str(
|
|
((mail_data.get("from") or {}).get("address") or "")
|
|
).lower()
|
|
subject = str(mail_data.get("subject") or "")
|
|
intro = str(mail_data.get("intro") or "")
|
|
text = str(mail_data.get("text") or "")
|
|
html = mail_data.get("html") or ""
|
|
if isinstance(html, list):
|
|
html = "\n".join(str(x) for x in html)
|
|
content = "\n".join([subject, intro, text, str(html)])
|
|
|
|
if "openai" not in sender and "openai" not in content.lower():
|
|
continue
|
|
|
|
m = re.search(regex, content)
|
|
if m:
|
|
print(" 抓到啦! 验证码:", m.group(1))
|
|
return m.group(1)
|
|
except Exception:
|
|
pass
|
|
|
|
time.sleep(3)
|
|
|
|
print(" 超时,未收到验证码")
|
|
return ""
|
|
|
|
|
|
# ==========================================
|
|
# OAuth 授权与辅助函数
|
|
# ==========================================
|
|
|
|
AUTH_URL = "https://auth.openai.com/oauth/authorize"
|
|
TOKEN_URL = "https://auth.openai.com/oauth/token"
|
|
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
|
|
|
|
DEFAULT_REDIRECT_URI = f"http://localhost:1455/auth/callback"
|
|
DEFAULT_SCOPE = "openid email profile offline_access"
|
|
|
|
|
|
def _b64url_no_pad(raw: bytes) -> str:
|
|
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
|
|
|
|
|
|
def _sha256_b64url_no_pad(s: str) -> str:
|
|
return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
|
|
|
|
|
|
def _random_state(nbytes: int = 16) -> str:
|
|
return secrets.token_urlsafe(nbytes)
|
|
|
|
|
|
def _pkce_verifier() -> str:
|
|
return secrets.token_urlsafe(64)
|
|
|
|
|
|
def _parse_callback_url(callback_url: str) -> Dict[str, str]:
|
|
candidate = callback_url.strip()
|
|
if not candidate:
|
|
return {"code": "", "state": "", "error": "", "error_description": ""}
|
|
|
|
if "://" not in candidate:
|
|
if candidate.startswith("?"):
|
|
candidate = f"http://localhost{candidate}"
|
|
elif any(ch in candidate for ch in "/?#") or ":" in candidate:
|
|
candidate = f"http://{candidate}"
|
|
elif "=" in candidate:
|
|
candidate = f"http://localhost/?{candidate}"
|
|
|
|
parsed = urllib.parse.urlparse(candidate)
|
|
query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
|
|
fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
|
|
|
|
for key, values in fragment.items():
|
|
if key not in query or not query[key] or not (query[key][0] or "").strip():
|
|
query[key] = values
|
|
|
|
def get1(k: str) -> str:
|
|
v = query.get(k, [""])
|
|
return (v[0] or "").strip()
|
|
|
|
code = get1("code")
|
|
state = get1("state")
|
|
error = get1("error")
|
|
error_description = get1("error_description")
|
|
|
|
if code and not state and "#" in code:
|
|
code, state = code.split("#", 1)
|
|
|
|
if not error and error_description:
|
|
error, error_description = error_description, ""
|
|
|
|
return {
|
|
"code": code,
|
|
"state": state,
|
|
"error": error,
|
|
"error_description": error_description,
|
|
}
|
|
|
|
|
|
def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
|
|
if not id_token or id_token.count(".") < 2:
|
|
return {}
|
|
payload_b64 = id_token.split(".")[1]
|
|
pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
|
|
try:
|
|
payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii"))
|
|
return json.loads(payload.decode("utf-8"))
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
|
|
raw = (seg or "").strip()
|
|
if not raw:
|
|
return {}
|
|
pad = "=" * ((4 - (len(raw) % 4)) % 4)
|
|
try:
|
|
decoded = base64.urlsafe_b64decode((raw + pad).encode("ascii"))
|
|
return json.loads(decoded.decode("utf-8"))
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _to_int(v: Any) -> int:
|
|
try:
|
|
return int(v)
|
|
except (TypeError, ValueError):
|
|
return 0
|
|
|
|
|
|
def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
|
|
body = urllib.parse.urlencode(data).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
url,
|
|
data=body,
|
|
method="POST",
|
|
headers={
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
"Accept": "application/json",
|
|
},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
raw = resp.read()
|
|
if resp.status != 200:
|
|
raise RuntimeError(
|
|
f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}"
|
|
)
|
|
return json.loads(raw.decode("utf-8"))
|
|
except urllib.error.HTTPError as exc:
|
|
raw = exc.read()
|
|
raise RuntimeError(
|
|
f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}"
|
|
) from exc
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class OAuthStart:
|
|
auth_url: str
|
|
state: str
|
|
code_verifier: str
|
|
redirect_uri: str
|
|
|
|
|
|
def generate_oauth_url(
|
|
*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE
|
|
) -> OAuthStart:
|
|
state = _random_state()
|
|
code_verifier = _pkce_verifier()
|
|
code_challenge = _sha256_b64url_no_pad(code_verifier)
|
|
|
|
params = {
|
|
"client_id": CLIENT_ID,
|
|
"response_type": "code",
|
|
"redirect_uri": redirect_uri,
|
|
"scope": scope,
|
|
"state": state,
|
|
"code_challenge": code_challenge,
|
|
"code_challenge_method": "S256",
|
|
"prompt": "login",
|
|
"id_token_add_organizations": "true",
|
|
"codex_cli_simplified_flow": "true",
|
|
}
|
|
auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
|
|
return OAuthStart(
|
|
auth_url=auth_url,
|
|
state=state,
|
|
code_verifier=code_verifier,
|
|
redirect_uri=redirect_uri,
|
|
)
|
|
|
|
|
|
def submit_callback_url(
|
|
*,
|
|
callback_url: str,
|
|
expected_state: str,
|
|
code_verifier: str,
|
|
redirect_uri: str = DEFAULT_REDIRECT_URI,
|
|
) -> str:
|
|
cb = _parse_callback_url(callback_url)
|
|
if cb["error"]:
|
|
desc = cb["error_description"]
|
|
raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
|
|
|
|
if not cb["code"]:
|
|
raise ValueError("callback url missing ?code=")
|
|
if not cb["state"]:
|
|
raise ValueError("callback url missing ?state=")
|
|
if cb["state"] != expected_state:
|
|
raise ValueError("state mismatch")
|
|
|
|
token_resp = _post_form(
|
|
TOKEN_URL,
|
|
{
|
|
"grant_type": "authorization_code",
|
|
"client_id": CLIENT_ID,
|
|
"code": cb["code"],
|
|
"redirect_uri": redirect_uri,
|
|
"code_verifier": code_verifier,
|
|
},
|
|
)
|
|
|
|
access_token = (token_resp.get("access_token") or "").strip()
|
|
refresh_token = (token_resp.get("refresh_token") or "").strip()
|
|
id_token = (token_resp.get("id_token") or "").strip()
|
|
expires_in = _to_int(token_resp.get("expires_in"))
|
|
|
|
claims = _jwt_claims_no_verify(id_token)
|
|
email = str(claims.get("email") or "").strip()
|
|
auth_claims = claims.get("https://api.openai.com/auth") or {}
|
|
account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
|
|
|
|
now = int(time.time())
|
|
expired_rfc3339 = time.strftime(
|
|
"%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))
|
|
)
|
|
now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
|
|
|
|
config = {
|
|
"id_token": id_token,
|
|
"access_token": access_token,
|
|
"refresh_token": refresh_token,
|
|
"account_id": account_id,
|
|
"last_refresh": now_rfc3339,
|
|
"email": email,
|
|
"type": "codex",
|
|
"expired": expired_rfc3339,
|
|
}
|
|
|
|
return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
|
|
|
|
|
|
# ==========================================
|
|
# 核心注册逻辑
|
|
# ==========================================
|
|
|
|
|
|
def run(proxy: Optional[str]) -> Optional[str]:
|
|
proxies: Any = None
|
|
if proxy:
|
|
proxies = {"http": proxy, "https": proxy}
|
|
|
|
s = requests.Session(proxies=proxies, impersonate="chrome")
|
|
|
|
try:
|
|
trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
|
|
trace = trace.text
|
|
loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
|
|
loc = loc_re.group(1) if loc_re else None
|
|
print(f"[*] 当前 IP 所在地: {loc}")
|
|
if loc == "CN" or loc == "HK":
|
|
raise RuntimeError("检查代理哦w - 所在地不支持")
|
|
except Exception as e:
|
|
print(f"[Error] 网络连接检查失败: {e}")
|
|
return None
|
|
|
|
email, dev_token = get_email_and_token(proxies)
|
|
if not email or not dev_token:
|
|
return None
|
|
print(f"[*] 成功获取 Mail.tm 邮箱与授权: {email}")
|
|
|
|
oauth = generate_oauth_url()
|
|
url = oauth.auth_url
|
|
|
|
try:
|
|
resp = s.get(url, timeout=15)
|
|
did = s.cookies.get("oai-did")
|
|
print(f"[*] Device ID: {did}")
|
|
|
|
signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
|
|
sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'
|
|
|
|
sen_resp = requests.post(
|
|
"https://sentinel.openai.com/backend-api/sentinel/req",
|
|
headers={
|
|
"origin": "https://sentinel.openai.com",
|
|
"referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
|
|
"content-type": "text/plain;charset=UTF-8",
|
|
},
|
|
data=sen_req_body,
|
|
proxies=proxies,
|
|
impersonate="chrome",
|
|
timeout=15,
|
|
)
|
|
|
|
if sen_resp.status_code != 200:
|
|
print(f"[Error] Sentinel 异常拦截,状态码: {sen_resp.status_code}")
|
|
return None
|
|
|
|
sen_token = sen_resp.json()["token"]
|
|
sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}'
|
|
|
|
signup_resp = s.post(
|
|
"https://auth.openai.com/api/accounts/authorize/continue",
|
|
headers={
|
|
"referer": "https://auth.openai.com/create-account",
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
"openai-sentinel-token": sentinel,
|
|
},
|
|
data=signup_body,
|
|
)
|
|
print(f"[*] 提交注册表单状态: {signup_resp.status_code}")
|
|
|
|
otp_resp = s.post(
|
|
"https://auth.openai.com/api/accounts/passwordless/send-otp",
|
|
headers={
|
|
"referer": "https://auth.openai.com/create-account/password",
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
},
|
|
)
|
|
print(f"[*] 验证码发送状态: {otp_resp.status_code}")
|
|
|
|
code = get_oai_code(dev_token, email, proxies)
|
|
if not code:
|
|
return None
|
|
|
|
code_body = f'{{"code":"{code}"}}'
|
|
code_resp = s.post(
|
|
"https://auth.openai.com/api/accounts/email-otp/validate",
|
|
headers={
|
|
"referer": "https://auth.openai.com/email-verification",
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
},
|
|
data=code_body,
|
|
)
|
|
print(f"[*] 验证码校验状态: {code_resp.status_code}")
|
|
|
|
create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}'
|
|
create_account_resp = s.post(
|
|
"https://auth.openai.com/api/accounts/create_account",
|
|
headers={
|
|
"referer": "https://auth.openai.com/about-you",
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
},
|
|
data=create_account_body,
|
|
)
|
|
create_account_status = create_account_resp.status_code
|
|
print(f"[*] 账户创建状态: {create_account_status}")
|
|
|
|
if create_account_status != 200:
|
|
print(create_account_resp.text)
|
|
return None
|
|
|
|
auth_cookie = s.cookies.get("oai-client-auth-session")
|
|
if not auth_cookie:
|
|
print("[Error] 未能获取到授权 Cookie")
|
|
return None
|
|
|
|
auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
|
|
workspaces = auth_json.get("workspaces") or []
|
|
if not workspaces:
|
|
print("[Error] 授权 Cookie 里没有 workspace 信息")
|
|
return None
|
|
workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
|
|
if not workspace_id:
|
|
print("[Error] 无法解析 workspace_id")
|
|
return None
|
|
|
|
select_body = f'{{"workspace_id":"{workspace_id}"}}'
|
|
select_resp = s.post(
|
|
"https://auth.openai.com/api/accounts/workspace/select",
|
|
headers={
|
|
"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
|
|
"content-type": "application/json",
|
|
},
|
|
data=select_body,
|
|
)
|
|
|
|
if select_resp.status_code != 200:
|
|
print(f"[Error] 选择 workspace 失败,状态码: {select_resp.status_code}")
|
|
print(select_resp.text)
|
|
return None
|
|
|
|
continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip()
|
|
if not continue_url:
|
|
print("[Error] workspace/select 响应里缺少 continue_url")
|
|
return None
|
|
|
|
current_url = continue_url
|
|
for _ in range(6):
|
|
final_resp = s.get(current_url, allow_redirects=False, timeout=15)
|
|
location = final_resp.headers.get("Location") or ""
|
|
|
|
if final_resp.status_code not in [301, 302, 303, 307, 308]:
|
|
break
|
|
if not location:
|
|
break
|
|
|
|
next_url = urllib.parse.urljoin(current_url, location)
|
|
if "code=" in next_url and "state=" in next_url:
|
|
return submit_callback_url(
|
|
callback_url=next_url,
|
|
code_verifier=oauth.code_verifier,
|
|
redirect_uri=oauth.redirect_uri,
|
|
expected_state=oauth.state,
|
|
)
|
|
current_url = next_url
|
|
|
|
print("[Error] 未能在重定向链中捕获到最终 Callback URL")
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"[Error] 运行时发生错误: {e}")
|
|
return None
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本")
|
|
parser.add_argument(
|
|
"--proxy", default="", help="代理地址,如 http://127.0.0.1:7890"
|
|
)
|
|
parser.add_argument("--once", action="store_true", help="只运行一次")
|
|
parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
|
|
parser.add_argument(
|
|
"--sleep-max", type=int, default=20, help="循环模式最长等待秒数"
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
sleep_min = max(1, args.sleep_min)
|
|
sleep_max = max(sleep_min, args.sleep_max)
|
|
|
|
count = 0
|
|
print("[Info] Yasal's Seamless OpenAI Auto-Registrar Started for ZJH")
|
|
|
|
while True:
|
|
count += 1
|
|
print(
|
|
f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<"
|
|
)
|
|
|
|
try:
|
|
token_json = run(args.proxy)
|
|
|
|
if token_json:
|
|
try:
|
|
t_data = json.loads(token_json)
|
|
fname_email = t_data.get("email", "unknown").replace("@", "_")
|
|
except Exception:
|
|
fname_email = "unknown"
|
|
|
|
file_name = f"token_{fname_email}_{int(time.time())}.json"
|
|
|
|
with open(file_name, "w", encoding="utf-8") as f:
|
|
f.write(token_json)
|
|
|
|
print(f"[*] 成功! Token 已保存至: {file_name}")
|
|
else:
|
|
print("[-] 本次注册失败。")
|
|
|
|
except Exception as e:
|
|
print(f"[Error] 发生未捕获异常: {e}")
|
|
|
|
if args.once:
|
|
break
|
|
|
|
wait_time = random.randint(sleep_min, sleep_max)
|
|
print(f"[*] 休息 {wait_time} 秒...")
|
|
time.sleep(wait_time)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|