#!/usr/bin/env python3
|
|
"""
|
|
Twitter Authentication Script using Cookies
|
|
Supports Netscape and JSON cookie formats
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
import asyncio
|
|
import random
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Dict, Union, Optional, List
|
|
from datetime import datetime
|
|
from tweety import TwitterAsync
|
|
from tweety.types import Proxy, PROXY_TYPE_SOCKS5, PROXY_TYPE_HTTP
|
|
|
|
|
|
def clean_cookie_content(content: str) -> str:
    """
    Clean cookie content by removing non-cookie data.

    For JSON input, extracts the leading JSON value (array or object) and
    drops any trailing junk.  For Netscape input, keeps only comment lines
    and tab-separated lines that look like valid cookies.

    Args:
        content: Raw file content

    Returns:
        Cleaned content with only cookies; the original content is returned
        unchanged when nothing recognizable is found.
    """
    content_stripped = content.strip()

    # For JSON format - extract only the leading JSON array/object.
    # json.JSONDecoder.raw_decode reports where the first JSON value ends,
    # correctly handling ']' / '}' characters that appear inside string
    # values (the previous manual bracket counter miscounted those).
    if content_stripped.startswith(('[', '{')):
        try:
            _, json_end = json.JSONDecoder().raw_decode(content_stripped)
            return content_stripped[:json_end]
        except json.JSONDecodeError:
            # Not valid JSON after all; fall through to Netscape handling.
            pass

    # For Netscape format - keep only valid cookie lines
    cleaned_lines = []
    for line in content.strip().split('\n'):
        line = line.strip()

        # Skip empty lines
        if not line:
            continue

        # Keep comments that are part of Netscape format
        if line.startswith('#'):
            cleaned_lines.append(line)
            continue

        # Netscape cookie lines are tab-separated:
        # domain, flag, path, secure, expiration, name, value
        parts = line.split('\t')
        if len(parts) >= 7:
            try:
                int(parts[4])  # expiration field must be numeric
                cleaned_lines.append(line)
            except ValueError:
                # Not a valid cookie line, skip it
                continue

    return '\n'.join(cleaned_lines) if cleaned_lines else content
|
|
|
|
|
|
def netscape_to_dict(netscape_content: str) -> Dict[str, str]:
    """
    Parse Netscape-format cookies into a name -> value mapping.

    Each data line carries 7 tab-separated fields:
    domain, flag, path, secure, expiration, name, value.

    Args:
        netscape_content: Netscape format cookies as string

    Returns:
        Dictionary of cookie name-value pairs
    """
    result: Dict[str, str] = {}

    for raw_line in netscape_content.strip().split('\n'):
        # Ignore blank lines and Netscape header comments
        if not raw_line.strip() or raw_line.startswith('#'):
            continue

        fields = raw_line.split('\t')
        if len(fields) < 7:
            continue

        # Fields 5 and 6 hold the cookie name and value
        result[fields[5]] = fields[6]

    return result
|
|
|
|
|
|
def json_to_dict(json_content: str) -> Dict[str, str]:
    """
    Convert JSON cookie format to a name -> value dictionary.

    Supports two layouts:
      * a list of objects: [{"name": ..., "value": ...}, ...]
      * a flat mapping:    {"cookie_name": "cookie_value", ...}

    Args:
        json_content: JSON format cookies as string

    Returns:
        Dictionary of cookie name-value pairs (empty when the JSON holds
        neither supported layout)

    Raises:
        json.JSONDecodeError: if json_content is not valid JSON
    """
    cookies: Dict[str, str] = {}
    data = json.loads(json_content)

    # Handle different JSON formats
    if isinstance(data, list):
        # Format: [{"name": "cookie_name", "value": "cookie_value"}, ...]
        for cookie in data:
            # Guard against non-object entries: `'name' in 5` would raise
            # TypeError in the original code.
            if isinstance(cookie, dict) and 'name' in cookie and 'value' in cookie:
                cookies[cookie['name']] = cookie['value']
    elif isinstance(data, dict):
        # Format: {"cookie_name": "cookie_value", ...}
        cookies = data

    return cookies
|
|
|
|
|
|
def dict_to_cookie_string(cookies: Dict[str, str]) -> str:
    """
    Serialize a cookie mapping into the header-style string tweety expects.

    Args:
        cookies: Dictionary of cookie name-value pairs

    Returns:
        Cookie string in format "name1=value1; name2=value2"
    """
    pairs = (f"{key}={val}" for key, val in cookies.items())
    return "; ".join(pairs)
|
|
|
|
|
|
def read_proxy_file(file_path: str) -> List[Proxy]:
    """
    Load every proxy definition from a text file.

    Expected line format: protocol://[username:password@]host:port
    (one proxy per line; '#' lines and blanks are ignored; the protocol
    defaults to socks5 when omitted).

    Args:
        file_path: Path to proxy file

    Returns:
        List of Proxy objects

    Raises:
        FileNotFoundError: if the proxy file does not exist
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"Proxy file not found: {file_path}")

    proxies: List[Proxy] = []

    for raw in path.read_text(encoding='utf-8').strip().split('\n'):
        entry = raw.strip()
        if not entry or entry.startswith('#'):
            continue

        try:
            # Split off the scheme; bare "host:port" entries default to socks5
            if '://' in entry:
                scheme, remainder = entry.split('://', 1)
            else:
                scheme, remainder = 'socks5', entry

            # Optional "user:pass@" credentials before the host
            username = None
            password = None
            if '@' in remainder:
                credentials, endpoint = remainder.rsplit('@', 1)
                if ':' in credentials:
                    username, password = credentials.split(':', 1)
            else:
                endpoint = remainder

            # host:port is mandatory; skip entries without a port
            if ':' not in endpoint:
                continue
            host, port_text = endpoint.rsplit(':', 1)
            port = int(port_text)

            # Map the scheme onto a tweety proxy type; skip unknown schemes
            scheme = scheme.lower()
            if scheme in ('socks5', 'socks', 'socks5h'):
                proxy_kind = PROXY_TYPE_SOCKS5
            elif scheme in ('http', 'https'):
                proxy_kind = PROXY_TYPE_HTTP
            else:
                continue

            proxies.append(Proxy(
                host=host,
                port=port,
                proxy_type=proxy_kind,
                username=username,
                password=password,
            ))

        except Exception:
            # Best-effort parsing: a malformed line is skipped, not fatal
            continue

    if proxies:
        print(f"✓ Loaded {len(proxies)} proxies from file")

    return proxies
