import logging
from pprint import pformat
import re

import solo_turnier
from solo_turnier import html_parser
from .reader import ResultRow
from .types import HtmlCompetitionResultRow as CompetitionResult
from . import types
from . import competition_class

# Canonical output order of the known dances. Shared by DataWorker and Worker
# so that both report dances in the same order.
_ALL_DANCES = (
    'Samba', 'Cha Cha', 'Rumba', 'Paso Doble', 'Jive',
    'Langs. Walzer', 'Tango', 'Wiener Walzer', 'Slowfox', 'Quickstep',
)


class HtmlPerson:
    """Plain record of a person's name, id and group.

    Two instances compare equal (and hash equal) when their textual
    representation ``name (id, group)`` is identical.
    """

    def __init__(self, name, id, group):
        self.name = name
        self.id = id
        self.group = group

    def __repr__(self):
        return f'{self.name} ({self.id}, {self.group})'

    def __eq__(self, o):
        if not isinstance(o, HtmlPerson):
            return False

        return str(self) == str(o)

    def __hash__(self):
        return hash(str(self))


class ResultPerson:
    """Person built from a result row: first name, last name and club.

    ``id`` and ``group`` are optional and may be filled in later (see
    ``DataWorker.checkUniqueIds`` and ``DataWorker.consolidateGroups``).
    """

    def __init__(self, firstName, lastName, club, id=None, group=None):
        self.firstName = firstName
        self.lastName = lastName
        self.name = f'{firstName} {lastName}'
        self.club = club
        self.id = id
        self.group = group

    @staticmethod
    def extractFromResultRow(row: ResultRow):
        """Create a person (without id and group) from a result row."""
        return ResultPerson(
            firstName=row.firstName,
            lastName=row.lastName,
            club=row.club
        )

    def __eq__(self, o):
        if not isinstance(o, ResultPerson):
            return False

        return (
            self.firstName == o.firstName and
            self.lastName == o.lastName and
            self.club == o.club and
            self.id == o.id
        )

    def __repr__(self):
        if self.id is None:
            return f'{self.name} ({self.club})'
        else:
            return f'{self.name} ({self.club}) [{self.id}]'

    def __hash__(self):
        # The id is deliberately NOT part of the hash: checkUniqueIds assigns
        # the id while the person is already used as a dict key. A hash that
        # changes with the id would make the existing dict entry unreachable.
        # Equal persons still share the same hash, as equality implies equal
        # names and club.
        return hash((self.firstName, self.lastName, self.club))


class ImportNotParsableException(Exception):
    """Raised when an imported HTML table does not have the expected layout."""
    pass


ParserList_t = dict[str, html_parser.HtmlParser]


