Merge pull request #60 from andrp92/dev

Feat: Support for WhatsApp Business (iPhone only)
This commit is contained in:
Knugi
2023-09-22 21:33:12 +08:00
committed by GitHub
5 changed files with 68 additions and 30 deletions

View File

@@ -177,6 +177,13 @@ def main():
action='store_true', action='store_true',
help="Import JSON file and convert to HTML output" help="Import JSON file and convert to HTML output"
) )
parser.add_argument(
"--business",
dest="business",
default=False,
action='store_true',
help="Use Whatsapp Business default files (iOS only)"
)
args = parser.parse_args() args = parser.parse_args()
# Check for updates # Check for updates
@@ -199,6 +206,9 @@ def main():
elif args.import_json and not os.path.isfile(args.json): elif args.import_json and not os.path.isfile(args.json):
print("JSON file not found.") print("JSON file not found.")
exit(1) exit(1)
if args.android and args.business:
print("WhatsApp Business is only available on iOS for now.")
exit(1)
data = {} data = {}
@@ -264,15 +274,19 @@ def main():
media = extract_iphone.media media = extract_iphone.media
vcard = extract_iphone.vcard vcard = extract_iphone.vcard
create_html = extract.create_html create_html = extract.create_html
if args.business:
from Whatsapp_Chat_Exporter.utility import WhatsAppBusinessIdentifier as identifiers
else:
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers
if args.media is None: if args.media is None:
args.media = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" args.media = identifiers.DOMAIN
if args.backup is not None: if args.backup is not None:
if not os.path.isdir(args.media): if not os.path.isdir(args.media):
extract_iphone_media.extract_media(args.backup) extract_iphone_media.extract_media(args.backup, identifiers)
else: else:
print("WhatsApp directory already exists, skipping WhatsApp file extraction.") print("WhatsApp directory already exists, skipping WhatsApp file extraction.")
if args.db is None: if args.db is None:
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d" msg_db = identifiers.MESSAGE
else: else:
msg_db = args.db msg_db = args.db
if args.wa is None: if args.wa is None:
@@ -290,7 +304,7 @@ def main():
db.row_factory = sqlite3.Row db.row_factory = sqlite3.Row
messages(db, data, args.media) messages(db, data, args.media)
media(db, data, args.media) media(db, data, args.media)
vcard(db, data) vcard(db, data, args.media)
if args.android: if args.android:
extract.calls(db, data) extract.calls(db, data)
if not args.no_html: if not args.no_html:

View File

