From 55e3efb3a8d4e8e2ca4cd07c03882c746d6a1f2c Mon Sep 17 00:00:00 2001 From: Christian Wolf Date: Sun, 14 Jan 2024 20:41:17 +0100 Subject: [PATCH] Create a basic code snippet to parse mails automatically Start of #27 --- .vscode/launch.json | 24 ++++ .../read-competition-notification/.gitignore | 1 + .../competitionNotificationReader/__init__.py | 50 +++++++ .../competitionNotificationReader/__main__.py | 3 + .../competitionNotificationReader/cli.py | 11 ++ .../competitionParser.py | 132 ++++++++++++++++++ .../contenttemplate.md.tmpl | 13 ++ .../headerExtractor.py | 30 ++++ .../competitionNotificationReader/mail.py | 11 ++ .../mailParser.py | 113 +++++++++++++++ .../mboxReader.py | 49 +++++++ .../requirements.txt | 5 + 12 files changed, 442 insertions(+) create mode 100644 .vscode/launch.json create mode 100644 scripts/read-competition-notification/.gitignore create mode 100644 scripts/read-competition-notification/competitionNotificationReader/__init__.py create mode 100644 scripts/read-competition-notification/competitionNotificationReader/__main__.py create mode 100644 scripts/read-competition-notification/competitionNotificationReader/cli.py create mode 100644 scripts/read-competition-notification/competitionNotificationReader/competitionParser.py create mode 100644 scripts/read-competition-notification/competitionNotificationReader/contenttemplate.md.tmpl create mode 100644 scripts/read-competition-notification/competitionNotificationReader/headerExtractor.py create mode 100644 scripts/read-competition-notification/competitionNotificationReader/mail.py create mode 100644 scripts/read-competition-notification/competitionNotificationReader/mailParser.py create mode 100644 scripts/read-competition-notification/competitionNotificationReader/mboxReader.py create mode 100644 scripts/read-competition-notification/requirements.txt diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..50193e7 --- /dev/null +++ b/.vscode/launch.json 
@@ -0,0 +1,24 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Remote Attach", + "type": "python", + "request": "attach", + "connect": { + "host": "localhost", + "port": 5678 + }, + "pathMappings": [ + // { + // "localRoot": "${workspaceFolder}", + // "remoteRoot": "." + // } + ], + "justMyCode": true + } + ] +} diff --git a/scripts/read-competition-notification/.gitignore b/scripts/read-competition-notification/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/scripts/read-competition-notification/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/scripts/read-competition-notification/competitionNotificationReader/__init__.py b/scripts/read-competition-notification/competitionNotificationReader/__init__.py new file mode 100644 index 0000000..2fce7e5 --- /dev/null +++ b/scripts/read-competition-notification/competitionNotificationReader/__init__.py @@ -0,0 +1,50 @@ +from . import cli +from . import mail +from . import headerExtractor +from . import mailParser +from . import competitionParser +from . 
def main():
    """Run the competition notification reader from the command line.

    Parses the command line, configures the logging verbosity, optionally
    waits for a debugger and then converts every mail of the given mbox
    file into one markdown file below the output folder.

    Raises:
        SystemExit: With status 1 when ``--read-mbox`` is used without
            ``--output-folder``.
        NotImplementedError: When ``--read-mbox`` is not given; no other
            input mode is implemented yet.
    """
    import sys  # function-scope import: only needed for the error exit below

    args = cli.getArgs()

    logging.basicConfig()
    logger = logging.getLogger(__name__)

    # No -v keeps WARNING, a single -v selects INFO, anything more DEBUG.
    verbosityMap = {
        0: logging.WARNING,
        1: logging.INFO,
    }
    logging.getLogger().setLevel(verbosityMap.get(args.verbose, logging.DEBUG))

    if args.debug:
        # Block until a debugger has attached on port 5678.
        debugpy.listen(5678)
        debugpy.wait_for_client()

    if args.read_mbox is None:
        raise NotImplementedError('Not yet implemented')

    if args.output_folder is None:
        logger.error('Cannot use batch mode without explicit output folder.')
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. with "python -S").
        sys.exit(1)

    mp = mailParser.MailParser()
    cp = competitionParser.CompetitionParser()
    reader = mboxReader.MBocReader()

    # Both options are read with [0]: each holds a one-element list.
    # The loop variable is deliberately not called "mail" so that it does
    # not shadow the imported module of the same name.
    for message in reader.parseMBoxFile(args.read_mbox[0]):
        cp.parseMail(mp.parseMail(message))
        filename = cp.getFilename(args.output_folder[0])
        logger.info('Using file %s to generate the output.', filename)
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        # The generated content may contain umlauts; do not depend on the
        # locale encoding of the machine running the script.
        with open(filename, 'w', encoding='utf-8') as fp:
            fp.write(cp.getContent())
