diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py
index b0b14db..fa6f0af 100644
--- a/Whatsapp_Chat_Exporter/__main__.py
+++ b/Whatsapp_Chat_Exporter/__main__.py
@@ -197,6 +197,17 @@ def setup_argument_parser() -> ArgumentParser:
         help="Create a copy of the media seperated per chat in /separated/ directory"
     )
 
+    # Media Timestamp Options
+    timestamp_group = parser.add_argument_group('Media Timestamp Options')
+    timestamp_group.add_argument(
+        "--embed-exif", dest="embed_exif", default=False, action='store_true',
+        help="Embed message timestamp in EXIF data of media files (requires piexif/Pillow)"
+    )
+    timestamp_group.add_argument(
+        "--rename-media", dest="rename_media", default=False, action='store_true',
+        help="Rename media files with timestamp prefix (YYYY-MM-DD_HH-MM-SS_filename)"
+    )
+
     # Filtering options
     filter_group = parser.add_argument_group('Filtering Options')
     filter_group.add_argument(
@@ -371,6 +382,17 @@ def validate_args(parser: ArgumentParser, args) -> None:
     validate_chat_filters(parser, args.filter_chat_include)
     validate_chat_filters(parser, args.filter_chat_exclude)
 
+    # EXIF dependency validation
+    if args.embed_exif:
+        try:
+            import piexif
+            from PIL import Image
+        except ImportError:
+            parser.error(
+                "--embed-exif requires piexif and Pillow. "
+                "Install with: pip install whatsapp-chat-exporter[media_timestamp]"
+            )
+
 
 def validate_chat_filters(parser: ArgumentParser, chat_filter: Optional[List[str]]) -> None:
     """Validate chat filters to ensure they contain only phone numbers."""
@@ -570,6 +592,7 @@ def process_messages(args, data: ChatCollection) -> None:
         message_handler.media(
             db, data, args.media, args.filter_date,
-            filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files
+            filter_chat, args.filter_empty, args.separate_media, args.fix_dot_files,
+            args.embed_exif, args.rename_media, args.timezone_offset
         )
 
         # Process vcards
diff --git a/Whatsapp_Chat_Exporter/android_handler.py b/Whatsapp_Chat_Exporter/android_handler.py
index f5c23d3..9102e88 100644
--- a/Whatsapp_Chat_Exporter/android_handler.py
+++ b/Whatsapp_Chat_Exporter/android_handler.py
@@ -15,6 +15,7 @@ from Whatsapp_Chat_Exporter.utility import MAX_SIZE, ROW_SIZE, JidType, Device,
 from Whatsapp_Chat_Exporter.utility import rendering, get_file_name, setup_template, get_cond_for_empty
 from Whatsapp_Chat_Exporter.utility import get_status_location, convert_time_unit, get_jid_map_selection
 from Whatsapp_Chat_Exporter.utility import get_chat_condition, safe_name, bytes_to_readable, determine_metadata
+from Whatsapp_Chat_Exporter.media_timestamp import process_media_with_timestamp
 
 
 
@@ -578,7 +579,8 @@ def _get_reactions(db, data):
     logging.info(f"Processed {total_row_number} reactions in {convert_time_unit(total_time)}")
 
 
-def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False):
+def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=True, fix_dot_files=False,
+          embed_exif=False, rename_media=False, timezone_offset=0):
     """
     Process WhatsApp media files from the database.
 
@@ -590,6 +592,9 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
         filter_chat: Chat filter conditions
         filter_empty: Filter for empty chats
         separate_media: Whether to separate media files by chat
+        embed_exif: Whether to embed EXIF timestamp in media files
+        rename_media: Whether to rename media files with timestamp prefix
+        timezone_offset: Hours offset from UTC for timestamp formatting
     """
     c = db.cursor()
     total_row_number = _get_media_count(c, filter_empty, filter_date, filter_chat)
@@ -607,11 +612,13 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
 
     with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
         while (content := _fetch_row_safely(content_cursor)) is not None:
