Merge branch 'main' into patch-1

This commit is contained in:
Johan Grande
2023-06-16 12:33:17 +02:00
committed by GitHub
12 changed files with 1190 additions and 283 deletions

View File

@@ -1,6 +1,8 @@
name: Compile standalone binary
on:
release:
types: [published]
workflow_dispatch:
permissions:
@@ -12,7 +14,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
@@ -24,6 +26,7 @@ jobs:
run: |
python -m nuitka --onefile --include-data-file=./Whatsapp_Chat_Exporter/whatsapp.html=./Whatsapp_Chat_Exporter/whatsapp.html --follow-imports Whatsapp_Chat_Exporter/__main__.py
cp __main__.bin wtsexporter_linux_x64
sha256sum wtsexporter_linux_x64
- uses: actions/upload-artifact@v3
with:
name: binary-linux
@@ -35,7 +38,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
@@ -47,6 +50,7 @@ jobs:
run: |
python -m nuitka --onefile --include-data-file=./Whatsapp_Chat_Exporter/whatsapp.html=./Whatsapp_Chat_Exporter/whatsapp.html --assume-yes-for-downloads --follow-imports Whatsapp_Chat_Exporter\__main__.py
copy __main__.exe wtsexporter_x64.exe
Get-FileHash wtsexporter_x64.exe
- uses: actions/upload-artifact@v3
with:
name: binary-windows
@@ -58,7 +62,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
@@ -70,6 +74,7 @@ jobs:
run: |
python -m nuitka --onefile --include-data-file=./Whatsapp_Chat_Exporter/whatsapp.html=./Whatsapp_Chat_Exporter/whatsapp.html --follow-imports Whatsapp_Chat_Exporter/__main__.py
cp __main__.bin wtsexporter_macos_x64
shasum -a 256 wtsexporter_macos_x64
- uses: actions/upload-artifact@v3
with:
name: binary-macos

View File

@@ -18,9 +18,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: '3.x'
- name: Install dependencies

View File

