Create a basic code snippet to parse mails automatically

Start of #27
This commit is contained in:
Christian Wolf 2024-01-14 20:41:17 +01:00
parent 05285d29d4
commit 55e3efb3a8
12 changed files with 442 additions and 0 deletions

24
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,24 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Remote Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"pathMappings": [
// {
// "localRoot": "${workspaceFolder}",
// "remoteRoot": "."
// }
],
"justMyCode": true
}
]
}

View File

@ -0,0 +1 @@
__pycache__/

View File

@ -0,0 +1,50 @@
from . import cli
from . import mail
from . import headerExtractor
from . import mailParser
from . import competitionParser
from . import mboxReader
import logging
import debugpy
import os
def main():
    """Run the command line tool.

    Reads the command line, configures logging (and optionally waits for a
    debugger to attach), then parses every mail of the given mbox file and
    writes one generated file per mail below the output folder.

    Raises:
        SystemExit: With exit code 1 if an mbox file is given without an
            output folder.
        NotImplementedError: If no mbox file is given; reading a mail from
            stdin is not supported yet.
    """
    args = cli.getArgs()

    logging.basicConfig()
    logger = logging.getLogger(__name__)

    # No -v shows warnings only, a single -v adds info messages and any
    # higher count enables debug output.
    verbosityMap = {
        0: logging.WARNING,
        1: logging.INFO,
    }
    logging.getLogger().setLevel(verbosityMap.get(args.verbose, logging.DEBUG))

    if args.debug:
        # Block until a debugger has attached on port 5678.
        debugpy.listen(5678)
        debugpy.wait_for_client()

    mp = mailParser.MailParser()
    cp = competitionParser.CompetitionParser()

    if args.read_mbox is None:
        raise NotImplementedError('Not yet implemented')

    if args.output_folder is None:
        logger.error('Cannot use batch mode without explicit output folder.')
        # Raise SystemExit directly; the exit() helper is only installed by
        # the site module and is not guaranteed to exist.
        raise SystemExit(1)

    reader = mboxReader.MBocReader()
    # The loop variable is deliberately not called "mail" to avoid shadowing
    # the imported module of the same name.
    for rawMail in reader.parseMBoxFile(args.read_mbox[0]):
        body = mp.parseMail(rawMail)
        cp.parseMail(body)

        filename = cp.getFilename(args.output_folder[0])
        logger.info('Using file %s to generate the output.', filename)

        os.makedirs(os.path.dirname(filename), exist_ok=True)
        # The content contains names with umlauts; do not depend on the
        # platform's default encoding.
        with open(filename, 'w', encoding='utf-8') as fp:
            fp.write(cp.getContent())

View File

@ -0,0 +1,3 @@
# Run the command line entry point of the competitionNotificationReader package.
import competitionNotificationReader
competitionNotificationReader.main()

View File

@ -0,0 +1,11 @@
import argparse
def getArgs():
    """Parse the process' command line and return the resulting namespace.

    The namespace carries ``read_mbox`` and ``output_folder`` (one-element
    lists or ``None``), ``verbose`` (number of ``-v`` flags) and ``debug``.
    """
    # Each entry holds the option flags and the keyword arguments that are
    # handed to add_argument(); the order defines the order in the help text.
    optionSpecs = (
        (('--read-mbox',), {
            'nargs': 1,
            'help': 'Read mails from mbox file instead of stdin',
        }),
        (('-o', '--output-folder'), {
            'nargs': 1,
            'help': 'Set the output folder of the generated files.',
        }),
        (('-v', '--verbose'), {
            'action': 'count',
            'default': 0,
            'help': 'Increase the verbosity',
        }),
        (('--debug',), {
            'action': 'store_true',
            'help': 'Enable python debugger',
        }),
    )

    argumentParser = argparse.ArgumentParser()
    for flags, options in optionSpecs:
        argumentParser.add_argument(*flags, **options)
    return argumentParser.parse_args()

View File

