mirror of
https://github.com/MatMasIt/smtp_app_proxy.git
synced 2025-06-25 18:24:06 +02:00
Compare commits
5 Commits
42c024c29b
...
main
Author | SHA1 | Date | |
---|---|---|---|
777764e2ab | |||
c778ce2dc6 | |||
82b20b1d5b | |||
ec72ace5e5 | |||
35edcca373 |
@ -22,15 +22,15 @@ gpg: # optional
|
||||
enabled: true
|
||||
absent_notice:
|
||||
enabled: true # Wether to issue a notice if recipient does not use GPG (PGP)
|
||||
text: "\n\n⚠️ This email was sent without end-to-end encryption.\n
|
||||
text: "\n\nNote: This email was sent without end-to-end encryption.\n
|
||||
This mail server supports automatic PGP encryption.\n
|
||||
Consider setting up a PGP key and publishing it to keys.openpgp.org."
|
||||
html:
|
||||
"<p><strong>⚠️ This email was sent without end-to-end encryption.</strong><br>
|
||||
"<p><strong>Note: This email was sent without end-to-end encryption.</strong><br>
|
||||
This mail server supports automatic PGP encryption.<br>
|
||||
Consider setting up a PGP key and publishing it to keys.openpgp.org.</p>"
|
||||
# either text, html or both
|
||||
# either text, html or
|
||||
email_http_keyserver: "https://keys.openpgp.org/vks/v1/by-email/"
|
||||
|
||||
id_domain: "mytest" # For Message ids
|
||||
|
||||
email_http_keyserver: "https://keys.openpgp.org/vks/v1/by-email/"
|
||||
|
63
proxy.py
63
proxy.py
@ -217,9 +217,9 @@ def get_conf(param: str):
|
||||
elif param == "WARN_ABSENT_GPG":
|
||||
return config["gpg"]["absent_notice"]["enabled"] if get_conf("GPG_ENABLED") else False
|
||||
elif param == "GPG_ABSENT_NOTICE_TEXT":
|
||||
return config["gpg"]["absent_notice"]["text"] if get_conf("GPG_ENABLED") else None
|
||||
return config["gpg"]["absent_notice"]["text"] if get_conf("GPG_ENABLED") and "text" in config["gpg"]["absent_notice"] else None
|
||||
elif param == "GPG_ABSENT_NOTICE_HTML":
|
||||
return config["gpg"]["absent_notice"]["html"] if get_conf("GPG_ENABLED") else None
|
||||
return config["gpg"]["absent_notice"]["html"] if get_conf("GPG_ENABLED") and "html" in config["gpg"]["absent_notice"] else None
|
||||
elif param == "ID_DOMAIN":
|
||||
return config["id_domain"]
|
||||
elif param == "KEYSERVER_URL":
|
||||
@ -427,43 +427,39 @@ class EmailProxy:
|
||||
|
||||
|
||||
def fetch_pgp_key(self, email: str) -> bool:
|
||||
"""Checks if a PGP key exists locally or fetches it from the keyserver."""
|
||||
"""Checks if a PGP key exists locally or fetches it from the keyserver, removing old keys if necessary."""
|
||||
|
||||
# Check if the key exists locally
|
||||
keys = gpg.list_keys(keys=email)
|
||||
if keys:
|
||||
logger.info(f"✅ PGP key for {email} found locally.")
|
||||
return True # Key exists locally
|
||||
|
||||
# Key does not exist locally, so try fetching from the keyserver
|
||||
logger.info(f"🔍 Looking up PGP key for {email} on keyserver...")
|
||||
|
||||
# Request the key from the keyserver API
|
||||
key_url = f"{get_conf("KEYSERVER_URL")}{email}"
|
||||
key_url = f"{get_conf('KEYSERVER_URL')}{email}"
|
||||
try:
|
||||
response = requests.get(key_url)
|
||||
|
||||
if response.status_code == 200:
|
||||
key_data = response.text
|
||||
logger.info(
|
||||
f"✅ Successfully retrieved PGP key for {email} from keyserver."
|
||||
)
|
||||
logger.info(f"✅ Successfully retrieved PGP key for {email} from keyserver.")
|
||||
|
||||
# Import the key into GPG
|
||||
# Check if the key exists locally
|
||||
existing_keys = gpg.list_keys()
|
||||
for key in existing_keys:
|
||||
if email in key['uids']:
|
||||
logger.info(f"🔑 Found existing key for {email}, removing it...")
|
||||
# Remove the existing key
|
||||
gpg.delete_keys(key['fingerprint'], True)
|
||||
logger.info(f"✅ Existing key for {email} removed.")
|
||||
|
||||
# Import the new key into GPG
|
||||
import_result = gpg.import_keys(key_data)
|
||||
|
||||
if import_result.count > 0:
|
||||
logger.info(
|
||||
f"✅ PGP key for {email} successfully imported into GPG."
|
||||
)
|
||||
logger.info(f"✅ PGP key for {email} successfully imported into GPG.")
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"❌ Failed to import PGP key for {email}.")
|
||||
return False
|
||||
else:
|
||||
logger.warning(
|
||||
f"❌ No PGP key found for {email}. HTTP status code: {response.status_code}"
|
||||
)
|
||||
logger.warning(f"❌ No PGP key found for {email}. HTTP status code: {response.status_code}")
|
||||
return False
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"❌ Error while contacting keyserver: {e}")
|
||||
@ -561,25 +557,22 @@ class EmailProxy:
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
# Check if the part is plain text
|
||||
if part.get_content_type() == "text/plain":
|
||||
if part.get_content_type() == "text/plain" and get_conf("GPG_ABSENT_NOTICE_TEXT") is not None:
|
||||
part.set_payload(part.get_payload() + get_conf("GPG_ABSENT_NOTICE_TEXT"))
|
||||
# Check if the part is HTML text
|
||||
elif part.get_content_type() == "text/html":
|
||||
html_warning = (
|
||||
f"<p><strong>⚠️ This email was sent without end-to-end encryption.</strong><br>"
|
||||
f"This mail server supports automatic PGP encryption.<br>"
|
||||
f"Consider setting up a PGP key and publishing it to a keyserver "
|
||||
f"(e.g., keys.openpgp.org).</p>"
|
||||
)
|
||||
part.set_payload(part.get_payload() + html_warning)
|
||||
part.replace_header(
|
||||
"Content-Transfer-Encoding", "quoted-printable"
|
||||
) # Ensure HTML part is encoded correctly
|
||||
elif part.get_content_type() == "text/html" and get_conf("GPG_ABSENT_NOTICE_HTML") is not None:
|
||||
part.set_payload(part.get_payload() + get_conf("GPG_ABSENT_NOTICE_HTML"))
|
||||
try:
|
||||
part.replace_header(
|
||||
"Content-Transfer-Encoding", "quoted-printable"
|
||||
) # Ensure HTML part is encoded correctly
|
||||
except KeyError:
|
||||
part["Content-Transfer-Encoding"] = "quoted-printable"
|
||||
else:
|
||||
# If it's a non-multipart message (either plain-text or HTML only)
|
||||
if msg.get_content_type() == "text/plain":
|
||||
if msg.get_content_type() == "text/plain" and get_conf("GPG_ABSENT_NOTICE_TEXT") is not None:
|
||||
msg.set_payload(msg.get_payload() + get_conf("GPG_ABSENT_NOTICE_TEXT"))
|
||||
elif msg.get_content_type() == "text/html":
|
||||
elif msg.get_content_type() == "text/html" and get_conf("GPG_ABSENT_NOTICE_HTML") is not None:
|
||||
msg.set_payload(msg.get_payload() + get_conf("GPG_ABSENT_NOTICE_HTML"))
|
||||
msg.replace_header("Content-Transfer-Encoding", "quoted-printable")
|
||||
|
||||
|
@ -13,6 +13,7 @@ Environment=PATH=/path/to/smtpproxy/venv/bin:/usr/bin:/bin
|
||||
Environment=VIRTUAL_ENV=/path/to/smtpproxy/venv
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
SyslogIdentifier=smtpproxy
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
Reference in New Issue
Block a user