#!/usr/bin/env python3 """ Twitter Authentication Script using Cookies Supports Netscape and JSON cookie formats """ import json import sys import asyncio import random import shutil from pathlib import Path from typing import Dict, Union, Optional, List from datetime import datetime from tweety import TwitterAsync from tweety.types import Proxy, PROXY_TYPE_SOCKS5, PROXY_TYPE_HTTP def clean_cookie_content(content: str) -> str: """ Clean cookie content by removing non-cookie data Args: content: Raw file content Returns: Cleaned content with only cookies """ lines = content.strip().split('\n') cleaned_lines = [] # For JSON format - try to extract only the JSON array/object content_stripped = content.strip() if content_stripped.startswith('[') or content_stripped.startswith('{'): # Find the matching bracket/brace bracket_count = 0 brace_count = 0 json_end = 0 for i, char in enumerate(content_stripped): if char == '[': bracket_count += 1 elif char == ']': bracket_count -= 1 if bracket_count == 0 and content_stripped[0] == '[': json_end = i + 1 break elif char == '{': brace_count += 1 elif char == '}': brace_count -= 1 if brace_count == 0 and content_stripped[0] == '{': json_end = i + 1 break if json_end > 0: return content_stripped[:json_end] # For Netscape format - keep only valid cookie lines for line in lines: line = line.strip() # Skip empty lines if not line: continue # Keep comments that are part of Netscape format if line.startswith('#'): cleaned_lines.append(line) continue # Check if it's a valid Netscape cookie line (tab-separated) parts = line.split('\t') if len(parts) >= 7: # Validate it looks like a cookie (domain, flag, path, secure, expiration, name, value) try: # Check if expiration is a number int(parts[4]) cleaned_lines.append(line) except ValueError: # Not a valid cookie line, skip it continue return '\n'.join(cleaned_lines) if cleaned_lines else content def netscape_to_dict(netscape_content: str) -> Dict[str, str]: """ Convert Netscape cookie format to dictionary Args: netscape_content: Netscape format cookies as string Returns: Dictionary of cookie name-value pairs """ cookies = {} for line in netscape_content.strip().split('\n'): # Skip comments and empty lines if line.startswith('#') or not line.strip(): continue # Netscape format: domain flag path secure expiration name value parts = line.split('\t') if len(parts) >= 7: cookie_name = parts[5] cookie_value = parts[6] cookies[cookie_name] = cookie_value return cookies def json_to_dict(json_content: str) -> Dict[str, str]: """ Convert JSON cookie format to dictionary Args: json_content: JSON format cookies as string Returns: Dictionary of cookie name-value pairs """ cookies = {} data = json.loads(json_content) # Handle different JSON formats if isinstance(data, list): # Format: [{"name": "cookie_name", "value": "cookie_value"}, ...] for cookie in data: if 'name' in cookie and 'value' in cookie: cookies[cookie['name']] = cookie['value'] elif isinstance(data, dict): # Format: {"cookie_name": "cookie_value", ...} cookies = data return cookies def dict_to_cookie_string(cookies: Dict[str, str]) -> str: """ Convert cookie dictionary to string format for tweety Args: cookies: Dictionary of cookie name-value pairs Returns: Cookie string in format "name1=value1; name2=value2" """ return "; ".join([f"{name}={value}" for name, value in cookies.items()]) def read_proxy_file(file_path: str) -> List[Proxy]: """ Read all proxies from file Format: protocol://[username:password@]host:port One proxy per line Args: file_path: Path to proxy file Returns: List of Proxy objects """ path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"Proxy file not found: {file_path}") proxies = [] lines = path.read_text(encoding='utf-8').strip().split('\n') for line in lines: line = line.strip() if not line or line.startswith('#'): continue try: # Parse proxy URL if '://' in line: protocol, rest = line.split('://', 1) else: protocol = 'socks5' rest = line # Check for auth username = None password = None if '@' in rest: auth, host_port = rest.rsplit('@', 1) if ':' in auth: username, password = auth.split(':', 1) else: host_port = rest # Parse host:port if ':' in host_port: host, port = host_port.rsplit(':', 1) port = int(port) else: continue # Determine proxy type protocol = protocol.lower() if protocol in ['socks5', 'socks', 'socks5h']: proxy_type = PROXY_TYPE_SOCKS5 elif protocol in ['http', 'https']: proxy_type = PROXY_TYPE_HTTP else: continue proxy = Proxy( host=host, port=port, proxy_type=proxy_type, username=username, password=password ) proxies.append(proxy) except Exception: continue if proxies: print(f"āœ“ Loaded {len(proxies)} proxies from file") return proxies def read_cookies_file(file_path: str) -> str: """ Read cookies from file and convert to tweety format Args: file_path: Path to cookies file (Netscape or JSON format) Returns: Cookie string ready for tweety """ path = Path(file_path) if not path.exists(): return None try: content = path.read_text(encoding='utf-8') except Exception: return None # Clean the content first content = clean_cookie_content(content) if not content: return None # Try to detect format and convert cookies_dict = {} # Try JSON first try: cookies_dict = json_to_dict(content) except json.JSONDecodeError: # Try Netscape format try: cookies_dict = netscape_to_dict(content) except Exception: return None if not cookies_dict: return None return dict_to_cookie_string(cookies_dict) def scan_cookies_directory(dir_path: str) -> List[Path]: """ Scan directory for cookie files (.json and .txt) Args: dir_path: Path to directory with cookies or single file Returns: List of Path objects to cookie files """ path = Path(dir_path).expanduser().resolve() if not path.exists(): # Try relative to current directory path = Path.cwd() / dir_path if not path.exists(): raise FileNotFoundError(f"Path not found: {dir_path}") # If it's a file, return it directly if path.is_file(): return [path] # If it's a directory, scan for cookie files cookie_files = [] for ext in ['*.json', '*.txt']: cookie_files.extend(path.glob(ext)) return sorted(cookie_files) async def authenticate_twitter(cookies: Union[str, Dict], proxy: Optional[Proxy] = None) -> TwitterAsync: """ Authenticate to Twitter using cookies Args: cookies: Cookie string or dictionary proxy: Optional proxy configuration Returns: Authenticated TwitterAsync instance """ print(f"\nšŸ” Authenticating with Twitter...") if proxy: app = TwitterAsync("temp_session", proxy=proxy) else: app = TwitterAsync("temp_session") await app.load_cookies(cookies) return app def save_account_info(user, cookie_file: Path, output_dir: Path, cleaned_cookies: str = None): """ Save account information to text file and copy cookie file Args: user: User object from tweety cookie_file: Original cookie file path output_dir: Output directory (results/) cleaned_cookies: Cleaned cookie content (optional) """ output_dir.mkdir(exist_ok=True) # Create filename based on username username = user.username timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Save account info info_file = output_dir / f"{username}_{timestamp}_info.txt" # Check for verification type verified_type = None verified_label = None if isinstance(user, dict): verified_type = user.get('verified_type') or user.get('ext_verified_type') verified_label = user.get('verified_label') if 'legacy' in user and isinstance(user['legacy'], dict): if not verified_type: verified_type = user['legacy'].get('verified_type') if 'ext_is_blue_verified' in user: if user['ext_is_blue_verified']: verified_type = verified_type or 'Blue' # Use cleaned cookies if provided, otherwise read original if cleaned_cookies: cookie_content = cleaned_cookies else: cookie_content = cookie_file.read_text(encoding='utf-8') # Format verification info verification_parts = [] if user.verified: verification_parts.append("Verified: Yes") if verified_type: verification_parts.append(f"Type: {verified_type}") if verified_label: verification_parts.append(f"Label: {verified_label}") else: verification_parts.append("Verified: No") verification_line = " | ".join(verification_parts) # Format created date try: if isinstance(user.created_at, str): # Parse and reformat date from dateutil import parser created_date = parser.parse(user.created_at) created_str = created_date.strftime("%d.%m.%Y %H:%M") else: created_str = user.created_at.strftime("%d.%m.%Y %H:%M") except: created_str = str(user.created_at) # Format info info_lines = [ "=" * 80, f"Checked: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "", f"ID: {user.id} | Name: {user.name} | Username: @{username}", f"Followers: {user.followers_count:,} | Following: {user.friends_count:,}", f"Tweets: {user.statuses_count:,} | Likes: {user.favourites_count:,}", verification_line, f"Created: {created_str}", f"Profile: https://twitter.com/{username}", "", f"Cookie file: {cookie_file.absolute()}", "", "=" * 80, cookie_content, ] # Write to file info_file.write_text('\n'.join(info_lines), encoding='utf-8') return info_file async def check_account(cookie_file: Path, proxies: List[Proxy], output_dir: Path, semaphore: asyncio.Semaphore): """ Check single account asynchronously Args: cookie_file: Path to cookie file proxies: List of available proxies output_dir: Output directory semaphore: Semaphore for concurrency control Returns: Tuple of (success: bool, verified: bool, error_msg: str) """ async with semaphore: try: # Read cookies try: cookies_string = read_cookies_file(str(cookie_file)) if not cookies_string: return (False, False, "Failed to parse cookie file") # Get cleaned cookie content for saving cleaned_content = clean_cookie_content(cookie_file.read_text(encoding='utf-8')) except Exception as e: return (False, False, f"Failed to read cookies: {e}") # Select random proxy if available proxy = random.choice(proxies) if proxies else None # Authenticate try: app = await authenticate_twitter(cookies_string, proxy) except Exception as e: return (False, False, f"Authentication error: {e}") # Check if successful try: if app.me: user = app.me is_verified = user.verified # Save results try: info_file = save_account_info(user, cookie_file, output_dir, cleaned_content) except Exception as e: pass return (True, is_verified, f"@{user.username}") else: return (False, False, "Authentication failed - invalid cookies") except Exception as e: return (False, False, f"Error processing user info: {e}") except Exception as e: return (False, False, f"Unexpected error: {e}") async def main(): """Main function""" print("=" * 60) print("Twitter Accounts Checker") print("=" * 60) # Check arguments if len(sys.argv) < 2: print("\nUsage: python twitter_auth_cookies.py [proxy_file]") print("\nArguments:") print(" cookies_dir - Directory with cookie files (.json/.txt) or single file") print(" proxy_file - Optional file with proxies (one per line)") print("\nProxy format:") print(" socks5://127.0.0.1:1080") print(" http://user:pass@proxy.com:8080") print("\nExamples:") print(" python twitter_auth_cookies.py cookies/") print(" python twitter_auth_cookies.py cookies/ proxies.txt") print(" python twitter_auth_cookies.py single_cookie.json proxies.txt") sys.exit(1) cookies_path = sys.argv[1] proxy_file = sys.argv[2] if len(sys.argv) > 2 else None # Output directory output_dir = Path("results") output_dir.mkdir(exist_ok=True) try: # Scan for cookie files print(f"\nšŸ“‚ Scanning for cookies in: {cookies_path}") cookie_files = scan_cookies_directory(cookies_path) if not cookie_files: print("āŒ No cookie files found (.json or .txt)") sys.exit(1) print(f"āœ“ Found {len(cookie_files)} cookie file(s)") # Load proxies if provided proxies = [] if proxy_file: print(f"\n🌐 Loading proxies from: {proxy_file}") proxies = read_proxy_file(proxy_file) if not proxies: print("āš ļø No valid proxies found, continuing without proxy") # Process each cookie file print(f"\nšŸ” Starting account checks...") print("=" * 60) # Create semaphore for concurrency control (5 concurrent checks) semaphore = asyncio.Semaphore(5) # Create tasks for all accounts tasks = [] for cookie_file in cookie_files: task = check_account(cookie_file, proxies, output_dir, semaphore) tasks.append((cookie_file, task)) # Process all accounts concurrently successful = 0 failed = 0 verified_count = 0 for i, (cookie_file, task) in enumerate(tasks, 1): print(f"\n[{i}/{len(cookie_files)}] Checking: {cookie_file.name}") success, verified, msg = await task if success: print(f" āœ… Success: {msg}") if verified: print(f" Verified: Yes") verified_count += 1 successful += 1 else: print(f" āŒ {msg}") failed += 1 # Summary print("\n" + "=" * 60) print("Summary:") print("=" * 60) print(f"Total accounts: {len(cookie_files)}") print(f"āœ… Valid: {successful}") print(f"āŒ Failed: {failed}") print(f"āœ“ Verified: {verified_count}") print(f"\nšŸ“ Results saved to: {output_dir.absolute()}/") print("=" * 60) # Save summary to file summary_file = output_dir / f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" summary_lines = [ "=" * 80, "Twitter Checker Summary", "=" * 80, f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "", "Results:", f" Total accounts checked: {len(cookie_files)}", f" Valid accounts: {successful}", f" Invalid accounts: {failed}", f" Verified accounts: {verified_count}", "", "Configuration:", f" Proxies loaded: {len(proxies) if proxies else 0}", f" Cookie files: {len(cookie_files)}", "", "=" * 80, ] summary_file.write_text('\n'.join(summary_lines), encoding='utf-8') print(f"\nšŸ“Š Summary saved to: {summary_file.name}") except FileNotFoundError as e: print(f"\nāŒ Error: {e}") sys.exit(1) except Exception as e: print(f"\nāŒ Unexpected error: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": asyncio.run(main())