mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-03-29 09:15:27 +00:00
Merge pull request #199 from watercrossing/fix/ios-polls-contacts-typo
iOS: Add poll support, fix contact name resolution, fix typos
This commit is contained in:
@@ -113,7 +113,7 @@ Do an iPhone/iPad Backup with iTunes/Finder first.
|
||||
> [!NOTE]
|
||||
> If you are working on unencrypted iOS/iPadOS backup, skip this.
|
||||
|
||||
If you want to work on an encrypted iOS/iPadOS Backup, you should install iphone_backup_decrypt from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py.
|
||||
If you want to work on an encrypted iOS/iPadOS Backup, you should install `iphone_backup_decrypt` from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py.
|
||||
```sh
|
||||
pip install git+https://github.com/KnugiHK/iphone_backup_decrypt
|
||||
```
|
||||
|
||||
@@ -578,6 +578,12 @@ def process_messages(args, data: ChatCollection) -> None:
|
||||
filter_chat, args.filter_empty
|
||||
)
|
||||
|
||||
# Process polls
|
||||
message_handler.polls(
|
||||
db, data, args.filter_date,
|
||||
filter_chat, args.filter_empty
|
||||
)
|
||||
|
||||
# Process calls
|
||||
process_calls(args, db, data, filter_chat, timing)
|
||||
|
||||
@@ -747,7 +753,7 @@ def setup_logging(level):
|
||||
|
||||
if level == logging.DEBUG:
|
||||
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
log_handler_file = logging.FileHandler(f"wtsexpoter-debug-{timestamp}.log", mode="w")
|
||||
log_handler_file = logging.FileHandler(f"wtsexporter-debug-{timestamp}.log", mode="w")
|
||||
log_handler_file.terminator = ""
|
||||
log_handler_file.addFilter(ClearLineFilter())
|
||||
handlers.append(log_handler_file)
|
||||
|
||||
@@ -198,7 +198,7 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
|
||||
f"The offsets of your IV and database are {start_iv} and {start_db}, respectively."
|
||||
)
|
||||
logging.info(
|
||||
f"To include your offsets in the expoter, please report it in the discussion thread on GitHub:"
|
||||
f"To include your offsets in the exporter, please report it in the discussion thread on GitHub:"
|
||||
)
|
||||
logging.info(f"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47")
|
||||
return result
|
||||
|
||||
@@ -1134,6 +1134,10 @@ def _construct_call_description(content, call):
|
||||
return description
|
||||
|
||||
|
||||
def polls(db, data, date_filter, chat_filter, empty_filter):
|
||||
"""Placeholder for future polls processing implementation."""
|
||||
return
|
||||
|
||||
# TODO: Marked for enhancement on multi-threaded processing
|
||||
def create_html(
|
||||
data,
|
||||
|
||||
@@ -67,6 +67,7 @@ class ChatCollection(MutableMapping):
|
||||
"""Initialize an empty chat collection."""
|
||||
self._chats: Dict[str, ChatStore] = {}
|
||||
self._system: Dict[str, Any] = {}
|
||||
self.set_system("master_lookup", {})
|
||||
|
||||
def __getitem__(self, key: str) -> 'ChatStore':
|
||||
"""Get a chat by its ID. Required for dict-like access."""
|
||||
@@ -100,21 +101,32 @@ class ChatCollection(MutableMapping):
|
||||
Returns:
|
||||
Optional['ChatStore']: The chat if found, None otherwise
|
||||
"""
|
||||
return self._chats.get(chat_id)
|
||||
if chat_id in self._chats:
|
||||
return self._chats[chat_id]
|
||||
elif chat_id in self.get_system("master_lookup"):
|
||||
return self._chats[self.get_system("master_lookup")[chat_id]]
|
||||
else:
|
||||
return None
|
||||
|
||||
def add_chat(self, chat_id: str, chat: 'ChatStore') -> None:
|
||||
def add_chat(self, chat_id: str, chat: 'ChatStore', alias: Optional[str] = None) -> 'ChatStore':
|
||||
"""
|
||||
Add a new chat to the collection.
|
||||
|
||||
Args:
|
||||
chat_id (str): The ID for the chat
|
||||
chat (ChatStore): The chat to add
|
||||
alias (Optional[str]): An optional alias to associate with the chat ID
|
||||
|
||||
Raises:
|
||||
TypeError: If chat is not a ChatStore object
|
||||
"""
|
||||
if not isinstance(chat, ChatStore):
|
||||
raise TypeError("Chat must be a ChatStore object")
|
||||
if chat_id in self._chats:
|
||||
raise ValueError("Chat ID already exists. Use get_chat to retrieve existing chat.")
|
||||
if alias:
|
||||
self.get_system("master_lookup")[alias] = chat_id
|
||||
chat.aliases.append(alias)
|
||||
self._chats[chat_id] = chat
|
||||
return self._chats[chat_id]
|
||||
|
||||
@@ -128,6 +140,34 @@ class ChatCollection(MutableMapping):
|
||||
if chat_id in self._chats:
|
||||
del self._chats[chat_id]
|
||||
|
||||
def add_alias(self, alias: str, chat_id: str) -> bool:
|
||||
"""
|
||||
Add or modify an alias for a chat.
|
||||
|
||||
Args:
|
||||
alias (str): The alias to add
|
||||
chat_id (str): The ID of the chat to associate the alias with
|
||||
"""
|
||||
if chat_id not in self._chats:
|
||||
raise ValueError("Chat ID does not exist. Add chat first.")
|
||||
self.get_system("master_lookup")[alias] = chat_id
|
||||
return True
|
||||
|
||||
def remove_alias(self, alias: str) -> bool:
|
||||
"""
|
||||
Remove an alias.
|
||||
|
||||
Args:
|
||||
alias (str): The alias to remove
|
||||
"""
|
||||
|
||||
if alias in self.get_system("master_lookup"):
|
||||
self._chats[self.get_system("master_lookup")[alias]].aliases.remove(alias)
|
||||
del self.get_system("master_lookup")[alias]
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def items(self):
|
||||
"""Get chat items (id, chat) pairs."""
|
||||
return self._chats.items()
|
||||
@@ -208,6 +248,7 @@ class ChatStore:
|
||||
self.their_avatar_thumb = None
|
||||
self.status = None
|
||||
self.media_base = ""
|
||||
self.aliases = []
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""Get number of chats. Required for dict-like access."""
|
||||
@@ -361,6 +402,7 @@ class Message:
|
||||
self.thumb = None # Android specific
|
||||
self.sticker = False
|
||||
self.reactions = {}
|
||||
self.poll = None
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
"""Convert message to JSON-serializable dict."""
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
import shutil
|
||||
@@ -11,27 +12,47 @@ from markupsafe import escape as htmle
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device
|
||||
from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
|
||||
from Whatsapp_Chat_Exporter.poll import decode_poll_from_receipt_blob
|
||||
|
||||
|
||||
|
||||
|
||||
def contacts(db, data):
|
||||
"""Process WhatsApp contacts with status information."""
|
||||
"""Process WhatsApp contacts with name and status information."""
|
||||
c = db.cursor()
|
||||
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
|
||||
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT""")
|
||||
total_row_number = c.fetchone()[0]
|
||||
logging.info(f"Pre-processing contacts...({total_row_number})", extra={"clear": True})
|
||||
|
||||
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
|
||||
# Check if expected columns exist before querying,
|
||||
# to handle different WhatsApp versions (mainly ZLID).
|
||||
c.execute("PRAGMA table_info(ZWAADDRESSBOOKCONTACT)")
|
||||
column_names = [info[1] for info in c.fetchall()]
|
||||
all_cols = ["ZWHATSAPPID", "ZLID", "ZFULLNAME", "ZABOUTTEXT"]
|
||||
columns = [col for col in all_cols if col in column_names]
|
||||
|
||||
c.execute(f"""SELECT {', '.join(columns)} FROM ZWAADDRESSBOOKCONTACT""")
|
||||
with tqdm(total=total_row_number, desc="Processing contacts", unit="contact", leave=False) as pbar:
|
||||
while (content := c.fetchone()) is not None:
|
||||
zwhatsapp_id = content["ZWHATSAPPID"]
|
||||
if zwhatsapp_id is None:
|
||||
pbar.update(1)
|
||||
continue
|
||||
if not zwhatsapp_id.endswith("@s.whatsapp.net"):
|
||||
zwhatsapp_id += "@s.whatsapp.net"
|
||||
|
||||
current_chat = ChatStore(Device.IOS)
|
||||
current_chat.status = content["ZABOUTTEXT"]
|
||||
data.add_chat(zwhatsapp_id, current_chat)
|
||||
if content["ZFULLNAME"]:
|
||||
current_chat.name = content["ZFULLNAME"]
|
||||
if content["ZABOUTTEXT"]:
|
||||
current_chat.status = content["ZABOUTTEXT"]
|
||||
# Index by WhatsApp ID, with LID as alias if available
|
||||
data.add_chat(
|
||||
zwhatsapp_id,
|
||||
current_chat,
|
||||
content["ZLID"] if "ZLID" in columns and content["ZLID"] else None
|
||||
)
|
||||
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logging.info(f"Pre-processed {total_row_number} contacts in {convert_time_unit(total_time)}")
|
||||
@@ -124,7 +145,12 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
||||
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
|
||||
else:
|
||||
current_chat = data.get_chat(contact_id)
|
||||
current_chat.name = contact_name
|
||||
# Only overwrite name if we have a better one (not a phone number)
|
||||
# or if there's no existing name
|
||||
if current_chat.name is None or contact_name is not None:
|
||||
is_phone = contact_name.replace("+", "").replace(" ", "").isdigit() if contact_name else True
|
||||
if not is_phone or current_chat.name is None:
|
||||
current_chat.name = contact_name
|
||||
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
|
||||
|
||||
# Process avatar images
|
||||
@@ -133,6 +159,17 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logging.info(f"Processed {total_row_number} contacts in {convert_time_unit(total_time)}")
|
||||
|
||||
# Pre-load push names for JIDs not yet in data (especially @lid group members)
|
||||
c.execute("""SELECT ZJID, ZPUSHNAME FROM ZWAPROFILEPUSHNAME WHERE ZPUSHNAME IS NOT NULL""")
|
||||
while (row := c.fetchone()) is not None:
|
||||
jid = row["ZJID"]
|
||||
if jid not in data:
|
||||
push_chat = ChatStore(Device.IOS)
|
||||
push_chat.name = row["ZPUSHNAME"]
|
||||
data.add_chat(jid, push_chat)
|
||||
elif data.get_chat(jid).name is None:
|
||||
data.get_chat(jid).name = row["ZPUSHNAME"]
|
||||
|
||||
# Get message count
|
||||
message_count_query = f"""
|
||||
SELECT count()
|
||||
@@ -248,7 +285,7 @@ def process_message_data(message, content, is_group_message, data, message_map,
|
||||
|
||||
# Handle metadata messages
|
||||
if content["ZMESSAGETYPE"] == 6:
|
||||
return process_metadata_message(message, content, is_group_message)
|
||||
return process_metadata_message(message, content, is_group_message, data)
|
||||
|
||||
# Handle quoted replies
|
||||
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and not no_reply:
|
||||
@@ -256,6 +293,15 @@ def process_message_data(message, content, is_group_message, data, message_map,
|
||||
message.reply = quoted.decode()
|
||||
message.quoted_data = message_map.get(message.reply)
|
||||
|
||||
# Skip poll vote update messages (type 66)
|
||||
if content["ZMESSAGETYPE"] == 66:
|
||||
return True # Invalid, skip
|
||||
|
||||
# Handle poll messages (type 46) - will be enriched by polls() later
|
||||
if content["ZMESSAGETYPE"] == 46:
|
||||
message.data = "\U0001f4ca Poll"
|
||||
return False # Valid, populated later by polls()
|
||||
|
||||
# Handle stickers
|
||||
if content["ZMESSAGETYPE"] == 15:
|
||||
message.sticker = True
|
||||
@@ -266,21 +312,51 @@ def process_message_data(message, content, is_group_message, data, message_map,
|
||||
return False # Message is valid
|
||||
|
||||
|
||||
def process_metadata_message(message, content, is_group_message):
|
||||
def _parse_group_action(ztext, data):
|
||||
if ztext.endswith("@lid") or ztext.endswith("@s.whatsapp.net"):
|
||||
# This is likely a group member change action
|
||||
# Not really sure actually
|
||||
name = None
|
||||
if ztext in data:
|
||||
name = data.get_chat(ztext).name
|
||||
if "@" in ztext:
|
||||
fallback = ztext.split('@')[0]
|
||||
else:
|
||||
fallback = None
|
||||
entity = name or fallback
|
||||
|
||||
return f"{entity} join the group"
|
||||
|
||||
elif ztext.startswith("{") and ztext.endswith("}"):
|
||||
try:
|
||||
metadata = json.loads(ztext)
|
||||
except json.JSONDecodeError:
|
||||
return ztext # Not a JSON string, return as-is
|
||||
entity = metadata.get('author', 'Someone')
|
||||
if entity is not "Someone":
|
||||
name = None
|
||||
if entity in data:
|
||||
name = data.get_chat(entity).name
|
||||
if "@" in entity:
|
||||
fallback = entity.split('@')[0]
|
||||
else:
|
||||
fallback = None
|
||||
entity = name or fallback
|
||||
return f"{entity} changed the group name to {metadata.get('subject', 'Unknown')}."
|
||||
elif ztext == "admin_add":
|
||||
return f"The administrator has restricted participant additions to admins only."
|
||||
else:
|
||||
return "Unsupported WhatsApp internal message."
|
||||
|
||||
|
||||
def process_metadata_message(message, content, is_group_message, data):
|
||||
"""Process metadata messages (action_type 6)."""
|
||||
if is_group_message:
|
||||
# Group
|
||||
if content["ZTEXT"] is not None:
|
||||
# Changed name
|
||||
try:
|
||||
int(content["ZTEXT"])
|
||||
except ValueError:
|
||||
msg = f"The group name changed to {content['ZTEXT']}"
|
||||
message.data = msg
|
||||
message.meta = True
|
||||
return False # Valid message
|
||||
else:
|
||||
return True # Invalid message
|
||||
message.data = _parse_group_action(content["ZTEXT"], data)
|
||||
message.meta = True
|
||||
return False
|
||||
else:
|
||||
message.data = None
|
||||
return False
|
||||
@@ -596,6 +672,187 @@ def process_call_record(content, chat, data, timezone_offset):
|
||||
chat.add_message(call.key_id, call)
|
||||
|
||||
|
||||
def _resolve_voter_name(voter_jid, is_creator, message, data):
|
||||
"""Resolve a voter JID to a display name.
|
||||
|
||||
Args:
|
||||
voter_jid (str or None): The voter's JID (often LID format like '123@lid').
|
||||
is_creator (bool): Whether this voter is the poll creator.
|
||||
message (Message): The poll message object.
|
||||
data (ChatCollection): The chat data collection for name lookups.
|
||||
|
||||
Returns:
|
||||
str: The resolved display name.
|
||||
"""
|
||||
if voter_jid is None:
|
||||
if is_creator:
|
||||
# Field 6 in the protobuf is always the device owner's vote,
|
||||
# not the poll message sender's vote
|
||||
return "You"
|
||||
return "Unknown"
|
||||
|
||||
# Try direct lookup in data
|
||||
if voter_jid in data:
|
||||
chat = data.get_chat(voter_jid)
|
||||
if chat is not None and chat.name:
|
||||
return chat.name
|
||||
|
||||
# Try with @s.whatsapp.net suffix
|
||||
if "@" not in voter_jid:
|
||||
jid_with_suffix = f"{voter_jid}@s.whatsapp.net"
|
||||
if jid_with_suffix in data:
|
||||
chat = data.get_chat(jid_with_suffix)
|
||||
if chat is not None and chat.name:
|
||||
return chat.name
|
||||
|
||||
# Fallback: strip domain part
|
||||
if "@" in voter_jid:
|
||||
return voter_jid.split("@")[0]
|
||||
return voter_jid
|
||||
|
||||
|
||||
def polls(db, data, filter_date, filter_chat, filter_empty):
|
||||
"""Process WhatsApp poll messages (type 46) from the database.
|
||||
|
||||
Queries ZWAMESSAGEINFO.ZRECEIPTINFO for poll messages, decodes the
|
||||
protobuf blobs, and enriches the corresponding Message objects with
|
||||
structured poll data.
|
||||
|
||||
Args:
|
||||
db: SQLite database connection.
|
||||
data (ChatCollection): The chat data collection.
|
||||
filter_date: Date filter SQL fragment or None.
|
||||
filter_chat: Tuple of (include_filter, exclude_filter).
|
||||
filter_empty: Whether to filter empty chats.
|
||||
"""
|
||||
c = db.cursor()
|
||||
|
||||
# Build filter conditions
|
||||
chat_filter_include = get_chat_condition(
|
||||
filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
|
||||
chat_filter_exclude = get_chat_condition(
|
||||
filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
|
||||
date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
|
||||
|
||||
# Count poll messages
|
||||
count_query = f"""
|
||||
SELECT count()
|
||||
FROM ZWAMESSAGE
|
||||
JOIN ZWAMESSAGEINFO ON ZWAMESSAGEINFO.ZMESSAGE = ZWAMESSAGE.Z_PK
|
||||
INNER JOIN ZWACHATSESSION
|
||||
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
|
||||
LEFT JOIN ZWAGROUPMEMBER
|
||||
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
|
||||
WHERE ZWAMESSAGE.ZMESSAGETYPE = 46
|
||||
AND ZWAMESSAGEINFO.ZRECEIPTINFO IS NOT NULL
|
||||
{date_filter}
|
||||
{chat_filter_include}
|
||||
{chat_filter_exclude}
|
||||
"""
|
||||
c.execute(count_query)
|
||||
total_row_number = c.fetchone()[0]
|
||||
|
||||
if total_row_number == 0:
|
||||
return
|
||||
|
||||
logging.info(f"Processing polls...(0/{total_row_number})", extra={"clear": True})
|
||||
|
||||
# Fetch poll data
|
||||
poll_query = f"""
|
||||
SELECT ZWACHATSESSION.ZCONTACTJID,
|
||||
ZWAMESSAGE.Z_PK AS ZMESSAGE,
|
||||
ZWAMESSAGEINFO.ZRECEIPTINFO
|
||||
FROM ZWAMESSAGE
|
||||
JOIN ZWAMESSAGEINFO ON ZWAMESSAGEINFO.ZMESSAGE = ZWAMESSAGE.Z_PK
|
||||
INNER JOIN ZWACHATSESSION
|
||||
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
|
||||
LEFT JOIN ZWAGROUPMEMBER
|
||||
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
|
||||
WHERE ZWAMESSAGE.ZMESSAGETYPE = 46
|
||||
AND ZWAMESSAGEINFO.ZRECEIPTINFO IS NOT NULL
|
||||
{date_filter}
|
||||
{chat_filter_include}
|
||||
{chat_filter_exclude}
|
||||
ORDER BY ZWAMESSAGE.ZMESSAGEDATE ASC
|
||||
"""
|
||||
c.execute(poll_query)
|
||||
|
||||
with tqdm(total=total_row_number, desc="Processing polls", unit="poll", leave=False) as pbar:
|
||||
while (content := c.fetchone()) is not None:
|
||||
contact_id = content["ZCONTACTJID"]
|
||||
message_pk = content["ZMESSAGE"]
|
||||
receipt_blob = content["ZRECEIPTINFO"]
|
||||
|
||||
current_chat = data.get_chat(contact_id)
|
||||
if current_chat is None:
|
||||
pbar.update(1)
|
||||
continue
|
||||
|
||||
message = current_chat.get_message(message_pk)
|
||||
if message is None:
|
||||
pbar.update(1)
|
||||
continue
|
||||
|
||||
try:
|
||||
poll_data = decode_poll_from_receipt_blob(receipt_blob)
|
||||
except Exception as e:
|
||||
logging.warning(f"Failed to decode poll {message_pk}: {e}")
|
||||
pbar.update(1)
|
||||
continue
|
||||
|
||||
if poll_data is None:
|
||||
pbar.update(1)
|
||||
continue
|
||||
|
||||
# Build structured poll result with vote tallies
|
||||
options = poll_data['options']
|
||||
votes = poll_data['votes']
|
||||
|
||||
# Tally votes per option
|
||||
option_votes = {i: [] for i in range(len(options))}
|
||||
seen_voters = set()
|
||||
for vote in votes:
|
||||
voter_name = _resolve_voter_name(
|
||||
vote.get('voter_jid'), vote.get('is_creator', False), message, data)
|
||||
voter_key = vote.get('voter_jid') or ("__creator__" if vote.get('is_creator') else "__unknown__")
|
||||
if voter_key not in seen_voters:
|
||||
seen_voters.add(voter_key)
|
||||
for idx in vote.get('selected_indices', []):
|
||||
if 0 <= idx < len(options):
|
||||
option_votes[idx].append(voter_name)
|
||||
|
||||
# Find max vote count for percentage calculation
|
||||
max_votes = max((len(v) for v in option_votes.values()), default=0)
|
||||
|
||||
# Build option list with tallies
|
||||
option_list = []
|
||||
for i, opt_text in enumerate(options):
|
||||
voters = option_votes.get(i, [])
|
||||
vote_count = len(voters)
|
||||
vote_pct = (vote_count / max_votes * 100) if max_votes > 0 else 0
|
||||
option_list.append({
|
||||
'text': opt_text,
|
||||
'vote_count': vote_count,
|
||||
'vote_pct': vote_pct,
|
||||
'voters': voters,
|
||||
})
|
||||
|
||||
total_voters = len(seen_voters)
|
||||
|
||||
# Set poll data on message
|
||||
message.poll = {
|
||||
'type': 'poll',
|
||||
'question': poll_data['question'],
|
||||
'options': option_list,
|
||||
'total_voters': total_voters,
|
||||
}
|
||||
message.data = f"\U0001f4ca {poll_data['question']}"
|
||||
|
||||
pbar.update(1)
|
||||
total_time = pbar.format_dict['elapsed']
|
||||
logging.info(f"Processed {total_row_number} polls in {convert_time_unit(total_time)}")
|
||||
|
||||
|
||||
def format_call_data(call, content):
|
||||
"""Format call data message based on call attributes."""
|
||||
# Basic call info
|
||||
|
||||
190
Whatsapp_Chat_Exporter/poll.py
Normal file
190
Whatsapp_Chat_Exporter/poll.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""
|
||||
WhatsApp Poll decoder for iOS/macOS.
|
||||
|
||||
Decodes poll messages (ZMESSAGETYPE = 46) stored as protobuf blobs
|
||||
in ZWAMESSAGEINFO.ZRECEIPTINFO. Uses raw varint/wire-type parsing
|
||||
with no external protobuf library dependency.
|
||||
"""
|
||||
|
||||
import struct
|
||||
import logging
|
||||
|
||||
|
||||
def _decode_varint(data, pos):
|
||||
"""Decode a protobuf varint starting at pos.
|
||||
|
||||
Args:
|
||||
data (bytes): The protobuf data.
|
||||
pos (int): Starting position.
|
||||
|
||||
Returns:
|
||||
tuple: (value, new_pos)
|
||||
|
||||
Raises:
|
||||
ValueError: If the varint is truncated.
|
||||
"""
|
||||
result = 0
|
||||
shift = 0
|
||||
while pos < len(data):
|
||||
b = data[pos]
|
||||
pos += 1
|
||||
result |= (b & 0x7F) << shift
|
||||
if not (b & 0x80):
|
||||
return result, pos
|
||||
shift += 7
|
||||
raise ValueError("Truncated varint")
|
||||
|
||||
|
||||
def decode_protobuf_fields(data):
|
||||
"""
|
||||
Decode raw protobuf bytes into list of (field_number, wire_type_name, value).
|
||||
Handles: varint (0), fixed64 (1), length-delimited/bytes (2), fixed32 (5).
|
||||
|
||||
Args:
|
||||
data (bytes): Raw protobuf data.
|
||||
|
||||
Returns:
|
||||
list: List of (field_number, wire_type_name, value) tuples.
|
||||
"""
|
||||
fields = []
|
||||
pos = 0
|
||||
while pos < len(data):
|
||||
try:
|
||||
tag, pos = _decode_varint(data, pos)
|
||||
field_num = tag >> 3
|
||||
wire_type = tag & 0x7
|
||||
|
||||
if wire_type == 0: # varint
|
||||
val, pos = _decode_varint(data, pos)
|
||||
fields.append((field_num, 'varint', val))
|
||||
elif wire_type == 2: # length-delimited
|
||||
length, pos = _decode_varint(data, pos)
|
||||
val = data[pos:pos + length]
|
||||
pos += length
|
||||
fields.append((field_num, 'bytes', val))
|
||||
elif wire_type == 5: # fixed32
|
||||
val = struct.unpack('<I', data[pos:pos + 4])[0]
|
||||
pos += 4
|
||||
fields.append((field_num, 'fixed32', val))
|
||||
elif wire_type == 1: # fixed64
|
||||
val = struct.unpack('<Q', data[pos:pos + 8])[0]
|
||||
pos += 8
|
||||
fields.append((field_num, 'fixed64', val))
|
||||
else:
|
||||
break # Unknown wire type, stop parsing
|
||||
except Exception:
|
||||
break
|
||||
return fields
|
||||
|
||||
|
||||
def _decode_vote_record(data):
|
||||
"""Decode a single vote record sub-message.
|
||||
|
||||
Args:
|
||||
data (bytes): Raw protobuf data for a vote record.
|
||||
|
||||
Returns:
|
||||
dict or None: Vote record with 'voter_jid' and 'selected_indices',
|
||||
or None if the record is empty.
|
||||
"""
|
||||
fields = decode_protobuf_fields(data)
|
||||
|
||||
selected_indices = []
|
||||
voter_jid = None
|
||||
|
||||
for fn, wt, val in fields:
|
||||
if fn == 1 and wt == 'varint':
|
||||
selected_indices.append(val)
|
||||
elif fn == 4 and wt == 'bytes':
|
||||
try:
|
||||
voter_jid = val.decode('utf-8')
|
||||
except Exception:
|
||||
voter_jid = val.hex()
|
||||
|
||||
if not voter_jid and not selected_indices:
|
||||
return None
|
||||
|
||||
return {
|
||||
'voter_jid': voter_jid,
|
||||
'selected_indices': selected_indices,
|
||||
}
|
||||
|
||||
|
||||
def decode_poll_from_receipt_blob(receipt_blob):
|
||||
"""
|
||||
Decode a WhatsApp poll from the ZWAMESSAGEINFO.ZRECEIPTINFO protobuf blob.
|
||||
|
||||
The blob has a top-level structure where field 8 contains the poll content.
|
||||
The poll content has: question (field 2), options (field 3 repeated),
|
||||
other voters (field 5 repeated), and creator vote (field 6).
|
||||
|
||||
Args:
|
||||
receipt_blob (bytes): The ZRECEIPTINFO protobuf blob.
|
||||
|
||||
Returns:
|
||||
dict or None: Decoded poll data with keys:
|
||||
question (str): The poll question text
|
||||
options (list[str]): The poll option texts, in order
|
||||
votes (list[dict]): Each vote has:
|
||||
voter_jid (str|None): Voter's JID (LID format)
|
||||
selected_indices (list[int]): 0-based indices into options
|
||||
is_creator (bool): True if this is the poll creator's vote
|
||||
Returns None if the blob does not contain a valid poll.
|
||||
"""
|
||||
if not receipt_blob:
|
||||
return None
|
||||
|
||||
top_fields = decode_protobuf_fields(receipt_blob)
|
||||
|
||||
# Find the poll content in field 8
|
||||
poll_content = None
|
||||
for fn, wt, val in top_fields:
|
||||
if fn == 8 and wt == 'bytes':
|
||||
poll_content = val
|
||||
break
|
||||
|
||||
if not poll_content:
|
||||
return None
|
||||
|
||||
poll_fields = decode_protobuf_fields(poll_content)
|
||||
|
||||
# Extract question (field 2, first string)
|
||||
question = None
|
||||
for fn, wt, val in poll_fields:
|
||||
if fn == 2 and wt == 'bytes':
|
||||
try:
|
||||
question = val.decode('utf-8')
|
||||
except Exception:
|
||||
question = repr(val)
|
||||
break
|
||||
|
||||
if not question:
|
||||
return None
|
||||
|
||||
# Extract options (field 3, repeated)
|
||||
options = []
|
||||
for fn, wt, val in poll_fields:
|
||||
if fn == 3 and wt == 'bytes':
|
||||
option_fields = decode_protobuf_fields(val)
|
||||
for ofn, owt, oval in option_fields:
|
||||
if ofn == 1 and owt == 'bytes':
|
||||
try:
|
||||
options.append(oval.decode('utf-8'))
|
||||
except Exception:
|
||||
options.append(repr(oval))
|
||||
break
|
||||
|
||||
# Extract votes: field 5 = other participants, field 6 = creator
|
||||
votes = []
|
||||
for fn, wt, val in poll_fields:
|
||||
if fn in (5, 6) and wt == 'bytes':
|
||||
vote = _decode_vote_record(val)
|
||||
if vote:
|
||||
vote['is_creator'] = (fn == 6)
|
||||
votes.append(vote)
|
||||
|
||||
return {
|
||||
'question': question,
|
||||
'options': options,
|
||||
'votes': votes,
|
||||
}
|
||||
@@ -381,7 +381,26 @@
|
||||
</a>
|
||||
{% endif %}
|
||||
<p class="text-[#111b21] text-sm message-text">
|
||||
{% if msg.meta == true or msg.media == false and msg.data is none %}
|
||||
{% if msg.poll %}
|
||||
<div class="mb-1">
|
||||
<p class="font-semibold text-sm mb-2">📊 {{ msg.poll.question }}</p>
|
||||
{% for option in msg.poll.options %}
|
||||
<div class="mb-1.5">
|
||||
<div class="flex justify-between text-xs mb-0.5">
|
||||
<span>{{ option.text }}</span>
|
||||
<span class="text-[#667781] ml-2">{{ option.vote_count }}</span>
|
||||
</div>
|
||||
<div class="w-full bg-gray-200 rounded-full h-1.5">
|
||||
<div class="bg-whatsapp rounded-full h-1.5" style="width: {{ option.vote_pct }}%"></div>
|
||||
</div>
|
||||
{% if option.voters %}
|
||||
<p class="text-[10px] text-[#667781] mt-0.5">{{ option.voters | join(', ') }}</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endfor %}
|
||||
<p class="text-[10px] text-[#667781] mt-1 border-t border-gray-200 pt-1">{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}</p>
|
||||
</div>
|
||||
{% elif msg.meta == true or msg.media == false and msg.data is none %}
|
||||
<div class="flex justify-center mb-2">
|
||||
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
|
||||
{% if msg.safe %}
|
||||
@@ -487,7 +506,26 @@
|
||||
</a>
|
||||
{% endif %}
|
||||
<p class="text-[#111b21] text-sm">
|
||||
{% if msg.meta == true or msg.media == false and msg.data is none %}
|
||||
{% if msg.poll %}
|
||||
<div class="mb-1">
|
||||
<p class="font-semibold text-sm mb-2">📊 {{ msg.poll.question }}</p>
|
||||
{% for option in msg.poll.options %}
|
||||
<div class="mb-1.5">
|
||||
<div class="flex justify-between text-xs mb-0.5">
|
||||
<span>{{ option.text }}</span>
|
||||
<span class="text-[#667781] ml-2">{{ option.vote_count }}</span>
|
||||
</div>
|
||||
<div class="w-full bg-gray-200 rounded-full h-1.5">
|
||||
<div class="bg-whatsapp rounded-full h-1.5" style="width: {{ option.vote_pct }}%"></div>
|
||||
</div>
|
||||
{% if option.voters %}
|
||||
<p class="text-[10px] text-[#667781] mt-0.5">{{ option.voters | join(', ') }}</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endfor %}
|
||||
<p class="text-[10px] text-[#667781] mt-1 border-t border-gray-200 pt-1">{{ msg.poll.total_voters }} vote{{ 's' if msg.poll.total_voters != 1 else '' }}</p>
|
||||
</div>
|
||||
{% elif msg.meta == true or msg.media == false and msg.data is none %}
|
||||
<div class="flex justify-center mb-2">
|
||||
<div class="bg-[#FFF3C5] rounded-lg px-3 py-2 text-sm text-[#856404] flex items-center">
|
||||
{% if msg.safe %}
|
||||
|
||||
@@ -9,6 +9,7 @@ from Whatsapp_Chat_Exporter.data_model import ChatStore
|
||||
BASE_PATH = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
|
||||
chat_data_1 = {
|
||||
"12345678@s.whatsapp.net": {
|
||||
'aliases': [],
|
||||
"name": "Friend",
|
||||
"type": "ios",
|
||||
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
||||
@@ -44,6 +45,7 @@ chat_data_1 = {
|
||||
|
||||
chat_data_2 = {
|
||||
"12345678@s.whatsapp.net": {
|
||||
'aliases': [],
|
||||
"name": "Friend",
|
||||
"type": "ios",
|
||||
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
||||
@@ -80,6 +82,7 @@ chat_data_2 = {
|
||||
# Expected merged data - should contain all messages with all fields initialized as they would be by Message class
|
||||
chat_data_merged = {
|
||||
"12345678@s.whatsapp.net": {
|
||||
'aliases': [],
|
||||
"name": "Friend",
|
||||
"type": "ios",
|
||||
"my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
|
||||
@@ -107,7 +110,8 @@ chat_data_merged = {
|
||||
"sticker": False,
|
||||
"message_type": None,
|
||||
"received_timestamp": None,
|
||||
"read_timestamp": None
|
||||
"read_timestamp": None,
|
||||
"poll": None
|
||||
},
|
||||
"24691": {
|
||||
"from_me": False,
|
||||
@@ -128,7 +132,8 @@ chat_data_merged = {
|
||||
"sticker": False,
|
||||
"message_type": None,
|
||||
"received_timestamp": None,
|
||||
"read_timestamp": None
|
||||
"read_timestamp": None,
|
||||
"poll": None
|
||||
},
|
||||
"24692": {
|
||||
"from_me": False,
|
||||
@@ -149,7 +154,8 @@ chat_data_merged = {
|
||||
"sticker": False,
|
||||
"message_type": None,
|
||||
"received_timestamp": None,
|
||||
"read_timestamp": None
|
||||
"read_timestamp": None,
|
||||
"poll": None
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user