From 9cedcf176717cf53c5ac1e19f6afb4001fc6f1bc Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Tue, 6 Jan 2026 23:00:36 +0800 Subject: [PATCH 01/52] Create conftest to move test_nuitka_binary.py to the end of testing Moves test_nuitka_binary.py to the end and fails if the file is missing. --- tests/conftest.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..50f0866 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,27 @@ +import pytest +import os + +def pytest_collection_modifyitems(config, items): + """ + Moves test_nuitka_binary.py to the end and fails if the file is missing. + """ + target_file = "test_nuitka_binary.py" + + # Sanity Check: Ensure the file actually exists in the tests directory + test_dir = os.path.join(config.rootdir, "tests") + file_path = os.path.join(test_dir, target_file) + + if not os.path.exists(file_path): + pytest.exit(f"\n[FATAL] Required test file '{target_file}' not found in {test_dir}. " + f"Order enforcement failed!", returncode=1) + + nuitka_tests = [] + remaining_tests = [] + + for item in items: + if target_file in item.nodeid: + nuitka_tests.append(item) + else: + remaining_tests.append(item) + + items[:] = remaining_tests + nuitka_tests \ No newline at end of file From 647e406ac0591b86ca484bd7ad0f79cc97cd3875 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Thu, 8 Jan 2026 23:57:02 +0800 Subject: [PATCH 02/52] Implement early key validation via authenticated decryption (#190) Utilize `decrypt_and_verify` to immediately identify incorrect user-provided keys via GCM tag validation. 
--- Whatsapp_Chat_Exporter/android_crypt.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py index 3e921d1..ed84041 100644 --- a/Whatsapp_Chat_Exporter/android_crypt.py +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -112,8 +112,20 @@ def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes zlib.error: If decompression fails. ValueError: if the plaintext is not a SQLite database. """ + FOOTER_SIZE = 32 + if len(db_ciphertext) <= FOOTER_SIZE: + raise ValueError("Input data too short to contain a valid GCM tag.") + + actual_ciphertext = db_ciphertext[:-FOOTER_SIZE] + tag = db_ciphertext[-FOOTER_SIZE: -FOOTER_SIZE + 16] + cipher = AES.new(main_key, AES.MODE_GCM, iv) - db_compressed = cipher.decrypt(db_ciphertext) + try: + db_compressed = cipher.decrypt_and_verify(actual_ciphertext, tag) + except ValueError: + # This could be key, IV, or tag is wrong, but likely the key is wrong. + raise ValueError("Decryption/Authentication failed. Ensure you are using the correct key.") + db = zlib.decompress(db_compressed) if db[0:6].upper() != b"SQLITE": raise ValueError( From 0ba81e0863a67f9a509e6a8cc00405df8d242ea2 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Thu, 8 Jan 2026 23:59:31 +0800 Subject: [PATCH 03/52] Implement granular error handling Added and improved layered Zlib and SQLite header checks to distinguish between authentication failures (wrong key) and data corruption. 
--- Whatsapp_Chat_Exporter/android_crypt.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py index ed84041..b8ed13a 100644 --- a/Whatsapp_Chat_Exporter/android_crypt.py +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -126,11 +126,22 @@ def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes # This could be key, IV, or tag is wrong, but likely the key is wrong. raise ValueError("Decryption/Authentication failed. Ensure you are using the correct key.") - db = zlib.decompress(db_compressed) - if db[0:6].upper() != b"SQLITE": + if len(db_compressed) < 2 or db_compressed[0] != 0x78: + logger.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}") raise ValueError( - "The plaintext is not a SQLite database. Ensure you are using the correct key." + "Key is correct, but decrypted data is not a valid compressed stream. " + "Is this even a valid WhatsApp database backup?" ) + + try: + db = zlib.decompress(db_compressed) + except zlib.error as e: + raise zlib.error(f"Decompression failed (The backup file likely corrupted at source): {e}") + + if not db.startswith(b"SQLite"): + raise ValueError( + "Data is valid and decompressed, but it is not a SQLite database. 
" + "Is this even a valid WhatsApp database backup?") return db From 75fcf33fda2b3a7f14fff919714327e090d52301 Mon Sep 17 00:00:00 2001 From: Cosmo Date: Sun, 11 Jan 2026 07:06:23 -0800 Subject: [PATCH 04/52] feat: Add support for exporting message reactions --- Whatsapp_Chat_Exporter/__main__.py | 5 +- Whatsapp_Chat_Exporter/android_handler.py | 66 +++++++++++++++++++++++ Whatsapp_Chat_Exporter/data_model.py | 1 + Whatsapp_Chat_Exporter/whatsapp.html | 18 +++++++ 4 files changed, 89 insertions(+), 1 deletion(-) diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 07a341b..c8be561 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -25,7 +25,10 @@ from Whatsapp_Chat_Exporter.vcards_contacts import ContactsFromVCards logger = logging.getLogger(__name__) -__version__ = importlib.metadata.version("whatsapp_chat_exporter") +try: + __version__ = importlib.metadata.version("whatsapp_chat_exporter") +except importlib.metadata.PackageNotFoundError: + __version__ = "0.13.0rc2" WTSEXPORTER_BANNER = f"""======================================================================================================== ██╗ ██╗██╗ ██╗ █████╗ ████████╗███████╗ █████╗ ██████╗ ██████╗ ██║ ██║██║ ██║██╔══██╗╚══██╔══╝██╔════╝██╔══██╗██╔══██╗██╔══██╗ diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index 274661b..1b11e78 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -98,6 +98,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, # Fetch the next row safely content = _fetch_row_safely(content_cursor) + _get_reactions(db, data) logger.info(f"Processed {total_row_number} messages{CLEAR_LINE}") @@ -480,6 +481,71 @@ def _format_message_text(text): return text +def _get_reactions(db, data): + """ + Process message reactions. 
+ """ + logger.info("Processing reactions...") + c = db.cursor() + + try: + # Check if tables exist + c.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='message_add_on'") + if c.fetchone()[0] == 0: + return + + c.execute(""" + SELECT + message_add_on.parent_message_row_id, + message_add_on_reaction.reaction, + message_add_on.from_me, + jid.raw_string as sender_jid_raw, + chat_jid.raw_string as chat_jid_raw + FROM message_add_on + INNER JOIN message_add_on_reaction + ON message_add_on._id = message_add_on_reaction.message_add_on_row_id + LEFT JOIN jid + ON message_add_on.sender_jid_row_id = jid._id + LEFT JOIN chat + ON message_add_on.chat_row_id = chat._id + LEFT JOIN jid chat_jid + ON chat.jid_row_id = chat_jid._id + """) + except sqlite3.OperationalError: + logger.warning("Could not fetch reactions (schema might be too old or incompatible).") + return + + row = c.fetchone() + while row is not None: + parent_id = row["parent_message_row_id"] + reaction = row["reaction"] + chat_id = row["chat_jid_raw"] + + if chat_id and chat_id in data: + chat = data[chat_id] + if parent_id in chat._messages: + message = chat._messages[parent_id] + + # Determine sender name + sender_name = None + if row["from_me"]: + sender_name = "You" + elif row["sender_jid_raw"]: + sender_jid = row["sender_jid_raw"] + if sender_jid in data: + sender_name = data[sender_jid].name + if not sender_name: + sender_name = sender_jid.split('@')[0] if "@" in sender_jid else sender_jid + + if not sender_name: + sender_name = "Unknown" + + message.reactions[sender_name] = reaction + + row = c.fetchone() + logger.info(f"Reactions processed.{CLEAR_LINE}") + + def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True): """ Process WhatsApp media files from the database. 
diff --git a/Whatsapp_Chat_Exporter/data_model.py b/Whatsapp_Chat_Exporter/data_model.py index 8747419..47034e6 100644 --- a/Whatsapp_Chat_Exporter/data_model.py +++ b/Whatsapp_Chat_Exporter/data_model.py @@ -338,6 +338,7 @@ class Message: self.caption = None self.thumb = None # Android specific self.sticker = False + self.reactions = {} def to_json(self) -> Dict[str, Any]: """Convert message to JSON-serializable dict.""" diff --git a/Whatsapp_Chat_Exporter/whatsapp.html b/Whatsapp_Chat_Exporter/whatsapp.html index 2aa2b7e..b4b9c8a 100644 --- a/Whatsapp_Chat_Exporter/whatsapp.html +++ b/Whatsapp_Chat_Exporter/whatsapp.html @@ -287,6 +287,15 @@ {% endif %}

{{ msg.time }}

+ {% if msg.reactions %} +
+ {% for sender, emoji in msg.reactions.items() %} +
+ {{ emoji }} +
+ {% endfor %} +
+ {% endif %} {% else %} @@ -356,6 +365,15 @@ {{ msg.time }} + {% if msg.reactions %} +
+ {% for sender, emoji in msg.reactions.items() %} +
+ {{ emoji }} +
+ {% endfor %} +
+ {% endif %} - + + --> + {% if previous %} - - - + {% endif %} {% if next %} - - - + {% endif %}
@@ -233,15 +342,41 @@
{% if msg.reply is not none %} -
-

Replying to

-

- {% if msg.quoted_data is not none %} - "{{msg.quoted_data}}" - {% else %} - this message +

+
+
+

Replying to

+

+ {% if msg.quoted_data is not none %} + "{{msg.quoted_data}}" + {% else %} + this message + {% endif %} +

+
+ {% set replied_msg = msgs | selectattr('key_id', 'equalto', msg.reply) | first %} + {% if replied_msg and replied_msg.media == true %} +
+ {% if "image/" in replied_msg.mime %} + + {% elif "video/" in replied_msg.mime %} +
+ +
+
+
+
+ {% elif "audio/" in replied_msg.mime %} +
+
+
+ {% endif %} +
{% endif %} -

+
{% endif %} @@ -288,7 +423,15 @@ {% endif %} {% endif %}

-

{{ msg.time }}

+

{{ msg.time }} + + {% if msg.received_timestamp %} + ✓✓ + {% else %} + ✓ + {% endif %} + +

