diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py
index 3047c69..5133d6c 100644
--- a/Whatsapp_Chat_Exporter/android_handler.py
+++ b/Whatsapp_Chat_Exporter/android_handler.py
@@ -16,10 +16,21 @@ from Whatsapp_Chat_Exporter.utility import get_chat_condition, slugify, bytes_to
def contacts(db, data, enrich_from_vcards):
- # Get contacts
+ """
+ Process WhatsApp contacts from the database.
+
+ Args:
+ db: Database connection
+ data: Data store object
+ enrich_from_vcards: Path to vCard file for contact enrichment
+
+ Returns:
+ bool: False if no contacts found, True otherwise
+ """
c = db.cursor()
- c.execute("""SELECT count() FROM wa_contacts""")
+ c.execute("SELECT count() FROM wa_contacts")
total_row_number = c.fetchone()[0]
+
if total_row_number == 0:
if enrich_from_vcards is not None:
print("No contacts profiles found in the default database, contacts will be imported from the specified vCard file.")
@@ -29,33 +40,89 @@ def contacts(db, data, enrich_from_vcards):
else:
print(f"Processing contacts...({total_row_number})")
- c.execute("""SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts; """)
+ c.execute("SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts;")
row = c.fetchone()
while row is not None:
current_chat = data.add_chat(row["jid"], ChatStore(Device.ANDROID, row["display_name"]))
if row["status"] is not None:
current_chat.status = row["status"]
row = c.fetchone()
+
+ return True
def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat, filter_empty):
- # Get message history
+ """
+ Process WhatsApp messages from the database.
+
+ Args:
+ db: Database connection
+ data: Data store object
+ media_folder: Folder containing media files
+ timezone_offset: Timezone offset
+ filter_date: Date filter condition
+ filter_chat: Chat filter conditions
+ filter_empty: Filter for empty chats
+ """
c = db.cursor()
+ total_row_number = _get_message_count(c, filter_empty, filter_date, filter_chat)
+ print(f"Processing messages...(0/{total_row_number})", end="\r")
+
try:
- c.execute(f"""SELECT count()
+ content_cursor = _get_messages_cursor_legacy(c, filter_empty, filter_date, filter_chat)
+ table_message = False
+ except sqlite3.OperationalError:
+ try:
+ content_cursor = _get_messages_cursor_new(c, filter_empty, filter_date, filter_chat)
+ table_message = True
+ except Exception as e:
+ raise e
+
+ i = 0
+ # Fetch the first row safely
+ content = _fetch_row_safely(content_cursor)
+
+ while content is not None:
+ _process_single_message(data, content, table_message, timezone_offset)
+
+ i += 1
+ if i % 1000 == 0:
+ print(f"Processing messages...({i}/{total_row_number})", end="\r")
+
+ # Fetch the next row safely
+ content = _fetch_row_safely(content_cursor)
+
+ print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r")
+
+
+# Helper functions for message processing
+
+def _get_message_count(cursor, filter_empty, filter_date, filter_chat):
+ """Get the total number of messages to process."""
+ try:
+ empty_filter = get_cond_for_empty(filter_empty, "messages.key_remote_jid", "messages.needs_push")
+ date_filter = f'AND timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android")
+
+ cursor.execute(f"""SELECT count()
FROM messages
INNER JOIN jid
ON messages.key_remote_jid = jid.raw_string
LEFT JOIN chat
ON chat.jid_row_id = jid._id
WHERE 1=1
- {get_cond_for_empty(filter_empty, "messages.key_remote_jid", "messages.needs_push")}
- {f'AND timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android")}
- {get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid", "android")}""")
-
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}""")
except sqlite3.OperationalError:
- c.execute(f"""SELECT count()
+ empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")
+ date_filter = f'AND timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
+
+ cursor.execute(f"""SELECT count()
FROM message
LEFT JOIN chat
ON chat._id = message.chat_row_id
@@ -64,15 +131,21 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
WHERE 1=1
- {get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")}
- {f'AND timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")}
- {get_chat_condition(filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")}""")
- total_row_number = c.fetchone()[0]
- print(f"Processing messages...(0/{total_row_number})", end="\r")
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}""")
+ return cursor.fetchone()[0]
- try:
- c.execute(f"""SELECT messages.key_remote_jid,
+
+def _get_messages_cursor_legacy(cursor, filter_empty, filter_date, filter_chat):
+ """Get cursor for legacy database schema."""
+ empty_filter = get_cond_for_empty(filter_empty, "messages.key_remote_jid", "messages.needs_push")
+ date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android")
+
+ cursor.execute(f"""SELECT messages.key_remote_jid,
messages._id,
messages.key_from_me,
messages.timestamp,
@@ -100,8 +173,8 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
FROM messages
LEFT JOIN messages_quotes
ON messages.quoted_row_id = messages_quotes._id
- LEFT JOIN missed_call_logs
- ON messages._id = missed_call_logs.message_row_id
+ LEFT JOIN missed_call_logs
+ ON messages._id = missed_call_logs.message_row_id
INNER JOIN jid jid_global
ON messages.key_remote_jid = jid_global.raw_string
LEFT JOIN chat
@@ -119,16 +192,23 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
LEFT JOIN receipt_user
ON receipt_user.message_row_id = messages._id
WHERE messages.key_remote_jid <> '-1'
- {get_cond_for_empty(filter_empty, "messages.key_remote_jid", "messages.needs_push")}
- {f'AND messages.timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android")}
- {get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "messages.remote_resource"], "jid_global", "android")}
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}
GROUP BY messages._id
- ORDER BY messages.timestamp ASC;"""
- )
- except sqlite3.OperationalError:
- try:
- c.execute(f"""SELECT jid_global.raw_string as key_remote_jid,
+ ORDER BY messages.timestamp ASC;""")
+ return cursor
+
+
+def _get_messages_cursor_new(cursor, filter_empty, filter_date, filter_chat):
+ """Get cursor for new database schema."""
+ empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
+ date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android")
+
+ cursor.execute(f"""SELECT jid_global.raw_string as key_remote_jid,
message._id,
message.from_me as key_from_me,
message.timestamp,
@@ -145,7 +225,7 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
message.message_type as media_wa_type,
jid_group.raw_string as group_sender_jid,
chat.subject as chat_subject,
- missed_call_logs.video_call,
+ missed_call_logs.video_call,
message.sender_jid_row_id,
message_system.action_type,
message_system_group.is_me_joined,
@@ -171,8 +251,8 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
ON jid_global._id = chat.jid_row_id
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
- LEFT JOIN missed_call_logs
- ON message._id = missed_call_logs.message_row_id
+ LEFT JOIN missed_call_logs
+ ON message._id = missed_call_logs.message_row_id
LEFT JOIN message_system
ON message_system.message_row_id = message._id
LEFT JOIN message_system_group
@@ -186,184 +266,259 @@ def messages(db, data, media_folder, timezone_offset, filter_date, filter_chat,
LEFT JOIN receipt_user
ON receipt_user.message_row_id = message._id
WHERE key_remote_jid <> '-1'
- {get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")}
- {f'AND message.timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android")}
- {get_chat_condition(filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid_global", "android")}
- GROUP BY message._id;"""
- )
- except Exception as e:
- raise e
- else:
- table_message = True
- else:
- table_message = False
- i = 0
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}
+ GROUP BY message._id;""")
+ return cursor
+
+
+def _fetch_row_safely(cursor):
+ """Safely fetch a row from cursor, handling operational errors."""
while True:
try:
- content = c.fetchone()
+ content = cursor.fetchone()
+ return content
except sqlite3.OperationalError:
continue
- else:
- break
- while content is not None:
- if not data.get_chat(content["key_remote_jid"]):
- current_chat = data.add_chat(content["key_remote_jid"], ChatStore(Device.ANDROID, content["chat_subject"]))
- else:
- current_chat = data.get_chat(content["key_remote_jid"])
- if content["key_remote_jid"] is None:
- continue # Not sure
- if "sender_jid_row_id" in content:
- sender_jid_row_id = content["sender_jid_row_id"]
- else:
- sender_jid_row_id = None
- message = Message(
- from_me=not sender_jid_row_id and content["key_from_me"],
- timestamp=content["timestamp"],
- time=content["timestamp"],
- key_id=content["key_id"],
- timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET,
- message_type=content["media_wa_type"],
- received_timestamp=content["received_timestamp"],
- read_timestamp=content["read_timestamp"]
- )
- if isinstance(content["data"], bytes):
- message.data = ("The message is binary data and its base64 is "
- '""")
- message.data += b64encode(content["data"]).decode("utf-8") + ""
- message.safe = message.meta = True
- current_chat.add_message(content["_id"], message)
- i += 1
- content = c.fetchone()
- continue
- if content["jid_type"] == JidType.GROUP and content["key_from_me"] == 0:
- name = fallback = None
- if table_message:
- if content["sender_jid_row_id"] > 0:
- _jid = content["group_sender_jid"]
- if _jid in data:
- name = data.get_chat(_jid).name
- if "@" in _jid:
- fallback = _jid.split('@')[0]
- else:
- if content["remote_resource"] is not None:
- if content["remote_resource"] in data:
- name = data.get_chat(content["remote_resource"]).name
- if "@" in content["remote_resource"]:
- fallback = content["remote_resource"].split('@')[0]
- message.sender = name or fallback
- else:
- message.sender = None
-
- if content["quoted"] is not None:
- message.reply = content["quoted"]
- if content["quoted_data"] is not None and len(content["quoted_data"]) > 200:
- message.quoted_data = content["quoted_data"][:201] + "..."
- else:
- message.quoted_data = content["quoted_data"]
- else:
- message.reply = None
-
- if not table_message and content["media_caption"] is not None:
- # Old schema
- message.caption = content["media_caption"]
- elif table_message and content["media_wa_type"] == 1 and content["data"] is not None:
- # New schema
- message.caption = content["data"]
- else:
- message.caption = None
-
- if content["status"] == 6: # 6 = Metadata, otherwise assume a message
- message.meta = True
- name = fallback = None
- if table_message:
- if content["sender_jid_row_id"] > 0:
- _jid = content["group_sender_jid"]
- if _jid in data:
- name = data.get_chat(_jid).name
- if "@" in _jid:
- fallback = _jid.split('@')[0]
- else:
- name = "You"
- else:
- _jid = content["remote_resource"]
- if _jid is not None:
- if _jid in data:
- name = data.get_chat(_jid).name
- if "@" in _jid:
- fallback = _jid.split('@')[0]
- else:
- name = "You"
- message.data = determine_metadata(content, name or fallback)
- if isinstance(message.data, str) and "<br>" in message.data:
- message.safe = True
- if message.data is None:
- if content["video_call"] is not None: # Missed call
- message.meta = True
- if content["video_call"] == 1:
- message.data = "A video call was missed"
- elif content["video_call"] == 0:
- message.data = "A voice call was missed"
- elif content["data"] is None and content["thumb_image"] is None:
- message.meta = True
- message.data = None
- else:
- # Real message
- message.sticker = content["media_wa_type"] == 20 # Sticker is a message
- if content["key_from_me"] == 1:
- if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15:
- msg = "Message deleted"
- message.meta = True
- else:
- if content["media_wa_type"] == 5:
- msg = f"Location shared: {content['latitude'], content['longitude']}"
- message.meta = True
- else:
- msg = content["data"]
- if msg is not None:
- if "\r\n" in msg:
- msg = msg.replace("\r\n", "<br>")
- if "\n" in msg:
- msg = msg.replace("\n", "<br>")
- else:
- if content["status"] == 0 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15:
- msg = "Message deleted"
- message.meta = True
- else:
- if content["media_wa_type"] == 5:
- msg = f"Location shared: {content['latitude'], content['longitude']}"
- message.meta = True
- else:
- msg = content["data"]
- if msg is not None:
- if "\r\n" in msg:
- msg = msg.replace("\r\n", "<br>")
- if "\n" in msg:
- msg = msg.replace("\n", "<br>")
- message.data = msg
+def _process_single_message(data, content, table_message, timezone_offset):
+ """Process a single message row."""
+ if content["key_remote_jid"] is None:
+ return
+
+ # Get or create the chat
+ if not data.get_chat(content["key_remote_jid"]):
+ current_chat = data.add_chat(content["key_remote_jid"], ChatStore(Device.ANDROID, content["chat_subject"]))
+ else:
+ current_chat = data.get_chat(content["key_remote_jid"])
+
+ # Determine sender_jid_row_id
+ if "sender_jid_row_id" in content:
+ sender_jid_row_id = content["sender_jid_row_id"]
+ else:
+ sender_jid_row_id = None
+
+ # Create message object
+ message = Message(
+ from_me=not sender_jid_row_id and content["key_from_me"],
+ timestamp=content["timestamp"],
+ time=content["timestamp"],
+ key_id=content["key_id"],
+ timezone_offset=timezone_offset if timezone_offset else CURRENT_TZ_OFFSET,
+ message_type=content["media_wa_type"],
+ received_timestamp=content["received_timestamp"],
+ read_timestamp=content["read_timestamp"]
+ )
+
+ # Handle binary data
+ if isinstance(content["data"], bytes):
+ _process_binary_message(message, content)
current_chat.add_message(content["_id"], message)
- i += 1
- if i % 1000 == 0:
- print(f"Processing messages...({i}/{total_row_number})", end="\r")
- while True:
- try:
- content = c.fetchone()
- except sqlite3.OperationalError:
- continue
+ return
+
+ # Set sender for group chats
+ if content["jid_type"] == JidType.GROUP and content["key_from_me"] == 0:
+ _set_group_sender(message, content, data, table_message)
+ else:
+ message.sender = None
+
+ # Handle quoted messages
+ if content["quoted"] is not None:
+ message.reply = content["quoted"]
+ if content["quoted_data"] is not None and len(content["quoted_data"]) > 200:
+ message.quoted_data = content["quoted_data"][:201] + "..."
+ else:
+ message.quoted_data = content["quoted_data"]
+ else:
+ message.reply = None
+
+ # Handle message caption
+ if not table_message and content["media_caption"] is not None:
+ # Old schema
+ message.caption = content["media_caption"]
+ elif table_message and content["media_wa_type"] == 1 and content["data"] is not None:
+ # New schema
+ message.caption = content["data"]
+ else:
+ message.caption = None
+
+ # Handle message content based on status
+ if content["status"] == 6: # 6 = Metadata
+ _process_metadata_message(message, content, data, table_message)
+ else:
+ # Real message
+ _process_regular_message(message, content, table_message)
+
+ current_chat.add_message(content["_id"], message)
+
+
+def _process_binary_message(message, content):
+ """Process binary message data."""
+ message.data = ("The message is binary data and its base64 is "
+ '""")
+ message.data += b64encode(content["data"]).decode("utf-8") + ""
+ message.safe = message.meta = True
+
+
+def _set_group_sender(message, content, data, table_message):
+ """Set sender name for group messages."""
+ name = fallback = None
+ if table_message:
+ if content["sender_jid_row_id"] > 0:
+ _jid = content["group_sender_jid"]
+ if _jid in data:
+ name = data.get_chat(_jid).name
+ if "@" in _jid:
+ fallback = _jid.split('@')[0]
+ else:
+ if content["remote_resource"] is not None:
+ if content["remote_resource"] in data:
+ name = data.get_chat(content["remote_resource"]).name
+ if "@" in content["remote_resource"]:
+ fallback = content["remote_resource"].split('@')[0]
+
+ message.sender = name or fallback
+
+
+def _process_metadata_message(message, content, data, table_message):
+ """Process metadata message."""
+ message.meta = True
+ name = fallback = None
+
+ if table_message:
+ if content["sender_jid_row_id"] > 0:
+ _jid = content["group_sender_jid"]
+ if _jid in data:
+ name = data.get_chat(_jid).name
+ if "@" in _jid:
+ fallback = _jid.split('@')[0]
+ else:
+ name = "You"
+ else:
+ _jid = content["remote_resource"]
+ if _jid is not None:
+ if _jid in data:
+ name = data.get_chat(_jid).name
+ if "@" in _jid:
+ fallback = _jid.split('@')[0]
+ else:
+ name = "You"
+
+ message.data = determine_metadata(content, name or fallback)
+
+ if isinstance(message.data, str) and "<br>" in message.data:
+ message.safe = True
+
+ if message.data is None:
+ if content["video_call"] is not None: # Missed call
+ message.meta = True
+ if content["video_call"] == 1:
+ message.data = "A video call was missed"
+ elif content["video_call"] == 0:
+ message.data = "A voice call was missed"
+ elif content["data"] is None and content["thumb_image"] is None:
+ message.meta = True
+ message.data = None
+
+
+def _process_regular_message(message, content, table_message):
+ """Process regular (non-metadata) message."""
+ message.sticker = content["media_wa_type"] == 20 # Sticker is a message
+
+ if content["key_from_me"] == 1:
+ if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15:
+ msg = "Message deleted"
+ message.meta = True
+ else:
+ if content["media_wa_type"] == 5:
+ msg = f"Location shared: {content['latitude'], content['longitude']}"
+ message.meta = True
else:
- break
- print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r")
+ msg = content["data"]
+ if msg is not None:
+ msg = _format_message_text(msg)
+ else:
+ if content["status"] == 0 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15:
+ msg = "Message deleted"
+ message.meta = True
+ else:
+ if content["media_wa_type"] == 5:
+ msg = f"Location shared: {content['latitude'], content['longitude']}"
+ message.meta = True
+ else:
+ msg = content["data"]
+ if msg is not None:
+ msg = _format_message_text(msg)
+
+ message.data = msg
+
+
+def _format_message_text(text):
+ """Format message text, replacing newlines with HTML breaks."""
+ if "\r\n" in text:
+ text = text.replace("\r\n", "<br>")
+ if "\n" in text:
+ text = text.replace("\n", "<br>")
+ return text
def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True):
- # Get media
+ """
+ Process WhatsApp media files from the database.
+
+ Args:
+ db: Database connection
+ data: Data store object
+ media_folder: Folder containing media files
+ filter_date: Date filter condition
+ filter_chat: Chat filter conditions
+ filter_empty: Filter for empty chats
+ separate_media: Whether to separate media files by chat
+ """
c = db.cursor()
+ total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat)
+ print(f"\nProcessing media...(0/{total_row_number})", end="\r")
+
try:
- c.execute(f"""SELECT count()
+ content_cursor = _get_media_cursor_legacy(c, filter_empty, filter_date, filter_chat)
+ except sqlite3.OperationalError:
+ content_cursor = _get_media_cursor_new(c, filter_empty, filter_date, filter_chat)
+
+ content = content_cursor.fetchone()
+ mime = MimeTypes()
+
+ # Ensure thumbnails directory exists
+ Path(f"{media_folder}/thumbnails").mkdir(parents=True, exist_ok=True)
+
+ i = 0
+ while content is not None:
+ _process_single_media(data, content, media_folder, mime, separate_media)
+
+ i += 1
+ if i % 100 == 0:
+ print(f"Processing media...({i}/{total_row_number})", end="\r")
+
+ content = content_cursor.fetchone()
+
+ print(f"Processing media...({total_row_number}/{total_row_number})", end="\r")
+
+
+# Helper functions for media processing
+
+def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
+ """Get the total number of media files to process."""
+ try:
+ empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "messages.needs_push")
+ date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android")
+
+ cursor.execute(f"""SELECT count()
FROM message_media
INNER JOIN messages
ON message_media.message_row_id = messages._id
@@ -372,12 +527,17 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
LEFT JOIN chat
ON chat.jid_row_id = jid._id
WHERE 1=1
- {get_cond_for_empty(filter_empty, "key_remote_jid", "messages.needs_push")}
- {f'AND messages.timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android")}
- {get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android")}""")
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}""")
except sqlite3.OperationalError:
- c.execute(f"""SELECT count()
+ empty_filter = get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")
+ date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")
+
+ cursor.execute(f"""SELECT count()
FROM message_media
INNER JOIN message
ON message_media.message_row_id = message._id
@@ -388,40 +548,54 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
WHERE 1=1
- {get_cond_for_empty(filter_empty, "jid.raw_string", "broadcast")}
- {f'AND message.timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")}
- {get_chat_condition(filter_chat[1], False, ["jid.raw_string", "jid_group.raw_string"], "jid", "android")}""")
- total_row_number = c.fetchone()[0]
- print(f"\nProcessing media...(0/{total_row_number})", end="\r")
- i = 0
- try:
- c.execute(f"""SELECT messages.key_remote_jid,
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}""")
+ return cursor.fetchone()[0]
+
+
+def _get_media_cursor_legacy(cursor, filter_empty, filter_date, filter_chat):
+ """Get cursor for legacy media database schema."""
+ empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
+ date_filter = f'AND messages.timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android")
+
+ cursor.execute(f"""SELECT messages.key_remote_jid,
message_row_id,
file_path,
message_url,
mime_type,
media_key,
file_hash,
- thumbnail
+ thumbnail
FROM message_media
INNER JOIN messages
ON message_media.message_row_id = messages._id
- LEFT JOIN media_hash_thumbnail
- ON message_media.file_hash = media_hash_thumbnail.media_hash
+ LEFT JOIN media_hash_thumbnail
+ ON message_media.file_hash = media_hash_thumbnail.media_hash
INNER JOIN jid
ON messages.key_remote_jid = jid.raw_string
LEFT JOIN chat
ON chat.jid_row_id = jid._id
WHERE jid.type <> 7
- {get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")}
- {f'AND messages.timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android")}
- {get_chat_condition(filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android")}
- ORDER BY messages.key_remote_jid ASC"""
- )
- except sqlite3.OperationalError:
- c.execute(f"""SELECT jid.raw_string as key_remote_jid,
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}
+ ORDER BY messages.key_remote_jid ASC""")
+ return cursor
+
+
+def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
+ """Get cursor for new media database schema."""
+ empty_filter = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")
+ date_filter = f'AND message.timestamp {filter_date}' if filter_date is not None else ''
+ include_filter = get_chat_condition(filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android")
+ exclude_filter = get_chat_condition(filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android")
+
+ cursor.execute(f"""SELECT jid.raw_string as key_remote_jid,
message_row_id,
file_path,
message_url,
@@ -437,207 +611,297 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
INNER JOIN jid
ON jid._id = chat.jid_row_id
LEFT JOIN media_hash_thumbnail
- ON message_media.file_hash = media_hash_thumbnail.media_hash
+ ON message_media.file_hash = media_hash_thumbnail.media_hash
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
WHERE jid.type <> 7
- {get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")}
- {f'AND message.timestamp {filter_date}' if filter_date is not None else ''}
- {get_chat_condition(filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android")}
- {get_chat_condition(filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android")}
- ORDER BY jid.raw_string ASC"""
- )
- content = c.fetchone()
- mime = MimeTypes()
- if not os.path.isdir(f"{media_folder}/thumbnails"):
- Path(f"{media_folder}/thumbnails").mkdir(parents=True, exist_ok=True)
- while content is not None:
- file_path = f"{media_folder}/{content['file_path']}"
- current_chat = data.get_chat(content["key_remote_jid"])
- message = current_chat.get_message(content["message_row_id"])
- message.media = True
- if os.path.isfile(file_path):
- message.data = file_path
- if content["mime_type"] is None:
- guess = mime.guess_type(file_path)[0]
- if guess is not None:
- message.mime = guess
- else:
- message.mime = "application/octet-stream"
+ {empty_filter}
+ {date_filter}
+ {include_filter}
+ {exclude_filter}
+ ORDER BY jid.raw_string ASC""")
+ return cursor
+
+
+def _process_single_media(data, content, media_folder, mime, separate_media):
+ """Process a single media file."""
+ file_path = f"{media_folder}/{content['file_path']}"
+ current_chat = data.get_chat(content["key_remote_jid"])
+ message = current_chat.get_message(content["message_row_id"])
+ message.media = True
+
+ if os.path.isfile(file_path):
+ message.data = file_path
+
+ # Set mime type
+ if content["mime_type"] is None:
+ guess = mime.guess_type(file_path)[0]
+ if guess is not None:
+ message.mime = guess
else:
- message.mime = content["mime_type"]
- if separate_media:
- chat_display_name = slugify(current_chat.name or message.sender \
- or content["key_remote_jid"].split('@')[0], True)
- current_filename = file_path.split("/")[-1]
- new_folder = os.path.join(media_folder, "separated", chat_display_name)
- Path(new_folder).mkdir(parents=True, exist_ok=True)
- new_path = os.path.join(new_folder, current_filename)
- shutil.copy2(file_path, new_path)
- message.data = new_path
+ message.mime = "application/octet-stream"
else:
- message.data = "The media is missing"
- message.mime = "media"
- message.meta = True
- if content["thumbnail"] is not None:
- thumb_path = f"{media_folder}/thumbnails/{b64decode(content['file_hash']).hex()}.png"
- if not os.path.isfile(thumb_path):
- with open(thumb_path, "wb") as f:
- f.write(content["thumbnail"])
- message.thumb = thumb_path
- i += 1
- if i % 100 == 0:
- print(f"Processing media...({i}/{total_row_number})", end="\r")
- content = c.fetchone()
- print(
- f"Processing media...({total_row_number}/{total_row_number})", end="\r")
+ message.mime = content["mime_type"]
+
+ # Copy media to separate folder if needed
+ if separate_media:
+ chat_display_name = slugify(current_chat.name or message.sender
+ or content["key_remote_jid"].split('@')[0], True)
+ current_filename = file_path.split("/")[-1]
+ new_folder = os.path.join(media_folder, "separated", chat_display_name)
+ Path(new_folder).mkdir(parents=True, exist_ok=True)
+ new_path = os.path.join(new_folder, current_filename)
+ shutil.copy2(file_path, new_path)
+ message.data = new_path
+ else:
+ message.data = "The media is missing"
+ message.mime = "media"
+ message.meta = True
+
+ # Handle thumbnail
+ if content["thumbnail"] is not None:
+ thumb_path = f"{media_folder}/thumbnails/{b64decode(content['file_hash']).hex()}.png"
+ if not os.path.isfile(thumb_path):
+ with open(thumb_path, "wb") as f:
+ f.write(content["thumbnail"])
+ message.thumb = thumb_path
def vcard(db, data, media_folder, filter_date, filter_chat, filter_empty):
    """Export vCard attachments into ``<media_folder>/vCards`` and tag their messages.

    Args:
        db: Database connection; only ``cursor()`` is used here.
        data: Chat collection handed through to ``_process_vcard_row``.
        media_folder: Folder under which the ``vCards`` directory is created.
        filter_date: Optional date condition forwarded to the query helpers.
        filter_chat: Include/exclude chat filters forwarded to the query helpers.
        filter_empty: Empty-chat filter forwarded to the query helpers.
    """
    cursor = db.cursor()
    query_args = (cursor, filter_date, filter_chat, filter_empty)
    try:
        vcard_rows = _execute_vcard_query_modern(*query_args)
    except sqlite3.OperationalError:
        # The first query could not run on this database; use the alternative table layout.
        vcard_rows = _execute_vcard_query_legacy(*query_args)

    total = len(vcard_rows)
    print(f"\nProcessing vCards...(0/{total})", end="\r")

    # mkdir(exist_ok=True) makes a separate existence check unnecessary.
    vcard_dir = os.path.join(media_folder, "vCards")
    Path(vcard_dir).mkdir(parents=True, exist_ok=True)

    for done, row in enumerate(vcard_rows, start=1):
        _process_vcard_row(row, vcard_dir, data)
        print(f"Processing vCards...({done}/{total})", end="\r")
def _execute_vcard_query_modern(c, filter_date, filter_chat, filter_empty):
    """Run the vCard query against the ``messages_vcards`` / ``messages`` tables.

    Args:
        c: Database cursor used to execute the query.
        filter_date: Comparison fragment appended after ``messages.timestamp``, or None.
        filter_chat: Pair whose first item is the include filter and second the exclude filter.
        filter_empty: Value forwarded to ``get_cond_for_empty``.

    Returns:
        Every row of the result set (``c.fetchall()``), ordered by ``key_remote_jid``.
    """
    where_include = get_chat_condition(
        filter_chat[0], True, ["messages.key_remote_jid", "remote_resource"], "jid", "android"
    )
    where_exclude = get_chat_condition(
        filter_chat[1], False, ["messages.key_remote_jid", "remote_resource"], "jid", "android"
    )
    if filter_date is not None:
        where_date = f'AND messages.timestamp {filter_date}'
    else:
        where_date = ''
    where_empty = get_cond_for_empty(filter_empty, "key_remote_jid", "messages.needs_push")

    # "WHERE 1=1" lets every optional fragment be appended as a plain "AND ..." clause.
    sql = f"""SELECT message_row_id,
                     messages.key_remote_jid,
                     vcard,
                     messages.media_name
              FROM messages_vcards
                  INNER JOIN messages
                      ON messages_vcards.message_row_id = messages._id
                  INNER JOIN jid
                      ON messages.key_remote_jid = jid.raw_string
                  LEFT JOIN chat
                      ON chat.jid_row_id = jid._id
              WHERE 1=1
                  {where_empty}
                  {where_date}
                  {where_include}
                  {where_exclude}
              ORDER BY messages.key_remote_jid ASC;"""
    c.execute(sql)
    return c.fetchall()
+
+
def _execute_vcard_query_legacy(c, filter_date, filter_chat, filter_empty):
    """Run the vCard query against the ``message_vcard`` / ``message`` tables.

    Args:
        c: Database cursor used to execute the query.
        filter_date: Comparison fragment appended after ``message.timestamp``, or None.
        filter_chat: Pair whose first item is the include filter and second the exclude filter.
        filter_empty: Value forwarded to ``get_cond_for_empty``.

    Returns:
        Every row of the result set (``c.fetchall()``), ordered by ``message.chat_row_id``.
    """
    where_include = get_chat_condition(
        filter_chat[0], True, ["key_remote_jid", "jid_group.raw_string"], "jid", "android"
    )
    where_exclude = get_chat_condition(
        filter_chat[1], False, ["key_remote_jid", "jid_group.raw_string"], "jid", "android"
    )
    if filter_date is not None:
        where_date = f'AND message.timestamp {filter_date}'
    else:
        where_date = ''
    where_empty = get_cond_for_empty(filter_empty, "key_remote_jid", "broadcast")

    # The result columns are aliased so both query helpers expose the same row keys.
    sql = f"""SELECT message_row_id,
                     jid.raw_string as key_remote_jid,
                     vcard,
                     message.text_data as media_name
              FROM message_vcard
                  INNER JOIN message
                      ON message_vcard.message_row_id = message._id
                  LEFT JOIN chat
                      ON chat._id = message.chat_row_id
                  INNER JOIN jid
                      ON jid._id = chat.jid_row_id
                  LEFT JOIN jid jid_group
                      ON jid_group._id = message.sender_jid_row_id
              WHERE 1=1
                  {where_empty}
                  {where_date}
                  {where_include}
                  {where_exclude}
              ORDER BY message.chat_row_id ASC;"""
    c.execute(sql)
    return c.fetchall()
+
+
def _process_vcard_row(row, path, data):
    """Write one vCard to disk and mark its message as a vCard attachment.

    Args:
        row: Query row exposing ``media_name``, ``vcard``, ``key_remote_jid``
            and ``message_row_id``.
        path: Directory that receives the ``.vcf`` file.
        data: Chat collection; ``data.get_chat(jid).get_message(row_id)`` is
            used to look up the message to update.
    """
    media_name = row["media_name"] if row["media_name"] is not None else "Undefined vCard File"

    # Keep only alphanumerics, then cap the name at 230 UTF-8 bytes; decoding with
    # "ignore" drops a trailing character that the byte slice cut in half.
    file_name = "".join(ch for ch in media_name if ch.isalnum())
    file_name = file_name.encode("utf-8")[:230].decode("utf-8", "ignore")
    file_path = os.path.join(path, f"{file_name}.vcf")

    # A file that already exists under this name is left untouched.
    if not os.path.isfile(file_path):
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(row["vcard"])

    message = data.get_chat(row["key_remote_jid"]).get_message(row["message_row_id"])
    # The heading and the escaped file name are separated by an HTML line break.
    message.data = (
        "This media include the following vCard file(s):<br>"
        f"{htmle(media_name)}"
    )
    message.mime = "text/x-vcard"
    message.meta = True
    message.safe = True
+
+
def calls(db, data, timezone_offset, filter_chat):
    """Collect the call log into a single "WhatsApp Calls" chat.

    Args:
        db: Database connection; only ``cursor()`` is used here.
        data: Chat collection that receives the calls chat under the key
            ``"000000000000000"``.
        timezone_offset: Offset forwarded to ``_process_call_record``.
        filter_chat: Include/exclude chat filters forwarded to the query helpers.

    Returns:
        None. Nothing is added to ``data`` when no call matches the filters.
    """
    cursor = db.cursor()

    matching_calls = _get_calls_count(cursor, filter_chat)
    if matching_calls == 0:
        return

    print(f"\nProcessing calls...({matching_calls})", end="\r")

    call_rows = _fetch_calls_data(cursor, filter_chat)
    calls_chat = ChatStore(Device.ANDROID, "WhatsApp Calls")

    # fetchone() yields None once the result set is exhausted, which ends the loop.
    for record in iter(call_rows.fetchone, None):
        _process_call_record(record, calls_chat, data, timezone_offset)

    data.add_chat("000000000000000", calls_chat)
def _get_calls_count(c, filter_chat):
    """Count the ``call_log`` rows that pass the chat filters.

    Args:
        c: Database cursor used to execute the query.
        filter_chat: Pair whose first item is the include filter and second the exclude filter.

    Returns:
        The first column of the single result row, i.e. the ``count()`` value.
    """
    where_include = get_chat_condition(filter_chat[0], True, ["jid.raw_string"])
    where_exclude = get_chat_condition(filter_chat[1], False, ["jid.raw_string"])

    sql = f"""SELECT count()
              FROM call_log
                  INNER JOIN jid
                      ON call_log.jid_row_id = jid._id
                  LEFT JOIN chat
                      ON call_log.jid_row_id = chat.jid_row_id
              WHERE 1=1
                  {where_include}
                  {where_exclude}"""
    c.execute(sql)
    count_row = c.fetchone()
    return count_row[0]
+
+
def _fetch_calls_data(c, filter_chat):
    """Execute the call-log query and hand back the cursor for row-by-row reading.

    Args:
        c: Database cursor used to execute the query.
        filter_chat: Pair whose first item is the include filter and second the exclude filter.

    Returns:
        The same cursor ``c``, positioned on the query's result set.
    """
    where_include = get_chat_condition(filter_chat[0], True, ["jid.raw_string"])
    where_exclude = get_chat_condition(filter_chat[1], False, ["jid.raw_string"])

    sql = f"""SELECT call_log._id,
                     jid.raw_string,
                     from_me,
                     call_id,
                     timestamp,
                     video_call,
                     duration,
                     call_result,
                     bytes_transferred,
                     chat.subject as chat_subject
              FROM call_log
                  INNER JOIN jid
                      ON call_log.jid_row_id = jid._id
                  LEFT JOIN chat
                      ON call_log.jid_row_id = chat.jid_row_id
              WHERE 1=1
                  {where_include}
                  {where_exclude}"""
    c.execute(sql)
    # Rows are not materialised here; the caller pulls them with fetchone().
    return c
+
+
def _process_call_record(content, chat, data, timezone_offset):
    """Turn one call-log row into a meta ``Message`` and store it in ``chat``.

    Args:
        content: Call-log row (keys as selected by ``_fetch_calls_data``).
        chat: Chat that receives the message, keyed by the row's ``_id``.
        data: Chat collection used to look up a display name for the peer.
        timezone_offset: Offset for the message; a falsy value falls back to
            ``CURRENT_TZ_OFFSET``.
    """
    call = Message(
        from_me=content["from_me"],
        timestamp=content["timestamp"],
        time=content["timestamp"],
        key_id=content["call_id"],
        timezone_offset=timezone_offset or CURRENT_TZ_OFFSET,
        received_timestamp=None,  # TODO: Add timestamp
        read_timestamp=None  # TODO: Add timestamp
    )

    # Name priority: known chat name, then the chat subject, then the part of
    # the JID before "@"; None when none of them is available.
    jid = content["raw_string"]
    if jid in data:
        display_name = data.get_chat(jid).name
    else:
        display_name = content["chat_subject"] or None
    if not display_name:
        display_name = jid.split('@')[0] if jid is not None and "@" in jid else None
    call.sender = display_name

    call.meta = True
    call.data = _construct_call_description(content, call)

    chat.add_message(content["_id"], call)
+
+
def _construct_call_description(content, call):
    """Build the human-readable sentence describing a call.

    Args:
        content: Call-log row providing ``video_call``, ``call_result`` and,
            for result 5, ``duration`` and ``bytes_transferred``.
        call: Message whose ``from_me`` and ``sender`` are read.

    Returns:
        str: A sentence of the form "A <video|voice> call <to|from> <sender> was ...".
    """
    media_kind = 'video' if content['video_call'] else 'voice'
    direction = 'to' if call.from_me else 'from'
    opening = f"A {media_kind} call {direction} {call.sender} was "

    result = content['call_result']
    if result in (0, 4, 7):
        return opening + ("cancelled." if call.from_me else "missed.")
    if result == 2:
        return opening + ("not answered." if call.from_me else "missed.")
    if result == 3:
        return opening + "unavailable."
    if result == 5:
        # Only this outcome reports duration and transferred volume.
        call_time = convert_time_unit(content['duration'])
        call_bytes = bytes_to_readable(content['bytes_transferred'])
        return opening + f"initiated and lasted for {call_time} with {call_bytes} data transferred."
    return opening + "in an unknown state."
+
+
def create_html(
data,
output_folder,
@@ -649,11 +913,13 @@ def create_html(
experimental=False,
headline=None
):
+ """Generate HTML chat files from data."""
template = setup_template(template, no_avatar, experimental)
total_row_number = len(data)
print(f"\nGenerating chats...(0/{total_row_number})", end="\r")
+ # Create output directory if it doesn't exist
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
@@ -664,107 +930,168 @@ def create_html(
if len(current_chat) == 0:
# Skip empty chats
continue
+
safe_file_name, name = get_file_name(contact, current_chat)
if maximum_size is not None:
- current_size = 0
- current_page = 1
- render_box = []
- if maximum_size == 0:
- maximum_size = MAX_SIZE
- last_msg = current_chat.get_last_message().key_id
- for message in current_chat.values():
- if message.data is not None and not message.meta and not message.media:
- current_size += len(message.data) + ROW_SIZE
- else:
- current_size += ROW_SIZE + 100 # Assume media and meta HTML are 100 bytes
- if current_size > maximum_size:
- output_file_name = f"{output_folder}/{safe_file_name}-{current_page}.html"
- rendering(
- output_file_name,
- template,
- name,
- render_box,
- contact,
- w3css,
- current_chat,
- headline,
- next=f"{safe_file_name}-{current_page + 1}.html",
- previous=f"{safe_file_name}-{current_page - 1}.html" if current_page > 1 else False
- )
- render_box = [message]
- current_size = 0
- current_page += 1
- else:
- render_box.append(message)
- if message.key_id == last_msg:
- if current_page == 1:
- output_file_name = f"{output_folder}/{safe_file_name}.html"
- else:
- output_file_name = f"{output_folder}/{safe_file_name}-{current_page}.html"
- rendering(
- output_file_name,
- template,
- name,
- render_box,
- contact,
- w3css,
- current_chat,
- headline,
- False,
- previous=f"{safe_file_name}-{current_page - 1}.html"
- )
- else:
- output_file_name = f"{output_folder}/{safe_file_name}.html"
- rendering(
- output_file_name,
- template,
- name,
- current_chat.values(),
- contact,
- w3css,
- current_chat,
- headline,
- False
+ _generate_paginated_chat(
+ current_chat,
+ safe_file_name,
+ name,
+ contact,
+ output_folder,
+ template,
+ w3css,
+ maximum_size,
+ headline
)
+ else:
+ _generate_single_chat(
+ current_chat,
+ safe_file_name,
+ name,
+ contact,
+ output_folder,
+ template,
+ w3css,
+ headline
+ )
+
if current % 10 == 0:
print(f"Generating chats...({current}/{total_row_number})", end="\r")
print(f"Generating chats...({total_row_number}/{total_row_number})", end="\r")
def _generate_single_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, headline):
    """Render a whole chat into ``<output_folder>/<safe_file_name>.html``.

    All messages of ``current_chat`` go into one file; the final positional
    ``False`` is passed to ``rendering`` in the slot that the paginated
    variant fills with ``next=...``.
    """
    target = f"{output_folder}/{safe_file_name}.html"
    all_messages = current_chat.values()
    rendering(target, template, name, all_messages, contact, w3css, current_chat, headline, False)
+
+
def _generate_paginated_chat(current_chat, safe_file_name, name, contact, output_folder, template, w3css, maximum_size, headline):
    """Render a chat as one or more HTML pages bounded by an estimated size.

    Messages are accumulated until adding the next one would push the page's
    estimated size above ``maximum_size``; the page is then written and the
    message starts the next page. Whatever is left after the loop becomes the
    final page, so the last message is always rendered, including when it is
    the one that overflowed the previous page.

    File naming: a chat that fits on one page is written to
    ``<safe_file_name>.html``; otherwise pages are ``<safe_file_name>-<n>.html``
    starting at 1.

    Args:
        current_chat: Chat whose ``values()`` yields the messages in order.
        safe_file_name: File-system-safe base name for the output files.
        name: Display name passed to ``rendering``.
        contact: Contact identifier passed to ``rendering``.
        output_folder: Directory that receives the HTML files.
        template: Template passed to ``rendering``.
        w3css: Stylesheet reference passed to ``rendering``.
        maximum_size: Size budget per page; ``0`` selects ``MAX_SIZE``.
        headline: Headline passed to ``rendering``.
    """
    if maximum_size == 0:
        maximum_size = MAX_SIZE

    def page_file(number):
        return f"{safe_file_name}-{number}.html"

    def previous_link(number):
        # Page 1 has no predecessor; never emit a link to a "-0" page.
        return page_file(number - 1) if number > 1 else False

    current_size = 0
    current_page = 1
    render_box = []

    for message in current_chat.values():
        if message.data is not None and not message.meta and not message.media:
            message_size = len(message.data) + ROW_SIZE
        else:
            message_size = ROW_SIZE + 100  # Assume media and meta HTML are 100 bytes

        # Flush only a non-empty page: a single oversized message gets a page
        # of its own instead of producing an empty one before it.
        if render_box and current_size + message_size > maximum_size:
            rendering(
                f"{output_folder}/{page_file(current_page)}",
                template,
                name,
                render_box,
                contact,
                w3css,
                current_chat,
                headline,
                next=page_file(current_page + 1),
                previous=previous_link(current_page)
            )
            render_box = []
            current_size = 0
            current_page += 1

        # The carried-over message counts against the new page's budget.
        render_box.append(message)
        current_size += message_size

    if render_box:
        if current_page == 1:
            output_file_name = f"{output_folder}/{safe_file_name}.html"
        else:
            output_file_name = f"{output_folder}/{page_file(current_page)}"
        rendering(
            output_file_name,
            template,
            name,
            render_box,
            contact,
            w3css,
            current_chat,
            headline,
            False,
            previous=previous_link(current_page)
        )
+
+
def create_txt(data, output):
    """Write every non-empty chat to ``<output>/<contact>.txt``.

    The file is named after the chat name with ``/`` removed, or after the
    chat key with ``+`` removed when the chat has no name. Meta messages are
    omitted unless their mime is ``"media"``.

    Args:
        data: Mapping of chat key to chat; each chat supports ``len()``,
            ``.name`` and ``.values()``.
        output: Destination directory, created if missing.
    """
    os.makedirs(output, exist_ok=True)

    for jik, chat in data.items():
        if len(chat) == 0:
            continue

        contact = chat.name.replace('/', '') if chat.name is not None else jik.replace('+', '')

        with open(os.path.join(output, f"{contact}.txt"), "w", encoding="utf8") as f:
            for message in chat.values():
                # Metadata is left out of the text export, except the "media" placeholder entries.
                if not message.meta or message.mime == "media":
                    line = _format_message_for_txt(message, contact)
                    f.write(f"{line}\n")
+
def _format_message_for_txt(message, contact):
    """Format one message as a ``[date time] sender: text`` entry for the text export.

    Args:
        message: Object exposing ``timestamp``, ``time``, ``from_me``,
            ``sender``, ``media``, ``mime``, ``data`` and ``caption``.
        contact: Fallback sender name used when the message is not from the
            user and has no ``sender``.

    Returns:
        str: The formatted entry without a trailing newline. ``<br>`` markers
        in the text or caption become line breaks, and continuation lines are
        indented to line up under the text that follows the prefix.
    """
    date = datetime.fromtimestamp(message.timestamp).date()

    if message.from_me:
        name = "You"
    else:
        name = message.sender if message.sender else contact

    prefix = f"[{date} {message.time}] {name}: "
    # Continuation lines are padded to the prefix width so the text stays aligned.
    continuation = "\n" + " " * len(prefix)

    if message.media and ("/" in message.mime or message.mime == "media"):
        # Media is represented by an angle-bracket placeholder, not its content.
        if message.data == "The media is missing":
            message_text = "<The media is missing>"
        else:
            message_text = f"<media file in {message.data}>"
    elif message.data is None:
        message_text = ""
    else:
        message_text = message.data.replace('<br>', continuation)

    if message.caption is not None:
        message_text += continuation + message.caption.replace('<br>', continuation)

    return f"{prefix}{message_text}"