Major refactoring

This commit does not refactor Android handler
This commit is contained in:
KnugiHK
2025-03-02 12:57:27 +08:00
parent 4a0be0233c
commit 7c4705d149
5 changed files with 1280 additions and 959 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -670,7 +670,7 @@ def create_html(
if maximum_size == 0: if maximum_size == 0:
maximum_size = MAX_SIZE maximum_size = MAX_SIZE
last_msg = current_chat.get_last_message().key_id last_msg = current_chat.get_last_message().key_id
for message in current_chat.get_messages(): for message in current_chat.values():
if message.data is not None and not message.meta and not message.media: if message.data is not None and not message.meta and not message.media:
current_size += len(message.data) + ROW_SIZE current_size += len(message.data) + ROW_SIZE
else: else:
@@ -717,7 +717,7 @@ def create_html(
output_file_name, output_file_name,
template, template,
name, name,
current_chat.get_messages(), current_chat.values(),
contact, contact,
w3css, w3css,
current_chat, current_chat,
@@ -739,7 +739,7 @@ def create_txt(data, output):
contact = jik.replace('+', '') contact = jik.replace('+', '')
output_file = os.path.join(output, f"{contact}.txt") output_file = os.path.join(output, f"{contact}.txt")
with open(output_file, "w", encoding="utf8") as f: with open(output_file, "w", encoding="utf8") as f:
for message in chat.get_messages(): for message in chat.values():
date = datetime.fromtimestamp(message.timestamp).date() date = datetime.fromtimestamp(message.timestamp).date()
if message.meta and message.mime != "media": if message.meta and message.mime != "media":
continue # Skip any metadata in text format continue # Skip any metadata in text format

View File

@@ -213,11 +213,19 @@ class ChatStore:
def get_last_message(self) -> 'Message': def get_last_message(self) -> 'Message':
"""Get the most recent message in the chat.""" """Get the most recent message in the chat."""
return tuple(self._messages.values())[-1] return tuple(self._messages.values())[-1]
def items(self):
"""Get message items pairs."""
return self._messages.items()
def get_messages(self) -> 'Message': def values(self):
"""Get all messages in the chat.""" """Get all messages in the chat."""
return self._messages.values() return self._messages.values()
def keys(self):
"""Get all message keys in the chat."""
return self._messages.keys()
class Message: class Message:
""" """

View File

@@ -8,85 +8,174 @@ from Whatsapp_Chat_Exporter.utility import Device
def messages(path, data, assume_first_as_me=False): def messages(path, data, assume_first_as_me=False):
"""Extracts messages from the exported file""" """
Extracts messages from an exported WhatsApp chat file.
Args:
path: Path to the exported chat file
data: Data container object to store the parsed chat
assume_first_as_me: If True, assumes the first message is sent from the user without asking
Returns:
Updated data container with extracted messages
"""
# Create a new chat in the data container
chat = data.add_chat("ExportedChat", ChatStore(Device.EXPORTED))
you = "" # Will store the username of the current user
user_identification_done = False # Flag to track if user identification has been done
# First pass: count total lines for progress reporting
with open(path, "r", encoding="utf8") as file:
total_row_number = sum(1 for _ in file)
# Second pass: process the messages
with open(path, "r", encoding="utf8") as file: with open(path, "r", encoding="utf8") as file:
you = ""
data["ExportedChat"] = ChatStore(Device.EXPORTED)
chat = data["ExportedChat"]
total_row_number = len(file.readlines())
file.seek(0)
for index, line in enumerate(file): for index, line in enumerate(file):
if len(line.split(" - ")) > 1: you, user_identification_done = process_line(
time = line.split(" - ")[0] line, index, chat, path, you,
if ":" not in line.split(time)[1]: assume_first_as_me, user_identification_done
msg.data = line.split(time)[1][3:] )
msg.meta = True
else: # Show progress
name = line.split(time)[1].split(":")[0]
message = line.split(time)[1].split(name + ":")[1].strip()
name = name[3:]
if you == "":
if chat.name is None:
if not assume_first_as_me:
while True:
ans = input(f"Is '{name}' you? (Y/N)").lower()
if ans == "y":
you = name
break
elif ans == "n":
chat.name = name
break
else:
you = name
else:
if name != chat.name:
you = name
elif chat.name is None:
if name != you:
chat.name = name
msg = Message(
you == name,
datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(),
time.split(", ")[1].strip(),
index
)
if "<Media omitted>" in message:
msg.data = "The media is omitted in the chat"
msg.mime = "media"
msg.meta = True
elif "(file attached)" in message:
mime = MimeTypes()
msg.media = True
file_path = os.path.join(os.path.dirname(path), message.split("(file attached)")[0].strip())
if os.path.isfile(file_path):
msg.data = file_path
guess = mime.guess_type(file_path)[0]
if guess is not None:
msg.mime = guess
else:
msg.mime = "application/octet-stream"
else:
msg.data = "The media is missing"
msg.mime = "media"
msg.meta = True
else:
msg.data = message
if "\r\n" in message:
msg.data = message.replace("\r\n", "<br>")
if "\n" in message:
msg.data = message.replace("\n", "<br>")
chat.add_message(index, msg)
else:
lookback = index - 1
while lookback not in chat.messages:
lookback -= 1
msg = chat.messages[lookback]
if msg.media:
msg.caption = line.strip()
else:
msg.data += "<br>" + line.strip()
if index % 1000 == 0: if index % 1000 == 0:
print(f"Processing messages & media...({index}/{total_row_number})", end="\r") print(f"Processing messages & media...({index}/{total_row_number})", end="\r")
print(f"Processing messages & media...({total_row_number}/{total_row_number})", end="\r")
print(f"Processing messages & media...({total_row_number}/{total_row_number})")
return data return data
def process_line(line, index, chat, file_path, you, assume_first_as_me, user_identification_done):
"""
Process a single line from the chat file
Returns:
Tuple of (updated_you_value, updated_user_identification_done_flag)
"""
parts = line.split(" - ", 1)
# Check if this is a new message (has timestamp format)
if len(parts) > 1:
time = parts[0]
you, user_identification_done = process_new_message(
time, parts[1], index, chat, you, file_path,
assume_first_as_me, user_identification_done
)
else:
# This is a continuation of the previous message
process_message_continuation(line, index, chat)
return you, user_identification_done
def process_new_message(time, content, index, chat, you, file_path,
assume_first_as_me, user_identification_done):
"""
Process a line that contains a new message
Returns:
Tuple of (updated_you_value, updated_user_identification_done_flag)
"""
# Create a new message
msg = Message(
from_me=False, # Will be updated later if needed
timestamp=datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(),
time=time.split(", ")[1].strip(),
key_id=index,
received_timestamp=None,
read_timestamp=None
)
# Check if this is a system message (no name:message format)
if ":" not in content:
msg.data = content
msg.meta = True
else:
# Process user message
name, message = content.strip().split(":", 1)
# Handle user identification
if you == "":
if chat.name is None:
# First sender identification
if not user_identification_done:
if not assume_first_as_me:
# Ask only once if this is the user
you = prompt_for_user_identification(name)
user_identification_done = True
else:
you = name
user_identification_done = True
else:
# If we know the chat name, anyone else must be "you"
if name != chat.name:
you = name
# Set the chat name if needed
if chat.name is None and name != you:
chat.name = name
# Determine if this message is from the current user
msg.from_me = (name == you)
# Process message content
process_message_content(msg, message, file_path)
chat.add_message(index, msg)
return you, user_identification_done
def process_message_content(msg, message, file_path):
"""Process and set the content of a message based on its type"""
if "<Media omitted>" in message:
msg.data = "The media is omitted in the chat"
msg.mime = "media"
msg.meta = True
elif "(file attached)" in message:
process_attached_file(msg, message, file_path)
else:
msg.data = message.replace("\r\n", "<br>").replace("\n", "<br>")
def process_attached_file(msg, message, file_path):
"""Process an attached file in a message"""
mime = MimeTypes()
msg.media = True
# Extract file path and check if it exists
file_name = message.split("(file attached)")[0].strip()
attached_file_path = os.path.join(os.path.dirname(file_path), file_name)
if os.path.isfile(attached_file_path):
msg.data = attached_file_path
guess = mime.guess_type(attached_file_path)[0]
msg.mime = guess if guess is not None else "application/octet-stream"
else:
msg.data = "The media is missing"
msg.mime = "media"
msg.meta = True
def process_message_continuation(line, index, chat):
"""Process a line that continues a previous message"""
# Find the previous message
lookback = index - 1
while lookback not in chat.keys():
lookback -= 1
msg = chat.get_message(lookback)
# Add the continuation line to the message
if msg.media:
msg.caption = line.strip()
else:
msg.data += "<br>" + line.strip()
def prompt_for_user_identification(name):
"""Ask the user if the given name is their username"""
while True:
ans = input(f"Is '{name}' you? (Y/N)").lower()
if ans == "y":
return name
elif ans == "n":
return ""

View File

@@ -12,144 +12,179 @@ from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit,
def contacts(db, data): def contacts(db, data):
"""Process WhatsApp contacts with status information."""
c = db.cursor() c = db.cursor()
# Get status only lol
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
print(f"Pre-processing contacts...({total_row_number})") print(f"Pre-processing contacts...({total_row_number})")
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""") c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
content = c.fetchone() content = c.fetchone()
while content is not None: while content is not None:
if not content["ZWHATSAPPID"].endswith("@s.whatsapp.net"): zwhatsapp_id = content["ZWHATSAPPID"]
ZWHATSAPPID = content["ZWHATSAPPID"] + "@s.whatsapp.net" if not zwhatsapp_id.endswith("@s.whatsapp.net"):
zwhatsapp_id += "@s.whatsapp.net"
current_chat = ChatStore(Device.IOS) current_chat = ChatStore(Device.IOS)
current_chat.status = content["ZABOUTTEXT"] current_chat.status = content["ZABOUTTEXT"]
data.add_chat(ZWHATSAPPID, current_chat) data.add_chat(zwhatsapp_id, current_chat)
content = c.fetchone() content = c.fetchone()
def process_contact_avatars(current_chat, media_folder, contact_id):
"""Process and assign avatar images for a contact."""
path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}'
avatars = glob(f"{path}*")
if 0 < len(avatars) <= 1:
current_chat.their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb") and current_chat.their_avatar_thumb is None:
current_chat.their_avatar_thumb = avatar
elif avatar.endswith(".jpg") and current_chat.their_avatar is None:
current_chat.their_avatar = avatar
def get_contact_name(content):
"""Determine the appropriate contact name based on push name and partner name."""
is_phone = content["ZPARTNERNAME"].replace("+", "").replace(" ", "").isdigit()
if content["ZPUSHNAME"] is None or (content["ZPUSHNAME"] and not is_phone):
return content["ZPARTNERNAME"]
else:
return content["ZPUSHNAME"]
def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, filter_empty): def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, filter_empty):
"""Process WhatsApp messages and contacts from the database."""
c = db.cursor() c = db.cursor()
cursor2 = db.cursor() cursor2 = db.cursor()
# Get contacts
c.execute( # Build the chat filter conditions
f"""SELECT count() chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
FROM (SELECT DISTINCT ZCONTACTJID, chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
ZPARTNERNAME, date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
ZWAPROFILEPUSHNAME.ZPUSHNAME
FROM ZWACHATSESSION # Process contacts first
INNER JOIN ZWAMESSAGE contact_query = f"""
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK SELECT count()
LEFT JOIN ZWAPROFILEPUSHNAME FROM (SELECT DISTINCT ZCONTACTJID,
ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID ZPARTNERNAME,
LEFT JOIN ZWAGROUPMEMBER ZWAPROFILEPUSHNAME.ZPUSHNAME
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK FROM ZWACHATSESSION
WHERE 1=1 INNER JOIN ZWAMESSAGE
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} LEFT JOIN ZWAPROFILEPUSHNAME
GROUP BY ZCONTACTJID);""" ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID
) LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{chat_filter_include}
{chat_filter_exclude}
GROUP BY ZCONTACTJID);
"""
c.execute(contact_query)
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
print(f"Processing contacts...({total_row_number})") print(f"Processing contacts...({total_row_number})")
c.execute( # Get distinct contacts
f"""SELECT DISTINCT ZCONTACTJID, contacts_query = f"""
ZPARTNERNAME, SELECT DISTINCT ZCONTACTJID,
ZWAPROFILEPUSHNAME.ZPUSHNAME ZPARTNERNAME,
FROM ZWACHATSESSION ZWAPROFILEPUSHNAME.ZPUSHNAME
INNER JOIN ZWAMESSAGE FROM ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK INNER JOIN ZWAMESSAGE
LEFT JOIN ZWAPROFILEPUSHNAME ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID LEFT JOIN ZWAPROFILEPUSHNAME
LEFT JOIN ZWAGROUPMEMBER ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK LEFT JOIN ZWAGROUPMEMBER
WHERE 1=1 ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} WHERE 1=1
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} {chat_filter_include}
GROUP BY ZCONTACTJID;""" {chat_filter_exclude}
) GROUP BY ZCONTACTJID;
"""
c.execute(contacts_query)
# Process each contact
content = c.fetchone() content = c.fetchone()
while content is not None: while content is not None:
is_phone = content["ZPARTNERNAME"].replace("+", "").replace(" ", "").isdigit() contact_name = get_contact_name(content)
if content["ZPUSHNAME"] is None or (content["ZPUSHNAME"] and not is_phone):
contact_name = content["ZPARTNERNAME"]
else:
contact_name = content["ZPUSHNAME"]
contact_id = content["ZCONTACTJID"] contact_id = content["ZCONTACTJID"]
# Add or update chat
if contact_id not in data: if contact_id not in data:
current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder)) current_chat = data.add_chat(contact_id, ChatStore(Device.IOS, contact_name, media_folder))
else: else:
current_chat = data.get_chat(contact_id) current_chat = data.get_chat(contact_id)
current_chat.name = contact_name current_chat.name = contact_name
current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg") current_chat.my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}'
avatars = glob(f"{path}*") # Process avatar images
if 0 < len(avatars) <= 1: process_contact_avatars(current_chat, media_folder, contact_id)
current_chat.their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb") and current_chat.their_avatar_thumb is None:
current_chat.their_avatar_thumb = avatar
elif avatar.endswith(".jpg") and current_chat.their_avatar is None:
current_chat.their_avatar = avatar
content = c.fetchone() content = c.fetchone()
# Get message history # Get message count
c.execute(f"""SELECT count() message_count_query = f"""
FROM ZWAMESSAGE SELECT count()
INNER JOIN ZWACHATSESSION FROM ZWAMESSAGE
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK INNER JOIN ZWACHATSESSION
LEFT JOIN ZWAGROUPMEMBER ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK LEFT JOIN ZWAGROUPMEMBER
WHERE 1=1 ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
{f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} WHERE 1=1
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} {date_filter}
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")}""") {chat_filter_include}
{chat_filter_exclude}
"""
c.execute(message_count_query)
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
print(f"Processing messages...(0/{total_row_number})", end="\r") print(f"Processing messages...(0/{total_row_number})", end="\r")
c.execute(f"""SELECT ZCONTACTJID,
ZWAMESSAGE.Z_PK, # Fetch messages
ZISFROMME, messages_query = f"""
ZMESSAGEDATE, SELECT ZCONTACTJID,
ZTEXT, ZWAMESSAGE.Z_PK,
ZMESSAGETYPE, ZISFROMME,
ZWAGROUPMEMBER.ZMEMBERJID, ZMESSAGEDATE,
ZMETADATA, ZTEXT,
ZSTANZAID, ZMESSAGETYPE,
ZGROUPINFO, ZWAGROUPMEMBER.ZMEMBERJID,
ZSENTDATE ZMETADATA,
FROM ZWAMESSAGE ZSTANZAID,
LEFT JOIN ZWAGROUPMEMBER ZGROUPINFO,
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK ZSENTDATE
LEFT JOIN ZWAMEDIAITEM FROM ZWAMESSAGE
ON ZWAMESSAGE.Z_PK = ZWAMEDIAITEM.ZMESSAGE LEFT JOIN ZWAGROUPMEMBER
INNER JOIN ZWACHATSESSION ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK LEFT JOIN ZWAMEDIAITEM
WHERE 1=1 ON ZWAMESSAGE.Z_PK = ZWAMEDIAITEM.ZMESSAGE
{f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} INNER JOIN ZWACHATSESSION
{get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
{get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} WHERE 1=1
ORDER BY ZMESSAGEDATE ASC;""") {date_filter}
{chat_filter_include}
{chat_filter_exclude}
ORDER BY ZMESSAGEDATE ASC;
"""
c.execute(messages_query)
# Process each message
i = 0 i = 0
content = c.fetchone() content = c.fetchone()
while content is not None: while content is not None:
ZCONTACTJID = content["ZCONTACTJID"] contact_id = content["ZCONTACTJID"]
Z_PK = content["Z_PK"] message_pk = content["Z_PK"]
is_group_message = content["ZGROUPINFO"] is not None is_group_message = content["ZGROUPINFO"] is not None
if ZCONTACTJID not in data:
current_chat = data.add_chat(ZCONTACTJID, ChatStore(Device.IOS)) # Ensure chat exists
path = f'{media_folder}/Media/Profile/{ZCONTACTJID.split("@")[0]}' if contact_id not in data:
avatars = glob(f"{path}*") current_chat = data.add_chat(contact_id, ChatStore(Device.IOS))
if 0 < len(avatars) <= 1: process_contact_avatars(current_chat, media_folder, contact_id)
current_chat.their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb"):
current_chat.their_avatar_thumb = avatar
elif avatar.endswith(".jpg"):
current_chat.their_avatar = avatar
else: else:
current_chat = data.get_chat(ZCONTACTJID) current_chat = data.get_chat(contact_id)
# Create message object
ts = APPLE_TIME + content["ZMESSAGEDATE"] ts = APPLE_TIME + content["ZMESSAGEDATE"]
message = Message( message = Message(
from_me=content["ZISFROMME"], from_me=content["ZISFROMME"],
@@ -159,290 +194,409 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET, timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET,
message_type=content["ZMESSAGETYPE"], message_type=content["ZMESSAGETYPE"],
received_timestamp=APPLE_TIME + content["ZSENTDATE"] if content["ZSENTDATE"] else None, received_timestamp=APPLE_TIME + content["ZSENTDATE"] if content["ZSENTDATE"] else None,
read_timestamp=None # TODO: Add timestamp read_timestamp=None # TODO: Add timestamp
) )
invalid = False
if is_group_message and content["ZISFROMME"] == 0: # Process message data
name = None invalid = process_message_data(message, content, is_group_message, data, cursor2)
if content["ZMEMBERJID"] is not None:
if content["ZMEMBERJID"] in data: # Add valid messages to chat
name = data.get_chat(content["ZMEMBERJID"]).name
if "@" in content["ZMEMBERJID"]:
fallback = content["ZMEMBERJID"].split('@')[0]
else:
fallback = None
else:
fallback = None
message.sender = name or fallback
else:
message.sender = None
if content["ZMESSAGETYPE"] == 6:
# Metadata
if is_group_message:
# Group
if content["ZTEXT"] is not None:
# Chnaged name
try:
int(content["ZTEXT"])
except ValueError:
msg = f"The group name changed to {content['ZTEXT']}"
message.data = msg
message.meta = True
else:
invalid = True
else:
message.data = None
else:
message.data = None
else:
# real message
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14"):
quoted = content["ZMETADATA"][2:19]
message.reply = quoted.decode()
cursor2.execute(f"""SELECT ZTEXT
FROM ZWAMESSAGE
WHERE ZSTANZAID LIKE '{message.reply}%'""")
quoted_content = cursor2.fetchone()
if quoted_content and "ZTEXT" in quoted_content:
message.quoted_data = quoted_content["ZTEXT"]
else:
message.quoted_data = None
if content["ZMESSAGETYPE"] == 15: # Sticker
message.sticker = True
if content["ZISFROMME"] == 1:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
else:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
message.data = msg
if not invalid: if not invalid:
current_chat.add_message(Z_PK, message) current_chat.add_message(message_pk, message)
# Update progress
i += 1 i += 1
if i % 1000 == 0: if i % 1000 == 0:
print(f"Processing messages...({i}/{total_row_number})", end="\r") print(f"Processing messages...({i}/{total_row_number})", end="\r")
content = c.fetchone() content = c.fetchone()
print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r") print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r")
def process_message_data(message, content, is_group_message, data, cursor2):
"""Process and set message data from content row."""
# Handle group sender info
if is_group_message and content["ZISFROMME"] == 0:
name = None
if content["ZMEMBERJID"] is not None:
if content["ZMEMBERJID"] in data:
name = data.get_chat(content["ZMEMBERJID"]).name
if "@" in content["ZMEMBERJID"]:
fallback = content["ZMEMBERJID"].split('@')[0]
else:
fallback = None
else:
fallback = None
message.sender = name or fallback
else:
message.sender = None
# Handle metadata messages
if content["ZMESSAGETYPE"] == 6:
return process_metadata_message(message, content, is_group_message)
# Handle quoted replies
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14") and False:
quoted = content["ZMETADATA"][2:19]
message.reply = quoted.decode()
cursor2.execute(f"""SELECT ZTEXT
FROM ZWAMESSAGE
WHERE ZSTANZAID LIKE '{message.reply}%'""")
quoted_content = cursor2.fetchone()
if quoted_content and "ZTEXT" in quoted_content:
message.quoted_data = quoted_content["ZTEXT"]
else:
message.quoted_data = None
# Handle stickers
if content["ZMESSAGETYPE"] == 15:
message.sticker = True
# Process message text
process_message_text(message, content)
return False # Message is valid
def process_metadata_message(message, content, is_group_message):
"""Process metadata messages (action_type 6)."""
if is_group_message:
# Group
if content["ZTEXT"] is not None:
# Changed name
try:
int(content["ZTEXT"])
except ValueError:
msg = f"The group name changed to {content['ZTEXT']}"
message.data = msg
message.meta = True
return False # Valid message
else:
return True # Invalid message
else:
message.data = None
return False
else:
message.data = None
return False
def process_message_text(message, content):
"""Process and format message text content."""
if content["ZISFROMME"] == 1:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
msg = msg.replace("\r\n", "<br>").replace("\n", "<br>")
else:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
message.meta = True
else:
msg = content["ZTEXT"]
if msg is not None:
msg = msg.replace("\r\n", "<br>").replace("\n", "<br>")
message.data = msg
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False): def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False):
"""Process media files from WhatsApp messages."""
c = db.cursor() c = db.cursor()
# Get media
c.execute(f"""SELECT count() # Build filter conditions
FROM ZWAMEDIAITEM chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID","ZMEMBERJID"], "ZGROUPINFO", "ios")
INNER JOIN ZWAMESSAGE chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK date_filter = f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK # Get media count
LEFT JOIN ZWAGROUPMEMBER media_count_query = f"""
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK SELECT count()
WHERE 1=1 FROM ZWAMEDIAITEM
{f'AND ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} INNER JOIN ZWAMESSAGE
{get_chat_condition(filter_chat[0], True, ["ZWACHATSESSION.ZCONTACTJID","ZMEMBERJID"], "ZGROUPINFO", "ios")} ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
{get_chat_condition(filter_chat[1], False, ["ZWACHATSESSION.ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} INNER JOIN ZWACHATSESSION
""") ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(media_count_query)
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
print(f"\nProcessing media...(0/{total_row_number})", end="\r") print(f"\nProcessing media...(0/{total_row_number})", end="\r")
i = 0
c.execute(f"""SELECT ZCONTACTJID, # Fetch media items
ZMESSAGE, media_query = f"""
ZMEDIALOCALPATH, SELECT ZCONTACTJID,
ZMEDIAURL, ZMESSAGE,
ZVCARDSTRING, ZMEDIALOCALPATH,
ZMEDIAKEY, ZMEDIAURL,
ZTITLE ZVCARDSTRING,
FROM ZWAMEDIAITEM ZMEDIAKEY,
INNER JOIN ZWAMESSAGE ZTITLE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK FROM ZWAMEDIAITEM
INNER JOIN ZWACHATSESSION INNER JOIN ZWAMESSAGE
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
LEFT JOIN ZWAGROUPMEMBER INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
WHERE ZMEDIALOCALPATH IS NOT NULL LEFT JOIN ZWAGROUPMEMBER
{f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
{get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} WHERE ZMEDIALOCALPATH IS NOT NULL
{get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} {date_filter}
ORDER BY ZCONTACTJID ASC""") {chat_filter_include}
content = c.fetchone() {chat_filter_exclude}
ORDER BY ZCONTACTJID ASC
"""
c.execute(media_query)
# Process each media item
mime = MimeTypes() mime = MimeTypes()
i = 0
content = c.fetchone()
while content is not None: while content is not None:
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}" process_media_item(content, data, media_folder, mime, separate_media)
current_chat = data.get_chat(content["ZCONTACTJID"])
message = current_chat.get_message(content["ZMESSAGE"]) # Update progress
message.media = True
if current_chat.media_base == "":
current_chat.media_base = media_folder + "/"
if os.path.isfile(file_path):
message.data = '/'.join(file_path.split("/")[1:])
if content["ZVCARDSTRING"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
message.mime = guess
else:
message.mime = "application/octet-stream"
else:
message.mime = content["ZVCARDSTRING"]
if separate_media:
chat_display_name = slugify(current_chat.name or message.sender \
or content["ZCONTACTJID"].split('@')[0], True)
current_filename = file_path.split("/")[-1]
new_folder = os.path.join(media_folder, "separated", chat_display_name)
Path(new_folder).mkdir(parents=True, exist_ok=True)
new_path = os.path.join(new_folder, current_filename)
shutil.copy2(file_path, new_path)
message.data = '/'.join(new_path.split("\\")[1:])
else:
message.data = "The media is missing"
message.mime = "media"
message.meta = True
if content["ZTITLE"] is not None:
message.caption = content["ZTITLE"]
i += 1 i += 1
if i % 100 == 0: if i % 100 == 0:
print(f"Processing media...({i}/{total_row_number})", end="\r") print(f"Processing media...({i}/{total_row_number})", end="\r")
content = c.fetchone() content = c.fetchone()
print(
f"Processing media...({total_row_number}/{total_row_number})", end="\r") print(f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def process_media_item(content, data, media_folder, mime, separate_media):
"""Process a single media item."""
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
current_chat = data.get_chat(content["ZCONTACTJID"])
message = current_chat.get_message(content["ZMESSAGE"])
message.media = True
if current_chat.media_base == "":
current_chat.media_base = media_folder + "/"
if os.path.isfile(file_path):
message.data = '/'.join(file_path.split("/")[1:])
# Set MIME type
if content["ZVCARDSTRING"] is None:
guess = mime.guess_type(file_path)[0]
message.mime = guess if guess is not None else "application/octet-stream"
else:
message.mime = content["ZVCARDSTRING"]
# Handle separate media option
if separate_media:
chat_display_name = slugify(current_chat.name or message.sender or content["ZCONTACTJID"].split('@')[0], True)
current_filename = file_path.split("/")[-1]
new_folder = os.path.join(media_folder, "separated", chat_display_name)
Path(new_folder).mkdir(parents=True, exist_ok=True)
new_path = os.path.join(new_folder, current_filename)
shutil.copy2(file_path, new_path)
message.data = '/'.join(new_path.split("\\")[1:])
else:
# Handle missing media
message.data = "The media is missing"
message.mime = "media"
message.meta = True
# Add caption if available
if content["ZTITLE"] is not None:
message.caption = content["ZTITLE"]
def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty): def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
"""Process vCard contacts from WhatsApp messages."""
c = db.cursor() c = db.cursor()
c.execute(f"""SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM,
ZWAMEDIAITEM.ZMESSAGE, # Build filter conditions
ZCONTACTJID, chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
ZVCARDNAME, chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")
ZVCARDSTRING date_filter = f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''
FROM ZWAVCARDMENTION
INNER JOIN ZWAMEDIAITEM # Fetch vCard mentions
ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK vcard_query = f"""
INNER JOIN ZWAMESSAGE SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM,
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK ZWAMEDIAITEM.ZMESSAGE,
INNER JOIN ZWACHATSESSION ZCONTACTJID,
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK ZVCARDNAME,
LEFT JOIN ZWAGROUPMEMBER ZVCARDSTRING
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK FROM ZWAVCARDMENTION
WHERE 1=1 INNER JOIN ZWAMEDIAITEM
{f'AND ZWAMESSAGE.ZMESSAGEDATE {filter_date}' if filter_date is not None else ''} ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK
{get_chat_condition(filter_chat[0], True, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")} INNER JOIN ZWAMESSAGE
{get_chat_condition(filter_chat[1], False, ["ZCONTACTJID", "ZMEMBERJID"], "ZGROUPINFO", "ios")};""") ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
INNER JOIN ZWACHATSESSION
ON ZWAMESSAGE.ZCHATSESSION = ZWACHATSESSION.Z_PK
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
WHERE 1=1
{date_filter}
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(vcard_query)
contents = c.fetchall() contents = c.fetchall()
total_row_number = len(contents) total_row_number = len(contents)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r") print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
# Create vCards directory
path = f'{media_folder}/Message/vCards' path = f'{media_folder}/Message/vCards'
Path(path).mkdir(parents=True, exist_ok=True) Path(path).mkdir(parents=True, exist_ok=True)
# Process each vCard
for index, content in enumerate(contents): for index, content in enumerate(contents):
file_paths = [] process_vcard_item(content, path, data)
vcard_names = content["ZVCARDNAME"].split("_$!<Name-Separator>!$_")
vcard_strings = content["ZVCARDSTRING"].split("_$!<VCard-Separator>!$_")
# If this is a list of contacts
if len(vcard_names) > len(vcard_strings):
vcard_names.pop(0) # Dismiss the first element, which is the group name
for name, vcard_string in zip(vcard_names, vcard_strings):
file_name = "".join(x for x in name if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(path, f"{file_name}.vcf")
file_paths.append(file_path)
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(vcard_string)
vcard_summary = "This media include the following vCard file(s):<br>"
vcard_summary += " | ".join([f'<a href="{htmle(fp)}">{htmle(name)}</a>' for name, fp in zip(vcard_names, file_paths)])
message = data.get_chat(content["ZCONTACTJID"]).get_message(content["ZMESSAGE"])
message.data = vcard_summary
message.mime = "text/x-vcard"
message.media = True
message.meta = True
message.safe = True
print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r") print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r")
def process_vcard_item(content, path, data):
"""Process a single vCard item."""
file_paths = []
vcard_names = content["ZVCARDNAME"].split("_$!<Name-Separator>!$_")
vcard_strings = content["ZVCARDSTRING"].split("_$!<VCard-Separator>!$_")
# If this is a list of contacts
if len(vcard_names) > len(vcard_strings):
vcard_names.pop(0) # Dismiss the first element, which is the group name
# Save each vCard file
for name, vcard_string in zip(vcard_names, vcard_strings):
file_name = "".join(x for x in name if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(path, f"{file_name}.vcf")
file_paths.append(file_path)
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(vcard_string)
# Create vCard summary and update message
vcard_summary = "This media include the following vCard file(s):<br>"
vcard_summary += " | ".join([f'<a href="{htmle(fp)}">{htmle(name)}</a>' for name, fp in zip(vcard_names, file_paths)])
message = data.get_chat(content["ZCONTACTJID"]).get_message(content["ZMESSAGE"])
message.data = vcard_summary
message.mime = "text/x-vcard"
message.media = True
message.meta = True
message.safe = True
def calls(db, data, timezone_offset, filter_chat): def calls(db, data, timezone_offset, filter_chat):
"""Process WhatsApp call records."""
c = db.cursor() c = db.cursor()
c.execute(f"""SELECT count()
FROM ZWACDCALLEVENT # Build filter conditions
WHERE 1=1 chat_filter_include = get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")
{get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")} chat_filter_exclude = get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")
{get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""")
# Get call count
call_count_query = f"""
SELECT count()
FROM ZWACDCALLEVENT
WHERE 1=1
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(call_count_query)
total_row_number = c.fetchone()[0] total_row_number = c.fetchone()[0]
if total_row_number == 0: if total_row_number == 0:
return return
print(f"\nProcessing calls...({total_row_number})", end="\r") print(f"\nProcessing calls...({total_row_number})", end="\r")
c.execute(f"""SELECT ZCALLIDSTRING,
ZGROUPCALLCREATORUSERJIDSTRING, # Fetch call records
ZGROUPJIDSTRING, calls_query = f"""
ZDATE, SELECT ZCALLIDSTRING,
ZOUTCOME, ZGROUPCALLCREATORUSERJIDSTRING,
ZBYTESRECEIVED + ZBYTESSENT AS bytes_transferred, ZGROUPJIDSTRING,
ZDURATION, ZDATE,
ZVIDEO, ZOUTCOME,
ZMISSED, ZBYTESRECEIVED + ZBYTESSENT AS bytes_transferred,
ZINCOMING ZDURATION,
FROM ZWACDCALLEVENT ZVIDEO,
INNER JOIN ZWAAGGREGATECALLEVENT ZMISSED,
ON ZWACDCALLEVENT.Z1CALLEVENTS = ZWAAGGREGATECALLEVENT.Z_PK ZINCOMING
WHERE 1=1 FROM ZWACDCALLEVENT
{get_chat_condition(filter_chat[0], True, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")} INNER JOIN ZWAAGGREGATECALLEVENT
{get_chat_condition(filter_chat[1], False, ["ZGROUPCALLCREATORUSERJIDSTRING"], None, "ios")}""") ON ZWACDCALLEVENT.Z1CALLEVENTS = ZWAAGGREGATECALLEVENT.Z_PK
WHERE 1=1
{chat_filter_include}
{chat_filter_exclude}
"""
c.execute(calls_query)
# Create calls chat
chat = ChatStore(Device.ANDROID, "WhatsApp Calls") chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
# Process each call
content = c.fetchone() content = c.fetchone()
while content is not None: while content is not None:
ts = APPLE_TIME + int(content["ZDATE"]) process_call_record(content, chat, data, timezone_offset)
call = Message(
from_me=content["ZINCOMING"] == 0,
timestamp=ts,
time=ts,
key_id=content["ZCALLIDSTRING"],
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET
)
_jid = content["ZGROUPCALLCREATORUSERJIDSTRING"]
name = data.get_chat(_jid).name if _jid in data else None
if _jid is not None and "@" in _jid:
fallback = _jid.split('@')[0]
else:
fallback = None
call.sender = name or fallback
call.meta = True
call.data = (
f"A {'group ' if content['ZGROUPJIDSTRING'] is not None else ''}"
f"{'video' if content['ZVIDEO'] == 1 else 'voice'} "
f"call {'to' if call.from_me else 'from'} "
f"{call.sender} was "
)
if content['ZOUTCOME'] in (1, 4):
call.data += "not answered." if call.from_me else "missed."
elif content['ZOUTCOME'] == 2:
call.data += "failed."
elif content['ZOUTCOME'] == 0:
call_time = convert_time_unit(int(content['ZDURATION']))
call_bytes = bytes_to_readable(content['bytes_transferred'])
call.data += (
f"initiated and lasted for {call_time} "
f"with {call_bytes} data transferred."
)
else:
call.data += "in an unknown state."
chat.add_message(call.key_id, call)
content = c.fetchone() content = c.fetchone()
# Add calls chat to data
data.add_chat("000000000000000", chat) data.add_chat("000000000000000", chat)
def process_call_record(content, chat, data, timezone_offset):
"""Process a single call record."""
ts = APPLE_TIME + int(content["ZDATE"])
call = Message(
from_me=content["ZINCOMING"] == 0,
timestamp=ts,
time=ts,
key_id=content["ZCALLIDSTRING"],
timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET
)
# Set sender info
_jid = content["ZGROUPCALLCREATORUSERJIDSTRING"]
name = data.get_chat(_jid).name if _jid in data else None
if _jid is not None and "@" in _jid:
fallback = _jid.split('@')[0]
else:
fallback = None
call.sender = name or fallback
# Set call metadata
call.meta = True
call.data = format_call_data(call, content)
# Add call to chat
chat.add_message(call.key_id, call)
def format_call_data(call, content):
"""Format call data message based on call attributes."""
# Basic call info
call_data = (
f"A {'group ' if content['ZGROUPJIDSTRING'] is not None else ''}"
f"{'video' if content['ZVIDEO'] == 1 else 'voice'} "
f"call {'to' if call.from_me else 'from'} "
f"{call.sender} was "
)
# Call outcome
if content['ZOUTCOME'] in (1, 4):
call_data += "not answered." if call.from_me else "missed."
elif content['ZOUTCOME'] == 2:
call_data += "failed."
elif content['ZOUTCOME'] == 0:
call_time = convert_time_unit(int(content['ZDURATION']))
call_bytes = bytes_to_readable(content['bytes_transferred'])
call_data += (
f"initiated and lasted for {call_time} "
f"with {call_bytes} data transferred."
)
else:
call_data += "in an unknown state."
return call_data