|
|
|
|
|
|
def read_cookies_file(file_path: str) -> Optional[str]:
    """
    Read cookies from file and convert to tweety format.

    Accepts either JSON or Netscape cookie files; the format is
    auto-detected (JSON is tried first, Netscape as fallback).

    Args:
        file_path: Path to cookies file (Netscape or JSON format)

    Returns:
        Cookie string ready for tweety ("name=value; ..."), or None when
        the file is missing, unreadable, or contains no parseable cookies.
    """
    path = Path(file_path)

    if not path.exists():
        return None

    try:
        content = path.read_text(encoding='utf-8')
    except Exception:
        # Unreadable file (permissions, encoding, ...) -> treat as "no cookies"
        return None

    # Clean the content first
    content = clean_cookie_content(content)

    if not content:
        return None

    # Try to detect format and convert
    cookies_dict = {}

    # Try JSON first
    try:
        cookies_dict = json_to_dict(content)
    except json.JSONDecodeError:
        # Try Netscape format
        try:
            cookies_dict = netscape_to_dict(content)
        except Exception:
            return None

    if not cookies_dict:
        return None

    return dict_to_cookie_string(cookies_dict)
|
|
|
|
|
|
def scan_cookies_directory(dir_path: str) -> List[Path]:
    """
    Collect cookie files (.json / .txt) from a directory, or accept a
    single file path directly.

    Args:
        dir_path: Path to directory with cookies or single file

    Returns:
        Sorted list of Path objects to cookie files

    Raises:
        FileNotFoundError: if the path cannot be located
    """
    path = Path(dir_path).expanduser().resolve()

    if not path.exists():
        # Fall back to interpreting the argument relative to the CWD
        path = Path.cwd() / dir_path
        if not path.exists():
            raise FileNotFoundError(f"Path not found: {dir_path}")

    # A single file is returned as-is
    if path.is_file():
        return [path]

    # Otherwise scan the directory for supported extensions
    matches: List[Path] = []
    for pattern in ('*.json', '*.txt'):
        matches.extend(path.glob(pattern))

    return sorted(matches)
|
|
|
|
|
|
async def authenticate_twitter(cookies: Union[str, Dict], proxy: Optional[Proxy] = None) -> TwitterAsync:
    """
    Build a TwitterAsync session and load the given cookies into it.

    Args:
        cookies: Cookie string or dictionary
        proxy: Optional proxy configuration

    Returns:
        Authenticated TwitterAsync instance
    """
    print("\n🔐 Authenticating with Twitter...")

    # Only pass the proxy kwarg when one was actually supplied
    kwargs = {'proxy': proxy} if proxy else {}
    app = TwitterAsync("temp_session", **kwargs)

    await app.load_cookies(cookies)

    return app
