From f05e0d34519d874598eae919bf60591719c7a45b Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 24 Jan 2026 01:33:18 +0800 Subject: [PATCH] Refactor incremental_merge --- Whatsapp_Chat_Exporter/utility.py | 273 +++++++++++++++++++++++------- 1 file changed, 209 insertions(+), 64 deletions(-) diff --git a/Whatsapp_Chat_Exporter/utility.py b/Whatsapp_Chat_Exporter/utility.py index a4db10c..fb55f3e 100644 --- a/Whatsapp_Chat_Exporter/utility.py +++ b/Whatsapp_Chat_Exporter/utility.py @@ -13,7 +13,7 @@ from datetime import datetime, timedelta from enum import IntEnum from tqdm import tqdm from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union, Any try: from enum import StrEnum, IntEnum except ImportError: @@ -257,86 +257,231 @@ def import_from_json(json_file: str, data: ChatCollection): logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}{CLEAR_LINE}") -def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool): - """Merges JSON files from the source directory into the target directory. +class IncrementalMerger: + """Handles incremental merging of WhatsApp chat exports.""" + + def __init__(self, pretty_print_json: int, avoid_encoding_json: bool): + """Initialize the merger with JSON formatting options. + + Args: + pretty_print_json: JSON indentation level. + avoid_encoding_json: Whether to avoid ASCII encoding. + """ + self.pretty_print_json = pretty_print_json + self.avoid_encoding_json = avoid_encoding_json + + def _get_json_files(self, source_dir: str) -> List[str]: + """Get list of JSON files from source directory. + + Args: + source_dir: Path to the source directory. + + Returns: + List of JSON filenames. + + Raises: + SystemExit: If no JSON files are found. + """ + json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')] + if not json_files: + logger.error("No JSON files found in the source directory.") + raise SystemExit(1) + + logger.info("JSON files found:", json_files) + return json_files - Args: - source_dir (str): The path to the source directory containing JSON files. - target_dir (str): The path to the target directory to merge into. - media_dir (str): The path to the media directory. - """ - json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')] - if not json_files: - logger.error("No JSON files found in the source directory.") - return + def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None: + """Copy a new JSON file to target directory. + + Args: + source_path: Path to source file. + target_path: Path to target file. + target_dir: Target directory path. + json_file: Name of the JSON file. + """ + logger.info(f"Copying '{json_file}' to target directory...") + os.makedirs(target_dir, exist_ok=True) + shutil.copy2(source_path, target_path) - logger.info("JSON files found:", json_files) + def _load_chat_data(self, file_path: str) -> Dict[str, Any]: + """Load JSON data from file. + + Args: + file_path: Path to JSON file. + + Returns: + Loaded JSON data. + """ + with open(file_path, 'r') as file: + return json.load(file) - for json_file in json_files: - source_path = os.path.join(source_dir, json_file) - target_path = os.path.join(target_dir, json_file) + def _parse_chats_from_json(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Parse JSON data into ChatStore objects. + + Args: + data: Raw JSON data. + + Returns: + Dictionary of JID to ChatStore objects. + """ + return {jid: ChatStore.from_json(chat) for jid, chat in data.items()} - if not os.path.exists(target_path): - logger.info(f"Copying '{json_file}' to target directory...") - os.makedirs(target_dir, exist_ok=True) - shutil.copy2(source_path, target_path) + def _merge_chat_stores(self, source_chats: Dict[str, Any], target_chats: Dict[str, Any]) -> Dict[str, Any]: + """Merge source chats into target chats. + + Args: + source_chats: Source ChatStore objects. + target_chats: Target ChatStore objects. + + Returns: + Merged ChatStore objects. + """ + for jid, chat in source_chats.items(): + if jid in target_chats: + target_chats[jid].merge_with(chat) + else: + target_chats[jid] = chat + return target_chats + + def _serialize_chats(self, chats: Dict[str, Any]) -> Dict[str, Any]: + """Serialize ChatStore objects to JSON format. + + Args: + chats: Dictionary of ChatStore objects. + + Returns: + Serialized JSON data. + """ + return {jid: chat.to_json() for jid, chat in chats.items()} + + def _has_changes(self, merged_data: Dict[str, Any], original_data: Dict[str, Any]) -> bool: + """Check if merged data differs from original data. + + Args: + merged_data: Merged JSON data. + original_data: Original JSON data. + + Returns: + True if changes detected, False otherwise. + """ + return json.dumps(merged_data, sort_keys=True) != json.dumps(original_data, sort_keys=True) + + def _save_merged_data(self, target_path: str, merged_data: Dict[str, Any]) -> None: + """Save merged data to target file. + + Args: + target_path: Path to target file. + merged_data: Merged JSON data. + """ + with open(target_path, 'w') as merged_file: + json.dump( + merged_data, + merged_file, + indent=self.pretty_print_json, + ensure_ascii=not self.avoid_encoding_json, + ) + + def _merge_json_file(self, source_path: str, target_path: str, json_file: str) -> None: + """Merge a single JSON file. + + Args: + source_path: Path to source file. + target_path: Path to target file. + json_file: Name of the JSON file. + """ + logger.info(f"Merging '{json_file}' with existing file in target directory...") + + source_data = self._load_chat_data(source_path) + target_data = self._load_chat_data(target_path) + + source_chats = self._parse_chats_from_json(source_data) + target_chats = self._parse_chats_from_json(target_data) + + merged_chats = self._merge_chat_stores(source_chats, target_chats) + merged_data = self._serialize_chats(merged_chats) + + if self._has_changes(merged_data, target_data): + logger.info(f"Changes detected in '{json_file}', updating target file...") + self._save_merged_data(target_path, merged_data) else: - logger.info( - f"Merging '{json_file}' with existing file in target directory...") - with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file: - source_data = json.load(src_file) - target_data = json.load(tgt_file) + logger.info(f"No changes detected in '{json_file}', skipping update.") - # Parse JSON into ChatStore objects using from_json() - source_chats = {jid: ChatStore.from_json( - chat) for jid, chat in source_data.items()} - target_chats = {jid: ChatStore.from_json( - chat) for jid, chat in target_data.items()} + def _should_copy_media_file(self, source_file: str, target_file: str) -> bool: + """Check if media file should be copied. + + Args: + source_file: Path to source media file. + target_file: Path to target media file. + + Returns: + True if file should be copied, False otherwise. + """ + return not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file) - # Merge chats using merge_with() - for jid, chat in source_chats.items(): - if jid in target_chats: - target_chats[jid].merge_with(chat) - else: - target_chats[jid] = chat - - # Serialize merged data - merged_data = {jid: chat.to_json() - for jid, chat in target_chats.items()} - - # Check if the merged data differs from the original target data - if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True): - logger.info( - f"Changes detected in '{json_file}', updating target file...") - with open(target_path, 'w') as merged_file: - json.dump( - merged_data, - merged_file, - indent=pretty_print_json, - ensure_ascii=not avoid_encoding_json, - ) - else: - logger.info( - f"No changes detected in '{json_file}', skipping update.") - - # Merge media directories - source_media_path = os.path.join(source_dir, media_dir) - target_media_path = os.path.join(target_dir, media_dir) - logger.info( - f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") - if os.path.exists(source_media_path): + def _merge_media_directories(self, source_dir: str, target_dir: str, media_dir: str) -> None: + """Merge media directories from source to target. + + Args: + source_dir: Source directory path. + target_dir: Target directory path. + media_dir: Media directory name. + """ + source_media_path = os.path.join(source_dir, media_dir) + target_media_path = os.path.join(target_dir, media_dir) + + logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}") + + if not os.path.exists(source_media_path): + return + for root, _, files in os.walk(source_media_path): relative_path = os.path.relpath(root, source_media_path) target_root = os.path.join(target_media_path, relative_path) os.makedirs(target_root, exist_ok=True) + for file in files: source_file = os.path.join(root, file) target_file = os.path.join(target_root, file) - # we only copy if the file doesn't exist in the target or if the source is newer - if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file): + + if self._should_copy_media_file(source_file, target_file): logger.info(f"Copying '{source_file}' to '{target_file}'...") shutil.copy2(source_file, target_file) + def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None: + """Merge JSON files and media from source to target directory. + + Args: + source_dir: The path to the source directory containing JSON files. + target_dir: The path to the target directory to merge into. + media_dir: The path to the media directory. + """ + json_files = self._get_json_files(source_dir) + + for json_file in json_files: + source_path = os.path.join(source_dir, json_file) + target_path = os.path.join(target_dir, json_file) + + if not os.path.exists(target_path): + self._copy_new_file(source_path, target_path, target_dir, json_file) + else: + self._merge_json_file(source_path, target_path, json_file) + + self._merge_media_directories(source_dir, target_dir, media_dir) + + +def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool) -> None: + """Wrapper for merging JSON files from the source directory into the target directory. + + Args: + source_dir: The path to the source directory containing JSON files. + target_dir: The path to the target directory to merge into. + media_dir: The path to the media directory. + pretty_print_json: JSON indentation level. + avoid_encoding_json: Whether to avoid ASCII encoding. + """ + merger = IncrementalMerger(pretty_print_json, avoid_encoding_json) + merger.merge(source_dir, target_dir, media_dir) + def get_file_name(contact: str, chat: ChatStore) -> Tuple[str, str]: """Generates a sanitized filename and contact name for a chat.