Source code for biseqt.seeds

# -*- coding: utf-8 -*-
"""
.. wikisection:: overview
    :title: (5) Seeds (exactly matching kmers)

    The :mod:`biseqt.seeds` module provides tools for storing and analyzing
    exactly matching kmers, aka seeds.

    >>> from biseqt.seeds import SeedIndex
    >>> from biseqt.sequence import Sequence, Alphabet
    >>> A = Alphabet('ACGT')
    >>> S, T = A.parse('TAAGCGT'), A.parse('GGCGTAA')
    >>> seed_index = SeedIndex(S, T, path=':memory:', wordlen=3, alphabet=A)
    >>> list(seed_index.seeds())
    [(4, 2), (3, 1), (0, 4)]
"""
from itertools import chain, combinations
from itertools import groupby, product

from .kmers import KmerIndex, KmerDBWrapper


[docs]class SeedIndex(KmerDBWrapper): """An index for seeds in diagonal coordinates. Attributes: S (biseqt.sequence.Sequence): The 1st sequence. T (biseqt.sequence.Sequence): The 2nd sequence. cache (KmerCache): optional :class:`KmerCache` object to use for retrieving integer representations of sequences. """ def __init__(self, S, T, kmer_cache=None, **kw): name = '%s_%s' % (S.content_id[:8], T.content_id[:8]) super(SeedIndex, self).__init__(name=name, **kw) self.kmer_cache = kmer_cache self.self_comp = S == T self.S, self.T = S, T if self._table_exists(): self.log('seeds for %s and %s already indexed, skipping' % (S.content_id[:8], T.content_id[:8])) else: self.log('Indexing seeds for %s (%d) and %s (%d).' % (S.content_id[:8], len(S), T.content_id[:8], len(T))) self._index_seeds() self.log('Indexed seeds for %s (%d) and %s (%d).' % (S.content_id[:8], len(S), T.content_id[:8], len(T))) @property def seeds_table(self): """The seeds table name ``seeds_[name]``, cf. :attr:`KmerDBWrapper.name`.""" return 'seeds_' + self.name
[docs] @classmethod def to_diagonal_coordinates(cls, i, j): """Convert standard coordinates to diagonal coordinates via: .. math:: \\begin{aligned} d & = i - j \\\\ a & = i + j \\end{aligned} """ d = i - j a = i + j return d, a
[docs] @classmethod def to_ij_coordinates(cls, d, a): """Convert diagonal coordinates to standard coordinates: .. math:: \\begin{aligned} i & = \\frac{a + d}{2} \\\\ j & = \\frac{a - d}{2} \\end{aligned} """ i = (a + d) / 2 j = (a - d) / 2 return (i, j)
[docs] @classmethod def to_ij_coordinates_seg(cls, seg): """Convert a segment in diagonal coordinates to standard coordinates according to :func:`to_ij_coordinates`. Args: seg (tuple): :math:`(d_{\min}, d_{\max}), (a_{\min}, a_{\max})` Returns tuple: :math:`(i_s, i_e), (j_s, j_e)` start and end coordinates along the origin and mutant sequences (:math:`i,j` respectively). """ corners = [cls.to_ij_coordinates(d, a) for d, a in product(*seg)] i_start = min(i for i, _ in corners) j_start = min(j for _, j in corners) i_end = max(i for i, j in corners) j_end = max(j for _, j in corners) i_start, j_start = max(i_start, 0), max(j_start, 0) # end overflows cannot be fixed without sequence lenghts return (i_start, i_end), (j_start, j_end)
def _table_exists(self): with self.connection() as conn: q = """ SELECT name FROM sqlite_master WHERE type='table' AND name='%s'; """ % self.seeds_table cursor = conn.cursor() cursor.execute(q) for name in cursor: return True return False # idempotent operation def _index_seeds(self): with self.connection() as conn: conn.cursor().execute(""" CREATE TABLE %s ( 'd' INTEGER, -- zero-adjusted diagonal position 'a' INTEGER -- antidiagonal position ); """ % self.seeds_table) kmer_index_name = '%d_%s' % (self.wordlen, self.name) kmer_index = KmerIndex(path=self.path, name=kmer_index_name, wordlen=self.wordlen, alphabet=self.alphabet, log_level=self.log_level, mask=self.mask, kmer_cache=self.kmer_cache) kmer_index.index_kmers(self.S) if not self.self_comp: kmer_index.index_kmers(self.T) kmers = kmer_index.kmers() def _records(): for kmer in kmers: hits = kmer_index.hits(kmer) if self.self_comp: pairs = chain(combinations(hits, 2), [(x, x) for x in hits]) else: pairs = (((id0, pos0), (id1, pos1)) for (id0, pos0), (id1, pos1) in combinations(hits, 2) if id0 != id1) for (id0, pos0), (id1, pos1) in pairs: d, a = self.to_diagonal_coordinates(pos0, pos1) yield d, a self.log('Indexing seeds for %s.' % self.name) with self.connection() as conn: cursor = conn.cursor() cursor.executemany( 'INSERT INTO %s (d, a) VALUES (?, ?)' % self.seeds_table, _records() ) # FIXME is this necessary? self.log('Creating SQL index for table %s.' % self.seeds_table) cursor.execute('CREATE INDEX %s_diagonal ON %s(d);' % (self.seeds_table, self.seeds_table))
[docs] def seeds(self, d_band=None, exclude_trivial=False): """Yields all seeds, optionally those within a diagonal band. Keyword Args: d_band (tuple|None): If specified a ``(d_min, d_max)`` tuple restricting the seed count to a diagonal band. Yields: tuple: seeds coordinates :math:`(i, j)`. """ query = 'SELECT d, a FROM %s' % self.seeds_table if d_band is not None: assert len(d_band) == 2, 'need a 2-tuple for diagonal band' d_min, d_max = d_band query += ' WHERE d BETWEEN %d AND %d ' % \ (d_min, d_max) query += ' ORDER BY rowid' with self.connection() as conn: cursor = conn.cursor() cursor.execute(query) for d, a in cursor: i, j = self.to_ij_coordinates(d, a) if self.self_comp and exclude_trivial and i == j: continue yield (i, j) if self.self_comp and i != j: yield (j, i)
[docs] def seed_count(self, d_band=None, a_band=None): """Counts the number of seeds either in the whole table or in the specified diagonal band. Args: d_band (tuple|None): If specified a :math:`(d_{\min}, d_{\max})` tuple restricting the seed count to a diagonal band. a_band (tuple|None): If specified a :math:`(a_{\min}, a_{\max})` tuple restricting the seed count to an antidiagonal band. Returns: int: Number of seeds found in the entire table or in the specified diagonal band. """ query = 'SELECT COUNT(*) FROM %s' % self.seeds_table conds = [] if d_band is not None: assert len(d_band) == 2, 'need a 2-tuple for diagonal band' d_min, d_max = d_band cond = 'd BETWEEN %d AND %d' % \ (d_min, d_max) conds.append(cond) if a_band is not None: assert len(a_band) == 2, 'need a 2-tuple for antidiagonal band' conds.append('a BETWEEN %d AND %d' % a_band) if conds: query += ' WHERE ' + ' AND '.join(conds) with self.connection() as conn: cursor = conn.cursor() cursor.execute(query) for row in cursor: return row[0]
[docs]class SeedIndexMultiple(KmerDBWrapper): """An index for seeds between multiple sequences in diagonal coordinates. Attributes: seqs (list[biseqt.sequence.Sequence]): The sequences of interest. cache (KmerCache): optional :class:`KmerCache` object to use for retrieving integer representations of sequences. """ def __init__(self, *seqs, **kw): assert(len(seqs)) > 2 name = '_'.join(S.content_id[:8] for S in seqs) super(SeedIndexMultiple, self).__init__(name=name, **kw) self.kmer_cache = kw.get('kmer_cache', None) self.seqs = seqs self.d_cols = ['d_%d' % (idx + 1) for idx in range(len(self.seqs) - 1)] if self._table_exists(): self.log('Seeds for %s already indexed, skipping' % name) else: self._index_seeds() @property def seeds_table(self): """The seeds table name ``seeds_[name]``, cf. :attr:`KmerDBWrapper.name`.""" return 'seeds_' + self.name
[docs] @classmethod def to_diagonal_coordinates(cls, *idxs): """Convert standard coordinates to diagonal coordinates via: .. math:: \\begin{aligned} d_k & = i_1 - i_{k+1} \ \ k = 1, 2, \ldots, n - 1 \\\\ a & = \sum_{k=1}^n i_k \\end{aligned} """ ds = tuple(idxs[0] - idxs[k] for k in range(1, len(idxs))) a = sum(idxs) return ds, a
[docs] @classmethod def to_ij_coordinates(cls, ds, a): """Convert diagonal :math:`(d_1,\ldots, d_{n-1}, a)` coordinates to standard coordinates via: .. math:: \\begin{aligned} i_1 & = \\frac{a + \sum_{k=1}^{n-1}d_k}{N} \\\\ i_k & = i_1 - d_{k-1} \\end{aligned} """ N = len(ds) + 1 i0 = (a + sum(ds)) / N return tuple([i0] + [i0 - d for d in ds])
[docs] @classmethod def to_ij_coordinates_seg(cls, seg): """Convert a segment in diagonal coordinates to standard coordinates according to :func:`to_ij_coordinates`. Args: seg (tuple): :math:`(d_{1,\min}, d_{1,\max}), \ldots, (d_{n-1,\min}, d_{n-1,\max}), (a_{\min}, a_{\max})` Return: tuple: start and end coordinate in the :math:`n` sequences. """ ds_range, a_range = seg num_seqs = len(ds_range) + 1 seg_flat = list(ds_range) + [a_range] corners = [cls.to_ij_coordinates(ranges[:-1], ranges[-1]) for ranges in product(*seg_flat)] std_ranges = [] for idx in range(num_seqs): std_ranges.append(( max(min(corner[idx] for corner in corners), 0), # max value cannot be checked without sequence lengths max(corner[idx] for corner in corners) )) return std_ranges
def _table_exists(self): with self.connection() as conn: q = """ SELECT name FROM sqlite_master WHERE type='table' AND name='%s'; """ % self.seeds_table cursor = conn.cursor() cursor.execute(q) for name in cursor: return True return False # idempotent operation def _index_seeds(self): d_col_defs = ', '.join(col + ' INTEGER' for col in self.d_cols) init_query = """ CREATE TABLE %s ( %s, -- diagonal positions 'a' INTEGER -- antidiagonal position ); """ % (self.seeds_table, d_col_defs) with self.connection() as conn: conn.cursor().execute(init_query) kmer_index_name = '%d_%s' % (self.wordlen, self.name) kmer_index = KmerIndex(path=self.path, name=kmer_index_name, wordlen=self.wordlen, alphabet=self.alphabet, log_level=self.log_level, kmer_cache=self.kmer_cache) # FIXME if two sequences are identical the second one gets skipped for seq in self.seqs: kmer_index.index_kmers(seq) kmers = kmer_index.kmers() def _records(): for kmer in kmers: if kmer is None: continue hits = kmer_index.hits(kmer) hits = {seqid: [c[1] for c in seq_hits] for seqid, seq_hits in groupby(hits, key=lambda c: c[0])} # only consider kmers present in all sequences if len(hits) < len(self.seqs): continue for idxs in product(*hits.values()): ds, a = self.to_diagonal_coordinates(*idxs) yield tuple(list(ds) + [a]) self.log('Indexing seeds for %s.' % self.name) d_cols = ', '.join(self.d_cols) with self.connection() as conn: cursor = conn.cursor() q_marks = ', '.join(['?'] * len(self.seqs)) query = 'INSERT INTO %s (%s, a) VALUES (%s)' % \ (self.seeds_table, d_cols, q_marks) cursor.executemany(query, _records())
[docs] def seeds(self): """Yields all seeds in diagonal coordinates. Yields: tuple: seeds coordinates :math:`(d_1, \ldots, d_{n-1}, a)`. """ query = 'SELECT %s, a FROM %s' % \ (', '.join(self.d_cols), self.seeds_table) with self.connection() as conn: cursor = conn.cursor() cursor.execute(query) for rec in cursor: ds = [int(i) for i in rec[:-1]] yield ds, rec[-1]
[docs] def seed_count(self, ds_band=None, a_band=None): """Counts the number of seeds either in the whole table or in the specified diagonal band. Args: ds_band (List|None): If specified a list of :math:`n-1` tuples :math:`(d_{k,\min}, d_{k,\max})` restricting the seed count to a diagonal hyper-band. a_band (tuple|None): If specified a :math:`(a_{\min}, a_{\max})` tuple restricting the seed count to an antidiagonal band. Returns: int: Number of seeds found in the entire table or in the specified diagonal band. """ query = 'SELECT COUNT(*) FROM %s' % self.seeds_table conds = [] if ds_band is not None: assert len(ds_band) == len(self.seqs) - 1 for idx, d_band in enumerate(ds_band): if d_band is None: continue assert len(d_band) == 2 d_min, d_max = d_band cond = '%s BETWEEN %d AND %d' % \ (self.d_cols[idx], d_min, d_max) conds.append(cond) if a_band is not None: assert len(a_band) == 2, 'need a 2-tuple for antidiagonal band' conds.append('a BETWEEN %d AND %d' % a_band) if conds: query += ' WHERE ' + ' AND '.join(conds) with self.connection() as conn: cursor = conn.cursor() cursor.execute(query) for row in cursor: return row[0] return 0