mirror of
https://github.com/MatMasIt/smtp_app_proxy.git
synced 2025-04-20 19:45:25 +02:00
Checks for updated gpg entry
This commit is contained in:
parent
c778ce2dc6
commit
777764e2ab
34
proxy.py
34
proxy.py
@ -427,43 +427,39 @@ class EmailProxy:
|
|||||||
|
|
||||||
|
|
||||||
def fetch_pgp_key(self, email: str) -> bool:
|
def fetch_pgp_key(self, email: str) -> bool:
|
||||||
"""Checks if a PGP key exists locally or fetches it from the keyserver."""
|
"""Checks if a PGP key exists locally or fetches it from the keyserver, removing old keys if necessary."""
|
||||||
|
|
||||||
# Check if the key exists locally
|
|
||||||
keys = gpg.list_keys(keys=email)
|
|
||||||
if keys:
|
|
||||||
logger.info(f"✅ PGP key for {email} found locally.")
|
|
||||||
return True # Key exists locally
|
|
||||||
|
|
||||||
# Key does not exist locally, so try fetching from the keyserver
|
|
||||||
logger.info(f"🔍 Looking up PGP key for {email} on keyserver...")
|
logger.info(f"🔍 Looking up PGP key for {email} on keyserver...")
|
||||||
|
|
||||||
# Request the key from the keyserver API
|
# Request the key from the keyserver API
|
||||||
key_url = f"{get_conf("KEYSERVER_URL")}{email}"
|
key_url = f"{get_conf('KEYSERVER_URL')}{email}"
|
||||||
try:
|
try:
|
||||||
response = requests.get(key_url)
|
response = requests.get(key_url)
|
||||||
|
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
key_data = response.text
|
key_data = response.text
|
||||||
logger.info(
|
logger.info(f"✅ Successfully retrieved PGP key for {email} from keyserver.")
|
||||||
f"✅ Successfully retrieved PGP key for {email} from keyserver."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Import the key into GPG
|
# Check if the key exists locally
|
||||||
|
existing_keys = gpg.list_keys()
|
||||||
|
for key in existing_keys:
|
||||||
|
if email in key['uids']:
|
||||||
|
logger.info(f"🔑 Found existing key for {email}, removing it...")
|
||||||
|
# Remove the existing key
|
||||||
|
gpg.delete_keys(key['fingerprint'], True)
|
||||||
|
logger.info(f"✅ Existing key for {email} removed.")
|
||||||
|
|
||||||
|
# Import the new key into GPG
|
||||||
import_result = gpg.import_keys(key_data)
|
import_result = gpg.import_keys(key_data)
|
||||||
|
|
||||||
if import_result.count > 0:
|
if import_result.count > 0:
|
||||||
logger.info(
|
logger.info(f"✅ PGP key for {email} successfully imported into GPG.")
|
||||||
f"✅ PGP key for {email} successfully imported into GPG."
|
|
||||||
)
|
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logger.warning(f"❌ Failed to import PGP key for {email}.")
|
logger.warning(f"❌ Failed to import PGP key for {email}.")
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(f"❌ No PGP key found for {email}. HTTP status code: {response.status_code}")
|
||||||
f"❌ No PGP key found for {email}. HTTP status code: {response.status_code}"
|
|
||||||
)
|
|
||||||
return False
|
return False
|
||||||
except requests.exceptions.RequestException as e:
|
except requests.exceptions.RequestException as e:
|
||||||
logger.error(f"❌ Error while contacting keyserver: {e}")
|
logger.error(f"❌ Error while contacting keyserver: {e}")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user