twitter-checker/twitter_auth_cookies.py
2026-01-21 04:25:00 +02:00

608 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Twitter Authentication Script using Cookies
Supports Netscape and JSON cookie formats
"""
import json
import sys
import asyncio
import random
import shutil
from pathlib import Path
from typing import Dict, Union, Optional, List
from datetime import datetime
from tweety import TwitterAsync
from tweety.types import Proxy, PROXY_TYPE_SOCKS5, PROXY_TYPE_HTTP
def clean_cookie_content(content: str) -> str:
    """
    Clean cookie content by removing non-cookie data

    Two input shapes are recognised:

    * JSON - when the stripped content starts with ``[`` or ``{``, only the
      text up to the bracket/brace that closes that first character is kept.
    * Netscape - otherwise (or when the JSON value is unbalanced) only comment
      lines and tab-separated lines with at least 7 fields and a numeric
      expiration field are kept.

    Args:
        content: Raw file content
    Returns:
        Cleaned content with only cookies. If no line qualifies as a cookie
        or comment, the original content is returned unchanged.
    """
    content_stripped = content.strip()

    # For JSON format - try to extract only the JSON array/object
    if content_stripped.startswith(('[', '{')):
        opener = content_stripped[0]
        closer = ']' if opener == '[' else '}'
        depth = 0
        in_string = False
        escaped = False
        for i, char in enumerate(content_stripped):
            if in_string:
                # Brackets inside a JSON string (e.g. a cookie value such as
                # "x]y") are data, not structure, so they must not be counted.
                if escaped:
                    escaped = False
                elif char == '\\':
                    escaped = True
                elif char == '"':
                    in_string = False
                continue
            if char == '"':
                in_string = True
            elif char == opener:
                depth += 1
            elif char == closer:
                depth -= 1
                if depth == 0:
                    return content_stripped[:i + 1]
        # Unbalanced JSON: fall through to the Netscape line filter below.

    # For Netscape format - keep only valid cookie lines
    cleaned_lines = []
    for raw_line in content_stripped.split('\n'):
        line = raw_line.strip()
        # Skip empty lines
        if not line:
            continue
        # Keep comments that are part of Netscape format
        if line.startswith('#'):
            cleaned_lines.append(line)
            continue
        # Netscape cookie line: domain, flag, path, secure, expiration, name, value
        parts = line.split('\t')
        if len(parts) < 7:
            continue
        try:
            # The expiration field must be an integer
            int(parts[4])
        except ValueError:
            # Not a valid cookie line, skip it
            continue
        cleaned_lines.append(line)

    return '\n'.join(cleaned_lines) if cleaned_lines else content
def netscape_to_dict(netscape_content: str) -> Dict[str, str]:
    """
    Convert Netscape cookie format to dictionary

    Each cookie line has seven tab-separated fields:
    domain, flag, path, secure, expiration, name, value.
    Lines prefixed with ``#HttpOnly_`` are HttpOnly cookies (not comments)
    and are parsed like any other cookie line.

    Args:
        netscape_content: Netscape format cookies as string
    Returns:
        Dictionary of cookie name-value pairs
    """
    http_only_prefix = '#HttpOnly_'
    cookies: Dict[str, str] = {}
    for raw_line in netscape_content.strip().split('\n'):
        # Drop the carriage return left behind by CRLF line endings so it
        # does not end up inside the last field (the cookie value).
        line = raw_line.rstrip('\r')
        if line.startswith(http_only_prefix):
            # The prefix marks an HttpOnly cookie; the rest is a normal line.
            line = line[len(http_only_prefix):]
        elif line.startswith('#') or not line.strip():
            # Skip comments and empty lines
            continue
        parts = line.split('\t')
        if len(parts) >= 7:
            cookies[parts[5]] = parts[6]
    return cookies
def json_to_dict(json_content: str) -> Dict[str, str]:
    """
    Convert JSON cookie format to dictionary

    Supported shapes:

    * ``[{"name": "cookie_name", "value": "cookie_value"}, ...]``
    * ``{"cookies": [{"name": ..., "value": ...}, ...], ...}`` (list wrapped
      in an object)
    * ``{"cookie_name": "cookie_value", ...}``

    Args:
        json_content: JSON format cookies as string
    Returns:
        Dictionary of cookie name-value pairs; empty if the JSON value is
        neither a list nor an object.
    Raises:
        json.JSONDecodeError: If ``json_content`` is not valid JSON.
    """
    data = json.loads(json_content)

    if isinstance(data, dict):
        wrapped = data.get('cookies')
        if not isinstance(wrapped, list):
            # Format: {"cookie_name": "cookie_value", ...}
            return data
        # Format: {"cookies": [...]} - unwrap and treat as a cookie list
        data = wrapped

    if isinstance(data, list):
        # Entries that are not objects, or lack name/value, are ignored
        # instead of raising TypeError on the membership test.
        return {
            cookie['name']: cookie['value']
            for cookie in data
            if isinstance(cookie, dict) and 'name' in cookie and 'value' in cookie
        }

    return {}
def dict_to_cookie_string(cookies: Dict[str, str]) -> str:
    """
    Serialise a cookie mapping into a single header-style string

    Args:
        cookies: Dictionary of cookie name-value pairs
    Returns:
        The pairs rendered as "name1=value1; name2=value2", in the
        dictionary's iteration order (empty string for an empty mapping)
    """
    rendered_pairs = []
    for key, val in cookies.items():
        rendered_pairs.append(f"{key}={val}")
    return "; ".join(rendered_pairs)
def read_proxy_file(file_path: str) -> List[Proxy]:
    """
    Load every usable proxy definition from a text file

    Expected line format: protocol://[username:password@]host:port
    (one proxy per line). Blank lines and lines starting with '#' are
    ignored. A line without a protocol is treated as socks5. Lines that
    cannot be parsed, lack a port, or use an unknown protocol are skipped
    silently.

    Args:
        file_path: Path to proxy file
    Returns:
        List of Proxy objects
    Raises:
        FileNotFoundError: If the proxy file does not exist
    """
    source = Path(file_path)
    if not source.exists():
        raise FileNotFoundError(f"Proxy file not found: {file_path}")

    proxies = []
    for raw_entry in source.read_text(encoding='utf-8').strip().split('\n'):
        entry = raw_entry.strip()
        if entry.startswith('#') or not entry:
            continue
        try:
            # Split off the scheme; default to socks5 when none is given
            scheme, scheme_sep, remainder = entry.partition('://')
            if not scheme_sep:
                scheme, remainder = 'socks5', entry

            # Credentials are everything before the LAST '@'
            username = None
            password = None
            credentials, at_sign, host_port = remainder.rpartition('@')
            if at_sign:
                user_part, colon, pass_part = credentials.partition(':')
                if colon:
                    username, password = user_part, pass_part

            # host:port - the port is whatever follows the last ':'
            host, port_sep, port_text = host_port.rpartition(':')
            if not port_sep:
                continue
            port = int(port_text)

            # Map the scheme onto a tweety proxy type
            scheme = scheme.lower()
            if scheme in ('socks5', 'socks', 'socks5h'):
                proxy_type = PROXY_TYPE_SOCKS5
            elif scheme in ('http', 'https'):
                proxy_type = PROXY_TYPE_HTTP
            else:
                continue

            proxies.append(
                Proxy(
                    host=host,
                    port=port,
                    proxy_type=proxy_type,
                    username=username,
                    password=password,
                )
            )
        except Exception:
            # Best effort: a malformed line must not abort the whole load
            continue

    if proxies:
        print(f"✓ Loaded {len(proxies)} proxies from file")
    return proxies
def read_cookies_file(file_path: str) -> Optional[str]:
    """
    Read cookies from file and convert to tweety format

    The content is cleaned with ``clean_cookie_content``, parsed as JSON
    first and as Netscape format if JSON parsing fails, then rendered with
    ``dict_to_cookie_string``.

    Args:
        file_path: Path to cookies file (Netscape or JSON format)
    Returns:
        Cookie string ready for tweety, or None when the file is missing,
        unreadable, or contains no cookies
    """
    path = Path(file_path)
    if not path.exists():
        return None

    try:
        # utf-8-sig transparently drops a leading BOM (common in files saved
        # by Windows editors); json.loads rejects text that starts with one.
        content = path.read_text(encoding='utf-8-sig')
    except (OSError, UnicodeError):
        return None

    # Clean the content first
    content = clean_cookie_content(content)
    if not content:
        return None

    # Try JSON first
    try:
        cookies_dict = json_to_dict(content)
    except (ValueError, TypeError):
        # Not JSON (JSONDecodeError is a ValueError) or an unexpected JSON
        # shape - try Netscape format
        try:
            cookies_dict = netscape_to_dict(content)
        except Exception:
            return None

    if not cookies_dict:
        return None
    return dict_to_cookie_string(cookies_dict)
def scan_cookies_directory(dir_path: str) -> List[Path]:
    """
    Scan directory for cookie files (.json and .txt)

    Only the directory's immediate children are considered (no recursion).
    Extensions are matched case-insensitively and sub-directories are never
    returned, even if their name ends in .json or .txt.

    Args:
        dir_path: Path to directory with cookies or single file
    Returns:
        Sorted list of Path objects to cookie files; a single-element list
        when ``dir_path`` is itself a file
    Raises:
        FileNotFoundError: If the path does not exist
    """
    path = Path(dir_path).expanduser().resolve()
    if not path.exists():
        # Try relative to current directory
        path = Path.cwd() / dir_path
        if not path.exists():
            raise FileNotFoundError(f"Path not found: {dir_path}")

    # If it's a file, return it directly
    if path.is_file():
        return [path]

    # If it's a directory, collect regular files with a cookie extension
    cookie_suffixes = ('.json', '.txt')
    return sorted(
        entry
        for entry in path.iterdir()
        if entry.is_file() and entry.suffix.lower() in cookie_suffixes
    )
async def authenticate_twitter(cookies: Union[str, Dict], proxy: Optional[Proxy] = None) -> TwitterAsync:
    """
    Create a tweety client and load the supplied cookies into it

    Args:
        cookies: Cookie string or dictionary
        proxy: Optional proxy configuration; passed to the client only when
            truthy
    Returns:
        TwitterAsync instance after ``load_cookies`` has completed
    """
    print(f"\n🔐 Authenticating with Twitter...")

    # Only forward the proxy keyword when a proxy was actually supplied.
    client_options = {"proxy": proxy} if proxy else {}

    # NOTE(review): every call uses the same session name "temp_session";
    # presumably tweety persists session state under that name, which could
    # collide when several accounts are checked at once - confirm.
    app = TwitterAsync("temp_session", **client_options)
    await app.load_cookies(cookies)
    return app
def save_account_info(user, cookie_file: Path, output_dir: Path, cleaned_cookies: Optional[str] = None):
    """
    Write an account report (profile summary followed by the cookies) to a
    text file named ``<username>_<timestamp>_info.txt`` in ``output_dir``

    Args:
        user: User object from tweety. The attributes ``username``, ``id``,
            ``name``, ``verified``, ``created_at`` and the ``*_count`` fields
            are read; if the object is also a dict, verification type/label
            keys are looked up on it as well.
        cookie_file: Original cookie file path
        output_dir: Output directory (results/); created if missing
        cleaned_cookies: Cleaned cookie content (optional). When empty or
            None the original cookie file is read instead.
    Returns:
        Path of the written report file
    """
    # parents=True so a nested output directory does not fail on first use
    output_dir.mkdir(parents=True, exist_ok=True)

    # Create filename based on username
    username = user.username
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    info_file = output_dir / f"{username}_{timestamp}_info.txt"

    # Check for verification type
    verified_type = None
    verified_label = None
    if isinstance(user, dict):
        verified_type = user.get('verified_type') or user.get('ext_verified_type')
        verified_label = user.get('verified_label')
        legacy = user.get('legacy')
        if isinstance(legacy, dict) and not verified_type:
            verified_type = legacy.get('verified_type')
        if user.get('ext_is_blue_verified'):
            verified_type = verified_type or 'Blue'

    # Use cleaned cookies if provided, otherwise read original
    if cleaned_cookies:
        cookie_content = cleaned_cookies
    else:
        cookie_content = cookie_file.read_text(encoding='utf-8')

    # Format verification info
    verification_parts = []
    if user.verified:
        verification_parts.append("Verified: Yes")
        if verified_type:
            verification_parts.append(f"Type: {verified_type}")
        if verified_label:
            verification_parts.append(f"Label: {verified_label}")
    else:
        verification_parts.append("Verified: No")
    verification_line = " | ".join(verification_parts)

    # Format created date
    try:
        if isinstance(user.created_at, str):
            # Parse and reformat date; dateutil is imported lazily so a
            # missing package only degrades the date formatting.
            from dateutil import parser
            created_date = parser.parse(user.created_at)
            created_str = created_date.strftime("%d.%m.%Y %H:%M")
        else:
            created_str = user.created_at.strftime("%d.%m.%Y %H:%M")
    except Exception:
        # Fall back to the raw value; unlike a bare except this still lets
        # KeyboardInterrupt/SystemExit propagate.
        created_str = str(user.created_at)

    # Format info
    info_lines = [
        "=" * 80,
        f"Checked: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        f"ID: {user.id} | Name: {user.name} | Username: @{username}",
        f"Followers: {user.followers_count:,} | Following: {user.friends_count:,}",
        f"Tweets: {user.statuses_count:,} | Likes: {user.favourites_count:,}",
        verification_line,
        f"Created: {created_str}",
        f"Profile: https://twitter.com/{username}",
        "",
        f"Cookie file: {cookie_file.absolute()}",
        "",
        "=" * 80,
        cookie_content,
    ]

    # Write to file
    info_file.write_text('\n'.join(info_lines), encoding='utf-8')
    return info_file
async def check_account(cookie_file: Path, proxies: List[Proxy], output_dir: Path, semaphore: asyncio.Semaphore):
    """
    Check single account asynchronously

    Reads and parses the cookie file, authenticates through a randomly
    chosen proxy (if any are available), and on success writes a report via
    ``save_account_info``. Every failure is reported through the return
    value; this coroutine does not raise for ordinary errors.

    Args:
        cookie_file: Path to cookie file
        proxies: List of available proxies
        output_dir: Output directory
        semaphore: Semaphore for concurrency control
    Returns:
        Tuple of (success: bool, verified: bool, message: str). On success
        the message is "@username" (with a warning appended if the report
        could not be saved); on failure it describes the error.
    """
    async with semaphore:
        try:
            # Read cookies
            try:
                cookies_string = read_cookies_file(str(cookie_file))
                if not cookies_string:
                    return (False, False, "Failed to parse cookie file")
                # Get cleaned cookie content for saving (utf-8-sig drops a
                # leading BOM so it is not copied into the report)
                cleaned_content = clean_cookie_content(
                    cookie_file.read_text(encoding='utf-8-sig')
                )
            except Exception as e:
                return (False, False, f"Failed to read cookies: {e}")

            # Select random proxy if available
            proxy = random.choice(proxies) if proxies else None

            # Authenticate
            try:
                app = await authenticate_twitter(cookies_string, proxy)
            except Exception as e:
                return (False, False, f"Authentication error: {e}")

            # Check if successful
            try:
                user = app.me
                if not user:
                    return (False, False, "Authentication failed - invalid cookies")
                is_verified = user.verified
                message = f"@{user.username}"
            except Exception as e:
                return (False, False, f"Error processing user info: {e}")

            # Save results. A failure here must not turn a valid account
            # into a failed one, but it is surfaced instead of swallowed.
            try:
                save_account_info(user, cookie_file, output_dir, cleaned_content)
            except Exception as e:
                message = f"{message} (warning: results not saved: {e})"

            return (True, is_verified, message)
        except Exception as e:
            return (False, False, f"Unexpected error: {e}")
async def main():
    """
    Command-line entry point: check every cookie file and print a summary

    Reads the cookie path (``sys.argv[1]``) and optional proxy file
    (``sys.argv[2]``), schedules one ``check_account`` task per cookie file
    (at most 5 run at a time), prints per-account results in file order and
    writes a summary file into ``results/``. Exits with status 1 on usage
    errors, when no cookie files are found, or on unexpected errors.
    """
    print("=" * 60)
    print("Twitter Accounts Checker")
    print("=" * 60)

    # Check arguments
    if len(sys.argv) < 2:
        print("\nUsage: python twitter_auth_cookies.py <cookies_dir> [proxy_file]")
        print("\nArguments:")
        print(" cookies_dir - Directory with cookie files (.json/.txt) or single file")
        print(" proxy_file - Optional file with proxies (one per line)")
        print("\nProxy format:")
        print(" socks5://127.0.0.1:1080")
        print(" http://user:pass@proxy.com:8080")
        print("\nExamples:")
        print(" python twitter_auth_cookies.py cookies/")
        print(" python twitter_auth_cookies.py cookies/ proxies.txt")
        print(" python twitter_auth_cookies.py single_cookie.json proxies.txt")
        sys.exit(1)

    cookies_path = sys.argv[1]
    proxy_file = sys.argv[2] if len(sys.argv) > 2 else None

    # Output directory
    output_dir = Path("results")
    output_dir.mkdir(exist_ok=True)

    try:
        # Scan for cookie files
        print(f"\n📂 Scanning for cookies in: {cookies_path}")
        cookie_files = scan_cookies_directory(cookies_path)
        if not cookie_files:
            print("❌ No cookie files found (.json or .txt)")
            sys.exit(1)
        print(f"✓ Found {len(cookie_files)} cookie file(s)")

        # Load proxies if provided
        proxies = []
        if proxy_file:
            print(f"\n🌐 Loading proxies from: {proxy_file}")
            proxies = read_proxy_file(proxy_file)
            if not proxies:
                print("⚠️ No valid proxies found, continuing without proxy")

        # Process each cookie file
        print(f"\n🔍 Starting account checks...")
        print("=" * 60)

        # Create semaphore for concurrency control (5 concurrent checks)
        semaphore = asyncio.Semaphore(5)

        # Schedule every check right away. A bare coroutine only starts when
        # it is awaited, which would run the checks strictly one by one;
        # wrapping each in a Task lets them overlap up to the semaphore limit.
        tasks = [
            (
                cookie_file,
                asyncio.create_task(
                    check_account(cookie_file, proxies, output_dir, semaphore)
                ),
            )
            for cookie_file in cookie_files
        ]

        # Collect results in file order while the tasks run concurrently
        successful = 0
        failed = 0
        verified_count = 0
        for i, (cookie_file, task) in enumerate(tasks, 1):
            print(f"\n[{i}/{len(cookie_files)}] Checking: {cookie_file.name}")
            success, verified, msg = await task
            if success:
                print(f" ✅ Success: {msg}")
                if verified:
                    print(f" Verified: Yes")
                    verified_count += 1
                successful += 1
            else:
                print(f"{msg}")
                failed += 1

        # Summary
        print("\n" + "=" * 60)
        print("Summary:")
        print("=" * 60)
        print(f"Total accounts: {len(cookie_files)}")
        print(f"✅ Valid: {successful}")
        print(f"❌ Failed: {failed}")
        print(f"✓ Verified: {verified_count}")
        print(f"\n📁 Results saved to: {output_dir.absolute()}/")
        print("=" * 60)

        # Save summary to file
        summary_file = output_dir / f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        summary_lines = [
            "=" * 80,
            "Twitter Checker Summary",
            "=" * 80,
            f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "Results:",
            f" Total accounts checked: {len(cookie_files)}",
            f" Valid accounts: {successful}",
            f" Invalid accounts: {failed}",
            f" Verified accounts: {verified_count}",
            "",
            "Configuration:",
            f" Proxies loaded: {len(proxies) if proxies else 0}",
            f" Cookie files: {len(cookie_files)}",
            "",
            "=" * 80,
        ]
        summary_file.write_text('\n'.join(summary_lines), encoding='utf-8')
        print(f"\n📊 Summary saved to: {summary_file.name}")
    except FileNotFoundError as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report the full traceback, then exit non-zero
        print(f"\n❌ Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
# Run the async entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())