diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 51bbabf..9279812 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -7,11 +7,12 @@ import shutil import json import string import glob +import logging import importlib.metadata from Whatsapp_Chat_Exporter import android_crypt, exported_handler, android_handler from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore -from Whatsapp_Chat_Exporter.utility import APPLE_TIME, Crypt, check_update, DbType +from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, Crypt, check_update, DbType from Whatsapp_Chat_Exporter.utility import readable_to_bytes, sanitize_filename from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, bytes_to_readable from argparse import ArgumentParser, SUPPRESS @@ -30,16 +31,43 @@ else: vcards_deps_installed = True +logger = logging.getLogger(__name__) +__version__ = importlib.metadata.version("whatsapp_chat_exporter") +WTSEXPORTER_BANNER = f"""======================================================================================================== + ██╗ ██╗██╗ ██╗ █████╗ ████████╗███████╗ █████╗ ██████╗ ██████╗ + ██║ ██║██║ ██║██╔══██╗╚══██╔══╝██╔════╝██╔══██╗██╔══██╗██╔══██╗ + ██║ █╗ ██║███████║███████║ ██║ ███████╗███████║██████╔╝██████╔╝ + ██║███╗██║██╔══██║██╔══██║ ██║ ╚════██║██╔══██║██╔═══╝ ██╔═══╝ + ╚███╔███╔╝██║ ██║██║ ██║ ██║ ███████║██║ ██║██║ ██║ + ╚══╝╚══╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ + + ██████╗██╗ ██╗ █████╗ ████████╗ ███████╗██╗ ██╗██████╗ ██████╗ ██████╗ ████████╗███████╗██████╗ +██╔════╝██║ ██║██╔══██╗╚══██╔══╝ ██╔════╝╚██╗██╔╝██╔══██╗██╔═══██╗██╔══██╗╚══██╔══╝██╔════╝██╔══██╗ +██║ ███████║███████║ ██║ █████╗ ╚███╔╝ ██████╔╝██║ ██║██████╔╝ ██║ █████╗ ██████╔╝ +██║ ██╔══██║██╔══██║ ██║ ██╔══╝ ██╔██╗ ██╔═══╝ ██║ ██║██╔══██╗ ██║ ██╔══╝ ██╔══██╗ +╚██████╗██║ ██║██║ ██║ ██║ ███████╗██╔╝ ██╗██║ ╚██████╔╝██║ ██║ ██║ ███████╗██║ ██║ + ╚═════╝╚═╝ ╚═╝╚═╝ ╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝╚═╝ ╚═╝ + + WhatsApp Chat Exporter: A customizable Android and iOS/iPadOS WhatsApp database parser + Version: {__version__} +========================================================================================================""" + + def setup_argument_parser() -> ArgumentParser: """Set up and return the argument parser with all options.""" parser = ArgumentParser( description='A customizable Android and iOS/iPadOS WhatsApp database parser that ' 'will give you the history of your WhatsApp conversations in HTML ' 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', - epilog=f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See ' + epilog=f'WhatsApp Chat Exporter: {__version__} Licensed with MIT. See ' 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' ) + # General options + parser.add_argument( + "--debug", dest="debug", default=False, action='store_true', + help="Enable debug mode" + ) # Device type arguments device_group = parser.add_argument_group('Device Type') device_group.add_argument( @@ -260,6 +288,10 @@ def setup_argument_parser() -> ArgumentParser: "--max-bruteforce-worker", dest="max_bruteforce_worker", default=10, type=int, help="Specify the maximum number of worker for bruteforce decryption." 
) + misc_group.add_argument( + "--no-banner", dest="no_banner", default=False, action='store_true', + help="Do not show the banner" + ) return parser @@ -391,10 +423,10 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']: """Set up and return a contact store if needed.""" if args.enrich_from_vcards is not None: if not vcards_deps_installed: - print( + logger.error( "You don't have the dependency to enrich contacts with vCard.\n" "Read more on how to deal with enriching contacts:\n" - "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage" + "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage\n" ) exit(1) contact_store = ContactsFromVCards() @@ -407,10 +439,10 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']: def decrypt_android_backup(args) -> int: """Decrypt Android backup files and return error code.""" if args.key is None or args.backup is None: - print("You must specify the backup file with -b and a key with -k") + logger.error(f"You must specify the backup file with -b and a key with -k{CLEAR_LINE}") return 1 - print("Decryption key specified, decrypting WhatsApp backup...") + logger.info(f"Decryption key specified, decrypting WhatsApp backup...{CLEAR_LINE}") # Determine crypt type if "crypt12" in args.backup: @@ -420,7 +452,7 @@ def decrypt_android_backup(args) -> int: elif "crypt15" in args.backup: crypt = Crypt.CRYPT15 else: - print("Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.") + logger.error(f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.{CLEAR_LINE}") return 1 # Get key @@ -472,15 +504,15 @@ def decrypt_android_backup(args) -> int: def handle_decrypt_error(error: int) -> None: """Handle decryption errors with appropriate messages.""" if error == 1: - print("Dependencies of decrypt_backup and/or extract_encrypted_key" - " are not present. For details, see README.md.") + logger.error("Dependencies of decrypt_backup and/or extract_encrypted_key" + " are not present. For details, see README.md.\n") exit(3) elif error == 2: - print("Failed when decompressing the decrypted backup. " - "Possibly incorrect offsets used in decryption.") + logger.error("Failed when decompressing the decrypted backup. " + "Possibly incorrect offsets used in decryption.\n") exit(4) else: - print("Unknown error occurred.", error) + logger.error("Unknown error occurred.\n") exit(5) @@ -502,9 +534,9 @@ def process_messages(args, data: ChatCollection) -> None: msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE if not os.path.isfile(msg_db): - print( + logger.error( "The message database does not exist. You may specify the path " - "to database file with option -d or check your provided path." + "to database file with option -d or check your provided path.\n" ) exit(6) @@ -556,19 +588,21 @@ def handle_media_directory(args) -> None: media_path = os.path.join(args.output, args.media) if os.path.isdir(media_path): - print( - "\nWhatsApp directory already exists in output directory. Skipping...", end="\n") + logger.info( + f"WhatsApp directory already exists in output directory. Skipping...{CLEAR_LINE}") else: if args.move_media: try: - print("\nMoving media directory...", end="\n") + logger.info(f"Moving media directory...\r") shutil.move(args.media, f"{args.output}/") + logger.info(f"Media directory has been moved to the output directory{CLEAR_LINE}") except PermissionError: - print("\nCannot remove original WhatsApp directory. 
" - "Perhaps the directory is opened?", end="\n") + logger.warning("Cannot remove original WhatsApp directory. " + "Perhaps the directory is opened?\n") else: - print("\nCopying media directory...", end="\n") + logger.info(f"Copying media directory...\r") shutil.copytree(args.media, media_path) + logger.info(f"Media directory has been copied to the output directory{CLEAR_LINE}") def create_output_files(args, data: ChatCollection, contact_store=None) -> None: @@ -593,7 +627,7 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None: # Create text files if requested if args.text_format: - print("Writing text file...") + logger.info(f"Writing text file...{CLEAR_LINE}") android_handler.create_txt(data, args.text_format) # Create JSON files if requested @@ -626,8 +660,9 @@ def export_single_json(args, data: Dict) -> None: ensure_ascii=not args.avoid_encoding_json, indent=args.pretty_print_json ) - print(f"\nWriting JSON file...({bytes_to_readable(len(json_data))})") + logger.info(f"Writing JSON file...\r") f.write(json_data) + logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))}){CLEAR_LINE}") def export_multiple_json(args, data: Dict) -> None: @@ -654,8 +689,7 @@ def export_multiple_json(args, data: Dict) -> None: indent=args.pretty_print_json ) f.write(file_content) - print(f"Writing JSON file...({index + 1}/{total})", end="\r") - print() + logger.info(f"Writing JSON file...({index + 1}/{total})\r") def process_exported_chat(args, data: ChatCollection) -> None: @@ -680,6 +714,19 @@ def process_exported_chat(args, data: ChatCollection) -> None: shutil.copy(file, args.output) +def setup_logging(level): + log_handler_stdout = logging.StreamHandler() + log_handler_stdout.terminator = "" + handlers = [log_handler_stdout] + if level == logging.DEBUG: + handlers.append(logging.FileHandler("debug.log", mode="w")) + logging.basicConfig( + level=level, + format="[%(levelname)s] %(message)s", + handlers=handlers + ) + + def main(): """Main function to run the WhatsApp Chat Exporter.""" # Set up and parse arguments @@ -693,6 +740,16 @@ def main(): # Validate arguments validate_args(parser, args) + # Print banner if not suppressed + if not args.no_banner: + print(WTSEXPORTER_BANNER) + + if args.debug: + setup_logging(logging.DEBUG) + logger.debug("Debug mode enabled.\n") + else: + setup_logging(logging.INFO) + # Create output directory if it doesn't exist os.makedirs(args.output, exist_ok=True) @@ -755,8 +812,8 @@ def main(): ios_media_handler.extract_media( args.backup, identifiers, args.decrypt_chunk_size) else: - print( - "WhatsApp directory already exists, skipping WhatsApp file extraction.") + logger.info( + f"WhatsApp directory already exists, skipping WhatsApp file extraction.{CLEAR_LINE}") # Set default DB paths if not provided if args.db is None: @@ -772,7 +829,7 @@ def main(): args.pretty_print_json, args.avoid_encoding_json ) - print("Incremental merge completed successfully.") + logger.info(f"Incremental merge completed successfully.{CLEAR_LINE}") else: # Process contacts process_contacts(args, data, contact_store) @@ -786,7 +843,7 @@ def main(): # Handle media directory handle_media_directory(args) - print("Everything is done!") + logger.info("Everything is done!") if __name__ == "__main__": diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py index 84e629e..68f0120 100644 --- a/Whatsapp_Chat_Exporter/android_crypt.py +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -1,11 +1,14 @@ +import time import hmac 
import io +import logging +import threading import zlib import concurrent.futures from typing import Tuple, Union from hashlib import sha256 from sys import exit -from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType +from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CRYPT14_OFFSETS, Crypt, DbType try: import zlib @@ -23,6 +26,9 @@ else: support_crypt15 = True +logger = logging.getLogger(__name__) + + class DecryptionError(Exception): """Base class for decryption-related exceptions.""" pass @@ -138,11 +144,28 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> iv = database[offsets["iv"]:offsets["iv"] + 16] db_ciphertext = database[offsets["db"]:] try: - return _decrypt_database(db_ciphertext, main_key, iv) + decrypted_db = _decrypt_database(db_ciphertext, main_key, iv) except (zlib.error, ValueError): pass # Try next offset + else: + logger.debug( + f"Decryption successful with known offsets: IV {offsets["iv"]}, DB {offsets["db"]}{CLEAR_LINE}" + ) + return decrypted_db # Successful decryption - print("Common offsets failed. Initiating brute-force with multithreading...") + def animate_message(stop_event): + base_msg = "Common offsets failed. Initiating brute-force with multithreading" + dots = ["", ".", "..", "..."] + i = 0 + while not stop_event.is_set(): + logger.info(f"{base_msg}{dots[i % len(dots)]}\x1b[K\r") + time.sleep(0.3) + i += 1 + logger.info(f"Common offsets failed but brute-forcing the offset works!{CLEAR_LINE}") + + stop_event = threading.Event() + anim_thread = threading.Thread(target=animate_message, args=(stop_event,)) + anim_thread.start() # Convert brute force generator into a list for parallel processing offset_combinations = list(brute_force_offset()) @@ -152,19 +175,23 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> start_iv, end_iv, start_db = offset_tuple iv = database[start_iv:end_iv] db_ciphertext = database[start_db:] + logger.debug(""f"Trying offsets: IV {start_iv}-{end_iv}, DB {start_db}{CLEAR_LINE}") try: db = _decrypt_database(db_ciphertext, main_key, iv) - print( + except (zlib.error, ValueError): + return None # Decryption failed, move to next + else: + stop_event.set() + anim_thread.join() + logger.info( f"The offsets of your IV and database are {start_iv} and " f"{start_db}, respectively. To include your offsets in the " "program, please report it by creating an issue on GitHub: " "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47" - "\nShutting down other threads..." + f"\nShutting down other threads...{CLEAR_LINE}" ) return db - except (zlib.error, ValueError): - return None # Decryption failed, move to next with concurrent.futures.ThreadPoolExecutor(max_worker) as executor: future_to_offset = {executor.submit(attempt_decrypt, offset): offset for offset in offset_combinations} @@ -178,9 +205,14 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> return result except KeyboardInterrupt: - print("\nBrute force interrupted by user (Ctrl+C). Exiting gracefully...") + stop_event.set() + anim_thread.join() + logger.info(f"Brute force interrupted by user (Ctrl+C). 
Shutting down gracefully...{CLEAR_LINE}") executor.shutdown(wait=False, cancel_futures=True) exit(1) + finally: + stop_event.set() + anim_thread.join() raise OffsetNotFoundError("Could not find the correct offsets for decryption.") @@ -305,7 +337,7 @@ def decrypt_backup( main_key, hex_key = _derive_main_enc_key(key) if show_crypt15: hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]) - print(f"The HEX key of the crypt15 backup is: {hex_key_str}") + logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}{CLEAR_LINE}") else: main_key = key[126:] diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index 5133d6c..cd364e7 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 +import logging import sqlite3 import os import shutil @@ -9,12 +10,15 @@ from markupsafe import escape as htmle from base64 import b64decode, b64encode from datetime import datetime from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import CURRENT_TZ_OFFSET, MAX_SIZE, ROW_SIZE, JidType, Device +from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CURRENT_TZ_OFFSET, MAX_SIZE, ROW_SIZE, JidType, Device from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, determine_metadata from Whatsapp_Chat_Exporter.utility import get_chat_condition, slugify, bytes_to_readable +logger = logging.getLogger(__name__) + + def contacts(db, data, enrich_from_vcards): """ Process WhatsApp contacts from the database. @@ -33,12 +37,12 @@ def contacts(db, data, enrich_from_vcards): if total_row_number == 0: if enrich_from_vcards is not None: - print("No contacts profiles found in the default database, contacts will be imported from the specified vCard file.") + logger.info("No contacts profiles found in the default database, contacts will be imported from the specified vCard file.") else: - print("No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google") + logger.warning("No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google") return False else: - print(f"Processing contacts...({total_row_number})") + logger.info(f"Processed {total_row_number} contacts\n") c.execute("SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts;") row = c.fetchone() @@ -66,7 +70,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, """ c = db.cursor() total_row_number = _get_message_count(c, filter_empty, filter_date, filter_chat) - print(f"Processing messages...(0/{total_row_number})", end="\r") + logger.info(f"Processing messages...(0/{total_row_number})\r") try: content_cursor = _get_messages_cursor_legacy(c, filter_empty, filter_date, filter_chat) @@ -87,12 +91,12 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, i += 1 if i % 1000 == 0: - print(f"Processing messages...({i}/{total_row_number})", end="\r") + logger.info(f"Processing messages...({i}/{total_row_number})\r") # Fetch the next row safely content = _fetch_row_safely(content_cursor) - print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r") + logger.info(f"Processed 
{total_row_number} messages{CLEAR_LINE}") # Helper functions for message processing @@ -482,7 +486,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa """ c = db.cursor() total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat) - print(f"\nProcessing media...(0/{total_row_number})", end="\r") + logger.info(f"Processing media...(0/{total_row_number})\r") try: content_cursor = _get_media_cursor_legacy(c, filter_empty, filter_date, filter_chat) @@ -501,11 +505,11 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa i += 1 if i % 100 == 0: - print(f"Processing media...({i}/{total_row_number})", end="\r") + logger.info(f"Processing media...({i}/{total_row_number})\r") content = content_cursor.fetchone() - print(f"Processing media...({total_row_number}/{total_row_number})", end="\r") + logger.info(f"Processed {total_row_number} media{CLEAR_LINE}") # Helper functions for media processing @@ -676,7 +680,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): rows = _execute_vcard_query_legacy(c, filter_date, filter_chat, filter_empty) total_row_number = len(rows) - print(f"\nProcessing vCards...(0/{total_row_number})", end="\r") + logger.info(f"Processing vCards...(0/{total_row_number})\r") # Create vCards directory if it doesn't exist path = os.path.join(media_folder, "vCards") @@ -684,7 +688,8 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): for index, row in enumerate(rows): _process_vcard_row(row, path, data) - print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r") + logger.info(f"Processing vCards...({index + 1}/{total_row_number})\r") + logger.info(f"Processed {total_row_number} vCards{CLEAR_LINE}") def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty): @@ -777,7 +782,7 @@ def calls(db, data, timezone_offset, filter_chat): if total_row_number == 0: return - print(f"\nProcessing calls...({total_row_number})", end="\r") + logger.info(f"Processing calls...({total_row_number})\r") # Fetch call data calls_data = _fetch_calls_data(c, filter_chat) @@ -793,6 +798,7 @@ def calls(db, data, timezone_offset, filter_chat): # Add the calls chat to the data data.add_chat("000000000000000", chat) + logger.info(f"Processed {total_row_number} calls{CLEAR_LINE}") def _get_calls_count(c, filter_chat): @@ -917,7 +923,7 @@ def create_html( template = setup_template(template, no_avatar, experimental) total_row_number = len(data) - print(f"\nGenerating chats...(0/{total_row_number})", end="\r") + logger.info(f"Generating chats...(0/{total_row_number})\r") # Create output directory if it doesn't exist if not os.path.isdir(output_folder): @@ -958,9 +964,9 @@ def create_html( ) if current % 10 == 0: - print(f"Generating chats...({current}/{total_row_number})", end="\r") + logger.info(f"Generating chats...({current}/{total_row_number})\r") - print(f"Generating chats...({total_row_number}/{total_row_number})", end="\r") + logger.info(f"Generated {total_row_number} chats{CLEAR_LINE}") def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline): diff --git a/Whatsapp_Chat_Exporter/exported_handler.py b/Whatsapp_Chat_Exporter/exported_handler.py index 7215f6f..4a81adb 100644 --- a/Whatsapp_Chat_Exporter/exported_handler.py +++ b/Whatsapp_Chat_Exporter/exported_handler.py @@ -1,10 +1,14 @@ #!/usr/bin/python3 import os +import logging from datetime import datetime from mimetypes import MimeTypes 
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import Device +from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device + + +logger = logging.getLogger(__name__) def messages(path, data, assume_first_as_me=False): @@ -38,9 +42,9 @@ def messages(path, data, assume_first_as_me=False): # Show progress if index % 1000 == 0: - print(f"Processing messages & media...({index}/{total_row_number})", end="\r") + logger.info(f"Processing messages & media...({index}/{total_row_number})\r") - print(f"Processing messages & media...({total_row_number}/{total_row_number})") + logger.info(f"Processed {total_row_number} messages & media{CLEAR_LINE}") return data diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py index 7a15835..a833c7f 100644 --- a/Whatsapp_Chat_Exporter/ios_handler.py +++ b/Whatsapp_Chat_Exporter/ios_handler.py @@ -1,22 +1,26 @@ #!/usr/bin/python3 import os +import logging import shutil from glob import glob from pathlib import Path from mimetypes import MimeTypes from markupsafe import escape as htmle from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CURRENT_TZ_OFFSET, get_chat_condition +from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, CURRENT_TZ_OFFSET, get_chat_condition from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, slugify, Device +logger = logging.getLogger(__name__) + + def contacts(db, data): """Process WhatsApp contacts with status information.""" c = db.cursor() c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") total_row_number = c.fetchone()[0] - print(f"Pre-processing contacts...({total_row_number})") + logger.info(f"Pre-processing contacts...({total_row_number})\r") c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") content = c.fetchone() @@ -29,6 +33,7 @@ def contacts(db, data): current_chat.status = content["ZABOUTTEXT"] data.add_chat(zwhatsapp_id, current_chat) content = c.fetchone() + logger.info(f"Pre-processed {total_row_number} contacts{CLEAR_LINE}") def process_contact_avatars(current_chat, media_folder, contact_id): @@ -85,7 +90,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, """ c.execute(contact_query) total_row_number = c.fetchone()[0] - print(f"Processing contacts...({total_row_number})") + logger.info(f"Processing contacts...({total_row_number})\r") # Get distinct contacts contacts_query = f""" @@ -123,6 +128,8 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, # Process avatar images process_contact_avatars(current_chat, media_folder, contact_id) content = c.fetchone() + + logger.info(f"Processed {total_row_number} contacts{CLEAR_LINE}") # Get message count message_count_query = f""" @@ -139,7 +146,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, """ c.execute(message_count_query) total_row_number = c.fetchone()[0] - print(f"Processing messages...(0/{total_row_number})", end="\r") + logger.info(f"Processing messages...(0/{total_row_number})\r") # Fetch messages messages_query = f""" @@ -207,10 +214,9 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, # Update progress i += 1 if i % 1000 == 0: - print(f"Processing messages...({i}/{total_row_number})", end="\r") + logger.info(f"Processing 
messages...({i}/{total_row_number})\r") content = c.fetchone() - - print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r") + logger.info(f"Processed {total_row_number} messages{CLEAR_LINE}") def process_message_data(message, content, is_group_message, data, cursor2): @@ -329,7 +335,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa """ c.execute(media_count_query) total_row_number = c.fetchone()[0] - print(f"\nProcessing media...(0/{total_row_number})", end="\r") + logger.info(f"Processing media...(0/{total_row_number})\r") # Fetch media items media_query = f""" @@ -365,10 +371,9 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa # Update progress i += 1 if i % 100 == 0: - print(f"Processing media...({i}/{total_row_number})", end="\r") + logger.info(f"Processing media...({i}/{total_row_number})\r") content = c.fetchone() - - print(f"Processing media...({total_row_number}/{total_row_number})", end="\r") + logger.info(f"Processed {total_row_number} media{CLEAR_LINE}") def process_media_item(content, data, media_folder, mime, separate_media): @@ -444,7 +449,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): c.execute(vcard_query) contents = c.fetchall() total_row_number = len(contents) - print(f"\nProcessing vCards...(0/{total_row_number})", end="\r") + logger.info(f"Processing vCards...(0/{total_row_number})\r") # Create vCards directory path = f'{media_folder}/Message/vCards' @@ -453,7 +458,8 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): # Process each vCard for index, content in enumerate(contents): process_vcard_item(content, path, data) - print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r") + logger.info(f"Processing vCards...({index + 1}/{total_row_number})\r") + logger.info(f"Processed {total_row_number} vCards{CLEAR_LINE}") def process_vcard_item(content, path, data): @@ -510,7 +516,7 @@ def calls(db, data, timezone_offset, filter_chat): if total_row_number == 0: return - print(f"\nProcessing calls...({total_row_number})", end="\r") + logger.info(f"Processed {total_row_number} calls{CLEAR_LINE}\n") # Fetch call records calls_query = f""" diff --git a/Whatsapp_Chat_Exporter/ios_media_handler.py b/Whatsapp_Chat_Exporter/ios_media_handler.py index a1dcd30..a3c26e2 100644 --- a/Whatsapp_Chat_Exporter/ios_media_handler.py +++ b/Whatsapp_Chat_Exporter/ios_media_handler.py @@ -1,11 +1,12 @@ #!/usr/bin/python3 +import logging import shutil import sqlite3 import os import getpass from sys import exit -from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier +from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, WhatsAppIdentifier from Whatsapp_Chat_Exporter.bplist import BPListReader try: from iphone_backup_decrypt import EncryptedBackup, RelativePath @@ -15,6 +16,8 @@ else: support_encrypted = True +logger = logging.getLogger(__name__) + class BackupExtractor: """ A class to handle the extraction of WhatsApp data from iOS backups, @@ -57,12 +60,13 @@ class BackupExtractor: Handles the extraction of data from an encrypted iOS backup. """ if not support_encrypted: - print("You don't have the dependencies to handle encrypted backup.") - print("Read more on how to deal with encrypted backup:") - print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage") + logger.error("You don't have the dependencies to handle encrypted backup." 
+ "Read more on how to deal with encrypted backup:" + "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage" + ) return - print("Encryption detected on the backup!") + logger.info(f"Encryption detected on the backup!{CLEAR_LINE}") password = getpass.getpass("Enter the password for the backup:") self._decrypt_backup(password) self._extract_decrypted_files() @@ -74,7 +78,7 @@ class BackupExtractor: Args: password (str): The password for the encrypted backup. """ - print("Trying to decrypt the iOS backup...", end="") + logger.info(f"Trying to decrypt the iOS backup...{CLEAR_LINE}") self.backup = EncryptedBackup( backup_directory=self.base_dir, passphrase=password, @@ -82,7 +86,8 @@ class BackupExtractor: check_same_thread=False, decrypt_chunk_size=self.decrypt_chunk_size, ) - print("Done\nDecrypting WhatsApp database...", end="") + logger.info(f"iOS backup decrypted successfully!{CLEAR_LINE}") + logger.info("Decrypting WhatsApp database...\n") try: self.backup.extract_file( relative_path=RelativePath.WHATSAPP_MESSAGES, @@ -100,23 +105,23 @@ class BackupExtractor: output_filename=self.identifiers.CALL, ) except ValueError: - print("Failed to decrypt backup: incorrect password?") + logger.error("Failed to decrypt backup: incorrect password?") exit(7) except FileNotFoundError: - print( + logger.error( "Essential WhatsApp files are missing from the iOS backup. " "Perhapse you enabled end-to-end encryption for the backup? " "See https://wts.knugi.dev/docs.html?dest=iose2e" ) exit(6) else: - print("Done") + logger.info(f"Done{CLEAR_LINE}") def _extract_decrypted_files(self): """Extract all WhatsApp files after decryption""" def extract_progress_handler(file_id, domain, relative_path, n, total_files): if n % 100 == 0: - print(f"Decrypting and extracting files...({n}/{total_files})", end="\r") + logger.info(f"Decrypting and extracting files...({n}/{total_files})\r") return True self.backup.extract_files( @@ -125,7 +130,7 @@ class BackupExtractor: preserve_folders=True, filter_callback=extract_progress_handler ) - print(f"All required files are decrypted and extracted. ", end="\n") + logger.info(f"All required files are decrypted and extracted.{CLEAR_LINE}") def _extract_unencrypted_backup(self): """ @@ -144,10 +149,10 @@ class BackupExtractor: if not os.path.isfile(wts_db_path): if self.identifiers is WhatsAppIdentifier: - print("WhatsApp database not found.") + logger.error("WhatsApp database not found.") else: - print("WhatsApp Business database not found.") - print( + logger.error("WhatsApp Business database not found.") + logger.error( "Essential WhatsApp files are missing from the iOS backup. " "Perhapse you enabled end-to-end encryption for the backup? " "See https://wts.knugi.dev/docs.html?dest=iose2e" @@ -157,12 +162,12 @@ class BackupExtractor: shutil.copyfile(wts_db_path, self.identifiers.MESSAGE) if not os.path.isfile(contact_db_path): - print("Contact database not found. Skipping...") + logger.warning(f"Contact database not found. Skipping...{CLEAR_LINE}") else: shutil.copyfile(contact_db_path, self.identifiers.CONTACT) if not os.path.isfile(call_db_path): - print("Call database not found. Skipping...") + logger.warning(f"Call database not found. 
Skipping...{CLEAR_LINE}") else: shutil.copyfile(call_db_path, self.identifiers.CALL) @@ -176,7 +181,7 @@ class BackupExtractor: c = manifest.cursor() c.execute(f"SELECT count() FROM Files WHERE domain = '{_wts_id}'") total_row_number = c.fetchone()[0] - print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r") + logger.info(f"Extracting WhatsApp files...(0/{total_row_number})\r") c.execute( f""" SELECT fileID, relativePath, flags, file AS metadata, @@ -213,9 +218,9 @@ class BackupExtractor: os.utime(destination, (modification, modification)) if row["_index"] % 100 == 0: - print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r") + logger.info(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})\r") row = c.fetchone() - print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n") + logger.info(f"Extracted WhatsApp files...({total_row_number}){CLEAR_LINE}") def extract_media(base_dir, identifiers, decrypt_chunk_size): diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index b6f7bc2..2a1e22c 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -1,3 +1,4 @@ +import logging import sqlite3 import jinja2 import json @@ -28,7 +29,9 @@ except ImportError: MAX_SIZE = 4 * 1024 * 1024 # Default 4MB ROW_SIZE = 0x3D0 CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().seconds / 3600 +CLEAR_LINE = "\x1b[K\n" +logger = logging.getLogger(__name__) def convert_time_unit(time_second: int) -> str: """Converts a time duration in seconds to a human-readable string. @@ -151,7 +154,7 @@ def check_update(): try: raw = urllib.request.urlopen(PACKAGE_JSON) except Exception: - print("Failed to check for updates.") + logger.error("Failed to check for updates.") return 1 else: with raw: @@ -161,17 +164,19 @@ def check_update(): __version__ = importlib.metadata.version("whatsapp_chat_exporter") current_version = tuple(map(int, __version__.split("."))) if current_version < latest_version: - print("===============Update===============") - print("A newer version of WhatsApp Chat Exporter is available.") - print("Current version: " + __version__) - print("Latest version: " + package_info["info"]["version"]) + logger.info( + "===============Update===============\n" + "A newer version of WhatsApp Chat Exporter is available.\n" + f"Current version: {__version__}\n" + f"Latest version: {package_info['info']['version']}\n" + ) if platform == "win32": - print("Update with: pip install --upgrade whatsapp-chat-exporter") + logger.info("Update with: pip install --upgrade whatsapp-chat-exporter\n") else: - print("Update with: pip3 install --upgrade whatsapp-chat-exporter") - print("====================================") + logger.info("Update with: pip3 install --upgrade whatsapp-chat-exporter\n") + logger.info("====================================\n") else: - print("You are using the latest version of WhatsApp Chat Exporter.") + logger.info("You are using the latest version of WhatsApp Chat Exporter.\n") return 0 @@ -229,7 +234,7 @@ def import_from_json(json_file: str, data: Dict[str, ChatStore]): with open(json_file, "r") as f: temp_data = json.loads(f.read()) total_row_number = len(tuple(temp_data.keys())) - print(f"Importing chats from JSON...(0/{total_row_number})", end="\r") + logger.info(f"Importing chats from JSON...(0/{total_row_number})\r") for index, (jid, chat_data) in enumerate(temp_data.items()): chat = ChatStore(chat_data.get("type"), chat_data.get("name")) 
chat.my_avatar = chat_data.get("my_avatar") @@ -258,8 +263,9 @@ def import_from_json(json_file: str, data: Dict[str, ChatStore]): message.sticker = msg.get("sticker") chat.add_message(id, message) data[jid] = chat - print( - f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r") + logger.info( + f"Importing chats from JSON...({index + 1}/{total_row_number})\r") + logger.info(f"Imported chats from JSON...({total_row_number}){CLEAR_LINE}") def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool): @@ -272,21 +278,21 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p """ json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')] if not json_files: - print("No JSON files found in the source directory.") + logger.error("No JSON files found in the source directory.") return - print("JSON files found:", json_files) + logger.info("JSON files found:", json_files) for json_file in json_files: source_path = os.path.join(source_dir, json_file) target_path = os.path.join(target_dir, json_file) if not os.path.exists(target_path): - print(f"Copying '{json_file}' to target directory...") + logger.info(f"Copying '{json_file}' to target directory...") os.makedirs(target_dir, exist_ok=True) shutil.copy2(source_path, target_path) else: - print( + logger.info( f"Merging '{json_file}' with existing file in target directory...") with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file: source_data = json.load(src_file) @@ -311,7 +317,7 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p # Check if the merged data differs from the original target data if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True): - print( + logger.info( f"Changes detected in '{json_file}', updating target file...") with open(target_path, 'w') as merged_file: json.dump( @@ -321,13 +327,13 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p ensure_ascii=not avoid_encoding_json, ) else: - print( + logger.info( f"No changes detected in '{json_file}', skipping update.") # Merge media directories source_media_path = os.path.join(source_dir, media_dir) target_media_path = os.path.join(target_dir, media_dir) - print( + logger.info( f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") if os.path.exists(source_media_path): for root, _, files in os.walk(source_media_path): @@ -339,7 +345,7 @@ def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_p target_file = os.path.join(target_root, file) # we only copy if the file doesn't exist in the target or if the source is newer if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file): - print(f"Copying '{source_file}' to '{target_file}'...") + logger.info(f"Copying '{source_file}' to '{target_file}'...") shutil.copy2(source_file, target_file)
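The logging rework in this patch hinges on a StreamHandler whose terminator is set to the empty string (see setup_logging in __main__.py), so each message supplies its own line ending: a trailing "\r" rewrites the current progress line in place, while CLEAR_LINE ("\x1b[K\n", defined in utility.py) erases any leftover characters before moving to the next line. A minimal, self-contained sketch of that pattern (illustrative only, not part of the patch; the loop and item counts below are made up):

import logging
import time

CLEAR_LINE = "\x1b[K\n"  # ANSI erase-to-end-of-line, then newline (mirrors utility.py)

handler = logging.StreamHandler()
handler.terminator = ""  # messages must carry their own "\r" or CLEAR_LINE ending
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s] %(message)s",
    handlers=[handler],
)
logger = logging.getLogger(__name__)

total = 500
for i in range(1, total + 1):
    if i % 100 == 0:
        # Overwrite the same console line to show progress
        logger.info(f"Processing items...({i}/{total})\r")
    time.sleep(0.001)
# Clear the progress line and emit the final status on its own line
logger.info(f"Processed {total} items{CLEAR_LINE}")

With this setup, a message that ends with neither "\r" nor CLEAR_LINE must embed its own "\n", otherwise consecutive messages run together on one console line.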