Source code for biseqt.stochastics

# -*- coding: utf-8 -*-
"""
.. wikisection:: overview
    :title: (2) Random Processes

    The :mod:`biseqt.stochastics` module provides a collection of tools for
    statistical calculations and for simulating random processes.

    >>> from biseqt.sequence import Alphabet
    >>> from biseqt.stochastics import rand_seq, MutationProcess
    >>> A = Alphabet('ACGT')
    >>> seq = rand_seq(A, 10)
    >>> print(seq)
    CTTGGAAAAA

    >>> M = MutationProcess(A, go_prob=.1, ge_prob=.2, subst_probs=.3)
    >>> mutant, transcript = M.mutate(seq)
    >>> print transcript
    SMIMMMMSSMS
"""
from math import log
from itertools import product
import numpy as np

from .sequence import Sequence, Alphabet


[docs]def rand_seq(alphabet, size, p=None): """Generate a random :class:`Sequence` of the given length from the given :class:`Alphabet`. Keyword Args: alphabet (Alphabet) size (int): The length of the randomly generated sequence. p (list): The discrete probability distribution for letters of alphabet to appear at each position; default is None in which case letters are chosen uniformly. """ assert isinstance(alphabet, Alphabet) contents = np.random.choice(range(len(alphabet)), size=int(size), p=p) return Sequence(alphabet, contents)
[docs]def rand_read(seq, len_mean=None, len_sd=1, expected_coverage=None, num=None): """Generates a random collection of lossless reads from the given sequence. Each read is generated by taking a substring with Gaussian length and uniformly chosen starting point. The number of reads is either determined as an argument or calculated based on the expected coverage: .. math:: num = \\frac{\\mathbb{E}[coverage]\\times |seq|}{\\mathbb{E}[len]} Args: seq (sequence.Sequence): The original sequence to be sampled. Keyword Args: num (int): Number of reads to generate; default is None in which case ``expected_coverage`` takes precedence. If both are None a single read is returned. expected_coverage (float): The expected number of times each letter in the sequence ought to appear in the entire read collection; default is None in which case ``num`` takes precedence. If both are None a single read is returned. len_mean (float): The mean of the normal distribution of read lengths. Default is 100. len_sd (float): The standard deviation of the normal distribution or read lengths. Default is 1. Yields: tuple: The read :class:`Sequence` and its starting position ``int``. """ assert len_mean < len(seq), \ 'Expected read length must be smaller than the sequence length' assert num is None or expected_coverage is None, \ 'At most one of expected_coverage or num can be specified' if num is None: if expected_coverage is None: num = 1 else: num = int(1. * len(seq) * expected_coverage / len_mean) for length in np.random.normal(loc=len_mean, scale=len_sd, size=num): # force a minimum read length of 1 and max |S| - 1 length = max(1, min(len(seq) - 1, int(length))) start = np.random.choice(len(seq) - length) read = seq[start:start + length] yield read, start
[docs]class MutationProcess(object): """ Attributes: alphabet (Alphabet): The :class:`sequence.Alphabet` this mutation process operates on. subst_probs (list): The probability matrix for the distribution of substitutions such that ``subst_probs[i][j]`` is the probability of the i-th letter being replaced by the j-th letter at a non-indel edit operation. Alternatively, a single number can be provided in which case it is treated as the probability of *any* substitution to occur and all letters and all substitutions are considered equally. For instance, if the single number 0.2 is given and the alphabet has 3 letters the probability matrix will be: .. math:: \\begin{pmatrix} 0.8 & 0.1 & 0.1 \\\\ 0.1 & 0.8 & 0.1 \\\\ 0.1 & 0.1 & 0.8 \\end{pmatrix} go_prob (float): probability of a single indel following a subtitution, a match, or an indel of a different kind (i.e from insertion to deletion); default is 0 (cf. :attr:`ge_prob`). ge_prob (float): The probability of an open gap to be extended by an indel of the same kind; default is 0. For consistency, it is always required that the gap-extend probability to be at least as large as the gap-open probability with equality implying a linear model which translates to a linear gap penalty scheme in :func:`log_odds_scores`. insert_dist (list): the probability distribution for inserted content; default is None which is taken to mean uniform. """ def __init__(self, alphabet, subst_probs=None, ge_prob=0, go_prob=0, insert_dist=None): assert isinstance(alphabet, Alphabet) self.alphabet = alphabet if not isinstance(subst_probs, list): L = len(self.alphabet) assert subst_probs < 1 and subst_probs >= 0 any_subst = float(subst_probs) each_subst = any_subst / (L - 1) match = 1 - any_subst self.subst_probs = [[match if i == j else each_subst for j in range(L)] for i in range(L)] assert go_prob < 1 and ge_prob < 1 and go_prob >= 0 and ge_prob >= 0 assert go_prob <= ge_prob, 'Gap-open probability cannot be larger ' + \ 'than gap-extend probability' self.go_prob, self.ge_prob = go_prob, ge_prob # let insert_dist remain None; np.random.choice treats it as uniform. self.insert_dist = insert_dist
[docs] def mutate(self, seq): """Returns a mutant of the given sequence by copying it while at each position: - either, the current letter is replaced by a random letter according to the distribution dictated by :attr:`subst_probs`. This could be a match or a substitution. - or, a gap is opened (with probability :attr:`go_prob`) the length of which follows a geometric distribution with parameter :attr:`ge_prob`. Accordingly, a transcript is generated which corresponds to the performed edit sequence. Args: seq (Sequence): The original sequence. Returns: tuple: The mutant :class:`Sequence <biseqt.sequence.Sequence>` and the corresponding edit transcript (as a string). """ L = len(self.alphabet) pos = 0 T = [] op, opseq = '', '' # np.random.choice defaults to uniform if p == None. def rand_let(p): return np.random.choice(L, p=p) def coin_toss(p=0.5): return np.random.choice([1, 0], p=[p, 1 - p]) while pos < len(seq): if op and op in 'ID': # previous op was an indel, decide whether to extend it: if coin_toss(self.ge_prob): if op == 'I': T.append(rand_let(self.insert_dist)) else: pos = pos + 1 else: op = '' # force the gap to end else: # previous op is not an indel, decide whether to open a gap: if coin_toss(self.go_prob): # It's an insertion or a deletion with equal chance: if coin_toss(): op = 'D' pos += 1 else: op = 'I' T.append(rand_let(self.insert_dist)) else: copy = rand_let(self.subst_probs[seq[pos]]) T.append(copy) op = 'M' if copy == seq[pos] else 'S' pos += 1 opseq += op return Sequence(self.alphabet, T), opseq
[docs] def noisy_read(self, seq, **kw): """Wraps :func:`rand_read` to generates a collection of lossy reads from the given sequence. First, lossless reads are generated by :func:`rand_read` (all keyword arguments are passed as is), then each read is modified by this mutation process, as in :func:`mutate`. Args: seq (sequence.Sequence): The original sequence to be sampled. Keyword Args: num (int): Number of reads to generate; if not given ``expected_coverage`` must be specified. expected_coverage (float): The expected number of times each letter in the sequence ought to appear in the entire read collection; if not given ``num`` must be specified. len_mean (float): The mean of the normal distribution of read lengths. len_sd (float): The standard deviation of the normal distribution or read lengths; default is 1. Yields: tuple: The noisy read :class:`Sequence <biseqt.sequence.Sequence>`, the starting position ``int`` of the original lossless read, and the corresponding :class:`EditTranscript <biseqt.sequence.EditTranscript>`. """ for read, start in rand_read(seq, **kw): read, tx = self.mutate(read) yield read, start, tx
[docs] def log_odds_scores(self, null_hypothesis=None): """Converts the mutation probabilities (substitution and gap) to log-odds scores usable for sequence alignment (cf. :class:`AlignScores`). Keyword Args: null_hypothesis(list): The probability distribution of letters to use as the null hypothesis; default is None which is taken to mean uniform distribution. Returns: tuple: The substitution score matrix (a list of length equal to the alphabet containing lists each of the same length containing the substitution scores) and the gap scores as a tuple of gap-open and gap-extend scores. .. rubric:: Substitution scores The scores are natural logs of odds ratios, namely the substitution score of letter :math:`a_i` to letter :math:`a_j` is: .. math:: S(a_i\\rightarrow a_j) = \log[(1-g)\Pr(a_j|a_i)] - \log[\Pr(a_j)] where :math:`g` is the gap-extend probability (see below), :math:`\Pr(a_j|a_i)` is the :attr:`substution probability <subst_probs>`, and :math:`\Pr(a_j)` is the null hypothesis distribution. .. rubric:: Gap scores Gap :attr:`open <go_prob>` and :attr:`extend <ge_prob>` probabilities are converted to an affine gap penalty scheme (but reported values are "scores" not penalties, i.e they are negative). The probabilitstic model is as described in :func:`MutationProcess` (also cf. :func:`mutate`). Accordingly, given the gap-open and gap-extend probabilities :math:`g_o, g_e`, the score of a gap of length :math:`n \\ge 1` is .. math:: \\log g_o + (n-1)\\log g_e = \\log\\left( \\frac{g_o}{g_e} \\right) + n\\log g_e where the first term in the RHS is the reported gap-score and the coefficient of :math:`n` in the second term is the reported gap-extend score. Note: * In the calculation of substitution scores the gap-open probability is ignored and a linear model, with its sole parameter equal to the gap-extend probability, is assumed. This is because under an affine model where the probability of opening a gap differs from that of extending a gap, the substitution probabilities also become context-dependent (see where :math:`g` appears in the formula above) which is not supported by ``pwlib``. * The above formula for gap scores is the technical reason why we always demand that :math:`g_e\\ge g_o` so that the gap score is not positive, with equality implying linear gap penalties (i.e the gap-open *score* is 0 when the gap-open and gap-extend *probabilities* are identical). """ if null_hypothesis is None: null_hypothesis = [1./len(self.alphabet)] * len(self.alphabet) err = 'Zero probabilities are not allowed for score calculation' assert all(x > 0 and x < 1 for x in null_hypothesis), err assert all(x > 0 and x < 1 for y in self.subst_probs for x in y), err assert self.ge_prob > 0 and self.go_prob > 0, err subst_scores = [[0] * len(self.alphabet) for _ in self.alphabet] for i, j in product(range(len(self.alphabet)), repeat=2): subst_scores[i][j] = log(1 - self.ge_prob) + \ log(self.subst_probs[i][j]) - log(null_hypothesis[j]) gap_scores = log(self.go_prob) - log(self.ge_prob), log(self.ge_prob) return subst_scores, gap_scores