{% if msg.reactions %}
{% for sender, emoji in msg.reactions.items() %} @@ -305,15 +448,41 @@ From bf230db59575835982a13ec8d1b45a08e76bebcf Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Tue, 20 Jan 2026 23:35:05 +0800 Subject: [PATCH 38/52] Gracefully handle bytes that can't be decoded from db (#44) --- Whatsapp_Chat_Exporter/__main__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 405eec7..74ccaed 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -525,6 +525,7 @@ def process_contacts(args, data: ChatCollection) -> None: if os.path.isfile(contact_db): with sqlite3.connect(contact_db) as db: db.row_factory = sqlite3.Row + db.text_factory = lambda b: b.decode(encoding="utf-8", errors="replace") if args.android: android_handler.contacts(db, data, args.enrich_from_vcards) else: @@ -547,6 +548,7 @@ def process_messages(args, data: ChatCollection) -> None: with sqlite3.connect(msg_db) as db: db.row_factory = sqlite3.Row + db.text_factory = lambda b: b.decode(encoding="utf-8", errors="replace") # Process messages if args.android: @@ -584,6 +586,7 @@ def process_calls(args, db, data: ChatCollection, filter_chat, timing) -> None: elif args.ios and args.call_db_ios is not None: with sqlite3.connect(args.call_db_ios) as cdb: cdb.row_factory = sqlite3.Row + cdb.text_factory = lambda b: b.decode(encoding="utf-8", errors="replace") ios_handler.calls(cdb, data, timing, filter_chat) From 95a52231be11f77b8196b8cbd18af792676db7ea Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 00:03:08 +0800 Subject: [PATCH 39/52] Fix the returning string for empty filter list --- Whatsapp_Chat_Exporter/utility.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index f9a5da4..cfd00bf 100644 --- 
a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -400,7 +400,7 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List Raises: ValueError: If the column count is invalid or an unsupported platform is provided. """ - if filter is not None: + if filter is not None and len(filter) > 0: conditions = [] if len(columns) < 2 and jid is not None: raise ValueError( From 2e7953f4ca9c5ae8f332f7cb5cc38c02e6256285 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 00:03:21 +0800 Subject: [PATCH 40/52] Add unit test for get_chat_condition --- tests/test_utility.py | 96 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/tests/test_utility.py b/tests/test_utility.py index 8ea2af4..b4ece2a 100644 --- a/tests/test_utility.py +++ b/tests/test_utility.py @@ -254,3 +254,99 @@ class TestSafeName: def test_safe_name(self, input_text, expected_output): result = safe_name(input_text) assert result == expected_output + + +class TestGetChatCondition: + def test_no_filter(self): + """Test when filter is None""" + result = get_chat_condition(None, True, ["column1", "column2"]) + assert result == "" + + result = get_chat_condition(None, False, ["column1"]) + assert result == "" + + def test_include_single_chat_single_column(self): + """Test including a single chat with single column""" + result = get_chat_condition(["1234567890"], True, ["phone"]) + assert result == "AND ( phone LIKE '%1234567890%')" + + def test_include_multiple_chats_single_column(self): + """Test including multiple chats with single column""" + result = get_chat_condition(["1234567890", "0987654321"], True, ["phone"]) + assert result == "AND ( phone LIKE '%1234567890%' OR phone LIKE '%0987654321%')" + + def test_exclude_single_chat_single_column(self): + """Test excluding a single chat with single column""" + result = get_chat_condition(["1234567890"], False, ["phone"]) + 
assert result == "AND ( phone NOT LIKE '%1234567890%')" + + def test_exclude_multiple_chats_single_column(self): + """Test excluding multiple chats with single column""" + result = get_chat_condition(["1234567890", "0987654321"], False, ["phone"]) + assert result == "AND ( phone NOT LIKE '%1234567890%' AND phone NOT LIKE '%0987654321%')" + + def test_include_with_jid_android(self): + """Test including chats with JID for Android platform""" + result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "android") + assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1))" + + def test_include_with_jid_ios(self): + """Test including chats with JID for iOS platform""" + result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "ios") + assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid IS NOT NULL))" + + def test_exclude_with_jid_android(self): + """Test excluding chats with JID for Android platform""" + result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "android") + assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1))" + + def test_exclude_with_jid_ios(self): + """Test excluding chats with JID for iOS platform""" + result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "ios") + assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid IS NOT NULL))" + + def test_multiple_chats_with_jid_android(self): + """Test multiple chats with JID for Android platform""" + result = get_chat_condition(["1234567890", "0987654321"], True, ["phone", "name"], "jid", "android") + expected = "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1) OR phone LIKE '%0987654321%' OR (name LIKE '%0987654321%' AND jid.type == 1))" + assert result == expected + + def test_multiple_chats_exclude_with_jid_android(self): + 
"""Test excluding multiple chats with JID for Android platform""" + result = get_chat_condition(["1234567890", "0987654321"], False, ["phone", "name"], "jid", "android") + expected = "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1) AND phone NOT LIKE '%0987654321%' AND (name NOT LIKE '%0987654321%' AND jid.type == 1))" + assert result == expected + + def test_invalid_column_count_with_jid(self): + """Test error when column count is less than 2 but jid is provided""" + with pytest.raises(ValueError, match="There must be at least two elements in argument columns if jid is not None"): + get_chat_condition(["1234567890"], True, ["phone"], "jid", "android") + + def test_unsupported_platform(self): + """Test error when unsupported platform is provided""" + with pytest.raises(ValueError, match="Only android and ios are supported for argument platform if jid is not None"): + get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "windows") + + def test_empty_filter_list(self): + """Test with empty filter list""" + result = get_chat_condition([], True, ["phone"]) + assert result == "" + + result = get_chat_condition([], False, ["phone"]) + assert result == "" + + def test_filter_with_empty_strings(self): + """Test with filter containing empty strings""" + result = get_chat_condition(["", "1234567890"], True, ["phone"]) + assert result == "AND ( phone LIKE '%%' OR phone LIKE '%1234567890%')" + + result = get_chat_condition([""], True, ["phone"]) + assert result == "AND ( phone LIKE '%%')" + + def test_special_characters_in_filter(self): + """Test with special characters in filter values""" + result = get_chat_condition(["test@example.com"], True, ["email"]) + assert result == "AND ( email LIKE '%test@example.com%')" + + result = get_chat_condition(["user-name"], True, ["username"]) + assert result == "AND ( username LIKE '%user-name%')" \ No newline at end of file From db01d0526380a8af9fd4b3530139b4ca914aa80f Mon Sep 17 
00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 00:50:06 +0800 Subject: [PATCH 41/52] Refactor get_chat_condition to increase maintainability --- Whatsapp_Chat_Exporter/utility.py | 90 +++++++++++++++++++++---------- tests/test_utility.py | 18 +++---- 2 files changed, 70 insertions(+), 38 deletions(-) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index cfd00bf..f2a729e 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -384,7 +384,35 @@ def get_cond_for_empty(enable: bool, jid_field: str, broadcast_field: str) -> st return f"AND (chat.hidden=0 OR {jid_field}='status@broadcast' OR {broadcast_field}>0)" if enable else "" -def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List[str], jid: Optional[str] = None, platform: Optional[str] = None) -> str: +def _get_group_condition(jid: str, platform: str) -> str: + """Generate platform-specific group identification condition. + + Args: + jid: The JID column name. + platform: The platform ("android" or "ios"). + + Returns: + SQL condition string for group identification. + + Raises: + ValueError: If platform is not supported. + """ + if platform == "android": + return f"{jid}.type == 1" + elif platform == "ios": + return f"{jid} IS NOT NULL" + else: + raise ValueError( + "Only android and ios are supported for argument platform if jid is not None") + + +def get_chat_condition( + filter: Optional[List[str]], + include: bool, + columns: List[str], + jid: Optional[str] = None, + platform: Optional[str] = None +) -> str: """Generates a SQL condition for filtering chats based on inclusion or exclusion criteria. Args: @@ -400,35 +428,39 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List Raises: ValueError: If the column count is invalid or an unsupported platform is provided. 
""" - if filter is not None and len(filter) > 0: - conditions = [] - if len(columns) < 2 and jid is not None: - raise ValueError( - "There must be at least two elements in argument columns if jid is not None") - if jid is not None: - if platform == "android": - is_group = f"{jid}.type == 1" - elif platform == "ios": - is_group = f"{jid} IS NOT NULL" - else: - raise ValueError( - "Only android and ios are supported for argument platform if jid is not None") - for index, chat in enumerate(filter): - if include: - conditions.append( - f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'") - if len(columns) > 1: - conditions.append( - f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})") - else: - conditions.append( - f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'") - if len(columns) > 1: - conditions.append( - f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})") - return f"AND ({' '.join(conditions)})" - else: + if not filter: return "" + + if jid is not None and len(columns) < 2: + raise ValueError( + "There must be at least two elements in argument columns if jid is not None") + + # Get group condition if needed + is_group_condition = None + if jid is not None: + is_group_condition = _get_group_condition(jid, platform) + + # Build conditions for each chat filter + conditions = [] + for index, chat in enumerate(filter): + # Add connector for subsequent conditions (with double space) + connector = " OR" if include else " AND" + prefix = connector if index > 0 else "" + + # Primary column condition + operator = "LIKE" if include else "NOT LIKE" + conditions.append(f"{prefix} {columns[0]} {operator} '%{chat}%'") + + # Secondary column condition for groups + if len(columns) > 1 and is_group_condition: + if include: + group_condition = f" OR ({columns[1]} {operator} '%{chat}%' AND {is_group_condition})" + else: + group_condition = f" AND ({columns[1]} {operator} '%{chat}%' AND {is_group_condition})" + conditions.append(group_condition) + + 
combined_conditions = "".join(conditions) + return f"AND ({combined_conditions})" # Android Specific diff --git a/tests/test_utility.py b/tests/test_utility.py index b4ece2a..fcdce25 100644 --- a/tests/test_utility.py +++ b/tests/test_utility.py @@ -273,7 +273,7 @@ class TestGetChatCondition: def test_include_multiple_chats_single_column(self): """Test including multiple chats with single column""" result = get_chat_condition(["1234567890", "0987654321"], True, ["phone"]) - assert result == "AND ( phone LIKE '%1234567890%' OR phone LIKE '%0987654321%')" + assert result == "AND ( phone LIKE '%1234567890%' OR phone LIKE '%0987654321%')" def test_exclude_single_chat_single_column(self): """Test excluding a single chat with single column""" @@ -283,38 +283,38 @@ class TestGetChatCondition: def test_exclude_multiple_chats_single_column(self): """Test excluding multiple chats with single column""" result = get_chat_condition(["1234567890", "0987654321"], False, ["phone"]) - assert result == "AND ( phone NOT LIKE '%1234567890%' AND phone NOT LIKE '%0987654321%')" + assert result == "AND ( phone NOT LIKE '%1234567890%' AND phone NOT LIKE '%0987654321%')" def test_include_with_jid_android(self): """Test including chats with JID for Android platform""" result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "android") - assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1))" + assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1))" def test_include_with_jid_ios(self): """Test including chats with JID for iOS platform""" result = get_chat_condition(["1234567890"], True, ["phone", "name"], "jid", "ios") - assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid IS NOT NULL))" + assert result == "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid IS NOT NULL))" def test_exclude_with_jid_android(self): """Test excluding chats 
with JID for Android platform""" result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "android") - assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1))" + assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1))" def test_exclude_with_jid_ios(self): """Test excluding chats with JID for iOS platform""" result = get_chat_condition(["1234567890"], False, ["phone", "name"], "jid", "ios") - assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid IS NOT NULL))" + assert result == "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid IS NOT NULL))" def test_multiple_chats_with_jid_android(self): """Test multiple chats with JID for Android platform""" result = get_chat_condition(["1234567890", "0987654321"], True, ["phone", "name"], "jid", "android") - expected = "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1) OR phone LIKE '%0987654321%' OR (name LIKE '%0987654321%' AND jid.type == 1))" + expected = "AND ( phone LIKE '%1234567890%' OR (name LIKE '%1234567890%' AND jid.type == 1) OR phone LIKE '%0987654321%' OR (name LIKE '%0987654321%' AND jid.type == 1))" assert result == expected def test_multiple_chats_exclude_with_jid_android(self): """Test excluding multiple chats with JID for Android platform""" result = get_chat_condition(["1234567890", "0987654321"], False, ["phone", "name"], "jid", "android") - expected = "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1) AND phone NOT LIKE '%0987654321%' AND (name NOT LIKE '%0987654321%' AND jid.type == 1))" + expected = "AND ( phone NOT LIKE '%1234567890%' AND (name NOT LIKE '%1234567890%' AND jid.type == 1) AND phone NOT LIKE '%0987654321%' AND (name NOT LIKE '%0987654321%' AND jid.type == 1))" assert result == expected def test_invalid_column_count_with_jid(self): 
@@ -338,7 +338,7 @@ class TestGetChatCondition: def test_filter_with_empty_strings(self): """Test with filter containing empty strings""" result = get_chat_condition(["", "1234567890"], True, ["phone"]) - assert result == "AND ( phone LIKE '%%' OR phone LIKE '%1234567890%')" + assert result == "AND ( phone LIKE '%%' OR phone LIKE '%1234567890%')" result = get_chat_condition([""], True, ["phone"]) assert result == "AND ( phone LIKE '%%')" From 0c5f2b7f1343270ef0ade695ee9bb4c0fcfd070a Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 01:19:55 +0800 Subject: [PATCH 42/52] Add a comment on SQLi in get_chat_condition --- Whatsapp_Chat_Exporter/utility.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index f2a729e..a4db10c 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -415,6 +415,10 @@ def get_chat_condition( ) -> str: """Generates a SQL condition for filtering chats based on inclusion or exclusion criteria. + SQL injection risks from chat filters were evaluated during development and deemed negligible + due to the tool's offline, trusted-input model (user running this tool on WhatsApp + backups/databases on their own device). + Args: filter: A list of phone numbers to include or exclude. include: True to include chats that match the filter, False to exclude them. 
From f05e0d34519d874598eae919bf60591719c7a45b Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 01:33:18 +0800 Subject: [PATCH 43/52] Refactor incremental_merge --- Whatsapp_Chat_Exporter/utility.py | 273 +++++++++++++++++++++++------- 1 file changed, 209 insertions(+), 64 deletions(-) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index a4db10c..fb55f3e 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -13,7 +13,7 @@ from datetime import datetime, timedelta from enum import IntEnum from tqdm import tqdm from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union, Any try: from enum import StrEnum, IntEnum except ImportError: @@ -257,86 +257,231 @@ def import_from_json(json_file: str, data: ChatCollection): logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}{CLEAR_LINE}") -def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool): - """Merges JSON files from the source directory into the target directory. +class IncrementalMerger: + """Handles incremental merging of WhatsApp chat exports.""" + + def __init__(self, pretty_print_json: int, avoid_encoding_json: bool): + """Initialize the merger with JSON formatting options. + + Args: + pretty_print_json: JSON indentation level. + avoid_encoding_json: Whether to avoid ASCII encoding. + """ + self.pretty_print_json = pretty_print_json + self.avoid_encoding_json = avoid_encoding_json + + def _get_json_files(self, source_dir: str) -> List[str]: + """Get list of JSON files from source directory. + + Args: + source_dir: Path to the source directory. + + Returns: + List of JSON filenames. + + Raises: + SystemExit: If no JSON files are found. 
+ """ + json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')] + if not json_files: + logger.error("No JSON files found in the source directory.") + raise SystemExit(1) + + logger.info("JSON files found:", json_files) + return json_files - Args: - source_dir (str): The path to the source directory containing JSON files. - target_dir (str): The path to the target directory to merge into. - media_dir (str): The path to the media directory. - """ - json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')] - if not json_files: - logger.error("No JSON files found in the source directory.") - return + def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None: + """Copy a new JSON file to target directory. + + Args: + source_path: Path to source file. + target_path: Path to target file. + target_dir: Target directory path. + json_file: Name of the JSON file. + """ + logger.info(f"Copying '{json_file}' to target directory...") + os.makedirs(target_dir, exist_ok=True) + shutil.copy2(source_path, target_path) - logger.info("JSON files found:", json_files) + def _load_chat_data(self, file_path: str) -> Dict[str, Any]: + """Load JSON data from file. + + Args: + file_path: Path to JSON file. + + Returns: + Loaded JSON data. + """ + with open(file_path, 'r') as file: + return json.load(file) - for json_file in json_files: - source_path = os.path.join(source_dir, json_file) - target_path = os.path.join(target_dir, json_file) + def _parse_chats_from_json(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Parse JSON data into ChatStore objects. + + Args: + data: Raw JSON data. + + Returns: + Dictionary of JID to ChatStore objects. 
+ """ + return {jid: ChatStore.from_json(chat) for jid, chat in data.items()} - if not os.path.exists(target_path): - logger.info(f"Copying '{json_file}' to target directory...") - os.makedirs(target_dir, exist_ok=True) - shutil.copy2(source_path, target_path) + def _merge_chat_stores(self, source_chats: Dict[str, Any], target_chats: Dict[str, Any]) -> Dict[str, Any]: + """Merge source chats into target chats. + + Args: + source_chats: Source ChatStore objects. + target_chats: Target ChatStore objects. + + Returns: + Merged ChatStore objects. + """ + for jid, chat in source_chats.items(): + if jid in target_chats: + target_chats[jid].merge_with(chat) + else: + target_chats[jid] = chat + return target_chats + + def _serialize_chats(self, chats: Dict[str, Any]) -> Dict[str, Any]: + """Serialize ChatStore objects to JSON format. + + Args: + chats: Dictionary of ChatStore objects. + + Returns: + Serialized JSON data. + """ + return {jid: chat.to_json() for jid, chat in chats.items()} + + def _has_changes(self, merged_data: Dict[str, Any], original_data: Dict[str, Any]) -> bool: + """Check if merged data differs from original data. + + Args: + merged_data: Merged JSON data. + original_data: Original JSON data. + + Returns: + True if changes detected, False otherwise. + """ + return json.dumps(merged_data, sort_keys=True) != json.dumps(original_data, sort_keys=True) + + def _save_merged_data(self, target_path: str, merged_data: Dict[str, Any]) -> None: + """Save merged data to target file. + + Args: + target_path: Path to target file. + merged_data: Merged JSON data. + """ + with open(target_path, 'w') as merged_file: + json.dump( + merged_data, + merged_file, + indent=self.pretty_print_json, + ensure_ascii=not self.avoid_encoding_json, + ) + + def _merge_json_file(self, source_path: str, target_path: str, json_file: str) -> None: + """Merge a single JSON file. + + Args: + source_path: Path to source file. + target_path: Path to target file. 
+ json_file: Name of the JSON file. + """ + logger.info(f"Merging '{json_file}' with existing file in target directory...") + + source_data = self._load_chat_data(source_path) + target_data = self._load_chat_data(target_path) + + source_chats = self._parse_chats_from_json(source_data) + target_chats = self._parse_chats_from_json(target_data) + + merged_chats = self._merge_chat_stores(source_chats, target_chats) + merged_data = self._serialize_chats(merged_chats) + + if self._has_changes(merged_data, target_data): + logger.info(f"Changes detected in '{json_file}', updating target file...") + self._save_merged_data(target_path, merged_data) else: - logger.info( - f"Merging '{json_file}' with existing file in target directory...") - with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file: - source_data = json.load(src_file) - target_data = json.load(tgt_file) + logger.info(f"No changes detected in '{json_file}', skipping update.") - # Parse JSON into ChatStore objects using from_json() - source_chats = {jid: ChatStore.from_json( - chat) for jid, chat in source_data.items()} - target_chats = {jid: ChatStore.from_json( - chat) for jid, chat in target_data.items()} + def _should_copy_media_file(self, source_file: str, target_file: str) -> bool: + """Check if media file should be copied. + + Args: + source_file: Path to source media file. + target_file: Path to target media file. + + Returns: + True if file should be copied, False otherwise. 
+ """ + return not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file) - # Merge chats using merge_with() - for jid, chat in source_chats.items(): - if jid in target_chats: - target_chats[jid].merge_with(chat) - else: - target_chats[jid] = chat - - # Serialize merged data - merged_data = {jid: chat.to_json() - for jid, chat in target_chats.items()} - - # Check if the merged data differs from the original target data - if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True): - logger.info( - f"Changes detected in '{json_file}', updating target file...") - with open(target_path, 'w') as merged_file: - json.dump( - merged_data, - merged_file, - indent=pretty_print_json, - ensure_ascii=not avoid_encoding_json, - ) - else: - logger.info( - f"No changes detected in '{json_file}', skipping update.") - - # Merge media directories - source_media_path = os.path.join(source_dir, media_dir) - target_media_path = os.path.join(target_dir, media_dir) - logger.info( - f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") - if os.path.exists(source_media_path): + def _merge_media_directories(self, source_dir: str, target_dir: str, media_dir: str) -> None: + """Merge media directories from source to target. + + Args: + source_dir: Source directory path. + target_dir: Target directory path. + media_dir: Media directory name. + """ + source_media_path = os.path.join(source_dir, media_dir) + target_media_path = os.path.join(target_dir, media_dir) + + logger.info(f"Merging media directories. 
Source: {source_media_path}, target: {target_media_path}") + + if not os.path.exists(source_media_path): + return + for root, _, files in os.walk(source_media_path): relative_path = os.path.relpath(root, source_media_path) target_root = os.path.join(target_media_path, relative_path) os.makedirs(target_root, exist_ok=True) + for file in files: source_file = os.path.join(root, file) target_file = os.path.join(target_root, file) - # we only copy if the file doesn't exist in the target or if the source is newer - if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file): + + if self._should_copy_media_file(source_file, target_file): logger.info(f"Copying '{source_file}' to '{target_file}'...") shutil.copy2(source_file, target_file) + def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None: + """Merge JSON files and media from source to target directory. + + Args: + source_dir: The path to the source directory containing JSON files. + target_dir: The path to the target directory to merge into. + media_dir: The path to the media directory. + """ + json_files = self._get_json_files(source_dir) + + for json_file in json_files: + source_path = os.path.join(source_dir, json_file) + target_path = os.path.join(target_dir, json_file) + + if not os.path.exists(target_path): + self._copy_new_file(source_path, target_path, target_dir, json_file) + else: + self._merge_json_file(source_path, target_path, json_file) + + self._merge_media_directories(source_dir, target_dir, media_dir) + + +def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool) -> None: + """Wrapper for merging JSON files from the source directory into the target directory. + + Args: + source_dir: The path to the source directory containing JSON files. + target_dir: The path to the target directory to merge into. + media_dir: The path to the media directory. 
+ pretty_print_json: JSON indentation level. + avoid_encoding_json: Whether to avoid ASCII encoding. + """ + merger = IncrementalMerger(pretty_print_json, avoid_encoding_json) + merger.merge(source_dir, target_dir, media_dir) + def get_file_name(contact: str, chat: ChatStore) -> Tuple[str, str]: """Generates a sanitized filename and contact name for a chat. From 1694ae7dd930c3de27d102ebb91d51a0dbded635 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 01:47:45 +0800 Subject: [PATCH 44/52] Update utility.py --- Whatsapp_Chat_Exporter/utility.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index fb55f3e..8de2e9b 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -765,7 +765,7 @@ def check_jid_map(db: sqlite3.Connection) -> bool: """ cursor = db.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='jid_map'") - return cursor.fetchone()is not None + return cursor.fetchone() is not None def get_jid_map_join(jid_map_exists: bool) -> str: @@ -815,6 +815,7 @@ def get_transcription_selection(db: sqlite3.Connection) -> str: else: return "NULL AS transcription_text" + def setup_template(template: Optional[str], no_avatar: bool, experimental: bool = False) -> jinja2.Template: """ Sets up the Jinja2 template environment and loads the template. 
From 746e4e1ac58c2cb8c5012c3b552085cabf51dbbe Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 16:24:10 +0800 Subject: [PATCH 45/52] Fix and improve the logging facility for incremental merge --- Whatsapp_Chat_Exporter/utility.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index 8de2e9b..f73af1b 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -287,7 +287,7 @@ class IncrementalMerger: logger.error("No JSON files found in the source directory.") raise SystemExit(1) - logger.info("JSON files found:", json_files) + logger.debug("JSON files found:", json_files) return json_files def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None: @@ -389,7 +389,7 @@ class IncrementalMerger: target_path: Path to target file. json_file: Name of the JSON file. """ - logger.info(f"Merging '{json_file}' with existing file in target directory...") + logger.info(f"Merging '{json_file}' with existing file in target directory...\r") source_data = self._load_chat_data(source_path) target_data = self._load_chat_data(target_path) @@ -401,10 +401,10 @@ class IncrementalMerger: merged_data = self._serialize_chats(merged_chats) if self._has_changes(merged_data, target_data): - logger.info(f"Changes detected in '{json_file}', updating target file...") + logger.info(f"Changes detected in '{json_file}', updating target file...{CLEAR_LINE}") self._save_merged_data(target_path, merged_data) else: - logger.info(f"No changes detected in '{json_file}', skipping update.") + logger.info(f"No changes detected in '{json_file}', skipping update.{CLEAR_LINE}") def _should_copy_media_file(self, source_file: str, target_file: str) -> bool: """Check if media file should be copied. 
@@ -429,7 +429,7 @@ class IncrementalMerger: source_media_path = os.path.join(source_dir, media_dir) target_media_path = os.path.join(target_dir, media_dir) - logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") + logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}{CLEAR_LINE}") if not os.path.exists(source_media_path): return @@ -444,7 +444,7 @@ class IncrementalMerger: target_file = os.path.join(target_root, file) if self._should_copy_media_file(source_file, target_file): - logger.info(f"Copying '{source_file}' to '{target_file}'...") + logger.debug(f"Copying '{source_file}' to '{target_file}'...") shutil.copy2(source_file, target_file) def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None: @@ -457,6 +457,7 @@ class IncrementalMerger: """ json_files = self._get_json_files(source_dir) + logger.info("Starting incremental merge process...{CLEAR_LINE}") for json_file in json_files: source_path = os.path.join(source_dir, json_file) target_path = os.path.join(target_dir, json_file) From 4eed3ca32102fc7c3796d0e2f9be4b0bd9b778c0 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 16:48:07 +0800 Subject: [PATCH 46/52] Refactor CLEAR_LINE in a more pythonic way So it is easier for contributor to write a logging line for this project. 
--- Whatsapp_Chat_Exporter/__main__.py | 45 +++++++++++++-------- Whatsapp_Chat_Exporter/android_crypt.py | 16 ++++---- Whatsapp_Chat_Exporter/android_handler.py | 22 +++++----- Whatsapp_Chat_Exporter/exported_handler.py | 4 +- Whatsapp_Chat_Exporter/ios_handler.py | 22 +++++----- Whatsapp_Chat_Exporter/ios_media_handler.py | 20 ++++----- Whatsapp_Chat_Exporter/utility.py | 15 ++++--- Whatsapp_Chat_Exporter/vcards_contacts.py | 6 +-- 8 files changed, 81 insertions(+), 69 deletions(-) diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 74ccaed..9d30935 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -12,7 +12,7 @@ import importlib.metadata from Whatsapp_Chat_Exporter import android_crypt, exported_handler, android_handler from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing -from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, CURRENT_TZ_OFFSET, Crypt +from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CURRENT_TZ_OFFSET, Crypt from Whatsapp_Chat_Exporter.utility import readable_to_bytes, safe_name, bytes_to_readable from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, check_update from Whatsapp_Chat_Exporter.utility import telegram_json_format, convert_time_unit, DbType @@ -440,10 +440,10 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']: def decrypt_android_backup(args) -> int: """Decrypt Android backup files and return error code.""" if args.key is None or args.backup is None: - logger.error(f"You must specify the backup file with -b and a key with -k{CLEAR_LINE}") + logger.error(f"You must specify the backup file with -b and a key with -k") return 1 - logger.info(f"Decryption key specified, decrypting WhatsApp backup...{CLEAR_LINE}") + logger.info(f"Decryption key specified, decrypting WhatsApp backup...") # Determine crypt type 
if "crypt12" in args.backup: @@ -454,7 +454,7 @@ def decrypt_android_backup(args) -> int: crypt = Crypt.CRYPT15 else: logger.error( - f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.{CLEAR_LINE}") + f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.") return 1 # Get key @@ -597,20 +597,20 @@ def handle_media_directory(args) -> None: if os.path.isdir(media_path): logger.info( - f"WhatsApp directory already exists in output directory. Skipping...{CLEAR_LINE}") + f"WhatsApp directory already exists in output directory. Skipping...") else: if args.move_media: try: - logger.info(f"Moving media directory...\r") + logger.info(f"Moving media directory...", extra={"clear": True}) shutil.move(args.media, f"{args.output}/") - logger.info(f"Media directory has been moved to the output directory{CLEAR_LINE}") + logger.info(f"Media directory has been moved to the output directory") except PermissionError: logger.warning("Cannot remove original WhatsApp directory. 
" "Perhaps the directory is opened?\n") else: - logger.info(f"Copying media directory...\r") + logger.info(f"Copying media directory...", extra={"clear": True}) shutil.copytree(args.media, media_path) - logger.info(f"Media directory has been copied to the output directory{CLEAR_LINE}") + logger.info(f"Media directory has been copied to the output directory") def create_output_files(args, data: ChatCollection) -> None: @@ -631,7 +631,7 @@ def create_output_files(args, data: ChatCollection) -> None: # Create text files if requested if args.text_format: - logger.info(f"Writing text file...{CLEAR_LINE}") + logger.info(f"Writing text file...") android_handler.create_txt(data, args.text_format) # Create JSON files if requested @@ -661,9 +661,9 @@ def export_single_json(args, data: Dict) -> None: ensure_ascii=not args.avoid_encoding_json, indent=args.pretty_print_json ) - logger.info(f"Writing JSON file...\r") + logger.info(f"Writing JSON file...", extra={"clear": True}) f.write(json_data) - logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))}){CLEAR_LINE}") + logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))})") def export_multiple_json(args, data: Dict) -> None: @@ -697,7 +697,7 @@ def export_multiple_json(args, data: Dict) -> None: f.write(file_content) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}") def process_exported_chat(args, data: ChatCollection) -> None: @@ -722,16 +722,29 @@ def process_exported_chat(args, data: ChatCollection) -> None: shutil.copy(file, args.output) +class ClearLineFilter(logging.Filter): + def filter(self, record): + is_clear = getattr(record, 'clear', False) + if is_clear: + record.line_end = "\r" + record.prefix = "\x1b[K" + else: + record.line_end = "\n" + record.prefix = "" + return True + + def setup_logging(level): 
log_handler_stdout = logging.StreamHandler() log_handler_stdout.terminator = "" handlers = [log_handler_stdout] + log_handler_stdout.addFilter(ClearLineFilter()) if level == logging.DEBUG: timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") handlers.append(logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w")) logging.basicConfig( level=level, - format="[%(levelname)s] %(message)s", + format="[%(levelname)s] %(message)s%(line_end)s", handlers=handlers ) @@ -822,7 +835,7 @@ def main(): args.backup, identifiers, args.decrypt_chunk_size) else: logger.info( - f"WhatsApp directory already exists, skipping WhatsApp file extraction.{CLEAR_LINE}") + f"WhatsApp directory already exists, skipping WhatsApp file extraction.") # Set default DB paths if not provided if args.db is None: @@ -838,7 +851,7 @@ def main(): args.pretty_print_json, args.avoid_encoding_json ) - logger.info(f"Incremental merge completed successfully.{CLEAR_LINE}") + logger.info(f"Incremental merge completed successfully.") else: # Process contacts process_contacts(args, data) diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py index 54a4e09..8a87420 100644 --- a/Whatsapp_Chat_Exporter/android_crypt.py +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -7,7 +7,7 @@ from tqdm import tqdm from typing import Tuple, Union from hashlib import sha256 from functools import partial -from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CRYPT14_OFFSETS, Crypt, DbType +from Whatsapp_Chat_Exporter.utility import CRYPT14_OFFSETS, Crypt, DbType try: import zlib @@ -172,11 +172,11 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> continue else: logger.debug( - f"Decryption successful with known offsets: IV {iv}, DB {db}{CLEAR_LINE}" + f"Decryption successful with known offsets: IV {iv}, DB {db}" ) return decrypted_db # Successful decryption - logger.info(f"Common offsets failed. 
Will attempt to brute-force{CLEAR_LINE}") + logger.info(f"Common offsets failed. Will attempt to brute-force") offset_max = 200 workers = max_worker check_offset = partial(_attempt_decrypt_task, database=database, main_key=main_key) @@ -196,19 +196,19 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> break if found: logger.info( - f"The offsets of your IV and database are {start_iv} and {start_db}, respectively.{CLEAR_LINE}" + f"The offsets of your IV and database are {start_iv} and {start_db}, respectively." ) logger.info( - f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:{CLEAR_LINE}" + f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:" ) - logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47{CLEAR_LINE}") + logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47") return result except KeyboardInterrupt: executor.shutdown(wait=False, cancel_futures=True) print("\n") raise KeyboardInterrupt( - f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}" + f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully..." 
) finally: @@ -346,7 +346,7 @@ def decrypt_backup( main_key, hex_key = _derive_main_enc_key(key) if show_crypt15: hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]) - logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}{CLEAR_LINE}") + logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}") else: main_key = key[126:] diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index 72ace63..0760b30 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -11,7 +11,7 @@ from markupsafe import escape as htmle from base64 import b64decode, b64encode from datetime import datetime from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, MAX_SIZE, ROW_SIZE, JidType, Device, get_jid_map_join +from Whatsapp_Chat_Exporter.utility import MAX_SIZE, ROW_SIZE, JidType, Device, get_jid_map_join from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata @@ -56,7 +56,7 @@ def contacts(db, data, enrich_from_vcards): current_chat.status = row["status"] pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}") return True @@ -101,7 +101,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, pbar.update(1) total_time = pbar.format_dict['elapsed'] _get_reactions(db, data) - logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed 
{total_row_number} messages in {convert_time_unit(total_time)}") # Helper functions for message processing @@ -518,7 +518,7 @@ def _get_reactions(db, data): if c.fetchone()[0] == 0: return - logger.info("Processing reactions...\r") + logger.info("Processing reactions...", extra={"clear": True}) c.execute(""" SELECT @@ -539,7 +539,7 @@ def _get_reactions(db, data): ON chat.jid_row_id = chat_jid._id """) except sqlite3.OperationalError: - logger.warning(f"Could not fetch reactions (schema might be too old or incompatible){CLEAR_LINE}") + logger.warning(f"Could not fetch reactions (schema might be too old or incompatible)") return rows = c.fetchall() @@ -574,7 +574,7 @@ def _get_reactions(db, data): message.reactions[sender_name] = reaction pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}") def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False): @@ -609,7 +609,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}") # Helper functions for media processing @@ -828,7 +828,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): _process_vcard_row(row, path, data) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}") def 
_execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty): """Execute vCard query for modern WhatsApp database schema.""" @@ -935,7 +935,7 @@ def calls(db, data, timezone_offset, filter_chat): if total_row_number == 0: return - logger.info(f"Processing calls...({total_row_number})\r") + logger.info(f"Processing calls...({total_row_number})", extra={"clear": True}) # Fetch call data calls_data = _fetch_calls_data(c, filter_chat) @@ -952,7 +952,7 @@ def calls(db, data, timezone_offset, filter_chat): # Add the calls chat to the data data.add_chat("000000000000000", chat) - logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}") def _get_calls_count(c, filter_chat): """Get the count of call records that match the filter.""" @@ -1128,7 +1128,7 @@ def create_html( pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}") def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline): """Generate a single HTML file for a chat.""" diff --git a/Whatsapp_Chat_Exporter/exported_handler.py b/Whatsapp_Chat_Exporter/exported_handler.py index 984dbe1..45510f7 100644 --- a/Whatsapp_Chat_Exporter/exported_handler.py +++ b/Whatsapp_Chat_Exporter/exported_handler.py @@ -6,7 +6,7 @@ from datetime import datetime from mimetypes import MimeTypes from tqdm import tqdm from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device, convert_time_unit +from Whatsapp_Chat_Exporter.utility import Device, convert_time_unit logger = logging.getLogger(__name__) @@ -43,7 +43,7 @@ def messages(path, data, assume_first_as_me=False): ) pbar.update(1) total_time = 
pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}") return data diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py index c0f86fe..503653e 100644 --- a/Whatsapp_Chat_Exporter/ios_handler.py +++ b/Whatsapp_Chat_Exporter/ios_handler.py @@ -9,7 +9,7 @@ from pathlib import Path from mimetypes import MimeTypes from markupsafe import escape as htmle from Whatsapp_Chat_Exporter.data_model import ChatStore, Message -from Whatsapp_Chat_Exporter.utility import APPLE_TIME, CLEAR_LINE, get_chat_condition, Device +from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name @@ -21,7 +21,7 @@ def contacts(db, data): c = db.cursor() c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") total_row_number = c.fetchone()[0] - logger.info(f"Pre-processing contacts...({total_row_number})\r") + logger.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True}) c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar: @@ -35,7 +35,7 @@ def contacts(db, data): data.add_chat(zwhatsapp_id, current_chat) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}") def process_contact_avatars(current_chat, media_folder, contact_id): @@ -132,7 +132,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, process_contact_avatars(current_chat, 
media_folder, contact_id) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}") # Get message count message_count_query = f""" @@ -149,7 +149,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, """ c.execute(message_count_query) total_row_number = c.fetchone()[0] - logger.info(f"Processing messages...(0/{total_row_number})\r") + logger.info(f"Processing messages...(0/{total_row_number})", extra={"clear": True}) # Fetch messages messages_query = f""" @@ -226,7 +226,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}") def process_message_data(message, content, is_group_message, data, message_map, no_reply): @@ -340,7 +340,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa """ c.execute(media_count_query) total_row_number = c.fetchone()[0] - logger.info(f"Processing media...(0/{total_row_number})\r") + logger.info(f"Processing media...(0/{total_row_number})", extra={"clear": True}) # Fetch media items media_query = f""" @@ -373,7 +373,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}") def process_media_item(content, data, media_folder, mime, separate_media, 
fix_dot_files=False): @@ -462,7 +462,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): c.execute(vcard_query) contents = c.fetchall() total_row_number = len(contents) - logger.info(f"Processing vCards...(0/{total_row_number})\r") + logger.info(f"Processing vCards...(0/{total_row_number})", extra={"clear": True}) # Create vCards directory path = f'{media_folder}/Message/vCards' @@ -474,7 +474,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): process_vcard_item(content, path, data) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}") def process_vcard_item(content, path, data): @@ -566,7 +566,7 @@ def calls(db, data, timezone_offset, filter_chat): # Add calls chat to data data.add_chat("000000000000000", chat) - logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}") def process_call_record(content, chat, data, timezone_offset): diff --git a/Whatsapp_Chat_Exporter/ios_media_handler.py b/Whatsapp_Chat_Exporter/ios_media_handler.py index 56df3d0..321b196 100644 --- a/Whatsapp_Chat_Exporter/ios_media_handler.py +++ b/Whatsapp_Chat_Exporter/ios_media_handler.py @@ -8,7 +8,7 @@ import getpass from sys import exit, platform as osname import sys from tqdm import tqdm -from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, WhatsAppIdentifier, convert_time_unit +from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier, convert_time_unit from Whatsapp_Chat_Exporter.bplist import BPListReader try: from iphone_backup_decrypt import EncryptedBackup, RelativePath @@ -79,7 +79,7 @@ class BackupExtractor: ) return - logger.info(f"Encryption detected on the backup!{CLEAR_LINE}") + 
logger.info(f"Encryption detected on the backup!") password = getpass.getpass("Enter the password for the backup:") sys.stdout.write("\033[F\033[K") sys.stdout.flush() @@ -93,7 +93,7 @@ class BackupExtractor: Args: password (str): The password for the encrypted backup. """ - logger.info(f"Trying to open the iOS backup...{CLEAR_LINE}") + logger.info(f"Trying to open the iOS backup...") self.backup = EncryptedBackup( backup_directory=self.base_dir, passphrase=password, @@ -101,8 +101,8 @@ class BackupExtractor: check_same_thread=False, decrypt_chunk_size=self.decrypt_chunk_size, ) - logger.info(f"iOS backup is opened successfully{CLEAR_LINE}") - logger.info("Decrypting WhatsApp database...\r") + logger.info(f"iOS backup is opened successfully") + logger.info("Decrypting WhatsApp database...", extra={"clear": True}) try: self.backup.extract_file( relative_path=RelativePath.WHATSAPP_MESSAGES, @@ -130,7 +130,7 @@ class BackupExtractor: ) exit(6) else: - logger.info(f"WhatsApp database decrypted successfully{CLEAR_LINE}") + logger.info(f"WhatsApp database decrypted successfully") def _extract_decrypted_files(self): """Extract all WhatsApp files after decryption""" @@ -150,7 +150,7 @@ class BackupExtractor: ) total_time = pbar.format_dict['elapsed'] pbar.close() - logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}") def _extract_unencrypted_backup(self): """ @@ -182,12 +182,12 @@ class BackupExtractor: shutil.copyfile(wts_db_path, self.identifiers.MESSAGE) if not os.path.isfile(contact_db_path): - logger.warning(f"Contact database not found. Skipping...{CLEAR_LINE}") + logger.warning(f"Contact database not found. Skipping...") else: shutil.copyfile(contact_db_path, self.identifiers.CONTACT) if not os.path.isfile(call_db_path): - logger.warning(f"Call database not found. 
Skipping...{CLEAR_LINE}") + logger.warning(f"Call database not found. Skipping...") else: shutil.copyfile(call_db_path, self.identifiers.CALL) @@ -236,7 +236,7 @@ class BackupExtractor: os.utime(destination, (modification, modification)) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}") def extract_media(base_dir, identifiers, decrypt_chunk_size): diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index f73af1b..fe21492 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -30,7 +30,6 @@ except ImportError: MAX_SIZE = 4 * 1024 * 1024 # Default 4MB ROW_SIZE = 0x3D0 CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().seconds / 3600 -CLEAR_LINE = "\x1b[K\n" logger = logging.getLogger(__name__) @@ -254,7 +253,7 @@ def import_from_json(json_file: str, data: ChatCollection): data.add_chat(jid, chat) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}{CLEAR_LINE}") + logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}") class IncrementalMerger: @@ -389,7 +388,7 @@ class IncrementalMerger: target_path: Path to target file. json_file: Name of the JSON file. 
""" - logger.info(f"Merging '{json_file}' with existing file in target directory...\r") + logger.info(f"Merging '{json_file}' with existing file in target directory...", extra={"clear": True}) source_data = self._load_chat_data(source_path) target_data = self._load_chat_data(target_path) @@ -401,10 +400,10 @@ class IncrementalMerger: merged_data = self._serialize_chats(merged_chats) if self._has_changes(merged_data, target_data): - logger.info(f"Changes detected in '{json_file}', updating target file...{CLEAR_LINE}") + logger.info(f"Changes detected in '{json_file}', updating target file...") self._save_merged_data(target_path, merged_data) else: - logger.info(f"No changes detected in '{json_file}', skipping update.{CLEAR_LINE}") + logger.info(f"No changes detected in '{json_file}', skipping update.") def _should_copy_media_file(self, source_file: str, target_file: str) -> bool: """Check if media file should be copied. @@ -429,7 +428,7 @@ class IncrementalMerger: source_media_path = os.path.join(source_dir, media_dir) target_media_path = os.path.join(target_dir, media_dir) - logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}{CLEAR_LINE}") + logger.info(f"Merging media directories. 
Source: {source_media_path}, target: {target_media_path}") if not os.path.exists(source_media_path): return @@ -457,7 +456,7 @@ class IncrementalMerger: """ json_files = self._get_json_files(source_dir) - logger.info("Starting incremental merge process...{CLEAR_LINE}") + logger.info("Starting incremental merge process...") for json_file in json_files: source_path = os.path.join(source_dir, json_file) target_path = os.path.join(target_dir, json_file) @@ -894,7 +893,7 @@ def get_chat_type(chat_id: str) -> str: return "status_broadcast" elif chat_id.endswith("@broadcast"): return "broadcast_channel" - logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group{CLEAR_LINE}") + logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group") return "private_group" diff --git a/Whatsapp_Chat_Exporter/vcards_contacts.py b/Whatsapp_Chat_Exporter/vcards_contacts.py index 03d60ce..707f555 100644 --- a/Whatsapp_Chat_Exporter/vcards_contacts.py +++ b/Whatsapp_Chat_Exporter/vcards_contacts.py @@ -3,7 +3,7 @@ import re import quopri from typing import List, TypedDict from Whatsapp_Chat_Exporter.data_model import ChatStore -from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, Device +from Whatsapp_Chat_Exporter.utility import Device logger = logging.getLogger(__name__) @@ -47,7 +47,7 @@ def decode_quoted_printable(value: str, charset: str) -> str: # Fallback: return the original value if decoding fails logger.warning( f"Failed to decode quoted-printable value: {value}, " - f"charset: {charset}. Please report this issue.{CLEAR_LINE}" + f"charset: {charset}. Please report this issue." 
) return value @@ -176,7 +176,7 @@ def read_vcards_file(vcf_file_path, default_country_code: str): if contact := process_vcard_entry(vcard): contacts.append(contact) - logger.info(f"Imported {len(contacts)} contacts/vcards{CLEAR_LINE}") + logger.info(f"Imported {len(contacts)} contacts/vcards") return map_number_to_name(contacts, default_country_code) From f920ca82b42cc42e894bfe3e76b1d146d89a9167 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 17:05:14 +0800 Subject: [PATCH 47/52] Refactor the logging facility a bit --- Whatsapp_Chat_Exporter/__main__.py | 79 ++++++++++++--------- Whatsapp_Chat_Exporter/android_crypt.py | 17 +++-- Whatsapp_Chat_Exporter/android_handler.py | 45 ++++++------ Whatsapp_Chat_Exporter/exported_handler.py | 3 +- Whatsapp_Chat_Exporter/ios_handler.py | 21 +++--- Whatsapp_Chat_Exporter/ios_media_handler.py | 33 +++++---- Whatsapp_Chat_Exporter/utility.py | 41 ++++++----- Whatsapp_Chat_Exporter/vcards_contacts.py | 5 +- 8 files changed, 123 insertions(+), 121 deletions(-) diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 9d30935..55aaf27 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -26,7 +26,6 @@ from typing import Optional, List, Dict from Whatsapp_Chat_Exporter.vcards_contacts import ContactsFromVCards -logger = logging.getLogger(__name__) __version__ = importlib.metadata.version("whatsapp_chat_exporter") WTSEXPORTER_BANNER = f"""======================================================================================================== ██╗ ██╗██╗ ██╗ █████╗ ████████╗███████╗ █████╗ ██████╗ ██████╗ @@ -440,10 +439,10 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']: def decrypt_android_backup(args) -> int: """Decrypt Android backup files and return error code.""" if args.key is None or args.backup is None: - logger.error(f"You must specify the backup file with -b and a key with -k") 
+ logging.error(f"You must specify the backup file with -b and a key with -k") return 1 - logger.info(f"Decryption key specified, decrypting WhatsApp backup...") + logging.info(f"Decryption key specified, decrypting WhatsApp backup...") # Determine crypt type if "crypt12" in args.backup: @@ -453,7 +452,7 @@ def decrypt_android_backup(args) -> int: elif "crypt15" in args.backup: crypt = Crypt.CRYPT15 else: - logger.error( + logging.error( f"Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.") return 1 @@ -506,15 +505,15 @@ def decrypt_android_backup(args) -> int: def handle_decrypt_error(error: int) -> None: """Handle decryption errors with appropriate messages.""" if error == 1: - logger.error("Dependencies of decrypt_backup and/or extract_encrypted_key" - " are not present. For details, see README.md.\n") + logging.error("Dependencies of decrypt_backup and/or extract_encrypted_key" + " are not present. For details, see README.md.") exit(3) elif error == 2: - logger.error("Failed when decompressing the decrypted backup. " - "Possibly incorrect offsets used in decryption.\n") + logging.error("Failed when decompressing the decrypted backup. " + "Possibly incorrect offsets used in decryption.") exit(4) else: - logger.error("Unknown error occurred.\n") + logging.error("Unknown error occurred.") exit(5) @@ -537,9 +536,9 @@ def process_messages(args, data: ChatCollection) -> None: msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE if not os.path.isfile(msg_db): - logger.error( + logging.error( "The message database does not exist. You may specify the path " - "to database file with option -d or check your provided path.\n" + "to database file with option -d or check your provided path." 
) exit(6) @@ -596,21 +595,21 @@ def handle_media_directory(args) -> None: media_path = os.path.join(args.output, args.media) if os.path.isdir(media_path): - logger.info( + logging.info( f"WhatsApp directory already exists in output directory. Skipping...") else: if args.move_media: try: - logger.info(f"Moving media directory...", extra={"clear": True}) + logging.info(f"Moving media directory...", extra={"clear": True}) shutil.move(args.media, f"{args.output}/") - logger.info(f"Media directory has been moved to the output directory") + logging.info(f"Media directory has been moved to the output directory") except PermissionError: - logger.warning("Cannot remove original WhatsApp directory. " - "Perhaps the directory is opened?\n") + logging.warning("Cannot remove original WhatsApp directory. " + "Perhaps the directory is opened?") else: - logger.info(f"Copying media directory...", extra={"clear": True}) + logging.info(f"Copying media directory...", extra={"clear": True}) shutil.copytree(args.media, media_path) - logger.info(f"Media directory has been copied to the output directory") + logging.info(f"Media directory has been copied to the output directory") def create_output_files(args, data: ChatCollection) -> None: @@ -631,7 +630,7 @@ def create_output_files(args, data: ChatCollection) -> None: # Create text files if requested if args.text_format: - logger.info(f"Writing text file...") + logging.info(f"Writing text file...") android_handler.create_txt(data, args.text_format) # Create JSON files if requested @@ -661,9 +660,9 @@ def export_single_json(args, data: Dict) -> None: ensure_ascii=not args.avoid_encoding_json, indent=args.pretty_print_json ) - logger.info(f"Writing JSON file...", extra={"clear": True}) + logging.info(f"Writing JSON file...", extra={"clear": True}) f.write(json_data) - logger.info(f"JSON file saved...({bytes_to_readable(len(json_data))})") + logging.info(f"JSON file saved...({bytes_to_readable(len(json_data))})") def 
export_multiple_json(args, data: Dict) -> None: @@ -697,7 +696,7 @@ def export_multiple_json(args, data: Dict) -> None: f.write(file_content) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}") + logging.info(f"Generated {total} JSON files in {convert_time_unit(total_time)}") def process_exported_chat(args, data: ChatCollection) -> None: @@ -737,11 +736,18 @@ class ClearLineFilter(logging.Filter): def setup_logging(level): log_handler_stdout = logging.StreamHandler() log_handler_stdout.terminator = "" - handlers = [log_handler_stdout] log_handler_stdout.addFilter(ClearLineFilter()) + log_handler_stdout.set_name("console") + + handlers = [log_handler_stdout] + if level == logging.DEBUG: timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") - handlers.append(logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w")) + log_handler_file = logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w") + log_handler_file.terminator = "" + log_handler_file.addFilter(ClearLineFilter()) + handlers.append(log_handler_file) + logging.basicConfig( level=level, format="[%(levelname)s] %(message)s%(line_end)s", @@ -755,23 +761,26 @@ def main(): parser = setup_argument_parser() args = parser.parse_args() - # Check for updates - if args.check_update: - exit(check_update()) - - # Validate arguments - validate_args(parser, args) - # Print banner if not suppressed if not args.no_banner: print(WTSEXPORTER_BANNER) if args.debug: setup_logging(logging.DEBUG) - logger.debug("Debug mode enabled.\n") + logging.debug("Debug mode enabled.") + for handler in logging.getLogger().handlers: + if handler.name == "console": + handler.setLevel(logging.INFO) else: setup_logging(logging.INFO) + # Check for updates + if args.check_update: + exit(check_update()) + + # Validate arguments + validate_args(parser, args) + # Create output directory if it doesn't exist os.makedirs(args.output, exist_ok=True) @@ 
-834,7 +843,7 @@ def main(): ios_media_handler.extract_media( args.backup, identifiers, args.decrypt_chunk_size) else: - logger.info( + logging.info( f"WhatsApp directory already exists, skipping WhatsApp file extraction.") # Set default DB paths if not provided @@ -851,7 +860,7 @@ def main(): args.pretty_print_json, args.avoid_encoding_json ) - logger.info(f"Incremental merge completed successfully.") + logging.info(f"Incremental merge completed successfully.") else: # Process contacts process_contacts(args, data) @@ -869,7 +878,7 @@ def main(): # Handle media directory handle_media_directory(args) - logger.info("Everything is done!") + logging.info("Everything is done!") if __name__ == "__main__": diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py index 8a87420..c7d25f0 100644 --- a/Whatsapp_Chat_Exporter/android_crypt.py +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -25,7 +25,6 @@ else: support_crypt15 = True -logger = logging.getLogger(__name__) class DecryptionError(Exception): @@ -126,7 +125,7 @@ def _decrypt_database(db_ciphertext: bytes, main_key: bytes, iv: bytes) -> bytes raise ValueError("Decryption/Authentication failed. Ensure you are using the correct key.") if len(db_compressed) < 2 or db_compressed[0] != 0x78: - logger.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}") + logging.debug(f"Data passes GCM but is not Zlib. Header: {db_compressed[:2].hex()}") raise ValueError( "Key is correct, but decrypted data is not a valid compressed stream. " "Is this even a valid WhatsApp database backup?" @@ -171,12 +170,12 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> except (zlib.error, ValueError): continue else: - logger.debug( + logging.debug( f"Decryption successful with known offsets: IV {iv}, DB {db}" ) return decrypted_db # Successful decryption - logger.info(f"Common offsets failed. 
Will attempt to brute-force") + logging.info(f"Common offsets failed. Will attempt to brute-force") offset_max = 200 workers = max_worker check_offset = partial(_attempt_decrypt_task, database=database, main_key=main_key) @@ -195,18 +194,18 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> found = True break if found: - logger.info( + logging.info( f"The offsets of your IV and database are {start_iv} and {start_db}, respectively." ) - logger.info( + logging.info( f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:" ) - logger.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47") + logging.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47") return result except KeyboardInterrupt: executor.shutdown(wait=False, cancel_futures=True) - print("\n") + logging.info("") raise KeyboardInterrupt( f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully..." ) @@ -346,7 +345,7 @@ def decrypt_backup( main_key, hex_key = _derive_main_enc_key(key) if show_crypt15: hex_key_str = ' '.join([hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]) - logger.info(f"The HEX key of the crypt15 backup is: {hex_key_str}") + logging.info(f"The HEX key of the crypt15 backup is: {hex_key_str}") else: main_key = key[126:] diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index 0760b30..59cb2b6 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -17,7 +17,6 @@ from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_uni from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata -logger = logging.getLogger(__name__) def contacts(db, data, enrich_from_vcards): @@ -38,14 +37,14 @@ def contacts(db, data, enrich_from_vcards): if total_row_number == 0: if enrich_from_vcards is not None: - 
logger.info( - "No contacts profiles found in the default database, contacts will be imported from the specified vCard file.\n") + logging.info( + "No contacts profiles found in the default database, contacts will be imported from the specified vCard file.") else: - logger.warning( - "No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google\n") + logging.warning( + "No contacts profiles found in the default database, consider using --enrich-from-vcards for adopting names from exported contacts from Google") return False else: - logger.info(f"Processed {total_row_number} contacts\n") + logging.info(f"Processed {total_row_number} contacts") c.execute("SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts;") @@ -56,7 +55,7 @@ def contacts(db, data, enrich_from_vcards): current_chat.status = row["status"] pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}") return True @@ -81,7 +80,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, content_cursor = _get_messages_cursor_legacy(c, filter_empty, filter_date, filter_chat) table_message = False except sqlite3.OperationalError as e: - logger.debug(f'Got sql error "{e}" in _get_message_cursor_legacy trying fallback.\n') + logging.debug(f'Got sql error "{e}" in _get_message_cursor_legacy trying fallback.\n') try: content_cursor = _get_messages_cursor_new( c, @@ -101,7 +100,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, pbar.update(1) total_time = pbar.format_dict['elapsed'] _get_reactions(db, data) - logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} messages in 
{convert_time_unit(total_time)}") # Helper functions for message processing @@ -127,7 +126,7 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat, jid_map_e {include_filter} {exclude_filter}""") except sqlite3.OperationalError as e: - logger.debug(f'Got sql error "{e}" in _get_message_count trying fallback.\n') + logging.debug(f'Got sql error "{e}" in _get_message_count trying fallback.\n') empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast") date_filter = f'AND timestamp {filter_date}' if filter_date is not None else '' @@ -315,8 +314,8 @@ def _fetch_row_safely(cursor): except sqlite3.OperationalError as e: # Not sure how often this might happen, but this check should reduce the overhead # if DEBUG flag is not set. - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f'Got sql error "{e}" in _fetch_row_safely ignoring row.\n') + if logging.isEnabledFor(logging.DEBUG): + logging.debug(f'Got sql error "{e}" in _fetch_row_safely ignoring row.\n') continue @@ -518,7 +517,7 @@ def _get_reactions(db, data): if c.fetchone()[0] == 0: return - logger.info("Processing reactions...", extra={"clear": True}) + logging.info("Processing reactions...", extra={"clear": True}) c.execute(""" SELECT @@ -539,7 +538,7 @@ def _get_reactions(db, data): ON chat.jid_row_id = chat_jid._id """) except sqlite3.OperationalError: - logger.warning(f"Could not fetch reactions (schema might be too old or incompatible)") + logging.warning(f"Could not fetch reactions (schema might be too old or incompatible)") return rows = c.fetchall() @@ -574,7 +573,7 @@ def _get_reactions(db, data): message.reactions[sender_name] = reaction pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}") def media(db, data, media_folder, filter_date, filter_chat, filter_empty, 
separate_media=True, fix_dot_files=False): @@ -595,7 +594,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa try: content_cursor = _get_media_cursor_legacy(c, filter_empty, filter_date, filter_chat) except sqlite3.OperationalError as e: - logger.debug(f'Got sql error "{e}" in _get_media_cursor_legacy trying fallback.\n') + logging.debug(f'Got sql error "{e}" in _get_media_cursor_legacy trying fallback.\n') content_cursor = _get_media_cursor_new(c, filter_empty, filter_date, filter_chat) content = content_cursor.fetchone() @@ -609,7 +608,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}") # Helper functions for media processing @@ -637,7 +636,7 @@ def _get_media_count(cursor, filter_empty, filter_date, filter_chat): {include_filter} {exclude_filter}""") except sqlite3.OperationalError as e: - logger.debug(f'Got sql error "{e}" in _get_media_count trying fallback.\n') + logging.debug(f'Got sql error "{e}" in _get_media_count trying fallback.\n') empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast") date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else '' include_filter = get_chat_condition( @@ -814,7 +813,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): try: rows = _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty) except sqlite3.OperationalError as e: - logger.debug(f'Got sql error "{e}" in _execute_vcard_query_modern trying fallback.\n') + logging.debug(f'Got sql error "{e}" in _execute_vcard_query_modern trying fallback.\n') rows = _execute_vcard_query_legacy(c, filter_date, 
filter_chat, filter_empty) total_row_number = len(rows) @@ -828,7 +827,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): _process_vcard_row(row, path, data) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}") def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty): """Execute vCard query for modern WhatsApp database schema.""" @@ -935,7 +934,7 @@ def calls(db, data, timezone_offset, filter_chat): if total_row_number == 0: return - logger.info(f"Processing calls...({total_row_number})", extra={"clear": True}) + logging.info(f"Processing calls...({total_row_number})", extra={"clear": True}) # Fetch call data calls_data = _fetch_calls_data(c, filter_chat) @@ -952,7 +951,7 @@ def calls(db, data, timezone_offset, filter_chat): # Add the calls chat to the data data.add_chat("000000000000000", chat) - logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}") def _get_calls_count(c, filter_chat): """Get the count of call records that match the filter.""" @@ -1128,7 +1127,7 @@ def create_html( pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}") + logging.info(f"Generated {total_row_number} chats in {convert_time_unit(total_time)}") def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline): """Generate a single HTML file for a chat.""" diff --git a/Whatsapp_Chat_Exporter/exported_handler.py b/Whatsapp_Chat_Exporter/exported_handler.py index 45510f7..93808b0 100644 --- a/Whatsapp_Chat_Exporter/exported_handler.py +++ b/Whatsapp_Chat_Exporter/exported_handler.py @@ -9,7 +9,6 @@ from 
Whatsapp_Chat_Exporter.data_model import ChatStore, Message from Whatsapp_Chat_Exporter.utility import Device, convert_time_unit -logger = logging.getLogger(__name__) def messages(path, data, assume_first_as_me=False): @@ -43,7 +42,7 @@ def messages(path, data, assume_first_as_me=False): ) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} messages & media in {convert_time_unit(total_time)}") return data diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py index 503653e..ed3e056 100644 --- a/Whatsapp_Chat_Exporter/ios_handler.py +++ b/Whatsapp_Chat_Exporter/ios_handler.py @@ -13,7 +13,6 @@ from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Devic from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name -logger = logging.getLogger(__name__) def contacts(db, data): @@ -21,7 +20,7 @@ def contacts(db, data): c = db.cursor() c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") total_row_number = c.fetchone()[0] - logger.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True}) + logging.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True}) c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar: @@ -35,7 +34,7 @@ def contacts(db, data): data.add_chat(zwhatsapp_id, current_chat) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}") + logging.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}") def process_contact_avatars(current_chat, media_folder, contact_id): @@ -132,7 +131,7 
@@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, process_contact_avatars(current_chat, media_folder, contact_id) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}") # Get message count message_count_query = f""" @@ -149,7 +148,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, """ c.execute(message_count_query) total_row_number = c.fetchone()[0] - logger.info(f"Processing messages...(0/{total_row_number})", extra={"clear": True}) + logging.info(f"Processing messages...(0/{total_row_number})", extra={"clear": True}) # Fetch messages messages_query = f""" @@ -226,7 +225,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} messages in {convert_time_unit(total_time)}") def process_message_data(message, content, is_group_message, data, message_map, no_reply): @@ -340,7 +339,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa """ c.execute(media_count_query) total_row_number = c.fetchone()[0] - logger.info(f"Processing media...(0/{total_row_number})", extra={"clear": True}) + logging.info(f"Processing media...(0/{total_row_number})", extra={"clear": True}) # Fetch media items media_query = f""" @@ -373,7 +372,7 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} 
media in {convert_time_unit(total_time)}") def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False): @@ -462,7 +461,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): c.execute(vcard_query) contents = c.fetchall() total_row_number = len(contents) - logger.info(f"Processing vCards...(0/{total_row_number})", extra={"clear": True}) + logging.info(f"Processing vCards...(0/{total_row_number})", extra={"clear": True}) # Create vCards directory path = f'{media_folder}/Message/vCards' @@ -474,7 +473,7 @@ def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): process_vcard_item(content, path, data) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} vCards in {convert_time_unit(total_time)}") def process_vcard_item(content, path, data): @@ -566,7 +565,7 @@ def calls(db, data, timezone_offset, filter_chat): # Add calls chat to data data.add_chat("000000000000000", chat) - logger.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}") + logging.info(f"Processed {total_row_number} calls in {convert_time_unit(total_time)}") def process_call_record(content, chat, data, timezone_offset): diff --git a/Whatsapp_Chat_Exporter/ios_media_handler.py b/Whatsapp_Chat_Exporter/ios_media_handler.py index 321b196..1e26656 100644 --- a/Whatsapp_Chat_Exporter/ios_media_handler.py +++ b/Whatsapp_Chat_Exporter/ios_media_handler.py @@ -18,7 +18,6 @@ else: support_encrypted = True -logger = logging.getLogger(__name__) class BackupExtractor: @@ -60,7 +59,7 @@ class BackupExtractor: return False except sqlite3.DatabaseError as e: if str(e) == "authorization denied" and osname == "darwin": - logger.error( + logging.error( "You don't have permission to access the backup database. Please" "check your permissions or try moving the backup to somewhere else." 
) @@ -73,13 +72,13 @@ class BackupExtractor: Handles the extraction of data from an encrypted iOS backup. """ if not support_encrypted: - logger.error("You don't have the dependencies to handle encrypted backup." + logging.error("You don't have the dependencies to handle encrypted backup." "Read more on how to deal with encrypted backup:" "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage" ) return - logger.info(f"Encryption detected on the backup!") + logging.info(f"Encryption detected on the backup!") password = getpass.getpass("Enter the password for the backup:") sys.stdout.write("\033[F\033[K") sys.stdout.flush() @@ -93,7 +92,7 @@ class BackupExtractor: Args: password (str): The password for the encrypted backup. """ - logger.info(f"Trying to open the iOS backup...") + logging.info(f"Trying to open the iOS backup...") self.backup = EncryptedBackup( backup_directory=self.base_dir, passphrase=password, @@ -101,8 +100,8 @@ class BackupExtractor: check_same_thread=False, decrypt_chunk_size=self.decrypt_chunk_size, ) - logger.info(f"iOS backup is opened successfully") - logger.info("Decrypting WhatsApp database...", extra={"clear": True}) + logging.info(f"iOS backup is opened successfully") + logging.info("Decrypting WhatsApp database...", extra={"clear": True}) try: self.backup.extract_file( relative_path=RelativePath.WHATSAPP_MESSAGES, @@ -120,17 +119,17 @@ class BackupExtractor: output_filename=self.identifiers.CALL, ) except ValueError: - logger.error("Failed to decrypt backup: incorrect password?") + logging.error("Failed to decrypt backup: incorrect password?") exit(7) except FileNotFoundError: - logger.error( + logging.error( "Essential WhatsApp files are missing from the iOS backup. " "Perhapse you enabled end-to-end encryption for the backup? 
" "See https://wts.knugi.dev/docs.html?dest=iose2e" ) exit(6) else: - logger.info(f"WhatsApp database decrypted successfully") + logging.info(f"WhatsApp database decrypted successfully") def _extract_decrypted_files(self): """Extract all WhatsApp files after decryption""" @@ -150,7 +149,7 @@ class BackupExtractor: ) total_time = pbar.format_dict['elapsed'] pbar.close() - logger.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}") + logging.info(f"All required files are decrypted and extracted in {convert_time_unit(total_time)}") def _extract_unencrypted_backup(self): """ @@ -169,10 +168,10 @@ class BackupExtractor: if not os.path.isfile(wts_db_path): if self.identifiers is WhatsAppIdentifier: - logger.error("WhatsApp database not found.") + logging.error("WhatsApp database not found.") else: - logger.error("WhatsApp Business database not found.") - logger.error( + logging.error("WhatsApp Business database not found.") + logging.error( "Essential WhatsApp files are missing from the iOS backup. " "Perhapse you enabled end-to-end encryption for the backup? " "See https://wts.knugi.dev/docs.html?dest=iose2e" @@ -182,12 +181,12 @@ class BackupExtractor: shutil.copyfile(wts_db_path, self.identifiers.MESSAGE) if not os.path.isfile(contact_db_path): - logger.warning(f"Contact database not found. Skipping...") + logging.warning(f"Contact database not found. Skipping...") else: shutil.copyfile(contact_db_path, self.identifiers.CONTACT) if not os.path.isfile(call_db_path): - logger.warning(f"Call database not found. Skipping...") + logging.warning(f"Call database not found. 
Skipping...") else: shutil.copyfile(call_db_path, self.identifiers.CALL) @@ -236,7 +235,7 @@ class BackupExtractor: os.utime(destination, (modification, modification)) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}") + logging.info(f"Extracted {total_row_number} WhatsApp files in {convert_time_unit(total_time)}") def extract_media(base_dir, identifiers, decrypt_chunk_size): diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index fe21492..f5aaba7 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -31,7 +31,6 @@ MAX_SIZE = 4 * 1024 * 1024 # Default 4MB ROW_SIZE = 0x3D0 CURRENT_TZ_OFFSET = datetime.now().astimezone().utcoffset().seconds / 3600 -logger = logging.getLogger(__name__) def convert_time_unit(time_second: int) -> str: @@ -168,7 +167,7 @@ def check_update(): try: raw = urllib.request.urlopen(PACKAGE_JSON) except Exception: - logger.error("Failed to check for updates.") + logging.error("Failed to check for updates.") return 1 else: with raw: @@ -178,19 +177,19 @@ def check_update(): __version__ = importlib.metadata.version("whatsapp_chat_exporter") current_version = tuple(map(int, __version__.split("."))) if current_version < latest_version: - logger.info( + logging.info( "===============Update===============\n" - "A newer version of WhatsApp Chat Exporter is available.\n" - f"Current version: {__version__}\n" - f"Latest version: {package_info['info']['version']}\n" + "A newer version of WhatsApp Chat Exporter is available." 
+ f"Current version: {__version__}" + f"Latest version: {package_info['info']['version']}" ) if platform == "win32": - logger.info("Update with: pip install --upgrade whatsapp-chat-exporter\n") + logging.info("Update with: pip install --upgrade whatsapp-chat-exporter") else: - logger.info("Update with: pip3 install --upgrade whatsapp-chat-exporter\n") - logger.info("====================================\n") + logging.info("Update with: pip3 install --upgrade whatsapp-chat-exporter") + logging.info("====================================") else: - logger.info("You are using the latest version of WhatsApp Chat Exporter.\n") + logging.info("You are using the latest version of WhatsApp Chat Exporter.") return 0 @@ -253,7 +252,7 @@ def import_from_json(json_file: str, data: ChatCollection): data.add_chat(jid, chat) pbar.update(1) total_time = pbar.format_dict['elapsed'] - logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}") + logging.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}") class IncrementalMerger: @@ -283,10 +282,10 @@ class IncrementalMerger: """ json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')] if not json_files: - logger.error("No JSON files found in the source directory.") + logging.error("No JSON files found in the source directory.") raise SystemExit(1) - logger.debug("JSON files found:", json_files) + logging.debug("JSON files found:", json_files) return json_files def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None: @@ -298,7 +297,7 @@ class IncrementalMerger: target_dir: Target directory path. json_file: Name of the JSON file. 
""" - logger.info(f"Copying '{json_file}' to target directory...") + logging.info(f"Copying '{json_file}' to target directory...") os.makedirs(target_dir, exist_ok=True) shutil.copy2(source_path, target_path) @@ -388,7 +387,7 @@ class IncrementalMerger: target_path: Path to target file. json_file: Name of the JSON file. """ - logger.info(f"Merging '{json_file}' with existing file in target directory...", extra={"clear": True}) + logging.info(f"Merging '{json_file}' with existing file in target directory...", extra={"clear": True}) source_data = self._load_chat_data(source_path) target_data = self._load_chat_data(target_path) @@ -400,10 +399,10 @@ class IncrementalMerger: merged_data = self._serialize_chats(merged_chats) if self._has_changes(merged_data, target_data): - logger.info(f"Changes detected in '{json_file}', updating target file...") + logging.info(f"Changes detected in '{json_file}', updating target file...") self._save_merged_data(target_path, merged_data) else: - logger.info(f"No changes detected in '{json_file}', skipping update.") + logging.info(f"No changes detected in '{json_file}', skipping update.") def _should_copy_media_file(self, source_file: str, target_file: str) -> bool: """Check if media file should be copied. @@ -428,7 +427,7 @@ class IncrementalMerger: source_media_path = os.path.join(source_dir, media_dir) target_media_path = os.path.join(target_dir, media_dir) - logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") + logging.info(f"Merging media directories. 
Source: {source_media_path}, target: {target_media_path}") if not os.path.exists(source_media_path): return @@ -443,7 +442,7 @@ class IncrementalMerger: target_file = os.path.join(target_root, file) if self._should_copy_media_file(source_file, target_file): - logger.debug(f"Copying '{source_file}' to '{target_file}'...") + logging.debug(f"Copying '{source_file}' to '{target_file}'...") shutil.copy2(source_file, target_file) def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None: @@ -456,7 +455,7 @@ class IncrementalMerger: """ json_files = self._get_json_files(source_dir) - logger.info("Starting incremental merge process...") + logging.info("Starting incremental merge process...") for json_file in json_files: source_path = os.path.join(source_dir, json_file) target_path = os.path.join(target_dir, json_file) @@ -893,7 +892,7 @@ def get_chat_type(chat_id: str) -> str: return "status_broadcast" elif chat_id.endswith("@broadcast"): return "broadcast_channel" - logger.warning(f"Unknown chat type for {chat_id}, defaulting to private_group") + logging.warning(f"Unknown chat type for {chat_id}, defaulting to private_group") return "private_group" diff --git a/Whatsapp_Chat_Exporter/vcards_contacts.py b/Whatsapp_Chat_Exporter/vcards_contacts.py index 707f555..c9eb0e1 100644 --- a/Whatsapp_Chat_Exporter/vcards_contacts.py +++ b/Whatsapp_Chat_Exporter/vcards_contacts.py @@ -6,7 +6,6 @@ from Whatsapp_Chat_Exporter.data_model import ChatStore from Whatsapp_Chat_Exporter.utility import Device -logger = logging.getLogger(__name__) class ExportedContactNumbers(TypedDict): @@ -45,7 +44,7 @@ def decode_quoted_printable(value: str, charset: str) -> str: return bytes_val.decode(charset, errors="replace") except Exception: # Fallback: return the original value if decoding fails - logger.warning( + logging.warning( f"Failed to decode quoted-printable value: {value}, " f"charset: {charset}. Please report this issue." 
) @@ -176,7 +175,7 @@ def read_vcards_file(vcf_file_path, default_country_code: str): if contact := process_vcard_entry(vcard): contacts.append(contact) - logger.info(f"Imported {len(contacts)} contacts/vcards") + logging.info(f"Imported {len(contacts)} contacts/vcards") return map_number_to_name(contacts, default_country_code) From eab98ba0d65b4d1262e48fc4a506b2f0f76eba5b Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 17:20:07 +0800 Subject: [PATCH 48/52] Fix crash on pre-release versions and enable update checks for pre-releases --- Whatsapp_Chat_Exporter/__main__.py | 8 ++++++-- Whatsapp_Chat_Exporter/utility.py | 25 +++++++++++++------------ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 55aaf27..4aeb9da 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -274,6 +274,10 @@ def setup_argument_parser() -> ArgumentParser: "--check-update", dest="check_update", default=False, action='store_true', help="Check for updates (require Internet access)" ) + misc_group.add_argument( + "--check-update-pre", dest="check_update_pre", default=False, action='store_true', + help="Check for updates including pre-releases (require Internet access)" + ) misc_group.add_argument( "--assume-first-as-me", dest="assume_first_as_me", default=False, action='store_true', help="Assume the first message in a chat as sent by me (must be used together with -e)" @@ -775,8 +779,8 @@ def main(): setup_logging(logging.INFO) # Check for updates - if args.check_update: - exit(check_update()) + if args.check_update or args.check_update_pre: + exit(check_update(args.check_update_pre)) # Validate arguments validate_args(parser, args) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index f5aaba7..6ccbee6 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ 
b/Whatsapp_Chat_Exporter/utility.py @@ -157,11 +157,12 @@ def determine_day(last: int, current: int) -> Optional[datetime.date]: return current -def check_update(): +def check_update(include_beta: bool = False) -> int: import urllib.request import json import importlib from sys import platform + from packaging import version PACKAGE_JSON = "https://pypi.org/pypi/whatsapp-chat-exporter/json" try: @@ -172,21 +173,21 @@ def check_update(): else: with raw: package_info = json.load(raw) - latest_version = tuple( - map(int, package_info["info"]["version"].split("."))) - __version__ = importlib.metadata.version("whatsapp_chat_exporter") - current_version = tuple(map(int, __version__.split("."))) + if include_beta: + all_versions = [version.parse(v) for v in package_info["releases"].keys()] + latest_version = max(all_versions, key=lambda v: (v.release, v.pre)) + else: + latest_version = version.parse(package_info["info"]["version"]) + current_version = version.parse(importlib.metadata.version("whatsapp_chat_exporter")) if current_version < latest_version: logging.info( "===============Update===============\n" - "A newer version of WhatsApp Chat Exporter is available." 
- f"Current version: {__version__}" - f"Latest version: {package_info['info']['version']}" + "A newer version of WhatsApp Chat Exporter is available.\n" + f"Current version: {current_version}\n" + f"Latest version: {latest_version}" ) - if platform == "win32": - logging.info("Update with: pip install --upgrade whatsapp-chat-exporter") - else: - logging.info("Update with: pip3 install --upgrade whatsapp-chat-exporter") + pip_cmd = "pip" if platform == "win32" else "pip3" + logging.info(f"Update with: {pip_cmd} install --upgrade whatsapp-chat-exporter {'--pre' if include_beta else ''}") logging.info("====================================") else: logging.info("You are using the latest version of WhatsApp Chat Exporter.") From 28ba97d72f9231d7d352cab88b898c35df21960e Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 17:38:22 +0800 Subject: [PATCH 49/52] Fix CI on Windows --- .github/workflows/ci.yml | 6 ++++++ Whatsapp_Chat_Exporter/__main__.py | 3 +++ 2 files changed, 9 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d8a4cd9..b728d58 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,14 +18,20 @@ jobs: include: - os: windows-latest python-version: "3.13" + env: + PYTHONUTF8: "1" - os: macos-latest python-version: "3.13" - os: windows-11-arm python-version: "3.13" + env: + PYTHONUTF8: "1" - os: macos-15-intel python-version: "3.13" - os: windows-latest python-version: "3.14" + env: + PYTHONUTF8: "1" steps: - name: Checkout code diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index 4aeb9da..b9d1616 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -767,6 +767,9 @@ def main(): # Print banner if not suppressed if not args.no_banner: + # Note: This may raise UnicodeEncodeError on Windows if the terminal + # doesn't support UTF-8 (e.g., Legacy CMD). 
Use a modern terminal + # or set PYTHONUTF8=1 in your environment. print(WTSEXPORTER_BANNER) if args.debug: From 1560c4964472815f2b602cc8925bf94e659dc33d Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 17:42:02 +0800 Subject: [PATCH 50/52] Update ci.yml --- .github/workflows/ci.yml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b728d58..6b22856 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,20 +18,17 @@ jobs: include: - os: windows-latest python-version: "3.13" - env: - PYTHONUTF8: "1" + python_utf8: "1" - os: macos-latest python-version: "3.13" - os: windows-11-arm python-version: "3.13" - env: - PYTHONUTF8: "1" + python_utf8: "1" - os: macos-15-intel python-version: "3.13" - os: windows-latest python-version: "3.14" - env: - PYTHONUTF8: "1" + python_utf8: "1" steps: - name: Checkout code @@ -43,6 +40,8 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies + env: + PYTHONUTF8: ${{ matrix.python_utf8 || '0' }} run: | python -m pip install --upgrade pip pip install .[all] pytest nuitka From 322b12a5a4d96fe8c26f1db00de5c1d36ebed8bc Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 18:02:30 +0800 Subject: [PATCH 51/52] Fix a crash in message counting if chat filter is in use --- Whatsapp_Chat_Exporter/android_handler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index 59cb2b6..827e8bd 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -142,6 +142,8 @@ def _get_message_count(cursor, filter_empty, filter_date, filter_chat, jid_map_e FROM message LEFT JOIN chat ON chat._id = message.chat_row_id + INNER JOIN jid + ON jid._id = chat.jid_row_id INNER JOIN jid jid_global 
ON jid_global._id = chat.jid_row_id LEFT JOIN jid jid_group From 4e877987fbbc5d3cbab3545b129afc379f1d608a Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 18:08:43 +0800 Subject: [PATCH 52/52] Bump version & update readme --- README.md | 79 ++++++++++++++++++++++++++++---------------------- pyproject.toml | 2 +- 2 files changed, 46 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 04d0427..47ab7d2 100644 --- a/README.md +++ b/README.md @@ -145,20 +145,22 @@ After extracting, you will get this: Invoke the wtsexporter with --help option will show you all options available. ```sh > wtsexporter --help -usage: wtsexporter [-h] [--debug] [-a] [-i] [-e EXPORTED] [-w WA] [-m MEDIA] [-b BACKUP] [-d DB] [-k [KEY]] - [--call-db [CALL_DB_IOS]] [--wab WAB] [-o OUTPUT] [-j [JSON]] [--txt [TEXT_FORMAT]] [--no-html] - [--size [SIZE]] [--no-reply] [--avoid-encoding-json] [--pretty-print-json [PRETTY_PRINT_JSON]] - [--tg] [--per-chat] [--import] [-t TEMPLATE] [--offline OFFLINE] [--no-avatar] [--old-theme] - [--headline HEADLINE] [-c] [--create-separated-media] [--time-offset {-12 to 14}] [--date DATE] +usage: wtsexporter [-h] [--debug] [-a] [-i] [-e EXPORTED] [-w WA] [-m MEDIA] [-b BACKUP] [-d DB] + [-k [KEY]] [--call-db [CALL_DB_IOS]] [--wab WAB] [-o OUTPUT] [-j [JSON]] + [--txt [TEXT_FORMAT]] [--no-html] [--size [SIZE]] [--no-reply] [--avoid-encoding-json] + [--pretty-print-json [PRETTY_PRINT_JSON]] [--tg] [--per-chat] [--import] [-t TEMPLATE] + [--offline OFFLINE] [--no-avatar] [--old-theme] [--headline HEADLINE] [-c] + [--create-separated-media] [--time-offset {-12 to 14}] [--date DATE] [--date-format FORMAT] [--include [phone number ...]] [--exclude [phone number ...]] [--dont-filter-empty] [--enrich-from-vcards ENRICH_FROM_VCARDS] - [--default-country-code DEFAULT_COUNTRY_CODE] [--incremental-merge] [--source-dir SOURCE_DIR] - [--target-dir TARGET_DIR] [-s] [--check-update] [--assume-first-as-me] 
[--business] - [--decrypt-chunk-size DECRYPT_CHUNK_SIZE] [--max-bruteforce-worker MAX_BRUTEFORCE_WORKER] - [--no-banner] + [--default-country-code DEFAULT_COUNTRY_CODE] [--incremental-merge] + [--source-dir SOURCE_DIR] [--target-dir TARGET_DIR] [-s] [--check-update] + [--check-update-pre] [--assume-first-as-me] [--business] + [--decrypt-chunk-size DECRYPT_CHUNK_SIZE] + [--max-bruteforce-worker MAX_BRUTEFORCE_WORKER] [--no-banner] [--fix-dot-files] -A customizable Android and iOS/iPadOS WhatsApp database parser that will give you the history of your WhatsApp -conversations in HTML and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported. +A customizable Android and iOS/iPadOS WhatsApp database parser that will give you the history of your +WhatsApp conversations in HTML and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported. options: -h, --help show this help message and exit @@ -174,9 +176,10 @@ Input Files: -w, --wa WA Path to contact database (default: wa.db/ContactsV2.sqlite) -m, --media MEDIA Path to WhatsApp media folder (default: WhatsApp) -b, --backup BACKUP Path to Android (must be used together with -k)/iOS WhatsApp backup - -d, --db DB Path to database file (default: msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d) - -k, --key [KEY] Path to key file. If this option is set for crypt15 backup but nothing is specified, you will - be prompted to enter the key. + -d, --db DB Path to database file (default: + msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d) + -k, --key [KEY] Path to key file. If this option is set for crypt15 backup but nothing is + specified, you will be prompted to enter the key. 
--call-db [CALL_DB_IOS] Path to call database (default: 1b432994e958845fffe8e2f190f26d1511534088) iOS only --wab, --wa-backup WAB @@ -185,8 +188,8 @@ Input Files: Output Options: -o, --output OUTPUT Output to specific directory (default: result) -j, --json [JSON] Save the result to a single JSON file (default if present: result.json) - --txt [TEXT_FORMAT] Export chats in text format similar to what WhatsApp officially provided (default if present: - result/) + --txt [TEXT_FORMAT] Export chats in text format similar to what WhatsApp officially provided (default + if present: result/) --no-html Do not output html files --size, --output-size, --split [SIZE] Maximum (rough) size of a single output file in bytes, 0 for auto @@ -197,7 +200,8 @@ JSON Options: Don't encode non-ascii characters in the output JSON files --pretty-print-json [PRETTY_PRINT_JSON] Pretty print the output JSON. - --tg, --telegram Output the JSON in a format compatible with Telegram export (implies json-per-chat) + --tg, --telegram Output the JSON in a format compatible with Telegram export (implies json-per- + chat) --per-chat Output the JSON file per chat --import Import JSON file and convert to HTML output @@ -207,7 +211,8 @@ HTML Options: --offline OFFLINE Relative path to offline static files --no-avatar Do not render avatar in HTML output --old-theme Use the old Telegram-alike theme - --headline HEADLINE The custom headline for the HTML output. Use '??' as a placeholder for the chat name + --headline HEADLINE The custom headline for the HTML output. Use '??' as a placeholder for the chat + name Media Handling: -c, --move-media Move the media directory to output directory if the flag is set, otherwise copy it @@ -223,24 +228,26 @@ Filtering Options: Include chats that match the supplied phone number --exclude [phone number ...] Exclude chats that match the supplied phone number - --dont-filter-empty By default, the exporter will not render chats with no valid message. 
Setting this flag will - cause the exporter to render those. This is useful if chat(s) are missing from the output + --dont-filter-empty By default, the exporter will not render chats with no valid message. Setting this + flag will cause the exporter to render those. This is useful if chat(s) are + missing from the output Contact Enrichment: --enrich-from-vcards ENRICH_FROM_VCARDS - Path to an exported vcf file from Google contacts export. Add names missing from WhatsApp's - default database + Path to an exported vcf file from Google contacts export. Add names missing from + WhatsApp's default database --default-country-code DEFAULT_COUNTRY_CODE - Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this - will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country + Use with --enrich-from-vcards. When numbers in the vcf file does not have a + country code, this will be used. 1 is for US, 66 for Thailand etc. Most likely use + the number of your own country Incremental Merging: - --incremental-merge Performs an incremental merge of two exports. Requires setting both --source-dir and --target- - dir. The chats (JSON files only) and media from the source directory will be merged into the - target directory. No chat messages or media will be deleted from the target directory; only - new chat messages and media will be added to it. This enables chat messages and media to be - deleted from the device to free up space, while ensuring they are preserved in the exported - backups. + --incremental-merge Performs an incremental merge of two exports. Requires setting both --source-dir + and --target-dir. The chats (JSON files only) and media from the source directory + will be merged into the target directory. No chat messages or media will be + deleted from the target directory; only new chat messages and media will be added + to it. 
This enables chat messages and media to be deleted from the device to free + up space, while ensuring they are preserved in the exported backups. --source-dir SOURCE_DIR Sets the source directory. Used for performing incremental merges. --target-dir TARGET_DIR @@ -249,16 +256,20 @@ Incremental Merging: Miscellaneous: -s, --showkey Show the HEX key used to decrypt the database --check-update Check for updates (require Internet access) + --check-update-pre Check for updates including pre-releases (require Internet access) --assume-first-as-me Assume the first message in a chat as sent by me (must be used together with -e) --business Use Whatsapp Business default files (iOS only) --decrypt-chunk-size DECRYPT_CHUNK_SIZE - Specify the chunk size for decrypting iOS backup, which may affect the decryption speed. + Specify the chunk size for decrypting iOS backup, which may affect the decryption + speed. --max-bruteforce-worker MAX_BRUTEFORCE_WORKER Specify the maximum number of worker for bruteforce decryption. --no-banner Do not show the banner + --fix-dot-files Fix files with a dot at the end of their name (allowing the outputs be stored in + FAT filesystems) -WhatsApp Chat Exporter: 0.13.0rc2 Licensed with MIT. See https://wts.knugi.dev/docs?dest=osl for all open source -licenses. +WhatsApp Chat Exporter: 0.13.0 Licensed with MIT. See https://wts.knugi.dev/docs?dest=osl for all open +source licenses. ``` # Verifying Build Integrity @@ -266,7 +277,7 @@ licenses. To ensure that the binaries provided in the releases were built directly from this source code via GitHub Actions and have not been tampered with, GitHub Artifact Attestations is used. You can verify the authenticity of any pre-built binaries using the GitHub CLI. > [!NOTE] -> Requires version 0.13.0rc1 or newer. Legacy binaries are unsupported. +> Requires version 0.13.0 or newer. Legacy binaries are unsupported. 
### Using Bash (Linux/WSL/macOS) diff --git a/pyproject.toml b/pyproject.toml index f467f1a..078791f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "whatsapp-chat-exporter" -version = "0.13.0rc2" +version = "0.13.0" description = "A Whatsapp database parser that provides history of your Whatsapp conversations in HTML and JSON. Android, iOS, iPadOS, Crypt12, Crypt14, Crypt15 supported." readme = "README.md" authors = [