-            _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files)
+            _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files,
+                                  embed_exif, rename_media, timezone_offset)
             pbar.update(1)
         total_time = pbar.format_dict['elapsed']
     logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
 
+
 # Helper functions for media processing
 
 def _get_media_count(cursor, filter_empty, filter_date, filter_chat):
@@ -755,7 +762,8 @@ def _get_media_cursor_new(cursor, filter_empty, filter_date, filter_chat):
     return cursor
 
 
-def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False):
+def _process_single_media(data, content, media_folder, mime, separate_media, fix_dot_files=False,
+                          embed_exif=False, rename_media=False, timezone_offset=0):
     """Process a single media file."""
     file_path = f"{media_folder}/{content['file_path']}"
     current_chat = data.get_chat(content["key_remote_jid"])
@@ -791,8 +799,24 @@ def _process_single_media(data, content, media_folder, mime, separate_media, fix
             new_folder = os.path.join(media_folder, "separated", chat_display_name)
             Path(new_folder).mkdir(parents=True, exist_ok=True)
             new_path = os.path.join(new_folder, current_filename)
-            shutil.copy2(file_path, new_path)
-            message.data = new_path
+            # Use timestamp processing if enabled
+            if embed_exif or rename_media:
+                final_path = process_media_with_timestamp(
+                    file_path, new_path, message.timestamp,
+                    timezone_offset, embed_exif, rename_media
+                )
+                message.data = final_path
+            else:
+                shutil.copy2(file_path, new_path)
+                message.data = new_path
+        elif embed_exif or rename_media:
+            # Handle in-place processing when not separating
+            # Create a copy with timestamp processing in the same folder
+            final_path = process_media_with_timestamp(
+                file_path, file_path, message.timestamp,
+                timezone_offset, embed_exif, rename_media
+            )
+            message.data = final_path
         else:
             message.data = file_path
     else:
diff --git a/Whatsapp_Chat_Exporter/ios_handler.py b/Whatsapp_Chat_Exporter/ios_handler.py
index 2ee1091..33b3c01 100644
--- a/Whatsapp_Chat_Exporter/ios_handler.py
+++ b/Whatsapp_Chat_Exporter/ios_handler.py
@@ -13,8 +13,7 @@ from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
 from Whatsapp_Chat_Exporter.utility import APPLE_TIME, get_chat_condition, Device
 from Whatsapp_Chat_Exporter.utility import bytes_to_readable, convert_time_unit, safe_name
 from Whatsapp_Chat_Exporter.poll import decode_poll_from_receipt_blob
-
-
+from Whatsapp_Chat_Exporter.media_timestamp import process_media_with_timestamp
 
 
 def contacts(db, data):
@@ -387,7 +386,8 @@ def process_message_text(message, content):
     message.data = msg
 
 
-def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False):
+def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separate_media=False, fix_dot_files=False,
+          embed_exif=False, rename_media=False, timezone_offset=0):
     """Process media files from WhatsApp messages."""
     c = db.cursor()
 
@@ -445,13 +445,15 @@ def media(db, data, media_folder, filter_date, filter_chat, filter_empty, separa
     mime = MimeTypes()
     with tqdm(total=total_row_number, desc="Processing media", unit="media", leave=False) as pbar:
         while (content := c.fetchone()) is not None:
-            process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files)
+            process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files,
+                               embed_exif, rename_media, timezone_offset)
             pbar.update(1)
         total_time = pbar.format_dict['elapsed']
     logging.info(f"Processed {total_row_number} media in {convert_time_unit(total_time)}")
 
 
-def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False):
+def process_media_item(content, data, media_folder, mime, separate_media, fix_dot_files=False,
+                       embed_exif=False, rename_media=False, timezone_offset=0):
     """Process a single media item."""
     file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
     current_chat = data.get_chat(content["ZCONTACTJID"])