class ParsingFailedEception(Exception):
    """Signal that a notification mail does not match the expected layout."""


class CompetitionParser:
    """Extract the registration data of one competition notification mail.

    ``parseMail`` fills the instance from the HTML body of a mail.
    ``getFilename`` and ``getContent`` then describe the markdown file to be
    generated from the data that was parsed last.
    """

    # Transliteration of the German special characters used in file names.
    _TRANSLITERATION = str.maketrans({'ä': 'ae', 'ö': 'oe', 'ü': 'ue', 'ß': 'ss'})

    def __init__(self):
        self._l = logging.getLogger(__name__)

        self._partner = ''
        self._partnerin = ''
        self._date = ''
        self._title = ''
        self._number = ''
        self._group = ''
        self._class = ''
        self._section = ''
        self._ort = ''
        self._verein = ''
        self._telefon = ''

        # Raw strings throughout: '\.' inside a plain string literal is an
        # invalid escape sequence and triggers a warning on current Pythons.
        self._reName = re.compile(r'Neue Meldung für (.*) / (.*)!')
        self._reDate = re.compile(r'([0-9]+)\.([0-9]+)\.([0-9]+)')
        self._reNumber = re.compile(r'Turnier: ([0-9]+)')
        self._rePhone = re.compile(r'Telefon: ([0-9 /]+)')
        self._rePlace = re.compile(r'Ort: (.*), (.*)')
        self._reCompetition = re.compile(r'(.*) ([A-ES]) ((?:Std)|(?:Lat)|(?:Kombi))')

        self._reCleaningString = re.compile(r'[^a-z0-9-]')
        self._reDashes = re.compile(r'-+')

    def parseMail(self, body: str):
        """Parse the HTML ``body`` of a notification mail into this instance.

        Reads the first ``<h2>`` for the couple's names and the first
        ``<table>`` for the competition details.

        Raises:
            ParsingFailedEception: If the heading or the table is missing or
                does not have the expected content.
        """
        parser = bs4.BeautifulSoup(body, 'html.parser')
        self._getNames(parser.h2)
        self._parseTable(parser.table)

    def _getNames(self, h2):
        """Read both partner names from the heading element ``h2``.

        Raises:
            ParsingFailedEception: If the heading is missing, has no single
                text node or does not match the expected sentence.
        """
        text = None if h2 is None else h2.string
        matcher = None if text is None else self._reName.match(text)
        if matcher is None:
            self._l.error('Parsing of header "%s" failed.', h2)
            raise ParsingFailedEception('Header could not be successfully parsed')

        self._partner = matcher.group(1)
        self._partnerin = matcher.group(2)

    @staticmethod
    def _fullMatch(pattern, text, message):
        """Return the full match of ``pattern`` on ``text`` or raise with ``message``."""
        match = pattern.fullmatch(text)
        if match is None:
            raise ParsingFailedEception(message)
        return match

    def _parseTable(self, table):
        """Read the competition details from the cells of ``table``.

        The first six ``td`` cells are expected to hold, in this order: date,
        title, competition number, competition (group, class, section),
        place (club, town) and phone number.

        Raises:
            ParsingFailedEception: If the table is missing, has fewer than six
                cells, a cell has no single text node, or a cell does not
                match its expected format.
        """
        if table is None:
            raise ParsingFailedEception('No table found in mail')

        cells = table('td')
        if len(cells) < 6:
            raise ParsingFailedEception(
                f'Expected at least 6 table cells in mail but found {len(cells)}'
            )

        texts = []
        for cell in cells[:6]:
            if cell.string is None:
                raise ParsingFailedEception(f'Cannot read the text of table cell {cell}')
            texts.append(cell.string.strip())
        date, title, number, competition, place, phone = texts

        match = self._fullMatch(self._reDate, date, 'Cannot parse date %s in mail' % date)
        day, month, year = match.groups()
        # Zero-pad day and month so that "1.2.2024" becomes "2024-02-01".
        self._date = f'{year}-{int(month):02d}-{int(day):02d}'

        self._title = title

        match = self._fullMatch(
            self._reNumber, number, f'Cannot parse the turnier number in field {number}'
        )
        self._number = match.group(1)

        match = self._fullMatch(
            self._reCompetition, competition, f'Cannot parse the competition line {competition}'
        )
        self._group, self._class, self._section = match.groups()

        match = self._fullMatch(self._rePlace, place, f'Cannot parse the place entry {place}')
        self._verein, self._ort = match.groups()

        match = self._fullMatch(self._rePhone, phone, f'Cannot parse the phone line {phone}')
        self._telefon = match.group(1)

    def _cleanName(self, name: str) -> str:
        """Return ``name`` reduced to lower case letters, digits and dashes.

        German umlauts and sharp s are transliterated, every other character
        outside ``a-z``, ``0-9`` and ``-`` becomes a dash, and runs of dashes
        are collapsed into one.
        """
        cleanedName = name.lower().translate(self._TRANSLITERATION)
        cleanedName = self._reCleaningString.sub('-', cleanedName)
        return self._reDashes.sub('-', cleanedName)

    def getFilename(self, prefix: str) -> str:
        """Return the path of the markdown file for the parsed competition.

        The file is placed in a sub folder of ``prefix`` named after the
        first four characters of the date. All name parts taken from the mail
        are cleaned, so that mail content (e.g. a town like "Halle/Saale")
        cannot introduce additional path components.
        """
        namePartner = self._cleanName(self._partner)
        namePartnerin = self._cleanName(self._partnerin)
        competitionName = self._cleanName(f'{self._group} {self._class} {self._section}')
        placeName = self._cleanName(self._ort)

        return os.path.join(
            prefix,
            self._date[0:4],
            f'{self._date}-{placeName}-{namePartner}-{namePartnerin}-{competitionName}.md'
        )

    def getContent(self) -> str:
        """Render ``contenttemplate.md.tmpl`` with the parsed competition data.

        The template is read from the folder of this module.
        """
        templatePath = os.path.join(os.path.dirname(__file__), 'contenttemplate.md.tmpl')
        with open(templatePath, encoding='utf-8') as fp:
            template = jinja2.Template(fp.read())

        # NOTE(review): the values are inserted into the template verbatim; a
        # double quote inside a value presumably breaks the generated front
        # matter - confirm whether escaping is needed.
        context = {
            'date': self._date,
            'partner': self._partner,
            'partnerin': self._partnerin,
            'verein': self._verein,
            'ort': self._ort,
            'telefon': self._telefon,
            'group': self._group,
            'class': self._class,
            'section': self._section,
            'title': self._title,
            'number': self._number,
        }
        return template.render(**context)
