From b68394ec68bf0db0950557c8cfd2784f2eea1952 Mon Sep 17 00:00:00 2001 From: Christian Wolf Date: Thu, 16 Jan 2025 15:43:06 +0100 Subject: [PATCH] Enable sychronization of events --- .../calendar_synchronizer/cal_helper.py | 86 +++++++++++++++++-- .../nc-cal-sync/calendar_synchronizer/sync.py | 73 ++++++++++++---- 2 files changed, 137 insertions(+), 22 deletions(-) diff --git a/scripts/nc-cal-sync/calendar_synchronizer/cal_helper.py b/scripts/nc-cal-sync/calendar_synchronizer/cal_helper.py index 5b27446..7dfb865 100644 --- a/scripts/nc-cal-sync/calendar_synchronizer/cal_helper.py +++ b/scripts/nc-cal-sync/calendar_synchronizer/cal_helper.py @@ -1,5 +1,6 @@ import datetime as dt import re, logging +import zoneinfo from .sync import Event @@ -15,21 +16,26 @@ class IcalHelper: 'Sun': 6 } + tzDE = zoneinfo.ZoneInfo('Europe/Berlin') + deltaWeek = dt.timedelta(weeks=1) + deltaDay = dt.timedelta(days=1) + firstDay = dt.date(year=dt.datetime.now().year, month=1, day=1) + def __init__(self): self._l = logging.getLogger(__name__) - self.dDay = dt.timedelta(days=1) - self.firstDay = dt.datetime(year=dt.datetime.now().year, month=1, day=1) self.reTime = re.compile(r'(\d{1,2}):(\d{2})') def getStart(self, event: Event, holidays): - self._getFirstWeekdayInYear(event.day) - self._getFirstOccurence(event, holidays) - pass + firstDay = self._getFirstOccurence(event, holidays) + + if firstDay is None: + return None, None + return firstDay, dt.datetime.combine(firstDay, self._getTime(event.start), tzinfo=self.tzDE) def _getFirstWeekdayInYear(self, weekday): candidate = self.firstDay while candidate.weekday() != self._WEEKDAY_MAP[weekday]: - candidate += self.dDay + candidate += self.deltaDay self._l.log(5, 'First %s in year is %s', weekday, candidate) @@ -40,7 +46,73 @@ class IcalHelper: def _getSortedFeasts(self, holidays): return sorted(holidays['feasts']) + + def _isFeast(self, candidate, feasts): + return candidate in feasts + + def _isHoliday(self, candidate, holidays): + for h in holidays: + if h['from'] <= candidate <= h['to']: + return True + + return False + + def _getLastHolidayException(self, holidays): + holidayList = list(self._getSortedHolidays(holidays)) + holidayList.reverse() + lastHoliday = holidayList[0] + lastHolidayDay = lastHoliday['to'] + + feastList = list(self._getSortedFeasts(holidays)) + feastList.reverse() + lastFeast = feastList[0] + lastFeastDay = lastFeast + + return max(lastHolidayDay, lastFeastDay) def _getFirstOccurence(self, event: Event, holidays): firstWeekday = self._getFirstWeekdayInYear(event.day) - firstWeekday.tzinfo = dt.timezone.tzname('Europe/Berlin') + + + candidate = firstWeekday + while candidate.year == firstWeekday.year: + if self._isFeast(candidate, holidays['feasts']): + self._l.log(5, 'Found feast %s', candidate) + candidate += self.deltaWeek + continue + + if self._isHoliday(candidate, holidays['holidays']): + self._l.log(5, 'Found holiday %s', candidate) + candidate += self.deltaWeek + continue + + self._l.log(5, 'Found first occurence %s', candidate) + return candidate + + return None + + # firstWeekday.tzinfo = dt.timezone.tzname('Europe/Berlin') + + def _getTime(self, startStr): + match = self.reTime.match(startStr) + return dt.time(int(match.group(1)), int(match.group(2))) + + def getHolidayExceptions(self, event: Event, startDay, holidays): + lastHolidayDay = self._getLastHolidayException(holidays) + + pointer = startDay + exeptions = [] + while pointer <= lastHolidayDay: + ex = dt.datetime.combine(pointer, self._getTime(event.start), tzinfo=self.tzDE) + + if self._isFeast(pointer, holidays['feasts']): + exeptions.append(ex) + + if self._isHoliday(pointer, holidays['holidays']) and not event.external: + exeptions.append(ex) + + pointer += self.deltaWeek + + if len(exeptions) > 0: + return exeptions + return None diff --git a/scripts/nc-cal-sync/calendar_synchronizer/sync.py b/scripts/nc-cal-sync/calendar_synchronizer/sync.py index 437a2eb..4e15b4a 100644 --- a/scripts/nc-cal-sync/calendar_synchronizer/sync.py +++ b/scripts/nc-cal-sync/calendar_synchronizer/sync.py @@ -1,9 +1,9 @@ import logging, yaml, pprint, json, re -import hashlib +import hashlib, uuid import caldav import datetime as dt -from . import login +from . import login, debug _l = logging.getLogger(__name__) @@ -50,7 +50,8 @@ class Event: def __repr__(self): wAge = f' ({self.age})' if self.age is not None else '' - return f'Ev({self.title}{wAge}) [{self.day}, {self.start}, {self.duration}]' + ext = 'Ext' if self.external else '' + return f'{ext}Ev({self.title}{wAge}) [{self.day}, {self.start}, {self.duration}]' def getHash(self, holidays): def fixHolidays(holidays): @@ -86,10 +87,22 @@ class Event: # pprint.pprint(data, indent=4, width=100) hasher = hashlib.sha1() hasher.update(json.dumps(data, sort_keys=True).encode('utf-8')) - return hasher.hexdigest() + first = hasher.hexdigest() + second = uuid.uuid4().hex + return f'{first}___{second}', first dDay = dt.timedelta(days=1) + _MAP_WEEKDAY_ICAL = { + 'Mon': 'MO', + 'Tue': 'TU', + 'Wed': 'WE', + 'Thu': 'TH', + 'Fri': 'FR', + 'Sat': 'SA', + 'Sun': 'SU', + } + def addToCalendar(self, calendar: caldav.Calendar, holidays): from . import cal_helper @@ -99,15 +112,44 @@ class Event: pass helper = cal_helper.IcalHelper() - helper.getStart(self, holidays) + startDay, start = helper.getStart(self, holidays) + + if start is None: + _l.warning('Could not get start time for event %s in the current year', self) + return + uid, uidFirst = self.getHash(holidays) icalData = { - 'uid': self.getHash(holidays), - 'DTSTART': dt.datetime.now(), - 'DTEND': dt.datetime.now() + dt.timedelta(minutes=self.duration), - 'SUMMARY': self.title + 'uid': uid, + 'DTSTART': start, + 'DTEND': start + dt.timedelta(minutes=self.duration), + 'SUMMARY': self.title, + 'RRULE': { + 'FREQ': 'DAILY', + 'BYDAY': self._MAP_WEEKDAY_ICAL[self.day] + } } - # calendar.add_event(**icalData) + # debug.debugger() + + holidayExceptions = helper.getHolidayExceptions(self, startDay, holidays) + if holidayExceptions is not None: + icalData['EXDATE'] = holidayExceptions + + desc = '' + if self.age is not None: + desc += f'Jahrgänge: {self.age}' + if len(self.description) > 0: + desc += '\n\n' + desc += self.description + if len(desc) > 0: + icalData['DESCRIPTION'] = desc + + _l.log(5, 'Adding event\n%s', pprint.pformat(icalData, indent=4, width=100)) + + ical = caldav.lib.vcal.create_ical(**icalData) + _l.log(5, 'Created event\n%s', pprint.pformat(ical, indent=4, width=100)) + _l.log(5, 'Created event\n%s', ical) + calendar.add_event(**icalData) def _unpackSchedules(schedule): def packSingleCalendar(cal): @@ -121,7 +163,7 @@ def _unpackSchedules(schedule): if 'age' in ev: e.age = ev['age'] - if ev.get('extern', False): + if ev.get('extern', False) or ev.get('external', False): e.external = True if 'desc' in ev: e.description = ev['desc'] @@ -216,7 +258,8 @@ class CalendarSynchonizer: ret = {} for e in events: - uid = str(e.icalendar_component['uid']) + uidRaw = str(e.icalendar_component['uid']) + uid = uidRaw.split('___', 1)[0] _l.log(5, 'Event with uid %s was found.', uid) ret[uid] = e @@ -228,9 +271,9 @@ class CalendarSynchonizer: ret = {} for e in schedule: - uid = e.getHash(holidays) - _l.log(5, 'Event with uid %s was found.', uid) - ret[uid] = e + uid, uidFirst = e.getHash(holidays) + _l.log(5, 'Event with uid part %s was found.', uidFirst) + ret[uidFirst] = e return ret