@ -0,0 +1,132 @@
import bs4
import logging
import re
import os
import jinja2
class ParsingFailedEception(Exception):
    """Raised when a mail does not have the structure the parser expects."""
class CompetitionParser:
    """Extract the registration data from the HTML body of a notification mail.

    ``parseMail`` fills the instance from the first ``<h2>`` headline and the
    first ``<table>`` of the mail. ``getFilename`` and ``getContent`` then
    build the target file name and the rendered file content from that data.
    """

    def __init__(self):
        self._l = logging.getLogger(__name__)

        self._partner = ''
        self._partnerin = ''
        self._date = ''
        self._title = ''
        self._number = ''
        self._group = ''
        self._class = ''
        self._section = ''
        self._ort = ''
        self._verein = ''
        self._telefon = ''

        self._reName = re.compile('Neue Meldung für (.*) / (.*)!')
        # Raw string: "\." is not a valid escape sequence in a normal string.
        self._reDate = re.compile(r'([0-9]+)\.([0-9]+)\.([0-9]+)')
        self._reNumber = re.compile('Turnier: ([0-9]+)')
        self._rePhone = re.compile('Telefon: ([0-9 /]+)')
        self._rePlace = re.compile('Ort: (.*), (.*)')
        self._reCompetition = re.compile('(.*) ([A-ES]) ((?:Std)|(?:Lat)|(?:Kombi))')

        self._reCleaningString = re.compile('[^a-z0-9-]')
        self._reDashes = re.compile('-+')

    def parseMail(self, body: str):
        """Parse the HTML ``body`` and store the extracted data on the instance.

        Raises:
            ParsingFailedEception: If the headline or the table is missing or
                does not have the expected content.
        """
        parser = bs4.BeautifulSoup(body, 'html.parser')
        self._getNames(parser.h2)
        self._parseTable(parser.table)

    def _getNames(self, h2):
        """Read both partner names from the headline element ``h2``.

        Raises:
            ParsingFailedEception: If ``h2`` is ``None``, has no single string
                content or does not match the expected headline.
        """
        headline = None if h2 is None else h2.string
        matcher = None if headline is None else self._reName.match(headline)
        if matcher is None:
            self._l.error('Parsing of header "%s" failed.', h2)
            raise ParsingFailedEception('Header could not be successfully parsed')

        self._partner = matcher.group(1)
        self._partnerin = matcher.group(2)

    def _parseTable(self, table):
        """Read date, title, number, competition, place and phone from ``table``.

        The first six ``<td>`` cells are expected to hold exactly these
        entries in this order.

        Raises:
            ParsingFailedEception: If the table is missing, has fewer than six
                cells, a cell has no single string content or an entry does
                not match its expected format.
        """
        def fullMatch(regex, text, message):
            match = regex.fullmatch(text)
            if match is None:
                raise ParsingFailedEception(message)
            return match

        tds = [] if table is None else table('td')
        if len(tds) < 6:
            raise ParsingFailedEception(
                f'Expected at least 6 table cells in mail but found {len(tds)}'
            )

        rawTexts = [td.string for td in tds[:6]]
        if any(text is None for text in rawTexts):
            raise ParsingFailedEception('A table cell in mail has no plain text content')
        date, title, number, competition, place, phone = (text.strip() for text in rawTexts)

        day, month, year = fullMatch(
            self._reDate, date, 'Cannot parse date %s in mail' % date
        ).groups()
        # Zero-pad day and month so the result is a proper ISO date.
        self._date = f'{year}-{month.zfill(2)}-{day.zfill(2)}'

        self._title = title

        self._number = fullMatch(
            self._reNumber, number, f'Cannot parse the turnier number in field {number}'
        ).group(1)

        self._group, self._class, self._section = fullMatch(
            self._reCompetition, competition, f'Cannot parse the competition line {competition}'
        ).groups()

        self._verein, self._ort = fullMatch(
            self._rePlace, place, f'Cannot parse the place entry {place}'
        ).groups()

        self._telefon = fullMatch(
            self._rePhone, phone, f'Cannot parse the phone line {phone}'
        ).group(1)

    def _cleanName(self, name: str) -> str:
        """Return ``name`` in a form that is safe to use inside a file name.

        The name is lowercased, German umlauts and sharp s are transliterated,
        every remaining character outside ``[a-z0-9-]`` becomes a dash and
        runs of dashes are collapsed into one.
        """
        cleanedName = name.lower()
        for char, replacement in (('ä', 'ae'), ('ö', 'oe'), ('ü', 'ue'), ('ß', 'ss')):
            cleanedName = cleanedName.replace(char, replacement)
        cleanedName = self._reCleaningString.sub('-', cleanedName)
        return self._reDashes.sub('-', cleanedName)

    def getFilename(self, prefix: str) -> str:
        """Return the path of the output file below the folder ``prefix``.

        The file is placed in a sub folder named after the first four
        characters of the date (the year).
        """
        namePartner = self._cleanName(self._partner)
        namePartnerin = self._cleanName(self._partnerin)
        competitionName = self._cleanName(f'{self._group} {self._class} {self._section}')
        # Clean the place like every other component; otherwise spaces,
        # umlauts or path separators would end up in the file name.
        placeName = self._cleanName(self._ort)

        return os.path.join(
            prefix,
            self._date[0:4],
            f'{self._date}-{placeName}-{namePartner}-{namePartnerin}-{competitionName}.md'
        )

    def getContent(self) -> str:
        """Render ``contenttemplate.md.tmpl`` (next to this module) with the parsed data."""
        templatePath = os.path.join(os.path.dirname(__file__), 'contenttemplate.md.tmpl')
        with open(templatePath, encoding='utf-8') as fp:
            tpl = fp.read()

        context = {
            'date': self._date,
            'partner': self._partner,
            'partnerin': self._partnerin,
            'verein': self._verein,
            'ort': self._ort,
            'telefon': self._telefon,
            'group': self._group,
            'class': self._class,
            'section': self._section,
            'title': self._title,
            'number': self._number,
        }
        return jinja2.Template(tpl).render(**context)