# --- mail.py ---

HeaderName_t = str
HeaderValue_t = str
HeaderEntry_t = tuple[HeaderName_t, HeaderValue_t]


@dataclasses.dataclass
class Mail:
    """A mail (or one part of a multipart mail) split into headers and body."""

    # Header entries as (name, value) tuples.
    headers: list[HeaderEntry_t]
    # Lines of the body.
    body: list[str]


# --- mailParser.py ---

class MailParser:
    """Extract the decoded HTML body from a multipart/alternative mail."""

    # The boundary parameter may be quoted or a bare token.
    _reBoundary = re.compile(r'boundary=(?:"([^"]+)"|([^\s;"]+))', re.IGNORECASE)

    def __init__(self):
        self._l = logging.getLogger(__name__)

    @staticmethod
    def _headerValue(mail, name: str) -> str:
        """Return the value of the first header called ``name`` or ``''``.

        ``name`` must be given in lower case; header names are compared
        case-insensitively.
        """
        for key, value in mail.headers:
            if key.lower() == name:
                return value
        return ''

    def parseMail(self, rawMail: "cnr.mail.Mail"):
        """Return the decoded text of the HTML part of ``rawMail``.

        Only mails with exactly one ``text/html`` part that is encoded as
        quoted-printable are supported.

        Raises:
            NotImplementedError: If the mail is not multipart/alternative or
                does not contain exactly one matching part.
            SystemExit: If the content type header is not unique or carries
                no boundary (see ``_getContentType``).
        """
        _contentType, boundary = self._getContentType(rawMail)
        subMails = self._splitMultipartBody(rawMail.body, boundary)

        # Media types and encoding names are compared case-insensitively.
        candidates = [
            part for part in subMails
            if self._headerValue(part, 'content-type').lower().startswith('text/html')
            and self._headerValue(part, 'content-transfer-encoding').strip().lower()
            == 'quoted-printable'
        ]

        if len(candidates) != 1:
            raise NotImplementedError('Not implemented')

        return self._mapQuotedrintable(candidates[0].body)

    def _getContentType(self, rawMail: "cnr.mail.Mail") -> tuple[str, str]:
        """Return ``('multipart/alternative', boundary)`` for ``rawMail``.

        Raises:
            SystemExit: With status 1 if there is not exactly one content
                type header or if it has no boundary parameter.
            NotImplementedError: If the mail is not multipart/alternative.
        """
        ctHeaders = [value for name, value in rawMail.headers if name.lower() == 'content-type']
        if len(ctHeaders) != 1:
            self._l.error('No unique content type of the mail was found.')
            # Same effect as exit(1) without relying on the site module.
            raise SystemExit(1)

        ct = ctHeaders[0]
        if not ct.lower().startswith('multipart/alternative'):
            raise NotImplementedError('Not yet implemented')

        matcher = self._reBoundary.search(ct)
        if matcher is None:
            self._l.error('Cannot extract boundary from mail header.')
            raise SystemExit(1)

        boundary = matcher.group(1) or matcher.group(2)
        return 'multipart/alternative', boundary

    def _splitMultipartLines(self, bodyLines: list[str], boundary: str) -> list[list[str]]:
        """Return the lines of every non-empty part of a multipart body.

        Lines before the first delimiter (preamble) and after the closing
        delimiter (epilogue) are ignored. A body without closing delimiter
        is accepted; its remaining lines form the last part.
        """
        delimiter = f'--{boundary}'
        closeDelimiter = f'--{boundary}--'

        parts = []
        current = None  # None while outside of any part (preamble/epilogue)
        for line in bodyLines:
            marker = line.rstrip()  # delimiters may carry trailing whitespace
            if marker == delimiter or marker == closeDelimiter:
                if current:
                    parts.append(current)
                if marker == closeDelimiter:
                    current = None
                    break
                current = []
            elif current is not None:
                current.append(line)

        if current:
            parts.append(current)
        return parts

    def _splitMultipartBody(self, bodyLines: list[str], boundary: str):
        """Split a multipart body into its parts with separated headers."""
        return [
            cnr.headerExtractor.splitHeaders(part)
            for part in self._splitMultipartLines(bodyLines, boundary)
        ]

    def _mapQuotedrintable(self, lines: list[str]):
        """Decode quoted-printable ``lines`` into one string.

        Lines ending in ``=`` (soft line breaks) are merged with their
        successor. Each resulting line is decoded as UTF-8 and all lines are
        concatenated without separator.

        Raises:
            ValueError: If an ``=`` is not followed by two hex digits.
            UnicodeDecodeError: If a decoded line is not valid UTF-8.
        """
        # Merge soft line breaks. A soft break on the very last line has no
        # successor; its text is kept instead of failing.
        mergedLines = []
        pending = ''
        for line in lines:
            if line.endswith('='):
                pending += line[:-1]
            else:
                mergedLines.append(pending + line)
                pending = ''
        if pending:
            mergedLines.append(pending)

        def decodeLine(line):
            raw = bytearray()
            pos = 0
            while pos < len(line):
                char = line[pos]
                if char != '=':
                    raw.extend(char.encode())
                    pos += 1
                    continue
                hexChars = line[pos + 1:pos + 3]
                if len(hexChars) != 2:
                    raise ValueError(f'Truncated quoted-printable escape in line {line!r}')
                raw.append(int(hexChars, 16))
                pos += 3
            return raw.decode()

        return ''.join(decodeLine(line) for line in mergedLines)


