From dbdfdaedcfeac31ae668835d27a18ba1d7455a9e Mon Sep 17 00:00:00 2001
From: KnugiHK <24708955+KnugiHK@users.noreply.github.com>
Date: Thu, 8 Jun 2023 17:51:57 +0800
Subject: [PATCH] Refine code to use the data model
---
Whatsapp_Chat_Exporter/extract_iphone.py | 156 ++++++++++++-----------
1 file changed, 79 insertions(+), 77 deletions(-)
diff --git a/Whatsapp_Chat_Exporter/extract_iphone.py b/Whatsapp_Chat_Exporter/extract_iphone.py
index 39c35f5..16a4a35 100644
--- a/Whatsapp_Chat_Exporter/extract_iphone.py
+++ b/Whatsapp_Chat_Exporter/extract_iphone.py
@@ -9,6 +9,7 @@ import shutil
from pathlib import Path
from datetime import datetime
from mimetypes import MimeTypes
+from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import sanitize_except, determine_day, APPLE_TIME
@@ -20,17 +21,17 @@ def messages(db, data):
print(f"Gathering contacts...({total_row_number})")
c.execute("""SELECT ZCONTACTJID, ZPARTNERNAME FROM ZWACHATSESSION; """)
- row = c.fetchone()
- while row is not None:
- data[row[0]] = {"name": row[1], "messages": {}}
- row = c.fetchone()
+ content = c.fetchone()
+ while content is not None:
+ data[content["ZCONTACTJID"]] = ChatStore(content["ZPARTNERNAME"])
+ content = c.fetchone()
# Get message history
c.execute("""SELECT count() FROM ZWAMESSAGE""")
total_row_number = c.fetchone()[0]
print(f"Gathering messages...(0/{total_row_number})", end="\r")
- c.execute("""SELECT COALESCE(ZFROMJID, ZTOJID),
+ c.execute("""SELECT COALESCE(ZFROMJID, ZTOJID) as _id,
ZWAMESSAGE.Z_PK,
ZISFROMME,
ZMESSAGEDATE,
@@ -47,82 +48,79 @@ def messages(db, data):
i = 0
content = c.fetchone()
while content is not None:
- if content[0] not in data:
- data[content[0]] = {"name": None, "messages": {}}
- ts = APPLE_TIME + content[3]
- data[content[0]]["messages"][content[1]] = {
- "from_me": bool(content[2]),
- "timestamp": ts,
- "time": datetime.fromtimestamp(ts).strftime("%H:%M"),
- "media": False,
- "reply": None,
- "caption": None,
- "meta": False,
- "data": None,
- "key_id": content["ZSTANZAID"][:17]
- }
- if "-" in content[0] and content[2] == 0:
+ _id = content["_id"]
+ Z_PK = content["Z_PK"]
+ if _id not in data:
+ data[_id] = ChatStore()
+ ts = APPLE_TIME + content["ZMESSAGEDATE"]
+ data[_id].add_message(Z_PK, Message(
+ from_me=content["ZISFROMME"],
+ timestamp=ts,
+ time=ts, # Could be bug
+ key_id=content["ZSTANZAID"][:17],
+ ))
+ if "-" in _id and content["ZISFROMME"] == 0:
name = None
- if content[6] is not None:
- if content[6] in data:
- name = data[content[6]]["name"]
- if "@" in content[6]:
- fallback = content[6].split('@')[0]
+ if content["ZMEMBERJID"] is not None:
+ if content["ZMEMBERJID"] in data:
+ name = data[content["ZMEMBERJID"]].name
+ if "@" in content["ZMEMBERJID"]:
+ fallback = content["ZMEMBERJID"].split('@')[0]
else:
fallback = None
else:
fallback = None
- data[content[0]]["messages"][content[1]]["sender"] = name or fallback
+ data[_id].messages[Z_PK].sender = name or fallback
else:
- data[content[0]]["messages"][content[1]]["sender"] = None
- if content[5] == 6:
+ data[_id].messages[Z_PK].sender = None
+ if content["ZMESSAGETYPE"] == 6:
# Metadata
- if "-" in content[0]:
+ if "-" in _id:
# Group
- if content[4] is not None:
+ if content["ZTEXT"] is not None:
# Chnaged name
try:
- int(content[4])
+ int(content["ZTEXT"])
except ValueError:
- msg = f"The group name changed to {content[4]}"
- data[content[0]]["messages"][content[1]]["data"] = msg
- data[content[0]]["messages"][content[1]]["meta"] = True
+ msg = f"The group name changed to {content['ZTEXT']}"
+ data[_id].messages[Z_PK].data = msg
+ data[_id].messages[Z_PK].meta = True
else:
- del data[content[0]]["messages"][content[1]]
+ del data[_id].messages[Z_PK]
else:
- data[content[0]]["messages"][content[1]]["data"] = None
+ data[_id].messages[Z_PK].data = None
else:
- data[content[0]]["messages"][content[1]]["data"] = None
+ data[_id].messages[Z_PK].data = None
else:
# real message
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14"):
quoted = content["ZMETADATA"][2:19]
- data[content[0]]["messages"][content[1]]["reply"] = quoted.decode()
- data[content[0]]["messages"][content[1]]["quoted_data"] = None # TODO
+ data[_id].messages[Z_PK].reply = quoted.decode()
+ data[_id].messages[Z_PK].quoted_data = None # TODO
- if content[2] == 1:
- if content[5] == 14:
+ if content["ZISFROMME"] == 1:
+ if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
- data[content[0]]["messages"][content[1]]["meta"] = True
+ data[_id].messages[Z_PK].meta = True
else:
- msg = content[4]
+ msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "
")
if "\n" in msg:
msg = msg.replace("\n", "
")
else:
- if content[5] == 14:
+ if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
- data[content[0]]["messages"][content[1]]["meta"] = True
+ data[_id].messages[Z_PK].meta = True
else:
- msg = content[4]
+ msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "
")
if "\n" in msg:
msg = msg.replace("\n", "
")
- data[content[0]]["messages"][content[1]]["data"] = msg
+ data[_id].messages[Z_PK].data = msg
i += 1
if i % 1000 == 0:
print(f"Gathering messages...({i}/{total_row_number})", end="\r")
@@ -153,34 +151,36 @@ def media(db, data, media_folder):
content = c.fetchone()
mime = MimeTypes()
while content is not None:
- file_path = f"{media_folder}/{content[2]}"
- data[content[0]]["messages"][content[1]]["media"] = True
+ file_path = f"{media_folder}/{content['ZMEDIALOCALPATH']}"
+ _id = content["_id"]
+ ZMESSAGE = content["ZMESSAGE"]
+ data[_id].messages[ZMESSAGE].media = True
if os.path.isfile(file_path):
- data[content[0]]["messages"][content[1]]["data"] = file_path
- if content[4] is None:
+ data[_id].messages[ZMESSAGE].data = file_path
+ if content["ZVCARDSTRING"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
- data[content[0]]["messages"][content[1]]["mime"] = guess
+ data[_id].messages[ZMESSAGE].mime = guess
else:
- data[content[0]]["messages"][content[1]]["mime"] = "data/data"
+ data[_id].messages[ZMESSAGE].mime = "data/data"
else:
- data[content[0]]["messages"][content[1]]["mime"] = content[4]
+ data[_id].messages[ZMESSAGE].mime = content["ZVCARDSTRING"]
else:
- # if "https://mmg" in content[4]:
+ # if "https://mmg" in content["ZVCARDSTRING"]:
# try:
- # r = requests.get(content[3])
+ # r = requests.get(content["ZMEDIAURL"])
# if r.status_code != 200:
# raise RuntimeError()
# except:
- # data[content[0]]["messages"][content[1]]["data"] = "{The media is missing}"
- # data[content[0]]["messages"][content[1]]["mime"] = "media"
+ # data[_id].messages[ZMESSAGE].data"] = "{The media is missing}"
+ # data[_id].messages[ZMESSAGE].mime"] = "media"
# else:
- data[content[0]]["messages"][content[1]]["data"] = "The media is missing"
- data[content[0]]["messages"][content[1]]["mime"] = "media"
- data[content[0]]["messages"][content[1]]["meta"] = True
- if content[6] is not None:
- data[content[0]]["messages"][content[1]]["caption"] = content[6]
+ data[_id].messages[ZMESSAGE].data = "The media is missing"
+ data[_id].messages[ZMESSAGE].mime = "media"
+ data[_id].messages[ZMESSAGE].meta = True
+ if content["ZTITLE"] is not None:
+ data[_id].messages[ZMESSAGE].caption = content["ZTITLE"]
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
@@ -202,25 +202,27 @@ def vcard(db, data):
ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK""")
- rows = c.fetchall()
- total_row_number = len(rows)
+ contents = c.fetchall()
+ total_row_number = len(contents)
print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
base = "Message/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
- for index, row in enumerate(rows):
- file_name = "".join(x for x in row[3] if x.isalnum())
+ for index, content in enumerate(contents):
+ file_name = "".join(x for x in content["ZVCARDNAME"] if x.isalnum())
file_name = file_name.encode('utf-8')[:251].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf")
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
- f.write(row[4])
- data[row[2]]["messages"][row[1]]["data"] = row[3] + \
+ f.write(content["ZVCARDSTRING"])
+ _id = content["_id"]
+ ZMESSAGE = content["ZMESSAGE"]
+ data[_id].messages[ZMESSAGE].data = content["ZVCARDNAME"] + \
"The vCard file cannot be displayed here, " \
f"however it should be located at {file_path}"
- data[row[2]]["messages"][row[1]]["mime"] = "text/x-vcard"
- data[row[2]]["messages"][row[1]]["media"] = True
- data[row[2]]["messages"][row[1]]["meta"] = True
+ data[_id].messages[ZMESSAGE].mime = "text/x-vcard"
+ data[_id].messages[ZMESSAGE].media = True
+ data[_id].messages[ZMESSAGE].meta = True
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
@@ -256,7 +258,7 @@ def create_html(data, output_folder, template=None, embedded=False, offline_stat
w3css = os.path.join(offline_static, "w3.css")
for current, contact in enumerate(data):
- if len(data[contact]["messages"]) == 0:
+ if len(data[contact].messages) == 0:
continue
phone_number = contact.split('@')[0]
if "-" in contact:
@@ -264,11 +266,11 @@ def create_html(data, output_folder, template=None, embedded=False, offline_stat
else:
file_name = phone_number
- if data[contact]["name"] is not None:
+ if data[contact].name is not None:
if file_name != "":
file_name += "-"
- file_name += data[contact]["name"].replace("/", "-")
- name = data[contact]["name"]
+ file_name += data[contact].name.replace("/", "-")
+ name = data[contact].name
else:
name = phone_number
@@ -278,7 +280,7 @@ def create_html(data, output_folder, template=None, embedded=False, offline_stat
f.write(
template.render(
name=name,
- msgs=data[contact]["messages"].values(),
+ msgs=data[contact].messages.values(),
my_avatar=None,
their_avatar=f"WhatsApp/Avatars/{contact}.j",
w3css=w3css