class PreviewWorker:
    """Collects participants and their classes from preview round files."""

    def __init__(self):
        self.l = logging.getLogger('solo_turnier.worker.PreviewWorker')
        self.participants = {}
        self.previewResults = {}

    def filterFilesPreview(self, files: list[str]) -> ParserList_t:
        """Return parsers for those files whose title class is 'Sichtung'.

        Files whose title cannot be interpreted are logged and skipped.
        The result maps the file name to the parser of its content.
        """
        self.l.debug('Filtering the list of parsers by removing all non preview entries.')
        ret = {}
        for file in files:
            with open(file, 'r') as fp:
                text = fp.read()

            parser = html_parser.HtmlParser(text, file)

            try:
                data = parser.guessDataFromHtmlTitle()
            except Exception:
                self.l.error(f'Unable to parse html file in {file}. Please check manually.')
                continue

            if data['class_'] == 'Sichtung':
                self.l.debug(f"Found candidate in {file}. Adding to the list.")
                ret[file] = parser
            else:
                self.l.debug(f'Rejecting file {file} as the name {data["class_"]} did not match.')

        return ret

    def __extractPersonsFromSinglePreview(self, parser: html_parser.HtmlParser):
        """Record all participants of one preview file and their class.

        Updates ``self.participants`` (id -> list of participants) and
        ``self.previewResults`` (participant -> {dance: class}).

        Raises:
            ImportNotParsableException: the first table title is not
                'Wertungsrichter'.
        """
        imported = parser.parsePreparationRound()
        parser.cleanPreparationRoundImport(imported)
        data = imported['data']

        headerData = parser.guessDataFromHtmlTitle()
        dance = headerData['dance']
        classParser = solo_turnier.competition_class.CompetitionClassParser()

        self.l.log(5, data)

        if data['titles'][0] != 'Wertungsrichter':
            self.l.critical('Cannot parse the parsed content of the preview file.')
            raise ImportNotParsableException('Incompatible export file')

        if data['titles'][-1] == 'Startgruppe':
            self.l.debug('Combined competition found. Extracting group from table required.')
            extractGroup = True
        else:
            self.l.debug('Using group from the title.')
            group = parser.guessDataFromHtmlTitle(imported['title'])['group']
            extractGroup = False

        # Row of the table that carries the class of each participant.
        classRowIndex = data['titles'].index('Platz von\nPlatz bis')

        for index, e in enumerate(data['table'][0]):
            if e['text'] == '':
                # Skip empty columns
                continue

            # Extract data from column
            name = e['meta']
            participantId = int(e['text'])
            if extractGroup:
                group = data['table'][-1][index]['text']

            class_ = classParser.parseClass(data['table'][classRowIndex][index]['text'])

            participant = types.HtmlPreviewParticipant(name, participantId, group)

            known = self.participants.get(participantId, [])
            self.l.log(5, 'Checking for existence of %s in %s: %s', participant, known, participant in known)
            if participant not in known:
                known.append(participant)
                self.participants[participantId] = known

            results = self.previewResults.get(participant, {})
            results[dance] = class_
            self.previewResults[participant] = results

    def importAllData(self, parsers: ParserList_t) -> types.HtmlPreviewImport:
        """Import all given preview files and return the combined data.

        Files that fail to import are logged and skipped.
        """
        # Start from a clean state so that repeated calls do not mix in the
        # participants or results of an earlier import.
        self.participants = {}
        self.previewResults = {}

        for file in parsers:
            parser = parsers[file]
            try:
                self.__extractPersonsFromSinglePreview(parser)
            except Exception:
                self.l.error('Failed to parse preview round in file %s. Skipping this file\'s content.', parser.fileName)

        return types.HtmlPreviewImport(self.participants, self.previewResults)