View File

@ -0,0 +1,13 @@
---
dateCompetition: {{ date }}
partner: "{{ partner }}"
partnerin: "{{ partnerin }}"
verein: "{{ verein }}"
ort: "{{ ort }}"
telefon: "{{ telefon }}"
gruppe: "{{ group }}"
klasse: "{{ class }}"
sektion: "{{ section }}"
titel: "{{ title }}"
nummer: {{ number }}
---

View File

@ -0,0 +1,30 @@
import competitionNotificationReader as cnr
import logging
def splitHeaders(lines: list[str]) -> cnr.mail.Mail:
    """Split the lines of a mail (or of one part of it) into headers and body.

    Everything up to the first empty line is treated as headers. Lines that
    start with a space or tab continue the previous header and are appended
    to its value, separated by a single space. Blank lines between the
    headers and the first non-blank body line are dropped.

    Args:
        lines: The lines of the mail without line terminators.

    Returns:
        A ``Mail`` with the ``(name, value)`` header tuples and the body lines.
        The body is empty if there is no blank line or nothing but blank
        lines follow it.
    """
    logger = logging.getLogger(__name__)
    logger.debug('Separating headers of an email')

    def _getHeaders(lines: list[str]):
        headerLines = []

        for idx, line in enumerate(lines):
            if line == '':
                # End of the headers: the body starts at the next non-blank line.
                remainingLines = lines[idx + 1:]
                for j, rl in enumerate(remainingLines):
                    if rl.strip() != '':
                        return headerLines, remainingLines[j:]
                return headerLines, []

            if line.startswith(('\t', ' ')):
                if not headerLines:
                    # Nothing to continue; skip instead of failing on pop().
                    logger.warning(
                        'Ignoring continuation line without preceding header: %r', line
                    )
                    continue
                name, value = headerLines.pop()
                headerLines.append((name, f'{value} {line.strip()}'))
            else:
                # partition() yields an empty value for a line without a
                # colon instead of raising an IndexError.
                name, _, value = line.partition(':')
                headerLines.append((name.strip(), value.strip()))

        # No blank line at all: the input consists of headers only.
        return headerLines, []

    headerLines, bodyLines = _getHeaders(lines)
    return cnr.mail.Mail(headerLines, bodyLines)

