Merge branch 'fix/split-modules' into tmp/live-changes

Christian Wolf 2023-11-20 11:40:04 +01:00
commit 7382df03a8
8 changed files with 731 additions and 717 deletions

View File

@@ -13,3 +13,5 @@ from . import output
from . import batch
from . import flask
from . import workers

View File

@@ -25,7 +25,7 @@ class BatchWorker:
"Using HTML result files for result extraction: %s", htmlResultFiles
)
-worker = solo_turnier.worker.Worker()
+worker = solo_turnier.workers.Worker.Worker()
importedData = worker.collectAllData(htmlResultFiles)
combinedData = worker.combineData(importedData)

View File

@@ -16,7 +16,7 @@ sectionMap = {
class AbstractOutputter:
def __init__(self):
-self.worker = solo_turnier.worker.DataWorker()
+self.worker = solo_turnier.workers.DataWorker.DataWorker()
self.groups = []
self.dances = []
self.showIds = False

View File

@@ -1,14 +1,15 @@
import logging
from pprint import pformat
import re
import solo_turnier
from solo_turnier import html_parser
from .reader import ResultRow
from .types import HtmlCompetitionResultRow as CompetitionResult
from . import types
from . import competition_class
# import logging
# from pprint import pformat
# import re
# import solo_turnier
# from .types import HtmlCompetitionResultRow as CompetitionResult
# from . import types
# from . import competition_class
class HtmlPerson:
@@ -64,708 +65,3 @@ class ResultPerson:
def __hash__(self):
text = str(self)
return text.__hash__()
ParserList_t = dict[str, html_parser.HtmlParser]
class ResultExtractor:
def __init__(self):
self.l = logging.getLogger("solo_turnier.worker.ResultExtractor")
self.rePlaceSingle = re.compile(" *([0-9]+) *")
self.rePlaceDouble = re.compile(" *([0-9]+) *- *([0-9]+) *")
def getAllParsers(self, files: list[tuple[str, str]]) -> ParserList_t:
ret = {}
classParser = competition_class.CompetitionClassParser()
for filePair in files:
with open(filePair[0], "r") as fp:
text = fp.read()
parser = html_parser.HtmlParser(text, filePair[0])
if filePair[1] is None:
parserTab = None
else:
with open(filePair[1], "r") as fp:
textTab = fp.read()
parserTab = html_parser.HtmlParser(textTab, filePair[1])
try:
data = parser.guessDataFromHtmlTitle()
except:
self.l.error(
"Cannot parse HTML file %s to check if it is a valid result. Check manually.",
filePair[0],
)
continue
try:
guessedClass = classParser.parseClass(data["class_"])
except:
self.l.error(
"Issue parsing class of file %s. Check manually.", filePair[0]
)
continue
self.l.debug(
"Fetched result data: %s, guessed class %s", data, guessedClass
)
ret[filePair] = (parser, parserTab)
return ret
def _extractPlace(self, placeStr: str):
s = placeStr.replace(".", "")
matches = self.rePlaceSingle.fullmatch(s)
if matches is not None:
return (int(matches.group(1)), None)
matches = self.rePlaceDouble.fullmatch(s)
if matches is not None:
return (int(matches.group(1)), int(matches.group(2)))
self.l.error('Could not parse place string "%s"', placeStr)
raise Exception("Place cannot be parsed")
def _analyzeSingleParser(
self, parser: html_parser.HtmlParser, results: types.HtmlCompetitionTotalResults
):
data = parser.guessDataFromHtmlTitle()
competitionClass = data["class_"]
competitionGroup = data["group"]
dance = data["dance"]
result = parser.parseResult()
self.l.log(5, "Raw data extracted: %s", result)
for person in result.results.keys():
placeStr = result.results[person]
place, placeTo = self._extractPlace(placeStr)
competitionResult = types.HtmlSingleCompetitionResult(
person.name, place, placeTo, person.finalist
)
results.add(
competitionGroup, competitionClass, dance, person.id, competitionResult
)
#
def _analyzeIndividualResults(
self, parser: html_parser.HtmlParser, results: types.HtmlCompetitionTotalResults
):
data = parser.guessDataFromHtmlTitle()
competitionClass = data["class_"]
competitionGroup = data["group"]
dance = data["dance"]
result = parser.parseIndividualResult(competitionGroup, competitionClass, dance)
self.l.log(5, "Found individual results: %s", result.participants)
results.tabges.update(result.participants)
def extractAllData(
self, parsers: ParserList_t
) -> types.HtmlCompetitionTotalResults:
ret = types.HtmlCompetitionTotalResults()
for fileNameTuple in parsers:
fileName = fileNameTuple[0]
self.l.debug("Extracting data from file %s", fileName)
self._analyzeSingleParser(parsers[fileNameTuple][0], ret)
if parsers[fileNameTuple][1] is None:
self.l.info(
"Skipping extraction of individual result as class is not yet finished."
)
else:
self.l.debug(
"Fetching individual result of combined competitions in %s",
fileName,
)
self._analyzeIndividualResults(parsers[fileNameTuple][1], ret)
return ret
class DataWorker:
def __init__(self):
self.l = logging.getLogger("solo_turnier.worker.DataWorker")
def combineRowsByPerson(
self, rows: list[ResultRow]
) -> dict[ResultPerson, list[CompetitionResult]]:
ret = {}
for row in rows:
result = CompetitionResult.extractFromResultRow(row)
if result.place == "-" or result.placeTo == "-":
continue
person = ResultPerson.extractFromResultRow(row)
if person not in ret:
ret[person] = []
ret[person].append(result)
return ret
def checkUniqueIds(self, data: dict[ResultPerson, list[CompetitionResult]]) -> bool:
unique = True
for person in data:
ids = set([c.id for c in data[person]])
if len(ids) == 1:
person.id = list(ids)[0]
else:
unique = False
return unique
"""
Return a tuple.
The first entry is True if all persons could be unambiguously assigned to a group.
The second entry is True if a group had to be overridden but could be derived from the other data.
The second entry can therefore be treated as a warning.
"""
def consolidateGroups(
self, data: dict[ResultPerson, list[CompetitionResult]]
) -> tuple[bool, bool]:
ambiguous = False
warnChange = False
unambiguousGroups = set(["Kin.", "Jun.", "Jug."])
combinations = set(["Kin./Jun.", "Jun./Jug."])
for person in data:
groupsRaw = set([c.group for c in data[person]])
unknown = groupsRaw.difference(unambiguousGroups).difference(combinations)
if len(unknown) > 0:
raise Exception(
f"There were unknown groups found for {person}: {unknown}"
)
numUnambiguousGroups = len(groupsRaw.intersection(unambiguousGroups))
if numUnambiguousGroups == 0:
if len(groupsRaw) == 2:
warnChange = True
person.group = "Jun."
else:
ambiguous = True
if len(groupsRaw) == 1:
person.group = list(groupsRaw)[0]
elif numUnambiguousGroups == 1:
if len(groupsRaw.intersection(combinations)) > 0:
warnChange = True
person.group = list(groupsRaw.intersection(unambiguousGroups))[0]
else:
raise Exception(f"{person} cannot have different groups.")
return (not ambiguous, warnChange)
def _createHtmlLUT(self, htmlImports: list[html_parser.HtmlImport]):
ret = {}
parser = html_parser.HtmlParser("")
for imp in htmlImports:
parsed = parser.guessDataFromHtmlTitle(imp.title)
key = (parsed["group"], parsed["class_"], parsed["dance"])
ret[key] = imp
self.l.debug("LUT[%s] = %s", key, imp)
self.l.debug("LUT completed")
return ret
def mergeHtmlData(
self,
data: dict[ResultPerson, list[CompetitionResult]],
htmlImports: list[html_parser.HtmlImport],
):
lut = self._createHtmlLUT(htmlImports)
for person in data:
for competition in data[person]:
key = (
competition.competitionGroup,
competition.competitionClass,
competition.dance,
)
htmlImport = lut[key]
participant = htmlImport.participants[str(competition.id)]
if participant.name != person.name:
self.l.error(
f"Names for {person} and participant in HTML import ({participant}) do not match. Please check carefully."
)
competition.finalist = participant.finalist
def getAllDancesInCompetitions(
self, data: dict[ResultPerson, list[CompetitionResult]]
) -> list[str]:
allDances = [
"Samba",
"Cha Cha",
"Rumba",
"Paso Doble",
"Jive",
"Langs. Walzer",
"Tango",
"Wiener Walzer",
"Slowfox",
"Quickstep",
]
dancesPresent = {d: False for d in allDances}
for person in data:
for competition in data[person]:
dancesPresent[competition.dance] = True
return [d for d in allDances if dancesPresent[d]]
def collectPersonsInGroups(
self, data: dict[ResultPerson, list[CompetitionResult]]
) -> dict[str, list[ResultPerson]]:
groups = {
"Kin.": [p for p in data.keys() if p.group == "Kin."],
"Jun.": [p for p in data.keys() if p.group == "Jun."],
"Jug.": [p for p in data.keys() if p.group == "Jug."],
}
found = groups["Kin."] + groups["Jun."] + groups["Jug."]
groups["Sonst"] = [p for p in data.keys() if p not in found]
return groups
def sortPersonsInGroup(self, persons: list[ResultPerson]) -> tuple[list[ResultPerson], bool]:
ids = [p.id for p in persons]
def decorateByName(p: ResultPerson):
return (f"{p.name} ({p.club})", p)
def decorateById(p: ResultPerson):
return (p.id, p)
if any([id == None for id in ids]):
# We need to sort by name
decorated = [decorateByName(p) for p in persons]
showIds = False
else:
decorated = [decorateById(p) for p in persons]
showIds = True
decorated.sort()
return ([d[1] for d in decorated], showIds)
def mapPersonResultsToDanceList(
self, results: list[CompetitionResult], dances: list[str]
) -> list[CompetitionResult | None]:
ret = []
for dance in dances:
competitions = [c for c in results if c.dance == dance]
if len(competitions) == 0:
ret.append(None)
elif len(competitions) > 1:
raise Exception(
f'Multiple competitions with the same dance "{dance}" found.'
)
else:
ret.append(competitions[0])
return ret
class Worker:
def __init__(self):
self.l = logging.getLogger("solo_turnier.worker.Worker")
self._allDances = ["Samba", "Cha Cha", "Rumba", "Paso Doble", "Jive"] + [
"Langs. Walzer",
"Tango",
"Wiener Walzer",
"Slowfox",
"Quickstep",
]
self._groupParser = solo_turnier.group.GroupParser()
def collectAllData(self, htmlResultsFileNames: list[str]) -> types.State3:
resultExtractor = ResultExtractor()
resultParsers = resultExtractor.getAllParsers(htmlResultsFileNames)
htmlResults = resultExtractor.extractAllData(resultParsers)
self.l.debug("Overall result data extracted: %s", pformat(htmlResults.results))
return types.State3(htmlResults)
def combineData(self, importedData: types.State3):
self.l.info("Starting to build data sets.")
self.l.debug("Getting per participant groups")
groupMapping = self._getGroupMapping(importedData)
self.l.log(5, "ID-to-group mapping of the parsed data: %s", str(groupMapping))
# groups = self._extractGroups(importedData)
groups = self._extractGroupsFromGroupMapping(groupMapping)
self.l.debug("Found groups in the dataset: %s", groups)
invertedGroupMapping = self._invertGroupMapping(groupMapping, groups)
self.l.log(5, "Inverted group maping: %s", invertedGroupMapping)
totalResult = {}
for group in groups:
self.l.debug("Collecting data for total result of group %s", group)
dances = self._extractDancesPerGroup(importedData, group)
self.l.log(5, "Found dances in group %s: %s", group, dances)
participants = self._extractParticipantsPerGroup(importedData, group)
self.l.log(5, "Related participants %s", participants)
results = {}
for participant in participants:
self.l.log(5, "Collecting data for %s", participant)
resultsOfParticipant = self._getResultOfSingleParticipant(
participant,
group,
importedData.htmlResults,
dances,
)
self.l.log(5, "Obtained result %s", resultsOfParticipant)
results[participant] = resultsOfParticipant
self.l.log(5, "Result before native fixing: %s", pformat(results))
# self._fixNativePlaces(dances, results)
self._fixNativeDataFromTable(dances, results, importedData.htmlResults)
self.l.log(5, "Result after native fixing: %s", pformat(results))
# self.l.log(5,'Fixed data %s', results)
totalResult[group] = types.TotalGroupResult(dances, results)
self.l.log(5, "Total result of all groups: %s", pformat(totalResult))
ret = types.State4(totalResult)
return ret
def _extractGroups(self, data: types.State3):
groupParser = solo_turnier.group.GroupParser()
groupSet = set([])
# for id in data.previewImport.participants:
# participants = data.previewImport.participants[id]
# for participant in participants:
# groupSet.add(participant.group)
for tup in data.htmlResults.results.keys():
gr = groupParser.parseClass(tup[0])
# groupSet.add(gr)
groupSet.update(gr.getContainedGroups())
# self.l.log(5, 'Group type %s', type(gr))
self.l.log(5, "Set of active groups: %s", groupSet)
groups = groupParser.getGroupsAsSortedList(groupSet)
return groups
def _getGroupMapping(
self, importedData: types.State3
) -> dict[int, solo_turnier.group.Group | None]:
groupParser = solo_turnier.group.GroupParser()
def _getBestGroupGuess(groups, id):
counts = {}
grNones = 0
for gr in set(groups):
length = len(list(filter(lambda x: x == gr, groups)))
if isinstance(gr, tuple) or gr is None:
grNones = grNones + length
else:
counts[gr] = length
counts[None] = grNones
candidates = list(counts.keys())
def ccomp(i1):
return counts[i1]
candidates.sort(key=ccomp, reverse=True)
if len(candidates) == 1:
self.l.warning("Unrequired group guessing started.")
return candidates[0]
if len(candidates) == 0:
self.l.error("Problem during the group guessing triggered.")
return None
if counts[candidates[0]] > counts[candidates[1]]:
if candidates[0] is None:
self.l.error(
"Majority of guessed groups is ambigous. Guessing failed for id %d. Falling back to second best guess.",
id,
)
return candidates[1]
self.l.info("Using best fit %s for guessed group.", candidates[0])
return candidates[0]
self.l.warning("Group guessing failed.")
return None
groupsPerId = {}
for tup in importedData.htmlResults.results:
competitionGroup = groupParser.parseClass(tup[0])
fixture = importedData.htmlResults.tabges.get(tup, (None, None, None))
id = int(tup[3])
if fixture[2] is not None:
group = groupParser.parseClass(fixture[2])
else:
containedGroups = competitionGroup.getContainedGroups()
if len(containedGroups) > 1:
self.l.error(
"The group for participant %d is ambiguous in (%s %s %s).",
id,
tup[0],
tup[1],
tup[2],
)
group = containedGroups
else:
group = competitionGroup
knownGroups = groupsPerId.get(id, [])
if group is not None:
knownGroups.append(group)
groupsPerId[id] = knownGroups
ret = {}
for id in groupsPerId.keys():
groupCandidates = groupsPerId[id]
groupSet = set(groupCandidates)
if len(groupSet) == 1:
ret[id] = groupSet.pop()
elif len(groupSet) > 1:
self.l.warning(
"Multiple groups for id %d found: %s", id, groupsPerId[id]
)
ret[id] = _getBestGroupGuess(groupCandidates, id)
else:
self.l.warning("No group for id %d could be found.", id)
ret[id] = None
return ret
def _extractGroupsFromGroupMapping(self, mapping):
foundGroups = set()
for id in mapping:
foundGroups.add(mapping[id])
sortedGroup = self._groupParser.getGroupsAsSortedList(foundGroups)
missingGroups = foundGroups.difference(sortedGroup)
sortedGroup = sortedGroup + list(missingGroups)
return sortedGroup
def _invertGroupMapping(self, mapping, groups):
ret = {}
for group in groups:
ret[group] = []
for id in mapping:
ret[mapping[id]].append(id)
return ret
def _extractDancesPerGroup(
self, data: types.State3, group: solo_turnier.group.Group
):
groupParser = solo_turnier.group.GroupParser()
dances = set()
additionalDances = set()
foundDances = set()
for tup in data.htmlResults.results.keys():
currentGroup = groupParser.parseClass(tup[0])
if group not in currentGroup.getContainedGroups():
continue
foundDances.add(tup[2])
dances.update(foundDances.intersection(self._allDances))
additionalDances.update(foundDances.difference(self._allDances))
if len(additionalDances) > 0:
self.l.error(
"There were dances found, that are not registered. A bug? The dances were: %s",
additionalDances,
)
dancesList = [x for x in self._allDances if x in dances]
additionalDancesList = list(additionalDances)
additionalDancesList.sort()
return dancesList + additionalDancesList
def _extractParticipantsPerGroup(
self,
importedData: types.State3,
# previewData: types.HtmlPreviewImport,
group: solo_turnier.group.Group,
) -> list[types.HtmlPreviewParticipant]:
groupParser = types.group.GroupParser()
ret = []
# self.l.log(5, 'Table %s', pformat(importedData.htmlResults.tabges))
# self.l.log(5, 'Results %s', pformat(importedData.htmlResults.results))
for tup in importedData.htmlResults.results.keys():
currentGroup = groupParser.parseClass(tup[0])
activeGroups = currentGroup.getContainedGroups()
if group not in activeGroups:
continue
fixture = importedData.htmlResults.tabges.get(tup, None)
if fixture is None:
self.l.error("A fixture for the tuple %s could not be read.", tup)
else:
if (
fixture[2] is not None
and groupParser.parseClass(fixture[2]) != group
):
self.l.log(
5,
"Skipping id %s in group %s as in other group.",
tup[3],
group,
)
continue
part = importedData.htmlResults.results[tup][0]
part.id = int(tup[3])
ret.append(part)
self.l.log(5, "ret %s", ret)
# raise Exception('Test')
# for id in previewData.participants:
# participantList = previewData.participants[id]
# for participant in participantList:
# if participant.group == group:
# ret.append(participant)
return ret
def _getResultOfSingleParticipant(
self,
participant: types.HtmlParticipant,
nominalGroup: solo_turnier.group.Group,
totalResults: types.HtmlCompetitionTotalResults,
allDances: list[str],
) -> list[types.SingleParticipantResult | None]:
rawResults = totalResults.getById(participant.id)
self.l.log(
5, "Found result data for id %i (raw): %s", participant.id, rawResults
)
results = [None for x in allDances]
for danceIdx, dance in enumerate(allDances):
# self.l.log(5, '%s %s', dance, danceIdx)
def getResult() -> types.SingleParticipantResult | None:
for key in rawResults:
if key[0] != dance:
continue
rawResult = rawResults[key]
if len(rawResult) != 1:
raise Exception("Multiple results found with same key")
rawResult = rawResult[0]
nativeClass = key[2]
# nativeClass = previewResults.results[participant][dance]
# nativeClass = key[2]
# self.l.log(5, 'Result %s => %s', key, rawResult)
ret = types.SingleParticipantResult(
key[2],
nativeClass,
dance,
rawResult.finalist,
rawResult.place,
rawResult.placeTo,
)
return ret
return None
results[danceIdx] = getResult()
return results
def _fixNativeDataFromTable(
self,
dances: list[str],
data: dict[types.HtmlPreviewParticipant, list[types.SingleParticipantResult]],
importedData: types.HtmlCompetitionTotalResults,
):
rePlace = re.compile("([0-9]+)(?:-([0-9]+))?")
classParser = competition_class.CompetitionClassParser()
for participant in data.keys():
self.l.log(5, "fixing participant %s", participant)
results = data[participant]
for result in results:
if result is None:
continue
self.l.log(5, "Looking at result set %s", result)
def selectEntry(k):
return k[2] == result.dance and int(k[3]) == participant.id
keys = list(importedData.tabges.keys())
selected = list(map(selectEntry, keys))
try:
selectedIndex = selected.index(True)
except:
continue
raw = importedData.tabges[keys[selectedIndex]]
self.l.log(5, "Raw %s", raw)
nativePlaceRaw = raw[0]
matcher = rePlace.fullmatch(nativePlaceRaw)
if matcher is None:
self.l.error(
"Cannot parse place string %s for participant %u (%s) in dance %s",
nativePlaceRaw,
participant.id,
participant,
result.dance,
)
continue
self.l.log(5, "Found strings by regex: %s", matcher.groups())
result.placeNative = matcher.group(1)
result.placeNativeTo = matcher.group(2)
if raw[1] is not None:
result.nativeClass = classParser.parseAbbreviatedClass(raw[1])
pass
def filterOutFinalists(self, data: types.State4, filterOut: bool):
for group in data.results:
self.l.debug("Cleaning up group %s", group.name)
participants = data.results[group].results.keys()
droppedParticipants = []
for participant in participants:
self.l.debug("Checking %s", participant)
def isFinalistInDance(x: types.HtmlSingleCompetitionResult | None):
if x is None:
return False
return x.finalist
mapped = list(
map(isFinalistInDance, data.results[group].results[participant])
)
finalist = True in mapped
self.l.log(5, "Check for finalist (in dances %s): %s", mapped, finalist)
if finalist:
participant.finalist = True
else:
participant.finalist = False
self.l.info(
"Dropping %s from the output as no finalist", participant
)
droppedParticipants.append(participant)
if filterOut:
for droppedParticipant in droppedParticipants:
data.results[group].results.pop(droppedParticipant)

View File

@@ -0,0 +1,190 @@
from ..reader import ResultRow
from ..worker import ResultPerson
from ..types import HtmlCompetitionResultRow as CompetitionResult
from solo_turnier import html_parser
import logging
class DataWorker:
def __init__(self):
self.l = logging.getLogger("solo_turnier.worker.DataWorker")
def combineRowsByPerson(
self, rows: list[ResultRow]
) -> dict[ResultPerson, list[CompetitionResult]]:
ret = {}
for row in rows:
result = CompetitionResult.extractFromResultRow(row)
if result.place == "-" or result.placeTo == "-":
continue
person = ResultPerson.extractFromResultRow(row)
if person not in ret:
ret[person] = []
ret[person].append(result)
return ret
def checkUniqueIds(self, data: dict[ResultPerson, list[CompetitionResult]]) -> bool:
unique = True
for person in data:
ids = set([c.id for c in data[person]])
if len(ids) == 1:
person.id = list(ids)[0]
else:
unique = False
return unique
"""
Return a tuple.
The first entry is True if all persons could be unambiguously assigned to a group.
The second entry is True if a group had to be overridden but could be derived from the other data.
The second entry can therefore be treated as a warning.
"""
def consolidateGroups(
self, data: dict[ResultPerson, list[CompetitionResult]]
) -> tuple[bool, bool]:
ambiguous = False
warnChange = False
unambiguousGroups = set(["Kin.", "Jun.", "Jug."])
combinations = set(["Kin./Jun.", "Jun./Jug."])
for person in data:
groupsRaw = set([c.group for c in data[person]])
unknown = groupsRaw.difference(unambiguousGroups).difference(combinations)
if len(unknown) > 0:
raise Exception(
f"There were unknown groups found for {person}: {unknown}"
)
numUnambiguousGroups = len(groupsRaw.intersection(unambiguousGroups))
if numUnambiguousGroups == 0:
if len(groupsRaw) == 2:
warnChange = True
person.group = "Jun."
else:
ambiguous = True
if len(groupsRaw) == 1:
person.group = list(groupsRaw)[0]
elif numUnambiguousGroups == 1:
if len(groupsRaw.intersection(combinations)) > 0:
warnChange = True
person.group = list(groupsRaw.intersection(unambiguousGroups))[0]
else:
raise Exception(f"{person} cannot have different groups.")
return (not ambiguous, warnChange)
def _createHtmlLUT(self, htmlImports: list[html_parser.HtmlImport]):
ret = {}
parser = html_parser.HtmlParser("")
for imp in htmlImports:
parsed = parser.guessDataFromHtmlTitle(imp.title)
key = (parsed["group"], parsed["class_"], parsed["dance"])
ret[key] = imp
self.l.debug("LUT[%s] = %s", key, imp)
self.l.debug("LUT completed")
return ret
def mergeHtmlData(
self,
data: dict[ResultPerson, list[CompetitionResult]],
htmlImports: list[html_parser.HtmlImport],
):
lut = self._createHtmlLUT(htmlImports)
for person in data:
for competition in data[person]:
key = (
competition.competitionGroup,
competition.competitionClass,
competition.dance,
)
htmlImport = lut[key]
participant = htmlImport.participants[str(competition.id)]
if participant.name != person.name:
self.l.error(
f"Names for {person} and participant in HTML import ({participant}) do not match. Please check carefully."
)
competition.finalist = participant.finalist
def getAllDancesInCompetitions(
self, data: dict[ResultPerson, list[CompetitionResult]]
) -> list[str]:
allDances = [
"Samba",
"Cha Cha",
"Rumba",
"Paso Doble",
"Jive",
"Langs. Walzer",
"Tango",
"Wiener Walzer",
"Slowfox",
"Quickstep",
]
dancesPresent = {d: False for d in allDances}
for person in data:
for competition in data[person]:
dancesPresent[competition.dance] = True
return [d for d in allDances if dancesPresent[d]]
def collectPersonsInGroups(
self, data: dict[ResultPerson, list[CompetitionResult]]
) -> dict[str, list[ResultPerson]]:
groups = {
"Kin.": [p for p in data.keys() if p.group == "Kin."],
"Jun.": [p for p in data.keys() if p.group == "Jun."],
"Jug.": [p for p in data.keys() if p.group == "Jug."],
}
found = groups["Kin."] + groups["Jun."] + groups["Jug."]
groups["Sonst"] = [p for p in data.keys() if p not in found]
return groups
def sortPersonsInGroup(self, persons: list[ResultPerson]) -> tuple[list[ResultPerson], bool]:
ids = [p.id for p in persons]
def decorateByName(p: ResultPerson):
return (f"{p.name} ({p.club})", p)
def decorateById(p: ResultPerson):
return (p.id, p)
if any([id == None for id in ids]):
# We need to sort by name
decorated = [decorateByName(p) for p in persons]
showIds = False
else:
decorated = [decorateById(p) for p in persons]
showIds = True
decorated.sort()
return ([d[1] for d in decorated], showIds)
def mapPersonResultsToDanceList(
self, results: list[CompetitionResult], dances: list[str]
) -> list[CompetitionResult | None]:
ret = []
for dance in dances:
competitions = [c for c in results if c.dance == dance]
if len(competitions) == 0:
ret.append(None)
elif len(competitions) > 1:
raise Exception(
f'Multiple competitions with the same dance "{dance}" found.'
)
else:
ret.append(competitions[0])
return ret

View File

@@ -0,0 +1,125 @@
from solo_turnier import html_parser
from .. import types
import logging
import re
from .. import competition_class
ParserList_t = dict[str, html_parser.HtmlParser]
class ResultExtractor:
def __init__(self):
self.l = logging.getLogger("solo_turnier.worker.ResultExtractor")
self.rePlaceSingle = re.compile(" *([0-9]+) *")
self.rePlaceDouble = re.compile(" *([0-9]+) *- *([0-9]+) *")
def getAllParsers(self, files: list[tuple[str, str]]) -> ParserList_t:
ret = {}
classParser = competition_class.CompetitionClassParser()
for filePair in files:
with open(filePair[0], "r") as fp:
text = fp.read()
parser = html_parser.HtmlParser(text, filePair[0])
if filePair[1] is None:
parserTab = None
else:
with open(filePair[1], "r") as fp:
textTab = fp.read()
parserTab = html_parser.HtmlParser(textTab, filePair[1])
try:
data = parser.guessDataFromHtmlTitle()
except:
self.l.error(
"Cannot parse HTML file %s to check if it is a valid result. Check manually.",
filePair[0],
)
continue
try:
guessedClass = classParser.parseClass(data["class_"])
except:
self.l.error(
"Issue parsing class of file %s. Check manually.", filePair[0]
)
continue
self.l.debug(
"Fetched result data: %s, guessed class %s", data, guessedClass
)
ret[filePair] = (parser, parserTab)
return ret
def _extractPlace(self, placeStr: str):
s = placeStr.replace(".", "")
matches = self.rePlaceSingle.fullmatch(s)
if matches is not None:
return (int(matches.group(1)), None)
matches = self.rePlaceDouble.fullmatch(s)
if matches is not None:
return (int(matches.group(1)), int(matches.group(2)))
self.l.error('Could not parse place string "%s"', placeStr)
raise Exception("Place cannot be parsed")
def _analyzeSingleParser(
self, parser: html_parser.HtmlParser, results: types.HtmlCompetitionTotalResults
):
data = parser.guessDataFromHtmlTitle()
competitionClass = data["class_"]
competitionGroup = data["group"]
dance = data["dance"]
result = parser.parseResult()
self.l.log(5, "Raw data extracted: %s", result)
for person in result.results.keys():
placeStr = result.results[person]
place, placeTo = self._extractPlace(placeStr)
competitionResult = types.HtmlSingleCompetitionResult(
person.name, place, placeTo, person.finalist
)
results.add(
competitionGroup, competitionClass, dance, person.id, competitionResult
)
#
def _analyzeIndividualResults(
self, parser: html_parser.HtmlParser, results: types.HtmlCompetitionTotalResults
):
data = parser.guessDataFromHtmlTitle()
competitionClass = data["class_"]
competitionGroup = data["group"]
dance = data["dance"]
result = parser.parseIndividualResult(competitionGroup, competitionClass, dance)
self.l.log(5, "Found individual results: %s", result.participants)
results.tabges.update(result.participants)
def extractAllData(
self, parsers: ParserList_t
) -> types.HtmlCompetitionTotalResults:
ret = types.HtmlCompetitionTotalResults()
for fileNameTuple in parsers:
fileName = fileNameTuple[0]
self.l.debug("Extracting data from file %s", fileName)
self._analyzeSingleParser(parsers[fileNameTuple][0], ret)
if parsers[fileNameTuple][1] is None:
self.l.info(
"Skipping extraction of individual result as class is not yet finished."
)
else:
self.l.debug(
"Fetching individual result of combined competitions in %s",
fileName,
)
self._analyzeIndividualResults(parsers[fileNameTuple][1], ret)
return ret

View File

@@ -0,0 +1,398 @@
import logging
import solo_turnier
from .. import types
from .ResultExtractor import ResultExtractor
from pprint import pformat
import re
from .. import competition_class
class Worker:
def __init__(self):
self.l = logging.getLogger("solo_turnier.worker.Worker")
self._allDances = ["Samba", "Cha Cha", "Rumba", "Paso Doble", "Jive"] + [
"Langs. Walzer",
"Tango",
"Wiener Walzer",
"Slowfox",
"Quickstep",
]
self._groupParser = solo_turnier.group.GroupParser()
def collectAllData(self, htmlResultsFileNames: list[str]) -> types.State3:
resultExtractor = ResultExtractor()
resultParsers = resultExtractor.getAllParsers(htmlResultsFileNames)
htmlResults = resultExtractor.extractAllData(resultParsers)
self.l.debug("Overall result data extracted: %s", pformat(htmlResults.results))
return types.State3(htmlResults)
def combineData(self, importedData: types.State3):
self.l.info("Starting to build data sets.")
self.l.debug("Getting per participant groups")
groupMapping = self._getGroupMapping(importedData)
self.l.log(5, "ID-to-group mapping of the parsed data: %s", str(groupMapping))
# groups = self._extractGroups(importedData)
groups = self._extractGroupsFromGroupMapping(groupMapping)
self.l.debug("Found groups in the dataset: %s", groups)
invertedGroupMapping = self._invertGroupMapping(groupMapping, groups)
self.l.log(5, "Inverted group maping: %s", invertedGroupMapping)
totalResult = {}
ret = types.State4(totalResult)
for group in groups:
self.l.debug("Collecting data for total result of group %s", group)
dances = self._extractDancesPerGroup(importedData, group)
self.l.log(5, "Found dances in group %s: %s", group, dances)
participants = self._extractParticipantsPerGroup(importedData, group)
self.l.log(5, "Related participants %s", participants)
results = {}
for participant in participants:
self.l.log(5, "Collecting data for %s", participant)
resultsOfParticipant = self._getResultOfSingleParticipant(
participant,
group,
importedData.htmlResults,
dances,
)
self.l.log(5, "Obtained result %s", resultsOfParticipant)
results[participant] = resultsOfParticipant
self.l.log(5, "Result before native fixing: %s", pformat(results))
# self._fixNativePlaces(dances, results)
self._fixNativeDataFromTable(dances, results, importedData.htmlResults)
self.l.log(5, "Result after native fixing: %s", pformat(results))
# self.l.log(5,'Fixed data %s', results)
totalResult[group] = types.TotalGroupResult(dances, results)
self.l.log(5, "Total result of all groups: %s", pformat(totalResult))
return ret
def _extractGroups(self, data: types.State3):
groupSet = set([])
# for id in data.previewImport.participants:
# participants = data.previewImport.participants[id]
# for participant in participants:
# groupSet.add(participant.group)
for tup in data.htmlResults.results.keys():
gr = self._groupParser.parseClass(tup[0])
# groupSet.add(gr)
groupSet.update(gr.getContainedGroups())
# self.l.log(5, 'Group type %s', type(gr))
self.l.log(5, "Set of active groups: %s", groupSet)
groups = self._groupParser.getGroupsAsSortedList(groupSet)
return groups
def _getGroupMapping(
self, importedData: types.State3
) -> dict[int, solo_turnier.group.Group | None]:
def _getBestGroupGuess(groups, id):
counts = {}
grNones = 0
for gr in set(groups):
length = len(list(filter(lambda x: x == gr, groups)))
if isinstance(gr, tuple) or gr is None:
grNones = grNones + length
else:
counts[gr] = length
counts[None] = grNones
candidates = list(counts.keys())
def ccomp(i1):
return counts[i1]
candidates.sort(key=ccomp, reverse=True)
if len(candidates) == 1:
self.l.warning("Unrequired group guessing started.")
return candidates[0]
if len(candidates) == 0:
self.l.error("Problem during the group guessing triggered.")
return None
if counts[candidates[0]] > counts[candidates[1]]:
if candidates[0] is None:
self.l.error(
"Majority of guessed groups is ambigous. Guessing failed for id %d. Falling back to second best guess.",
id,
)
return candidates[1]
self.l.info("Using best fit %s for guessed group.", candidates[0])
return candidates[0]
self.l.warning("Group guessing failed.")
return None
groupsPerId = {}
for tup in importedData.htmlResults.results:
competitionGroup = self._groupParser.parseClass(tup[0])
fixture = importedData.htmlResults.tabges.get(tup, (None, None, None))
id = int(tup[3])
if fixture[2] is not None:
group = self._groupParser.parseClass(fixture[2])
else:
containedGroups = competitionGroup.getContainedGroups()
if len(containedGroups) > 1:
self.l.error(
"The group for participant %d is ambiguous in (%s %s %s).",
id,
tup[0],
tup[1],
tup[2],
)
group = containedGroups
else:
group = competitionGroup
knownGroups = groupsPerId.get(id, [])
if group is not None:
knownGroups.append(group)
groupsPerId[id] = knownGroups
ret = {}
for id in groupsPerId.keys():
groupCandidates = groupsPerId[id]
groupSet = set(groupCandidates)
if len(groupSet) == 1:
ret[id] = groupSet.pop()
elif len(groupSet) > 1:
self.l.warning(
"Multiple groups for id %d found: %s", id, groupsPerId[id]
)
ret[id] = _getBestGroupGuess(groupCandidates, id)
else:
self.l.warning("No group for id %d could be found.", id)
ret[id] = None
return ret
def _extractGroupsFromGroupMapping(self, mapping):
foundGroups = set()
for id in mapping:
foundGroups.add(mapping[id])
sortedGroup = self._groupParser.getGroupsAsSortedList(foundGroups)
missingGroups = foundGroups.difference(sortedGroup)
sortedGroup = sortedGroup + list(missingGroups)
return sortedGroup
def _invertGroupMapping(self, mapping, groups):
ret = {}
for group in groups:
ret[group] = []
for id in mapping:
ret[mapping[id]].append(id)
return ret
def _extractDancesPerGroup(
self, data: types.State3, group: solo_turnier.group.Group
):
dances = set()
additionalDances = set()
foundDances = set()
for tup in data.htmlResults.results.keys():
currentGroup = self._groupParser.parseClass(tup[0])
if group not in currentGroup.getContainedGroups():
continue
foundDances.add(tup[2])
dances.update(foundDances.intersection(self._allDances))
additionalDances.update(foundDances.difference(self._allDances))
if len(additionalDances) > 0:
self.l.error(
"There were dances found, that are not registered. A bug? The dances were: %s",
additionalDances,
)
dancesList = [x for x in self._allDances if x in dances]
additionalDancesList = list(additionalDances)
additionalDancesList.sort()
return dancesList + additionalDancesList
def _extractParticipantsPerGroup(
self,
importedData: types.State3,
# previewData: types.HtmlPreviewImport,
group: solo_turnier.group.Group,
) -> list[types.HtmlPreviewParticipant]:
ret = []
# self.l.log(5, 'Table %s', pformat(importedData.htmlResults.tabges))
# self.l.log(5, 'Results %s', pformat(importedData.htmlResults.results))
for tup in importedData.htmlResults.results.keys():
currentGroup = self._groupParser.parseClass(tup[0])
activeGroups = currentGroup.getContainedGroups()
if group not in activeGroups:
continue
fixture = importedData.htmlResults.tabges.get(tup, None)
if fixture is None:
self.l.error("A fixture for the tuple %s could not be read.", tup)
else:
if (
fixture[2] is not None
and self._groupParser.parseClass(fixture[2]) != group
):
self.l.log(
5,
"Skipping id %s in group %s as in other group.",
tup[3],
group,
)
continue
part = importedData.htmlResults.results[tup][0]
part.id = int(tup[3])
ret.append(part)
self.l.log(5, "ret %s", ret)
# raise Exception('Test')
# for id in previewData.participants:
# participantList = previewData.participants[id]
# for participant in participantList:
# if participant.group == group:
# ret.append(participant)
return ret
def _getResultOfSingleParticipant(
self,
participant: types.HtmlParticipant,
nominalGroup: solo_turnier.group.Group,
totalResults: types.HtmlCompetitionTotalResults,
allDances: list[str],
) -> list[types.SingleParticipantResult | None]:
rawResults = totalResults.getById(participant.id)
self.l.log(
5, "Found result data for id %i (raw): %s", participant.id, rawResults
)
results = [None for x in allDances]
for danceIdx, dance in enumerate(allDances):
# self.l.log(5, '%s %s', dance, danceIdx)
def getResult() -> types.SingleParticipantResult | None:
for key in rawResults:
if key[0] != dance:
continue
rawResult = rawResults[key]
if len(rawResult) != 1:
raise Exception("Multiple results found with same key")
rawResult = rawResult[0]
nativeClass = key[2]
# nativeClass = previewResults.results[participant][dance]
# nativeClass = key[2]
# self.l.log(5, 'Result %s => %s', key, rawResult)
ret = types.SingleParticipantResult(
key[2],
nativeClass,
dance,
rawResult.finalist,
rawResult.place,
rawResult.placeTo,
)
return ret
return None
results[danceIdx] = getResult()
return results
def _fixNativeDataFromTable(
self,
dances: list[str],
data: dict[types.HtmlPreviewParticipant, list[types.SingleParticipantResult]],
importedData: types.HtmlCompetitionTotalResults,
):
rePlace = re.compile("([0-9]+)(?:-([0-9]+))?")
classParser = competition_class.CompetitionClassParser()
for participant in data.keys():
self.l.log(5, "fixing participant %s", participant)
results = data[participant]
for result in results:
if result is None:
continue
self.l.log(5, "Looking at result set %s", result)
def selectEntry(k):
return k[2] == result.dance and int(k[3]) == participant.id
keys = list(importedData.tabges.keys())
selected = list(map(selectEntry, keys))
try:
selectedIndex = selected.index(True)
except:
continue
raw = importedData.tabges[keys[selectedIndex]]
self.l.log(5, "Raw %s", raw)
nativePlaceRaw = raw[0]
matcher = rePlace.fullmatch(nativePlaceRaw)
if matcher is None:
self.l.error(
"Cannot parse place string %s for participant %u (%s) in dance %s",
nativePlaceRaw,
participant.id,
participant,
result.dance,
)
continue
self.l.log(5, "Found strings by regex: %s", matcher.groups())
result.placeNative = matcher.group(1)
result.placeNativeTo = matcher.group(2)
if raw[1] is not None:
result.nativeClass = classParser.parseAbbreviatedClass(raw[1])
pass
def filterOutFinalists(self, data: types.State4, filterOut: bool):
for group in data.results:
self.l.debug("Cleaning up group %s", group.name)
participants = data.results[group].results.keys()
droppedParticipants = []
for participant in participants:
self.l.debug("Checking %s", participant)
def isFinalistInDance(x: types.HtmlSingleCompetitionResult | None):
if x is None:
return False
return x.finalist
mapped = list(
map(isFinalistInDance, data.results[group].results[participant])
)
finalist = True in mapped
self.l.log(5, "Check for finalist (in dances %s): %s", mapped, finalist)
if finalist:
participant.finalist = True
else:
participant.finalist = False
self.l.info(
"Dropping %s from the output as no finalist", participant
)
droppedParticipants.append(participant)
if filterOut:
for droppedParticipant in droppedParticipants:
data.results[group].results.pop(droppedParticipant)

View File

@@ -0,0 +1,3 @@
from . import ResultExtractor
from . import DataWorker
from . import Worker
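
For orientation, a minimal usage sketch (not part of the commit) of how call sites address the split package after this change. It assumes only what the diff above shows: solo_turnier/workers/__init__.py re-exports the ResultExtractor, DataWorker and Worker submodules, and batch.py and the output module now instantiate the classes one level deeper. The htmlResultFiles placeholder is hypothetical.

# Minimal sketch, assuming the package layout introduced by this commit.
import solo_turnier.workers

# The classes are now addressed via their submodule inside the workers package:
worker = solo_turnier.workers.Worker.Worker()               # was: solo_turnier.worker.Worker()
data_worker = solo_turnier.workers.DataWorker.DataWorker()  # was: solo_turnier.worker.DataWorker()

# The batch flow itself is unchanged, only the import path moved.
# htmlResultFiles stands in for the (result, table) file-name pairs used by BatchWorker:
htmlResultFiles = []
importedData = worker.collectAllData(htmlResultFiles)
combinedData = worker.combineData(importedData)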