class ResultExtractor:
    """Extracts competition results from result files (and optional table files)."""

    def __init__(self):
        self.l = logging.getLogger('solo_turnier.worker.ResultExtractor')
        self.rePlaceSingle = re.compile(' *([0-9]+) *')
        self.rePlaceDouble = re.compile(' *([0-9]+) *- *([0-9]+) *')

    def getAllParsers(self, files: list[tuple[str,str]]) -> ParserList_t:
        """Create parsers for all file pairs with a parsable title and class.

        Each entry of ``files`` is a pair (result file, table file or None).
        The returned dict maps the pair to (result parser, table parser or
        None). Pairs with an unparsable title or class are logged and skipped.
        """
        ret = {}
        classParser = competition_class.CompetitionClassParser()

        for filePair in files:
            with open(filePair[0], 'r') as fp:
                text = fp.read()
            parser = html_parser.HtmlParser(text, filePair[0])

            if filePair[1] is None:
                parserTab = None
            else:
                with open(filePair[1], 'r') as fp:
                    textTab = fp.read()
                parserTab = html_parser.HtmlParser(textTab, filePair[1])

            try:
                data = parser.guessDataFromHtmlTitle()
            except Exception:
                self.l.error('Cannot parse HTML file %s to check if it is a valid result. Check manually.', filePair[0])
                continue

            try:
                guessedClass = classParser.parseClass(data['class_'])
            except Exception:
                self.l.error('Issue parsing class of file %s. Check manually.', filePair[0])
                continue

            self.l.debug('Fetched result data: %s, guessed class %s', data, guessedClass)
            ret[filePair] = (parser, parserTab)

        return ret

    def _extractPlace(self, placeStr: str):
        """Parse a place string into ``(place, placeTo)``.

        Dots are ignored. A single number yields ``(n, None)``, a range
        ``a - b`` yields ``(a, b)``.

        Raises:
            Exception: the string matches neither form.
        """
        s = placeStr.replace('.', '')

        matches = self.rePlaceSingle.fullmatch(s)
        if matches is not None:
            return (int(matches.group(1)), None)

        matches = self.rePlaceDouble.fullmatch(s)
        if matches is not None:
            return (int(matches.group(1)), int(matches.group(2)))

        self.l.error('Could not parse place string "%s"', placeStr)
        raise Exception('Place cannot be parsed')

    def _analyzeSingleParser(self, parser: html_parser.HtmlParser, results: types.HtmlCompetitionTotalResults):
        """Add the results of one result file to ``results``."""
        data = parser.guessDataFromHtmlTitle()
        competitionClass = data['class_']
        competitionGroup = data['group']
        dance = data['dance']

        result = parser.parseResult()
        self.l.log(5, 'Raw data extracted: %s', result)

        for person, placeStr in result.results.items():
            place, placeTo = self._extractPlace(placeStr)
            competitionResult = types.HtmlSingleCompetitionResult(person.name, place, placeTo, person.finalist)
            results.add(competitionGroup, competitionClass, dance, person.id, competitionResult)

    def _analyzeIndividualResults(self, parser: html_parser.HtmlParser, results: types.HtmlCompetitionTotalResults):
        """Merge the individual results of one table file into ``results.tabges``."""
        data = parser.guessDataFromHtmlTitle()
        competitionClass = data['class_']
        competitionGroup = data['group']
        dance = data['dance']

        result = parser.parseIndividualResult(competitionGroup, competitionClass, dance)
        self.l.log(5, 'Found individual results: %s', result.participants)
        results.tabges.update(result.participants)

    def extractAllData(self, parsers: ParserList_t) -> types.HtmlCompetitionTotalResults:
        """Extract the results of all parsers created by ``getAllParsers``."""
        ret = types.HtmlCompetitionTotalResults()

        for fileNameTuple in parsers:
            fileName = fileNameTuple[0]
            parser, parserTab = parsers[fileNameTuple][0], parsers[fileNameTuple][1]

            self.l.debug('Extracting data from file %s', fileName)
            self._analyzeSingleParser(parser, ret)

            if parserTab is None:
                self.l.info('Skipping extraction of individual result as class is not yet finished.')
            else:
                self.l.debug('Fetching individual result of combined competitions in %s', fileName)
                self._analyzeIndividualResults(parserTab, ret)

        return ret


