Compare commits

...

5 Commits

3 changed files with 33 additions and 39 deletions

View File

@ -22,15 +22,15 @@ gpg: # optional
enabled: true enabled: true
absent_notice: absent_notice:
enabled: true # Wether to issue a notice if recipient does not use GPG (PGP) enabled: true # Wether to issue a notice if recipient does not use GPG (PGP)
text: "\n\n⚠️ This email was sent without end-to-end encryption.\n text: "\n\nNote: This email was sent without end-to-end encryption.\n
This mail server supports automatic PGP encryption.\n This mail server supports automatic PGP encryption.\n
Consider setting up a PGP key and publishing it to keys.openpgp.org." Consider setting up a PGP key and publishing it to keys.openpgp.org."
html: html:
"<p><strong>⚠️ This email was sent without end-to-end encryption.</strong><br> "<p><strong>Note: This email was sent without end-to-end encryption.</strong><br>
This mail server supports automatic PGP encryption.<br> This mail server supports automatic PGP encryption.<br>
Consider setting up a PGP key and publishing it to keys.openpgp.org.</p>" Consider setting up a PGP key and publishing it to keys.openpgp.org.</p>"
# either text, html or both # either text, html or
email_http_keyserver: "https://keys.openpgp.org/vks/v1/by-email/"
id_domain: "mytest" # For Message ids id_domain: "mytest" # For Message ids
email_http_keyserver: "https://keys.openpgp.org/vks/v1/by-email/"

View File