View File

@ -0,0 +1,11 @@
import dataclasses
# Descriptive aliases for the two elements of a header tuple.
HeaderName_t = str
HeaderValue_t = str
HeaderEntry_t = tuple[HeaderName_t, HeaderValue_t]


@dataclasses.dataclass()
class Mail:
    """A mail (or a part of one) split into header entries and body lines."""

    # (name, value) tuples of the headers
    headers: list[HeaderEntry_t]
    # Lines of the body
    body: list[str]

View File

@ -0,0 +1,113 @@
import competitionNotificationReader as cnr
import logging
import re
class MailParser:
    """Extract the decoded HTML body from a ``multipart/alternative`` mail."""

    # Accepts both the quoted and the unquoted form of the boundary parameter.
    _reBoundary = re.compile(r'boundary=(?:"([^"]+)"|([^\s;"]+))', re.IGNORECASE)

    def __init__(self):
        self._l = logging.getLogger(__name__)

    def parseMail(self, rawMail: 'cnr.mail.Mail'):
        """Return the decoded text of the quoted-printable ``text/html`` part.

        Raises:
            NotImplementedError: If the mail is not ``multipart/alternative``
                or does not contain exactly one quoted-printable HTML part.
            SystemExit: If the content type header is missing, not unique or
                carries no boundary.
        """
        _, boundary = self._getContentType(rawMail)
        subMails = self._splitMultipartBody(rawMail.body, boundary)

        def firstHeaderMatches(mail, name, predicate):
            # Only the first header with the given name is taken into account.
            for headerName, headerValue in mail.headers:
                if headerName.lower() == name:
                    return predicate(headerValue)
            return False

        # Media types and transfer encodings are case-insensitive.
        def isHtml(value):
            return value.lower().startswith('text/html')

        def isQuotedPrintable(value):
            return value.strip().lower() == 'quoted-printable'

        htmlParts = [
            part for part in subMails
            if firstHeaderMatches(part, 'content-type', isHtml)
            and firstHeaderMatches(part, 'content-transfer-encoding', isQuotedPrintable)
        ]

        if len(htmlParts) != 1:
            raise NotImplementedError('Not implemented')

        return self._mapQuotedrintable(htmlParts[0].body)

    def _getContentType(self, rawMail: 'cnr.mail.Mail') -> tuple[str, str]:
        """Return the content type of the mail and its multipart boundary.

        Raises:
            SystemExit: With code 1 if there is not exactly one content type
                header or the boundary cannot be extracted from it.
            NotImplementedError: If the mail is not ``multipart/alternative``.
        """
        ctHeaders = [value for name, value in rawMail.headers if name.lower() == 'content-type']

        if len(ctHeaders) != 1:
            self._l.error('No unique content type of the mail was found.')
            raise SystemExit(1)

        ct = ctHeaders[0]
        if not ct.lower().startswith('multipart/alternative'):
            raise NotImplementedError('Not yet implemented')

        matcher = self._reBoundary.search(ct)
        if matcher is None:
            self._l.error('Cannot extract boundary from mail header.')
            raise SystemExit(1)

        boundary = matcher.group(1) or matcher.group(2)
        return 'multipart/alternative', boundary

    def _splitMultipartBody(self, bodyLines: list[str], boundary: str):
        """Split ``bodyLines`` at the boundary delimiters and parse every part.

        Lines before the first delimiter and after the closing delimiter are
        ignored. A last part that is not terminated by a closing delimiter is
        still returned. Empty parts are skipped.
        """
        delimiter = f'--{boundary}'
        closingDelimiter = f'{delimiter}--'

        parts = []
        subBody = None  # None while outside of any part
        for line in bodyLines:
            stripped = line.rstrip()
            if stripped == delimiter or stripped == closingDelimiter:
                if subBody:
                    parts.append(subBody)
                subBody = [] if stripped == delimiter else None
            elif subBody is not None:
                subBody.append(line)

        if subBody:
            parts.append(subBody)

        return [cnr.headerExtractor.splitHeaders(part) for part in parts]

    def _mapQuotedrintable(self, lines: list[str]):
        """Decode quoted-printable ``lines`` into a single UTF-8 decoded string.

        Lines ending in ``=`` (soft line breaks) are merged with their
        successor before ``=XX`` escapes are resolved.

        NOTE(review): the decoded lines are concatenated without a separator,
        i.e. hard line breaks of the original text are dropped.

        Raises:
            ValueError: If an ``=`` is not followed by hexadecimal digits.
            UnicodeDecodeError: If a decoded line is not valid UTF-8.
        """
        mergedLines = []
        pending = ''
        for line in lines:
            if line.endswith('='):
                pending += line[:-1]
            else:
                mergedLines.append(pending + line)
                pending = ''
        if pending:
            # The input ended with a soft line break; keep what was collected.
            mergedLines.append(pending)

        def decodeLine(line: str) -> str:
            raw = bytearray()
            i = 0
            while i < len(line):
                if line[i] == '=':
                    raw.append(int(line[i + 1:i + 3], 16))
                    i += 3
                else:
                    raw.extend(line[i].encode())
                    i += 1
            return raw.decode()

        return ''.join(decodeLine(line) for line in mergedLines)