@@ -528,7 +528,7 @@ def media(db, data, media_folder):
f"Processing media...({total_row_number}/{total_row_number})", end="\r") f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data): def vcard(db, data, media_folder):
c = db.cursor() c = db.cursor()
try: try:
c.execute("""SELECT message_row_id, c.execute("""SELECT message_row_id,
@@ -558,14 +558,14 @@ def vcard(db, data):
rows = c.fetchall() rows = c.fetchall()
total_row_number = len(rows) total_row_number = len(rows)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r") print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
base = "WhatsApp/vCards" path = f"{media_folder}/vCards"
if not os.path.isdir(base): if not os.path.isdir(path):
Path(base).mkdir(parents=True, exist_ok=True) Path(path).mkdir(parents=True, exist_ok=True)
for index, row in enumerate(rows): for index, row in enumerate(rows):
media_name = row["media_name"] if row["media_name"] is not None else "" media_name = row["media_name"] if row["media_name"] is not None else ""
file_name = "".join(x for x in media_name if x.isalnum()) file_name = "".join(x for x in media_name if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore') file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf") file_path = os.path.join(path, f"{file_name}.vcf")
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f: with open(file_path, "w", encoding="utf-8") as f:
f.write(row["vcard"]) f.write(row["vcard"])

View File

@@ -244,7 +244,7 @@ def media(db, data, media_folder):
f"Processing media...({total_row_number}/{total_row_number})", end="\r") f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data): def vcard(db, data, media_folder):
c = db.cursor() c = db.cursor()
c.execute("""SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM, c.execute("""SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM,
ZWAMEDIAITEM.ZMESSAGE, ZWAMEDIAITEM.ZMESSAGE,
@@ -260,13 +260,13 @@ def vcard(db, data):
contents = c.fetchall() contents = c.fetchall()
total_row_number = len(contents) total_row_number = len(contents)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r") print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
base = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared/Message/vCards" path = f'{media_folder}/Message/vCards'
if not os.path.isdir(base): if not os.path.isdir(path):
Path(base).mkdir(parents=True, exist_ok=True) Path(path).mkdir(parents=True, exist_ok=True)
for index, content in enumerate(contents): for index, content in enumerate(contents):
file_name = "".join(x for x in content["ZVCARDNAME"] if x.isalnum()) file_name = "".join(x for x in content["ZVCARDNAME"] if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore') file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf") file_path = os.path.join(path, f"{file_name}.vcf")
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f: with open(file_path, "w", encoding="utf-8") as f:
f.write(content["ZVCARDSTRING"]) f.write(content["ZVCARDSTRING"])

View File

@@ -6,29 +6,38 @@ import os
import time import time
import getpass import getpass
import threading import threading
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier
try: try:
from iphone_backup_decrypt import EncryptedBackup, RelativePath from iphone_backup_decrypt import EncryptedBackup, RelativePath
from iphone_backup_decrypt import FailedToDecryptError, Domain from iphone_backup_decrypt import FailedToDecryptError
except ModuleNotFoundError: except ModuleNotFoundError:
support_encrypted = False support_encrypted = False
else: else:
support_encrypted = True support_encrypted = True
def extract_encrypted(base_dir, password): def extract_encrypted(base_dir, password, identifiers):
backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False) backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False)
print("Decrypting WhatsApp database...") print("Decrypting WhatsApp database...", end="")
try: try:
backup.extract_file(relative_path=RelativePath.WHATSAPP_MESSAGES, backup.extract_file(
output_filename="7c7fba66680ef796b916b067077cc246adacf01d") relative_path=RelativePath.WHATSAPP_MESSAGES,
backup.extract_file(relative_path=RelativePath.WHATSAPP_CONTACTS, domain=identifiers.DOMAIN,
output_filename="b8548dc30aa1030df0ce18ef08b882cf7ab5212f") output_filename=identifiers.MESSAGE
)
backup.extract_file(
relative_path=RelativePath.WHATSAPP_CONTACTS,
domain=identifiers.DOMAIN,
output_filename=identifiers.CONTACT
)
except FailedToDecryptError: except FailedToDecryptError:
print("Failed to decrypt backup: incorrect password?") print("Failed to decrypt backup: incorrect password?")
exit() exit()
else:
print("Done")
extract_thread = threading.Thread( extract_thread = threading.Thread(
target=backup.extract_files_by_domain, target=backup.extract_files_by_domain,
args=(Domain.WHATSAPP, Domain.WHATSAPP) args=(identifiers.DOMAIN, identifiers.DOMAIN)
) )
extract_thread.daemon = True extract_thread.daemon = True
extract_thread.start() extract_thread.start()
@@ -61,7 +70,7 @@ def is_encrypted(base_dir):
return False return False
def extract_media(base_dir): def extract_media(base_dir, identifiers):
if is_encrypted(base_dir): if is_encrypted(base_dir):
if not support_encrypted: if not support_encrypted:
print("You don't have the dependencies to handle encrypted backup.") print("You don't have the dependencies to handle encrypted backup.")
@@ -70,21 +79,24 @@ def extract_media(base_dir):
return False return False
print("Encryption detected on the backup!") print("Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:") password = getpass.getpass("Enter the password for the backup:")
extract_encrypted(base_dir, password) extract_encrypted(base_dir, password, identifiers)
else: else:
wts_db = os.path.join(base_dir, "7c/7c7fba66680ef796b916b067077cc246adacf01d") wts_db = os.path.join(base_dir, identifiers.MESSAGE[:2], identifiers.MESSAGE)
contact_db = os.path.join(base_dir, "b8/b8548dc30aa1030df0ce18ef08b882cf7ab5212f") contact_db = os.path.join(base_dir, identifiers.CONTACT[:2], identifiers.CONTACT)
if not os.path.isfile(wts_db): if not os.path.isfile(wts_db):
print("WhatsApp database not found.") if identifiers is WhatsAppIdentifier:
print("WhatsApp database not found.")
else:
print("WhatsApp Business database not found.")
exit() exit()
else: else:
shutil.copyfile(wts_db, "7c7fba66680ef796b916b067077cc246adacf01d") shutil.copyfile(wts_db, identifiers.MESSAGE)
if not os.path.isfile(contact_db): if not os.path.isfile(contact_db):
print("Contact database not found.") print("Contact database not found.")
exit() exit()
else: else:
shutil.copyfile(contact_db, "b8548dc30aa1030df0ce18ef08b882cf7ab5212f") shutil.copyfile(contact_db, identifiers.CONTACT)
_wts_id = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" _wts_id = identifiers.DOMAIN
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest: with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
manifest.row_factory = sqlite3.Row manifest.row_factory = sqlite3.Row
c = manifest.cursor() c = manifest.cursor()

View File

@@ -290,3 +290,15 @@ def setup_template(template, no_avatar):
# iOS Specific # iOS Specific
APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1)) APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1))
class WhatsAppIdentifier(StrEnum):
MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d"
CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f"
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
class WhatsAppBusinessIdentifier(StrEnum):
MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552"
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"