"""CardDAV-Sync - Kontakte von Nextcloud/CardDAV-Server abrufen und bearbeiten.""" import re from urllib.parse import urljoin try: import requests REQUESTS_VERFUEGBAR = True except ImportError: REQUESTS_VERFUEGBAR = False try: import vobject VOBJECT_VERFUEGBAR = True except ImportError: VOBJECT_VERFUEGBAR = False class CardDavSync: """Synchronisiert Kontakte von einem CardDAV-Server (Nextcloud). Kontakt-Format (ein Eintrag pro Person): { "name": "Max Mustermann", "email": "max@example.com", "emails": ["max@example.com"], "firma": "Firma GmbH", "titel": "Geschäftsführer", "nummern": { "telefon": "+49405551234", "handy": "+491701234567", "geschaeftlich": "+49405559999", }, "adresse": "Musterstr. 1, 12345 Musterstadt", "geburtstag": "01.01.1990", "notiz": "...", "url": "https://...", "quelle": "carddav", "account": "Privat", "_href": "/remote.php/dav/.../UUID.vcf", "_etag": "\"abc123\"", } """ def kontakte_abrufen(self, url, benutzername, passwort, account_name=""): """Alle Kontakte vom CardDAV-Server abrufen. Returns: (kontakte, fehler): Tuple aus Kontaktliste und Fehlerstring """ if not REQUESTS_VERFUEGBAR: return [], "requests nicht installiert (pip install requests)" if not VOBJECT_VERFUEGBAR: return [], "vobject nicht installiert (pip install vobject)" if not url.endswith("/"): url += "/" try: vcards_raw = self._vcards_holen(url, benutzername, passwort) except requests.exceptions.ConnectionError: return [], "Verbindung fehlgeschlagen" except requests.exceptions.Timeout: return [], "Server-Timeout" except requests.exceptions.SSLError as e: return [], f"SSL-Fehler: {e}" except requests.exceptions.HTTPError as e: code = e.response.status_code if e.response else "?" if code == 401: return [], "Authentifizierung fehlgeschlagen" return [], f"HTTP-Fehler {code}" except Exception as e: return [], str(e) kontakte = [] for eintrag in vcards_raw: # eintrag ist dict mit vcard_text, href, etag if isinstance(eintrag, dict): parsed = self._vcard_parsen( eintrag["vcard_text"], account_name, href=eintrag.get("href", ""), etag=eintrag.get("etag", ""), ) else: # Fallback: reiner String (z.B. von _propfind_einzeln) parsed = self._vcard_parsen(eintrag, account_name) if parsed: kontakte.append(parsed) kontakte.sort(key=lambda k: k.get("name", "").lower()) return kontakte, "" def alle_accounts_abrufen(self, accounts): """Kontakte von mehreren CardDAV-Accounts abrufen. Args: accounts: Liste von {"name": "...", "url": "...", "benutzername": "...", "passwort": "..."} Returns: (alle_kontakte, fehler_liste) """ alle = [] fehler = [] for acc in accounts: if not acc.get("url") or not acc.get("benutzername"): continue kontakte, err = self.kontakte_abrufen( acc["url"], acc["benutzername"], acc["passwort"], account_name=acc.get("name", ""), ) if err: fehler.append(f"{acc.get('name', 'Unbekannt')}: {err}") else: alle.extend(kontakte) alle.sort(key=lambda k: k.get("name", "").lower()) return alle, fehler # === Write-Back: Kontakt auf Server aktualisieren === def kontakt_aktualisieren(self, server_url, benutzername, passwort, kontakt): """Kontakt auf dem CardDAV-Server aktualisieren (PUT). Args: server_url: Basis-URL des Adressbuchs benutzername, passwort: Auth-Daten kontakt: Kontakt-Dict mit _href und _etag Returns: (erfolg, fehler_text) """ href = kontakt.get("_href", "") etag = kontakt.get("_etag", "") if not href: return False, "Kein CardDAV-Href vorhanden" # vCard aus Kontakt-Daten aufbauen vcard_text = self._kontakt_zu_vcard(kontakt) if not vcard_text: return False, "vCard konnte nicht erstellt werden" # Volle URL zusammenbauen base = server_url if "/remote.php" in base: base = base.split("/remote.php")[0] full_url = urljoin(base, href) headers = { "Content-Type": "text/vcard; charset=utf-8", } if etag: headers["If-Match"] = etag try: response = requests.put( full_url, auth=(benutzername, passwort), headers=headers, data=vcard_text.encode("utf-8"), timeout=30, ) if response.status_code in (200, 201, 204): return True, "" elif response.status_code == 412: return False, "Kontakt wurde zwischenzeitlich geändert (Konflikt)" else: return False, f"HTTP {response.status_code}" except requests.exceptions.ConnectionError: return False, "Verbindung fehlgeschlagen" except requests.exceptions.Timeout: return False, "Server-Timeout" except Exception as e: return False, str(e) def _kontakt_zu_vcard(self, kontakt): """Kontakt-Dict in vCard-Text umwandeln.""" try: card = vobject.vCard() # Name name = kontakt.get("name", "") card.add("fn").value = name n = card.add("n") teile = name.split(" ", 1) if len(teile) == 2: n.value = vobject.vcard.Name( family=teile[1], given=teile[0]) else: n.value = vobject.vcard.Name(family=name) # Telefonnummern nummern = kontakt.get("nummern", {}) typ_map = { "telefon": "HOME", "handy": "CELL", "geschaeftlich": "WORK", "sonstige": "VOICE", } for typ, nummer in nummern.items(): if nummer: tel = card.add("tel") tel.value = nummer tel.type_param = typ_map.get(typ, "VOICE") # E-Mails emails = kontakt.get("emails", []) if not emails and kontakt.get("email"): emails = [kontakt["email"]] for em in emails: if em: email_obj = card.add("email") email_obj.value = em # Firma firma = kontakt.get("firma", "") if firma: org = card.add("org") org.value = [firma] # Titel titel = kontakt.get("titel", "") if titel: card.add("title").value = titel # Adresse (vereinfacht: alles in street) adresse = kontakt.get("adresse", "") if adresse: adr = card.add("adr") adr.value = vobject.vcard.Address(street=adresse) # Geburtstag (DD.MM.YYYY → YYYYMMDD) geburtstag = kontakt.get("geburtstag", "") if geburtstag and len(geburtstag) == 10: try: teile = geburtstag.split(".") bday = f"{teile[2]}{teile[1]}{teile[0]}" card.add("bday").value = bday except (IndexError, ValueError): pass # Website url = kontakt.get("url", "") if url: card.add("url").value = url # Notiz notiz = kontakt.get("notiz", "") if notiz: card.add("note").value = notiz return card.serialize() except Exception: return None # === Server-Abruf === def _vcards_holen(self, url, benutzername, passwort): """vCards vom Server holen - REPORT (bevorzugt) oder Fallback.""" auth = (benutzername, passwort) # Methode 1: REPORT addressbook-query (Nextcloud-Standard) vcards = self._report_addressbook_query(url, auth) if vcards: return vcards # Methode 2: PROPFIND mit inline address-data vcards = self._propfind_mit_daten(url, auth) if vcards: return vcards # Methode 3: PROPFIND nur hrefs, dann einzeln abrufen return self._propfind_einzeln(url, auth) def _report_addressbook_query(self, url, auth): """REPORT addressbook-query - holt alle vCards auf einmal.""" body = """ """ try: response = requests.request( "REPORT", url, auth=auth, headers={ "Depth": "1", "Content-Type": "application/xml; charset=utf-8", }, data=body.encode("utf-8"), timeout=60, ) if response.status_code not in (200, 207): return [] except requests.RequestException: return [] return self._vcards_aus_xml(response.text) def _propfind_mit_daten(self, url, auth): """PROPFIND mit inline address-data.""" body = """ """ response = requests.request( "PROPFIND", url, auth=auth, headers={ "Depth": "1", "Content-Type": "application/xml; charset=utf-8", }, data=body.encode("utf-8"), timeout=60, ) response.raise_for_status() return self._vcards_aus_xml(response.text) def _propfind_einzeln(self, url, auth): """Fallback: PROPFIND für hrefs, dann jede .vcf einzeln abrufen.""" body = """ """ response = requests.request( "PROPFIND", url, auth=auth, headers={ "Depth": "1", "Content-Type": "application/xml; charset=utf-8", }, data=body.encode("utf-8"), timeout=30, ) response.raise_for_status() text = response.text href_pattern = r'<[^>]*href[^>]*>([^<]*\.vcf)]*href>' hrefs = re.findall(href_pattern, text, re.IGNORECASE) vcards = [] base = url.split("/remote.php")[0] if "/remote.php" in url else url for href in hrefs: if href.startswith("http"): vcf_url = href else: vcf_url = urljoin(base, href) try: r = requests.get(vcf_url, auth=auth, timeout=10) if r.status_code == 200 and "BEGIN:VCARD" in r.text: vcards.append(r.text) except requests.RequestException: continue return vcards def _vcards_aus_xml(self, xml_text): """vCard-Daten + Metadaten (href, etag) aus XML-Response extrahieren.""" ergebnisse = [] # Pro -Block parsen response_pattern = ( r'<[^>]*?response[^>]*?>(.*?)]*?response>' ) responses = re.findall( response_pattern, xml_text, re.DOTALL | re.IGNORECASE) for block in responses: # href extrahieren href_match = re.search( r'<[^>]*?href[^>]*?>([^<]+)]*?href>', block, re.IGNORECASE) href = href_match.group(1).strip() if href_match else "" # etag extrahieren etag_match = re.search( r'<[^>]*?getetag[^>]*?>([^<]+)]*?getetag>', block, re.IGNORECASE) etag = etag_match.group(1).strip() if etag_match else "" # vCard-Daten extrahieren vcard_match = re.search( r'<[^>]*?address-data[^>]*?>(.*?)]*?address-data>', block, re.DOTALL | re.IGNORECASE) if not vcard_match: continue vcard = vcard_match.group(1).strip() # XML-Entities dekodieren vcard = vcard.replace("<", "<").replace(">", ">") vcard = vcard.replace("&", "&").replace(""", '"') vcard = vcard.replace(" ", "\r") if vcard.startswith("BEGIN:VCARD"): ergebnisse.append({ "vcard_text": vcard, "href": href, "etag": etag, }) return ergebnisse def _vcard_parsen(self, vcard_text, account_name="", href="", etag=""): """Eine vCard in ein Kontakt-Dict umwandeln.""" try: card = vobject.readOne(vcard_text) except Exception: return None # Name name = "" if hasattr(card, "fn"): name = card.fn.value elif hasattr(card, "n"): n = card.n.value name = f"{n.given} {n.family}".strip() if not name: return None # E-Mail email = "" if hasattr(card, "email"): email = card.email.value # Firma firma = "" if hasattr(card, "org"): org = card.org.value if isinstance(org, list): firma = org[0] if org else "" else: firma = str(org) # Telefonnummern nach Typ gruppieren nummern = {} tel_list = getattr(card, "tel_list", []) if not tel_list and hasattr(card, "tel"): tel_list = [card.tel] for tel in tel_list: nummer = self._nummer_bereinigen(tel.value) if not nummer: continue typ = self._tel_typ_bestimmen(tel) if typ == "handy" and "handy" not in nummern: nummern["handy"] = nummer elif typ == "geschaeftlich" and "geschaeftlich" not in nummern: nummern["geschaeftlich"] = nummer elif typ == "telefon" and "telefon" not in nummern: nummern["telefon"] = nummer elif typ not in ("handy", "geschaeftlich", "telefon"): if "sonstige" not in nummern: nummern["sonstige"] = nummer if not nummern: return None # Adresse adresse = "" if hasattr(card, "adr"): adr = card.adr.value teile = [] if adr.street: teile.append(adr.street) plz_ort = " ".join( filter(None, [adr.code or "", adr.city or ""])) if plz_ort: teile.append(plz_ort) if adr.region: teile.append(adr.region) if adr.country: teile.append(adr.country) adresse = ", ".join(teile) # Geburtstag geburtstag = "" if hasattr(card, "bday"): bday_raw = card.bday.value if isinstance(bday_raw, str): bday_raw = bday_raw.replace("-", "") if len(bday_raw) >= 8: geburtstag = ( f"{bday_raw[6:8]}.{bday_raw[4:6]}.{bday_raw[:4]}") else: try: geburtstag = bday_raw.strftime("%d.%m.%Y") except (AttributeError, ValueError): pass # Notizen notiz = "" if hasattr(card, "note"): notiz = card.note.value or "" # Berufsbezeichnung titel = "" if hasattr(card, "title"): titel = card.title.value or "" # Website url_wert = "" if hasattr(card, "url"): url_wert = card.url.value or "" # Alle E-Mails emails = [] email_list = getattr(card, "email_list", []) if not email_list and hasattr(card, "email"): email_list = [card.email] for em in email_list: if em.value and em.value not in emails: emails.append(em.value) return { "name": name, "email": email, "emails": emails, "firma": firma, "titel": titel, "nummern": nummern, "adresse": adresse, "geburtstag": geburtstag, "notiz": notiz, "url": url_wert, "quelle": "carddav", "account": account_name, "_href": href, "_etag": etag, } def _tel_typ_bestimmen(self, tel): """vCard TEL-Typ auf unsere Kategorien abbilden.""" params = tel.params if hasattr(tel, "params") else {} typen = params.get("TYPE", []) if isinstance(typen, str): typen = [typen] typen_lower = [t.lower() for t in typen] if "cell" in typen_lower or "mobile" in typen_lower: return "handy" elif "work" in typen_lower: return "geschaeftlich" elif "home" in typen_lower: return "telefon" elif "voice" in typen_lower: return "telefon" return "sonstige" def _nummer_bereinigen(self, nummer): """Telefonnummer bereinigen.""" if not nummer: return "" return re.sub(r"[^\d+*#]", "", nummer)