mirror of
https://github.com/KnugiHK/WhatsApp-Chat-Exporter.git
synced 2026-02-23 09:04:04 +00:00
Implement export TXT chat #22
This commit is contained in:
@@ -2,7 +2,8 @@ try:
|
||||
from .__init__ import __version__
|
||||
except ImportError:
|
||||
from Whatsapp_Chat_Exporter.__init__ import __version__
|
||||
from Whatsapp_Chat_Exporter import extract, extract_iphone
|
||||
import glob
|
||||
from Whatsapp_Chat_Exporter import extract, extract_exported, extract_iphone
|
||||
from Whatsapp_Chat_Exporter import extract_iphone_media
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore
|
||||
from Whatsapp_Chat_Exporter.utility import Crypt, check_update
|
||||
@@ -93,7 +94,6 @@ def main():
|
||||
help="Path to custom HTML template"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-e",
|
||||
"--embedded",
|
||||
dest="embedded",
|
||||
default=False,
|
||||
@@ -147,6 +147,20 @@ def main():
|
||||
action='store_true',
|
||||
help="Check for updates (require Internet access)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-e",
|
||||
"--exported",
|
||||
dest="exported",
|
||||
default=None,
|
||||
help="Path to exported chat file"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--assume-first-as-me",
|
||||
dest="assume_first_as_me",
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Assume the first message in a chat as sent by me (must be used together with -e)"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Check for updates
|
||||
@@ -154,10 +168,10 @@ def main():
|
||||
exit(check_update())
|
||||
|
||||
# Sanity checks
|
||||
if args.android and args.iphone:
|
||||
if args.android and args.iphone and args.exported:
|
||||
print("You must define only one device type.")
|
||||
exit(1)
|
||||
if not args.android and not args.iphone:
|
||||
if not args.android and not args.iphone and not args.exported:
|
||||
print("You must define the device type.")
|
||||
exit(1)
|
||||
if args.no_html and not args.json:
|
||||
@@ -216,7 +230,6 @@ def main():
|
||||
with sqlite3.connect(contact_db) as db:
|
||||
db.row_factory = sqlite3.Row
|
||||
contacts(db, data)
|
||||
|
||||
elif args.iphone:
|
||||
import sys
|
||||
if "--iphone" in sys.argv:
|
||||
@@ -241,14 +254,49 @@ def main():
|
||||
if args.media is None:
|
||||
args.media = "Message"
|
||||
|
||||
if os.path.isfile(msg_db):
|
||||
with sqlite3.connect(msg_db) as db:
|
||||
db.row_factory = sqlite3.Row
|
||||
messages(db, data)
|
||||
media(db, data, args.media)
|
||||
vcard(db, data)
|
||||
if not args.exported:
|
||||
if os.path.isfile(msg_db):
|
||||
with sqlite3.connect(msg_db) as db:
|
||||
db.row_factory = sqlite3.Row
|
||||
messages(db, data)
|
||||
media(db, data, args.media)
|
||||
vcard(db, data)
|
||||
if not args.no_html:
|
||||
create_html(
|
||||
data,
|
||||
args.output,
|
||||
args.template,
|
||||
args.embedded,
|
||||
args.offline,
|
||||
args.size
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"The message database does not exist. You may specify the path "
|
||||
"to database file with option -d or check your provided path."
|
||||
)
|
||||
exit(2)
|
||||
|
||||
if os.path.isdir(args.media):
|
||||
if os.path.isdir(f"{args.output}/{args.media}"):
|
||||
print("Media directory already exists in output directory. Skipping...")
|
||||
else:
|
||||
if not args.move_media:
|
||||
if os.path.isdir(f"{args.output}/WhatsApp"):
|
||||
print("WhatsApp directory already exists in output directory. Skipping...")
|
||||
else:
|
||||
print("Copying media directory...")
|
||||
shutil.copytree(args.media, f"{args.output}/WhatsApp")
|
||||
else:
|
||||
try:
|
||||
shutil.move(args.media, f"{args.output}/")
|
||||
except PermissionError:
|
||||
print("Cannot remove original WhatsApp directory. "
|
||||
"Perhaps the directory is opened?")
|
||||
else:
|
||||
extract_exported.messages(args.exported, data, args.assume_first_as_me)
|
||||
if not args.no_html:
|
||||
create_html(
|
||||
extract.create_html(
|
||||
data,
|
||||
args.output,
|
||||
args.template,
|
||||
@@ -256,29 +304,8 @@ def main():
|
||||
args.offline,
|
||||
args.size
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"The message database does not exist. You may specify the path "
|
||||
"to database file with option -d or check your provided path."
|
||||
)
|
||||
exit(2)
|
||||
|
||||
if os.path.isdir(args.media):
|
||||
if os.path.isdir(f"{args.output}/{args.media}"):
|
||||
print("Media directory already exists in output directory. Skipping...")
|
||||
else:
|
||||
if not args.move_media:
|
||||
if os.path.isdir(f"{args.output}/WhatsApp"):
|
||||
print("WhatsApp directory already exists in output directory. Skipping...")
|
||||
else:
|
||||
print("Copying media directory...")
|
||||
shutil.copytree(args.media, f"{args.output}/WhatsApp")
|
||||
else:
|
||||
try:
|
||||
shutil.move(args.media, f"{args.output}/")
|
||||
except PermissionError:
|
||||
print("Cannot remove original WhatsApp directory. "
|
||||
"Perhaps the directory is opened?")
|
||||
for file in glob.glob(r'*.*'):
|
||||
shutil.copy(file, args.output)
|
||||
|
||||
if args.json:
|
||||
if isinstance(data[next(iter(data))], ChatStore):
|
||||
|
||||
@@ -30,10 +30,13 @@ class ChatStore():
|
||||
|
||||
|
||||
class Message():
|
||||
def __init__(self, from_me: Union[bool,int], timestamp: int, time: str, key_id: int):
|
||||
def __init__(self, from_me: Union[bool,int], timestamp: int, time: Union[int,str], key_id: int):
|
||||
self.from_me = bool(from_me)
|
||||
self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
|
||||
self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M")
|
||||
if isinstance(time, int):
|
||||
self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M")
|
||||
elif isinstance(time, str):
|
||||
self.time = time
|
||||
self.media = False
|
||||
self.key_id = key_id
|
||||
self.meta = False
|
||||
|
||||
81
Whatsapp_Chat_Exporter/extract_exported.py
Normal file
81
Whatsapp_Chat_Exporter/extract_exported.py
Normal file
@@ -0,0 +1,81 @@
|
||||
from datetime import datetime
|
||||
from mimetypes import MimeTypes
|
||||
import os
|
||||
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
|
||||
|
||||
|
||||
def messages(path, data, assume_first_as_me=False):
|
||||
"""Extracts messages from the exported file"""
|
||||
with open(path, "r", encoding="utf8") as file:
|
||||
you = ""
|
||||
data["chat"] = ChatStore()
|
||||
total_row_number = len(file.readlines())
|
||||
i = 0
|
||||
file.seek(0)
|
||||
for index, line in enumerate(file):
|
||||
if len(line.split(" - ")) > 1:
|
||||
time = line.split(" - ")[0]
|
||||
if ":" not in line.split(time)[1]:
|
||||
msg.data = line.split(time)[1][3:]
|
||||
msg.meta = True
|
||||
else:
|
||||
name = line.split(time)[1].split(":")[0]
|
||||
message = line.split(time)[1].split(name + ":")[1].strip()
|
||||
name = name[3:]
|
||||
if you == "":
|
||||
if data["chat"].name is None:
|
||||
if not assume_first_as_me:
|
||||
while True:
|
||||
ans = input(f"Is '{name}' you? (Y/N)").lower()
|
||||
if ans == "y":
|
||||
you = name
|
||||
break
|
||||
elif ans == "n":
|
||||
data["chat"].name = name
|
||||
break
|
||||
else:
|
||||
you = name
|
||||
else:
|
||||
if name != data["chat"].name:
|
||||
you = name
|
||||
if data["chat"].name is None and you != "":
|
||||
if name != you:
|
||||
data["chat"].name = name
|
||||
msg = Message(
|
||||
you == name,
|
||||
datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(),
|
||||
time.split(", ")[1].strip(),
|
||||
index
|
||||
)
|
||||
if "<Media omitted>" in message:
|
||||
msg.data = "The media is missing"
|
||||
msg.mime = "media"
|
||||
msg.meta = True
|
||||
elif "(file attached)" in message:
|
||||
mime = MimeTypes()
|
||||
msg.media = True
|
||||
file_path = os.path.join(os.path.dirname(path), message.split("(file attached)")[0].strip())
|
||||
if os.path.isfile(file_path):
|
||||
msg.data = file_path
|
||||
guess = mime.guess_type(file_path)[0]
|
||||
if guess is not None:
|
||||
msg.mime = guess
|
||||
else:
|
||||
msg.mime = "application/octet-stream"
|
||||
else:
|
||||
msg.data = message
|
||||
data["chat"].add_message(index, msg)
|
||||
else:
|
||||
lookback = index - 1
|
||||
while lookback not in data["chat"].messages:
|
||||
lookback -= 1
|
||||
msg = data["chat"].messages[lookback]
|
||||
if msg.media:
|
||||
msg.caption = line.strip()
|
||||
else:
|
||||
msg.data += "<br>" + line.strip()
|
||||
|
||||
if index % 1000 == 0:
|
||||
print(f"Gathering messages & media...({index}/{total_row_number})", end="\r")
|
||||
print(f"Gathering messages & media...({total_row_number}/{total_row_number})", end="\r")
|
||||
return data
|
||||
Reference in New Issue
Block a user