From 0e802f455467f0c8bbe43aca5c9a8f21999f6ff3 Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Thu, 8 Jun 2023 16:50:33 +0800 Subject: [PATCH] Remove old file --- Whatsapp_Chat_Exporter/extract_new.py | 588 -------------------------- 1 file changed, 588 deletions(-) delete mode 100644 Whatsapp_Chat_Exporter/extract_new.py diff --git a/Whatsapp_Chat_Exporter/extract_new.py b/Whatsapp_Chat_Exporter/extract_new.py deleted file mode 100644 index cbfbd3f..0000000 --- a/Whatsapp_Chat_Exporter/extract_new.py +++ /dev/null @@ -1,588 +0,0 @@ -#!/usr/bin/python3 - -import sqlite3 -import json -import jinja2 -import os -import shutil -import re -import io -import hmac -from pathlib import Path -from bleach import clean as sanitize -from markupsafe import Markup -from datetime import datetime -from enum import Enum -from mimetypes import MimeTypes -from hashlib import sha256 - -from Whatsapp_Chat_Exporter.data_model import ChatStore, Message - -try: - import zlib - from Crypto.Cipher import AES -except ModuleNotFoundError: - support_backup = False -else: - support_backup = True -try: - import javaobj -except ModuleNotFoundError: - support_crypt15 = False -else: - support_crypt15 = True - - -def sanitize_except(html): - return Markup(sanitize(html, tags=["br"])) - - -def determine_day(last, current): - last = datetime.fromtimestamp(last).date() - current = datetime.fromtimestamp(current).date() - if last == current: - return None - else: - return current - - -CRYPT14_OFFSETS = ( - {"iv": 67, "db": 191}, - {"iv": 67, "db": 190}, - {"iv": 66, "db": 99}, - {"iv": 67, "db": 193} -) - - -class Crypt(Enum): - CRYPT15 = 15 - CRYPT14 = 14 - CRYPT12 = 12 - - -def brute_force_offset(): - for iv in range(0, 200): - for db in range(0, 200): - yield iv, iv + 16, db - - -def _generate_hmac_of_hmac(key_stream): - key = hmac.new( - hmac.new( - b'\x00' * 32, - key_stream, - sha256 - ).digest(), - b"backup encryption\x01", - sha256 - ) - return key.digest(), key_stream - - -def _extract_encrypted_key(keyfile): - key_stream = b"" - for byte in javaobj.loads(keyfile): - key_stream += byte.to_bytes(1, "big", signed=True) - - return _generate_hmac_of_hmac(key_stream) - - -def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False): - if not support_backup: - return 1 - if isinstance(key, io.IOBase): - key = key.read() - if crypt is not Crypt.CRYPT15: - t1 = key[30:62] - if crypt is not Crypt.CRYPT15 and len(key) != 158: - raise ValueError("The key file must be 158 bytes") - if crypt == Crypt.CRYPT14: - if len(database) < 191: - raise ValueError("The crypt14 file must be at least 191 bytes") - current_try = 0 - offsets = CRYPT14_OFFSETS[current_try] - t2 = database[15:47] - iv = database[offsets["iv"]:offsets["iv"] + 16] - db_ciphertext = database[offsets["db"]:] - elif crypt == Crypt.CRYPT12: - if len(database) < 67: - raise ValueError("The crypt12 file must be at least 67 bytes") - t2 = database[3:35] - iv = database[51:67] - db_ciphertext = database[67:-20] - elif crypt == Crypt.CRYPT15: - if not support_crypt15: - return 1 - if len(database) < 131: - raise ValueError("The crypt15 file must be at least 131 bytes") - t1 = t2 = None - iv = database[8:24] - db_offset = database[0] + 2 # Skip protobuf + protobuf size and backup type - db_ciphertext = database[db_offset:] - - if t1 != t2: - raise ValueError("The signature of key file and backup file mismatch") - - if crypt == Crypt.CRYPT15: - if len(key) == 32: - main_key, hex_key = _generate_hmac_of_hmac(key) - else: - main_key, hex_key = _extract_encrypted_key(key) - if show_crypt15: - hex_key = [hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)] - print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key)) - else: - main_key = key[126:] - decompressed = False - while not decompressed: - cipher = AES.new(main_key, AES.MODE_GCM, iv) - db_compressed = cipher.decrypt(db_ciphertext) - try: - db = zlib.decompress(db_compressed) - except zlib.error: - if crypt == Crypt.CRYPT14: - current_try += 1 - if current_try < len(CRYPT14_OFFSETS): - offsets = CRYPT14_OFFSETS[current_try] - iv = database[offsets["iv"]:offsets["iv"] + 16] - db_ciphertext = database[offsets["db"]:] - continue - else: - print("Common offsets are not applicable to " - "your backup. Trying to brute force it...") - for start_iv, end_iv, start_db in brute_force_offset(): - iv = database[start_iv:end_iv] - db_ciphertext = database[start_db:] - cipher = AES.new(main_key, AES.MODE_GCM, iv) - db_compressed = cipher.decrypt(db_ciphertext) - try: - db = zlib.decompress(db_compressed) - except zlib.error: - continue - else: - decompressed = True - print( - f"The offsets of your IV and database are {start_iv} and " - f"{start_db}, respectively. To include your offsets in the " - "program, please report it by creating an issue on GitHub: " - "https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues/new" - ) - break - if not decompressed: - return 2 - else: - return 3 - else: - decompressed = True - if db[0:6].upper() == b"SQLITE": - with open(output, "wb") as f: - f.write(db) - return 0 - else: - raise ValueError("The plaintext is not a SQLite database. Did you use the key to encrypt something...") - - -def contacts(db, data): - # Get contacts - c = db.cursor() - c.execute("""SELECT count() FROM wa_contacts""") - total_row_number = c.fetchone()[0] - print(f"Gathering contacts...({total_row_number})") - - c.execute("""SELECT jid, display_name FROM wa_contacts; """) - row = c.fetchone() - while row is not None: - data[row["jid"]] = ChatStore(row["display_name"]) - row = c.fetchone() - - -def messages(db, data): - # Get message history - c = db.cursor() - c.execute("""SELECT count() FROM message""") - total_row_number = c.fetchone()[0] - print(f"Gathering messages...(0/{total_row_number})", end="\r") - - phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net") - c.execute("""SELECT jid_global.raw_string as key_remote_jid, - message._id, - message.from_me as key_from_me, - message.timestamp, - message.text_data as data, - message.status, - message_future.version as edit_version, - message_thumbnail.thumbnail as thumb_image, - message_media.file_path as remote_resource, - message_media.mime_type as media_wa_type, - message_location.latitude, - message_location.longitude, - message_quoted.key_id as quoted, - message.key_id, - message_quoted.text_data as quoted_data, - message.message_type, - jid_group.raw_string as group_sender_jid, - chat.subject as chat_subject - FROM message - LEFT JOIN message_quoted - ON message_quoted.message_row_id = message._id - LEFT JOIN message_location - ON message_location.message_row_id = message._id - LEFT JOIN message_media - ON message_media.message_row_id = message._id - LEFT JOIN message_thumbnail - ON message_thumbnail.message_row_id = message._id - LEFT JOIN message_future - ON message_future.message_row_id = message._id - LEFT JOIN chat - ON chat._id = message.chat_row_id - INNER JOIN jid jid_global - ON jid_global._id = chat.jid_row_id - LEFT JOIN jid jid_group - ON jid_group._id = message.sender_jid_row_id - WHERE key_remote_jid <> '-1';""") - i = 0 - content = c.fetchone() - while content is not None: - if content["key_remote_jid"] not in data: - data[content["key_remote_jid"]] = ChatStore() - if content["key_remote_jid"] is None: - continue - data[content["key_remote_jid"]].add_message(content["_id"], Message( - from_me=content["key_from_me"], - timestamp=content["timestamp"], - time=content["timestamp"], - key_id=content["key_id"], - )) - if "-" in content["key_remote_jid"] and content["key_from_me"] == 0: - name = None - if content["chat_subject"] is not None: - _jid = content["group_sender_jid"] - else: - _jid = content["key_remote_jid"] - if _jid in data: - name = data[_jid].name - fallback = _jid.split('@')[0] if "@" in _jid else None - else: - fallback = None - data[content["key_remote_jid"]].messages[content["_id"]].sender = name or fallback - else: - data[content["key_remote_jid"]].messages[content["_id"]].sender = None - - if content["quoted"] is not None: - data[content["key_remote_jid"]].messages[content["_id"]].reply = content["quoted"] - data[content["key_remote_jid"]].messages[content["_id"]].quoted_data = content["quoted_data"] - else: - data[content["key_remote_jid"]].messages[content["_id"]].reply = None - - if content["message_type"] == 1: - data[content["key_remote_jid"]].messages[content["_id"]].caption = content["data"] - else: - data[content["key_remote_jid"]].messages[content["_id"]].caption = None - - if content["status"] == 6: - if content["chat_subject"] is not None: - # Is Group - if content["data"] is not None: - try: - int(content["data"]) - except ValueError: - msg = f"The group name changed to {content['data']}" - data[content["key_remote_jid"]].messages[content["_id"]].data = msg - data[content["key_remote_jid"]].messages[content["_id"]].meta = True - else: - data[content["key_remote_jid"]].delete_message(content["_id"]) - else: - thumb_image = content["thumb_image"] - if thumb_image is not None: - if b"\x00\x00\x01\x74\x00\x1A" in thumb_image: - # Add user - added = phone_number_re.search( - thumb_image.decode("unicode_escape"))[0] - if added in data: - name_right = data[added]["name"] - else: - name_right = added.split('@')[0] - if content["remote_resource"] is not None: - if content["remote_resource"] in data: - name_left = data[content["remote_resource"]]["name"] - else: - name_left = content["remote_resource"].split('@')[0] - msg = f"{name_left} added {name_right or 'You'}" - else: - msg = f"Added {name_right or 'You'}" - elif b"\xac\xed\x00\x05\x74\x00" in thumb_image: - # Changed number - original = content["remote_resource"].split('@')[0] - changed = thumb_image[7:].decode().split('@')[0] - msg = f"{original} changed to {changed}" - data[content["key_remote_jid"]].messages[content["_id"]].data = msg - data[content["key_remote_jid"]].messages[content["_id"]].meta = True - else: - if content["data"] is None: - data[content["key_remote_jid"]].delete_message(content["_id"]) - else: - # Private chat - if content["data"] is None and content["thumb_image"] is None: - data[content["key_remote_jid"]].delete_message(content["_id"]) - - else: - if content["key_from_me"] == 1: - if content["status"] == 5 and content["edit_version"] == 7: - msg = "Message deleted" - data[content["key_remote_jid"]].messages[content["_id"]].meta = True - else: - if content["media_wa_type"] == "5": - msg = f"Location shared: {content[10], content[11]}" - data[content["key_remote_jid"]].messages[content["_id"]].meta = True - else: - msg = content["data"] - if msg is not None: - if "\r\n" in msg: - msg = msg.replace("\r\n", "
") - if "\n" in msg: - msg = msg.replace("\n", "
") - else: - if content["status"] == 0 and content["edit_version"] == 7: - msg = "Message deleted" - data[content["key_remote_jid"]].messages[content["_id"]].meta = True - else: - if content["media_wa_type"] == "5": - msg = f"Location shared: {content[10], content[11]}" - data[content["key_remote_jid"]].messages[content["_id"]].meta = True - else: - msg = content["data"] - if msg is not None: - if "\r\n" in msg: - msg = msg.replace("\r\n", "
") - if "\n" in msg: - msg = msg.replace("\n", "
") - - data[content["key_remote_jid"]].messages[content["_id"]].data = msg - - i += 1 - if i % 1000 == 0: - print(f"Gathering messages...({i}/{total_row_number})", end="\r") - content = c.fetchone() - print(f"Gathering messages...({total_row_number}/{total_row_number})", end="\r") - - -def media(db, data, media_folder): - # Get media - c = db.cursor() - c.execute("""SELECT count() FROM message_media""") - total_row_number = c.fetchone()[0] - print(f"\nGathering media...(0/{total_row_number})", end="\r") - i = 0 - c.execute("""SELECT jid.raw_string, - message_row_id, - file_path, - message_url, - mime_type, - media_key - FROM message_media - INNER JOIN message - ON message_media.message_row_id = message._id - LEFT JOIN chat - ON chat._id = message.chat_row_id - INNER JOIN jid - ON jid._id = chat.jid_row_id - ORDER BY jid.raw_string ASC""") - content = c.fetchone() - mime = MimeTypes() - while content is not None: - file_path = f"{media_folder}/{content['file_path']}" - data[content["raw_string"]].messages[content["message_row_id"]].media = True - if os.path.isfile(file_path): - data[content["raw_string"]].messages[content["message_row_id"]].data = file_path - if content["mime_type"] is None: - guess = mime.guess_type(file_path)[0] - if guess is not None: - data[content["raw_string"]].messages[content["message_row_id"]].mime = guess - else: - data[content["raw_string"]].messages[content["message_row_id"]].mime = "data/data" - else: - data[content["raw_string"]].messages[content["message_row_id"]].mime = content["mime_type"] - else: - # if "https://mmg" in content["mime_type"]: - # try: - # r = requests.get(content["message_url"]) - # if r.status_code != 200: - # raise RuntimeError() - # except: - # data[content["raw_string"]].messages[content["message_row_id"]].data = "{The media is missing}" - # data[content["raw_string"]].messages[content["message_row_id"]].media = True - # data[content["raw_string"]].messages[content["message_row_id"]].mime = "media" - # else: - data[content["raw_string"]].messages[content["message_row_id"]].data = "The media is missing" - data[content["raw_string"]].messages[content["message_row_id"]].mime = "media" - data[content["raw_string"]].messages[content["message_row_id"]].meta = True - i += 1 - if i % 100 == 0: - print(f"Gathering media...({i}/{total_row_number})", end="\r") - content = c.fetchone() - print( - f"Gathering media...({total_row_number}/{total_row_number})", end="\r") - - -def vcard(db, data): - c = db.cursor() - c.execute("""SELECT message_row_id, - jid.raw_string, - vcard, - message.text_data - FROM message_vcard - INNER JOIN message - ON message_vcard.message_row_id = message._id - LEFT JOIN chat - ON chat._id = message.chat_row_id - INNER JOIN jid - ON jid._id = chat.jid_row_id - ORDER BY message.chat_row_id ASC;""") - rows = c.fetchall() - total_row_number = len(rows) - print(f"\nGathering vCards...(0/{total_row_number})", end="\r") - base = "WhatsApp/vCards" - if not os.path.isdir(base): - Path(base).mkdir(parents=True, exist_ok=True) - for index, row in enumerate(rows): - media_name = row["text_data"] if row["text_data"] else "" - file_name = "".join(x for x in media_name if x.isalnum()) - file_path = f"{base}/{file_name}.vcf" - if not os.path.isfile(file_path): - with open(file_path, "w", encoding="utf-8") as f: - f.write(row["vcard"]) - data[row["raw_string"]].messages[row["message_row_id"]].data = media_name + \ - "The vCard file cannot be displayed here, " \ - f"however it should be located at {file_path}" - data[row["raw_string"]].messages[row["message_row_id"]].mime = "text/x-vcard" - data[row["raw_string"]].messages[row["message_row_id"]].meta = True - print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r") - - -def create_html( - data, - output_folder, - template=None, - embedded=False, - offline_static=False, - maximum_size=None - ): - if template is None: - template_dir = os.path.dirname(__file__) - template_file = "whatsapp.html" - else: - template_dir = os.path.dirname(template) - template_file = os.path.basename(template) - templateLoader = jinja2.FileSystemLoader(searchpath=template_dir) - templateEnv = jinja2.Environment(loader=templateLoader) - templateEnv.globals.update(determine_day=determine_day) - templateEnv.filters['sanitize_except'] = sanitize_except - template = templateEnv.get_template(template_file) - - total_row_number = len(data) - print(f"\nCreating HTML...(0/{total_row_number})", end="\r") - - if not os.path.isdir(output_folder): - os.mkdir(output_folder) - - w3css = "https://www.w3schools.com/w3css/4/w3.css" - if offline_static: - import urllib.request - static_folder = os.path.join(output_folder, offline_static) - if not os.path.isdir(static_folder): - os.mkdir(static_folder) - w3css_path = os.path.join(static_folder, "w3.css") - if not os.path.isfile(w3css_path): - with urllib.request.urlopen(w3css) as resp: - with open(w3css_path, "wb") as f: f.write(resp.read()) - w3css = os.path.join(offline_static, "w3.css") - - for current, contact in enumerate(data): - if len(data[contact].messages) == 0: - continue - phone_number = contact.split('@')[0] - if "-" in contact: - file_name = "" - else: - file_name = phone_number - - if data[contact].name is not None: - if file_name != "": - file_name += "-" - file_name += data[contact].name.replace("/", "-") - name = data[contact].name - else: - name = phone_number - - safe_file_name = "".join(x for x in file_name if x.isalnum() or x in "- ") - with open(f"{output_folder}/{safe_file_name}.html", "w", encoding="utf-8") as f: - f.write( - template.render( - name=name, - msgs=data[contact].messages.values(), - my_avatar=None, - their_avatar=f"WhatsApp/Avatars/{contact}.j", - w3css=w3css - ) - ) - if current % 10 == 0: - print(f"Creating HTML...({current}/{total_row_number})", end="\r") - - print(f"Creating HTML...({total_row_number}/{total_row_number})", end="\r") - - -if __name__ == "__main__": - from optparse import OptionParser - parser = OptionParser() - parser.add_option( - "-w", - "--wa", - dest="wa", - default="wa.db", - help="Path to contact database") - parser.add_option( - "-m", - "--media", - dest="media", - default="WhatsApp", - help="Path to WhatsApp media folder" - ) - # parser.add_option( - # "-t", - # "--template", - # dest="html", - # default="wa.db", - # help="Path to HTML template") - (options, args) = parser.parse_args() - msg_db = "msgstore.db" - output_folder = "temp" - contact_db = options.wa - media_folder = options.media - - if len(args) == 1: - msg_db = args[0] - elif len(args) == 2: - msg_db = args[0] - output_folder = args[1] - - data = {} - - if os.path.isfile(contact_db): - with sqlite3.connect(contact_db) as db: - contacts(db, data) - if os.path.isfile(msg_db): - with sqlite3.connect(msg_db) as db: - messages(db, data) - media(db, data, media_folder) - vcard(db, data) - create_html(data, output_folder) - - if not os.path.isdir(f"{output_folder}/WhatsApp"): - shutil.move(media_folder, f"{output_folder}/") - - with open("result.json", "w") as f: - data = json.dumps(data) - print(f"\nWriting JSON file...({int(len(data)/1024/1024)}MB)") - f.write(data) - - print("Everything is done!")