# >>===================================================<< # ||+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+|| # ||--------------------|A|L|G|-----------------------||| # |||B|u|r|r|o|w|s|-|W|h|e|e|l|e|r|-|T|r|a|n|s|f|o|r|m||| # ||+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+|| # >>===================================================<< # Template by Leo Ackermann (2025) # Code by Leo Ackermann (2025) from ks import simple_kark_sort import random import matplotlib.pyplot as plt # >>==========================================================================<< # >> Run-length encoding << # >>==========================================================================<< def compress_RLE(seq): ''' Compress a sequence using run-length encoding, as a list of (letter, count) couples ''' seq_RLE = [] current_char = seq[0] current_count = 0 for c in seq: if c == current_char: current_count += 1 else: seq_RLE.append((current_char, current_count)) current_char = c current_count = 1 seq_RLE.append((current_char, current_count)) return seq_RLE def decompress_RLE(rle): ''' Decompress a run-length encoded sequence into a string ''' return ''.join(map(lambda x: x[0] * x[1], rle)) def evaluate_RLE_compression(fm, plot=False): crle = compress_RLE(fm.get_string()) cbwtrle = compress_RLE(fm.bwt) print() print(f"#runs in seq: \t {len(crle)}") print(f"#runs in bwt: \t {len(cbwtrle)} (-{(len(crle)-len(cbwtrle))/len(crle)*100 :.1f}%)") print() if plot: fig, ax = plt.subplots(figsize=(16, 10)) ax.hist([x[1] for x in cbwtrle], bins='auto', align='left') for rect in ax.patches: height = rect.get_height() ax.annotate(f'{int(height)}', xy=(rect.get_x()+rect.get_width()/2, height), xytext=(0, 5), textcoords='offset points', ha='center', va='bottom') plt.title("Distribution of run lengths") plt.show() # >>==========================================================================<< # >> FM-index class << # >>==========================================================================<< class FMindex: # >>===[ Class constructor ]================================================ def __init__(self, seq, verbose=False): # Dummy fields declaration, so that pretty printer works throughout the # tutorial # NOTE. Respective definitions are defered to their init functions letters = sorted(set(seq + '$')) + ['_'] self.sa = ['_' for _ in range(len(seq) + 1)] self.bwt = ['_' for _ in range(len(seq) + 1)] self.fm_count = dict([(x, '_') for x in letters]) self.fm_rank = ['_' for _ in range(len(seq) + 1)] self.fm_ranks = dict([(x, ['_' for _ in range(len(seq) + 1)]) for x in letters]) self.next_smallest_letter = dict([(x, '_') for x in letters]) # Fields initialization (order matters) if verbose: print("[FMindex] Compute SA") self.__compute_sa(seq) if verbose: print("[FMindex] Compute BWT") self.__compute_bwt_from_sa(seq) if verbose: print("[FMindex] Compute Rank") cpl = self.__compute_rank_and_counts_per_letter() if verbose: print("[FMindex] Compute Count") self.__compute_count(cpl) if verbose: print("[FMindex] Compute Ranks") self.__compute_ranks_from_rank() if verbose: print("[FMindex] Compute NSL") self.__compute_next_smallest_letter() # >>===[ Pretty printer ]=================================================== def pretty_print(self): ''' Pretty printer for the FM index ''' # NOTE. Some computations are inefficient. But we don't care in this # context (printable => small) F = sorted(self.bwt) C_as_array = list(map(lambda x: self.fm_count[x], F)) letters = sorted(set(self.bwt)) print() print("#\t\tC\tF\tL\tR\t\t" + '\t'.join([f"R[{x}]" for x in letters])) print("------------------------" + "--------" * len(letters)) for i in range(len(self.bwt)): print(f"{i}\t\t{C_as_array[i]}\t{F[i]}\t{self.bwt[i]}\t{self.fm_rank[i]}\t\t" \ + '\t\t'.join([f"{self.fm_ranks[x][i]}" for x in letters])) print() # >>===[ Attribute initialisation functions ]=============================== def __compute_sa(self, seq): ''' Compute the suffix array of the sequence (with termination symbol), in linear time ''' # NOTE. Relies on an external function self.sa = simple_kark_sort(seq + '$') def __compute_bwt_from_sa(self, seq): ''' Compute the Burrows-Wheeler transform using the SA-based definition, in linear time NOTE. self.sa must be initialized ''' self.bwt = ''.join(map(lambda x: seq[x-1] if x>0 else '$', self.sa)) def __compute_rank_and_counts_per_letter(self): ''' Compute the rank-array of the FM-index, in (expected) linear time NOTE. self.bwt must be initialized ''' counters_per_letter = {} self.fm_rank = [] for c in self.bwt: if c not in counters_per_letter: counters_per_letter[c] = 0 counters_per_letter[c] += 1 self.fm_rank.append(counters_per_letter[c]) return counters_per_letter def __compute_count(self, cpl): ''' Compute the count-map of the FM-index, in time O(|Sigma|*log|Sigma|) ''' self.fm_count = {} sorted_cpl = sorted(cpl.items()) total_count = 0 for (letter, count) in sorted_cpl: self.fm_count[letter] = total_count total_count += count def __compute_ranks_from_rank(self): ''' Compute the per-letter rank-arrays of the FM-index, in time proportional to |bwt|*|alphabet| NOTE. self.bwt and self.fm_rank must be initialized ''' self.fm_ranks = {} # Dispatch the rank array into ranks for i in range(len(self.bwt)): letter = self.bwt[i] rank = self.fm_rank[i] if letter not in self.fm_ranks: self.fm_ranks[letter] = [None for _ in range(len(self.bwt))] self.fm_ranks[letter][i] = rank # Fill the ranks arrays for letter in self.fm_ranks.keys(): rk = 0 for i in range(len(self.bwt)): if self.fm_ranks[letter][i] == None: self.fm_ranks[letter][i] = rk else: rk = self.fm_ranks[letter][i] def __compute_next_smallest_letter(self): ''' Compute a map of -> +None that maps a letter to the next one in the lexicographic order NOTE. self.count must be initialized ''' self.next_smallest_letter = {} letters = sorted(self.fm_count.keys()) for i in range(len(letters) - 1): self.next_smallest_letter[letters[i]] = letters[i+1] self.next_smallest_letter[letters[-1]] = None # >>===[ Burrows-Wheeler transform inversion ]============================== def get_string__naive(self): ''' Retrieve the string encoded within the BWT, in time O(|S|^3 * log|S|) NOTE. self.bwt must be initialized ''' first_cols = ["" for _ in range(len(self.bwt))] for _ in range(len(self.bwt)): first_cols = sorted(map(lambda x: x[0] + x[1], zip(self.bwt, first_cols))) return first_cols[0][1:] def get_string(self): ''' Retrieve the string encoded within the BWT, in linear time NOTE. self.bwt, self.fm_count and self.fm_rank must be initialized ''' def __lf_mapping(i): return self.fm_count[self.bwt[i]] + self.fm_rank[i] - 1 revS = '' i = 0 for _ in range(len(self.bwt) - 1): prec = self.bwt[i] revS += prec i = __lf_mapping(i) return ''.join(reversed(revS)) # >>===[ Pattern matching functions ]======================================= def __getPatternInterval(self, pattern): ''' Retrieve the F-interval where a pattern lies. If no such interval exists, returns (None, None). NOTE. self.bwt, self.fm_count, self.fm_ranks, self.next_smallest_letter must be initialized ''' # Pattern is processed right-to-left rev_pattern = list(reversed(pattern)) if rev_pattern[0] not in self.fm_count: return (None, None) # First F-interval (i_min, i_max), corresponding to pattern[-1] i_min = self.fm_count[rev_pattern[0]] nsl = self.next_smallest_letter[rev_pattern[0]] if nsl == None: i_max = len(self.bwt) - 1 else: i_max = self.fm_count[nsl] - 1 # Successive F-intervals (i_min, i_max), with one additional matched # character per iteration for c in rev_pattern[1:]: # Slight modification of the FM-index that shrink the interval to # characters only i_min = self.fm_count[c] + self.fm_ranks[c][i_min - 1] + 1 - 1 i_max = self.fm_count[c] + self.fm_ranks[c][i_max] - 1 if i_min > i_max: return (None, None) return (i_min, i_max) def membership(self, pattern): ''' FM-index based membership pattern-matching ''' return self.__getPatternInterval(pattern) != (None, None) def count(self, pattern): ''' FM-index based count pattern-matching ''' (i_min, i_max) = self.__getPatternInterval(pattern) if (i_min, i_max) == (None, None): return 0 else: return i_max - i_min + 1 def locate(self, pattern): ''' FM-index based locate pattern-matching ''' (i_min, i_max) = self.__getPatternInterval(pattern) if (i_min, i_max) == (None, None): return [] else: return sorted(self.sa[i_min:i_max + 1]) # >>==========================================================================<< # >> Miscellaneous << # >>==========================================================================<< def open_fasta_as_string(filepath): with open(filepath) as fastafile: return ''.join(fastafile.read().splitlines()[1:]).upper() def random_dna_of_length(n): dna = "" letters = ['A', 'T', 'C', 'G'] for _ in range(n): dna += random.choice(letters) return dna