# --- mboxReader.py ---

class MBocReader:
    """Read the mails stored in an mbox file."""

    # Body lines starting with "From " are escaped with a leading ">".
    _reQuotedFrom = re.compile('^>+From ')

    def __init__(self):
        self._l = logging.getLogger(__name__)

    def parseMBoxFile(self, filename: str) -> "list[cnr.mail.Mail]":
        """Return all mails found in the mbox file ``filename``.

        The file is decoded as UTF-8.
        """
        self._l.debug('Reading MBox file "%s"', filename)

        with open(filename, encoding='utf-8') as fp:
            return self._parseMails(fp)

    def _isNewMailLine(self, line: str):
        """Tell whether ``line`` is the separator line starting a new mail."""
        return line.startswith('From ')

    def _fixSingleLine(self, line: str) -> str:
        """Remove one level of ">" quoting from an escaped "From " line."""
        if self._reQuotedFrom.match(line) is None:
            return line
        return line[1:]

    def _splitMails(self, fp: io.TextIOBase) -> list[list[str]]:
        """Return the unescaped lines of every mail read from ``fp``.

        The separator lines themselves are not part of the result.
        """
        mails = []
        lines = []
        for line in fp:
            if self._isNewMailLine(line):
                if lines:
                    mails.append(lines)
                lines = []
                continue
            # Only strip an actual line terminator: the last line of a file
            # may lack one and must keep its final character.
            if line.endswith('\n'):
                line = line[:-1]
            lines.append(self._fixSingleLine(line))

        if lines:
            mails.append(lines)
        return mails

    def _parseMails(self, fp: io.TextIOBase) -> "list[cnr.mail.Mail]":
        """Parse every mail read from the open text file ``fp``."""
        return [self._parseSingleMail(lines) for lines in self._splitMails(fp)]

    def _parseSingleMail(self, lines: list[str]) -> "cnr.mail.Mail":
        """Split the raw ``lines`` of one mail into headers and body."""
        return cnr.headerExtractor.splitHeaders(lines)
file mode 100644 index 0000000..8582984 --- /dev/null +++ b/scripts/read-competition-notification/requirements.txt @@ -0,0 +1,5 @@ +beautifulsoup4==4.12.2 +debugpy==1.8.0 +Jinja2==3.1.3 +MarkupSafe==2.1.3 +soupsieve==2.5