diff --git a/.gitignore b/.gitignore index 5831f34..c427c08 100644 --- a/.gitignore +++ b/.gitignore @@ -138,7 +138,9 @@ __main__ # Dev time intermidiates & temp files result/ +output/ WhatsApp/ +AppDomainGroup-group.net.whatsapp.WhatsApp.shared/ /*.db /*.db-* /myout diff --git a/README.md b/README.md index 19cd848..94c0745 100644 --- a/README.md +++ b/README.md @@ -231,6 +231,19 @@ Contact Enrichment: Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country +Incremental Merging: + --incremental-merge Performs an incremental merge of two exports. Requires setting both --source- + dir and --target-dir. The chats (JSON files only) and media from the source + directory will be merged into the target directory. No chat messages or media + will be deleted from the target directory; only new chat messages and media + will be added to it. This enables chat messages and media to be deleted from + the device to free up space, while ensuring they are preserved in the exported + backups. + --source-dir SOURCE_DIR + Sets the source directory. Used for performing incremental merges. + --target-dir TARGET_DIR + Sets the target directory. Used for performing incremental merges. 
+ Miscellaneous: -s, --showkey Show the HEX key used to decrypt the database --check-update Check for updates (require Internet access) diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index bf130be..51bbabf 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -13,7 +13,7 @@ from Whatsapp_Chat_Exporter import ios_handler, ios_media_handler from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore from Whatsapp_Chat_Exporter.utility import APPLE_TIME, Crypt, check_update, DbType from Whatsapp_Chat_Exporter.utility import readable_to_bytes, sanitize_filename -from Whatsapp_Chat_Exporter.utility import import_from_json, bytes_to_readable +from Whatsapp_Chat_Exporter.utility import import_from_json, incremental_merge, bytes_to_readable from argparse import ArgumentParser, SUPPRESS from datetime import datetime from getpass import getpass @@ -34,12 +34,12 @@ def setup_argument_parser() -> ArgumentParser: """Set up and return the argument parser with all options.""" parser = ArgumentParser( description='A customizable Android and iOS/iPadOS WhatsApp database parser that ' - 'will give you the history of your WhatsApp conversations in HTML ' - 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', + 'will give you the history of your WhatsApp conversations in HTML ' + 'and JSON. Android Backup Crypt12, Crypt14 and Crypt15 supported.', epilog=f'WhatsApp Chat Exporter: {importlib.metadata.version("whatsapp_chat_exporter")} Licensed with MIT. See ' - 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' + 'https://wts.knugi.dev/docs?dest=osl for all open source licenses.' 
) - + # Device type arguments device_group = parser.add_argument_group('Device Type') device_group.add_argument( @@ -54,7 +54,7 @@ def setup_argument_parser() -> ArgumentParser: "-e", "--exported", dest="exported", default=None, help="Define the target as exported chat file and specify the path to the file" ) - + # Input file paths input_group = parser.add_argument_group('Input Files') input_group.add_argument( @@ -86,7 +86,7 @@ def setup_argument_parser() -> ArgumentParser: "--wab", "--wa-backup", dest="wab", default=None, help="Path to contact database in crypt15 format" ) - + # Output options output_group = parser.add_argument_group('Output Options') output_group.add_argument( @@ -109,7 +109,7 @@ def setup_argument_parser() -> ArgumentParser: "--size", "--output-size", "--split", dest="size", nargs='?', const=0, default=None, help="Maximum (rough) size of a single output file in bytes, 0 for auto" ) - + # JSON formatting options json_group = parser.add_argument_group('JSON Options') json_group.add_argument( @@ -128,7 +128,7 @@ def setup_argument_parser() -> ArgumentParser: "--import", dest="import_json", default=False, action='store_true', help="Import JSON file and convert to HTML output" ) - + # HTML options html_group = parser.add_argument_group('HTML Options') html_group.add_argument( @@ -155,7 +155,7 @@ def setup_argument_parser() -> ArgumentParser: "--headline", dest="headline", default="Chat history with ??", help="The custom headline for the HTML output. Use '??' 
as a placeholder for the chat name" ) - + # Media handling media_group = parser.add_argument_group('Media Handling') media_group.add_argument( @@ -166,7 +166,7 @@ def setup_argument_parser() -> ArgumentParser: "--create-separated-media", dest="separate_media", default=False, action='store_true', help="Create a copy of the media seperated per chat in /separated/ directory" ) - + # Filtering options filter_group = parser.add_argument_group('Filtering Options') filter_group.add_argument( @@ -195,7 +195,7 @@ def setup_argument_parser() -> ArgumentParser: "Setting this flag will cause the exporter to render those. " "This is useful if chat(s) are missing from the output") ) - + # Contact enrichment contact_group = parser.add_argument_group('Contact Enrichment') contact_group.add_argument( @@ -206,7 +206,34 @@ def setup_argument_parser() -> ArgumentParser: "--default-country-code", dest="default_country_code", default=None, help="Use with --enrich-from-vcards. When numbers in the vcf file does not have a country code, this will be used. 1 is for US, 66 for Thailand etc. Most likely use the number of your own country" ) - + + # Incremental merging + inc_merging_group = parser.add_argument_group('Incremental Merging') + inc_merging_group.add_argument( + "--incremental-merge", + dest="incremental_merge", + default=False, + action='store_true', + help=("Performs an incremental merge of two exports. " + "Requires setting both --source-dir and --target-dir. " + "The chats (JSON files only) and media from the source directory will be merged into the target directory. " + "No chat messages or media will be deleted from the target directory; only new chat messages and media will be added to it. " + "This enables chat messages and media to be deleted from the device to free up space, while ensuring they are preserved in the exported backups." + ) + ) + inc_merging_group.add_argument( + "--source-dir", + dest="source_dir", + default=None, + help="Sets the source directory. 
Used for performing incremental merges." + ) + inc_merging_group.add_argument( + "--target-dir", + dest="target_dir", + default=None, + help="Sets the target directory. Used for performing incremental merges." + ) + # Miscellaneous misc_group = parser.add_argument_group('Miscellaneous') misc_group.add_argument( @@ -233,7 +260,7 @@ def setup_argument_parser() -> ArgumentParser: "--max-bruteforce-worker", dest="max_bruteforce_worker", default=10, type=int, help="Specify the maximum number of worker for bruteforce decryption." ) - + return parser @@ -245,50 +272,59 @@ def validate_args(parser: ArgumentParser, args) -> None: if not args.android and not args.ios and not args.exported and not args.import_json: parser.error("You must define the device type.") if args.no_html and not args.json and not args.text_format: - parser.error("You must either specify a JSON output file, text file output directory or enable HTML output.") + parser.error( + "You must either specify a JSON output file, text file output directory or enable HTML output.") if args.import_json and (args.android or args.ios or args.exported or args.no_html): - parser.error("You can only use --import with -j and without --no-html, -a, -i, -e.") + parser.error( + "You can only use --import with -j and without --no-html, -a, -i, -e.") elif args.import_json and not os.path.isfile(args.json): parser.error("JSON file not found.") + if args.incremental_merge and (args.source_dir is None or args.target_dir is None): + parser.error( + "You must specify both --source-dir and --target-dir for incremental merge.") if args.android and args.business: parser.error("WhatsApp Business is only available on iOS for now.") if "??" not in args.headline: parser.error("--headline must contain '??' 
for replacement.") - + # JSON validation if args.json_per_chat and args.json and ( - (args.json.endswith(".json") and os.path.isfile(args.json)) or + (args.json.endswith(".json") and os.path.isfile(args.json)) or (not args.json.endswith(".json") and os.path.isfile(args.json)) ): - parser.error("When --per-chat is enabled, the destination of --json must be a directory.") - + parser.error( + "When --per-chat is enabled, the destination of --json must be a directory.") + # vCards validation if args.enrich_from_vcards is not None and args.default_country_code is None: - parser.error("When --enrich-from-vcards is provided, you must also set --default-country-code") - + parser.error( + "When --enrich-from-vcards is provided, you must also set --default-country-code") + # Size validation if args.size is not None and not isinstance(args.size, int) and not args.size.isnumeric(): try: args.size = readable_to_bytes(args.size) except ValueError: - parser.error("The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)") - + parser.error( + "The value for --split must be ended in pure bytes or with a proper unit (e.g., 1048576 or 1MB)") + # Date filter validation and processing if args.filter_date is not None: process_date_filter(parser, args) - + # Crypt15 key validation if args.key is None and args.backup is not None and args.backup.endswith("crypt15"): args.key = getpass("Enter your encryption key: ") - + # Theme validation if args.whatsapp_theme: args.template = "whatsapp_new.html" - + # Chat filter validation if args.filter_chat_include is not None and args.filter_chat_exclude is not None: - parser.error("Chat inclusion and exclusion filters cannot be used together.") - + parser.error( + "Chat inclusion and exclusion filters cannot be used together.") + validate_chat_filters(parser, args.filter_chat_include) validate_chat_filters(parser, args.filter_chat_exclude) @@ -298,21 +334,24 @@ def validate_chat_filters(parser: ArgumentParser, 
chat_filter: Optional[List[str if chat_filter is not None: for chat in chat_filter: if not chat.isnumeric(): - parser.error("Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat") + parser.error( + "Enter a phone number in the chat filter. See https://wts.knugi.dev/docs?dest=chat") def process_date_filter(parser: ArgumentParser, args) -> None: """Process and validate date filter arguments.""" if " - " in args.filter_date: start, end = args.filter_date.split(" - ") - start = int(datetime.strptime(start, args.filter_date_format).timestamp()) + start = int(datetime.strptime( + start, args.filter_date_format).timestamp()) end = int(datetime.strptime(end, args.filter_date_format).timestamp()) - + if start < 1009843200 or end < 1009843200: parser.error("WhatsApp was first released in 2009...") if start > end: - parser.error("The start date cannot be a moment after the end date.") - + parser.error( + "The start date cannot be a moment after the end date.") + if args.android: args.filter_date = f"BETWEEN {start}000 AND {end}000" elif args.ios: @@ -324,13 +363,15 @@ def process_date_filter(parser: ArgumentParser, args) -> None: def process_single_date_filter(parser: ArgumentParser, args) -> None: """Process single date comparison filters.""" if len(args.filter_date) < 3: - parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") - - _timestamp = int(datetime.strptime(args.filter_date[2:], args.filter_date_format).timestamp()) - + parser.error( + "Unsupported date format. 
See https://wts.knugi.dev/docs?dest=date") + + _timestamp = int(datetime.strptime( + args.filter_date[2:], args.filter_date_format).timestamp()) + if _timestamp < 1009843200: parser.error("WhatsApp was first released in 2009...") - + if args.filter_date[:2] == "> ": if args.android: args.filter_date = f">= {_timestamp}000" @@ -342,7 +383,8 @@ def process_single_date_filter(parser: ArgumentParser, args) -> None: elif args.ios: args.filter_date = f"<= {_timestamp - APPLE_TIME}" else: - parser.error("Unsupported date format. See https://wts.knugi.dev/docs?dest=date") + parser.error( + "Unsupported date format. See https://wts.knugi.dev/docs?dest=date") def setup_contact_store(args) -> Optional['ContactsFromVCards']: @@ -356,7 +398,8 @@ def setup_contact_store(args) -> Optional['ContactsFromVCards']: ) exit(1) contact_store = ContactsFromVCards() - contact_store.load_vcf_file(args.enrich_from_vcards, args.default_country_code) + contact_store.load_vcf_file( + args.enrich_from_vcards, args.default_country_code) return contact_store return None @@ -366,9 +409,9 @@ def decrypt_android_backup(args) -> int: if args.key is None or args.backup is None: print("You must specify the backup file with -b and a key with -k") return 1 - + print("Decryption key specified, decrypting WhatsApp backup...") - + # Determine crypt type if "crypt12" in args.backup: crypt = Crypt.CRYPT12 @@ -379,7 +422,7 @@ def decrypt_android_backup(args) -> int: else: print("Unknown backup format. 
The backup file must be crypt12, crypt14 or crypt15.") return 1 - + # Get key keyfile_stream = False if not os.path.isfile(args.key) and all(char in string.hexdigits for char in args.key.replace(" ", "")): @@ -387,10 +430,10 @@ def decrypt_android_backup(args) -> int: else: key = open(args.key, "rb") keyfile_stream = True - + # Read backup db = open(args.backup, "rb").read() - + # Process WAB if provided error_wa = 0 if args.wab: @@ -407,7 +450,7 @@ def decrypt_android_backup(args) -> int: ) if isinstance(key, io.IOBase): key.seek(0) - + # Decrypt message database error_message = android_crypt.decrypt_backup( db, @@ -419,7 +462,7 @@ def decrypt_android_backup(args) -> int: keyfile_stream=keyfile_stream, max_worker=args.max_bruteforce_worker ) - + # Handle errors if error_wa != 0: return error_wa @@ -444,7 +487,7 @@ def handle_decrypt_error(error: int) -> None: def process_contacts(args, data: ChatCollection, contact_store=None) -> None: """Process contacts from the database.""" contact_db = args.wa if args.wa else "wa.db" if args.android else "ContactsV2.sqlite" - + if os.path.isfile(contact_db): with sqlite3.connect(contact_db) as db: db.row_factory = sqlite3.Row @@ -457,42 +500,42 @@ def process_contacts(args, data: ChatCollection, contact_store=None) -> None: def process_messages(args, data: ChatCollection) -> None: """Process messages, media and vcards from the database.""" msg_db = args.db if args.db else "msgstore.db" if args.android else args.identifiers.MESSAGE - + if not os.path.isfile(msg_db): print( "The message database does not exist. You may specify the path " "to database file with option -d or check your provided path." 
) exit(6) - + filter_chat = (args.filter_chat_include, args.filter_chat_exclude) - + with sqlite3.connect(msg_db) as db: db.row_factory = sqlite3.Row - + # Process messages if args.android: message_handler = android_handler else: message_handler = ios_handler - + message_handler.messages( - db, data, args.media, args.timezone_offset, + db, data, args.media, args.timezone_offset, args.filter_date, filter_chat, args.filter_empty ) - + # Process media message_handler.media( - db, data, args.media, args.filter_date, + db, data, args.media, args.filter_date, filter_chat, args.filter_empty, args.separate_media ) - + # Process vcards message_handler.vcard( - db, data, args.media, args.filter_date, + db, data, args.media, args.filter_date, filter_chat, args.filter_empty ) - + # Process calls process_calls(args, db, data, filter_chat) @@ -511,9 +554,10 @@ def handle_media_directory(args) -> None: """Handle media directory copying or moving.""" if os.path.isdir(args.media): media_path = os.path.join(args.output, args.media) - + if os.path.isdir(media_path): - print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n") + print( + "\nWhatsApp directory already exists in output directory. 
Skipping...", end="\n") else: if args.move_media: try: @@ -534,7 +578,7 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None: # Enrich from vcards if available if contact_store and not contact_store.is_empty(): contact_store.enrich_from_vcards(data) - + android_handler.create_html( data, args.output, @@ -546,12 +590,12 @@ def create_output_files(args, data: ChatCollection, contact_store=None) -> None: args.whatsapp_theme, args.headline ) - + # Create text files if requested if args.text_format: print("Writing text file...") android_handler.create_txt(data, args.text_format) - + # Create JSON files if requested if args.json and not args.import_json: export_json(args, data, contact_store) @@ -562,11 +606,11 @@ def export_json(args, data: ChatCollection, contact_store=None) -> None: # Enrich from vcards if available if contact_store and not contact_store.is_empty(): contact_store.enrich_from_vcards(data) - + # Convert ChatStore objects to JSON if isinstance(data.get(next(iter(data), None)), ChatStore): data = {jik: chat.to_json() for jik, chat in data.items()} - + # Export as a single file or per chat if not args.json_per_chat: export_single_json(args, data) @@ -590,11 +634,11 @@ def export_multiple_json(args, data: Dict) -> None: """Export data to multiple JSON files, one per chat.""" # Adjust output path if needed json_path = args.json[:-5] if args.json.endswith(".json") else args.json - + # Create directory if it doesn't exist if not os.path.isdir(json_path): os.makedirs(json_path, exist_ok=True) - + # Export each chat total = len(data.keys()) for index, jik in enumerate(data.keys()): @@ -602,11 +646,11 @@ def export_multiple_json(args, data: Dict) -> None: contact = data[jik]["name"].replace('/', '') else: contact = jik.replace('+', '') - + with open(f"{json_path}/{sanitize_filename(contact)}.json", "w") as f: file_content = json.dumps( - {jik: data[jik]}, - ensure_ascii=not args.avoid_encoding_json, + {jik: data[jik]}, + 
ensure_ascii=not args.avoid_encoding_json, indent=args.pretty_print_json ) f.write(file_content) @@ -617,7 +661,7 @@ def export_multiple_json(args, data: Dict) -> None: def process_exported_chat(args, data: ChatCollection) -> None: """Process an exported chat file.""" exported_handler.messages(args.exported, data, args.assume_first_as_me) - + if not args.no_html: android_handler.create_html( data, @@ -630,7 +674,7 @@ def process_exported_chat(args, data: ChatCollection) -> None: args.whatsapp_theme, args.headline ) - + # Copy files to output directory for file in glob.glob(r'*.*'): shutil.copy(file, args.output) @@ -641,23 +685,23 @@ def main(): # Set up and parse arguments parser = setup_argument_parser() args = parser.parse_args() - + # Check for updates if args.check_update: exit(check_update()) - + # Validate arguments validate_args(parser, args) - + # Create output directory if it doesn't exist os.makedirs(args.output, exist_ok=True) - + # Initialize data collection data = ChatCollection() - + # Set up contact store for vCard enrichment if needed contact_store = setup_contact_store(args) - + if args.import_json: # Import from JSON import_from_json(args.json, data) @@ -681,13 +725,13 @@ def main(): # Set default media path if not provided if args.media is None: args.media = "WhatsApp" - + # Set default DB paths if not provided if args.db is None: args.db = "msgstore.db" if args.wa is None: args.wa = "wa.db" - + # Decrypt backup if needed if args.key is not None: error = decrypt_android_backup(args) @@ -700,34 +744,50 @@ def main(): else: from Whatsapp_Chat_Exporter.utility import WhatsAppIdentifier as identifiers args.identifiers = identifiers - + # Set default media path if not provided if args.media is None: args.media = identifiers.DOMAIN - + # Extract media from backup if needed if args.backup is not None: if not os.path.isdir(args.media): - ios_media_handler.extract_media(args.backup, identifiers, args.decrypt_chunk_size) + ios_media_handler.extract_media( 
+ args.backup, identifiers, args.decrypt_chunk_size) else: - print("WhatsApp directory already exists, skipping WhatsApp file extraction.") - + print( + "WhatsApp directory already exists, skipping WhatsApp file extraction.") + # Set default DB paths if not provided if args.db is None: args.db = identifiers.MESSAGE if args.wa is None: args.wa = "ContactsV2.sqlite" - - # Process contacts - process_contacts(args, data, contact_store) - - # Process messages, media, and calls - process_messages(args, data) - - # Create output files - create_output_files(args, data, contact_store) - - # Handle media directory - handle_media_directory(args) - print("Everything is done!") \ No newline at end of file + if args.incremental_merge: + incremental_merge( + args.source_dir, + args.target_dir, + args.media, + args.pretty_print_json, + args.avoid_encoding_json + ) + print("Incremental merge completed successfully.") + else: + # Process contacts + process_contacts(args, data, contact_store) + + # Process messages, media, and calls + process_messages(args, data) + + # Create output files + create_output_files(args, data, contact_store) + + # Handle media directory + handle_media_directory(args) + + print("Everything is done!") + + +if __name__ == "__main__": + main() diff --git a/Whatsapp_Chat_Exporter/data_model.py b/Whatsapp_Chat_Exporter/data_model.py index e84154d..1ebf75d 100644 --- a/Whatsapp_Chat_Exporter/data_model.py +++ b/Whatsapp_Chat_Exporter/data_model.py @@ -7,6 +7,7 @@ class Timing: """ Handles timestamp formatting with timezone support. """ + def __init__(self, timezone_offset: Optional[int]) -> None: """ Initialize Timing object. 
@@ -27,7 +28,7 @@ class Timing: Returns: Optional[str]: Formatted timestamp string, or None if timestamp is None """ - if timestamp: + if timestamp is not None: timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp return datetime.fromtimestamp(timestamp, TimeZone(self.timezone_offset)).strftime(format) return None @@ -37,6 +38,7 @@ class TimeZone(tzinfo): """ Custom timezone class with fixed offset. """ + def __init__(self, offset: int) -> None: """ Initialize TimeZone object. @@ -151,6 +153,7 @@ class ChatStore: """ Stores chat information and messages. """ + def __init__(self, type: str, name: Optional[str] = None, media: Optional[str] = None) -> None: """ Initialize ChatStore object. @@ -159,7 +162,7 @@ class ChatStore: type (str): Device type (IOS or ANDROID) name (Optional[str]): Chat name media (Optional[str]): Path to media folder - + Raises: TypeError: If name is not a string or None """ @@ -182,7 +185,7 @@ class ChatStore: self.their_avatar_thumb = None self.status = None self.media_base = "" - + def __len__(self) -> int: """Get number of chats. 
Required for dict-like access.""" return len(self._messages) @@ -192,7 +195,7 @@ class ChatStore: if not isinstance(message, Message): raise TypeError("message must be a Message object") self._messages[id] = message - + def get_message(self, id: str) -> 'Message': """Get a message from the chat store.""" return self._messages.get(id) @@ -211,13 +214,28 @@ class ChatStore: 'their_avatar': self.their_avatar, 'their_avatar_thumb': self.their_avatar_thumb, 'status': self.status, + 'media_base': self.media_base, 'messages': {id: msg.to_json() for id, msg in self._messages.items()} } + @classmethod + def from_json(cls, data: Dict) -> 'ChatStore': + """Create a chat store from JSON data.""" + chat = cls(data.get("type"), data.get("name")) + chat.my_avatar = data.get("my_avatar") + chat.their_avatar = data.get("their_avatar") + chat.their_avatar_thumb = data.get("their_avatar_thumb") + chat.status = data.get("status") + chat.media_base = data.get("media_base") + for id, msg_data in data.get("messages", {}).items(): + message = Message.from_json(msg_data) + chat.add_message(id, message) + return chat + def get_last_message(self) -> 'Message': """Get the most recent message in the chat.""" return tuple(self._messages.values())[-1] - + def items(self): """Get message items pairs.""" return self._messages.items() @@ -230,18 +248,40 @@ class ChatStore: """Get all message keys in the chat.""" return self._messages.keys() + def merge_with(self, other: 'ChatStore'): + """Merge another ChatStore into this one. 
+ + Args: + other (ChatStore): The ChatStore to merge with + + """ + if not isinstance(other, ChatStore): + raise TypeError("Can only merge with another ChatStore object") + + # Update fields if they are not None in the other ChatStore + self.name = other.name or self.name + self.type = other.type or self.type + self.my_avatar = other.my_avatar or self.my_avatar + self.their_avatar = other.their_avatar or self.their_avatar + self.their_avatar_thumb = other.their_avatar_thumb or self.their_avatar_thumb + self.status = other.status or self.status + + # Merge messages + self._messages.update(other._messages) + class Message: """ Represents a single message in a chat. """ + def __init__( self, *, from_me: Union[bool, int], timestamp: int, time: Union[int, float, str], - key_id: int, + key_id: Union[int, str], received_timestamp: int, read_timestamp: int, timezone_offset: int = 0, @@ -266,7 +306,7 @@ class Message: self.from_me = bool(from_me) self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp timing = Timing(timezone_offset) - + if isinstance(time, (int, float)): self.time = timing.format_timestamp(self.timestamp, "%H:%M") elif isinstance(time, str): @@ -281,10 +321,22 @@ class Message: self.sender = None self.safe = False self.mime = None - self.message_type = message_type, - self.received_timestamp = timing.format_timestamp(received_timestamp, "%Y/%m/%d %H:%M") - self.read_timestamp = timing.format_timestamp(read_timestamp, "%Y/%m/%d %H:%M") - + self.message_type = message_type + if isinstance(received_timestamp, (int, float)): + self.received_timestamp = timing.format_timestamp( + received_timestamp, "%Y/%m/%d %H:%M") + elif isinstance(received_timestamp, str): + self.received_timestamp = received_timestamp + else: + self.received_timestamp = None + if isinstance(read_timestamp, (int, float)): + self.read_timestamp = timing.format_timestamp( + read_timestamp, "%Y/%m/%d %H:%M") + elif isinstance(read_timestamp, str): + self.read_timestamp = 
read_timestamp + else: + self.read_timestamp = None + # Extra attributes self.reply = None self.quoted_data = None @@ -309,5 +361,32 @@ class Message: 'quoted_data': self.quoted_data, 'caption': self.caption, 'thumb': self.thumb, - 'sticker': self.sticker - } \ No newline at end of file + 'sticker': self.sticker, + 'message_type': self.message_type, + 'received_timestamp': self.received_timestamp, + 'read_timestamp': self.read_timestamp + } + + @classmethod + def from_json(cls, data: Dict) -> 'Message': + message = cls( + from_me=data["from_me"], + timestamp=data["timestamp"], + time=data["time"], + key_id=data["key_id"], + message_type=data.get("message_type"), + received_timestamp=data.get("received_timestamp"), + read_timestamp=data.get("read_timestamp") + ) + message.media = data.get("media") + message.meta = data.get("meta") + message.data = data.get("data") + message.sender = data.get("sender") + message.safe = data.get("safe") + message.mime = data.get("mime") + message.reply = data.get("reply") + message.quoted_data = data.get("quoted_data") + message.caption = data.get("caption") + message.thumb = data.get("thumb") + message.sticker = data.get("sticker") + return message diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index 6d9fd36..b6f7bc2 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -5,6 +5,7 @@ import os import unicodedata import re import math +import shutil from bleach import clean as sanitize from markupsafe import Markup from datetime import datetime, timedelta @@ -15,8 +16,9 @@ try: from enum import StrEnum, IntEnum except ImportError: # < Python 3.11 - # This should be removed when the support for Python 3.10 ends. + # This should be removed when the support for Python 3.10 ends. (31 Oct 2026) from enum import Enum + class StrEnum(str, Enum): pass @@ -71,7 +73,7 @@ def bytes_to_readable(size_bytes: int) -> str: A human-readable string representing the file size. 
""" if size_bytes == 0: - return "0B" + return "0B" size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") i = int(math.floor(math.log(size_bytes, 1024))) p = math.pow(1024, i) @@ -99,7 +101,7 @@ def readable_to_bytes(size_str: str) -> int: 'TB': 1024**4, 'PB': 1024**5, 'EB': 1024**6, - 'ZB': 1024**7, + 'ZB': 1024**7, 'YB': 1024**8 } size_str = size_str.upper().strip() @@ -154,7 +156,8 @@ def check_update(): else: with raw: package_info = json.load(raw) - latest_version = tuple(map(int, package_info["info"]["version"].split("."))) + latest_version = tuple( + map(int, package_info["info"]["version"].split("."))) __version__ = importlib.metadata.version("whatsapp_chat_exporter") current_version = tuple(map(int, __version__.split("."))) if current_version < latest_version: @@ -173,17 +176,17 @@ def check_update(): def rendering( - output_file_name, - template, - name, - msgs, - contact, - w3css, - chat, - headline, - next=False, - previous=False - ): + output_file_name, + template, + name, + msgs, + contact, + w3css, + chat, + headline, + next=False, + previous=False +): if chat.their_avatar_thumb is None and chat.their_avatar is not None: their_avatar_thumb = chat.their_avatar else: @@ -255,7 +258,89 @@ def import_from_json(json_file: str, data: Dict[str, ChatStore]): message.sticker = msg.get("sticker") chat.add_message(id, message) data[jid] = chat - print(f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r") + print( + f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r") + + +def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool): + """Merges JSON files from the source directory into the target directory. + + Args: + source_dir (str): The path to the source directory containing JSON files. + target_dir (str): The path to the target directory to merge into. + media_dir (str): The path to the media directory. 
+ """ + json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')] + if not json_files: + print("No JSON files found in the source directory.") + return + + print("JSON files found:", json_files) + + for json_file in json_files: + source_path = os.path.join(source_dir, json_file) + target_path = os.path.join(target_dir, json_file) + + if not os.path.exists(target_path): + print(f"Copying '{json_file}' to target directory...") + os.makedirs(target_dir, exist_ok=True) + shutil.copy2(source_path, target_path) + else: + print( + f"Merging '{json_file}' with existing file in target directory...") + with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file: + source_data = json.load(src_file) + target_data = json.load(tgt_file) + + # Parse JSON into ChatStore objects using from_json() + source_chats = {jid: ChatStore.from_json( + chat) for jid, chat in source_data.items()} + target_chats = {jid: ChatStore.from_json( + chat) for jid, chat in target_data.items()} + + # Merge chats using merge_with() + for jid, chat in source_chats.items(): + if jid in target_chats: + target_chats[jid].merge_with(chat) + else: + target_chats[jid] = chat + + # Serialize merged data + merged_data = {jid: chat.to_json() + for jid, chat in target_chats.items()} + + # Check if the merged data differs from the original target data + if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True): + print( + f"Changes detected in '{json_file}', updating target file...") + with open(target_path, 'w') as merged_file: + json.dump( + merged_data, + merged_file, + indent=pretty_print_json, + ensure_ascii=not avoid_encoding_json, + ) + else: + print( + f"No changes detected in '{json_file}', skipping update.") + + # Merge media directories + source_media_path = os.path.join(source_dir, media_dir) + target_media_path = os.path.join(target_dir, media_dir) + print( + f"Merging media directories. 
Source: {source_media_path}, target: {target_media_path}") + if os.path.exists(source_media_path): + for root, _, files in os.walk(source_media_path): + relative_path = os.path.relpath(root, source_media_path) + target_root = os.path.join(target_media_path, relative_path) + os.makedirs(target_root, exist_ok=True) + for file in files: + source_file = os.path.join(root, file) + target_file = os.path.join(target_root, file) + # we only copy if the file doesn't exist in the target or if the source is newer + if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file): + print(f"Copying '{source_file}' to '{target_file}'...") + shutil.copy2(source_file, target_file) def sanitize_filename(file_name: str) -> str: @@ -335,23 +420,29 @@ def get_chat_condition(filter: Optional[List[str]], include: bool, columns: List if filter is not None: conditions = [] if len(columns) < 2 and jid is not None: - raise ValueError("There must be at least two elements in argument columns if jid is not None") + raise ValueError( + "There must be at least two elements in argument columns if jid is not None") if jid is not None: if platform == "android": is_group = f"{jid}.type == 1" elif platform == "ios": is_group = f"{jid} IS NOT NULL" else: - raise ValueError("Only android and ios are supported for argument platform if jid is not None") + raise ValueError( + "Only android and ios are supported for argument platform if jid is not None") for index, chat in enumerate(filter): if include: - conditions.append(f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'") + conditions.append( + f"{' OR' if index > 0 else ''} {columns[0]} LIKE '%{chat}%'") if len(columns) > 1: - conditions.append(f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})") + conditions.append( + f" OR ({columns[1]} LIKE '%{chat}%' AND {is_group})") else: - conditions.append(f"{' AND' if index > 0 else ''} {columns[0]} NOT LIKE '%{chat}%'") + conditions.append( + f"{' AND' if index > 
0 else ''} {columns[0]} NOT LIKE '%{chat}%'") if len(columns) > 1: - conditions.append(f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})") + conditions.append( + f" AND ({columns[1]} NOT LIKE '%{chat}%' AND {is_group})") return f"AND ({' '.join(conditions)})" else: return "" @@ -446,7 +537,7 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona else: msg = f"{old} changed their number to {new}" elif content["action_type"] == 46: - return # Voice message in PM??? Seems no need to handle. + return # Voice message in PM??? Seems no need to handle. elif content["action_type"] == 47: msg = "The contact is an official business account" elif content["action_type"] == 50: @@ -463,7 +554,8 @@ def determine_metadata(content: sqlite3.Row, init_msg: Optional[str]) -> Optiona elif content["action_type"] == 67: return # (PM) this contact use secure service from Facebook??? elif content["action_type"] == 69: - return # (PM) this contact use secure service from Facebook??? What's the difference with 67???? + # (PM) this contact use secure service from Facebook??? What's the difference with 67???? 
+ return else: return # Unsupported return msg @@ -490,7 +582,8 @@ def get_status_location(output_folder: str, offline_static: str) -> str: w3css_path = os.path.join(static_folder, "w3.css") if not os.path.isfile(w3css_path): with urllib.request.urlopen(w3css) as resp: - with open(w3css_path, "wb") as f: f.write(resp.read()) + with open(w3css_path, "wb") as f: + f.write(resp.read()) w3css = os.path.join(offline_static, "w3.css") @@ -521,6 +614,7 @@ def setup_template(template: Optional[str], no_avatar: bool, experimental: bool template_env.filters['sanitize_except'] = sanitize_except return template_env.get_template(template_file) + # iOS Specific APPLE_TIME = 978307200 @@ -541,23 +635,31 @@ def slugify(value: str, allow_unicode: bool = False) -> str: if allow_unicode: value = unicodedata.normalize('NFKC', value) else: - value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii') + value = unicodedata.normalize('NFKD', value).encode( + 'ascii', 'ignore').decode('ascii') value = re.sub(r'[^\w\s-]', '', value.lower()) return re.sub(r'[-\s]+', '-', value).strip('-_') class WhatsAppIdentifier(StrEnum): - MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite - CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite - CALL = "1b432994e958845fffe8e2f190f26d1511534088" # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite + # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ChatStorage.sqlite + MESSAGE = "7c7fba66680ef796b916b067077cc246adacf01d" + # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-ContactsV2.sqlite + CONTACT = "b8548dc30aa1030df0ce18ef08b882cf7ab5212f" + # AppDomainGroup-group.net.whatsapp.WhatsApp.shared-CallHistory.sqlite + CALL = "1b432994e958845fffe8e2f190f26d1511534088" DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" class 
WhatsAppBusinessIdentifier(StrEnum): - MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite - CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite - CALL = "b463f7c4365eefc5a8723930d97928d4e907c603" # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite - DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared" + # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ChatStorage.sqlite + MESSAGE = "724bd3b98b18518b455a87c1f3ac3a0d189c4466" + # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-ContactsV2.sqlite + CONTACT = "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552" + # AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared-CallHistory.sqlite + CALL = "b463f7c4365eefc5a8723930d97928d4e907c603" + DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared" + class JidType(IntEnum): PM = 0 diff --git a/pyproject.toml b/pyproject.toml index 4c90c67..054bbc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,3 +60,8 @@ include = ["Whatsapp_Chat_Exporter"] [tool.setuptools.package-data] Whatsapp_Chat_Exporter = ["*.html"] + +[dependency-groups] +dev = [ + "pytest>=8.3.5", +] diff --git a/tests/test_incremental_merge.py b/tests/test_incremental_merge.py new file mode 100644 index 0000000..39f6c19 --- /dev/null +++ b/tests/test_incremental_merge.py @@ -0,0 +1,341 @@ +import os +import json +import pytest +from unittest.mock import patch, mock_open, call, MagicMock +from Whatsapp_Chat_Exporter.utility import incremental_merge +from Whatsapp_Chat_Exporter.data_model import ChatStore + +# Test data setup +BASE_PATH = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared" +chat_data_1 = { + "12345678@s.whatsapp.net": { + "name": "Friend", + "type": "ios", + "my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"), + "their_avatar": os.path.join(BASE_PATH, "Media", "Profile", 
"12345678-1709851420.thumb"), + "their_avatar_thumb": None, + "status": None, + "messages": { + "24690": { + "from_me": True, + "timestamp": 1463926635.571629, + "time": "10:17", + "media": False, + "key_id": "34B5EF10FBCA37B7E", + "meta": False, + "data": "I'm here", + "safe": False, + "sticker": False + }, + "24691": { # This message only exists in target + "from_me": False, + "timestamp": 1463926641.571629, + "time": "10:17", + "media": False, + "key_id": "34B5EF10FBCA37B8E", + "meta": False, + "data": "Great to see you", + "safe": False, + "sticker": False + } + } + } +} + +chat_data_2 = { + "12345678@s.whatsapp.net": { + "name": "Friend", + "type": "ios", + "my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"), + "their_avatar": os.path.join(BASE_PATH, "Media", "Profile", "12345678-1709851420.thumb"), + "their_avatar_thumb": None, + "status": None, + "messages": { + "24690": { + "from_me": True, + "timestamp": 1463926635.571629, + "time": "10:17", + "media": False, + "key_id": "34B5EF10FBCA37B7E", + "meta": False, + "data": "I'm here", + "safe": False, + "sticker": False + }, + "24692": { # This message only exists in source + "from_me": False, + "timestamp": 1463926642.571629, + "time": "10:17", + "media": False, + "key_id": "34B5EF10FBCA37B9E", + "meta": False, + "data": "Hi there!", + "safe": False, + "sticker": False + }, + } + } +} + +# Expected merged data - should contain all messages with all fields initialized as they would be by Message class +chat_data_merged = { + "12345678@s.whatsapp.net": { + "name": "Friend", + "type": "ios", + "my_avatar": os.path.join(BASE_PATH, "Media", "Profile", "Photo.jpg"), + "their_avatar": os.path.join(BASE_PATH, "Media", "Profile", "12345678-1709851420.thumb"), + "their_avatar_thumb": None, + "status": None, + "media_base": None, + "messages": { + "24690": { + "from_me": True, + "timestamp": 1463926635.571629, + "time": "10:17", + "media": False, + "key_id": "34B5EF10FBCA37B7E", + "meta": False, + 
"data": "I'm here", + "sender": None, + "safe": False, + "mime": None, + "reply": None, + "quoted_data": None, + "caption": None, + "thumb": None, + "sticker": False, + "message_type": None, + "received_timestamp": None, + "read_timestamp": None + }, + "24691": { + "from_me": False, + "timestamp": 1463926641.571629, + "time": "10:17", + "media": False, + "key_id": "34B5EF10FBCA37B8E", + "meta": False, + "data": "Great to see you", + "sender": None, + "safe": False, + "mime": None, + "reply": None, + "quoted_data": None, + "caption": None, + "thumb": None, + "sticker": False, + "message_type": None, + "received_timestamp": None, + "read_timestamp": None + }, + "24692": { + "from_me": False, + "timestamp": 1463926642.571629, + "time": "10:17", + "media": False, + "key_id": "34B5EF10FBCA37B9E", + "meta": False, + "data": "Hi there!", + "sender": None, + "safe": False, + "mime": None, + "reply": None, + "quoted_data": None, + "caption": None, + "thumb": None, + "sticker": False, + "message_type": None, + "received_timestamp": None, + "read_timestamp": None + }, + } + } +} + + +@pytest.fixture +def mock_filesystem(): + with ( + patch("os.path.exists") as mock_exists, + patch("os.makedirs") as mock_makedirs, + patch("os.path.getmtime") as mock_getmtime, + patch("os.listdir") as mock_listdir, + patch("os.walk") as mock_walk, + patch("shutil.copy2") as mock_copy2, + ): + yield { + "exists": mock_exists, + "makedirs": mock_makedirs, + "getmtime": mock_getmtime, + "listdir": mock_listdir, + "walk": mock_walk, + "copy2": mock_copy2, + } + + +def test_incremental_merge_new_file(mock_filesystem): + """Test merging when target file doesn't exist""" + source_dir = "/source" + target_dir = "/target" + media_dir = "media" + + # Setup mock filesystem + mock_filesystem["exists"].side_effect = lambda x: x == "/source" + mock_filesystem["listdir"].return_value = ["chat.json"] + + # Run the function + incremental_merge(source_dir, target_dir, media_dir, 2, True) + + # Verify the 
operations + mock_filesystem["makedirs"].assert_called_once_with(target_dir, exist_ok=True) + mock_filesystem["copy2"].assert_called_once_with( + os.path.join(source_dir, "chat.json"), + os.path.join(target_dir, "chat.json") + ) + + +def test_incremental_merge_existing_file_with_changes(mock_filesystem): + """Test merging when target file exists and has changes""" + source_dir = "source" + target_dir = "target" + media_dir = "media" + + # Setup mock filesystem + mock_filesystem["exists"].side_effect = lambda x: True + mock_filesystem["listdir"].return_value = ["chat.json"] + + # Mock file operations with consistent path separators + source_file = os.path.join(source_dir, "chat.json") + target_file = os.path.join(target_dir, "chat.json") + mock_file_content = { + source_file: json.dumps(chat_data_2), + target_file: json.dumps(chat_data_1), + } + + written_chunks = [] + + def mock_file_write(data): + written_chunks.append(data) + + mock_write = MagicMock(side_effect=mock_file_write) + + with patch("builtins.open", mock_open()) as mock_file: + def mock_file_read(filename, mode="r"): + if mode == 'w': + file_mock = mock_open().return_value + file_mock.write.side_effect = mock_write + return file_mock + else: + # Use normalized path for lookup + norm_filename = os.path.normpath(filename) + content = mock_file_content.get(norm_filename, '') + file_mock = mock_open(read_data=content).return_value + return file_mock + + mock_file.side_effect = mock_file_read + + # Run the function + incremental_merge(source_dir, target_dir, media_dir, 2, True) + + # Verify file operations using os.path.join + mock_file.assert_any_call(source_file, "r") + mock_file.assert_any_call(target_file, "r") + mock_file.assert_any_call(target_file, "w") + + # Rest of verification code... 
+    assert mock_write.called, "Write method was never called"
+    written_data = json.loads(''.join(written_chunks))
+    assert written_data is not None, "No data was written"
+    assert written_data == chat_data_merged, "Merged data does not match expected result"
+
+    messages = written_data["12345678@s.whatsapp.net"]["messages"]
+    assert "24690" in messages, "Common message should be present"
+    assert "24691" in messages, "Target-only message should be preserved"
+    assert "24692" in messages, "Source-only message should be added"
+    assert len(messages) == 3, "Should have exactly 3 messages"
+
+
+def test_incremental_merge_existing_file_no_changes(mock_filesystem):
+    """Test merging when target file exists but has no changes"""
+    source_dir = "source"
+    target_dir = "target"
+    media_dir = "media"
+
+    # Setup mock filesystem
+    mock_filesystem["exists"].side_effect = lambda x: True
+    mock_filesystem["listdir"].return_value = ["chat.json"]
+
+    # Mock file operations with consistent path separators
+    source_file = os.path.join(source_dir, "chat.json")
+    target_file = os.path.join(target_dir, "chat.json")
+    mock_file_content = {
+        source_file: json.dumps(chat_data_1),
+        target_file: json.dumps(chat_data_1),
+    }
+
+    with patch("builtins.open", mock_open()) as mock_file:
+        def mock_file_read(filename, mode="r"):
+            if mode == 'w':
+                file_mock = mock_open().return_value
+                return file_mock
+            else:
+                # Use normalized path for lookup
+                norm_filename = os.path.normpath(filename)
+                content = mock_file_content.get(norm_filename, '')
+                file_mock = mock_open(read_data=content).return_value
+                return file_mock
+
+        mock_file.side_effect = mock_file_read
+
+        # Run the function
+        incremental_merge(source_dir, target_dir, media_dir, 2, True)
+
+    # NOTE(review): the 'w'-mode handle returned by mock_file_read is a fresh mock_open().return_value, not a child of mock_file, so its write() calls may never appear in mock_file.mock_calls — confirm this assertion cannot pass vacuously.
+    write_calls = [
+        call for call in mock_file.mock_calls if call[0] == "().write"]
+    assert len(write_calls) == 0
+
+
+def test_incremental_merge_media_copy(mock_filesystem):
+    """Test media file 
copying during merge""" + source_dir = "source" + target_dir = "target" + media_dir = "media" + + # Setup mock filesystem + mock_filesystem["exists"].side_effect = lambda x: True + mock_filesystem["listdir"].return_value = ["chat.json"] + mock_filesystem["walk"].return_value = [ + (os.path.join(source_dir, "media"), ["subfolder"], ["file1.jpg"]), + (os.path.join(source_dir, "media", "subfolder"), [], ["file2.jpg"]), + ] + mock_filesystem["getmtime"].side_effect = lambda x: 1000 if "source" in x else 500 + + # Mock file operations with consistent path separators + source_file = os.path.join(source_dir, "chat.json") + target_file = os.path.join(target_dir, "chat.json") + mock_file_content = { + source_file: json.dumps(chat_data_1), + target_file: json.dumps(chat_data_1), + } + + with patch("builtins.open", mock_open()) as mock_file: + def mock_file_read(filename, mode="r"): + if mode == 'w': + file_mock = mock_open().return_value + return file_mock + else: + # Use normalized path for lookup + norm_filename = os.path.normpath(filename) + content = mock_file_content.get(norm_filename, '') + file_mock = mock_open(read_data=content).return_value + return file_mock + + mock_file.side_effect = mock_file_read + + # Run the function + incremental_merge(source_dir, target_dir, media_dir, 2, True) + + # Verify media file operations + assert mock_filesystem["makedirs"].call_count >= 2 # At least target dir and media dir + assert mock_filesystem["copy2"].call_count == 2 # Two media files copied