From 0897dc2897d3c34f926b559e2bd8963671b6672e Mon Sep 17 00:00:00 2001 From: KnugiHK <24708955+KnugiHK@users.noreply.github.com> Date: Sat, 10 Jun 2023 19:24:39 +0800 Subject: [PATCH] Implement export TXT chat #22 --- Whatsapp_Chat_Exporter/__main__.py | 97 ++++++++++++++-------- Whatsapp_Chat_Exporter/data_model.py | 7 +- Whatsapp_Chat_Exporter/extract_exported.py | 81 ++++++++++++++++++ 3 files changed, 148 insertions(+), 37 deletions(-) create mode 100644 Whatsapp_Chat_Exporter/extract_exported.py diff --git a/Whatsapp_Chat_Exporter/__main__.py b/Whatsapp_Chat_Exporter/__main__.py index b04f114..f9e6072 100644 --- a/Whatsapp_Chat_Exporter/__main__.py +++ b/Whatsapp_Chat_Exporter/__main__.py @@ -2,7 +2,8 @@ try: from .__init__ import __version__ except ImportError: from Whatsapp_Chat_Exporter.__init__ import __version__ -from Whatsapp_Chat_Exporter import extract, extract_iphone +import glob +from Whatsapp_Chat_Exporter import extract, extract_exported, extract_iphone from Whatsapp_Chat_Exporter import extract_iphone_media from Whatsapp_Chat_Exporter.data_model import ChatStore from Whatsapp_Chat_Exporter.utility import Crypt, check_update @@ -93,7 +94,6 @@ def main(): help="Path to custom HTML template" ) parser.add_argument( - "-e", "--embedded", dest="embedded", default=False, @@ -147,6 +147,20 @@ def main(): action='store_true', help="Check for updates (require Internet access)" ) + parser.add_argument( + "-e", + "--exported", + dest="exported", + default=None, + help="Path to exported chat file" + ) + parser.add_argument( + "--assume-first-as-me", + dest="assume_first_as_me", + default=False, + action='store_true', + help="Assume the first message in a chat as sent by me (must be used together with -e)" + ) args = parser.parse_args() # Check for updates @@ -154,10 +168,10 @@ def main(): exit(check_update()) # Sanity checks - if args.android and args.iphone: + if args.android and args.iphone and args.exported: print("You must define only one device type.") 
exit(1) - if not args.android and not args.iphone: + if not args.android and not args.iphone and not args.exported: print("You must define the device type.") exit(1) if args.no_html and not args.json: @@ -216,7 +230,6 @@ def main(): with sqlite3.connect(contact_db) as db: db.row_factory = sqlite3.Row contacts(db, data) - elif args.iphone: import sys if "--iphone" in sys.argv: @@ -241,14 +254,49 @@ def main(): if args.media is None: args.media = "Message" - if os.path.isfile(msg_db): - with sqlite3.connect(msg_db) as db: - db.row_factory = sqlite3.Row - messages(db, data) - media(db, data, args.media) - vcard(db, data) + if not args.exported: + if os.path.isfile(msg_db): + with sqlite3.connect(msg_db) as db: + db.row_factory = sqlite3.Row + messages(db, data) + media(db, data, args.media) + vcard(db, data) + if not args.no_html: + create_html( + data, + args.output, + args.template, + args.embedded, + args.offline, + args.size + ) + else: + print( + "The message database does not exist. You may specify the path " + "to database file with option -d or check your provided path." + ) + exit(2) + + if os.path.isdir(args.media): + if os.path.isdir(f"{args.output}/{args.media}"): + print("Media directory already exists in output directory. Skipping...") + else: + if not args.move_media: + if os.path.isdir(f"{args.output}/WhatsApp"): + print("WhatsApp directory already exists in output directory. Skipping...") + else: + print("Copying media directory...") + shutil.copytree(args.media, f"{args.output}/WhatsApp") + else: + try: + shutil.move(args.media, f"{args.output}/") + except PermissionError: + print("Cannot remove original WhatsApp directory. " + "Perhaps the directory is opened?") + else: + extract_exported.messages(args.exported, data, args.assume_first_as_me) if not args.no_html: - create_html( + extract.create_html( data, args.output, args.template, @@ -256,29 +304,8 @@ def main(): args.offline, args.size ) - else: - print( - "The message database does not exist. 
You may specify the path " - "to database file with option -d or check your provided path." - ) - exit(2) - - if os.path.isdir(args.media): - if os.path.isdir(f"{args.output}/{args.media}"): - print("Media directory already exists in output directory. Skipping...") - else: - if not args.move_media: - if os.path.isdir(f"{args.output}/WhatsApp"): - print("WhatsApp directory already exists in output directory. Skipping...") - else: - print("Copying media directory...") - shutil.copytree(args.media, f"{args.output}/WhatsApp") - else: - try: - shutil.move(args.media, f"{args.output}/") - except PermissionError: - print("Cannot remove original WhatsApp directory. " - "Perhaps the directory is opened?") + for file in glob.glob(r'*.*'): + shutil.copy(file, args.output) if args.json: if isinstance(data[next(iter(data))], ChatStore): diff --git a/Whatsapp_Chat_Exporter/data_model.py b/Whatsapp_Chat_Exporter/data_model.py index a445d26..f390612 100644 --- a/Whatsapp_Chat_Exporter/data_model.py +++ b/Whatsapp_Chat_Exporter/data_model.py @@ -30,10 +30,13 @@ class ChatStore(): class Message(): - def __init__(self, from_me: Union[bool,int], timestamp: int, time: str, key_id: int): + def __init__(self, from_me: Union[bool,int], timestamp: int, time: Union[int,str], key_id: int): self.from_me = bool(from_me) self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp - self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M") + if isinstance(time, int): + self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M") + elif isinstance(time, str): + self.time = time self.media = False self.key_id = key_id self.meta = False diff --git a/Whatsapp_Chat_Exporter/extract_exported.py b/Whatsapp_Chat_Exporter/extract_exported.py new file mode 100644 index 0000000..de7783a --- /dev/null +++ b/Whatsapp_Chat_Exporter/extract_exported.py @@ -0,0 +1,81 @@ +from datetime import datetime +from mimetypes import MimeTypes +import os +from Whatsapp_Chat_Exporter.data_model 
import ChatStore, Message
# NOTE: the line above is the tail of the
# `from Whatsapp_Chat_Exporter.data_model` import that begins on the previous
# line of this file; it is kept here so that statement is not truncated.


# Timestamp layout that opens every entry of the exported chat file.
_TIMESTAMP_FORMAT = "%d/%m/%Y, %H:%M"
# Marker text looked for inside a message body.
_MEDIA_OMITTED = "<Media omitted>"
_FILE_ATTACHED = "(file attached)"
# Placeholder stored when the referenced media cannot be shown.
_MEDIA_MISSING = "The media is missing"


def _split_header(line):
    """Return ``(moment, body)`` if *line* opens a new entry, else ``None``.

    A line opens a new entry only when the text before the first " - " parses
    as a timestamp; message text that merely contains " - " is therefore
    treated as a continuation instead of crashing ``strptime``.
    """
    stamp, separator, body = line.partition(" - ")
    if not separator:
        return None
    try:
        moment = datetime.strptime(stamp, _TIMESTAMP_FORMAT)
    except ValueError:
        return None
    return moment, body


def _is_sender_me(name):
    """Prompt on stdin until the user answers Y or N; return True for Y."""
    while True:
        answer = input(f"Is '{name}' you? (Y/N)").lower()
        if answer == "y":
            return True
        if answer == "n":
            return False


def _fill_content(msg, message, export_dir, mime_types):
    """Populate data/mime/media/meta on *msg* from the text of one message."""
    if _MEDIA_OMITTED in message:
        msg.data = _MEDIA_MISSING
        msg.mime = "media"
        msg.meta = True
    elif _FILE_ATTACHED in message:
        msg.media = True
        file_name = message.split(_FILE_ATTACHED)[0].strip()
        file_path = os.path.join(export_dir, file_name)
        if os.path.isfile(file_path):
            msg.data = file_path
            msg.mime = (
                mime_types.guess_type(file_path)[0]
                or "application/octet-stream"
            )
        else:
            # Same placeholder as the "<Media omitted>" case, so the message
            # never ends up without any data.
            msg.data = _MEDIA_MISSING
            msg.mime = "media"
            msg.meta = True
    else:
        msg.data = message


def _append_continuation(msg, text):
    """Attach a continuation line to the entry it belongs to."""
    if msg is None:
        # Nothing precedes this line (e.g. leading junk); there is no entry
        # to extend, so the line is dropped.
        return
    if msg.media:
        # Accumulate instead of overwriting so multi-line captions survive.
        caption = getattr(msg, "caption", None)
        msg.caption = f"{caption}<br>{text}" if caption else text
    else:
        msg.data = (msg.data or "") + "<br>" + text


def messages(path, data, assume_first_as_me=False):
    """Extract messages from an exported chat text file.

    Args:
        path: Path to the exported ``.txt`` chat. Attached files are looked up
            in the same directory.
        data: Mapping that receives the parsed chat under the key ``"chat"``
            (any previous value under that key is replaced).
        assume_first_as_me: When true, the sender of the first message is
            taken to be the exporting user. Otherwise the user is asked on
            stdin until one side of the conversation is identified.

    Returns:
        The same ``data`` mapping, with ``data["chat"]`` populated.
    """
    with open(path, "r", encoding="utf8") as file:
        you = ""
        chat = data["chat"] = ChatStore()
        previous = None  # last entry added; target for continuation lines
        mime_types = MimeTypes()
        export_dir = os.path.dirname(path)

        # Count rows without keeping the whole file in memory, then rewind.
        total_row_number = sum(1 for _ in file)
        file.seek(0)

        for index, line in enumerate(file):
            header = _split_header(line)
            if header is None:
                _append_continuation(previous, line.strip())
            else:
                moment, body = header
                # Same "%H:%M" rendering Message uses for integer times.
                clock = moment.strftime("%H:%M")
                if ":" not in body:
                    # No "sender:" part: a notice, stored as its own meta
                    # entry rather than written onto the previous message.
                    msg = Message(False, moment.timestamp(), clock, index)
                    msg.data = body.strip()
                    msg.meta = True
                else:
                    name, _, message = body.partition(":")
                    message = message.strip()
                    if you == "":
                        if chat.name is None:
                            if assume_first_as_me or _is_sender_me(name):
                                you = name
                            else:
                                chat.name = name
                        elif name != chat.name:
                            you = name
                    if chat.name is None and you != "" and name != you:
                        chat.name = name
                    msg = Message(
                        you == name,
                        moment.timestamp(),
                        clock,
                        index
                    )
                    _fill_content(msg, message, export_dir, mime_types)
                chat.add_message(index, msg)
                previous = msg

            if index % 1000 == 0:
                print(f"Gathering messages & media...({index}/{total_row_number})", end="\r")
    print(f"Gathering messages & media...({total_row_number}/{total_row_number})", end="\r")
    return data