@@ -3,7 +3,7 @@
![License MIT](https://img.shields.io/pypi/l/whatsapp-chat-exporter)
[![Python](https://img.shields.io/pypi/pyversions/Whatsapp-Chat-Exporter)](https://pypi.org/project/Whatsapp-Chat-Exporter/)
A customizable Android and iPhone Whatsapp database parser that will give you the history of your Whatsapp conversations in HTML and JSON.
A customizable Android and iPhone Whatsapp database parser that will give you the history of your Whatsapp conversations in HTML and JSON. Inspired by [Telegram Chat Export Tool](https://telegram.org/blog/export-and-more).
**If you plan to uninstall WhatsApp or delete your WhatsApp account, please make a backup of your WhatsApp database. You may want to use this exporter again on the same database in the future as the exporter develops**
# Usage
@@ -14,7 +14,7 @@ A customizable Android and iPhone Whatsapp database parser that will give you th
First, install the exporter by:
```shell
pip install whatsapp-chat-exporter
pip install whatsapp-chat-exporter[android_backup] & :: Optional, if you want it to support decrypting Android WhatsApp backup.
pip install whatsapp-chat-exporter[android_backup] :; # Optional, if you want it to support decrypting Android WhatsApp backup.
```
Then, create a working directory in somewhere you want
```shell
@@ -42,7 +42,7 @@ In order to support the decryption, install pycryptodome if it is not installed
pip install pycryptodome # Or
pip install whatsapp-chat-exporter["android_backup"] # install along with this software
```
### Crypt15 is now the easiest way to decrypt a backup. If you have the 32 bytes hex key generated when you enable End-to-End encrypted backup, you can use it to decrypt the backup. If you do not have the 32 bytes hex key, you can still use the key file extracted just like extacting key file for Crypt12 and Crypt14 to decrypt the backup.
#### Crypt12 or Crypt14
You will need the decryption key file. You can find it on your phone in the WhatsApp backup directory as `WhatsApp/Databases/.nomedia`, or, if you have root access, `/data/data/com.whatsapp/files/key`.
@@ -102,36 +102,47 @@ After extracting, you will get these:
#### Group Message
![Group Message](imgs/group.png)
*The above screenshots were taken longgggggggggggggg ago. I am going to update them when possible.*
## More options
Invoke the wtsexporter with --help option will show you all options available.
```sh
> wtsexporter --help
Usage: wtsexporter [options]
usage: wtsexporter [options]
Options:
--version show program's version number and exit
options:
-h, --help show this help message and exit
-a, --android Define the target as Android
-i, --iphone Define the target as iPhone
-w WA, --wa=WA Path to contact database
-m MEDIA, --media=MEDIA
Path to WhatsApp media folder
-b BACKUP, --backup=BACKUP
Path to Android (must be used together with -k)/iPhone
WhatsApp backup
-o OUTPUT, --output=OUTPUT
Output to specific directory
-j, --json Save the result to a single JSON file
-d DB, --db=DB Path to database file
-k KEY, --key=KEY Path to key file
-t TEMPLATE, --template=TEMPLATE
-i, --iphone, --ios Define the target as iPhone
-w WA, --wa WA Path to contact database (default: wa.db/ContactsV2.sqlite)
-m MEDIA, --media MEDIA
Path to WhatsApp media folder (default: WhatsApp)
-b BACKUP, --backup BACKUP
Path to Android (must be used together with -k)/iPhone WhatsApp backup
-o OUTPUT, --output OUTPUT
Output to specific directory (default: result)
-j [JSON], --json [JSON]
Save the result to a single JSON file (default if present: result.json)
-d DB, --db DB Path to database file (default: msgstore.db/7c7fba66680ef796b916b067077cc246adacf01d)
-k KEY, --key KEY Path to key file
-t TEMPLATE, --template TEMPLATE
Path to custom HTML template
-e, --embedded Embed media into HTML file (not yet implemented)
-s, --showkey Show the HEX key used to decrypt the database
-c, --move-media Move the media directory to output directory if the flag is set, otherwise copy it
--offline OFFLINE Relative path to offline static files
--size SIZE, --output-size SIZE
Maximum size of a single output file in bytes, 0 for auto (not yet implemented)
--no-html Do not output html files
--check-update Check for updates
```
# To do
1. Reply in iPhone
See [issues](https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues).
# Copyright
This is a MIT licensed project.
The Telegram Desktop's export is the reference for whatsapp.html in this repo
WhatsApp Chat Exporter is not affiliated, associated, authorized, endorsed by, or in any way officially connected with the WhatsApp LLC, or any of its subsidiaries or its affiliates. The official WhatsApp LLC website can be found at https://www.whatsapp.com/.

View File

@@ -1 +1 @@
__version__ = "0.8.5"
__version__ = "0.9.1"

View File

@@ -4,8 +4,9 @@ except ImportError:
from Whatsapp_Chat_Exporter.__init__ import __version__
from Whatsapp_Chat_Exporter import extract, extract_iphone
from Whatsapp_Chat_Exporter import extract_iphone_media
from Whatsapp_Chat_Exporter.extract import Crypt
from optparse import OptionParser
from Whatsapp_Chat_Exporter.data_model import ChatStore
from Whatsapp_Chat_Exporter.utility import Crypt, check_update
from argparse import ArgumentParser
import os
import sqlite3
import shutil
@@ -15,116 +16,179 @@ from sys import exit
def main():
parser = OptionParser(version=f"Whatsapp Chat Exporter: {__version__}")
parser.add_option(
parser = ArgumentParser(
description = 'A customizable Android and iPhone WhatsApp database parser that '
'will give you the history of your WhatsApp conversations inHTML '
'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.',
epilog = f'WhatsApp Chat Exporter: {__version__} Licensed with MIT'
)
parser.add_argument(
'-a',
'--android',
dest='android',
default=False,
action='store_true',
help="Define the target as Android")
parser.add_option(
parser.add_argument(
'-i',
'--iphone',
'--ios',
dest='iphone',
default=False,
action='store_true',
help="Define the target as iPhone")
parser.add_option(
help="Define the target as iPhone/iPad")
parser.add_argument(
"-w",
"--wa",
dest="wa",
default=None,
help="Path to contact database")
parser.add_option(
help="Path to contact database (default: wa.db/ContactsV2.sqlite)")
parser.add_argument(
"-m",
"--media",
dest="media",
default=None,
help="Path to WhatsApp media folder")
parser.add_option(
help="Path to WhatsApp media folder (default: WhatsApp)")
parser.add_argument(
"-b",
"--backup",
dest="backup",
default=None,
help="Path to Android (must be used together "
"with -k)/iPhone WhatsApp backup")
parser.add_option(
parser.add_argument(
"-o",
"--output",
dest="output",
default="result",
help="Output to specific directory")
parser.add_option(
help="Output to specific directory (default: result)")
parser.add_argument(
'-j',
'--json',
dest='json',
default=False,
action='store_true',
help="Save the result to a single JSON file")
parser.add_option(
nargs='?',
default=None,
type=str,
const="result.json",
help="Save the result to a single JSON file (default if present: result.json)")
parser.add_argument(
'-d',
'--db',
dest='db',
default=None,
help="Path to database file")
parser.add_option(
help="Path to database file (default: msgstore.db/"
"7c7fba66680ef796b916b067077cc246adacf01d)")
parser.add_argument(
'-k',
'--key',
dest='key',
default=None,
help="Path to key file"
)
parser.add_option(
parser.add_argument(
"-t",
"--template",
dest="template",
default=None,
help="Path to custom HTML template")
parser.add_option(
help="Path to custom HTML template"
)
parser.add_argument(
"-e",
"--embedded",
dest="embedded",
default=False,
action='store_true',
help="Embed media into HTML file")
(options, args) = parser.parse_args()
help="Embed media into HTML file (not yet implemented)"
)
parser.add_argument(
"-s",
"--showkey",
dest="showkey",
default=False,
action='store_true',
help="Show the HEX key used to decrypt the database"
)
parser.add_argument(
"-c",
"--move-media",
dest="move_media",
default=False,
action='store_true',
help="Move the media directory to output directory if the flag is set, otherwise copy it"
)
parser.add_argument(
"--offline",
dest="offline",
default=None,
help="Relative path to offline static files"
)
parser.add_argument(
"--size",
"--output-size",
dest="size",
default=None,
help="Maximum size of a single output file in bytes, 0 for auto (not yet implemented)"
)
parser.add_argument(
"--no-html",
dest="no_html",
default=False,
action='store_true',
help="Do not output html files"
)
parser.add_argument(
"--check-update",
dest="check_update",
default=False,
action='store_true',
help="Check for updates"
)
args = parser.parse_args()
if options.android and options.iphone:
# Check for updates
if args.check_update:
exit(check_update())
# Sanity checks
if args.android and args.iphone:
print("You must define only one device type.")
exit(1)
if not options.android and not options.iphone:
if not args.android and not args.iphone:
print("You must define the device type.")
exit(1)
if args.no_html and not args.json:
print("You must either specify a JSON output file or enable HTML output.")
exit(1)
data = {}
if options.android:
if args.android:
contacts = extract.contacts
messages = extract.messages
media = extract.media
vcard = extract.vcard
create_html = extract.create_html
if options.db is None:
if args.db is None:
msg_db = "msgstore.db"
else:
msg_db = options.db
if options.key is not None:
if options.backup is None:
msg_db = args.db
if args.key is not None:
if args.backup is None:
print("You must specify the backup file with -b")
exit(1)
print("Decryption key specified, decrypting WhatsApp backup...")
if "crypt12" in options.backup:
if "crypt12" in args.backup:
crypt = Crypt.CRYPT12
elif "crypt14" in options.backup:
elif "crypt14" in args.backup:
crypt = Crypt.CRYPT14
elif "crypt15" in options.backup:
elif "crypt15" in args.backup:
crypt = Crypt.CRYPT15
if os.path.isfile(options.key):
key = open(options.key, "rb")
elif all(char in string.hexdigits for char in options.key):
key = bytes.fromhex(options.key)
db = open(options.backup, "rb").read()
error = extract.decrypt_backup(db, key, msg_db, crypt)
if os.path.isfile(args.key):
key = open(args.key, "rb")
elif all(char in string.hexdigits for char in args.key):
key = bytes.fromhex(args.key)
db = open(args.backup, "rb").read()
error = extract.decrypt_backup(db, key, msg_db, crypt, args.showkey)
if error != 0:
if error == 1:
print("Dependencies of decrypt_backup and/or extract_encrypted_key"
@@ -135,67 +199,81 @@ def main():
"Possibly incorrect offsets used in decryption.")
exit(4)
else:
print("Unknown error occurred.")
print("Unknown error occurred.", error)
exit(5)
if options.wa is None:
if args.wa is None:
contact_db = "wa.db"
else:
contact_db = options.wa
if options.media is None:
options.media = "WhatsApp"
if len(args) == 1:
msg_db = args[0]
contact_db = args.wa
if args.media is None:
args.media = "WhatsApp"
if os.path.isfile(contact_db):
with sqlite3.connect(contact_db) as db:
db.row_factory = sqlite3.Row
contacts(db, data)
elif options.iphone:
elif args.iphone:
import sys
if "--iphone" in sys.argv:
print("WARNING: The --iphone flag is deprecated and will be removed in the future. Use --ios instead.")
messages = extract_iphone.messages
media = extract_iphone.media
vcard = extract_iphone.vcard
create_html = extract_iphone.create_html
if options.backup is not None:
extract_iphone_media.extract_media(options.backup)
if options.db is None:
if args.backup is not None:
extract_iphone_media.extract_media(args.backup)
if args.db is None:
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
else:
msg_db = options.db
if options.wa is None:
msg_db = args.db
if args.wa is None:
contact_db = "ContactsV2.sqlite"
else:
contact_db = options.wa
if options.media is None:
options.media = "Message"
if len(args) == 1:
msg_db = args[0]
contact_db = args.wa
if args.media is None:
args.media = "Message"
if os.path.isfile(msg_db):
with sqlite3.connect(msg_db) as db:
db.row_factory = sqlite3.Row
messages(db, data)
media(db, data, options.media)
media(db, data, args.media)
vcard(db, data)
create_html(data, options.output, options.template, options.embedded)
if not args.no_html:
create_html(
data,
args.output,
args.template,
args.embedded,
args.offline,
args.size
)
else:
print(
"The message database does not exist. You may specify the path "
"to database file with option -d or check your provided path.",
end="\r"
"to database file with option -d or check your provided path."
)
exit(2)
if os.path.isdir(options.media) and \
not os.path.isdir(f"{options.output}/{options.media}"):
try:
shutil.move(options.media, f"{options.output}/")
except PermissionError:
print("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?")
if os.path.isdir(args.media):
if os.path.isdir(f"{args.output}/{args.media}"):
print("Media directory already exists in output directory. Skipping...")
else:
if not args.move_media:
print("Copying media directory...")
shutil.copytree(args.media, f"{args.output}/WhatsApp")
else:
try:
shutil.move(args.media, f"{args.output}/")
except PermissionError:
print("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?")
if options.json:
with open("result.json", "w") as f:
if args.json:
if isinstance(data[next(iter(data))], ChatStore):
data = {jik: chat.to_json() for jik, chat in data.items()}
with open(args.json, "w") as f:
data = json.dumps(data)
print(f"\nWriting JSON file...({int(len(data)/1024/1024)}MB)")
f.write(data)

View File

@@ -0,0 +1,53 @@
from datetime import datetime
from typing import Union
class ChatStore():
def __init__(self, name=None):
if name is not None and not isinstance(name, str):
raise TypeError("Name must be a string or None")
self.name = name
self.messages = {}
def add_message(self, id, message):
if not isinstance(message, Message):
raise TypeError("Chat must be a Chat object")
self.messages[id] = message
def delete_message(self, id):
if id in self.messages:
del self.messages[id]
def to_json(self):
serialized_msgs = {id : msg.to_json() for id,msg in self.messages.items()}
return {'name' : self.name, 'messages' : serialized_msgs}
class Message():
def __init__(self, from_me: Union[bool,int], timestamp: int, time: str, key_id: int):
self.from_me = bool(from_me)
self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M")
self.media = False
self.key_id = key_id
self.meta = False
self.data = None
self.sender = None
# Extra
self.reply = None
self.quoted_data = None
self.caption = None
def to_json(self):
return {
'from_me' : self.from_me,
'timestamp' : self.timestamp,
'time' : self.time,
'media' : self.media,
'key_id' : self.key_id,
'meta' : self.meta,
'data' : self.data,
'sender' : self.sender,
'reply' : self.reply,
'quoted_data' : self.quoted_data,
'caption' : self.caption
}

View File

@@ -9,12 +9,11 @@ import re
import io
import hmac
from pathlib import Path
from bleach import clean as sanitize
from markupsafe import Markup
from datetime import datetime
from enum import Enum
from mimetypes import MimeTypes
from hashlib import sha256
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import sanitize_except, determine_day, Crypt
from Whatsapp_Chat_Exporter.utility import brute_force_offset, CRYPT14_OFFSETS
try:
import zlib
@@ -30,36 +29,6 @@ except ModuleNotFoundError:
else:
support_crypt15 = True
def sanitize_except(html):
return Markup(sanitize(html, tags=["br"]))
def determine_day(last, current):
last = datetime.fromtimestamp(last).date()
current = datetime.fromtimestamp(current).date()
if last == current:
return None
else:
return current
CRYPT14_OFFSETS = [
{"iv": 67, "db": 191},
{"iv": 67, "db": 190},
{"iv": 66, "db": 99}
]
class Crypt(Enum):
CRYPT15 = 15
CRYPT14 = 14
CRYPT12 = 12
def brute_force_offset():
for iv in range(0, 200):
for db in range(0, 200):
yield iv, iv + 16, db
def _generate_hmac_of_hmac(key_stream):
key = hmac.new(
@@ -71,7 +40,7 @@ def _generate_hmac_of_hmac(key_stream):
b"backup encryption\x01",
sha256
)
return key.digest()
return key.digest(), key_stream
def _extract_encrypted_key(keyfile):
@@ -82,7 +51,7 @@ def _extract_encrypted_key(keyfile):
return _generate_hmac_of_hmac(key_stream)
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14):
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False):
if not support_backup:
return 1
if isinstance(key, io.IOBase):
@@ -91,6 +60,7 @@ def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14):
t1 = key[30:62]
if crypt is not Crypt.CRYPT15 and len(key) != 158:
raise ValueError("The key file must be 158 bytes")
# Determine the IV and database offsets
if crypt == Crypt.CRYPT14:
if len(database) < 191:
raise ValueError("The crypt14 file must be at least 191 bytes")
@@ -120,9 +90,12 @@ def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14):
if crypt == Crypt.CRYPT15:
if len(key) == 32:
main_key = _generate_hmac_of_hmac(key)
main_key, hex_key = _generate_hmac_of_hmac(key)
else:
main_key = _extract_encrypted_key(key)
main_key, hex_key = _extract_encrypted_key(key)
if show_crypt15:
hex_key = [hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]
print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key))
else:
main_key = key[126:]
decompressed = False
@@ -184,157 +157,222 @@ def contacts(db, data):
c.execute("""SELECT jid, display_name FROM wa_contacts; """)
row = c.fetchone()
while row is not None:
data[row[0]] = {"name": row[1], "messages": {}}
data[row["jid"]] = ChatStore(row["display_name"])
row = c.fetchone()
def messages(db, data):
# Get message history
c = db.cursor()
c.execute("""SELECT count() FROM messages""")
try:
c.execute("""SELECT count() FROM messages""")
except sqlite3.OperationalError:
c.execute("""SELECT count() FROM message""")
total_row_number = c.fetchone()[0]
print(f"Gathering messages...(0/{total_row_number})", end="\r")
phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net")
c.execute("""SELECT messages.key_remote_jid,
messages._id,
messages.key_from_me,
messages.timestamp,
messages.data,
messages.status,
messages.edit_version,
messages.thumb_image,
messages.remote_resource,
messages.media_wa_type,
messages.latitude,
messages.longitude,
messages_quotes.key_id as quoted,
messages.key_id,
messages_quotes.data,
messages.media_caption
FROM messages
LEFT JOIN messages_quotes
ON messages.quoted_row_id = messages_quotes._id
WHERE messages.key_remote_jid <> '-1';""")
try:
c.execute("""SELECT messages.key_remote_jid,
messages._id,
messages.key_from_me,
messages.timestamp,
messages.data,
messages.status,
messages.edit_version,
messages.thumb_image,
messages.remote_resource,
messages.media_wa_type,
messages.latitude,
messages.longitude,
messages_quotes.key_id as quoted,
messages.key_id,
messages_quotes.data as quoted_data,
messages.media_caption
FROM messages
LEFT JOIN messages_quotes
ON messages.quoted_row_id = messages_quotes._id
WHERE messages.key_remote_jid <> '-1';"""
)
except sqlite3.OperationalError:
try:
c.execute("""SELECT jid_global.raw_string as key_remote_jid,
message._id,
message.from_me as key_from_me,
message.timestamp,
message.text_data as data,
message.status,
message_future.version as edit_version,
message_thumbnail.thumbnail as thumb_image,
message_media.file_path as remote_resource,
message_media.mime_type as media_wa_type,
message_location.latitude,
message_location.longitude,
message_quoted.key_id as quoted,
message.key_id,
message_quoted.text_data as quoted_data,
message.message_type,
jid_group.raw_string as group_sender_jid,
chat.subject as chat_subject
FROM message
LEFT JOIN message_quoted
ON message_quoted.message_row_id = message._id
LEFT JOIN message_location
ON message_location.message_row_id = message._id
LEFT JOIN message_media
ON message_media.message_row_id = message._id
LEFT JOIN message_thumbnail
ON message_thumbnail.message_row_id = message._id
LEFT JOIN message_future
ON message_future.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid jid_global
ON jid_global._id = chat.jid_row_id
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
WHERE key_remote_jid <> '-1';"""
)
except Exception as e:
raise e
else:
table_message = True
else:
table_message = False
i = 0
content = c.fetchone()
while content is not None:
if content[0] not in data:
data[content[0]] = {"name": None, "messages": {}}
data[content[0]]["messages"][content[1]] = {
"from_me": bool(content[2]),
"timestamp": content[3]/1000,
"time": datetime.fromtimestamp(content[3]/1000).strftime("%H:%M"),
"media": False,
"key_id": content[13],
"meta": False,
"data": None
}
if "-" in content[0] and content[2] == 0:
if content["key_remote_jid"] not in data:
data[content["key_remote_jid"]] = ChatStore()
if content["key_remote_jid"] is None:
continue # Not sure
data[content["key_remote_jid"]].add_message(content["_id"], Message(
from_me=content["key_from_me"],
timestamp=content["timestamp"],
time=content["timestamp"],
key_id=content["key_id"],
))
if "-" in content["key_remote_jid"] and content["key_from_me"] == 0:
name = None
if content[8] in data:
name = data[content[8]]["name"]
if "@" in content[8]:
fallback = content[8].split('@')[0]
if table_message:
if content["chat_subject"] is not None:
_jid = content["group_sender_jid"]
else:
_jid = content["key_remote_jid"]
if _jid in data:
name = data[_jid].name
fallback = _jid.split('@')[0] if "@" in _jid else None
else:
fallback = None
else:
fallback = None
data[content[0]]["messages"][content[1]]["sender"] = name or fallback
else:
data[content[0]]["messages"][content[1]]["sender"] = None
if content[12] is not None:
data[content[0]]["messages"][content[1]]["reply"] = content[12]
data[content[0]]["messages"][content[1]]["quoted_data"] = content[14]
else:
data[content[0]]["messages"][content[1]]["reply"] = None
if content[15] is not None:
data[content[0]]["messages"][content[1]]["caption"] = content[15]
else:
data[content[0]]["messages"][content[1]]["caption"] = None
if content[5] == 6:
if "-" in content[0]:
# Is Group
if content[4] is not None:
try:
int(content[4])
except ValueError:
msg = f"The group name changed to {content[4]}"
data[content[0]]["messages"][content[1]]["data"] = msg
data[content[0]]["messages"][content[1]]["meta"] = True
if content["remote_resource"] in data:
name = data[content["remote_resource"]].name
if "@" in content["remote_resource"]:
fallback = content["remote_resource"].split('@')[0]
else:
del data[content[0]]["messages"][content[1]]
fallback = None
else:
thumb_image = content[7]
fallback = None
data[content["key_remote_jid"]].messages[content["_id"]].sender = name or fallback
else:
data[content["key_remote_jid"]].messages[content["_id"]].sender = None
if content["quoted"] is not None:
data[content["key_remote_jid"]].messages[content["_id"]].reply = content["quoted"]
data[content["key_remote_jid"]].messages[content["_id"]].quoted_data = content["quoted_data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].reply = None
if not table_message and content["media_caption"] is not None:
# Old schema
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["media_caption"]
elif table_message and content["message_type"] == 1 and content["data"] is not None:
# New schema
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].caption = None
if content["status"] == 6: # 6 = Metadata, otherwise it's a message
if (not table_message and "-" in content["key_remote_jid"]) or \
(table_message and content["chat_subject"] is not None):
# Is Group
if content["data"] is not None:
try:
int(content["data"])
except ValueError:
msg = f"The group name changed to {content['data']}"
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
thumb_image = content["thumb_image"]
if thumb_image is not None:
if b"\x00\x00\x01\x74\x00\x1A" in thumb_image:
# Add user
added = phone_number_re.search(
thumb_image.decode("unicode_escape"))[0]
if added in data:
name_right = data[added]["name"]
name_right = data[added].name
else:
name_right = added.split('@')[0]
if content[8] is not None:
if content[8] in data:
name_left = data[content[8]]["name"]
if content["remote_resource"] is not None:
if content["remote_resource"] in data:
name_left = data[content["remote_resource"]].name
else:
name_left = content[8].split('@')[0]
name_left = content["remote_resource"].split('@')[0]
msg = f"{name_left} added {name_right or 'You'}"
else:
msg = f"Added {name_right or 'You'}"
elif b"\xac\xed\x00\x05\x74\x00" in thumb_image:
# Changed number
original = content[8].split('@')[0]
original = content["remote_resource"].split('@')[0]
changed = thumb_image[7:].decode().split('@')[0]
msg = f"{original} changed to {changed}"
data[content[0]]["messages"][content[1]]["data"] = msg
data[content[0]]["messages"][content[1]]["meta"] = True
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content[4] is None:
del data[content[0]]["messages"][content[1]]
if content["data"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
# Private chat
if content[4] is None and content[7] is None:
del data[content[0]]["messages"][content[1]]
if content["data"] is None and content["thumb_image"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
if content[2] == 1:
if content[5] == 5 and content[6] == 7:
if content["key_from_me"] == 1:
if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["message_type"] == 15:
msg = "Message deleted"
data[content[0]]["messages"][content[1]]["meta"] = True
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content[9] == "5":
msg = f"Location shared: {content[10], content[11]}"
data[content[0]]["messages"][content[1]]["meta"] = True
if content["media_wa_type"] == "5":
msg = f"Location shared: {content['latitude'], content['longitude']}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
msg = content[4]
msg = content["data"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
else:
if content[5] == 0 and content[6] == 7:
if content["status"] == 0 and content["edit_version"] == 7 or table_message and content["message_type"] == 15:
msg = "Message deleted"
data[content[0]]["messages"][content[1]]["meta"] = True
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content[9] == "5":
msg = f"Location shared: {content[10], content[11]}"
data[content[0]]["messages"][content[1]]["meta"] = True
if content["media_wa_type"] == "5":
msg = f"Location shared: {content['latitude'], content['longitude']}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
msg = content[4]
msg = content["data"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
data[content[0]]["messages"][content[1]]["data"] = msg
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
i += 1
if i % 1000 == 0:
@@ -350,7 +388,8 @@ def media(db, data, media_folder):
total_row_number = c.fetchone()[0]
print(f"\nGathering media...(0/{total_row_number})", end="\r")
i = 0
c.execute("""SELECT messages.key_remote_jid,
try:
c.execute("""SELECT messages.key_remote_jid,
message_row_id,
file_path,
message_url,
@@ -359,22 +398,39 @@ def media(db, data, media_folder):
FROM message_media
INNER JOIN messages
ON message_media.message_row_id = messages._id
ORDER BY messages.key_remote_jid ASC""")
ORDER BY messages.key_remote_jid ASC"""
)
except sqlite3.OperationalError:
c.execute("""SELECT jid.raw_string as key_remote_jid,
message_row_id,
file_path,
message_url,
mime_type,
media_key
FROM message_media
INNER JOIN message
ON message_media.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
ORDER BY jid.raw_string ASC"""
)
content = c.fetchone()
mime = MimeTypes()
while content is not None:
file_path = f"{media_folder}/{content[2]}"
data[content[0]]["messages"][content[1]]["media"] = True
file_path = f"{media_folder}/{content['file_path']}"
data[content["key_remote_jid"]].messages[content["message_row_id"]].media = True
if os.path.isfile(file_path):
data[content[0]]["messages"][content[1]]["data"] = file_path
if content[4] is None:
data[content["key_remote_jid"]].messages[content["message_row_id"]].data = file_path
if content["mime_type"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
data[content[0]]["messages"][content[1]]["mime"] = guess
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = guess
else:
data[content[0]]["messages"][content[1]]["mime"] = "data/data"
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = "data/data"
else:
data[content[0]]["messages"][content[1]]["mime"] = content[4]
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = content["mime_type"]
else:
# if "https://mmg" in content[4]:
# try:
@@ -386,9 +442,9 @@ def media(db, data, media_folder):
# data[content[0]]["messages"][content[1]]["media"] = True
# data[content[0]]["messages"][content[1]]["mime"] = "media"
# else:
data[content[0]]["messages"][content[1]]["data"] = "The media is missing"
data[content[0]]["messages"][content[1]]["mime"] = "media"
data[content[0]]["messages"][content[1]]["meta"] = True
data[content["key_remote_jid"]].messages[content["message_row_id"]].data = "The media is missing"
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = "media"
data[content["key_remote_jid"]].messages[content["message_row_id"]].meta = True
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
@@ -399,14 +455,31 @@ def media(db, data, media_folder):
def vcard(db, data):
c = db.cursor()
c.execute("""SELECT message_row_id,
try:
c.execute("""SELECT message_row_id,
messages.key_remote_jid,
vcard,
messages.media_name
FROM messages_vcards
INNER JOIN messages
ON messages_vcards.message_row_id = messages._id
ORDER BY messages.key_remote_jid ASC;""")
ORDER BY messages.key_remote_jid ASC;"""
)
except sqlite3.OperationalError:
c.execute("""SELECT message_row_id,
jid.raw_string as key_remote_jid,
vcard,
message.text_data as media_name
FROM message_vcard
INNER JOIN message
ON message_vcard.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
ORDER BY message.chat_row_id ASC;"""
)
rows = c.fetchall()
total_row_number = len(rows)
print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
@@ -414,21 +487,28 @@ def vcard(db, data):
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
for index, row in enumerate(rows):
media_name = row[3] if row[3] else ""
media_name = row["media_name"] if row["media_name"] is not None else ""
file_name = "".join(x for x in media_name if x.isalnum())
file_path = f"{base}/{file_name}.vcf"
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(row[2])
data[row[1]]["messages"][row[0]]["data"] = media_name + \
f.write(row["vcard"])
data[row["key_remote_jid"]].messages[row["message_row_id"]].data = media_name + \
"The vCard file cannot be displayed here, " \
f"however it should be located at {file_path}"
data[row[1]]["messages"][row[0]]["mime"] = "text/x-vcard"
data[row[1]]["messages"][row[0]]["meta"] = True
data[row["key_remote_jid"]].messages[row["message_row_id"]].mime = "text/x-vcard"
data[row["key_remote_jid"]].messages[row["message_row_id"]].meta = True
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
def create_html(data, output_folder, template=None, embedded=False):
def create_html(
data,
output_folder,
template=None,
embedded=False,
offline_static=False,
maximum_size=None
):
if template is None:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html"
@@ -447,8 +527,21 @@ def create_html(data, output_folder, template=None, embedded=False):
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if offline_static:
import urllib.request
static_folder = os.path.join(output_folder, offline_static)
if not os.path.isdir(static_folder):
os.mkdir(static_folder)
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f:
f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
for current, contact in enumerate(data):
if len(data[contact]["messages"]) == 0:
if len(data[contact].messages) == 0:
continue
phone_number = contact.split('@')[0]
if "-" in contact:
@@ -456,11 +549,11 @@ def create_html(data, output_folder, template=None, embedded=False):
else:
file_name = phone_number
if data[contact]["name"] is not None:
if data[contact].name is not None:
if file_name != "":
file_name += "-"
file_name += data[contact]["name"].replace("/", "-")
name = data[contact]["name"]
file_name += data[contact].name.replace("/", "-")
name = data[contact].name
else:
name = phone_number
safe_file_name = ''
@@ -469,9 +562,10 @@ def create_html(data, output_folder, template=None, embedded=False):
f.write(
template.render(
name=name,
msgs=data[contact]["messages"].values(),
msgs=data[contact].messages.values(),
my_avatar=None,
their_avatar=f"WhatsApp/Avatars/{contact}.j"
their_avatar=f"WhatsApp/Avatars/{contact}.j",
w3css=w3css
)
)
if current % 10 == 0:

View File

@@ -6,25 +6,9 @@ import jinja2
import os
import shutil
from pathlib import Path
from bleach import clean as sanitize
from markupsafe import Markup
from datetime import datetime
from mimetypes import MimeTypes
APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1))
def sanitize_except(html):
return Markup(sanitize(html, tags=["br"]))
def determine_day(last, current):
last = datetime.fromtimestamp(last).date()
current = datetime.fromtimestamp(current).date()
if last == current:
return None
else:
return current
from Whatsapp_Chat_Exporter.utility import sanitize_except, determine_day, APPLE_TIME
def messages(db, data):
@@ -228,7 +212,7 @@ def vcard(db, data):
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
def create_html(data, output_folder, template=None, embedded=False):
def create_html(data, output_folder, template=None, embedded=False, offline_static=False, maximum_size=None):
if template is None:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html"
@@ -247,6 +231,18 @@ def create_html(data, output_folder, template=None, embedded=False):
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if offline_static:
import urllib.request
static_folder = os.path.join(output_folder, offline_static)
if not os.path.isdir(static_folder):
os.mkdir(static_folder)
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f: f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
for current, contact in enumerate(data):
if len(data[contact]["messages"]) == 0:
continue
@@ -272,7 +268,8 @@ def create_html(data, output_folder, template=None, embedded=False):
name=name,
msgs=data[contact]["messages"].values(),
my_avatar=None,
their_avatar=f"WhatsApp/Avatars/{contact}.j"
their_avatar=f"WhatsApp/Avatars/{contact}.j",
w3css=w3css
)
)
if current % 10 == 0:

View File

@@ -0,0 +1,588 @@
#!/usr/bin/python3
import sqlite3
import json
import jinja2
import os
import shutil
import re
import io
import hmac
from pathlib import Path
from bleach import clean as sanitize
from markupsafe import Markup
from datetime import datetime
from enum import Enum
from mimetypes import MimeTypes
from hashlib import sha256
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
try:
import zlib
from Crypto.Cipher import AES
except ModuleNotFoundError:
support_backup = False
else:
support_backup = True
try:
import javaobj
except ModuleNotFoundError:
support_crypt15 = False
else:
support_crypt15 = True
def sanitize_except(html):
return Markup(sanitize(html, tags=["br"]))
def determine_day(last, current):
last = datetime.fromtimestamp(last).date()
current = datetime.fromtimestamp(current).date()
if last == current:
return None
else:
return current
CRYPT14_OFFSETS = (
{"iv": 67, "db": 191},
{"iv": 67, "db": 190},
{"iv": 66, "db": 99},
{"iv": 67, "db": 193}
)
class Crypt(Enum):
CRYPT15 = 15
CRYPT14 = 14
CRYPT12 = 12
def brute_force_offset():
for iv in range(0, 200):
for db in range(0, 200):
yield iv, iv + 16, db
def _generate_hmac_of_hmac(key_stream):
key = hmac.new(
hmac.new(
b'\x00' * 32,
key_stream,
sha256
).digest(),
b"backup encryption\x01",
sha256
)
return key.digest(), key_stream
def _extract_encrypted_key(keyfile):
key_stream = b""
for byte in javaobj.loads(keyfile):
key_stream += byte.to_bytes(1, "big", signed=True)
return _generate_hmac_of_hmac(key_stream)
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False):
if not support_backup:
return 1
if isinstance(key, io.IOBase):
key = key.read()
if crypt is not Crypt.CRYPT15:
t1 = key[30:62]
if crypt is not Crypt.CRYPT15 and len(key) != 158:
raise ValueError("The key file must be 158 bytes")
if crypt == Crypt.CRYPT14:
if len(database) < 191:
raise ValueError("The crypt14 file must be at least 191 bytes")
current_try = 0
offsets = CRYPT14_OFFSETS[current_try]
t2 = database[15:47]
iv = database[offsets["iv"]:offsets["iv"] + 16]
db_ciphertext = database[offsets["db"]:]
elif crypt == Crypt.CRYPT12:
if len(database) < 67:
raise ValueError("The crypt12 file must be at least 67 bytes")
t2 = database[3:35]
iv = database[51:67]
db_ciphertext = database[67:-20]
elif crypt == Crypt.CRYPT15:
if not support_crypt15:
return 1
if len(database) < 131:
raise ValueError("The crypt15 file must be at least 131 bytes")
t1 = t2 = None
iv = database[8:24]
db_offset = database[0] + 2 # Skip protobuf + protobuf size and backup type
db_ciphertext = database[db_offset:]
if t1 != t2:
raise ValueError("The signature of key file and backup file mismatch")
if crypt == Crypt.CRYPT15:
if len(key) == 32:
main_key, hex_key = _generate_hmac_of_hmac(key)
else:
main_key, hex_key = _extract_encrypted_key(key)
if show_crypt15:
hex_key = [hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]
print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key))
else:
main_key = key[126:]
decompressed = False
while not decompressed:
cipher = AES.new(main_key, AES.MODE_GCM, iv)
db_compressed = cipher.decrypt(db_ciphertext)
try:
db = zlib.decompress(db_compressed)
except zlib.error:
if crypt == Crypt.CRYPT14:
current_try += 1
if current_try < len(CRYPT14_OFFSETS):
offsets = CRYPT14_OFFSETS[current_try]
iv = database[offsets["iv"]:offsets["iv"] + 16]
db_ciphertext = database[offsets["db"]:]
continue
else:
print("Common offsets are not applicable to "
"your backup. Trying to brute force it...")
for start_iv, end_iv, start_db in brute_force_offset():
iv = database[start_iv:end_iv]
db_ciphertext = database[start_db:]
cipher = AES.new(main_key, AES.MODE_GCM, iv)
db_compressed = cipher.decrypt(db_ciphertext)
try:
db = zlib.decompress(db_compressed)
except zlib.error:
continue
else:
decompressed = True
print(
f"The offsets of your IV and database are {start_iv} and "
f"{start_db}, respectively. To include your offsets in the "
"program, please report it by creating an issue on GitHub: "
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues/new"
)
break
if not decompressed:
return 2
else:
return 3
else:
decompressed = True
if db[0:6].upper() == b"SQLITE":
with open(output, "wb") as f:
f.write(db)
return 0
else:
raise ValueError("The plaintext is not a SQLite database. Did you use the key to encrypt something...")
def contacts(db, data):
# Get contacts
c = db.cursor()
c.execute("""SELECT count() FROM wa_contacts""")
total_row_number = c.fetchone()[0]
print(f"Gathering contacts...({total_row_number})")
c.execute("""SELECT jid, display_name FROM wa_contacts; """)
row = c.fetchone()
while row is not None:
data[row["jid"]] = ChatStore(row["display_name"])
row = c.fetchone()
def messages(db, data):
# Get message history
c = db.cursor()
c.execute("""SELECT count() FROM message""")
total_row_number = c.fetchone()[0]
print(f"Gathering messages...(0/{total_row_number})", end="\r")
phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net")
c.execute("""SELECT jid_global.raw_string as key_remote_jid,
message._id,
message.from_me as key_from_me,
message.timestamp,
message.text_data as data,
message.status,
message_future.version as edit_version,
message_thumbnail.thumbnail as thumb_image,
message_media.file_path as remote_resource,
message_media.mime_type as media_wa_type,
message_location.latitude,
message_location.longitude,
message_quoted.key_id as quoted,
message.key_id,
message_quoted.text_data as quoted_data,
message.message_type,
jid_group.raw_string as group_sender_jid,
chat.subject as chat_subject
FROM message
LEFT JOIN message_quoted
ON message_quoted.message_row_id = message._id
LEFT JOIN message_location
ON message_location.message_row_id = message._id
LEFT JOIN message_media
ON message_media.message_row_id = message._id
LEFT JOIN message_thumbnail
ON message_thumbnail.message_row_id = message._id
LEFT JOIN message_future
ON message_future.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid jid_global
ON jid_global._id = chat.jid_row_id
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
WHERE key_remote_jid <> '-1';""")
i = 0
content = c.fetchone()
while content is not None:
if content["key_remote_jid"] not in data:
data[content["key_remote_jid"]] = ChatStore()
if content["key_remote_jid"] is None:
continue
data[content["key_remote_jid"]].add_message(content["_id"], Message(
from_me=content["key_from_me"],
timestamp=content["timestamp"],
time=content["timestamp"],
key_id=content["key_id"],
))
if "-" in content["key_remote_jid"] and content["key_from_me"] == 0:
name = None
if content["chat_subject"] is not None:
_jid = content["group_sender_jid"]
else:
_jid = content["key_remote_jid"]
if _jid in data:
name = data[_jid].name
fallback = _jid.split('@')[0] if "@" in _jid else None
else:
fallback = None
data[content["key_remote_jid"]].messages[content["_id"]].sender = name or fallback
else:
data[content["key_remote_jid"]].messages[content["_id"]].sender = None
if content["quoted"] is not None:
data[content["key_remote_jid"]].messages[content["_id"]].reply = content["quoted"]
data[content["key_remote_jid"]].messages[content["_id"]].quoted_data = content["quoted_data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].reply = None
if content["message_type"] == 1:
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].caption = None
if content["status"] == 6:
if content["chat_subject"] is not None:
# Is Group
if content["data"] is not None:
try:
int(content["data"])
except ValueError:
msg = f"The group name changed to {content['data']}"
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
thumb_image = content["thumb_image"]
if thumb_image is not None:
if b"\x00\x00\x01\x74\x00\x1A" in thumb_image:
# Add user
added = phone_number_re.search(
thumb_image.decode("unicode_escape"))[0]
if added in data:
name_right = data[added]["name"]
else:
name_right = added.split('@')[0]
if content["remote_resource"] is not None:
if content["remote_resource"] in data:
name_left = data[content["remote_resource"]]["name"]
else:
name_left = content["remote_resource"].split('@')[0]
msg = f"{name_left} added {name_right or 'You'}"
else:
msg = f"Added {name_right or 'You'}"
elif b"\xac\xed\x00\x05\x74\x00" in thumb_image:
# Changed number
original = content["remote_resource"].split('@')[0]
changed = thumb_image[7:].decode().split('@')[0]
msg = f"{original} changed to {changed}"
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content["data"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
# Private chat
if content["data"] is None and content["thumb_image"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
if content["key_from_me"] == 1:
if content["status"] == 5 and content["edit_version"] == 7:
msg = "Message deleted"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content["media_wa_type"] == "5":
msg = f"Location shared: {content[10], content[11]}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
msg = content["data"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
else:
if content["status"] == 0 and content["edit_version"] == 7:
msg = "Message deleted"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content["media_wa_type"] == "5":
msg = f"Location shared: {content[10], content[11]}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
msg = content["data"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
i += 1
if i % 1000 == 0:
print(f"Gathering messages...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(f"Gathering messages...({total_row_number}/{total_row_number})", end="\r")
def media(db, data, media_folder):
# Get media
c = db.cursor()
c.execute("""SELECT count() FROM message_media""")
total_row_number = c.fetchone()[0]
print(f"\nGathering media...(0/{total_row_number})", end="\r")
i = 0
c.execute("""SELECT jid.raw_string,
message_row_id,
file_path,
message_url,
mime_type,
media_key
FROM message_media
INNER JOIN message
ON message_media.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
ORDER BY jid.raw_string ASC""")
content = c.fetchone()
mime = MimeTypes()
while content is not None:
file_path = f"{media_folder}/{content['file_path']}"
data[content["raw_string"]].messages[content["message_row_id"]].media = True
if os.path.isfile(file_path):
data[content["raw_string"]].messages[content["message_row_id"]].data = file_path
if content["mime_type"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
data[content["raw_string"]].messages[content["message_row_id"]].mime = guess
else:
data[content["raw_string"]].messages[content["message_row_id"]].mime = "data/data"
else:
data[content["raw_string"]].messages[content["message_row_id"]].mime = content["mime_type"]
else:
# if "https://mmg" in content["mime_type"]:
# try:
# r = requests.get(content["message_url"])
# if r.status_code != 200:
# raise RuntimeError()
# except:
# data[content["raw_string"]].messages[content["message_row_id"]].data = "{The media is missing}"
# data[content["raw_string"]].messages[content["message_row_id"]].media = True
# data[content["raw_string"]].messages[content["message_row_id"]].mime = "media"
# else:
data[content["raw_string"]].messages[content["message_row_id"]].data = "The media is missing"
data[content["raw_string"]].messages[content["message_row_id"]].mime = "media"
data[content["raw_string"]].messages[content["message_row_id"]].meta = True
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(
f"Gathering media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data):
c = db.cursor()
c.execute("""SELECT message_row_id,
jid.raw_string,
vcard,
message.text_data
FROM message_vcard
INNER JOIN message
ON message_vcard.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
ORDER BY message.chat_row_id ASC;""")
rows = c.fetchall()
total_row_number = len(rows)
print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
base = "WhatsApp/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
for index, row in enumerate(rows):
media_name = row["text_data"] if row["text_data"] else ""
file_name = "".join(x for x in media_name if x.isalnum())
file_path = f"{base}/{file_name}.vcf"
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(row["vcard"])
data[row["raw_string"]].messages[row["message_row_id"]].data = media_name + \
"The vCard file cannot be displayed here, " \
f"however it should be located at {file_path}"
data[row["raw_string"]].messages[row["message_row_id"]].mime = "text/x-vcard"
data[row["raw_string"]].messages[row["message_row_id"]].meta = True
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
def create_html(
data,
output_folder,
template=None,
embedded=False,
offline_static=False,
maximum_size=None
):
if template is None:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html"
else:
template_dir = os.path.dirname(template)
template_file = os.path.basename(template)
templateLoader = jinja2.FileSystemLoader(searchpath=template_dir)
templateEnv = jinja2.Environment(loader=templateLoader)
templateEnv.globals.update(determine_day=determine_day)
templateEnv.filters['sanitize_except'] = sanitize_except
template = templateEnv.get_template(template_file)
total_row_number = len(data)
print(f"\nCreating HTML...(0/{total_row_number})", end="\r")
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if offline_static:
import urllib.request
static_folder = os.path.join(output_folder, offline_static)
if not os.path.isdir(static_folder):
os.mkdir(static_folder)
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f: f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
for current, contact in enumerate(data):
if len(data[contact].messages) == 0:
continue
phone_number = contact.split('@')[0]
if "-" in contact:
file_name = ""
else:
file_name = phone_number
if data[contact].name is not None:
if file_name != "":
file_name += "-"
file_name += data[contact].name.replace("/", "-")
name = data[contact].name
else:
name = phone_number
safe_file_name = "".join(x for x in file_name if x.isalnum() or x in "- ")
with open(f"{output_folder}/{safe_file_name}.html", "w", encoding="utf-8") as f:
f.write(
template.render(
name=name,
msgs=data[contact].messages.values(),
my_avatar=None,
their_avatar=f"WhatsApp/Avatars/{contact}.j",
w3css=w3css
)
)
if current % 10 == 0:
print(f"Creating HTML...({current}/{total_row_number})", end="\r")
print(f"Creating HTML...({total_row_number}/{total_row_number})", end="\r")
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser()
parser.add_option(
"-w",
"--wa",
dest="wa",
default="wa.db",
help="Path to contact database")
parser.add_option(
"-m",
"--media",
dest="media",
default="WhatsApp",
help="Path to WhatsApp media folder"
)
# parser.add_option(
# "-t",
# "--template",
# dest="html",
# default="wa.db",
# help="Path to HTML template")
(options, args) = parser.parse_args()
msg_db = "msgstore.db"
output_folder = "temp"
contact_db = options.wa
media_folder = options.media
if len(args) == 1:
msg_db = args[0]
elif len(args) == 2:
msg_db = args[0]
output_folder = args[1]
data = {}
if os.path.isfile(contact_db):
with sqlite3.connect(contact_db) as db:
contacts(db, data)
if os.path.isfile(msg_db):
with sqlite3.connect(msg_db) as db:
messages(db, data)
media(db, data, media_folder)
vcard(db, data)
create_html(data, output_folder)
if not os.path.isdir(f"{output_folder}/WhatsApp"):
shutil.move(media_folder, f"{output_folder}/")
with open("result.json", "w") as f:
data = json.dumps(data)
print(f"\nWriting JSON file...({int(len(data)/1024/1024)}MB)")
f.write(data)
print("Everything is done!")

View File

@@ -0,0 +1,74 @@
from bleach import clean as sanitize
from markupsafe import Markup
from datetime import datetime
from enum import Enum
def sanitize_except(html):
return Markup(sanitize(html, tags=["br"]))
def determine_day(last, current):
last = datetime.fromtimestamp(last).date()
current = datetime.fromtimestamp(current).date()
if last == current:
return None
else:
return current
# Android Specific
CRYPT14_OFFSETS = (
{"iv": 67, "db": 191},
{"iv": 67, "db": 190},
{"iv": 66, "db": 99},
{"iv": 67, "db": 193}
)
class Crypt(Enum):
CRYPT15 = 15
CRYPT14 = 14
CRYPT12 = 12
def brute_force_offset(max_iv=200, max_db=200):
for iv in range(0, max_iv):
for db in range(0, max_db):
yield iv, iv + 16, db
def check_update():
import urllib.request
import json
from sys import platform
from .__init__ import __version__
package_url_json = "https://pypi.org/pypi/whatsapp-chat-exporter/json"
try:
raw = urllib.request.urlopen(package_url_json)
except Exception:
print("Failed to check for updates.")
return 1
else:
with raw:
package_info = json.load(raw)
latest_version = tuple(map(int, package_info["info"]["version"].split(".")))
current_version = tuple(map(int, __version__.split(".")))
if current_version < latest_version:
print("===============Update===============")
print("A newer version of WhatsApp Chat Exporter is available.")
print("Current version: " + __version__)
print("Latest version: " + package_info["info"]["version"])
if platform == "win32":
print("Update with: pip install --upgrade whatsapp-chat-exporter")
else:
print("Update with: pip3 install --upgrade whatsapp-chat-exporter")
print("====================================")
else:
print("You are using the latest version of WhatsApp Chat Exporter.")
return 0
# iOS Specific
APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1))

View File

@@ -2,11 +2,10 @@
<html>
<head>
<title>Whatsapp - {{ name }}</title>
<link rel="stylesheet" href="https://www.w3schools.com/w3css/4/w3.css">
<meta charset="UTF-8">
<link rel="stylesheet" href="{{w3css}}">
<style>
@import url('https://fonts.googleapis.com/css2?family=Noto+Sans+HK:wght@300;400&display=swap');
html {
font-family: 'Noto Sans HK', sans-serif;
html, body {
font-size: 12px;
scroll-behavior: smooth;
}
@@ -139,11 +138,11 @@
{% if "image/" in msg.mime %}
<a href="{{ msg.data }}"><img src="{{ msg.data }}" /></a>
{% elif "audio/" in msg.mime %}
<audio controls="controls" autobuffer="autobuffer">
<audio controls preload="auto">
<source src="{{ msg.data }}" />
</audio>
{% elif "video/" in msg.mime %}
<video controls="controls" autobuffer="autobuffer">
<video controls preload="auto">
<source src="{{ msg.data }}" />
</video>
{% elif "/" in msg.mime %}

View File

@@ -11,11 +11,19 @@ setuptools.setup(
name="whatsapp-chat-exporter",
version=version,
author="KnugiHK",
author_email="info@knugi.com",
author_email="hello@knugi.com",
description="A Whatsapp database parser that will give you the "
"history of your Whatsapp conversations in HTML and JSON.",
long_description=long_description,
long_description_content_type="text/markdown",
license="MIT",
keywords=[
"android", "ios", "parsing", "history","iphone", "whatsapp", "message"
"customizable", "android-backup", "crypt12", "whatsapp-chat-exporter",
"whatsapp-export", "whatsapp-database", "whatsapp-database-parser",
"whatsapp-conversations", "iphone-backup", "crypt14", "crypt15", "messages"
],
platforms=["any"],
url="https://github.com/KnugiHK/Whatsapp-Chat-Exporter",
packages=setuptools.find_packages(),
package_data={