|
|
|
|
|
|
def save_account_info(user, cookie_file: Path, output_dir: Path, cleaned_cookies: str = None):
    """
    Save account information to a text file in output_dir.

    Args:
        user: User object from tweety.  NOTE(review): the verification
            block below partially supports a dict payload, but the later
            attribute accesses (user.verified, user.created_at, ...)
            would fail for a plain dict — confirm callers always pass an
            object.
        cookie_file: Original cookie file path
        output_dir: Output directory (results/)
        cleaned_cookies: Cleaned cookie content (optional); when omitted,
            the original cookie file is re-read

    Returns:
        Path to the written info file
    """
    output_dir.mkdir(exist_ok=True)

    # Filename = username + timestamp so repeated checks never overwrite
    username = user.username
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Save account info
    info_file = output_dir / f"{username}_{timestamp}_info.txt"

    # Check for verification type (several API payload variants are probed)
    verified_type = None
    verified_label = None

    if isinstance(user, dict):
        verified_type = user.get('verified_type') or user.get('ext_verified_type')
        verified_label = user.get('verified_label')

        if 'legacy' in user and isinstance(user['legacy'], dict):
            if not verified_type:
                verified_type = user['legacy'].get('verified_type')

        if 'ext_is_blue_verified' in user:
            if user['ext_is_blue_verified']:
                verified_type = verified_type or 'Blue'

    # Use cleaned cookies if provided, otherwise read original
    if cleaned_cookies:
        cookie_content = cleaned_cookies
    else:
        cookie_content = cookie_file.read_text(encoding='utf-8')

    # Format verification info
    verification_parts = []
    if user.verified:
        verification_parts.append("Verified: Yes")
        if verified_type:
            verification_parts.append(f"Type: {verified_type}")
        if verified_label:
            verification_parts.append(f"Label: {verified_label}")
    else:
        verification_parts.append("Verified: No")

    verification_line = " | ".join(verification_parts)

    # Format created date (the API may deliver a string or a datetime)
    try:
        if isinstance(user.created_at, str):
            # Parse and reformat date; dateutil is only needed for the
            # string case, so the import stays local
            from dateutil import parser
            created_date = parser.parse(user.created_at)
            created_str = created_date.strftime("%d.%m.%Y %H:%M")
        else:
            created_str = user.created_at.strftime("%d.%m.%Y %H:%M")
    except Exception:
        # was a bare `except:`, which would also swallow KeyboardInterrupt
        created_str = str(user.created_at)

    # Format info
    info_lines = [
        "=" * 80,
        f"Checked: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        f"ID: {user.id} | Name: {user.name} | Username: @{username}",
        f"Followers: {user.followers_count:,} | Following: {user.friends_count:,}",
        f"Tweets: {user.statuses_count:,} | Likes: {user.favourites_count:,}",
        verification_line,
        f"Created: {created_str}",
        f"Profile: https://twitter.com/{username}",
        "",
        f"Cookie file: {cookie_file.absolute()}",
        "",
        "=" * 80,
        cookie_content,
    ]

    # Write to file
    info_file.write_text('\n'.join(info_lines), encoding='utf-8')

    return info_file
|
|
|
|
|
|
async def check_account(cookie_file: Path, proxies: List[Proxy], output_dir: Path, semaphore: asyncio.Semaphore):
    """
    Check a single account asynchronously.

    Args:
        cookie_file: Path to cookie file
        proxies: List of available proxies (one is picked at random per check)
        output_dir: Output directory for account reports
        semaphore: Semaphore for concurrency control

    Returns:
        Tuple of (success: bool, verified: bool, message: str) — message is
        "@username" on success, otherwise a human-readable error.
    """
    async with semaphore:
        try:
            # Read and normalize the cookies
            try:
                cookies_string = read_cookies_file(str(cookie_file))

                if not cookies_string:
                    return (False, False, "Failed to parse cookie file")

                # Keep the cleaned raw content so it can be embedded in
                # the saved account report
                cleaned_content = clean_cookie_content(cookie_file.read_text(encoding='utf-8'))

            except Exception as e:
                return (False, False, f"Failed to read cookies: {e}")

            # Select random proxy if available
            proxy = random.choice(proxies) if proxies else None

            # Authenticate
            try:
                app = await authenticate_twitter(cookies_string, proxy)
            except Exception as e:
                return (False, False, f"Authentication error: {e}")

            # Check whether authentication yielded a logged-in user
            try:
                if app.me:
                    user = app.me
                    is_verified = user.verified

                    # Save results; report writing is best-effort and must
                    # not turn a valid account into a failure
                    try:
                        save_account_info(user, cookie_file, output_dir, cleaned_content)
                    except Exception:
                        pass

                    return (True, is_verified, f"@{user.username}")
                else:
                    return (False, False, "Authentication failed - invalid cookies")
            except Exception as e:
                return (False, False, f"Error processing user info: {e}")

        except Exception as e:
            return (False, False, f"Unexpected error: {e}")
