# -*- coding: utf-8 -*-
"""
.. wikisection:: overview
:title: (6) Alignment-free local similarity detection with Word-Blot
The :mod:`biseqt.blot` module implements the WordBlot algorithm.
>>> from biseqt.blot import WordBlot
>>> from biseqt.sequence import Sequence, Alphabet
>>> from biseqt.stochastics import rand_seq, MutationProcess
>>> A = Alphabet('ACGT')
>>> S = rand_seq(A, 100)
>>> M = MutationProcess(A, go_prob=.1, ge_prob=.1, subst_probs=.2)
>>> T, _ = M.mutate(S)
>>> WB = WordBlot(S, T, wordlen=5, alphabet=A, g_max=.5, \
sensitivity=.99)
>>> list(WB.similar_segments(50, .7))
[(((-28, 7), (0, 99)), 1.1988215486845972, 0.7630903108462143)]
>>>
"""
import sys
import warnings
import numpy as np
import logging
from itertools import groupby, product
from scipy.special import erfcinv
from scipy.spatial import cKDTree
from .seeds import SeedIndex, SeedIndexMultiple
from .kmers import as_kmer_seq
from .util import Logger
# so we can catch numpy warnings
warnings.filterwarnings('error')
# A band (i-r, i+r) is considered of interest if xs[i] >threshold. This
# function returns a maximal (disjoint) set of bands of interest (i.e if two
# bands overlap they are reported as one bigger band).
[docs]def find_peaks(xs, rs, threshold):
"""Finds maximal (disjoint) peak regions in a sequence of real numbers.
Each value that is at least as large as the threshold constitutes the
center of a peak with radius according to its position. In the output all
overlapping peaks are merged into maximal peaks.
Args:
xs: the 1D data sequence of interest
rs: the radii for peaks defined at every point exceeding threshold,
could be a 1D sequence or a fixed number,
threshold: cutoff value to compare with ``xs`` values.
Returns:
list (tuple): A least of "peaks", each a tuple of ``(left, right)``
coordinates in ``xs``. Returned peaks are guaranteed to be disjoint.
"""
peaks = []
cur_peak = None
for idx, x in enumerate(xs):
radius = rs[idx] if isinstance(rs, list) else rs
assert isinstance(radius, int)
if x < threshold:
continue
peak_l, peak_r = max(0, idx - 1), min(len(xs) - 1, idx + 1)
if cur_peak is None:
cur_peak = (peak_l, peak_r)
continue
if peak_l < cur_peak[1] + radius: # overlaps with cur_peak
assert peak_r >= cur_peak[1]
cur_peak = (cur_peak[0], peak_r)
else:
peaks.append(cur_peak)
cur_peak = (peak_l, peak_r)
if cur_peak is not None:
peaks.append(cur_peak)
return [(int(l), int(r)) for (l, r) in peaks]
[docs]def wall_to_wall_distance(len0, len1, diag):
"""Wall to wall distance for a diagonal position
.. math::
L = \\min(l_0 - d, l_1) + \\min(d, 0)
with :math:`l_0,l_1` being the length of the sequences (i.e ``len0`` and
``len1`` arguments) and :math:`d` the starting diagonal (i.e ``diag``
argument).
"""
return min(len0 - diag, len1) + min(diag, 0)
[docs]def expected_overlap_len(len0, len1, diag, gap_prob):
"""Calculates the expected length of an overlap alignment given its starting
coordinates:
.. math::
K = \\left(\\frac{2}{2 - g}\\right) L
where :math:`L` is the :func:`wall_to_wall_distance`.
Args:
len0 (int): Length of the 1st sequence.
len1 (int): Length of the 2nd sequence.
diag (int): Starting diagonal of alignments to consider.
gap_prob (float): Probability of indels occuring at any position.
Returns:
int: Expected length of an overlap alignment.
"""
L = wall_to_wall_distance(len0, len1, diag)
expected_len = (2. / (2 - gap_prob)) * L
assert expected_len >= 0
return int(np.ceil(expected_len))
# band radius for edit path of length K
[docs]def band_radius(expected_len, gap_prob, sensitivity):
"""Calculates the smallest band radius in the dynamic programming table
such that an alignment of given expected length, with the given gap
probability, stays entirely within the diagonal band with probability given
as sensitivity. This is given by:
.. math::
r = \\mathrm{erf}^{-1}\\left(1-\\epsilon\\right)
\\sqrt{2gK}
where :math:`g` is the gap probability, :math:`1-\\epsilon` is the desired
sensitivity, and :math:`K` is the given expected length.
Args:
expected_len (int):
minimum expected length of similar region.
gap_prob (float): Probability of indels occuring at any position.
sensitivity (float): The probability that an alignment with given gap
probability remains entirely within the band.
Returns:
int: The smallest band radius guaranteeing the required sensitivity.
"""
assert 0 < gap_prob < 1 and 0 < sensitivity < 1
epsilon = 1. - sensitivity
C = erfcinv(epsilon) * np.sqrt(2 * gap_prob)
radius = C * np.sqrt(expected_len)
return max(1, int(np.ceil(radius)))
[docs]def band_radii(expected_lens, gap_prob, sensitivity):
"""Same as :func:`band_radius` but for bulk calculations (e.g. finding
overlaps).
Args:
expected_lens (list):
List of integers for which to calculate band radii.
gap_prob (float):
As in :func:`band_radius`.
sensitivity (float):
As in :func:`band_radius`.
"""
assert 0 < gap_prob < 1 and 0 < sensitivity < 1
epsilon = 1. - sensitivity
C = erfcinv(epsilon) * np.sqrt(2 * gap_prob)
return np.array([max(1, int(np.ceil(C * np.sqrt(K))))
for K in expected_lens])
[docs]def H0_moments(alphabet_len, wordlen, area):
"""The mean and standrad deviation of the limiting normal distribution
under the :math:`H_0` (unrelated) model given by:
.. math::
\\begin{aligned}
\mu_0 & = Ap^w \\\\
\sigma_0^2 & =
A\\left[(1 - p^w)\left(p^w + \\frac{2p^{w+1}}{1 - p}\\right)
- 2wp^{2w}\\right]
\\end{aligned}
where :math:`w` is the word length, :math:`A` is the area of the ROI, and
:math:`p = \\frac{1}{|\Sigma|}` with :math:`|\Sigma|` being the alphabet
length.
"""
p_H0 = 1. / alphabet_len
pw_H0 = p_H0 ** wordlen
mu_H0 = area * pw_H0
sd_H0 = np.sqrt(area * (
(1 - pw_H0) * (pw_H0 + 2 * p_H0 * pw_H0 / (1 - p_H0)) -
2 * wordlen * pw_H0 ** 2
))
return mu_H0, sd_H0
[docs]def H1_moments(alphabet_len, wordlen, area, seglen, p_match):
"""The mean and standrad deviation of the limiting normal distribution under
the :math:`H_1` (related) model given by:
.. math::
\\begin{aligned}
\mu_1 & = \mu_0 + Kp^w \\\\
\sigma_1^2 & = \sigma_0^2
+ 2K\\left[(1 - p^w)\left(p^w + \\frac{2p^{w+1}}{1 - p}\\right)
- 2wp^{2w}\\right]
\\end{aligned}
where :math:`w` is the word length, :math:`K` is the similarity length, and
:math:`p` is the match probability.
"""
mu_H0, sd_H0 = H0_moments(alphabet_len, wordlen, area)
p_H1 = p_match
if p_H1 == 1.:
# we can't let p_H1 == 1 because we get division by zero below
p_H1 = 1 - np.finfo(float).eps
pw_H1 = p_H1 ** wordlen
mu_H1 = mu_H0 + seglen * pw_H1
sd_H1 = np.sqrt(sd_H0 ** 2 + seglen * (
(1 - pw_H1) * (pw_H1 + 2 * p_H1 * pw_H1 / (1 - p_H1)) -
2 * wordlen * pw_H1 ** 2
))
return mu_H1, sd_H1
# FIXME the fact that we have organized our data as self.S and self.T is the
# main blocker for merging pairwise and multiple sequence implementations.
# Also involved: SeedIndexMultiple
[docs]class WordBlot(SeedIndex):
"""A similarity finder based on m-dependent CLT statistics.
Attributes:
g_max (float):
Upper bound for indel probabilities in mutation model.
sensitivity (float):
Desired sensitivity of bands.
"""
def __init__(self, S, T, g_max=None, sensitivity=None, **kw):
assert 0 < g_max < 1 and 0 < sensitivity < 1
self.g_max = g_max
self.sensitivity = sensitivity
super(WordBlot, self).__init__(S, T, **kw)
[docs] def score_num_seeds(self, **kw):
"""Calculates our key central statistics based on m-dependent CLT. For
a given observation of number of seeds in a region of interest (ROI),
calculates the z-core against the :math:`H_0` (unrelated) and
:math:`H_1` (related) models. In either case, the distribution of
number of seeds, being a sum of m-dependent identically distributed
random variables, is asymptotically normal with parameters given by
:func:`H0_moments`, :func:`H1_moments`.
Keyword Args:
num_seeds (int):
Number of observed seed in the ROI.
area (int|float):
Area of the ROI.
seglen (int):
Similar segment length in H1 model.
p_match (float):
Expected match probability at any given position.
Returns:
tuple (float): z-scores in H0 and H1 models
"""
num_seeds = kw['num_seeds']
area = kw['area']
if area == 0:
return float('-inf'), float('-inf')
mu_H0, sd_H0 = H0_moments(len(self.alphabet), self.wordlen, area)
mu_H1, sd_H1 = H1_moments(len(self.alphabet), self.wordlen, area,
kw['seglen'], kw['p_match'])
z_H0 = (num_seeds - mu_H0) / sd_H0 # score under H0
z_H1 = (num_seeds - mu_H1) / sd_H1 # score under H1
return z_H0, z_H1
[docs] def band_radius(self, K):
"""Wraps :func:`band_radius` with our mutation parameters and sequence
lengths.
Args:
K (int): expected alignment length of interest.
Returns:
int: radius of band for desired :attr:`sensitivity`.
"""
return band_radius(K, self.g_max, self.sensitivity)
[docs] def segment_dims(self, d_band=None, a_band=None):
"""Calculate the edit path length :math:`K` and the area of the given
diagonal/antiodiagonal segment.
Keyword Args:
d_band (tuple): lower and upper diagonals limiting the segment.
a_band (tuple): lower and upper antidiagonal positions limiting the
segment.
Returns:
tuple: edit path length and segment area.
"""
a_min, a_max = a_band
d_min, d_max = d_band
K = (a_max - a_min) / 2
A = (d_max - d_min) * K
return K, A
[docs] def estimate_match_probability(self, num_seeds, d_band=None, a_band=None):
"""Estimate the edit path match probability given the provided observed
number of seeds :math:`n` in given sigment:
.. math::
\hat{p} = \\left(\\frac{n - Ap_\circ^w}{K}\\right)^{\\frac{1}{w}}
where :math:`n, A, p_\circ, K, w` are the number of seeds, segment
area, null probability of matching nucleotides, segment length, and the
word length, respectively.
Args:
num_seeds (int): number of seeds observed in segment.
Keywords Args:
d_band (tuple):
lower and upper diagonals limiting the segment.
a_band (tuple):
lower and upper antidiagonal positions limiting the segment.
Returns:
float: estimated match probability
"""
K, area = self.segment_dims(d_band=d_band, a_band=a_band)
word_p_null = (1./len(self.alphabet)) ** self.wordlen
# NOTE K effectively has become the projected alignment length not the
# full length. This is REALLY important in interpretation but I think
# must things are currently consistent (TODO full review needed).
word_p = (num_seeds - area * word_p_null) / K
try:
match_p = np.exp(np.log(word_p) / self.wordlen)
except Warning:
# presumably this happened because word_p was too small for log
match_p = 0
return min(match_p, 1)
[docs] @classmethod
def find_all_neighbors(cls, seeds, d_radius, a_radius):
"""For each seed finds all seeds in its neighborhood defined by:
.. math::
U_{(d, a)} = \\{(d', a'): |d - d'| < r_d, |a - a'| < r_a \\}
This is done using a Quad-Tree in :math:`O(m lg m)` time where m is the
number of seeds.
Returns:
list: tuples ``((d, a), neighs)`` where ``neighs`` is a list of
neighbor indices.
"""
# normalize the two diameters so we can use a standard L∞ neighborhood.
# typically a_diam is larger, so scale up d values proportionally
d_coeff = 1. * a_radius / d_radius
radius = a_radius
all_seeds = list(cls.to_diagonal_coordinates(i, j) for i, j in seeds)
if not all_seeds:
return []
all_seeds_scaled = np.array([(d * d_coeff, a) for d, a in all_seeds])
quad_tree = cKDTree(all_seeds_scaled)
all_neighs = quad_tree.query_ball_tree(quad_tree, radius,
p=float('inf'))
# all_neighs[i] is the indices of the neighbors of all_seeds[i]; this
# always contains the seed itself (i.e always: i in neighs[i])
for idx, _ in enumerate(all_neighs):
all_neighs[idx].remove(idx)
return zip(all_seeds, all_neighs)
[docs] def score_seeds(self, K):
"""Find the neighbors of each seed in the sense of
:func:`find_all_neighbors` and estimates the match probability of the
segment centered at the coordinates of each seed.
Args:
K (int): the similarity legnth of interest that dictates
neighborhood shapes and estimated match probabilities.
Returns:
list(dict): List of dictionaries with keys: ``seed`` (coordinates
of exactly matching kmer in diagonal coordinates), ``neighs`` (list
of indices of neighbors of this seed in the appropriate diagonal
strip), ``p`` the estimated match probability
of a segment centered at the seed.
"""
d_radius = int(np.ceil(self.band_radius(K)))
a_radius = K
seeds_with_neighs = self.find_all_neighbors(
self.seeds(exclude_trivial=True), d_radius, a_radius
)
# n is the number of seeds in neighborhood excluding the center seed,
def _p(d, a, n):
d_band = (d - d_radius, d + d_radius)
a_band = (a - a_radius, a + a_radius)
return self.estimate_match_probability(n + 1, d_band=d_band,
a_band=a_band)
return [{'seed': (d, a), 'neighs': neighs, 'p': _p(d, a, len(neighs))}
for (d, a), neighs in seeds_with_neighs]
[docs] def similar_segments(self, K_min, p_min, at_least_one=False):
"""Find all maximal local similarities of given minium length and match
probability. Additionally for each segment, the match probability is
estimated and H0/H1 scores are calculated.
Args:
K_min (int):
minimum required length of similarity.
p_min (float):
Minimum required match probability at each position.
score (bool):
Whether to score the segment against H1 by counting seeds
inside.
Yields:
dict: dictionary with keys: ``segment`` (coordinates of similar
region in diagonal coordinates ``((d_min, d_max), (a_min,
a_max))``)), ``p`` the estimated match probability, and ``score``
the H1 z-score if keyword argument ``score`` is true.
"""
self.log('finding local similarities between %s and %s' %
(self.S.content_id[:8], self.T.content_id[:8]))
d_radius = int(np.ceil(self.band_radius(K_min)))
a_radius = K_min
scored_seeds = self.score_seeds(K_min)
def _update_seg(seg, seed):
d, a = seed
if seg is None:
d_min, d_max = d - d_radius, d + d_radius
a_min, a_max = a - a_radius, a + a_radius
else:
(d_min, d_max), (a_min, a_max) = seg
d_min = min(d - d_radius, d_min)
d_max = max(d + d_radius, d_max)
a_min = min(a - a_radius, a_min)
a_max = max(a + a_radius, a_max)
return (d_min, d_max), (a_min, a_max)
avail = [rec['p'] >= p_min for rec in scored_seeds]
if not any(avail) and at_least_one:
# we're obliged to return something, let the highest probability
# seed go through.
assert len(scored_seeds), 'no seeds found while at_least_one=True'
avail[np.argmax([rec['p'] for rec in scored_seeds])] = True
while True:
try:
seed_idx = avail.index(True)
except ValueError:
break
stack = [seed_idx]
avail[seed_idx] = False
ps_in_seg = [scored_seeds[seed_idx]['p']]
seg = None
while stack:
idx = stack.pop()
ps_in_seg.append(scored_seeds[idx]['p'])
seg = _update_seg(seg, scored_seeds[idx]['seed'])
for neigh in scored_seeds[idx]['neighs']:
if avail[neigh]:
stack.append(neigh)
avail[neigh] = False
if seg is None:
break
else:
(d_min, d_max), (a_min, a_max) = seg
d_min = min(len(self.S), max(d_min, -len(self.T)))
d_max = min(len(self.S), max(d_max, -len(self.T)))
a_min = max(a_min, 0)
a_max = min(a_max, len(self.S) + len(self.T))
seg = (d_min, d_max), (a_min, a_max)
# NOTE the following is more justifiable but it matches the
# average. TODO turn this in into an experiment to justify
# p_hat = self.estimate_match_probability(
# n, d_band=seg[0], a_band=seg[1])
p_hat = sum(ps_in_seg) / len(ps_in_seg)
res = {'segment': seg, 'p': p_hat}
n = self.seed_count(d_band=seg[0], a_band=seg[1])
K_hat, area_hat = self.segment_dims(d_band=seg[0],
a_band=seg[1])
scores = self.score_num_seeds(num_seeds=n, area=area_hat,
seglen=K_hat, p_match=p_hat)
res['scores'] = scores
yield res
[docs]class WordBlotOverlap(WordBlot):
"""A specialized version of WordBlot for detecting overlap
(suffix-prefix) similarities between sequences (e.g. in a sequencing
context)."""
[docs] def score_seeds(self):
"""For each seed finds all seeds in its neighborhood defined by:
.. math::
U_{(d, a)} = \\{(d', a'): |d - d'| < r_d(d), |a - a'| < r_a(d) \\}
in such a way that each seed's neighborhood is the entire diagonal band
containing it. Each seed recieves an estimated match probability for a
similarity on its diagonal band and a z-score with respect to the H1
model.
Returns:
list: dicts with keys ``seed`` (diagonal coordinates of seed),
``p`` (estimated match probability of overlap alignment),
``L`` (the length of an alignment through the seed's band
according to sequence lengths), ``score`` (the z-score with
respect to H1), and ``r`` (band radius at the seed's
coordinates).
"""
def _len(d):
return expected_overlap_len(
len(self.S), len(self.T), d, self.g_max
)
def _rad(d):
return np.ceil(self.band_radius(_len(d)))
all_seeds = list(self.to_diagonal_coordinates(i, j)
for i, j in self.seeds(exclude_trivial=True))
if not all_seeds:
return []
all_seeds_scaled = np.array([(d / _rad(d), ) for d, a in all_seeds])
quad_tree = cKDTree(all_seeds_scaled)
all_neighs = quad_tree.query_ball_tree(quad_tree, 1, p=float('inf'))
# all_neighs[i] is the indices of the neighbors of all_seeds[i]; this
# always contains the seed itself (i.e always: i in neighs[i])
for idx, _ in enumerate(all_neighs):
all_neighs[idx].remove(idx)
seeds_with_neighs = zip(all_seeds, all_neighs)
# n excludes the center seed itself
def _p(d, n):
L = _len(d)
d_radius = int(np.ceil(self.band_radius(L)))
area = 2 * d_radius * L
word_p_null = (1./len(self.alphabet)) ** self.wordlen
word_p = (n + 1 - area * word_p_null) / L
try:
match_p = np.exp(np.log(word_p) / self.wordlen)
except Warning:
# presumably this happened because word_p was too small for log
match_p = 0
return min(match_p, 1)
return [{'seed': (d, a), 'r': _rad(d), 'L': _len(d),
'p': _p(d, len(neighs))}
for (d, a), neighs in seeds_with_neighs]
[docs] def highest_scoring_overlap_band(self):
"""Finds the highest scoring diagonal band according to probabiliy
estimations of :func:`score_seeds`.
Returns:
dict: with same keys ``p, score, d_band`` consistent with the
output of :func:`score_seeds` for the highest scoring
diagonal band.
"""
scored_seeds = self.score_seeds()
if not scored_seeds:
return None
idx = max(range(len(scored_seeds)), key=lambda i: scored_seeds[i]['p'])
seed, rad = scored_seeds[idx]['seed'], scored_seeds[idx]['r']
p_hat, overlap_len = scored_seeds[idx]['p'], scored_seeds[idx]['L']
d_band = seed[0] - rad, seed[0] + rad
res = {'d_band': d_band, 'p': p_hat, 'len': overlap_len}
area = 2 * rad * overlap_len
mu_H1, sd_H1 = H1_moments(len(self.alphabet), self.wordlen, area,
overlap_len, p_hat)
num_seeds = self.seed_count(d_band=d_band)
z_H1 = (num_seeds - mu_H1) / sd_H1
res['score'] = z_H1
return res
[docs]class WordBlotOverlapRef(WordBlotOverlap):
"""An in-memory, SQL-free version of :class:`WordBlotOverlap` for faster
comparisons. Due to implementation details the word length is constrained
above by the available memory.
Attributes:
allowed_memory (int|float): allocatable memory in GB for kmers index.
"""
def __init__(self, ref, allowed_memory=1, **kw):
self.wordlen = kw['wordlen']
self.alphabet = kw['alphabet']
self.g_max = kw['g_max']
self.sensitivity = kw['sensitivity']
self.log_level = kw.get('log_level', logging.INFO)
self.S = ref
num_kmers = len(self.alphabet) ** self.wordlen
mem_needed = sys.getsizeof(num_kmers) * num_kmers
mem_needed_gb = np.power(2, np.log2(mem_needed) - 30)
assert allowed_memory > 0, 'allowed memory must be positive'
self.allowed_memory = allowed_memory
if mem_needed_gb > self.allowed_memory:
msg = 'not enough memory (max = %.2f GB) ' % allowed_memory
msg += 'to store %d-mers ' % self.wordlen
msg += '(%.2f GB needed)' % mem_needed_gb
raise MemoryError(msg)
self.kmer_hits = [[] for _ in range(num_kmers)]
for pos, kmer in enumerate(as_kmer_seq(ref, self.wordlen)):
self.kmer_hits[kmer].append(pos)
self.T = None
relpath = 'python-object'
log_header = '%d-mer cache (%s)' % (self.wordlen, relpath)
self._logger = Logger(log_level=self.log_level, header=log_header)
self._seeds = {}
[docs] def seeds(self, exclude_trivial=True):
assert self.T is not None
if self.T.content_id not in self._seeds:
self._seeds = {self.T.content_id: []}
for pos, kmer in enumerate(as_kmer_seq(self.T, self.wordlen)):
for pos_ref in self.kmer_hits[kmer]:
if self.S == self.T and exclude_trivial and pos == pos_ref:
continue
self._seeds[self.T.content_id].append((pos_ref, pos))
return self._seeds[self.T.content_id]
[docs] def seed_count(self, d_band=None, a_band=None):
assert self.T is not None
seeds = self.seeds()
cnt = 0
if d_band:
d_min, d_max = d_band
if a_band:
a_min, a_max = a_band
for i, j in seeds:
d, a = self.to_diagonal_coordinates(i, j)
if d_band and not d_min <= d <= d_max:
continue
if a_band and not a_min <= a <= a_max:
continue
cnt += 1
return cnt
[docs] def score_seeds_(self, seq):
self.T = seq
return super(WordBlotOverlapRef, self).score_seeds()
[docs] def highest_scoring_overlap_band(self, seq):
self.T = seq
return super(WordBlotOverlapRef, self).highest_scoring_overlap_band()
[docs]class WordBlotLocalRef(WordBlot):
"""An in-memory, SQL-free version of :class:`WordBlot` for faster
comparisons. Due to implementation details the word length is constrained
above by the available memory.
Attributes:
allowed_memory (int|float): allocatable memory in GB for kmers index.
"""
def __init__(self, ref, allowed_memory=1, **kw):
self.wordlen = kw['wordlen']
self.alphabet = kw['alphabet']
self.g_max = kw['g_max']
self.sensitivity = kw['sensitivity']
self.log_level = kw.get('log_level', logging.INFO)
self.S = ref
num_kmers = len(self.alphabet) ** self.wordlen
mem_needed = sys.getsizeof(num_kmers) * num_kmers
mem_needed_gb = np.power(2, np.log2(mem_needed) - 30)
assert allowed_memory > 0, 'allowed memory must be positive'
self.allowed_memory = allowed_memory
if mem_needed_gb > self.allowed_memory:
msg = 'not enough memory (max = %.2f GB) ' % allowed_memory
msg += 'to store %d-mers ' % self.wordlen
msg += '(%.2f GB needed)' % mem_needed_gb
raise MemoryError(msg)
self.kmer_hits = [[] for _ in range(num_kmers)]
for pos, kmer in enumerate(as_kmer_seq(ref, self.wordlen)):
self.kmer_hits[kmer].append(pos)
self.T = None
relpath = 'python-object'
log_header = '%d-mer cache (%s)' % (self.wordlen, relpath)
self._logger = Logger(log_level=self.log_level, header=log_header)
self._seeds = {}
[docs] def seeds(self, exclude_trivial=True):
assert self.T is not None
if self.T.content_id not in self._seeds:
self._seeds = {self.T.content_id: []}
for pos, kmer in enumerate(as_kmer_seq(self.T, self.wordlen)):
for pos_ref in self.kmer_hits[kmer]:
if self.S == self.T and exclude_trivial and pos == pos_ref:
continue
self._seeds[self.T.content_id].append((pos_ref, pos))
return self._seeds[self.T.content_id]
[docs] def seed_count(self, d_band=None, a_band=None):
assert self.T is not None
seeds = self.seeds()
cnt = 0
if d_band:
d_min, d_max = d_band
if a_band:
a_min, a_max = a_band
for i, j in seeds:
d, a = self.to_diagonal_coordinates(i, j)
if d_band and not d_min <= d <= d_max:
continue
if a_band and not a_min <= a <= a_max:
continue
cnt += 1
return cnt
[docs] def score_seeds_(self, seq, K):
self.T = seq
return super(WordBlotLocalRef, self).score_seeds(K)
[docs] def similar_segments(self, seq, K_min, p_min, at_least_one=False):
self.T = seq
kw = {'at_least_one': at_least_one}
for res in super(WordBlotLocalRef, self).similar_segments(K_min, p_min,
**kw):
yield res
# FIXME lots of code duplication here, can it be cleaned up? (cf. note above
# WordBlot; the main issue is self.S and self.T being used everywhere there,
# and presumably lots of hidden pairwise assumptions). The good news is:
# every function implemented below should in principle work exactly as is for
# pairwise.
[docs]class WordBlotMultiple(SeedIndexMultiple):
"""A multiple sequence similarity finder based on m-dependent CLT
statistics.
Attributes:
g_max (float):
Upper bound for indel probabilities in mutation model.
sensitivity (float):
Desired sensitivity of bands.
"""
def __init__(self, *seqs, **kw):
g_max, sensitivity = kw.pop('g_max'), kw.pop('sensitivity')
assert 0 < g_max < 1 and 0 < sensitivity < 1
self.g_max = g_max
self.sensitivity = sensitivity
super(WordBlotMultiple, self).__init__(*seqs, **kw)
[docs] def band_radius(self, K):
"""Wraps :func:`band_radius` with our mutation parameters and sequence
lengths.
Args:
K (int): expected alignment length of interest.
Returns:
int: radius of band for desired :attr:`sensitivity`.
"""
return band_radius(K, self.g_max, self.sensitivity)
[docs] def wall_to_wall_distance(self, ds):
"""Wall to wall distance :math:`L` for a diagonal position :math:`d_1,
\ldots, d_{n-1}`.
The distance :math:`L` is the largest possible value of the
antidiagonal coordinate once all diagonal coordinates are fixed.
"""
raise NotImplementedError
[docs] def estimate_match_probability(self, num_seeds, K, volume):
"""Estimate the edit path match probability given the provided observed
number of seeds in given sigment.
.. math::
\hat{p} = \\left(
\\frac{n - Vp_\circ^{w(N-1)}}{K}
\\right)^{\\frac{1}{w(N-1)}}
where :math:`n, N, V, p_\circ, w` are the number of seeds, number of
sequences, segment n-d volume, null probability of matching
nucleotides, and word length, respectively.
Args:
num_seeds (int|float): number of seeds observed in segment.
K (int|float): the expected length of the alignment.
volume (int|float): the n-d volume of the region of interest.
Returns:
float: estimated match probability
"""
power = self.wordlen * (len(self.seqs) - 1)
word_p_null = (1. / len(self.alphabet)) ** power
if num_seeds > 0:
word_p = (num_seeds - volume * word_p_null) / K
try:
match_p = np.exp(np.log(word_p) / self.wordlen)
except Warning:
# presumably this happened because word_p was too small for log
match_p = 0
else:
match_p = 0
return min(match_p, 1)
[docs] def score_seeds(self, K):
"""Find the neighbors of each seed in the sense of
:func:`find_all_neighbors` and estimates the match probability of the
segment centered at the coordinates of each seed.
Args:
K (int): the similarity legnth of interest that dictates
neighborhood shapes and estimated match probabilities.
Returns:
list: list of tuples ``((d, a), neighs, p)`` where ``(d, a)``
is the diagonal coordinates of the seed, ``neighs`` is a list
of integer indices of seeds in its diagonal/antidiagonal
neighborhood, and ``p`` is the estimated match probability of
a segment of length ``K`` centered at the seed.
"""
d_radius = int(np.ceil(self.band_radius(K)))
a_radius = int(np.ceil(len(self.seqs) * K / 2.))
seeds_with_neighs = self.find_all_neighbors(d_radius, a_radius)
volume = (2 * d_radius) ** (len(self.seqs) - 1) * K
# n excludes the center seed itself
def _p(n):
return self.estimate_match_probability(n + 1, K, volume)
return [{'seed': (ds, a), 'neighs': neighs, 'p': _p(len(neighs))}
for (ds, a), neighs in seeds_with_neighs]
[docs] def find_all_neighbors(self, d_radius, a_radius):
"""For each seed finds all seeds in its neighborhood defined by:
.. math::
U_{(d_1,\ldots,d_{n-1}, a)} =
\\{(d_1', \ldots, d_{n-1}', a'):
|d_k - d_k'| < r_d, |a - a'| < r_a \\}
This is done using a kD-Tree in :math:`O(nm lg m)` time where m is the
number of seeds and n is the number of sequences.
Returns:
list: tuples ``((ds, a), neighs)`` where ``neighs`` is a list of
neighbor indices.
"""
self.log('finding all neighbors using a kD-tree')
# normalize the two diameters so we can use a standard L∞ neighborhood.
# typically a_diam is larger, so scale up d values proportionally
d_coeff = 1. * a_radius / d_radius
radius = a_radius
all_seeds = list(self.seeds())
if not all_seeds:
return []
all_seeds_scaled = np.array([[d * d_coeff for d in ds] + [a]
for ds, a in all_seeds])
quad_tree = cKDTree(all_seeds_scaled)
all_neighs = quad_tree.query_ball_tree(quad_tree, radius,
p=float('inf'))
# all_neighs[i] is the indices of the neighbors of all_seeds[i]; this
# always contains the seed itself (i.e always: i in neighs[i])
for idx, _ in enumerate(all_neighs):
all_neighs[idx].remove(idx)
self.log('found all neighbors')
return zip(all_seeds, all_neighs)
[docs] def score_num_seeds(self, num_seeds, **kw):
"""The exrtension of :func:`WordBlot.score_num_seeds` to multiple
sequence comparisons.
Keyword Args:
num_seeds (int):
Number of observed seed in the ROI.
volume (int|float):
Area of the ROI.
seglen (int):
Similar segment length in H1 model.
p_match (float):
Expected match probability at any given position.
Returns:
tuple (float): z-scores in H0 and H1 models
"""
p_match = kw['p_match']
seglen = kw['seglen']
volume = kw['volume']
if volume == 0:
return float('-inf'), float('-inf')
if p_match == 1.:
# we can't let p_H1 == 1 because we get division by zero below
p_match = 1 - np.finfo(float).eps
p_H0 = (1. / len(self.alphabet)) ** (len(self.seqs) - 1)
pw_H0 = p_H0 ** self.wordlen
mu_H0 = volume * pw_H0
sd_H0 = np.sqrt(volume * (
(1 - pw_H0) * (pw_H0 + 2 * p_H0 * pw_H0 / (1 - p_H0)) -
2 * self.wordlen * pw_H0 ** 2
))
p_H1 = p_match
pw_H1 = p_H1 ** self.wordlen
mu_H1 = mu_H0 + seglen * pw_H1
sd_H1 = np.sqrt(sd_H0 ** 2 + seglen * (
(1 - pw_H1) * (pw_H1 + 2 * p_H1 * pw_H1 / (1 - p_H1)) -
2 * self.wordlen * pw_H1 ** 2
))
z_H0 = (num_seeds - mu_H0) / sd_H0 # score under H0
z_H1 = (num_seeds - mu_H1) / sd_H1 # score under H1
return z_H0, z_H1
[docs] def similar_segments(self, K_min, p_min, at_least_one=False):
"""Find all maximal local similarities of given minium length and match
probability. Additionally for each segment, the match probability is
estimated and H0/H1 scores are calculated.
Args:
K_min (int):
minimum required length of similarity.
p_min (float):
Minimum required match probability at each position.
score (bool):
Whether to score the segment against H1 by counting seeds
inside.
Yields:
dict: dictionary with keys: ``segment`` (coordinates of similar
region in diagonal coordinates, ``p`` the estimated match
probability, and ``score`` the H1 z-score if keyword argument
``score`` is true.
"""
d_radius = int(np.ceil(self.band_radius(K_min)))
a_radius = int(np.ceil(len(self.seqs) * K_min / 2.))
scored_seeds = self.score_seeds(K_min)
self.log('finding local similarities between %d sequences' %
len(self.seqs))
def _update_seg(seg, seed):
ds, a = seed
if seg is None:
d_ranges = [None] * (len(self.seqs) - 1)
for i in range(len(self.seqs) - 1):
d_ranges[i] = ds[i] - d_radius, ds[i] + d_radius
a_range = a - a_radius, a + a_radius
else:
d_ranges, a_range = seg
assert all(len(r) == 2 for r in d_ranges) # pairs of min, max
for i in range(len(self.seqs) - 1):
d_min, d_max = d_ranges[i]
d_ranges[i] = (min(ds[i], d_min),
max(ds[i], d_max))
a_min, a_max = seg[-1]
a_range = (min(a - a_radius, a_min), max(a + a_radius, a_max))
return d_ranges, a_range
avail = [rec['p'] >= p_min for rec in scored_seeds]
if not any(avail) and at_least_one:
# we're obliged to return something, let the highest probability
# seed go through.
assert len(scored_seeds), 'no seeds found while at_least_one=True'
avail[np.argmax([rec['p'] for rec in scored_seeds])] = True
while True:
try:
# TODO grab the highest probability so we yield in decreasing
# order of quality
seed_idx = avail.index(True)
except ValueError:
break
stack = [seed_idx]
avail[seed_idx] = False
ps_in_seg = [scored_seeds[seed_idx]['p']]
seg = None
while stack:
idx = stack.pop()
ps_in_seg.append(scored_seeds[idx]['p'])
seg = _update_seg(seg, scored_seeds[idx]['seed'])
for neigh in scored_seeds[idx]['neighs']:
if avail[neigh]:
stack.append(neigh)
avail[neigh] = False
if seg is None:
break
else:
# clip overflowing values so translating back to standard
# coordinates gives meaningful numbers.
ds_range, a_range = seg
for idx in range(len(self.seqs) - 1):
d_min, d_max = ds_range[idx]
d_min = min(
len(self.seqs[0]),
max(d_min, -len(self.seqs[idx + 1]))
)
d_max = min(
len(self.seqs[0]),
max(d_max, -len(self.seqs[idx + 1]))
)
ds_range[idx] = d_min, d_max
a_min, a_max = a_range
a_min = max(a_min, 0)
a_max = min(a_max, sum(len(seq) for seq in self.seqs))
seg = ds_range, (a_min, a_max)
# NOTE the following is more justifiable but it matches the
# average. TODO turn this in into an experiment to justify
# p_hat = self.estimate_match_probability(
# n, d_band=seg[0], a_band=seg[1])
p_hat = sum(ps_in_seg) / len(ps_in_seg)
res = {'segment': seg, 'p': p_hat}
ds_band, a_band = seg
n = self.seed_count(ds_band=ds_band, a_band=a_band)
K_hat = np.ceil((a_band[1] - a_band[0]) / len(self.seqs))
volume = a_band[1] - a_band[0]
for d_band in ds_band:
volume *= d_band[1] - d_band[0]
scores = self.score_num_seeds(num_seeds=n, volume=volume,
seglen=K_hat, p_match=p_hat)
res['scores'] = scores
yield res
[docs]class WordBlotMultipleFast(WordBlotMultiple):
"""An in-memory, SQL-free version of :class:`WordBlotOverlapMultiple` for
faster comparisons. Due to implementation details the word length is
constrained above by the available memory.
Attributes:
allowed_memory (int|float): allocatable memory in GB for kmers index,
default is 1.
"""
def __init__(self, *seqs, **kw):
name = '_'.join(S.content_id[:8] for S in seqs)
self.name = name
self.wordlen = kw['wordlen']
self.alphabet = kw['alphabet']
self.g_max = kw['g_max']
self.sensitivity = kw['sensitivity']
self.log_level = kw.get('log_level', logging.INFO)
self.seqs = seqs
self.allowed_memory = kw.get('allowed_memory', 1)
assert self.allowed_memory > 0, 'allowed memory must be positive'
num_kmers = len(self.alphabet) ** self.wordlen
mem_needed = sys.getsizeof(num_kmers) * num_kmers
mem_needed_gb = np.power(2, np.log2(mem_needed) - 30)
if mem_needed_gb > self.allowed_memory:
msg = 'not enough memory (max = %.2f GB) ' % self.allowed_memory
msg += 'to store %d-mers ' % self.wordlen
msg += '(%.2f GB needed)' % mem_needed_gb
raise MemoryError(msg)
self.kmer_hits = [[] for _ in range(num_kmers)]
for idx, seq in enumerate(self.seqs):
for pos, kmer in enumerate(as_kmer_seq(seq, self.wordlen)):
self.kmer_hits[kmer].append((idx, pos))
log_header = '%d-mer word-blot (python-object)' % self.wordlen
self._logger = Logger(log_level=self.log_level, header=log_header)
[docs] def seeds(self):
for hits in self.kmer_hits:
hits = {seqid: [c[1] for c in seq_hits]
for seqid, seq_hits in groupby(hits,
key=lambda c: c[0])}
# only consider kmers present in all sequences
if len(hits) < len(self.seqs):
continue
for idxs in product(*hits.values()):
ds, a = self.to_diagonal_coordinates(*idxs)
yield list(ds), a
[docs] def seed_count(self, ds_band=None, a_band=None):
cnt = 0
for ds, a in self.seeds():
if ds_band:
if not all(ds_band[i][0] <= d <= ds_band[i][1]
for i, d in enumerate(ds)):
continue
if a_band and not a_band[0] <= a <= a_band[1]:
continue
cnt += 1
return cnt