# -*- coding: utf-8 -*-
"""
.. wikisection:: overview
:title: (3) Pairwise Alignment
The :mod:`pw` module exposes the pairwise sequence alignment algorithms
implemented in C in `pwlib <biseqt.pwlib.html>`_.
>>> from biseqt.sequence import Alphabet, Sequence
>>> from biseqt.pw import Aligner
>>> A = Alphabet('ACGT')
>>> S = A.parse('AAACGCGT')
>>> T = A.parse('AACGCCTT')
>>> with Aligner(S, T) as aligner:
... score = aligner.solve()
... aln = aligner.traceback()
>>> print type(aln)
<class 'biseqt.pw.Alignment'>
>>> print aln
origin[0]: AAACGC-GT-
mutant[0]: AA-CGCC-TT
>>> print aln.render_term() # same as above but with colored output
"""
import os
import termcolor
import re
from cffi import FFI
from collections import namedtuple
from .sequence import Sequence
lib = None
"""The loaded shared object for ``pwlib``. All functions defined in the header
file are accessible through this object. This object is automatically populated
upon loading this module (cf. :func:`setup_ffi`) and users never have to
manipulate it."""
ffi = None
"""The main FFI instance used throughout this module. This object is
automatically populated upon loading this module (cf. :func:`setup_ffi`) and
users never have to manipulate it."""
[docs]def setup_ffi():
"""Instantiates an FFI object as :attr:`ffi` and loads the shared object
for pwlib into :attr:`lib`. This function is automatically called when
this module loads.
Note:
CFFI has issues with loading macros as they are defined in a header
file. For this reason, and since we don't use the macros in python code
any line that begins with ``#define`` is ignored from the header file.
This means multiline macros will not work.
"""
pwlib_so = os.path.join(os.path.dirname(__file__), 'pwlib', 'pwlib.so')
pwlib_h = os.path.join(os.path.dirname(__file__), 'pwlib', 'pwlib.h')
global ffi, lib
ffi = FFI()
lib = ffi.dlopen(pwlib_so)
with open(pwlib_h) as f:
# ignore macro definitions in header file; we don't use them in python
# and cffi is unhappy about parsing them:
headers = '\n'.join(line for line in f.read().split('\n')
if not line.startswith('#define'))
ffi.cdef(headers)
setup_ffi()
# alignment modes
STD_MODE = lib.STD_MODE
"""Standard alignment type; time and memory complexity is quadratic in sequence
lengths."""
BANDED_MODE = lib.BANDED_MODE
"""Banded alignment type; time and memory complexity is linear in sequence
lengths with a constant proportional to band width. This mode is incompatible
with local alignments."""
# standard alignment types:
GLOBAL = lib.GLOBAL
"""Standard global alignment problem, i.e Needleman-Wunsch."""
LOCAL = lib.LOCAL
"""Standard local alignment problem, i.e Smith-Waterman."""
START_ANCHORED = lib.START_ANCHORED
"""Standard local alignment demanding that it begins at the start of frame of
both sequences."""
END_ANCHORED = lib.END_ANCHORED
"""Standard local alignment demanding that it ends at the end of frame of both
sequences."""
OVERLAP = lib.OVERLAP
"""Standard suffix-prefix alignment in any direction; this includes alignments
where a prefix of either sequence matches a suffix of the other and alignments
where one sequence is a substring of the other."""
START_ANCHORED_OVERLAP = lib.START_ANCHORED_OVERLAP
"""Standard suffix-prefix alignment demanding that it begins at the start of
frame of both sequences."""
END_ANCHORED_OVERLAP = lib.END_ANCHORED_OVERLAP
"""Standard suffix-prefix alignment demanding that it ends at the end of frame
of both sequences."""
# banded alignment types:
B_GLOBAL = lib.B_GLOBAL
"""Banded global alignment problem; may not be well-defined (end points of the
table may not lie in band)."""
B_OVERLAP = lib.B_OVERLAP
"""Banded suffix-prefix alignment problem in either direction including
substring alignments."""
B_LOCAL = lib.B_LOCAL
"""Banded local alignment problem."""
ALN_TYPES = {
STD_MODE: [GLOBAL, LOCAL, START_ANCHORED, END_ANCHORED, OVERLAP,
START_ANCHORED_OVERLAP, END_ANCHORED_OVERLAP],
BANDED_MODE: [B_GLOBAL, B_OVERLAP, B_LOCAL],
}
[docs]class Aligner(object):
"""Provides a context that solves a pairwise alignment problem. Memory is
allocated upon entering the context and is freed upon leaving it. All
alignment calculations (:func:`solve` and :func:`traceback`) are explicitly
invoked by the caller.
Args:
origin (sequence.Sequence): The original ("from") sequence.
mutant (sequence.Sequence): The mutant ("to") sequence.
Keyword Args:
origin_range (tuple): The original ("from") sequence; cf.
:c:member:`alnframe::origin_range`.
mutant_range (tuple): The mutant ("to") sequence; cf.
:c:member:`alnframe::mutant_range`.
alnmode (int): One of the :attr:`STD_MODE` or :attr:`BANDED_MODE`,
default is ``STD_MODE``; cf. :c:member:`alnprob::mode`.
alntype (int): One of the allowed alingment types for the given
*alnmode*, see :attr:`ALN_TYPES`; default is ``GLOBAL``; cf.
:c:type:`std_alnparams` and :c:type:`banded_alnparams`.
subst_scores (list): The overriding definition of the substitution
score matrix; cf. :c:member:`alnscores::subst_scores`. Default is
None in which case the score matrix is populated based on match and
mismatch scores.
match_score (float): If ``subst_scores`` is not given, this parameter
is used to populate the diagonal entries of the substitution score
matrix; default is 1.
mismatch_score (float): If ``subst_scores`` is not given, this
parameter is used to populate the off-diagonal entries of the
substitution score matrix; default is 0.
go_score (float): The gap open score; cf.
:c:member:`alnscores::gap_open_score`. Default is 0.
ge_score (float): The gap extend score; cf.
:c:member:`alnscores::gap_extend_score`. Default is 0.
max_new_mins (int): Maximum number of tolerated new minima encountered
in the running score of an alignment; cf.
:c:member:`alnprob::max_new_mins`. Default is -1 in which case
no such constraint is imposed.
diag_range (tuple): If in :attr:`BANDED_MODE` this argument specifies
the upper and lower limit on diagonals of the dynamic programming
table to be populated; cf. :c:type:`banded_alnparams`.
min_score (float): The minimum required score for an alignment to be
reported; default is ``float("-inf")`` in which case all alignments
are reported.
"""
def __init__(self, origin, mutant, **kw):
self.min_score = kw.get('min_score', float('-inf'))
self.alnmode = kw.get('alnmode', STD_MODE)
self.alntype = kw.get('alntype', GLOBAL)
assert self.alnmode in [STD_MODE, BANDED_MODE]
assert self.alntype in ALN_TYPES[self.alnmode]
# set origin, mutant, and alphabet
assert isinstance(origin, Sequence) and isinstance(mutant, Sequence)
assert origin.alphabet == mutant.alphabet
self.origin, self.mutant = origin, mutant
self.alphabet = origin.alphabet
# set origin_range and mutant_range
origin_range = kw.get('origin_range', (0, len(self.origin)))
mutant_range = kw.get('mutant_range', (0, len(self.mutant)))
assert 0 <= origin_range[0] <= origin_range[1] <= len(self.origin)
assert 0 <= mutant_range[0] <= mutant_range[1] <= len(self.mutant)
self.origin_range, self.mutant_range = origin_range, mutant_range
# set alignment scores
self.go_score = kw.get('go_score', 0)
self.ge_score = kw.get('ge_score', 0)
L = len(self.alphabet)
subst_scores = kw.get('subst_scores', None)
if subst_scores is None:
mismatch = kw.get('mismatch_score', 0)
match = kw.get('match_score', 1)
subst_scores = [
[match if i == j else mismatch for i in range(L)]
for j in range(L)
]
assert isinstance(subst_scores, list) and len(subst_scores) == L
self.subst_scores = subst_scores
self.max_new_mins = kw.get('max_new_mins', -1)
self.diag_range = kw.get('diag_range', None)
# create all the C data structures
self.c_subst_scores_rows = [ffi.new('double[]', self.subst_scores[i])
for i in range(L)]
self.c_subst_scores = ffi.new('double *[]', self.c_subst_scores_rows)
self.c_alnscores = ffi.new('alnscores*', {
'subst_scores': self.c_subst_scores,
'gap_open_score': self.go_score,
'gap_extend_score': self.ge_score,
})
self.c_origin = ffi.new('int[]', self.origin.contents)
self.c_mutant = ffi.new('int[]', self.mutant.contents)
self.c_alnframe = ffi.new('alnframe*', {
'origin': self.c_origin,
'mutant': self.c_mutant,
'origin_range': self.origin_range,
'mutant_range': self.mutant_range,
})
if self.alnmode == STD_MODE:
self.c_alnparams = ffi.new('std_alnparams*',
{'type': self.alntype})
elif self.alnmode == BANDED_MODE:
self.min_diag, self.max_diag = kw['diag_range']
assert -len(mutant) <= self.min_diag <= \
self.max_diag <= len(origin)
self.c_alnparams = ffi.new('banded_alnparams*', {
'type': self.alntype,
'dmin': self.min_diag,
'dmax': self.max_diag,
})
self.c_alnprob = ffi.new('alnprob*', {
'frame': self.c_alnframe,
'scores': self.c_alnscores,
'mode': self.alnmode,
'max_new_mins': self.max_new_mins,
'std_params' if self.alnmode == STD_MODE else 'banded_params':
self.c_alnparams,
})
self.c_dptable = ffi.new('dptable*', {
'prob': self.c_alnprob,
'cells': ffi.NULL,
'num_rows': -1,
'row_lens': ffi.NULL,
})
def __enter__(self):
"""Allocates memory for the dynamic programming table and initializes
all cells."""
if lib.dptable_init(self.c_dptable) == -1:
raise Exception('Failed to initialize the DP table.')
return self
def __exit__(self, *args):
"""Frees the allocated memory for the dynamic programming table."""
lib.dptable_free(self.c_dptable)
# FIXME check whether we are in context; does it crash if we are not?
[docs] def solve(self):
"""Populates the regions of interest in the dynamic programming table
and reports the optimal score; if any. This function must be called
within the context, cf. :func:`__enter__`, :func:`__exit__`.
Returns:
score (float): The score of the optimal alignment or None if none
found.
"""
self.opt = lib.dptable_solve(self.c_dptable)
if self.opt.i == -1 or self.opt.j == -1:
self.opt = None
return None
score = self.c_dptable.cells[self.opt.i][self.opt.j].choices[0].score
if score < self.min_score:
self.opt = None
return None
return score
[docs] def table_scores(self):
"""Returns a 2D array of scores calculated by :func:`solve()`."""
if self.alnmode != STD_MODE:
raise NotImplementedError
scores = [[self.c_dptable.cells[i][j].choices[0].score
for j in range(*self.mutant_range)]
for i in range(*self.origin_range)]
return scores
[docs] def traceback(self):
"""Traces back the optimal alignment identified by :func:`solve`. This
function has to be called within the context and after :func:`solve`.
Otherwise no alignment would be found.
Returns:
Alignment: The optimal alignment or None if none found.
"""
if self.opt is None:
return None
alignment = lib.dptable_traceback(self.c_dptable, self.opt)
assert alignment != ffi.NULL
transcript = ffi.string(alignment.transcript)
if not transcript:
return None
return Alignment(self.origin, self.mutant, transcript,
score=alignment.score,
origin_start=alignment.origin_idx,
mutant_start=alignment.mutant_idx)
[docs] def calculate_score(self, alignment):
"""Scores a given alignment for :attr:`origin` and :attr:`mutant`.
Args:
alignment (Alignment): The alignment to be evaluated.
Returns:
float:
The score of the alignment based on :attr:`subst_scores`,
:attr:`go_score`, and :attr:`ge_score`.
"""
return alignment.calculate_score(self.subst_scores, self.go_score,
self.ge_score)
[docs]class Alignment(object):
"""Represents a pairwise alignment.
Attributes:
origin (sequence.Sequence): The original ("from") sequence.
mutant (sequence.Sequence): The mutant ("to") sequence.
alphabet (sequence.Alphabet): The shared alphabet of *origin* and
*mutant*.
transcript (str): The sequence of edit operations that transforms
*origin* to *mutant*. The alphabet for edit operations is ``M`` for
match, ``S`` for substitution (mismatch), and ``I`` and ``D`` for
insertion and deletion.
origin_start (int): Starting position on the original sequence; default
is 0.
mutant_start (int): Starting position on the mutant sequence; default
is 0.
score (float): The score of the alignment; default is None.
"""
def __init__(self, origin, mutant, transcript, score=None,
origin_start=0, mutant_start=0):
assert isinstance(origin, Sequence) and isinstance(mutant, Sequence)
assert origin.alphabet == mutant.alphabet
self.alphabet = origin.alphabet
assert all(c in 'MSID' for c in transcript)
assert len(transcript) > 0
origin_end = origin_start + self.projected_len(transcript, on='origin')
mutant_end = mutant_start + self.projected_len(transcript, on='mutant')
assert 0 <= origin_start and origin_end <= len(origin)
assert 0 <= mutant_start and mutant_end <= len(mutant)
self.transcript = str(transcript)
self.origin, self.mutant = origin, mutant
self.origin_start, self.mutant_start = origin_start, mutant_start
self.score = score
def __str__(self):
return self.render_term(term_width=float('+inf'), margin=0, colored=0)
def __eq__(self, other):
assert isinstance(other, Alignment)
return other.origin == self.origin and \
other.mutant == self.mutant and \
other.transcript == self.transcript and \
other.origin_start == self.origin_start and \
other.mutant_start == self.mutant_start
[docs] @classmethod
def projected_len(cls, transcript, on='origin'):
"""Calculates the projected length of a given transcript on either of
the involved sequences. For instance:
>>> biseqt.Alignment.projected_len('MSI', on='origin')
2
>>> biseqt.Alignment.projected_len('MSI', on='mutant')
3
Args:
transcript (str): A sequence of edit operations, cf.
:attr:`Alignment.transcript`.
Keyword Args:
on (str): Either of ``origin`` or ``mutant``.
Returns:
int: The projected length of the edit transcript.
"""
assert on in ['origin', 'mutant']
ops = 'MSD' if on == 'origin' else 'MSI'
return sum(int(op in ops) for op in transcript)
[docs] def calculate_score(self, subst_scores, go_score, ge_score):
"""Scores a this alignment according to given scoring scheme.
Args:
subst_scores (list): The substitution score matrix, cf.
:attr:`Aligner.subst_scores`.
go_score (float): The gap open score; cf. :attr:`Aligner.go_score`.
ge_score (float): The gap extend score; cf.
:attr:`Aligner.ge_score`.
Returns:
float:
The score of the alignment for :attr:`origin` and
:attr:`mutant` based on given scores.
"""
score = 0.
i, j = self.origin_start, self.mutant_start
def tokens():
for match in re.finditer(r'(.)\1*', self.transcript):
match = match.group(0)
yield match[0], len(match)
for op, num in tokens():
if op in 'MS':
score += sum(
subst_scores[self.origin[i + k]][self.mutant[j + k]]
for k in range(num)
)
i, j = i + num, j + num
else:
assert op in 'ID'
score += go_score + ge_score * num
if op == 'I':
j = j + num
else:
i = i + num
return score
[docs] def truncate_to_match(self):
tx_start = 0
origin_start, mutant_start = self.origin_start, self.mutant_start
tx_end = len(self.transcript) - 1
while self.transcript[tx_start] != 'M':
if self.transcript[tx_start] in 'DS':
origin_start += 1
if self.transcript[tx_start] in 'IS':
mutant_start += 1
tx_start += 1
while self.transcript[tx_end] != 'M':
tx_end -= 1
if tx_start < tx_end:
tx = self.transcript[tx_start:tx_end + 1]
return Alignment(self.origin, self.mutant, tx,
origin_start=origin_start,
mutant_start=mutant_start)
else:
return None
[docs] def render_term(self, term_width=120, margin=0, colored=True):
"""Renders a textual representation of the alignment.
Keyword Args:
term_width (int): Terminal width used for wrapping; default is 120
and the smallest valid value is 30.
margin (length): Length of leading and trailing substring to
include in original and mutant sequences; default is 20.
colored (bool): Whether or not to use ANSI color codes in output;
default is True.
Returns:
str
"""
assert term_width >= 30
assert margin >= 0
letlen = self.alphabet._letlen
Carriage = namedtuple('carriage',
['pos', 'o_idx', 'm_idx', 'o_line', 'm_line'])
term_color = {'M': 'green', 'S': 'red'}
term_on_color = {}
# In the rest: o_X and m_X mean X for origin and mutatnt, resp.
# Creates an alignment line preamble, i.e a double line for origin and
# sequence, given starting positions on each. The output is a tuple
# (pos, o_line, m_line) where pos is the position in line after the
# preamble.
def start_line(o_idx, m_idx):
kw = {
'o_idx': o_idx,
'm_idx': m_idx,
'o_line': 'origin[%d]: ' % o_idx,
'm_line': 'mutant[%d]: ' % m_idx,
}
pos = max(len(kw['o_line']), len(kw['m_line']))
assert pos <= term_width, \
'Alignment preamble does not fit in width %d' % term_width
return Carriage(pos=pos, **kw)
# returns a right adjusted double line given the two lines of an
# alignment, i.e the origin and mutant versions.
def carriage_flush(carriage):
line_len = max(len(carriage.o_line), len(carriage.m_line))
o_line = carriage.o_line.rjust(line_len)
m_line = carriage.m_line.rjust(line_len)
return '%s\n%s\n' % (o_line, m_line)
def carriage_fwd(carriage, op=None):
gap = '.' * letlen if op is None else '-' * letlen
o_contents, m_contents = gap, gap
if op is None:
if carriage.o_idx >= 0 and carriage.o_idx < len(self.origin):
o_contents = self.alphabet[self.origin[carriage.o_idx]]
if carriage.m_idx >= 0 and carriage.m_idx < len(self.mutant):
m_contents = self.alphabet[self.mutant[carriage.m_idx]]
else:
assert op in 'MSID'
if op in 'MSD':
o_contents = self.alphabet[self.origin[carriage.o_idx]]
if op in 'MSI':
m_contents = self.alphabet[self.mutant[carriage.m_idx]]
length = len(o_contents)
assert length == len(m_contents)
colors = {'color': None, 'on_color': None}
if colored and op in term_color:
colors['color'] = term_color[op]
if colored and op in term_on_color:
colors['on_color'] = term_on_color[op]
o_contents = termcolor.colored(o_contents, **colors)
m_contents = termcolor.colored(m_contents, **colors)
output = ''
if carriage.pos >= term_width:
output += carriage_flush(carriage)
carriage = start_line(carriage.o_idx, carriage.m_idx)
return output, Carriage(
pos=carriage.pos + length,
o_idx=carriage.o_idx + int(op is None or op in 'MSD'),
m_idx=carriage.m_idx + int(op is None or op in 'MSI'),
o_line=carriage.o_line + o_contents,
m_line=carriage.m_line + m_contents
)
# the arguments are the starting positions in the origin/mutant.
def pre_margin(o_idx, m_idx):
margin_len = min(margin, max(o_idx, m_idx) * letlen)
carriage = start_line(o_idx - margin_len, m_idx - margin_len)
output = ''
# the pre-margin
for i in range(margin_len):
out, carriage = carriage_fwd(carriage, op=None)
output += out
return output, carriage
# the arguments are the ending positions in the origin/mutant.
def post_margin(carriage):
output = ''
margin_len = min(margin,
max((len(self.origin) - carriage.o_idx) * letlen,
(len(self.mutant) - carriage.m_idx) * letlen))
for i in range(margin_len):
out, carriage = carriage_fwd(carriage, op=None)
output += out
return output + carriage_flush(carriage)
output, carriage = pre_margin(self.origin_start, self.mutant_start)
for op in self.transcript:
out, carriage = carriage_fwd(carriage, op=op)
output += out
output += post_margin(carriage)
# when output is not supposed to be cleared remove the spurious color
# reset ANSI escape sequence that termcolor adds:
if not colored:
output = output.replace(termcolor.RESET, '')
return output