|
|
|
|
|
|
async def main():
    """
    Entry point: parse CLI args, scan cookie files, check every account
    concurrently, print a report, and write a summary file to results/.

    Usage: python twitter_auth_cookies.py <cookies_dir> [proxy_file]

    Exits with status 1 on usage errors, missing paths, or unexpected
    failures.
    """

    print("=" * 60)
    print("Twitter Accounts Checker")
    print("=" * 60)

    # Check arguments
    if len(sys.argv) < 2:
        print("\nUsage: python twitter_auth_cookies.py <cookies_dir> [proxy_file]")
        print("\nArguments:")
        print(" cookies_dir - Directory with cookie files (.json/.txt) or single file")
        print(" proxy_file - Optional file with proxies (one per line)")
        print("\nProxy format:")
        print(" socks5://127.0.0.1:1080")
        print(" http://user:pass@proxy.com:8080")
        print("\nExamples:")
        print(" python twitter_auth_cookies.py cookies/")
        print(" python twitter_auth_cookies.py cookies/ proxies.txt")
        print(" python twitter_auth_cookies.py single_cookie.json proxies.txt")
        sys.exit(1)

    cookies_path = sys.argv[1]
    proxy_file = sys.argv[2] if len(sys.argv) > 2 else None

    # Output directory
    output_dir = Path("results")
    output_dir.mkdir(exist_ok=True)

    try:
        # Scan for cookie files
        print(f"\n📂 Scanning for cookies in: {cookies_path}")
        cookie_files = scan_cookies_directory(cookies_path)

        if not cookie_files:
            print("❌ No cookie files found (.json or .txt)")
            sys.exit(1)

        print(f"✓ Found {len(cookie_files)} cookie file(s)")

        # Load proxies if provided
        proxies = []
        if proxy_file:
            print(f"\n🌐 Loading proxies from: {proxy_file}")
            proxies = read_proxy_file(proxy_file)

            if not proxies:
                print("⚠️ No valid proxies found, continuing without proxy")

        # Process each cookie file
        print(f"\n🔍 Starting account checks...")
        print("=" * 60)

        # Create semaphore for concurrency control (5 concurrent checks)
        semaphore = asyncio.Semaphore(5)

        # Schedule all checks up-front.  BUGFIX: the original stored bare
        # coroutine objects and awaited them one at a time — a coroutine
        # does not start running until awaited, so the checks actually ran
        # strictly sequentially and the semaphore never limited anything.
        # asyncio.create_task() starts each check immediately, so up to 5
        # run concurrently while results are consumed in order below.
        tasks = []
        for cookie_file in cookie_files:
            task = asyncio.create_task(check_account(cookie_file, proxies, output_dir, semaphore))
            tasks.append((cookie_file, task))

        # Collect results in submission order (tasks run in the background)
        successful = 0
        failed = 0
        verified_count = 0

        for i, (cookie_file, task) in enumerate(tasks, 1):
            print(f"\n[{i}/{len(cookie_files)}] Checking: {cookie_file.name}")

            success, verified, msg = await task

            if success:
                print(f" ✅ Success: {msg}")
                if verified:
                    print(f" Verified: Yes")
                    verified_count += 1
                successful += 1
            else:
                print(f" ❌ {msg}")
                failed += 1

        # Summary
        print("\n" + "=" * 60)
        print("Summary:")
        print("=" * 60)
        print(f"Total accounts: {len(cookie_files)}")
        print(f"✅ Valid: {successful}")
        print(f"❌ Failed: {failed}")
        print(f"✓ Verified: {verified_count}")
        print(f"\n📁 Results saved to: {output_dir.absolute()}/")
        print("=" * 60)

        # Save summary to file
        summary_file = output_dir / f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        summary_lines = [
            "=" * 80,
            "Twitter Checker Summary",
            "=" * 80,
            f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "Results:",
            f" Total accounts checked: {len(cookie_files)}",
            f" Valid accounts: {successful}",
            f" Invalid accounts: {failed}",
            f" Verified accounts: {verified_count}",
            "",
            "Configuration:",
            f" Proxies loaded: {len(proxies) if proxies else 0}",
            f" Cookie files: {len(cookie_files)}",
            "",
            "=" * 80,
        ]
        summary_file.write_text('\n'.join(summary_lines), encoding='utf-8')
        print(f"\n📊 Summary saved to: {summary_file.name}")

    except FileNotFoundError as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async checker inside a fresh event loop
    asyncio.run(main())
|