@ -217,9 +217,9 @@ def get_conf(param: str):
elif param == "WARN_ABSENT_GPG": elif param == "WARN_ABSENT_GPG":
return config["gpg"]["absent_notice"]["enabled"] if get_conf("GPG_ENABLED") else False return config["gpg"]["absent_notice"]["enabled"] if get_conf("GPG_ENABLED") else False
elif param == "GPG_ABSENT_NOTICE_TEXT": elif param == "GPG_ABSENT_NOTICE_TEXT":
return config["gpg"]["absent_notice"]["text"] if get_conf("GPG_ENABLED") else None return config["gpg"]["absent_notice"]["text"] if get_conf("GPG_ENABLED") and "text" in config["gpg"]["absent_notice"] else None
elif param == "GPG_ABSENT_NOTICE_HTML": elif param == "GPG_ABSENT_NOTICE_HTML":
return config["gpg"]["absent_notice"]["html"] if get_conf("GPG_ENABLED") else None return config["gpg"]["absent_notice"]["html"] if get_conf("GPG_ENABLED") and "html" in config["gpg"]["absent_notice"] else None
elif param == "ID_DOMAIN": elif param == "ID_DOMAIN":
return config["id_domain"] return config["id_domain"]
elif param == "KEYSERVER_URL": elif param == "KEYSERVER_URL":
@ -427,43 +427,39 @@ class EmailProxy:
def fetch_pgp_key(self, email: str) -> bool: def fetch_pgp_key(self, email: str) -> bool:
"""Checks if a PGP key exists locally or fetches it from the keyserver.""" """Checks if a PGP key exists locally or fetches it from the keyserver, removing old keys if necessary."""
# Check if the key exists locally
keys = gpg.list_keys(keys=email)
if keys:
logger.info(f"✅ PGP key for {email} found locally.")
return True # Key exists locally
# Key does not exist locally, so try fetching from the keyserver
logger.info(f"🔍 Looking up PGP key for {email} on keyserver...") logger.info(f"🔍 Looking up PGP key for {email} on keyserver...")
# Request the key from the keyserver API # Request the key from the keyserver API
key_url = f"{get_conf("KEYSERVER_URL")}{email}" key_url = f"{get_conf('KEYSERVER_URL')}{email}"
try: try:
response = requests.get(key_url) response = requests.get(key_url)
if response.status_code == 200: if response.status_code == 200:
key_data = response.text key_data = response.text
logger.info( logger.info(f"✅ Successfully retrieved PGP key for {email} from keyserver.")
f"✅ Successfully retrieved PGP key for {email} from keyserver."
)
# Import the key into GPG # Check if the key exists locally
existing_keys = gpg.list_keys()
for key in existing_keys:
if email in key['uids']:
logger.info(f"🔑 Found existing key for {email}, removing it...")
# Remove the existing key
gpg.delete_keys(key['fingerprint'], True)
logger.info(f"✅ Existing key for {email} removed.")
# Import the new key into GPG
import_result = gpg.import_keys(key_data) import_result = gpg.import_keys(key_data)
if import_result.count > 0: if import_result.count > 0:
logger.info( logger.info(f"✅ PGP key for {email} successfully imported into GPG.")
f"✅ PGP key for {email} successfully imported into GPG."
)
return True return True
else: else:
logger.warning(f"❌ Failed to import PGP key for {email}.") logger.warning(f"❌ Failed to import PGP key for {email}.")
return False return False
else: else:
logger.warning( logger.warning(f"❌ No PGP key found for {email}. HTTP status code: {response.status_code}")
f"❌ No PGP key found for {email}. HTTP status code: {response.status_code}"
)
return False return False
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"❌ Error while contacting keyserver: {e}") logger.error(f"❌ Error while contacting keyserver: {e}")
@ -561,25 +557,22 @@ class EmailProxy:
if msg.is_multipart(): if msg.is_multipart():
for part in msg.walk(): for part in msg.walk():
# Check if the part is plain text # Check if the part is plain text
if part.get_content_type() == "text/plain": if part.get_content_type() == "text/plain" and get_conf("GPG_ABSENT_NOTICE_TEXT") is not None:
part.set_payload(part.get_payload() + get_conf("GPG_ABSENT_NOTICE_TEXT")) part.set_payload(part.get_payload() + get_conf("GPG_ABSENT_NOTICE_TEXT"))
# Check if the part is HTML text # Check if the part is HTML text
elif part.get_content_type() == "text/html": elif part.get_content_type() == "text/html" and get_conf("GPG_ABSENT_NOTICE_HTML") is not None:
html_warning = ( part.set_payload(part.get_payload() + get_conf("GPG_ABSENT_NOTICE_HTML"))
f"<p><strong>⚠️ This email was sent without end-to-end encryption.</strong><br>" try:
f"This mail server supports automatic PGP encryption.<br>"
f"Consider setting up a PGP key and publishing it to a keyserver "
f"(e.g., keys.openpgp.org).</p>"
)
part.set_payload(part.get_payload() + html_warning)
part.replace_header( part.replace_header(
"Content-Transfer-Encoding", "quoted-printable" "Content-Transfer-Encoding", "quoted-printable"
) # Ensure HTML part is encoded correctly ) # Ensure HTML part is encoded correctly
except KeyError:
part["Content-Transfer-Encoding"] = "quoted-printable"
else: else:
# If it's a non-multipart message (either plain-text or HTML only) # If it's a non-multipart message (either plain-text or HTML only)
if msg.get_content_type() == "text/plain": if msg.get_content_type() == "text/plain" and get_conf("GPG_ABSENT_NOTICE_TEXT") is not None:
msg.set_payload(msg.get_payload() + get_conf("GPG_ABSENT_NOTICE_TEXT")) msg.set_payload(msg.get_payload() + get_conf("GPG_ABSENT_NOTICE_TEXT"))
elif msg.get_content_type() == "text/html": elif msg.get_content_type() == "text/html" and get_conf("GPG_ABSENT_NOTICE_HTML") is not None:
msg.set_payload(msg.get_payload() + get_conf("GPG_ABSENT_NOTICE_HTML")) msg.set_payload(msg.get_payload() + get_conf("GPG_ABSENT_NOTICE_HTML"))
msg.replace_header("Content-Transfer-Encoding", "quoted-printable") msg.replace_header("Content-Transfer-Encoding", "quoted-printable")

View File

@ -13,6 +13,7 @@ Environment=PATH=/path/to/smtpproxy/venv/bin:/usr/bin:/bin
Environment=VIRTUAL_ENV=/path/to/smtpproxy/venv Environment=VIRTUAL_ENV=/path/to/smtpproxy/venv
StandardOutput=journal StandardOutput=journal
StandardError=journal StandardError=journal
SyslogIdentifier=smtpproxy
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target