Refactor to use tqdm for showing progress

This commit is contained in:
KnugiHK
2026-01-17 13:18:31 +08:00
parent 1c7d6f7912
commit d200130335
7 changed files with 264 additions and 302 deletions

View File

@@ -1,13 +1,11 @@
import time
import hmac
import io
import logging
import threading
import zlib
import concurrent.futures
from tqdm import tqdm
from typing import Tuple, Union
from hashlib import sha256
from sys import exit
from Whatsapp_Chat_Exporter.utility import CLEAR_LINE, CRYPT14_OFFSETS, Crypt, DbType
try:
@@ -165,82 +163,64 @@ def _decrypt_crypt14(database: bytes, main_key: bytes, max_worker: int = 10) ->
# Attempt known offsets first
for offsets in CRYPT14_OFFSETS:
iv = database[offsets["iv"]:offsets["iv"] + 16]
db_ciphertext = database[offsets["db"]:]
iv = offsets["iv"]
db = offsets["db"]
try:
decrypted_db = _decrypt_database(db_ciphertext, main_key, iv)
decrypted_db = _attempt_decrypt_task((iv, iv + 16, db), database, main_key)
except (zlib.error, ValueError):
pass # Try next offset
continue
else:
logger.debug(
f"Decryption successful with known offsets: IV {offsets['iv']}, DB {offsets['db']}{CLEAR_LINE}"
f"Decryption successful with known offsets: IV {iv}, DB {db}{CLEAR_LINE}"
)
return decrypted_db # Successful decryption
def animate_message(stop_event):
base_msg = "Common offsets failed. Initiating brute-force with multithreading"
dots = ["", ".", "..", "..."]
i = 0
while not stop_event.is_set():
logger.info(f"{base_msg}{dots[i % len(dots)]}\x1b[K\r")
time.sleep(0.3)
i += 1
logger.info(f"Common offsets failed but brute-forcing the offset works!{CLEAR_LINE}")
offset_max = 200
logger.info(f"Common offsets failed. Attempt to brute-force...{CLEAR_LINE}")
with tqdm(total=offset_max ** 2, desc="Brute-forcing offsets", unit="trial", leave=False) as pbar:
with concurrent.futures.ThreadPoolExecutor(max_worker) as executor:
# Map futures to their offsets
future_to_offset = {
executor.submit(_attempt_decrypt_task, offset, database, main_key): offset
for offset in brute_force_offset(offset_max, offset_max)
}
stop_event = threading.Event()
anim_thread = threading.Thread(target=animate_message, args=(stop_event,))
anim_thread.start()
try:
for future in concurrent.futures.as_completed(future_to_offset):
pbar.update(1)
result = future.result()
if result is not None:
# Success! Shutdown other tasks immediately
executor.shutdown(wait=False, cancel_futures=True)
start_iv, _, start_db = future_to_offset[future]
logger.info(
f"The offsets of your IV and database are {start_iv} and "
f"{start_db}, respectively. To include your offsets in the "
"program, please report it by creating an issue on GitHub: "
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47"
f"\nShutting down other threads...{CLEAR_LINE}"
)
return result
# Convert brute force generator into a list for parallel processing
offset_combinations = list(brute_force_offset())
def attempt_decrypt(offset_tuple):
"""Attempt decryption with the given offsets."""
start_iv, end_iv, start_db = offset_tuple
iv = database[start_iv:end_iv]
db_ciphertext = database[start_db:]
logger.debug(""f"Trying offsets: IV {start_iv}-{end_iv}, DB {start_db}{CLEAR_LINE}")
try:
db = _decrypt_database(db_ciphertext, main_key, iv)
except (zlib.error, ValueError):
return None # Decryption failed, move to next
else:
stop_event.set()
anim_thread.join()
logger.info(
f"The offsets of your IV and database are {start_iv} and "
f"{start_db}, respectively. To include your offsets in the "
"program, please report it by creating an issue on GitHub: "
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47"
f"\nShutting down other threads...{CLEAR_LINE}"
)
return db
with concurrent.futures.ThreadPoolExecutor(max_worker) as executor:
future_to_offset = {executor.submit(attempt_decrypt, offset)
: offset for offset in offset_combinations}
try:
for future in concurrent.futures.as_completed(future_to_offset):
result = future.result()
if result is not None:
# Shutdown remaining threads
executor.shutdown(wait=False, cancel_futures=True)
return result
except KeyboardInterrupt:
stop_event.set()
anim_thread.join()
logger.info(f"Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}")
executor.shutdown(wait=False, cancel_futures=True)
exit(1)
finally:
stop_event.set()
anim_thread.join()
except KeyboardInterrupt:
executor.shutdown(wait=False, cancel_futures=True)
raise KeyboardInterrupt("Brute force interrupted by user (Ctrl+C). Shutting down gracefully...{CLEAR_LINE}")
raise OffsetNotFoundError("Could not find the correct offsets for decryption.")
def _attempt_decrypt_task(offset_tuple, database, main_key):
"""Attempt decryption with the given offsets."""
start_iv, end_iv, start_db = offset_tuple
iv = database[start_iv:end_iv]
db_ciphertext = database[start_db:]
try:
return _decrypt_database(db_ciphertext, main_key, iv)
except (zlib.error, ValueError):
return None
def _decrypt_crypt12(database: bytes, main_key: bytes) -> bytes:
"""Decrypt a crypt12 database.