diff --git a/README.md b/README.md index bace80a..1fa93a3 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ Do an iPhone/iPad Backup with iTunes/Finder first. > [!NOTE] > If you are working on unencrypted iOS/iPadOS backup, skip this. -If you want to work on an encrypted iOS/iPadOS Backup, you should install iphone_backup_decrypt from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py. +If you want to work on an encrypted iOS/iPadOS Backup, you should install `iphone_backup_decrypt` from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py. ```sh pip install git+https://github.com/KnugiHK/iphone_backup_decrypt ``` diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index b9d1616..b0b14db 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -578,6 +578,12 @@ def process_messages(args, data: ChatCollection) -> None: filter_chat, args.filter_empty ) + # Process polls + message_handler.polls( + db, data, args.filter_date, + filter_chat, args.filter_empty + ) + # Process calls process_calls(args, db, data, filter_chat, timing) @@ -747,7 +753,7 @@ def setup_logging(level): if level == logging.DEBUG: timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") - log_handler_file = logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w") + log_handler_file = logging.FileHandler(f"wtsexporter-debug-{timestamp}.log", mode="w") log_handler_file.terminator = "" log_handler_file.addFilter(ClearLineFilter()) handlers.append(log_handler_file) diff --git a/Whatsapp_Chat_Exporter/android_crypt.py b/Whatsapp_Chat_Exporter/android_crypt.py index c7d25f0..c705617 100644 --- a/Whatsapp_Chat_Exporter/android_crypt.py +++ b/Whatsapp_Chat_Exporter/android_crypt.py @@ -198,7 +198,7 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) -> f"The offsets of your IV and database are {start_iv} and {start_db}, respectively." ) logging.info( - f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:" + f"To include your offsets in the exporter, please report it in the discussion thread on GitHub:" ) logging.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47") return result diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py index 7087e43..f5c23d3 100644 --- a/Whatsapp_Chat_Exporter/android_handler.py +++ b/Whatsapp_Chat_Exporter/android_handler.py @@ -1134,6 +1134,10 @@ def _construct_call_description(content, call): return description +def polls(db, data, date_filter, chat_filter, empty_filter): + """Placeholder for future polls processing implementation.""" + return + # TODO: Marked for enhancement on multi-threaded processing def create_html( data, diff --git a/Whatsapp_Chat_Exporter/data_model.py b/Whatsapp_Chat_Exporter/data_model.py index 52f9bae..aa63c9c 100644 --- a/Whatsapp_Chat_Exporter/data_model.py +++ b/Whatsapp_Chat_Exporter/data_model.py @@ -67,6 +67,7 @@ class ChatCollection(MutableMapping): """Initialize an empty chat collection.""" self._chats: Dict[str, ChatStore] = {} self._system: Dict[str, Any] = {} + self.set_system("master_lookup", {}) def __getitem__(self, key: str) -> 'ChatStore': """Get a chat by its ID. Required for dict-like access.""" @@ -100,21 +101,32 @@ class ChatCollection(MutableMapping): Returns: Optional['ChatStore']: The chat if found, None otherwise """ - return self._chats.get(chat_id) + if chat_id in self._chats: + return self._chats[chat_id] + elif chat_id in self.get_system("master_lookup"): + return self._chats[self.get_system("master_lookup")[chat_id]] + else: + return None - def add_chat(self, chat_id: str, chat: 'ChatStore') -> None: + def add_chat(self, chat_id: str, chat: 'ChatStore', alias: Optional[str] = None) -> 'ChatStore': """ Add a new chat to the collection. Args: chat_id (str): The ID for the chat chat (ChatStore): The chat to add + alias (Optional[str]): An optional alias to associate with the chat ID Raises: TypeError: If chat is not a ChatStore object """ if not isinstance(chat, ChatStore): raise TypeError("Chat must be a ChatStore object") + if chat_id in self._chats: + raise ValueError("Chat ID already exists. Use get_chat to retrieve existing chat.") + if alias: + self.get_system("master_lookup")[alias] = chat_id + chat.aliases.append(alias) self._chats[chat_id] = chat return self._chats[chat_id] @@ -128,6 +140,34 @@ class ChatCollection(MutableMapping): if chat_id in self._chats: del self._chats[chat_id] + def add_alias(self, alias: str, chat_id: str) -> bool: + """ + Add or modify an alias for a chat. + + Args: + alias (str): The alias to add + chat_id (str): The ID of the chat to associate the alias with + """ + if chat_id not in self._chats: + raise ValueError("Chat ID does not exist. Add chat first.") + self.get_system("master_lookup")[alias] = chat_id + return True + + def remove_alias(self, alias: str) -> bool: + """ + Remove an alias. + + Args: + alias (str): The alias to remove + """ + + if alias in self.get_system("master_lookup"): + self._chats[self.get_system("master_lookup")[alias]].aliases.remove(alias) + del self.get_system("master_lookup")[alias] + return True + return False + + def items(self): """Get chat items (id, chat) pairs.""" return self._chats.items() @@ -208,6 +248,7 @@ class ChatStore: self.their_avatar_thumb = None self.status = None self.media_base = "" + self.aliases = [] def __len__(self) -> int: """Get number of chats. Required for dict-like access.""" @@ -361,6 +402,7 @@ class Message: self.thumb = None # Android specific self.sticker = False self.reactions = {} + self.poll = None def to_json(self) -> Dict[str, Any]: """Convert message to JSON-serializable dict.""" diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py index ed3e056..2ee1091 100644 --- a/Whatsapp_Chat_Exporter/ios_handler.py +++ b/Whatsapp_Chat_Exporter/ios_handler.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 +import json import os import logging import shutil @@ -11,27 +12,47 @@ from markupsafe import escape as htmle from Whatsapp_Chat_Exporter.data_model import ChatStore, Message from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name +from Whatsapp_Chat_Exporter.poll import decode_poll_from_receipt_blob def contacts(db, data): - """Process WhatsApp contacts with status information.""" + """Process WhatsApp contacts with name and status information.""" c = db.cursor() - c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") + c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT""") total_row_number = c.fetchone()[0] logging.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True}) - c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") + # Check if expected columns exist before querying, + # to handle different WhatsApp versions (mainly ZLID). + c.execute("PRAGMA table_info(ZWAADDRESSBOOKCONTACT)") + column_names = [info[1] for info in c.fetchall()] + all_cols = ["ZWHATSAPPID", "ZLID", "ZFULLNAME", "ZABOUTTEXT"] + columns = [col for col in all_cols if col in column_names] + + c.execute(f"""SELECT {', '.join(columns)} FROM ZWAADDRESSBOOKCONTACT""") with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar: while (content := c.fetchone()) is not None: zwhatsapp_id = content["ZWHATSAPPID"] + if zwhatsapp_id is None: + pbar.update(1) + continue if not zwhatsapp_id.endswith("@s.whatsapp.net"): zwhatsapp_id += "@s.whatsapp.net" current_chat = ChatStore(Device.IOS) - current_chat.status = content["ZABOUTTEXT"] - data.add_chat(zwhatsapp_id, current_chat) + if content["ZFULLNAME"]: + current_chat.name = content["ZFULLNAME"] + if content["ZABOUTTEXT"]: + current_chat.status = content["ZABOUTTEXT"] + # Index by WhatsApp ID, with LID as alias if available + data.add_chat( + zwhatsapp_id, + current_chat, + content["ZLID"] if "ZLID" in columns and content["ZLID"] else None + ) + pbar.update(1) total_time = pbar.format_dict['elapsed'] logging.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}") @@ -124,7 +145,12 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder)) else: current_chat = data.get_chat(contact_id) - current_chat.name = contact_name + # Only overwrite name if we have a better one (not a phone number) + # or if there's no existing name + if current_chat.name is None or contact_name is not None: + is_phone = contact_name.replace("+", "").replace(" ", "").isdigit() if contact_name else True + if not is_phone or current_chat.name is None: + current_chat.name = contact_name current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg") # Process avatar images @@ -133,6 +159,17 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, total_time = pbar.format_dict['elapsed'] logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}") + # Pre-load push names for JIDs not yet in data (especially @lid group members) + c.execute("""SELECT ZJID, ZPUSHNAME FROM ZWAPROFILEPUSHNAME WHERE ZPUSHNAME IS NOT NULL""") + while (row := c.fetchone()) is not None: + jid = row["ZJID"] + if jid not in data: + push_chat = ChatStore(Device.IOS) + push_chat.name = row["ZPUSHNAME"] + data.add_chat(jid, push_chat) + elif data.get_chat(jid).name is None: + data.get_chat(jid).name = row["ZPUSHNAME"] + # Get message count message_count_query = f""" SELECT count() @@ -248,7 +285,7 @@ def process_message_data(message, content, is_group_message, data, message_map, # Handle metadata messages if content["ZMESSAGETYPE"] == 6: - return process_metadata_message(message, content, is_group_message) + return process_metadata_message(message, content, is_group_message, data) # Handle quoted replies if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and not no_reply: @@ -256,6 +293,15 @@ def process_message_data(message, content, is_group_message, data, message_map, message.reply = quoted.decode() message.quoted_data = message_map.get(message.reply) + # Skip poll vote update messages (type 66) + if content["ZMESSAGETYPE"] == 66: + return True # Invalid, skip + + # Handle poll messages (type 46) - will be enriched by polls() later + if content["ZMESSAGETYPE"] == 46: + message.data = "\U0001f4ca Poll" + return False # Valid, populated later by polls() + # Handle stickers if content["ZMESSAGETYPE"] == 15: message.sticker = True @@ -266,21 +312,51 @@ def process_message_data(message, content, is_group_message, data, message_map, return False # Message is valid -def process_metadata_message(message, content, is_group_message): +def _parse_group_action(ztext, data): + if ztext.endswith("@lid") or ztext.endswith("@s.whatsapp.net"): + # This is likely a group member change action + # Not really sure actually + name = None + if ztext in data: + name = data.get_chat(ztext).name + if "@" in ztext: + fallback = ztext.split('@')[0] + else: + fallback = None + entity = name or fallback + + return f"{entity} join the group" + + elif ztext.startswith("{") and ztext.endswith("}"): + try: + metadata = json.loads(ztext) + except json.JSONDecodeError: + return ztext # Not a JSON string, return as-is + entity = metadata.get('author', 'Someone') + if entity is not "Someone": + name = None + if entity in data: + name = data.get_chat(entity).name + if "@" in entity: + fallback = entity.split('@')[0] + else: + fallback = None + entity = name or fallback + return f"{entity} changed the group name to {metadata.get('subject', 'Unknown')}." + elif ztext == "admin_add": + return f"The administrator has restricted participant additions to admins only." + else: + return "Unsupported WhatsApp internal message." + + +def process_metadata_message(message, content, is_group_message, data): """Process metadata messages (action_type 6).""" if is_group_message: # Group if content["ZTEXT"] is not None: - # Changed name - try: - int(content["ZTEXT"]) - except ValueError: - msg = f"The group name changed to {content['ZTEXT']}" - message.data = msg - message.meta = True - return False # Valid message - else: - return True # Invalid message + message.data = _parse_group_action(content["ZTEXT"], data) + message.meta = True + return False else: message.data = None return False @@ -596,6 +672,187 @@ def process_call_record(content, chat, data, timezone_offset): chat.add_message(call.key_id, call) +def _resolve_voter_name(voter_jid, is_creator, message, data): + """Resolve a voter JID to a display name. + + Args: + voter_jid (str or None): The voter's JID (often LID format like '123@lid'). + is_creator (bool): Whether this voter is the poll creator. + message (Message): The poll message object. + data (ChatCollection): The chat data collection for name lookups. + + Returns: + str: The resolved display name. + """ + if voter_jid is None: + if is_creator: + # Field 6 in the protobuf is always the device owner's vote, + # not the poll message sender's vote + return "You" + return "Unknown" + + # Try direct lookup in data + if voter_jid in data: + chat = data.get_chat(voter_jid) + if chat is not None and chat.name: + return chat.name + + # Try with @s.whatsapp.net suffix + if "@" not in voter_jid: + jid_with_suffix = f"{voter_jid}@s.whatsapp.net" + if jid_with_suffix in data: + chat = data.get_chat(jid_with_suffix) + if chat is not None and chat.name: + return chat.name + + # Fallback: strip domain part + if "@" in voter_jid: + return voter_jid.split("@")[0] + return voter_jid + + +def polls(db, data, filter_date, filter_chat, filter_empty): + """Process WhatsApp poll messages (type 46) from the database. + + Queries ZWAMESSAGEINFO.ZRECEIPTINFO for poll messages, decodes the + protobuf blobs, and enriches the corresponding Message objects with + structured poll data. + + Args: + db: SQLite database connection. + data (ChatCollection): The chat data collection. + filter_date: Date filter SQL fragment or None. + filter_chat: Tuple of (include_filter, exclude_filter). + filter_empty: Whether to filter empty chats. + """ + c = db.cursor() + + # Build filter conditions + chat_filter_include = get_chat_condition( + filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + chat_filter_exclude = get_chat_condition( + filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios") + date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else '' + + # Count poll messages + count_query = f""" + SELECT count() + FROM ZWAMESSAGE + JOIN ZWAMESSAGEINFO ON ZWAMESSAGEINFO.ZMESSAGE = ZWAMESSAGE.Z_PK + INNER JOIN ZWACHATSESSION + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE ZWAMESSAGE.ZMESSAGETYPE = 46 + AND ZWAMESSAGEINFO.ZRECEIPTINFO IS NOT NULL + {date_filter} + {chat_filter_include} + {chat_filter_exclude} + """ + c.execute(count_query) + total_row_number = c.fetchone()[0] + + if total_row_number == 0: + return + + logging.info(f"Processing polls...(0/{total_row_number})", extra={"clear": True}) + + # Fetch poll data + poll_query = f""" + SELECT ZWACHATSESSION.ZCONTACTJID, + ZWAMESSAGE.Z_PK AS ZMESSAGE, + ZWAMESSAGEINFO.ZRECEIPTINFO + FROM ZWAMESSAGE + JOIN ZWAMESSAGEINFO ON ZWAMESSAGEINFO.ZMESSAGE = ZWAMESSAGE.Z_PK + INNER JOIN ZWACHATSESSION + ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK + LEFT JOIN ZWAGROUPMEMBER + ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK + WHERE ZWAMESSAGE.ZMESSAGETYPE = 46 + AND ZWAMESSAGEINFO.ZRECEIPTINFO IS NOT NULL + {date_filter} + {chat_filter_include} + {chat_filter_exclude} + ORDER BY ZWAMESSAGE.ZMESSAGEDATE ASC + """ + c.execute(poll_query) + + with tqdm(total=total_row_number, desc="Processing polls", unit="poll", leave=False) as pbar: + while (content := c.fetchone()) is not None: + contact_id = content["ZCONTACTJID"] + message_pk = content["ZMESSAGE"] + receipt_blob = content["ZRECEIPTINFO"] + + current_chat = data.get_chat(contact_id) + if current_chat is None: + pbar.update(1) + continue + + message = current_chat.get_message(message_pk) + if message is None: + pbar.update(1) + continue + + try: + poll_data = decode_poll_from_receipt_blob(receipt_blob) + except Exception as e: + logging.warning(f"Failed to decode poll {message_pk}: {e}") + pbar.update(1) + continue + + if poll_data is None: + pbar.update(1) + continue + + # Build structured poll result with vote tallies + options = poll_data['options'] + votes = poll_data['votes'] + + # Tally votes per option + option_votes = {i: [] for i in range(len(options))} + seen_voters = set() + for vote in votes: + voter_name = _resolve_voter_name( + vote.get('voter_jid'), vote.get('is_creator', False), message, data) + voter_key = vote.get('voter_jid') or ("__creator__" if vote.get('is_creator') else "__unknown__") + if voter_key not in seen_voters: + seen_voters.add(voter_key) + for idx in vote.get('selected_indices', []): + if 0 <= idx < len(options): + option_votes[idx].append(voter_name) + + # Find max vote count for percentage calculation + max_votes = max((len(v) for v in option_votes.values()), default=0) + + # Build option list with tallies + option_list = [] + for i, opt_text in enumerate(options): + voters = option_votes.get(i, []) + vote_count = len(voters) + vote_pct = (vote_count / max_votes * 100) if max_votes > 0 else 0 + option_list.append({ + 'text': opt_text, + 'vote_count': vote_count, + 'vote_pct': vote_pct, + 'voters': voters, + }) + + total_voters = len(seen_voters) + + # Set poll data on message + message.poll = { + 'type': 'poll', + 'question': poll_data['question'], + 'options': option_list, + 'total_voters': total_voters, + } + message.data = f"\U0001f4ca {poll_data['question']}" + + pbar.update(1) + total_time = pbar.format_dict['elapsed'] + logging.info(f"Processed {total_row_number} polls in {convert_time_unit(total_time)}") + + def format_call_data(call, content): """Format call data message based on call attributes.""" # Basic call info diff --git a/Whatsapp_Chat_Exporter/poll.py b/Whatsapp_Chat_Exporter/poll.py new file mode 100644 index 0000000..1be32cc --- /dev/null +++ b/Whatsapp_Chat_Exporter/poll.py @@ -0,0 +1,190 @@ +""" +WhatsApp Poll decoder for iOS/macOS. + +Decodes poll messages (ZMESSAGETYPE = 46) stored as protobuf blobs +in ZWAMESSAGEINFO.ZRECEIPTINFO. Uses raw varint/wire-type parsing +with no external protobuf library dependency. +""" + +import struct +import logging + + +def _decode_varint(data, pos): + """Decode a protobuf varint starting at pos. + + Args: + data (bytes): The protobuf data. + pos (int): Starting position. + + Returns: + tuple: (value, new_pos) + + Raises: + ValueError: If the varint is truncated. + """ + result = 0 + shift = 0 + while pos < len(data): + b = data[pos] + pos += 1 + result |= (b & 0x7F) << shift + if not (b & 0x80): + return result, pos + shift += 7 + raise ValueError("Truncated varint") + + +def decode_protobuf_fields(data): + """ + Decode raw protobuf bytes into list of (field_number, wire_type_name, value). + Handles: varint (0), fixed64 (1), length-delimited/bytes (2), fixed32 (5). + + Args: + data (bytes): Raw protobuf data. + + Returns: + list: List of (field_number, wire_type_name, value) tuples. + """ + fields = [] + pos = 0 + while pos < len(data): + try: + tag, pos = _decode_varint(data, pos) + field_num = tag >> 3 + wire_type = tag & 0x7 + + if wire_type == 0: # varint + val, pos = _decode_varint(data, pos) + fields.append((field_num, 'varint', val)) + elif wire_type == 2: # length-delimited + length, pos = _decode_varint(data, pos) + val = data[pos:pos + length] + pos += length + fields.append((field_num, 'bytes', val)) + elif wire_type == 5: # fixed32 + val = struct.unpack(' {% endif %}
📊 {{ msg.poll.question }}
+ {% for option in msg.poll.options %} +{{ option.voters | join(', ') }}
+ {% endif %} +{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}
+- {% if msg.meta == true or msg.media == false and msg.data is none %} + {% if msg.poll %} +
📊 {{ msg.poll.question }}
+ {% for option in msg.poll.options %} +{{ option.voters | join(', ') }}
+ {% endif %} +{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}
+