class DataWorker:
    """Helpers working on results grouped by ``ResultPerson``."""

    def __init__(self):
        self.l = logging.getLogger('solo_turnier.worker.DataWorker')

    def combineRowsByPerson(self, rows: list[ResultRow]) -> dict[ResultPerson, list[CompetitionResult]]:
        """Group the competition results of all rows by person.

        Rows whose place or placeTo is '-' are ignored.
        """
        ret = {}
        for row in rows:
            result = CompetitionResult.extractFromResultRow(row)

            if result.place == '-' or result.placeTo == '-':
                continue

            person = ResultPerson.extractFromResultRow(row)
            ret.setdefault(person, []).append(result)
        return ret

    def checkUniqueIds(self, data: dict[ResultPerson, list[CompetitionResult]]) -> bool:
        """Assign each person the id of its results if that id is unique.

        Returns True only if every person has exactly one distinct id.
        """
        unique = True
        for person, competitions in data.items():
            ids = {c.id for c in competitions}
            if len(ids) == 1:
                person.id = next(iter(ids))
            else:
                unique = False

        return unique

    def consolidateGroups(self, data:dict[ResultPerson, list[CompetitionResult]]) -> tuple[bool, bool]:
        """Assign a group to every person based on the groups of its results.

        Return a tuple.
        The first one is True, if all persons could be unambiguously
        identified a group.
        The second one is True if there was the need to override a group but
        it was possible to extract from other data.
        The second one can be seen as a warning.

        Raises:
            Exception: a person has results in an unknown group or in more
                than one unambiguous group.
        """
        ambiguous = False
        warnChange = False

        unambiguousGroups = {'Kin.', 'Jun.', 'Jug.'}
        combinations = {'Kin./Jun.', 'Jun./Jug.'}

        for person, competitions in data.items():
            groupsRaw = {c.group for c in competitions}

            unknown = groupsRaw - unambiguousGroups - combinations
            if unknown:
                raise Exception(f'There were unknown groups found for {person}: {unknown}')

            plainGroups = groupsRaw & unambiguousGroups

            if len(plainGroups) == 0:
                if len(groupsRaw) == 2:
                    # Both combined groups present: only 'Jun.' is part of both.
                    warnChange = True
                    person.group = 'Jun.'
                else:
                    ambiguous = True
                    if len(groupsRaw) == 1:
                        person.group = next(iter(groupsRaw))

            elif len(plainGroups) == 1:
                if groupsRaw & combinations:
                    warnChange = True

                person.group = next(iter(plainGroups))

            else:
                raise Exception(f'{person} cannot have different groups.')

        return (not ambiguous, warnChange)

    def _createHtmlLUT(self, htmlImports: list[html_parser.HtmlImport]):
        """Build a lookup table (group, class, dance) -> HTML import."""
        ret = {}
        parser = html_parser.HtmlParser('')
        for imp in htmlImports:
            parsed = parser.guessDataFromHtmlTitle(imp.title)
            key = (parsed['group'], parsed['class_'], parsed['dance'])
            ret[key] = imp
            self.l.debug('LUT[%s] = %s', key, imp)
        self.l.debug('LUT completed')
        return ret

    def mergeHtmlData(self, data:dict[ResultPerson, list[CompetitionResult]], htmlImports: list[html_parser.HtmlImport]):
        """Copy the finalist flag from the HTML imports into the results.

        A mismatch between the person's name and the name in the HTML import
        is logged as an error but does not abort the merge.
        """
        lut = self._createHtmlLUT(htmlImports)

        for person, competitions in data.items():
            for competition in competitions:
                key = (competition.competitionGroup, competition.competitionClass, competition.dance)
                htmlImport = lut[key]
                participant = htmlImport.participants[str(competition.id)]
                if participant.name != person.name:
                    self.l.error(f'Names for {person} and participant in HTML import ({participant}) do not match. Please check carefully.')
                competition.finalist = participant.finalist

    def getAllDancesInCompetitions(self, data:dict[ResultPerson, list[CompetitionResult]]) -> list[str]:
        """Return the known dances that occur in ``data`` in canonical order."""
        dancesPresent = {
            competition.dance
            for competitions in data.values()
            for competition in competitions
        }
        return [d for d in _ALL_DANCES if d in dancesPresent]

    def collectPersonsInGroups(self, data:dict[ResultPerson, list[CompetitionResult]]) -> dict[str, list[ResultPerson]]:
        """Sort the persons into the buckets 'Kin.', 'Jun.', 'Jug.' and 'Sonst'.

        Persons whose group is none of the three named groups end up in
        'Sonst'.
        """
        groups = {'Kin.': [], 'Jun.': [], 'Jug.': [], 'Sonst': []}
        for person in data:
            if person.group in ('Kin.', 'Jun.', 'Jug.'):
                groups[person.group].append(person)
            else:
                groups['Sonst'].append(person)
        return groups

    def sortPersonsInGroup(self, persons: list[ResultPerson]) -> tuple[list[ResultPerson], bool]:
        """Return the sorted persons and whether ids should be shown.

        Persons are sorted by id if every person has one; otherwise they are
        sorted by ``name (club)``. The sort is stable, so persons with equal
        keys keep their relative order.
        """
        showIds = all(p.id is not None for p in persons)

        if showIds:
            ordered = sorted(persons, key=lambda p: p.id)
        else:
            # We need to sort by name
            ordered = sorted(persons, key=lambda p: f'{p.name} ({p.club})')

        return (ordered, showIds)

    def mapPersonResultsToDanceList(self, results: list[CompetitionResult], dances: list[str]) -> list[CompetitionResult|None]:
        """Align the results with ``dances``; missing dances become None.

        Raises:
            Exception: more than one result exists for the same dance.
        """
        ret = []
        for dance in dances:
            competitions = [c for c in results if c.dance == dance]
            if len(competitions) == 0:
                ret.append(None)
            elif len(competitions) > 1:
                raise Exception(f'Multiple competitions with the same dance "{dance}" found.')
            else:
                ret.append(competitions[0])

        return ret


