PySide6/PJSUA2-basiertes Desktop-Softphone mit: - SIP-Telefonie (Anrufe, DTMF, Halten, Transfer, Konferenz) - CardDAV-Kontaktsync mit Write-Back (Multi-Account) - Kontaktverwaltung mit Suche und Bearbeiten-Dialog - Anrufliste mit Namensauflösung - Favoriten-Panel (manuell + häufig angerufen) - BLF-Panel (SIP Presence Monitoring) - Responsives Layout (kompakt/erweitert) - Click-to-Call (tel:/sip: URI via D-Bus) - KDE-Integration (Tray, Benachrichtigungen) - PipeWire/PulseAudio Audio-Routing - AppImage Build-Support (PyInstaller + appimagetool) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
568 lines
18 KiB
Python
568 lines
18 KiB
Python
"""CardDAV-Sync - Kontakte von Nextcloud/CardDAV-Server abrufen und bearbeiten."""
|
|
|
|
import re
|
|
from urllib.parse import urljoin
|
|
|
|
try:
|
|
import requests
|
|
REQUESTS_VERFUEGBAR = True
|
|
except ImportError:
|
|
REQUESTS_VERFUEGBAR = False
|
|
|
|
try:
|
|
import vobject
|
|
VOBJECT_VERFUEGBAR = True
|
|
except ImportError:
|
|
VOBJECT_VERFUEGBAR = False
|
|
|
|
|
|
class CardDavSync:
|
|
"""Synchronisiert Kontakte von einem CardDAV-Server (Nextcloud).
|
|
|
|
Kontakt-Format (ein Eintrag pro Person):
|
|
{
|
|
"name": "Max Mustermann",
|
|
"email": "max@example.com",
|
|
"emails": ["max@example.com"],
|
|
"firma": "Firma GmbH",
|
|
"titel": "Geschäftsführer",
|
|
"nummern": {
|
|
"telefon": "+49405551234",
|
|
"handy": "+491701234567",
|
|
"geschaeftlich": "+49405559999",
|
|
},
|
|
"adresse": "Musterstr. 1, 12345 Musterstadt",
|
|
"geburtstag": "01.01.1990",
|
|
"notiz": "...",
|
|
"url": "https://...",
|
|
"quelle": "carddav",
|
|
"account": "Privat",
|
|
"_href": "/remote.php/dav/.../UUID.vcf",
|
|
"_etag": "\"abc123\"",
|
|
}
|
|
"""
|
|
|
|
def kontakte_abrufen(self, url, benutzername, passwort, account_name=""):
|
|
"""Alle Kontakte vom CardDAV-Server abrufen.
|
|
|
|
Returns:
|
|
(kontakte, fehler): Tuple aus Kontaktliste und Fehlerstring
|
|
"""
|
|
if not REQUESTS_VERFUEGBAR:
|
|
return [], "requests nicht installiert (pip install requests)"
|
|
if not VOBJECT_VERFUEGBAR:
|
|
return [], "vobject nicht installiert (pip install vobject)"
|
|
|
|
if not url.endswith("/"):
|
|
url += "/"
|
|
|
|
try:
|
|
vcards_raw = self._vcards_holen(url, benutzername, passwort)
|
|
except requests.exceptions.ConnectionError:
|
|
return [], "Verbindung fehlgeschlagen"
|
|
except requests.exceptions.Timeout:
|
|
return [], "Server-Timeout"
|
|
except requests.exceptions.SSLError as e:
|
|
return [], f"SSL-Fehler: {e}"
|
|
except requests.exceptions.HTTPError as e:
|
|
code = e.response.status_code if e.response else "?"
|
|
if code == 401:
|
|
return [], "Authentifizierung fehlgeschlagen"
|
|
return [], f"HTTP-Fehler {code}"
|
|
except Exception as e:
|
|
return [], str(e)
|
|
|
|
kontakte = []
|
|
for eintrag in vcards_raw:
|
|
# eintrag ist dict mit vcard_text, href, etag
|
|
if isinstance(eintrag, dict):
|
|
parsed = self._vcard_parsen(
|
|
eintrag["vcard_text"], account_name,
|
|
href=eintrag.get("href", ""),
|
|
etag=eintrag.get("etag", ""),
|
|
)
|
|
else:
|
|
# Fallback: reiner String (z.B. von _propfind_einzeln)
|
|
parsed = self._vcard_parsen(eintrag, account_name)
|
|
if parsed:
|
|
kontakte.append(parsed)
|
|
|
|
kontakte.sort(key=lambda k: k.get("name", "").lower())
|
|
return kontakte, ""
|
|
|
|
def alle_accounts_abrufen(self, accounts):
|
|
"""Kontakte von mehreren CardDAV-Accounts abrufen.
|
|
|
|
Args:
|
|
accounts: Liste von {"name": "...", "url": "...",
|
|
"benutzername": "...", "passwort": "..."}
|
|
|
|
Returns:
|
|
(alle_kontakte, fehler_liste)
|
|
"""
|
|
alle = []
|
|
fehler = []
|
|
for acc in accounts:
|
|
if not acc.get("url") or not acc.get("benutzername"):
|
|
continue
|
|
kontakte, err = self.kontakte_abrufen(
|
|
acc["url"], acc["benutzername"], acc["passwort"],
|
|
account_name=acc.get("name", ""),
|
|
)
|
|
if err:
|
|
fehler.append(f"{acc.get('name', 'Unbekannt')}: {err}")
|
|
else:
|
|
alle.extend(kontakte)
|
|
|
|
alle.sort(key=lambda k: k.get("name", "").lower())
|
|
return alle, fehler
|
|
|
|
# === Write-Back: Kontakt auf Server aktualisieren ===
|
|
|
|
def kontakt_aktualisieren(self, server_url, benutzername, passwort,
|
|
kontakt):
|
|
"""Kontakt auf dem CardDAV-Server aktualisieren (PUT).
|
|
|
|
Args:
|
|
server_url: Basis-URL des Adressbuchs
|
|
benutzername, passwort: Auth-Daten
|
|
kontakt: Kontakt-Dict mit _href und _etag
|
|
|
|
Returns:
|
|
(erfolg, fehler_text)
|
|
"""
|
|
href = kontakt.get("_href", "")
|
|
etag = kontakt.get("_etag", "")
|
|
if not href:
|
|
return False, "Kein CardDAV-Href vorhanden"
|
|
|
|
# vCard aus Kontakt-Daten aufbauen
|
|
vcard_text = self._kontakt_zu_vcard(kontakt)
|
|
if not vcard_text:
|
|
return False, "vCard konnte nicht erstellt werden"
|
|
|
|
# Volle URL zusammenbauen
|
|
base = server_url
|
|
if "/remote.php" in base:
|
|
base = base.split("/remote.php")[0]
|
|
full_url = urljoin(base, href)
|
|
|
|
headers = {
|
|
"Content-Type": "text/vcard; charset=utf-8",
|
|
}
|
|
if etag:
|
|
headers["If-Match"] = etag
|
|
|
|
try:
|
|
response = requests.put(
|
|
full_url,
|
|
auth=(benutzername, passwort),
|
|
headers=headers,
|
|
data=vcard_text.encode("utf-8"),
|
|
timeout=30,
|
|
)
|
|
if response.status_code in (200, 201, 204):
|
|
return True, ""
|
|
elif response.status_code == 412:
|
|
return False, "Kontakt wurde zwischenzeitlich geändert (Konflikt)"
|
|
else:
|
|
return False, f"HTTP {response.status_code}"
|
|
except requests.exceptions.ConnectionError:
|
|
return False, "Verbindung fehlgeschlagen"
|
|
except requests.exceptions.Timeout:
|
|
return False, "Server-Timeout"
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
def _kontakt_zu_vcard(self, kontakt):
|
|
"""Kontakt-Dict in vCard-Text umwandeln."""
|
|
try:
|
|
card = vobject.vCard()
|
|
|
|
# Name
|
|
name = kontakt.get("name", "")
|
|
card.add("fn").value = name
|
|
n = card.add("n")
|
|
teile = name.split(" ", 1)
|
|
if len(teile) == 2:
|
|
n.value = vobject.vcard.Name(
|
|
family=teile[1], given=teile[0])
|
|
else:
|
|
n.value = vobject.vcard.Name(family=name)
|
|
|
|
# Telefonnummern
|
|
nummern = kontakt.get("nummern", {})
|
|
typ_map = {
|
|
"telefon": "HOME",
|
|
"handy": "CELL",
|
|
"geschaeftlich": "WORK",
|
|
"sonstige": "VOICE",
|
|
}
|
|
for typ, nummer in nummern.items():
|
|
if nummer:
|
|
tel = card.add("tel")
|
|
tel.value = nummer
|
|
tel.type_param = typ_map.get(typ, "VOICE")
|
|
|
|
# E-Mails
|
|
emails = kontakt.get("emails", [])
|
|
if not emails and kontakt.get("email"):
|
|
emails = [kontakt["email"]]
|
|
for em in emails:
|
|
if em:
|
|
email_obj = card.add("email")
|
|
email_obj.value = em
|
|
|
|
# Firma
|
|
firma = kontakt.get("firma", "")
|
|
if firma:
|
|
org = card.add("org")
|
|
org.value = [firma]
|
|
|
|
# Titel
|
|
titel = kontakt.get("titel", "")
|
|
if titel:
|
|
card.add("title").value = titel
|
|
|
|
# Adresse (vereinfacht: alles in street)
|
|
adresse = kontakt.get("adresse", "")
|
|
if adresse:
|
|
adr = card.add("adr")
|
|
adr.value = vobject.vcard.Address(street=adresse)
|
|
|
|
# Geburtstag (DD.MM.YYYY → YYYYMMDD)
|
|
geburtstag = kontakt.get("geburtstag", "")
|
|
if geburtstag and len(geburtstag) == 10:
|
|
try:
|
|
teile = geburtstag.split(".")
|
|
bday = f"{teile[2]}{teile[1]}{teile[0]}"
|
|
card.add("bday").value = bday
|
|
except (IndexError, ValueError):
|
|
pass
|
|
|
|
# Website
|
|
url = kontakt.get("url", "")
|
|
if url:
|
|
card.add("url").value = url
|
|
|
|
# Notiz
|
|
notiz = kontakt.get("notiz", "")
|
|
if notiz:
|
|
card.add("note").value = notiz
|
|
|
|
return card.serialize()
|
|
except Exception:
|
|
return None
|
|
|
|
# === Server-Abruf ===
|
|
|
|
def _vcards_holen(self, url, benutzername, passwort):
|
|
"""vCards vom Server holen - REPORT (bevorzugt) oder Fallback."""
|
|
auth = (benutzername, passwort)
|
|
|
|
# Methode 1: REPORT addressbook-query (Nextcloud-Standard)
|
|
vcards = self._report_addressbook_query(url, auth)
|
|
if vcards:
|
|
return vcards
|
|
|
|
# Methode 2: PROPFIND mit inline address-data
|
|
vcards = self._propfind_mit_daten(url, auth)
|
|
if vcards:
|
|
return vcards
|
|
|
|
# Methode 3: PROPFIND nur hrefs, dann einzeln abrufen
|
|
return self._propfind_einzeln(url, auth)
|
|
|
|
def _report_addressbook_query(self, url, auth):
|
|
"""REPORT addressbook-query - holt alle vCards auf einmal."""
|
|
body = """<?xml version="1.0" encoding="UTF-8"?>
|
|
<card:addressbook-query xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
|
|
<d:prop>
|
|
<d:getetag/>
|
|
<card:address-data/>
|
|
</d:prop>
|
|
</card:addressbook-query>"""
|
|
|
|
try:
|
|
response = requests.request(
|
|
"REPORT", url,
|
|
auth=auth,
|
|
headers={
|
|
"Depth": "1",
|
|
"Content-Type": "application/xml; charset=utf-8",
|
|
},
|
|
data=body.encode("utf-8"),
|
|
timeout=60,
|
|
)
|
|
if response.status_code not in (200, 207):
|
|
return []
|
|
except requests.RequestException:
|
|
return []
|
|
|
|
return self._vcards_aus_xml(response.text)
|
|
|
|
def _propfind_mit_daten(self, url, auth):
|
|
"""PROPFIND mit inline address-data."""
|
|
body = """<?xml version="1.0" encoding="UTF-8"?>
|
|
<d:propfind xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">
|
|
<d:prop>
|
|
<d:getetag/>
|
|
<card:address-data/>
|
|
</d:prop>
|
|
</d:propfind>"""
|
|
|
|
response = requests.request(
|
|
"PROPFIND", url,
|
|
auth=auth,
|
|
headers={
|
|
"Depth": "1",
|
|
"Content-Type": "application/xml; charset=utf-8",
|
|
},
|
|
data=body.encode("utf-8"),
|
|
timeout=60,
|
|
)
|
|
response.raise_for_status()
|
|
return self._vcards_aus_xml(response.text)
|
|
|
|
def _propfind_einzeln(self, url, auth):
|
|
"""Fallback: PROPFIND für hrefs, dann jede .vcf einzeln abrufen."""
|
|
body = """<?xml version="1.0" encoding="UTF-8"?>
|
|
<d:propfind xmlns:d="DAV:">
|
|
<d:prop>
|
|
<d:getetag/>
|
|
<d:getcontenttype/>
|
|
</d:prop>
|
|
</d:propfind>"""
|
|
|
|
response = requests.request(
|
|
"PROPFIND", url,
|
|
auth=auth,
|
|
headers={
|
|
"Depth": "1",
|
|
"Content-Type": "application/xml; charset=utf-8",
|
|
},
|
|
data=body.encode("utf-8"),
|
|
timeout=30,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
text = response.text
|
|
href_pattern = r'<[^>]*href[^>]*>([^<]*\.vcf)</[^>]*href>'
|
|
hrefs = re.findall(href_pattern, text, re.IGNORECASE)
|
|
|
|
vcards = []
|
|
base = url.split("/remote.php")[0] if "/remote.php" in url else url
|
|
|
|
for href in hrefs:
|
|
if href.startswith("http"):
|
|
vcf_url = href
|
|
else:
|
|
vcf_url = urljoin(base, href)
|
|
try:
|
|
r = requests.get(vcf_url, auth=auth, timeout=10)
|
|
if r.status_code == 200 and "BEGIN:VCARD" in r.text:
|
|
vcards.append(r.text)
|
|
except requests.RequestException:
|
|
continue
|
|
|
|
return vcards
|
|
|
|
def _vcards_aus_xml(self, xml_text):
|
|
"""vCard-Daten + Metadaten (href, etag) aus XML-Response extrahieren."""
|
|
ergebnisse = []
|
|
|
|
# Pro <d:response>-Block parsen
|
|
response_pattern = (
|
|
r'<[^>]*?response[^>]*?>(.*?)</[^>]*?response>'
|
|
)
|
|
responses = re.findall(
|
|
response_pattern, xml_text, re.DOTALL | re.IGNORECASE)
|
|
|
|
for block in responses:
|
|
# href extrahieren
|
|
href_match = re.search(
|
|
r'<[^>]*?href[^>]*?>([^<]+)</[^>]*?href>',
|
|
block, re.IGNORECASE)
|
|
href = href_match.group(1).strip() if href_match else ""
|
|
|
|
# etag extrahieren
|
|
etag_match = re.search(
|
|
r'<[^>]*?getetag[^>]*?>([^<]+)</[^>]*?getetag>',
|
|
block, re.IGNORECASE)
|
|
etag = etag_match.group(1).strip() if etag_match else ""
|
|
|
|
# vCard-Daten extrahieren
|
|
vcard_match = re.search(
|
|
r'<[^>]*?address-data[^>]*?>(.*?)</[^>]*?address-data>',
|
|
block, re.DOTALL | re.IGNORECASE)
|
|
if not vcard_match:
|
|
continue
|
|
|
|
vcard = vcard_match.group(1).strip()
|
|
# XML-Entities dekodieren
|
|
vcard = vcard.replace("<", "<").replace(">", ">")
|
|
vcard = vcard.replace("&", "&").replace(""", '"')
|
|
vcard = vcard.replace(" ", "\r")
|
|
|
|
if vcard.startswith("BEGIN:VCARD"):
|
|
ergebnisse.append({
|
|
"vcard_text": vcard,
|
|
"href": href,
|
|
"etag": etag,
|
|
})
|
|
|
|
return ergebnisse
|
|
|
|
def _vcard_parsen(self, vcard_text, account_name="",
|
|
href="", etag=""):
|
|
"""Eine vCard in ein Kontakt-Dict umwandeln."""
|
|
try:
|
|
card = vobject.readOne(vcard_text)
|
|
except Exception:
|
|
return None
|
|
|
|
# Name
|
|
name = ""
|
|
if hasattr(card, "fn"):
|
|
name = card.fn.value
|
|
elif hasattr(card, "n"):
|
|
n = card.n.value
|
|
name = f"{n.given} {n.family}".strip()
|
|
if not name:
|
|
return None
|
|
|
|
# E-Mail
|
|
email = ""
|
|
if hasattr(card, "email"):
|
|
email = card.email.value
|
|
|
|
# Firma
|
|
firma = ""
|
|
if hasattr(card, "org"):
|
|
org = card.org.value
|
|
if isinstance(org, list):
|
|
firma = org[0] if org else ""
|
|
else:
|
|
firma = str(org)
|
|
|
|
# Telefonnummern nach Typ gruppieren
|
|
nummern = {}
|
|
tel_list = getattr(card, "tel_list", [])
|
|
if not tel_list and hasattr(card, "tel"):
|
|
tel_list = [card.tel]
|
|
|
|
for tel in tel_list:
|
|
nummer = self._nummer_bereinigen(tel.value)
|
|
if not nummer:
|
|
continue
|
|
|
|
typ = self._tel_typ_bestimmen(tel)
|
|
if typ == "handy" and "handy" not in nummern:
|
|
nummern["handy"] = nummer
|
|
elif typ == "geschaeftlich" and "geschaeftlich" not in nummern:
|
|
nummern["geschaeftlich"] = nummer
|
|
elif typ == "telefon" and "telefon" not in nummern:
|
|
nummern["telefon"] = nummer
|
|
elif typ not in ("handy", "geschaeftlich", "telefon"):
|
|
if "sonstige" not in nummern:
|
|
nummern["sonstige"] = nummer
|
|
|
|
if not nummern:
|
|
return None
|
|
|
|
# Adresse
|
|
adresse = ""
|
|
if hasattr(card, "adr"):
|
|
adr = card.adr.value
|
|
teile = []
|
|
if adr.street:
|
|
teile.append(adr.street)
|
|
plz_ort = " ".join(
|
|
filter(None, [adr.code or "", adr.city or ""]))
|
|
if plz_ort:
|
|
teile.append(plz_ort)
|
|
if adr.region:
|
|
teile.append(adr.region)
|
|
if adr.country:
|
|
teile.append(adr.country)
|
|
adresse = ", ".join(teile)
|
|
|
|
# Geburtstag
|
|
geburtstag = ""
|
|
if hasattr(card, "bday"):
|
|
bday_raw = card.bday.value
|
|
if isinstance(bday_raw, str):
|
|
bday_raw = bday_raw.replace("-", "")
|
|
if len(bday_raw) >= 8:
|
|
geburtstag = (
|
|
f"{bday_raw[6:8]}.{bday_raw[4:6]}.{bday_raw[:4]}")
|
|
else:
|
|
try:
|
|
geburtstag = bday_raw.strftime("%d.%m.%Y")
|
|
except (AttributeError, ValueError):
|
|
pass
|
|
|
|
# Notizen
|
|
notiz = ""
|
|
if hasattr(card, "note"):
|
|
notiz = card.note.value or ""
|
|
|
|
# Berufsbezeichnung
|
|
titel = ""
|
|
if hasattr(card, "title"):
|
|
titel = card.title.value or ""
|
|
|
|
# Website
|
|
url_wert = ""
|
|
if hasattr(card, "url"):
|
|
url_wert = card.url.value or ""
|
|
|
|
# Alle E-Mails
|
|
emails = []
|
|
email_list = getattr(card, "email_list", [])
|
|
if not email_list and hasattr(card, "email"):
|
|
email_list = [card.email]
|
|
for em in email_list:
|
|
if em.value and em.value not in emails:
|
|
emails.append(em.value)
|
|
|
|
return {
|
|
"name": name,
|
|
"email": email,
|
|
"emails": emails,
|
|
"firma": firma,
|
|
"titel": titel,
|
|
"nummern": nummern,
|
|
"adresse": adresse,
|
|
"geburtstag": geburtstag,
|
|
"notiz": notiz,
|
|
"url": url_wert,
|
|
"quelle": "carddav",
|
|
"account": account_name,
|
|
"_href": href,
|
|
"_etag": etag,
|
|
}
|
|
|
|
def _tel_typ_bestimmen(self, tel):
|
|
"""vCard TEL-Typ auf unsere Kategorien abbilden."""
|
|
params = tel.params if hasattr(tel, "params") else {}
|
|
typen = params.get("TYPE", [])
|
|
if isinstance(typen, str):
|
|
typen = [typen]
|
|
typen_lower = [t.lower() for t in typen]
|
|
|
|
if "cell" in typen_lower or "mobile" in typen_lower:
|
|
return "handy"
|
|
elif "work" in typen_lower:
|
|
return "geschaeftlich"
|
|
elif "home" in typen_lower:
|
|
return "telefon"
|
|
elif "voice" in typen_lower:
|
|
return "telefon"
|
|
return "sonstige"
|
|
|
|
def _nummer_bereinigen(self, nummer):
|
|
"""Telefonnummer bereinigen."""
|
|
if not nummer:
|
|
return ""
|
|
return re.sub(r"[^\d+*#]", "", nummer)
|