# >>===================================================<< # ||+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+|| # ||--------------------|B|I|F|-----------------------||| # |||B|u|r|r|o|w|s|-|W|h|e|e|l|e|r|-|T|r|a|n|s|f|o|r|m||| # ||+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+|| # >>===================================================<< # Template and code by Leo Ackermann (2026) from ks import simple_kark_sort # >>==========================================================================<< # >> Textbook FM-index class << # >>==========================================================================<< class TextbookFMindex: # >>===[ Class constructor ]================================================ def __init__(self, seq, verbose=False): ''' Create a new FMindex (textbook version) ''' # Fields declaration self.bwt = None self.sa = None self.countmap = None self.ranks = None self.next_smallest_letter = None # Fields initialization if verbose: print("[FMindex] Compute SA") self._compute_sa(seq) if verbose: print("[FMindex] Compute BWT") self._compute_bwt(seq) if verbose: print("[FMindex] Compute Ranks") self._compute_ranks() if verbose: print("[FMindex] Compute Countmap") self._compute_countmap() # >>===[ Pretty printer ]=================================================== def pretty_print(self): ''' Pretty printer for the FM index ''' # This function take a list ((name_i, array_i))_i as input, and print # them vertically, side by side, using name_i's as headers # NOTE. We assume here that {header}\t and {content}\t reach the same # column def print_arrays_vertically(entitled_arrays, tabsize=4): # Prettier printing of None def xstr(s): return 'ยท' if s is None else str(s) if entitled_arrays == []: raise Exception(" cannot be empty") header = "#\t|\t" separator = '-' * len(f"#\t".expandtabs(tabsize)) \ + '+' + '-' * (len(f"+\t".expandtabs(tabsize))-1) for (name, array) in entitled_arrays: header += f"{name}\t" separator += '-' * len(f"{name}\t".expandtabs(tabsize)) print(header) print(separator) len_arrays = (len(entitled_arrays[0][1])) for i in range(len_arrays): row = f"{i}\t|\t" for (name, array) in entitled_arrays: row += f"{xstr(array[i])}\t" print(row) bwm = list(map(lambda x: '-------'.join(x), zip(sorted(self.bwt), self.bwt))) count_as_array = [self.countmap[c] for c in sorted(self.bwt)] print() print_arrays_vertically([ ("sa", self.sa), \ ("c", count_as_array), \ ("f l", bwm) ] + \ [(f"r_{x}", self.ranks[x]) for x in sorted(self.ranks)]) print() # >>===[ Attribute initialisation functions ]=============================== def _compute_sa(self, seq): ''' Compute the suffix array of the sequence (with termination symbol), in linear time ''' self.sa = simple_kark_sort(seq + '$') def _compute_bwt(self, seq): ''' Compute the Burrows-Wheeler transform using the SA-based definition, in linear time ''' if self.sa == None: raise Exception("The BWT computation requieres an actual field") self.bwt = ''.join(map(lambda x: seq[x-1] if x>0 else '$', self.sa)) def _compute_ranks(self): ''' Compute the ranks arrays while scanning the Burrows-Wheeler transform ''' if self.bwt == None: raise Exception("The ranks computation requieres an actual field") self.ranks = {} for (i, c) in zip(range(len(self.bwt)), self.bwt): for l in self.ranks: self.ranks[l].append(self.ranks[l][-1]) if c != '$': if c not in self.ranks: self.ranks[c] = [0 for _ in range(i+1)] self.ranks[c][-1] += 1 def _compute_countmap(self): ''' Compute the countmap array from the ranks arrays arrays ''' if self.ranks == None: raise Exception("The countmap computation requieres an actual field") self.countmap = {} self.countmap['$'] = 0 nb_of_accumulated_letters = 1 for c in sorted(self.ranks): self.countmap[c] = nb_of_accumulated_letters nb_of_accumulated_letters += self.ranks[c][-1] def _compute_next_smallest_letter(self): ''' Compute the map of -> +{None} that maps a letter to the next one in the lexicographic order, using countmap ''' self.next_smallest_letter = {} letters = sorted(self.countmap.keys()) for i in range(len(letters) - 1): self.next_smallest_letter[letters[i]] = letters[i+1] self.next_smallest_letter[letters[-1]] = None # >>===[ LF-mapping ]======================================================= def _lf_mapping(self, i): ''' The LF-mapping function, with O(1) lookup time ''' c = self.bwt[i] return self.countmap[c] + self.ranks[c][i] - 1 # >>===[ Burrows-Wheeler transform inversion ]============================== def get_string(self): ''' Retrieve the string encoded within the BWT, in linear time ''' revS = '' i = 0 for _ in range(len(self.bwt) - 1): prec = self.bwt[i] revS += prec i = self._lf_mapping(i) return ''.join(reversed(revS)) # >>===[ Pattern matching functions ]======================================= def _get_pattern_interval(self, pattern): ''' Retrieve the F-interval (i_min, i_max) where a pattern lies. If no such interval exists, returns (None, None). ''' # Pattern is processed right-to-left rev_pattern = list(reversed(pattern)) if rev_pattern[0] not in self.countmap: return (None, None) # First F-interval (i_min, i_max), corresponding to pattern[-1] i_min = self.countmap[rev_pattern[0]] nsl = self.next_smallest_letter[rev_pattern[0]] if nsl == None: i_max = len(self.bwt) - 1 else: i_max = self.countmap[nsl] - 1 # Successive F-intervals (i_min, i_max), with one additional matched # character per iteration for c in rev_pattern[1:]: # Slight modification of the FM-index that shrink the interval to # characters only i_min = self.countmap[c] + self.ranks[c][i_min - 1] + 1 - 1 i_max = self.countmap[c] + self.ranks[c][i_max] - 1 if i_min > i_max: return (None, None) return (i_min, i_max) def membership(self, pattern): ''' FM-index based membership pattern-matching ''' return self._getPatternInterval(pattern) != (None, None) def count(self, pattern): ''' FM-index based count pattern-matching ''' (i_min, i_max) = self._getPatternInterval(pattern) if (i_min, i_max) == (None, None): return 0 else: return i_max - i_min + 1 def locate(self, pattern): ''' FM-index based locate pattern-matching ''' (i_min, i_max) = self._getPatternInterval(pattern) if (i_min, i_max) == (None, None): return [] else: return sorted(self.sa[i_min:i_max + 1]) # >>==========================================================================<< # >> FM-index class << # >>==========================================================================<< class FMindex(TextbookFMindex): # >>===[ Class constructor ]================================================ def __init__(self, seq, rho=1, sigma=1, verbose=False): ''' Create a new FMindex (textbook version) ''' # Inherit fields and initialisation from the Textbook FMindex class super().__init__(seq, verbose) # Extra fields declaration self.rho = rho self.sigma = sigma # Extra fields initialization if verbose: print("[FMindex] Subsampling ranks") self._subsample_ranks() if verbose: print("[FMindex] Subsampling suffix array") self._subsample_sa() # >>===[ Subsampling and on-the-fly retrieval functions ]=================== def _subsample_ranks(self): ''' Subsample the ranks arrays, setting most entries to ''' for c in self.ranks: for i in range(len(self.ranks[c])): if i % self.rho: self.ranks[c][i] = None def _get_ranks_entry(self, letter, i): ''' Equivalent to TextbookFMindex's ''' if i % self.rho == 0: return self.ranks[letter][i] else: i_checkpoint = i//self.rho*self.rho rank = self.ranks[letter][i_checkpoint] for j in range(i_checkpoint+1, i+1): if self.bwt[j] == letter: rank += 1 return rank def _subsample_sa(self): ''' Subsample the suffix array, setting most entries to ''' for i in range(len(self.sa)): if self.sa[i] % self.sigma: self.sa[i] = None def _get_sa_entry(self, i): ''' Equivalent to TextbookFMindex's ''' lf_steps_counter = 0 j = i while self.sa[j] == None: j = self._lf_mapping(j) lf_steps_counter += 1 return self.sa[j] + lf_steps_counter # >>===[ OVERRIDE functions calling or ]============= # OVERRIDE # diff: # (-) ranks[c][i] # (+) _get_ranks_entry[c][i] def _lf_mapping(self, i): c = self.bwt[i] return self.countmap[c] + self._get_ranks_entry(c, i) - 1 # OVERRIDE # diff: # (-) ranks[c][i] # (+) _get_ranks_entry[c][i] def _getPatternInterval(self, pattern): ''' Retrieve the F-interval where a pattern lies. If no such interval exists, returns (None, None). NOTE. self.bwt, self.count, self.ranks, self.next_smallest_letter must be initialized ''' # Pattern is processed right-to-left rev_pattern = list(reversed(pattern)) if rev_pattern[0] not in self.countmap: return (None, None) # First F-interval (i_min, i_max), corresponding to pattern[-1] i_min = self.countmap[rev_pattern[0]] nsl = self.next_smallest_letter[rev_pattern[0]] if nsl == None: i_max = len(self.bwt) - 1 else: i_max = self.countmap[nsl] - 1 # Successive F-intervals (i_min, i_max), with one additional matched # character per iteration for c in rev_pattern[1:]: # Slight modification of the FM-index that shrink the interval to # characters only i_min = self.countmap[c] + self._get_ranks_entry(c, i_min - 1) + 1 - 1 i_max = self.countmap[c] + self._get_ranks_entry(c, i_max) - 1 if i_min > i_max: return (None, None) return (i_min, i_max) # OVERRIDE # diff: # (-) sa[i_min:i_max+1] # (+) [self._get_sa_entry(i) for i in range(i_min, i_max + 1)] def locate(self, pattern): ''' FM-index based locate pattern-matching ''' (i_min, i_max) = self._getPatternInterval(pattern) if (i_min, i_max) == (None, None): return [] else: return sorted([self._get_sa_entry(i) for i in range(i_min, i_max + 1)])