# -*- coding: utf-8 -*-
"""
.. wikisection:: overview
:title: (2) Random Processes
The :mod:`biseqt.stochastics` module provides a collection of tools for
statistical calculations and for simulating random processes.
>>> from biseqt.sequence import Alphabet
>>> from biseqt.stochastics import rand_seq, MutationProcess
>>> A = Alphabet('ACGT')
>>> seq = rand_seq(A, 10)
>>> print(seq)
CTTGGAAAAA
>>> M = MutationProcess(A, go_prob=.1, ge_prob=.2, subst_probs=.3)
>>> mutant, transcript = M.mutate(seq)
>>> print transcript
SMIMMMMSSMS
"""
from math import log
from itertools import product
import numpy as np
from .sequence import Sequence, Alphabet
[docs]def rand_seq(alphabet, size, p=None):
"""Generate a random :class:`Sequence` of the given length from the
given :class:`Alphabet`.
Keyword Args:
alphabet (Alphabet)
size (int): The length of the randomly generated sequence.
p (list): The discrete probability distribution for letters of
alphabet to appear at each position; default is None in which
case letters are chosen uniformly.
"""
assert isinstance(alphabet, Alphabet)
contents = np.random.choice(range(len(alphabet)), size=int(size), p=p)
return Sequence(alphabet, contents)
[docs]def rand_read(seq, len_mean=None, len_sd=1, expected_coverage=None, num=None):
"""Generates a random collection of lossless reads from the given sequence.
Each read is generated by taking a substring with Gaussian length and
uniformly chosen starting point. The number of reads is either determined
as an argument or calculated based on the expected coverage:
.. math::
num = \\frac{\\mathbb{E}[coverage]\\times |seq|}{\\mathbb{E}[len]}
Args:
seq (sequence.Sequence): The original sequence to be sampled.
Keyword Args:
num (int): Number of reads to generate; default is None in which case
``expected_coverage`` takes precedence. If both are None a single
read is returned.
expected_coverage (float): The expected number of times each letter in
the sequence ought to appear in the entire read collection; default
is None in which case ``num`` takes precedence. If both are None
a single read is returned.
len_mean (float): The mean of the normal distribution of
read lengths. Default is 100.
len_sd (float): The standard deviation of the normal
distribution or read lengths. Default is 1.
Yields:
tuple:
The read :class:`Sequence` and its starting position ``int``.
"""
assert len_mean < len(seq), \
'Expected read length must be smaller than the sequence length'
assert num is None or expected_coverage is None, \
'At most one of expected_coverage or num can be specified'
if num is None:
if expected_coverage is None:
num = 1
else:
num = int(1. * len(seq) * expected_coverage / len_mean)
for length in np.random.normal(loc=len_mean, scale=len_sd, size=num):
# force a minimum read length of 1 and max |S| - 1
length = max(1, min(len(seq) - 1, int(length)))
start = np.random.choice(len(seq) - length)
read = seq[start:start + length]
yield read, start
[docs]class MutationProcess(object):
"""
Attributes:
alphabet (Alphabet): The :class:`sequence.Alphabet` this mutation
process operates on.
subst_probs (list): The probability matrix for the
distribution of substitutions such that ``subst_probs[i][j]`` is
the probability of the i-th letter being replaced by the j-th
letter at a non-indel edit operation. Alternatively, a single
number can be provided in which case it is treated as the
probability of *any* substitution to occur and all letters and all
substitutions are considered equally. For instance, if the single
number 0.2 is given and the alphabet has 3 letters the probability
matrix will be:
.. math::
\\begin{pmatrix} 0.8 & 0.1 & 0.1 \\\\ 0.1 & 0.8 & 0.1 \\\\
0.1 & 0.1 & 0.8 \\end{pmatrix}
go_prob (float): probability of a single indel following a subtitution,
a match, or an indel of a different kind (i.e from insertion to
deletion); default is 0 (cf. :attr:`ge_prob`).
ge_prob (float): The probability of an open gap to be extended by an
indel of the same kind; default is 0. For consistency, it is always
required that the gap-extend probability to be at least as large as
the gap-open probability with equality implying a linear model
which translates to a linear gap penalty scheme in
:func:`log_odds_scores`.
insert_dist (list): the probability distribution for inserted
content; default is None which is taken to mean uniform.
"""
def __init__(self, alphabet, subst_probs=None, ge_prob=0, go_prob=0,
insert_dist=None):
assert isinstance(alphabet, Alphabet)
self.alphabet = alphabet
if not isinstance(subst_probs, list):
L = len(self.alphabet)
assert subst_probs < 1 and subst_probs >= 0
any_subst = float(subst_probs)
each_subst = any_subst / (L - 1)
match = 1 - any_subst
self.subst_probs = [[match if i == j else each_subst
for j in range(L)] for i in range(L)]
assert go_prob < 1 and ge_prob < 1 and go_prob >= 0 and ge_prob >= 0
assert go_prob <= ge_prob, 'Gap-open probability cannot be larger ' + \
'than gap-extend probability'
self.go_prob, self.ge_prob = go_prob, ge_prob
# let insert_dist remain None; np.random.choice treats it as uniform.
self.insert_dist = insert_dist
[docs] def mutate(self, seq):
"""Returns a mutant of the given sequence by copying it while at each
position:
- either, the current letter is replaced by a random letter according
to the distribution dictated by :attr:`subst_probs`. This could be a
match or a substitution.
- or, a gap is opened (with probability :attr:`go_prob`) the length of
which follows a geometric distribution with parameter
:attr:`ge_prob`.
Accordingly, a transcript is generated which corresponds to the
performed edit sequence.
Args:
seq (Sequence): The original sequence.
Returns:
tuple:
The mutant :class:`Sequence <biseqt.sequence.Sequence>` and the
corresponding edit transcript (as a string).
"""
L = len(self.alphabet)
pos = 0
T = []
op, opseq = '', ''
# np.random.choice defaults to uniform if p == None.
def rand_let(p): return np.random.choice(L, p=p)
def coin_toss(p=0.5): return np.random.choice([1, 0], p=[p, 1 - p])
while pos < len(seq):
if op and op in 'ID':
# previous op was an indel, decide whether to extend it:
if coin_toss(self.ge_prob):
if op == 'I':
T.append(rand_let(self.insert_dist))
else:
pos = pos + 1
else:
op = '' # force the gap to end
else:
# previous op is not an indel, decide whether to open a gap:
if coin_toss(self.go_prob):
# It's an insertion or a deletion with equal chance:
if coin_toss():
op = 'D'
pos += 1
else:
op = 'I'
T.append(rand_let(self.insert_dist))
else:
copy = rand_let(self.subst_probs[seq[pos]])
T.append(copy)
op = 'M' if copy == seq[pos] else 'S'
pos += 1
opseq += op
return Sequence(self.alphabet, T), opseq
[docs] def noisy_read(self, seq, **kw):
"""Wraps :func:`rand_read` to generates a collection of lossy reads
from the given sequence. First, lossless reads are generated by
:func:`rand_read` (all keyword arguments are passed as is), then each
read is modified by this mutation process, as in :func:`mutate`.
Args:
seq (sequence.Sequence): The original sequence to be sampled.
Keyword Args:
num (int): Number of reads to generate; if not given
``expected_coverage`` must be specified.
expected_coverage (float): The expected number of times each letter
in the sequence ought to appear in the entire read collection;
if not given ``num`` must be specified.
len_mean (float): The mean of the normal distribution of read
lengths.
len_sd (float): The standard deviation of the normal
distribution or read lengths; default is 1.
Yields:
tuple:
The noisy read :class:`Sequence <biseqt.sequence.Sequence>`,
the starting position ``int`` of the original lossless read,
and the corresponding :class:`EditTranscript
<biseqt.sequence.EditTranscript>`.
"""
for read, start in rand_read(seq, **kw):
read, tx = self.mutate(read)
yield read, start, tx
[docs] def log_odds_scores(self, null_hypothesis=None):
"""Converts the mutation probabilities (substitution and gap) to
log-odds scores usable for sequence alignment (cf.
:class:`AlignScores`).
Keyword Args:
null_hypothesis(list): The probability distribution of letters to
use as the null hypothesis; default is None which is taken to
mean uniform distribution.
Returns:
tuple:
The substitution score matrix (a list of length equal to the
alphabet containing lists each of the same length containing
the substitution scores) and the gap scores as a tuple of
gap-open and gap-extend scores.
.. rubric:: Substitution scores
The scores are natural logs of odds ratios, namely the substitution
score of letter :math:`a_i` to letter :math:`a_j` is:
.. math::
S(a_i\\rightarrow a_j) = \log[(1-g)\Pr(a_j|a_i)] - \log[\Pr(a_j)]
where :math:`g` is the gap-extend
probability (see below), :math:`\Pr(a_j|a_i)` is the :attr:`substution
probability <subst_probs>`, and :math:`\Pr(a_j)` is the null hypothesis
distribution.
.. rubric:: Gap scores
Gap :attr:`open <go_prob>` and :attr:`extend <ge_prob>` probabilities
are converted to an affine gap penalty scheme (but reported values are
"scores" not penalties, i.e they are negative). The probabilitstic
model is as described in :func:`MutationProcess` (also cf.
:func:`mutate`). Accordingly, given the gap-open and gap-extend
probabilities :math:`g_o, g_e`, the score of a gap of length :math:`n
\\ge 1` is
.. math::
\\log g_o + (n-1)\\log g_e =
\\log\\left( \\frac{g_o}{g_e} \\right) + n\\log g_e
where the first term in the RHS is the reported gap-score and the
coefficient of :math:`n` in the second term is the reported gap-extend
score.
Note:
* In the calculation of substitution scores the gap-open
probability is ignored and a linear model, with its sole
parameter equal to the gap-extend probability, is assumed. This
is because under an affine model where the probability of opening
a gap differs from that of extending a gap, the substitution
probabilities also become context-dependent (see where :math:`g`
appears in the formula above) which is not supported by
``pwlib``.
* The above formula for gap scores is the technical reason why we
always demand that :math:`g_e\\ge g_o` so that the gap score is
not positive, with equality implying linear gap penalties (i.e
the gap-open *score* is 0 when the gap-open and gap-extend
*probabilities* are identical).
"""
if null_hypothesis is None:
null_hypothesis = [1./len(self.alphabet)] * len(self.alphabet)
err = 'Zero probabilities are not allowed for score calculation'
assert all(x > 0 and x < 1 for x in null_hypothesis), err
assert all(x > 0 and x < 1 for y in self.subst_probs for x in y), err
assert self.ge_prob > 0 and self.go_prob > 0, err
subst_scores = [[0] * len(self.alphabet) for _ in self.alphabet]
for i, j in product(range(len(self.alphabet)), repeat=2):
subst_scores[i][j] = log(1 - self.ge_prob) + \
log(self.subst_probs[i][j]) - log(null_hypothesis[j])
gap_scores = log(self.go_prob) - log(self.ge_prob), log(self.ge_prob)
return subst_scores, gap_scores