class Worker:
    """Drives the import of all files and the combination into group results."""

    def __init__(self):
        self.l = logging.getLogger('solo_turnier.worker.Worker')
        self._allDances = list(_ALL_DANCES)

    def collectAllData(
        self,
        htmlCandidatesPreview: list[str],
        htmlResultsFileNames: list[str]
    ) -> types.State3:
        """Import preview rounds and result files and bundle them in a State3."""
        previewWorker = PreviewWorker()
        self.l.info('Filtering for pure preview rounds.')
        parsers = previewWorker.filterFilesPreview(htmlCandidatesPreview)
        self.l.debug('Remaining files: %s', list(parsers.keys()))

        self.l.info('Extracting person data from the preview rounds.')
        previewImport = previewWorker.importAllData(parsers)
        self.l.debug('Total preview imported participants: %s', pformat(previewImport.participants))
        self.l.log(5, 'Total preview results: %s', pformat(previewImport.results))

        resultExtractor = ResultExtractor()
        resultParsers = resultExtractor.getAllParsers(htmlResultsFileNames)
        htmlResults = resultExtractor.extractAllData(resultParsers)
        self.l.info('Overall result data extracted: %s', pformat(htmlResults.results))

        return types.State3(previewImport, htmlResults)

    def combineData(self, importedData: types.State3):
        """Build the per-group total results (State4) from the imported data."""
        self.l.info('Starting to build data sets.')
        groups = self._extractGroups(importedData)
        self.l.debug('Found groups in the dataset: %s', groups)

        totalResult = {}

        for group in groups:
            self.l.debug('Collecting data for total result of group %s', group)

            dances = self._extractDancesPerGroup(importedData, group)
            self.l.log(5, 'Found dances in group %s: %s', group, dances)

            participants = self._extractParticipantsPerGroup(importedData, group)
            self.l.log(5, 'Related participants %s', participants)

            results = {}

            for participant in participants:
                self.l.log(5, 'Collecting data for %s', participant)
                resultsOfParticipant = self._getResultOfSingleParticipant(
                    participant, group, importedData.previewImport,
                    importedData.htmlResults, dances
                )
                self.l.log(5, 'Obtained result %s', resultsOfParticipant)
                results[participant] = resultsOfParticipant

            self.l.log(5, 'Result before native fixing: %s', pformat(results))
            # Native places are taken from the table data; _fixNativePlaces
            # is the alternative that computes them from the places.
            self._fixNativeDataFromTable(dances, results, importedData.htmlResults)
            self.l.log(5, 'Result after native fixing: %s', pformat(results))

            totalResult[group] = types.TotalGroupResult(dances, results)

        self.l.log(5, 'Total result of all groups: %s', pformat(totalResult))

        ret = types.State4(totalResult)
        return ret

    def _extractGroups(self, data: types.State3):
        """Return the sorted list of groups that occur in the HTML results."""
        groupParser = solo_turnier.group.GroupParser()

        groupSet = set()
        for tup in data.htmlResults.results.keys():
            groupSet.add(groupParser.parseClass(tup[0]))

        self.l.log(5, 'Set of active groups: %s', groupSet)
        groups = groupParser.getGroupsAsSortedList(groupSet)
        return groups

    def _extractDancesPerGroup(self, data: types.State3, group: solo_turnier.group.Group):
        """Return the dances of ``group``: known ones first, then the rest sorted."""
        groupParser = solo_turnier.group.GroupParser()

        foundDances = {
            tup[2]
            for tup in data.htmlResults.results.keys()
            if groupParser.parseClass(tup[0]) == group
        }

        additionalDances = foundDances.difference(self._allDances)
        if len(additionalDances) > 0:
            self.l.error('There were dances found, that are not registered. A bug? The dances were: %s', additionalDances)

        dancesList = [x for x in self._allDances if x in foundDances]
        return dancesList + sorted(additionalDances)

    def _extractParticipantsPerGroup(
        self,
        importedData: types.State3,
        group: solo_turnier.group.Group
    ) -> list[types.HtmlPreviewParticipant]:
        """Return the first result entry of every result key of ``group``.

        The id of each returned entry is set from the fourth element of its
        key. An entry is appended once per matching key.
        """
        groupParser = types.group.GroupParser()

        ret = []

        self.l.log(5, 'Table %s', pformat(importedData.htmlResults.tabges))
        self.l.log(5, 'Results %s', pformat(importedData.htmlResults.results))

        for tup, entries in importedData.htmlResults.results.items():
            gr = groupParser.parseClass(tup[0])
            if not gr == group:
                continue
            part = entries[0]
            part.id = int(tup[3])
            ret.append(part)

        self.l.log(5, 'ret %s', ret)
        return ret

    def _getResultOfSingleParticipant(
        self,
        participant: types.HtmlParticipant,
        nominalGroup: solo_turnier.group.Group,
        previewResults: types.HtmlPreviewImport,
        totalResults: types.HtmlCompetitionTotalResults,
        allDances: list[str]
    ) -> list[types.SingleParticipantResult|None]:
        """Return one result (or None) per dance of ``allDances``.

        ``nominalGroup`` and ``previewResults`` are currently not used.

        Raises:
            Exception: a result key holds more or less than one result.
        """
        rawResults = totalResults.getById(participant.id)
        self.l.log(5, 'Found result data for id %i (raw): %s', participant.id, rawResults)

        results = []

        for dance in allDances:
            danceResult = None

            # The first key of this dance wins.
            for key in rawResults:
                if key[0] != dance:
                    continue

                rawResult = rawResults[key]
                if len(rawResult) != 1:
                    raise Exception('Multiple results found with same key')
                rawResult = rawResult[0]

                nativeClass = key[2]
                danceResult = types.SingleParticipantResult(
                    key[2], nativeClass, dance, rawResult.finalist,
                    rawResult.place, rawResult.placeTo
                )
                break

            results.append(danceResult)

        return results

    def _fixNativeDataFromTable(
        self,
        dances: list[str],
        data: dict[types.HtmlPreviewParticipant, list[types.SingleParticipantResult]],
        importedData: types.HtmlCompetitionTotalResults
    ):
        """Set native place and class of each result from ``importedData.tabges``.

        Results without a table entry or with an unparsable place string are
        left untouched. ``dances`` is currently not used.
        """
        rePlace = re.compile('([0-9]+)(?:-([0-9]+))?')
        classParser = competition_class.CompetitionClassParser()

        for participant, results in data.items():
            self.l.log(5, 'fixing participant %s', participant)

            for result in results:
                if result is None:
                    continue
                self.l.log(5, 'Looking at result set %s', result)

                matchingKeys = [
                    k for k in importedData.tabges.keys()
                    if k[2] == result.dance and int(k[3]) == participant.id
                ]
                if not matchingKeys:
                    # No table entry for this participant in this dance.
                    continue

                raw = importedData.tabges[matchingKeys[0]]
                self.l.log(5,'Raw %s', raw)
                nativePlaceRaw = raw[0]
                matcher = rePlace.fullmatch(nativePlaceRaw)
                if matcher is None:
                    self.l.error('Cannot parse place string %s for participant %u (%s) in dance %s', nativePlaceRaw, participant.id, participant, result.dance)
                    continue
                self.l.log(5, 'Found strings by regex: %s', matcher.groups())
                result.placeNative = matcher.group(1)
                result.placeNativeTo = matcher.group(2)

                if raw[1] is not None:
                    result.nativeClass = classParser.parseAbbreviatedClass(raw[1])

    def _fixNativePlaces(
        self,
        dances: list[str],
        data: dict[types.HtmlPreviewParticipant, list[types.SingleParticipantResult]]
    ):
        """Compute native places per class and dance from the overall places.

        For every class (in reversed order of ``getAllClasses``) and dance,
        the participants whose native class is not better than that class are
        ranked by place. Participants sharing a place get a place range.
        """
        classParser = solo_turnier.competition_class.CompetitionClassParser()
        allClasses = classParser.getAllClasses()
        allClasses.reverse()

        for class_ in allClasses:
            for danceIdx, dance in enumerate(dances):
                self.l.log(5, 'Fixing native places for class %s in dance %s', class_, dance)

                remainingParticipants = []

                for participant, results in data.items():
                    danceResult = results[danceIdx]

                    if danceResult is None:
                        continue

                    if classParser.isABetterThanB(danceResult.nativeClass, class_):
                        # Native class is higher: not ranked in this class.
                        continue

                    remainingParticipants.append((danceResult.place, participant.id, participant))

                # Sort by place and id only; the participant objects
                # themselves need not be orderable.
                remainingParticipants.sort(key=lambda entry: (entry[0], entry[1]))

                place = 1
                start = 0
                total = len(remainingParticipants)
                while start < total:
                    # Collect the run of participants sharing the same place.
                    end = start + 1
                    while end < total and remainingParticipants[end][0] == remainingParticipants[start][0]:
                        end += 1
                    samePlaced = remainingParticipants[start:end]

                    nextPlace = place + len(samePlaced)
                    placeTo = None if len(samePlaced) == 1 else nextPlace - 1

                    for entry in samePlaced:
                        data[entry[2]][danceIdx].placeNative = place
                        data[entry[2]][danceIdx].placeNativeTo = placeTo

                    place = nextPlace
                    start = end

    def filterOutFinalists(self, data: types.State4, filterOut: bool):
        """Mark each participant as finalist or not; optionally drop non-finalists.

        A participant is a finalist if at least one of its dance results is
        flagged as finalist. Non-finalists are removed from the group results
        only if ``filterOut`` is set.
        """
        for group in data.results:
            self.l.debug('Cleaning up group %s', group.name)
            groupResults = data.results[group].results
            droppedParticipants = []

            for participant in groupResults.keys():
                self.l.debug('Checking %s', participant)

                mapped = [
                    False if x is None else x.finalist
                    for x in groupResults[participant]
                ]
                finalist = True in mapped
                self.l.log(5,'Check for finalist (in dances %s): %s', mapped, finalist)

                if finalist:
                    participant.finalist = True
                else:
                    participant.finalist = False
                    self.l.warning('Dropping %s from the output as no finalist', participant)
                    droppedParticipants.append(participant)

            if filterOut:
                # Removal happens after the iteration to avoid changing the
                # dict while iterating over it.
                for droppedParticipant in droppedParticipants:
                    groupResults.pop(droppedParticipant)