Merge pull request #149 from fschuh/main_test

Support for incremental merges of two export folders
This commit is contained in:
Knugi
2025-05-07 21:18:33 +08:00
committed by GitHub
7 changed files with 748 additions and 146 deletions

2
.gitignore vendored
View File

@@ -138,7 +138,9 @@ __main__
# Dev time intermediates & temp files
result/
output/
WhatsApp/
AppDomainGroup-group.net.whatsapp.WhatsApp.shared/
/*.db
/*.db-*
/myout

View File

@@ -231,6 +231,19 @@ Contact Enrichment:
Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this
will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country
Incremental Merging:
--incremental-merge Performs an incremental merge of two exports. Requires setting both --source-
dir and --target-dir. The chats (JSON files only) and media from the source
directory will be merged into the target directory. No chat messages or media
will be deleted from the target directory; only new chat messages and media
will be added to it. This enables chat messages and media to be deleted from
the device to free up space, while ensuring they are preserved in the exported
backups.
--source-dir SOURCE_DIR
Sets the source directory. Used for performing incremental merges.
--target-dir TARGET_DIR
Sets the target directory. Used for performing incremental merges.
Miscellaneous:
-s, --showkey Show the HEX key used to decrypt the database
--check-update Check for updates (require Internet access)

View File

@@ -13,7 +13,7 @@ from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, Crypt, check_update, DbType
from Whatsapp_Chat_Exporter.utility import readable_to_bytes, sanitize_filename
from Whatsapp_Chat_Exporter.utility import import_from_json, bytes_to_readable
from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, bytes_to_readable
from argparse import ArgumentParser, SUPPRESS
from datetime import datetime
from getpass import getpass
@@ -34,12 +34,12 @@ def setup_argument_parser() -> ArgumentParser:
"""Set up and return the argument parser with all options."""
parser = ArgumentParser(
description='A customizable Android and iOS/iPadOS WhatsApp database parser that '
'will give you the history of your WhatsApp conversations in HTML '
'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.',
'will give you the history of your WhatsApp conversations in HTML '
'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.',
epilog=f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See '
'https://wts.knugi.dev/docs?dest=osl for all open source licenses.'
'https://wts.knugi.dev/docs?dest=osl for all open source licenses.'
)
# Device type arguments
device_group = parser.add_argument_group('Device Type')
device_group.add_argument(
@@ -54,7 +54,7 @@ def setup_argument_parser() -> ArgumentParser:
"-e", "--exported", dest="exported", default=None,
help="Define the target as exported chat file and specify the path to the file"
)
# Input file paths
input_group = parser.add_argument_group('Input Files')
input_group.add_argument(
@@ -86,7 +86,7 @@ def setup_argument_parser() -> ArgumentParser:
"--wab", "--wa-backup", dest="wab", default=None,
help="Path to contact database in crypt15 format"
)
# Output options
output_group = parser.add_argument_group('Output Options')
output_group.add_argument(
@@ -109,7 +109,7 @@ def setup_argument_parser() -> ArgumentParser:
"--size", "--output-size", "--split", dest="size", nargs='?', const=0, default=None,
help="Maximum (rough) size of a single output file in bytes, 0 for auto"
)
# JSON formatting options
json_group = parser.add_argument_group('JSON Options')
json_group.add_argument(
@@ -128,7 +128,7 @@ def setup_argument_parser() -> ArgumentParser:
"--import", dest="import_json", default=False, action='store_true',
help="Import JSON file and convert to HTML output"
)
# HTML options
html_group = parser.add_argument_group('HTML Options')
html_group.add_argument(
@@ -155,7 +155,7 @@ def setup_argument_parser() -> ArgumentParser:
"--headline", dest="headline", default="Chat history with ??",
help="The custom headline for the HTML output. Use '??' as a placeholder for the chat name"
)
# Media handling
media_group = parser.add_argument_group('Media Handling')
media_group.add_argument(
@@ -166,7 +166,7 @@ def setup_argument_parser() -> ArgumentParser:
"--create-separated-media", dest="separate_media", default=False, action='store_true',
help="Create a copy of the media seperated per chat in <MEDIA>/separated/ directory"
)
# Filtering options
filter_group = parser.add_argument_group('Filtering Options')
filter_group.add_argument(
@@ -195,7 +195,7 @@ def setup_argument_parser() -> ArgumentParser:
"Setting this flag will cause the exporter to render those. "
"This is useful if chat(s) are missing from the output")
)
# Contact enrichment
contact_group = parser.add_argument_group('Contact Enrichment')
contact_group.add_argument(
@@ -206,7 +206,34 @@ def setup_argument_parser() -> ArgumentParser:
"--default-country-code", dest="default_country_code", default=None,
help="Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country"
)
# Incremental merging
inc_merging_group = parser.add_argument_group('Incremental Merging')
inc_merging_group.add_argument(
"--incremental-merge",
dest="incremental_merge",
default=False,
action='store_true',
help=("Performs an incremental merge of two exports. "
"Requires setting both --source-dir and --target-dir. "
"The chats (JSON files only) and media from the source directory will be merged into the target directory. "
"No chat messages or media will be deleted from the target directory; only new chat messages and media will be added to it. "
"This enables chat messages and media to be deleted from the device to free up space, while ensuring they are preserved in the exported backups."
)
)
inc_merging_group.add_argument(
"--source-dir",
dest="source_dir",
default=None,
help="Sets the source directory. Used for performing incremental merges."
)
inc_merging_group.add_argument(
"--target-dir",
dest="target_dir",
default=None,
help="Sets the target directory. Used for performing incremental merges."
)
# Miscellaneous
misc_group = parser.add_argument_group('Miscellaneous')
misc_group.add_argument(
@@ -233,7 +260,7 @@ def setup_argument_parser() -> ArgumentParser:
"--max-bruteforce-worker", dest="max_bruteforce_worker", default=10, type=int,
help="Specify the maximum number of worker for bruteforce decryption."
)
return parser
@@ -245,50 +272,59 @@ def validate_args(parser: ArgumentParser, args) -> None:
if not args.android and not args.ios and not args.exported and not args.import_json:
parser.error("You must define the device type.")
if args.no_html and not args.json and not args.text_format:
parser.error("You must either specify a JSON output file, text file output directory or enable HTML output.")
parser.error(
"You must either specify a JSON output file, text file output directory or enable HTML output.")
if args.import_json and (args.android or args.ios or args.exported or args.no_html):
parser.error("You can only use --import with -j and without --no-html, -a, -i, -e.")
parser.error(
"You can only use --import with -j and without --no-html, -a, -i, -e.")
elif args.import_json and not os.path.isfile(args.json):
parser.error("JSON file not found.")
if args.incremental_merge and (args.source_dir is None or args.target_dir is None):
parser.error(
"You must specify both --source-dir and --target-dir for incremental merge.")
if args.android and args.business:
parser.error("WhatsApp Business is only available on iOS for now.")
if "??" not in args.headline:
parser.error("--headline must contain '??' for replacement.")
# JSON validation
if args.json_per_chat and args.json and (
(args.json.endswith(".json") and os.path.isfile(args.json)) or
(args.json.endswith(".json") and os.path.isfile(args.json)) or
(not args.json.endswith(".json") and os.path.isfile(args.json))
):
parser.error("When --per-chat is enabled, the destination of --json must be a directory.")
parser.error(
"When --per-chat is enabled, the destination of --json must be a directory.")
# vCards validation
if args.enrich_from_vcards is not None and args.default_country_code is None:
parser.error("When --enrich-from-vcards is provided, you must also set --default-country-code")
parser.error(
"When --enrich-from-vcards is provided, you must also set --default-country-code")
# Size validation
if args.size is not None and not isinstance(args.size, int) and not args.size.isnumeric():
try:
args.size = readable_to_bytes(args.size)
except ValueError:
parser.error("The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)")
parser.error(
"The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)")
# Date filter validation and processing
if args.filter_date is not None:
process_date_filter(parser, args)
# Crypt15 key validation
if args.key is None and args.backup is not None and args.backup.endswith("crypt15"):
args.key = getpass("Enter your encryption key: ")
# Theme validation
if args.whatsapp_theme:
args.template = "whatsapp_new.html"
# Chat filter validation
if args.filter_chat_include is not None and args.filter_chat_exclude is not None:
parser.error("Chat inclusion and exclusion filters cannot be used together.")
parser.error(
"Chat inclusion and exclusion filters cannot be used together.")
validate_chat_filters(parser, args.filter_chat_include)
validate_chat_filters(parser, args.filter_chat_exclude)
@@ -298,21 +334,24 @@ def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str
if chat_filter is not None:
for chat in chat_filter:
if not chat.isnumeric():
parser.error("Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat")
parser.error(
"Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat")
def process_date_filter(parser: ArgumentParser, args) -> None:
"""Process and validate date filter arguments."""
if " - " in args.filter_date:
start, end = args.filter_date.split(" - ")
start = int(datetime.strptime(start, args.filter_date_format).timestamp())
start = int(datetime.strptime(
start, args.filter_date_format).timestamp())
end = int(datetime.strptime(end, args.filter_date_format).timestamp())
if start < 1009843200 or end < 1009843200:
parser.error("WhatsApp was first released in 2009...")
if start > end:
parser.error("The start date cannot be a moment after the end date.")
parser.error(
"The start date cannot be a moment after the end date.")
if args.android:
args.filter_date = f"BETWEEN {start}000 AND {end}000"
elif args.ios:
@@ -324,13 +363,15 @@ def process_date_filter(parser: ArgumentParser, args) -> None:
def process_single_date_filter(parser: ArgumentParser, args) -> None:
"""Process single date comparison filters."""
if len(args.filter_date) < 3:
parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date")
_timestamp = int(datetime.strptime(args.filter_date[2:], args.filter_date_format).timestamp())
parser.error(
"Unsupported date format. See https://wts.knugi.dev/docs?dest=date")
_timestamp = int(datetime.strptime(
args.filter_date[2:], args.filter_date_format).timestamp())
if _timestamp < 1009843200:
parser.error("WhatsApp was first released in 2009...")
if args.filter_date[:2] == "> ":
if args.android:
args.filter_date = f">= {_timestamp}000"
@@ -342,7 +383,8 @@ def process_single_date_filter(parser: ArgumentParser, args) -> None:
elif args.ios:
args.filter_date = f"<= {_timestamp - APPLE_TIME}"
else:
parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date")
parser.error(
"Unsupported date format. See https://wts.knugi.dev/docs?dest=date")
def setup_contact_store(args) -> Optional['ContactsFromVCards']:
@@ -356,7 +398,8 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']:
)
exit(1)
contact_store = ContactsFromVCards()
contact_store.load_vcf_file(args.enrich_from_vcards, args.default_country_code)
contact_store.load_vcf_file(
args.enrich_from_vcards, args.default_country_code)
return contact_store
return None
@@ -366,9 +409,9 @@ def decrypt_android_backup(args) -> int:
if args.key is None or args.backup is None:
print("You must specify the backup file with -b and a key with -k")
return 1
print("Decryption key specified, decrypting WhatsApp backup...")
# Determine crypt type
if "crypt12" in args.backup:
crypt = Crypt.CRYPT12
@@ -379,7 +422,7 @@ def decrypt_android_backup(args) -> int:
else:
print("Unknown backup format. The backup file must be crypt12, crypt14 or crypt15.")
return 1
# Get key
keyfile_stream = False
if not os.path.isfile(args.key) and all(char in string.hexdigits for char in args.key.replace(" ", "")):
@@ -387,10 +430,10 @@ def decrypt_android_backup(args) -> int:
else:
key = open(args.key, "rb")
keyfile_stream = True
# Read backup
db = open(args.backup, "rb").read()
# Process WAB if provided
error_wa = 0
if args.wab:
@@ -407,7 +450,7 @@ def decrypt_android_backup(args) -> int:
)
if isinstance(key, io.IOBase):
key.seek(0)
# Decrypt message database
error_message = android_crypt.decrypt_backup(
db,
@@ -419,7 +462,7 @@ def decrypt_android_backup(args) -> int:
keyfile_stream=keyfile_stream,
max_worker=args.max_bruteforce_worker
)
# Handle errors
if error_wa != 0:
return error_wa
@@ -444,7 +487,7 @@ def handle_decrypt_error(error: int) -> None:
def process_contacts(args, data: ChatCollection, contact_store=None) -> None:
"""Process contacts from the database."""
contact_db = args.wa if args.wa else "wa.db" if args.android else "ContactsV2.sqlite"
if os.path.isfile(contact_db):
with sqlite3.connect(contact_db) as db:
db.row_factory = sqlite3.Row
@@ -457,42 +500,42 @@ def process_contacts(args, data: ChatCollection, contact_store=None) -> None:
def process_messages(args, data: ChatCollection) -> None:
"""Process messages, media and vcards from the database."""
msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE
if not os.path.isfile(msg_db):
print(
"The message database does not exist. You may specify the path "
"to database file with option -d or check your provided path."
)
exit(6)
filter_chat = (args.filter_chat_include, args.filter_chat_exclude)
with sqlite3.connect(msg_db) as db:
db.row_factory = sqlite3.Row
# Process messages
if args.android:
message_handler = android_handler
else:
message_handler = ios_handler
message_handler.messages(
db, data, args.media, args.timezone_offset,
db, data, args.media, args.timezone_offset,
args.filter_date, filter_chat, args.filter_empty
)
# Process media
message_handler.media(
db, data, args.media, args.filter_date,
db, data, args.media, args.filter_date,
filter_chat, args.filter_empty, args.separate_media
)
# Process vcards
message_handler.vcard(
db, data, args.media, args.filter_date,
db, data, args.media, args.filter_date,
filter_chat, args.filter_empty
)
# Process calls
process_calls(args, db, data, filter_chat)
@@ -511,9 +554,10 @@ def handle_media_directory(args) -> None:
"""Handle media directory copying or moving."""
if os.path.isdir(args.media):
media_path = os.path.join(args.output, args.media)
if os.path.isdir(media_path):
print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n")
print(
"\nWhatsApp directory already exists in output directory. Skipping...", end="\n")
else:
if args.move_media:
try:
@@ -534,7 +578,7 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None:
# Enrich from vcards if available
if contact_store and not contact_store.is_empty():
contact_store.enrich_from_vcards(data)
android_handler.create_html(
data,
args.output,
@@ -546,12 +590,12 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None:
args.whatsapp_theme,
args.headline
)
# Create text files if requested
if args.text_format:
print("Writing text file...")
android_handler.create_txt(data, args.text_format)
# Create JSON files if requested
if args.json and not args.import_json:
export_json(args, data, contact_store)
@@ -562,11 +606,11 @@ def export_json(args, data: ChatCollection, contact_store=None) -> None:
# Enrich from vcards if available
if contact_store and not contact_store.is_empty():
contact_store.enrich_from_vcards(data)
# Convert ChatStore objects to JSON
if isinstance(data.get(next(iter(data), None)), ChatStore):
data = {jik: chat.to_json() for jik, chat in data.items()}
# Export as a single file or per chat
if not args.json_per_chat:
export_single_json(args, data)
@@ -590,11 +634,11 @@ def export_multiple_json(args, data: Dict) -> None:
"""Export data to multiple JSON files, one per chat."""
# Adjust output path if needed
json_path = args.json[:-5] if args.json.endswith(".json") else args.json
# Create directory if it doesn't exist
if not os.path.isdir(json_path):
os.makedirs(json_path, exist_ok=True)
# Export each chat
total = len(data.keys())
for index, jik in enumerate(data.keys()):
@@ -602,11 +646,11 @@ def export_multiple_json(args, data: Dict) -> None:
contact = data[jik]["name"].replace('/', '')
else:
contact = jik.replace('+', '')
with open(f"{json_path}/{sanitize_filename(contact)}.json", "w") as f:
file_content = json.dumps(
{jik: data[jik]},
ensure_ascii=not args.avoid_encoding_json,
{jik: data[jik]},
ensure_ascii=not args.avoid_encoding_json,
indent=args.pretty_print_json
)
f.write(file_content)
@@ -617,7 +661,7 @@ def export_multiple_json(args, data: Dict) -> None:
def process_exported_chat(args, data: ChatCollection) -> None:
"""Process an exported chat file."""
exported_handler.messages(args.exported, data, args.assume_first_as_me)
if not args.no_html:
android_handler.create_html(
data,
@@ -630,7 +674,7 @@ def process_exported_chat(args, data: ChatCollection) -> None:
args.whatsapp_theme,
args.headline
)
# Copy files to output directory
for file in glob.glob(r'*.*'):
shutil.copy(file, args.output)
@@ -641,23 +685,23 @@ def main():
# Set up and parse arguments
parser = setup_argument_parser()
args = parser.parse_args()
# Check for updates
if args.check_update:
exit(check_update())
# Validate arguments
validate_args(parser, args)
# Create output directory if it doesn't exist
os.makedirs(args.output, exist_ok=True)
# Initialize data collection
data = ChatCollection()
# Set up contact store for vCard enrichment if needed
contact_store = setup_contact_store(args)
if args.import_json:
# Import from JSON
import_from_json(args.json, data)
@@ -681,13 +725,13 @@ def main():
# Set default media path if not provided
if args.media is None:
args.media = "WhatsApp"
# Set default DB paths if not provided
if args.db is None:
args.db = "msgstore.db"
if args.wa is None:
args.wa = "wa.db"
# Decrypt backup if needed
if args.key is not None:
error = decrypt_android_backup(args)
@@ -700,34 +744,50 @@ def main():
else:
from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers
args.identifiers = identifiers
# Set default media path if not provided
if args.media is None:
args.media = identifiers.DOMAIN
# Extract media from backup if needed
if args.backup is not None:
if not os.path.isdir(args.media):
ios_media_handler.extract_media(args.backup, identifiers, args.decrypt_chunk_size)
ios_media_handler.extract_media(
args.backup, identifiers, args.decrypt_chunk_size)
else:
print("WhatsApp directory already exists, skipping WhatsApp file extraction.")
print(
"WhatsApp directory already exists, skipping WhatsApp file extraction.")
# Set default DB paths if not provided
if args.db is None:
args.db = identifiers.MESSAGE
if args.wa is None:
args.wa = "ContactsV2.sqlite"
# Process contacts
process_contacts(args, data, contact_store)
# Process messages, media, and calls
process_messages(args, data)
# Create output files
create_output_files(args, data, contact_store)
# Handle media directory
handle_media_directory(args)
print("Everything is done!")
if args.incremental_merge:
incremental_merge(
args.source_dir,
args.target_dir,
args.media,
args.pretty_print_json,
args.avoid_encoding_json
)
print("Incremental merge completed successfully.")
else:
# Process contacts
process_contacts(args, data, contact_store)
# Process messages, media, and calls
process_messages(args, data)
# Create output files
create_output_files(args, data, contact_store)
# Handle media directory
handle_media_directory(args)
print("Everything is done!")
if __name__ == "__main__":
main()

View File

@@ -7,6 +7,7 @@ class Timing:
"""
Handles timestamp formatting with timezone support.
"""
def __init__(self, timezone_offset: Optional[int]) -> None:
"""
Initialize Timing object.
@@ -27,7 +28,7 @@ class Timing:
Returns:
Optional[str]: Formatted timestamp string, or None if timestamp is None
"""
if timestamp:
if timestamp is not None:
timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
return datetime.fromtimestamp(timestamp, TimeZone(self.timezone_offset)).strftime(format)
return None
@@ -37,6 +38,7 @@ class TimeZone(tzinfo):
"""
Custom timezone class with fixed offset.
"""
def __init__(self, offset: int) -> None:
"""
Initialize TimeZone object.
@@ -151,6 +153,7 @@ class ChatStore:
"""
Stores chat information and messages.
"""
def __init__(self, type: str, name: Optional[str] = None, media: Optional[str] = None) -> None:
"""
Initialize ChatStore object.
@@ -159,7 +162,7 @@ class ChatStore:
type (str): Device type (IOS or ANDROID)
name (Optional[str]): Chat name
media (Optional[str]): Path to media folder
Raises:
TypeError: If name is not a string or None
"""
@@ -182,7 +185,7 @@ class ChatStore:
self.their_avatar_thumb = None
self.status = None
self.media_base = ""
def __len__(self) -> int:
"""Get number of chats. Required for dict-like access."""
return len(self._messages)
@@ -192,7 +195,7 @@ class ChatStore:
if not isinstance(message, Message):
raise TypeError("message must be a Message object")
self._messages[id] = message
def get_message(self, id: str) -> 'Message':
"""Get a message from the chat store."""
return self._messages.get(id)
@@ -211,13 +214,28 @@ class ChatStore:
'their_avatar': self.their_avatar,
'their_avatar_thumb': self.their_avatar_thumb,
'status': self.status,
'media_base': self.media_base,
'messages': {id: msg.to_json() for id, msg in self._messages.items()}
}
@classmethod
def from_json(cls, data: Dict) -> 'ChatStore':
    """Rebuild a ChatStore from a dictionary produced by to_json().

    Args:
        data (Dict): Parsed JSON dictionary for a single chat.

    Returns:
        ChatStore: The reconstructed chat store with all messages attached.
    """
    store = cls(data.get("type"), data.get("name"))
    # Restore the flat metadata fields straight from the dictionary.
    for field in ("my_avatar", "their_avatar", "their_avatar_thumb",
                  "status", "media_base"):
        setattr(store, field, data.get(field))
    # Rehydrate each message through Message.from_json(), keyed by its id.
    for msg_id, raw_msg in data.get("messages", {}).items():
        store.add_message(msg_id, Message.from_json(raw_msg))
    return store
def get_last_message(self) -> 'Message':
"""Get the most recent message in the chat."""
return tuple(self._messages.values())[-1]
def items(self):
"""Get message items pairs."""
return self._messages.items()
@@ -230,18 +248,40 @@ class ChatStore:
"""Get all message keys in the chat."""
return self._messages.keys()
def merge_with(self, other: 'ChatStore'):
"""Merge another ChatStore into this one.
Args:
other (ChatStore): The ChatStore to merge with
"""
if not isinstance(other, ChatStore):
raise TypeError("Can only merge with another ChatStore object")
# Update fields if they are not None in the other ChatStore
self.name = other.name or self.name
self.type = other.type or self.type
self.my_avatar = other.my_avatar or self.my_avatar
self.their_avatar = other.their_avatar or self.their_avatar
self.their_avatar_thumb = other.their_avatar_thumb or self.their_avatar_thumb
self.status = other.status or self.status
# Merge messages
self._messages.update(other._messages)
class Message:
"""
Represents a single message in a chat.
"""
def __init__(
self,
*,
from_me: Union[bool, int],
timestamp: int,
time: Union[int, float, str],
key_id: int,
key_id: Union[int, str],
received_timestamp: int,
read_timestamp: int,
timezone_offset: int = 0,
@@ -266,7 +306,7 @@ class Message:
self.from_me = bool(from_me)
self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
timing = Timing(timezone_offset)
if isinstance(time, (int, float)):
self.time = timing.format_timestamp(self.timestamp, "%H:%M")
elif isinstance(time, str):
@@ -281,10 +321,22 @@ class Message:
self.sender = None
self.safe = False
self.mime = None
self.message_type = message_type,
self.received_timestamp = timing.format_timestamp(received_timestamp, "%Y/%m/%d %H:%M")
self.read_timestamp = timing.format_timestamp(read_timestamp, "%Y/%m/%d %H:%M")
self.message_type = message_type
if isinstance(received_timestamp, (int, float)):
self.received_timestamp = timing.format_timestamp(
received_timestamp, "%Y/%m/%d %H:%M")
elif isinstance(received_timestamp, str):
self.received_timestamp = received_timestamp
else:
self.received_timestamp = None
if isinstance(read_timestamp, (int, float)):
self.read_timestamp = timing.format_timestamp(
read_timestamp, "%Y/%m/%d %H:%M")
elif isinstance(read_timestamp, str):
self.read_timestamp = read_timestamp
else:
self.read_timestamp = None
# Extra attributes
self.reply = None
self.quoted_data = None
@@ -309,5 +361,32 @@ class Message:
'quoted_data': self.quoted_data,
'caption': self.caption,
'thumb': self.thumb,
'sticker': self.sticker
}
'sticker': self.sticker,
'message_type': self.message_type,
'received_timestamp': self.received_timestamp,
'read_timestamp': self.read_timestamp
}
@classmethod
def from_json(cls, data: Dict) -> 'Message':
    """Reconstruct a Message from a dictionary produced by to_json().

    Args:
        data (Dict): Parsed JSON dictionary for a single message.

    Returns:
        Message: The reconstructed message with all optional fields set.
    """
    msg = cls(
        from_me=data["from_me"],
        timestamp=data["timestamp"],
        time=data["time"],
        key_id=data["key_id"],
        message_type=data.get("message_type"),
        received_timestamp=data.get("received_timestamp"),
        read_timestamp=data.get("read_timestamp")
    )
    # Optional attributes default to None when absent from the dictionary.
    for field in ("media", "meta", "data", "sender", "safe", "mime",
                  "reply", "quoted_data", "caption", "thumb", "sticker"):
        setattr(msg, field, data.get(field))
    return msg

View File

@@ -5,6 +5,7 @@ import os
import unicodedata
import re
import math
import shutil
from bleach import clean as sanitize
from markupsafe import Markup
from datetime import datetime, timedelta
@@ -15,8 +16,9 @@ try:
from enum import StrEnum, IntEnum
except ImportError:
# < Python 3.11
# This should be removed when the support for Python 3.10 ends.
# This should be removed when the support for Python 3.10 ends. (31 Oct 2026)
from enum import Enum
class StrEnum(str, Enum):
pass
@@ -71,7 +73,7 @@ def bytes_to_readable(size_bytes: int) -> str:
A human-readable string representing the file size.
"""
if size_bytes == 0:
return "0B"
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
@@ -99,7 +101,7 @@ def readable_to_bytes(size_str: str) -> int:
'TB': 1024**4,
'PB': 1024**5,
'EB': 1024**6,
'ZB': 1024**7,
'ZB': 1024**7,
'YB': 1024**8
}
size_str = size_str.upper().strip()
@@ -154,7 +156,8 @@ def check_update():
else:
with raw:
package_info = json.load(raw)
latest_version = tuple(map(int, package_info["info"]["version"].split(".")))
latest_version = tuple(
map(int, package_info["info"]["version"].split(".")))
__version__ = importlib.metadata.version("whatsapp_chat_exporter")
current_version = tuple(map(int, __version__.split(".")))
if current_version < latest_version:
@@ -173,17 +176,17 @@ def check_update():
def rendering(
output_file_name,
template,
name,
msgs,
contact,
w3css,
chat,
headline,
next=False,
previous=False
):
output_file_name,
template,
name,
msgs,
contact,
w3css,
chat,
headline,
next=False,
previous=False
):
if chat.their_avatar_thumb is None and chat.their_avatar is not None:
their_avatar_thumb = chat.their_avatar
else:
@@ -255,7 +258,89 @@ def import_from_json(json_file: str, data: Dict[str, ChatStore]):
message.sticker = msg.get("sticker")
chat.add_message(id, message)
data[jid] = chat
print(f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r")
print(
f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r")
def _merge_chat_file(source_path: str, target_path: str, json_file: str,
                     pretty_print_json: int, avoid_encoding_json: bool):
    """Merge one source JSON chat export into the matching target file in place.

    The target file is only rewritten when the merged content differs from
    what is already stored, so unchanged exports keep their mtime.
    """
    print(
        f"Merging '{json_file}' with existing file in target directory...")
    with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file:
        source_data = json.load(src_file)
        target_data = json.load(tgt_file)
    # Parse JSON into ChatStore objects using from_json()
    source_chats = {jid: ChatStore.from_json(chat)
                    for jid, chat in source_data.items()}
    target_chats = {jid: ChatStore.from_json(chat)
                    for jid, chat in target_data.items()}
    # Merge chats using merge_with(); chats unseen in the target are added as-is
    for jid, chat in source_chats.items():
        if jid in target_chats:
            target_chats[jid].merge_with(chat)
        else:
            target_chats[jid] = chat
    # Serialize merged data
    merged_data = {jid: chat.to_json()
                   for jid, chat in target_chats.items()}
    # Canonical (sorted-key) dumps so ordering differences don't force a rewrite
    if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True):
        print(
            f"Changes detected in '{json_file}', updating target file...")
        with open(target_path, 'w') as merged_file:
            json.dump(
                merged_data,
                merged_file,
                indent=pretty_print_json,
                ensure_ascii=not avoid_encoding_json,
            )
    else:
        print(
            f"No changes detected in '{json_file}', skipping update.")


def _merge_media_tree(source_media_path: str, target_media_path: str):
    """Copy media from the source tree into the target tree, never deleting.

    A file is copied only when it is missing from the target or the source
    copy has a newer modification time; copy2 preserves file metadata.
    """
    for root, _, files in os.walk(source_media_path):
        relative_path = os.path.relpath(root, source_media_path)
        target_root = os.path.join(target_media_path, relative_path)
        os.makedirs(target_root, exist_ok=True)
        for file in files:
            source_file = os.path.join(root, file)
            target_file = os.path.join(target_root, file)
            # we only copy if the file doesn't exist in the target or if the source is newer
            if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file):
                print(f"Copying '{source_file}' to '{target_file}'...")
                shutil.copy2(source_file, target_file)


def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool):
    """Merges JSON files and media from the source directory into the target directory.

    Nothing is ever deleted from the target: new chats/messages are added,
    existing ones are updated via ChatStore.merge_with(), and media files are
    copied when missing or outdated in the target.

    Args:
        source_dir (str): The path to the source directory containing JSON files.
        target_dir (str): The path to the target directory to merge into.
        media_dir (str): The name of the media directory (relative to both
            source_dir and target_dir).
        pretty_print_json (int): Indent level passed to json.dump, or None
            for compact output.
        avoid_encoding_json (bool): When True, write non-ASCII characters
            as-is instead of escaping them.
    """
    json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
    if not json_files:
        print("No JSON files found in the source directory.")
        return
    print("JSON files found:", json_files)
    for json_file in json_files:
        source_path = os.path.join(source_dir, json_file)
        target_path = os.path.join(target_dir, json_file)
        if not os.path.exists(target_path):
            # New export: a straight copy is enough.
            print(f"Copying '{json_file}' to target directory...")
            os.makedirs(target_dir, exist_ok=True)
            shutil.copy2(source_path, target_path)
        else:
            _merge_chat_file(source_path, target_path, json_file,
                             pretty_print_json, avoid_encoding_json)
    # Merge media directories
    source_media_path = os.path.join(source_dir, media_dir)
    target_media_path = os.path.join(target_dir, media_dir)
    print(
        f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
    if os.path.exists(source_media_path):
        _merge_media_tree(source_media_path, target_media_path)
def sanitize_filename(file_name: str) -> str:
@@ -335,23 +420,29 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List
if filter is not None:
conditions = []
if len(columns) < 2 and jid is not None:
raise ValueError("There must be at least two elements in argument columns if jid is not None")
raise ValueError(
"There must be at least two elements in argument columns if jid is not None")
if jid is not None:
if platform == "android":
is_group = f"{jid}.type == 1"
elif platform == "ios":
is_group = f"{jid} IS NOT NULL"
else:
raise ValueError("Only android and ios are supported for argument platform if jid is not None")
raise ValueError(
"Only android and ios are supported for argument platform if jid is not None")
for index, chat in enumerate(filter):
if include:
conditions.append(f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'")
conditions.append(
f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'")
if len(columns) > 1:
conditions.append(f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})")
conditions.append(
f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})")
else:
conditions.append(f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'")
conditions.append(
f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'")
if len(columns) > 1:
conditions.append(f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})")
conditions.append(
f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})")
return f"AND ({' '.join(conditions)})"
else:
return ""
@@ -446,7 +537,7 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona
else:
msg = f"{old} changed their number to {new}"
elif content["action_type"] == 46:
return # Voice message in PM??? Seems no need to handle.
return # Voice message in PM??? Seems no need to handle.
elif content["action_type"] == 47:
msg = "The contact is an official business account"
elif content["action_type"] == 50:
@@ -463,7 +554,8 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona
elif content["action_type"] == 67:
return # (PM) this contact use secure service from Facebook???
elif content["action_type"] == 69:
return # (PM) this contact use secure service from Facebook??? What's the difference with 67????
# (PM) this contact use secure service from Facebook??? What's the difference with 67????
return
else:
return # Unsupported
return msg
@@ -490,7 +582,8 @@ def get_status_location(output_folder: str, offline_static: str) -> str:
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f: f.write(resp.read())
with open(w3css_path, "wb") as f:
f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
@@ -521,6 +614,7 @@ def setup_template(template: Optional[str], no_avatar: bool, experimental: bool
template_env.filters['sanitize_except'] = sanitize_except
return template_env.get_template(template_file)
# iOS Specific
APPLE_TIME = 978307200
@@ -541,23 +635,31 @@ def slugify(value: str, allow_unicode: bool = False) -> str:
if allow_unicode:
value = unicodedata.normalize('NFKC', value)
else:
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
value = unicodedata.normalize('NFKD', value).encode(
'ascii', 'ignore').decode('ascii')
value = re.sub(r'[^\w\s-]', '', value.lower())
return re.sub(r'[-\s]+', '-', value).strip('-_')
class WhatsAppIdentifier(StrEnum):
MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite
CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite
CALL = "1b432994e958845fffe8e2f190f26d1511534088" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite
# AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite
MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d"
# AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite
CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f"
# AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite
CALL = "1b432994e958845fffe8e2f190f26d1511534088"
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
class WhatsAppBusinessIdentifier(StrEnum):
MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite
CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite
CALL = "b463f7c4365eefc5a8723930d97928d4e907c603" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
# AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite
MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
# AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite
CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552"
# AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite
CALL = "b463f7c4365eefc5a8723930d97928d4e907c603"
DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
class JidType(IntEnum):
PM = 0

View File

@@ -60,3 +60,8 @@ include = ["Whatsapp_Chat_Exporter"]
[tool.setuptools.package-data]
Whatsapp_Chat_Exporter = ["*.html"]
[dependency-groups]
dev = [
"pytest>=8.3.5",
]

View File

@@ -0,0 +1,341 @@
import os
import json
import pytest
from unittest.mock import patch, mock_open, call, MagicMock
from Whatsapp_Chat_Exporter.utility import incremental_merge
from Whatsapp_Chat_Exporter.data_model import ChatStore
# Test data setup
BASE_PATH = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"

# Target-side export: holds the common message 24690 plus 24691.
# Message dicts are deliberately partial -- they model JSON written by an
# older export, and the merge must fill in the missing fields.
chat_data_1 = {
    "12345678@s.whatsapp.net": {
        "name": "Friend",
        "type": "ios",
        "my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
        "their_avatar": os.path.join(BASE_PATH, "Media", "Profile", "12345678-1709851420.thumb"),
        "their_avatar_thumb": None,
        "status": None,
        "messages": {
            "24690": {
                "from_me": True,
                "timestamp": 1463926635.571629,
                "time": "10:17",
                "media": False,
                "key_id": "34B5EF10FBCA37B7E",
                "meta": False,
                "data": "I'm here",
                "safe": False,
                "sticker": False
            },
            "24691": {  # This message only exists in target
                "from_me": False,
                "timestamp": 1463926641.571629,
                "time": "10:17",
                "media": False,
                "key_id": "34B5EF10FBCA37B8E",
                "meta": False,
                "data": "Great to see you",
                "safe": False,
                "sticker": False
            }
        }
    }
}

# Source-side export: holds the common message 24690 plus 24692.
chat_data_2 = {
    "12345678@s.whatsapp.net": {
        "name": "Friend",
        "type": "ios",
        "my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
        "their_avatar": os.path.join(BASE_PATH, "Media", "Profile", "12345678-1709851420.thumb"),
        "their_avatar_thumb": None,
        "status": None,
        "messages": {
            "24690": {
                "from_me": True,
                "timestamp": 1463926635.571629,
                "time": "10:17",
                "media": False,
                "key_id": "34B5EF10FBCA37B7E",
                "meta": False,
                "data": "I'm here",
                "safe": False,
                "sticker": False
            },
            "24692": {  # This message only exists in source
                "from_me": False,
                "timestamp": 1463926642.571629,
                "time": "10:17",
                "media": False,
                "key_id": "34B5EF10FBCA37B9E",
                "meta": False,
                "data": "Hi there!",
                "safe": False,
                "sticker": False
            },
        }
    }
}


def _full_message(**fields):
    """Build a message dict carrying the complete field set the exporter
    serializes for a message, with unspecified fields left at their defaults.

    Keyword arguments override the defaults below. Used to express the
    expected post-merge serialization without repeating 18 keys per message.
    """
    message = {
        "from_me": False,
        "timestamp": None,
        "time": None,
        "media": False,
        "key_id": None,
        "meta": False,
        "data": None,
        "sender": None,
        "safe": False,
        "mime": None,
        "reply": None,
        "quoted_data": None,
        "caption": None,
        "thumb": None,
        "sticker": False,
        "message_type": None,
        "received_timestamp": None,
        "read_timestamp": None,
    }
    message.update(fields)
    return message


# Expected merged data - the union of all messages from both exports, each
# expanded to the full field set (as the Message class initializes them),
# plus the chat-level "media_base" field added during round-tripping.
chat_data_merged = {
    "12345678@s.whatsapp.net": {
        "name": "Friend",
        "type": "ios",
        "my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"),
        "their_avatar": os.path.join(BASE_PATH, "Media", "Profile", "12345678-1709851420.thumb"),
        "their_avatar_thumb": None,
        "status": None,
        "media_base": None,
        "messages": {
            "24690": _full_message(
                from_me=True,
                timestamp=1463926635.571629,
                time="10:17",
                key_id="34B5EF10FBCA37B7E",
                data="I'm here",
            ),
            "24691": _full_message(
                timestamp=1463926641.571629,
                time="10:17",
                key_id="34B5EF10FBCA37B8E",
                data="Great to see you",
            ),
            "24692": _full_message(
                timestamp=1463926642.571629,
                time="10:17",
                key_id="34B5EF10FBCA37B9E",
                data="Hi there!",
            ),
        }
    }
}
@pytest.fixture
def mock_filesystem():
    """Patch the filesystem functions incremental_merge touches.

    Yields a dict mapping short names ("exists", "copy2", ...) to the
    active mock objects; all patches are stopped on teardown.
    """
    targets = {
        "exists": "os.path.exists",
        "makedirs": "os.makedirs",
        "getmtime": "os.path.getmtime",
        "listdir": "os.listdir",
        "walk": "os.walk",
        "copy2": "shutil.copy2",
    }
    patchers = {name: patch(target) for name, target in targets.items()}
    mocks = {name: patcher.start() for name, patcher in patchers.items()}
    try:
        yield mocks
    finally:
        for patcher in patchers.values():
            patcher.stop()
def test_incremental_merge_new_file(mock_filesystem):
    """A JSON file absent from the target directory is copied over verbatim."""
    src_dir = "/source"
    tgt_dir = "/target"

    # Only the source directory itself "exists"; every target-side lookup
    # (and the media sub-path) misses, forcing the plain-copy branch.
    mock_filesystem["exists"].side_effect = lambda path: path == "/source"
    mock_filesystem["listdir"].return_value = ["chat.json"]

    incremental_merge(src_dir, tgt_dir, "media", 2, True)

    mock_filesystem["makedirs"].assert_called_once_with(tgt_dir, exist_ok=True)
    mock_filesystem["copy2"].assert_called_once_with(
        os.path.join(src_dir, "chat.json"),
        os.path.join(tgt_dir, "chat.json"),
    )
def test_incremental_merge_existing_file_with_changes(mock_filesystem):
    """Test merging when target file exists and has changes.

    Source (chat_data_2) contributes new message 24692; target (chat_data_1)
    already holds 24690/24691. The rewritten target must equal
    chat_data_merged: all three messages, expanded to the full field set.
    """
    source_dir = "source"
    target_dir = "target"
    media_dir = "media"
    # Every exists() check passes, so the merge branch (not plain copy) runs.
    mock_filesystem["exists"].side_effect = lambda x: True
    mock_filesystem["listdir"].return_value = ["chat.json"]
    # Map each expected open() path to its JSON payload. incremental_merge
    # opens files positionally, e.g. open(source_path, 'r').
    source_file = os.path.join(source_dir, "chat.json")
    target_file = os.path.join(target_dir, "chat.json")
    mock_file_content = {
        source_file: json.dumps(chat_data_2),
        target_file: json.dumps(chat_data_1),
    }
    # Capture every chunk json.dump() streams to the "w"-mode file mock,
    # so the final JSON can be reassembled and compared structurally.
    written_chunks = []

    def mock_file_write(data):
        written_chunks.append(data)
    mock_write = MagicMock(side_effect=mock_file_write)
    with patch("builtins.open", mock_open()) as mock_file:
        def mock_file_read(filename, mode="r"):
            # Dispatch on mode: writes are recorded, reads served from
            # mock_file_content keyed by normalized path.
            if mode == 'w':
                file_mock = mock_open().return_value
                file_mock.write.side_effect = mock_write
                return file_mock
            else:
                # Use normalized path for lookup
                norm_filename = os.path.normpath(filename)
                content = mock_file_content.get(norm_filename, '')
                file_mock = mock_open(read_data=content).return_value
                return file_mock
        mock_file.side_effect = mock_file_read
        # Run the function
        incremental_merge(source_dir, target_dir, media_dir, 2, True)
        # Verify file operations using os.path.join
        mock_file.assert_any_call(source_file, "r")
        mock_file.assert_any_call(target_file, "r")
        mock_file.assert_any_call(target_file, "w")
        # Rest of verification code...
        assert mock_write.called, "Write method was never called"
        # Reassemble the streamed output and compare against the expectation.
        written_data = json.loads(''.join(written_chunks))
        assert written_data is not None, "No data was written"
        assert written_data == chat_data_merged, "Merged data does not match expected result"
        messages = written_data["12345678@s.whatsapp.net"]["messages"]
        assert "24690" in messages, "Common message should be present"
        assert "24691" in messages, "Target-only message should be preserved"
        assert "24692" in messages, "Source-only message should be added"
        assert len(messages) == 3, "Should have exactly 3 messages"
def test_incremental_merge_existing_file_no_changes(mock_filesystem):
    """Test that identical source and target JSON does not rewrite the target.

    Both directories hold the same chat_data_1 payload, so incremental_merge
    should detect no differences and skip the update entirely.
    """
    source_dir = "source"
    target_dir = "target"
    media_dir = "media"
    # Every exists() check passes, so the merge branch (not plain copy) runs.
    mock_filesystem["exists"].side_effect = lambda x: True
    mock_filesystem["listdir"].return_value = ["chat.json"]
    # Map each expected open() path to its JSON payload (same on both sides).
    source_file = os.path.join(source_dir, "chat.json")
    target_file = os.path.join(target_dir, "chat.json")
    mock_file_content = {
        source_file: json.dumps(chat_data_1),
        target_file: json.dumps(chat_data_1),
    }
    # Record write() calls on "w"-mode file mocks explicitly. The previous
    # version scanned mock_file.mock_calls for "().write", but the file mocks
    # returned via side_effect are not children of mock_file, so their write
    # calls were never recorded there and that assertion passed vacuously.
    write_recorder = MagicMock()
    with patch("builtins.open", mock_open()) as mock_file:
        def mock_file_read(filename, mode="r"):
            if mode == 'w':
                file_mock = mock_open().return_value
                file_mock.write.side_effect = write_recorder
                return file_mock
            else:
                # Use normalized path for lookup
                norm_filename = os.path.normpath(filename)
                content = mock_file_content.get(norm_filename, '')
                return mock_open(read_data=content).return_value
        mock_file.side_effect = mock_file_read
        # Run the function
        incremental_merge(source_dir, target_dir, media_dir, 2, True)
        # The no-change path must not even open the target for writing,
        # and nothing may be written to any file.
        assert (target_file, "w") not in [c.args for c in mock_file.call_args_list], \
            "Target file should not be opened for writing"
        assert not write_recorder.called, "No data should be written to the target"
def test_incremental_merge_media_copy(mock_filesystem):
    """Source media files newer than their target copies are copied over."""
    src_dir = "source"
    tgt_dir = "target"
    media = "media"
    media_root = os.path.join(src_dir, media)

    # All paths exist; the walk yields one file at the media root and one
    # inside a subfolder.
    mock_filesystem["exists"].return_value = True
    mock_filesystem["listdir"].return_value = ["chat.json"]
    mock_filesystem["walk"].return_value = [
        (media_root, ["subfolder"], ["file1.jpg"]),
        (os.path.join(media_root, "subfolder"), [], ["file2.jpg"]),
    ]
    # Every source file looks newer (mtime 1000) than its target copy (500).
    mock_filesystem["getmtime"].side_effect = (
        lambda path: 1000 if "source" in path else 500
    )

    # Identical JSON payload on both sides keeps the focus on media copying.
    payload = json.dumps(chat_data_1)
    contents = {
        os.path.join(src_dir, "chat.json"): payload,
        os.path.join(tgt_dir, "chat.json"): payload,
    }

    with patch("builtins.open", mock_open()) as opener:
        def fake_open(filename, mode="r"):
            if mode == 'w':
                return mock_open().return_value
            data = contents.get(os.path.normpath(filename), '')
            return mock_open(read_data=data).return_value
        opener.side_effect = fake_open

        incremental_merge(src_dir, tgt_dir, media, 2, True)

    # At least the target dir and the media subdirectories were created,
    # and both (newer) media files were copied.
    assert mock_filesystem["makedirs"].call_count >= 2
    assert mock_filesystem["copy2"].call_count == 2