@@ -487,10 +489,26 @@ def process_media_item(content, data, media_folder, mime, separate_media, fix_do
             new_folder = os.path.join(media_folder, "separated", chat_display_name)
             Path(new_folder).mkdir(parents=True, exist_ok=True)
             new_path = os.path.join(new_folder, current_filename)
-            shutil.copy2(file_path, new_path)
-            message.data = '/'.join(new_path.split("/")[1:])
+            # Use timestamp processing if enabled
+            if embed_exif or rename_media:
+                final_path = process_media_with_timestamp(
+                    file_path, new_path, message.timestamp,
+                    timezone_offset, embed_exif, rename_media
+                )
+                message.data = '/'.join(final_path.split("/")[1:])
+            else:
+                shutil.copy2(file_path, new_path)
+                message.data = '/'.join(new_path.split("/")[1:])
+        elif embed_exif or rename_media:
+            # Handle in-place processing when not separating
+            final_path = process_media_with_timestamp(
+                file_path, file_path, message.timestamp,
+                timezone_offset, embed_exif, rename_media
+            )
+            message.data = '/'.join(final_path.split("/")[1:])
         else:
             message.data = '/'.join(file_path.split("/")[1:])
+
     else:
         # Handle missing media
         message.data = "The media is missing"
diff --git a/Whatsapp_Chat_Exporter/media_timestamp.py b/Whatsapp_Chat_Exporter/media_timestamp.py
new file mode 100644
index 0000000..954219c
--- /dev/null
+++ b/Whatsapp_Chat_Exporter/media_timestamp.py
@@ -0,0 +1,202 @@
+"""
+Media timestamp utilities for embedding EXIF data and renaming files.
+"""
+
+import os
+import logging
+import shutil
+from datetime import datetime
+from typing import Optional
+
+from Whatsapp_Chat_Exporter.data_model import TimeZone
+
+logger = logging.getLogger(__name__)
+
+# Optional imports for EXIF support
+try:
+    import piexif
+    from PIL import Image
+    HAS_EXIF_SUPPORT = True
+except ImportError:
+    HAS_EXIF_SUPPORT = False
+
+
+def format_timestamp_for_filename(timestamp: float, timezone_offset: int = 0) -> str:
+    """
+    Format a Unix timestamp for use in filenames.
+
+    Args:
+        timestamp: Unix timestamp (seconds)
+        timezone_offset: Hours offset from UTC
+
+    Returns:
+        Formatted string: YYYY-MM-DD_HH-MM-SS
+    """
+    dt = datetime.fromtimestamp(timestamp, TimeZone(timezone_offset))
+    return dt.strftime("%Y-%m-%d_%H-%M-%S")
+
+
+def format_timestamp_for_exif(timestamp: float, timezone_offset: int = 0) -> str:
+    """
+    Format a Unix timestamp for EXIF DateTime fields.
+
+    Args:
+        timestamp: Unix timestamp (seconds)
+        timezone_offset: Hours offset from UTC
+
+    Returns:
+        Formatted string: YYYY:MM:DD HH:MM:SS (EXIF format)
+    """
+    dt = datetime.fromtimestamp(timestamp, TimeZone(timezone_offset))
+    return dt.strftime("%Y:%m:%d %H:%M:%S")
+
+
+def generate_timestamped_filename(
+    original_path: str,
+    timestamp: float,
+    timezone_offset: int = 0
+) -> str:
+    """
+    Generate a new filename with timestamp prefix.
+
+    Args:
+        original_path: Original file path
+        timestamp: Unix timestamp (seconds)
+        timezone_offset: Hours offset from UTC
+
+    Returns:
+        New filename with format: YYYY-MM-DD_HH-MM-SS_original-name.ext
+    """
+    directory = os.path.dirname(original_path)
+    original_name = os.path.basename(original_path)
+    timestamp_prefix = format_timestamp_for_filename(timestamp, timezone_offset)
+    new_name = f"{timestamp_prefix}_{original_name}"
+    return os.path.join(directory, new_name)
+
+
+def embed_exif_timestamp(
+    file_path: str,
+    timestamp: float,
+    timezone_offset: int = 0
+) -> bool:
+    """
+    Embed timestamp in EXIF data for supported image formats.
+
+    Args:
+        file_path: Path to the image file
+        timestamp: Unix timestamp (seconds)
+        timezone_offset: Hours offset from UTC
+
+    Returns:
+        True if successful, False otherwise
+    """
+    if not HAS_EXIF_SUPPORT:
+        logger.warning("EXIF support not available. Install piexif and Pillow.")
+        return False
+
+    # Check file extension
+    ext = os.path.splitext(file_path)[1].lower()
+    if ext not in ('.jpg', '.jpeg', '.tiff', '.tif'):
+        logger.debug(f"EXIF embedding not supported for {ext} files: {file_path}")
+        return False
+
+    try:
+        exif_datetime = format_timestamp_for_exif(timestamp, timezone_offset)
+        exif_datetime_bytes = exif_datetime.encode('utf-8')
+
+        # Try to load existing EXIF data
+        try:
+            exif_dict = piexif.load(file_path)
+        except Exception:
+            # No existing EXIF, create empty structure
+            exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None}
+
+        # Set DateTime fields in Exif IFD
+        exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = exif_datetime_bytes
+        exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = exif_datetime_bytes
+
+        # Set DateTime in 0th IFD (basic TIFF tag)
+        exif_dict["0th"][piexif.ImageIFD.DateTime] = exif_datetime_bytes
+
+        # Dump and insert EXIF data
+        exif_bytes = piexif.dump(exif_dict)
+        piexif.insert(exif_bytes, file_path)
+
+        return True
+
+    except Exception as e:
+        logger.warning(f"Failed to embed EXIF in {file_path}: {e}")
+        return False
+
+
+def _handle_duplicate_filename(file_path: str) -> str:
+    """
+    Generate a unique filename by appending a counter if file exists.
+
+    Args:
+        file_path: Original file path
+
+    Returns:
+        Unique file path with counter appended if necessary
+    """
+    if not os.path.exists(file_path):
+        return file_path
+
+    base, ext = os.path.splitext(file_path)
+    counter = 1
+
+    while os.path.exists(file_path):
+        file_path = f"{base}_{counter}{ext}"
+        counter += 1
+
+    return file_path
+
+
+def process_media_with_timestamp(
+    source_path: str,
+    dest_path: str,
+    timestamp: Optional[float],
+    timezone_offset: int = 0,
+    embed_exif: bool = False,
+    rename_media: bool = False
+) -> str:
+    """
+    Process a media file with optional timestamp embedding and renaming.
+
+    Args:
+        source_path: Source file path
+        dest_path: Destination file path (may be modified if renaming)
+        timestamp: Unix timestamp (seconds), or None if unavailable
+        timezone_offset: Hours offset from UTC
+        embed_exif: Whether to embed EXIF timestamp
+        rename_media: Whether to rename file with timestamp prefix
+
+    Returns:
+        Final destination path (may differ from dest_path if renamed)
+    """
+    # If no timestamp available, just copy
+    if timestamp is None:
+        logger.warning(f"No timestamp available for {source_path}, skipping timestamp operations")
+        if dest_path != source_path:
+            shutil.copy2(source_path, dest_path)
+        return dest_path
+
+    # Determine final path
+    final_path = dest_path
+    if rename_media:
+        final_path = generate_timestamped_filename(dest_path, timestamp, timezone_offset)
+
+    # Handle duplicate filenames
+    if os.path.exists(final_path) and final_path != source_path:
+        final_path = _handle_duplicate_filename(final_path)
+
+    # Copy file to destination. In-place processing without renaming has
+    # source == destination, where shutil.copy2 would raise SameFileError.
+    if final_path != source_path:
+        shutil.copy2(source_path, final_path)
+
+    # Embed EXIF if requested
+    if embed_exif:
+        embed_exif_timestamp(final_path, timestamp, timezone_offset)
+
+    return final_path