Refactor incremental_merge

This commit is contained in:
KnugiHK
2026-01-24 01:33:18 +08:00
parent 0c5f2b7f13
commit f05e0d3451

View File

@@ -13,7 +13,7 @@ from datetime import datetime, timedelta
from enum import IntEnum
from tqdm import tqdm
from Whatsapp_Chat_Exporter.data_model import ChatCollection, ChatStore, Timing
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union, Any
try:
from enum import StrEnum, IntEnum
except ImportError:
@@ -257,86 +257,231 @@ def import_from_json(json_file: str, data: ChatCollection):
logger.info(f"Imported {total_row_number} chats from JSON in {convert_time_unit(total_time)}{CLEAR_LINE}")
def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool):
"""Merges JSON files from the source directory into the target directory.
class IncrementalMerger:
"""Handles incremental merging of WhatsApp chat exports."""
def __init__(self, pretty_print_json: int, avoid_encoding_json: bool):
"""Initialize the merger with JSON formatting options.
Args:
pretty_print_json: JSON indentation level.
avoid_encoding_json: Whether to avoid ASCII encoding.
"""
self.pretty_print_json = pretty_print_json
self.avoid_encoding_json = avoid_encoding_json
def _get_json_files(self, source_dir: str) -> List[str]:
"""Get list of JSON files from source directory.
Args:
source_dir: Path to the source directory.
Returns:
List of JSON filenames.
Raises:
SystemExit: If no JSON files are found.
"""
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
if not json_files:
logger.error("No JSON files found in the source directory.")
raise SystemExit(1)
logger.info("JSON files found:", json_files)
return json_files
Args:
source_dir (str): The path to the source directory containing JSON files.
target_dir (str): The path to the target directory to merge into.
media_dir (str): The path to the media directory.
"""
json_files = [f for f in os.listdir(source_dir) if f.endswith('.json')]
if not json_files:
logger.error("No JSON files found in the source directory.")
return
def _copy_new_file(self, source_path: str, target_path: str, target_dir: str, json_file: str) -> None:
"""Copy a new JSON file to target directory.
Args:
source_path: Path to source file.
target_path: Path to target file.
target_dir: Target directory path.
json_file: Name of the JSON file.
"""
logger.info(f"Copying '{json_file}' to target directory...")
os.makedirs(target_dir, exist_ok=True)
shutil.copy2(source_path, target_path)
logger.info("JSON files found:", json_files)
def _load_chat_data(self, file_path: str) -> Dict[str, Any]:
"""Load JSON data from file.
Args:
file_path: Path to JSON file.
Returns:
Loaded JSON data.
"""
with open(file_path, 'r') as file:
return json.load(file)
for json_file in json_files:
source_path = os.path.join(source_dir, json_file)
target_path = os.path.join(target_dir, json_file)
def _parse_chats_from_json(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse JSON data into ChatStore objects.
Args:
data: Raw JSON data.
Returns:
Dictionary of JID to ChatStore objects.
"""
return {jid: ChatStore.from_json(chat) for jid, chat in data.items()}
if not os.path.exists(target_path):
logger.info(f"Copying '{json_file}' to target directory...")
os.makedirs(target_dir, exist_ok=True)
shutil.copy2(source_path, target_path)
def _merge_chat_stores(self, source_chats: Dict[str, Any], target_chats: Dict[str, Any]) -> Dict[str, Any]:
"""Merge source chats into target chats.
Args:
source_chats: Source ChatStore objects.
target_chats: Target ChatStore objects.
Returns:
Merged ChatStore objects.
"""
for jid, chat in source_chats.items():
if jid in target_chats:
target_chats[jid].merge_with(chat)
else:
target_chats[jid] = chat
return target_chats
def _serialize_chats(self, chats: Dict[str, Any]) -> Dict[str, Any]:
"""Serialize ChatStore objects to JSON format.
Args:
chats: Dictionary of ChatStore objects.
Returns:
Serialized JSON data.
"""
return {jid: chat.to_json() for jid, chat in chats.items()}
def _has_changes(self, merged_data: Dict[str, Any], original_data: Dict[str, Any]) -> bool:
"""Check if merged data differs from original data.
Args:
merged_data: Merged JSON data.
original_data: Original JSON data.
Returns:
True if changes detected, False otherwise.
"""
return json.dumps(merged_data, sort_keys=True) != json.dumps(original_data, sort_keys=True)
def _save_merged_data(self, target_path: str, merged_data: Dict[str, Any]) -> None:
"""Save merged data to target file.
Args:
target_path: Path to target file.
merged_data: Merged JSON data.
"""
with open(target_path, 'w') as merged_file:
json.dump(
merged_data,
merged_file,
indent=self.pretty_print_json,
ensure_ascii=not self.avoid_encoding_json,
)
def _merge_json_file(self, source_path: str, target_path: str, json_file: str) -> None:
"""Merge a single JSON file.
Args:
source_path: Path to source file.
target_path: Path to target file.
json_file: Name of the JSON file.
"""
logger.info(f"Merging '{json_file}' with existing file in target directory...")
source_data = self._load_chat_data(source_path)
target_data = self._load_chat_data(target_path)
source_chats = self._parse_chats_from_json(source_data)
target_chats = self._parse_chats_from_json(target_data)
merged_chats = self._merge_chat_stores(source_chats, target_chats)
merged_data = self._serialize_chats(merged_chats)
if self._has_changes(merged_data, target_data):
logger.info(f"Changes detected in '{json_file}', updating target file...")
self._save_merged_data(target_path, merged_data)
else:
logger.info(
f"Merging '{json_file}' with existing file in target directory...")
with open(source_path, 'r') as src_file, open(target_path, 'r') as tgt_file:
source_data = json.load(src_file)
target_data = json.load(tgt_file)
logger.info(f"No changes detected in '{json_file}', skipping update.")
# Parse JSON into ChatStore objects using from_json()
source_chats = {jid: ChatStore.from_json(
chat) for jid, chat in source_data.items()}
target_chats = {jid: ChatStore.from_json(
chat) for jid, chat in target_data.items()}
def _should_copy_media_file(self, source_file: str, target_file: str) -> bool:
"""Check if media file should be copied.
Args:
source_file: Path to source media file.
target_file: Path to target media file.
Returns:
True if file should be copied, False otherwise.
"""
return not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file)
# Merge chats using merge_with()
for jid, chat in source_chats.items():
if jid in target_chats:
target_chats[jid].merge_with(chat)
else:
target_chats[jid] = chat
# Serialize merged data
merged_data = {jid: chat.to_json()
for jid, chat in target_chats.items()}
# Check if the merged data differs from the original target data
if json.dumps(merged_data, sort_keys=True) != json.dumps(target_data, sort_keys=True):
logger.info(
f"Changes detected in '{json_file}', updating target file...")
with open(target_path, 'w') as merged_file:
json.dump(
merged_data,
merged_file,
indent=pretty_print_json,
ensure_ascii=not avoid_encoding_json,
)
else:
logger.info(
f"No changes detected in '{json_file}', skipping update.")
# Merge media directories
source_media_path = os.path.join(source_dir, media_dir)
target_media_path = os.path.join(target_dir, media_dir)
logger.info(
f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
if os.path.exists(source_media_path):
def _merge_media_directories(self, source_dir: str, target_dir: str, media_dir: str) -> None:
"""Merge media directories from source to target.
Args:
source_dir: Source directory path.
target_dir: Target directory path.
media_dir: Media directory name.
"""
source_media_path = os.path.join(source_dir, media_dir)
target_media_path = os.path.join(target_dir, media_dir)
logger.info(f"Merging media directories. Source: {source_media_path}, target: {target_media_path}")
if not os.path.exists(source_media_path):
return
for root, _, files in os.walk(source_media_path):
relative_path = os.path.relpath(root, source_media_path)
target_root = os.path.join(target_media_path, relative_path)
os.makedirs(target_root, exist_ok=True)
for file in files:
source_file = os.path.join(root, file)
target_file = os.path.join(target_root, file)
# we only copy if the file doesn't exist in the target or if the source is newer
if not os.path.exists(target_file) or os.path.getmtime(source_file) > os.path.getmtime(target_file):
if self._should_copy_media_file(source_file, target_file):
logger.info(f"Copying '{source_file}' to '{target_file}'...")
shutil.copy2(source_file, target_file)
def merge(self, source_dir: str, target_dir: str, media_dir: str) -> None:
"""Merge JSON files and media from source to target directory.
Args:
source_dir: The path to the source directory containing JSON files.
target_dir: The path to the target directory to merge into.
media_dir: The path to the media directory.
"""
json_files = self._get_json_files(source_dir)
for json_file in json_files:
source_path = os.path.join(source_dir, json_file)
target_path = os.path.join(target_dir, json_file)
if not os.path.exists(target_path):
self._copy_new_file(source_path, target_path, target_dir, json_file)
else:
self._merge_json_file(source_path, target_path, json_file)
self._merge_media_directories(source_dir, target_dir, media_dir)
def incremental_merge(source_dir: str, target_dir: str, media_dir: str, pretty_print_json: int, avoid_encoding_json: bool) -> None:
"""Wrapper for merging JSON files from the source directory into the target directory.
Args:
source_dir: The path to the source directory containing JSON files.
target_dir: The path to the target directory to merge into.
media_dir: The path to the media directory.
pretty_print_json: JSON indentation level.
avoid_encoding_json: Whether to avoid ASCII encoding.
"""
merger = IncrementalMerger(pretty_print_json, avoid_encoding_json)
merger.merge(source_dir, target_dir, media_dir)
def get_file_name(contact: str, chat: ChatStore) -> Tuple[str, str]:
"""Generates a sanitized filename and contact name for a chat.