View File

@ -0,0 +1,49 @@
import logging
import re
import io
import competitionNotificationReader as cnr
class MBocReader:
    """Read an mbox file and split it into the individual mails."""

    # Body lines starting with "From " are stored with an additional leading
    # ">" in the mbox file; compiled once instead of once per line.
    _reQuotedFrom = re.compile('^>+From ')

    def __init__(self):
        self._l = logging.getLogger(__name__)

    def parseMBoxFile(self, filename: str) -> 'list[cnr.mail.Mail]':
        """Read the mbox file ``filename`` and return all mails it contains."""
        self._l.debug('Reading MBox file "%s"', filename)
        with open(filename, encoding='utf-8') as fp:
            return self._parseMails(fp)

    def _isNewMailLine(self, line: str):
        """Tell whether ``line`` is the separator line that starts a new mail."""
        return line.startswith('From ')

    def _fixSingleLine(self, line: str) -> str:
        """Remove one level of ``>`` quoting from an escaped ``From `` line."""
        if self._reQuotedFrom.match(line) is None:
            return line
        return line[1:]

    def _splitIntoMails(self, fp: 'io.TextIOBase') -> list[list[str]]:
        """Return the lines of every mail in ``fp`` without separator lines.

        Line terminators are removed and escaped ``From `` lines are restored.
        Mails without any line are skipped.
        """
        rawMails = []
        lines = []
        for line in fp:
            if self._isNewMailLine(line):
                if lines:
                    rawMails.append(lines)
                lines = []
            else:
                # Strip the newline only if present, so a final line without
                # one does not lose its last character.
                lines.append(self._fixSingleLine(line.removesuffix('\n')))

        if lines:
            rawMails.append(lines)
        return rawMails

    def _parseMails(self, fp: 'io.TextIOBase') -> 'list[cnr.mail.Mail]':
        """Parse all mails contained in the open text stream ``fp``."""
        return [self._parseSingleMail(lines) for lines in self._splitIntoMails(fp)]

    def _parseSingleMail(self, lines: list[str]) -> 'cnr.mail.Mail':
        """Split the lines of a single mail into headers and body."""
        return cnr.headerExtractor.splitHeaders(lines)

View File

@ -0,0 +1,5 @@
beautifulsoup4==4.12.2
debugpy==1.8.0
Jinja2==3.1.3
MarkupSafe==2.1.3
soupsieve==2.5