# -*- coding: utf-8 -*-
# FIXME docs
"""
.. wikisection:: overview
:title: (4) Handling Kmers
The :mod:`biseqt.kmers` module provides tools for k-mer analysis. Kmers are
represented as integer (and hence a maximum word length is imposed by
integer size of the machine).
>>> from biseqt.sequence import Alphabet, Sequence
>>> from biseqt.kmers import as_kmer_seq, KmerIndex
>>> A = Alphabet('ACGT')
>>> S = A.parse('AAACGCGT')
>>> wordlen = 3
>>> as_kmer_seq(S, wordlen)
[0, 1, 6, 25, 38, 27]
>>> kmer_index = KmerIndex(path=':memory:', alphabet=A, wordlen=wordlen)
>>> kmer_index.index_kmers(S)
1 # the internal 'id' assigned to sequence S
>>> kmer_index.kmers()
[0, 1, 6, 25, 27, 38]
>>> kmer_index.hits(27)
[(1, 5)] # (seqid, position)
.. wikisection:: dev
:title: Insert-or-Append Queries
It may be useful to perform an SQL query which either inserts a new record
or appends the given value to some column in case of conflict. This is
achived by using one of SQLite's conflict resolution mechanisms_ ``ON
CONFLICT REPLACE`` or in short ``OR REPLACE``. For instance, consider a
table::
id | field
1 | foo
where we wish ``INSERT INTO ... (id, field) VALUES (2, 'bar')`` to give::
id | field
1 | foo
2 | bar
and ``INSERT INTO ... (id, field) VALUES (1, 'bar')`` to give::
id | field
1 | foo,bar
This can be implemented by using the following query format:
.. code-block:: sql
INSERT INTO ... (id, field) VALUES
SELECT ?, IFNULL(SELECT field FROM ... WHERE id = ?, "") || ?
invoked like this:
.. code-block:: python
id, field = ...
conn = apsw.Connection('example.db')
conn.cursor().execute(query, (id, id, ',' + field))
Note that this pattern only works if the ``id`` column has a unique
constraint on it. Otherwise, no conflict will arise to be resolved and new
values will appear in new records instead of being appended to old ones.
.. _mechanisms: https://www.sqlite.org/lang_conflict.html
.. wikisection:: dev
:title: SQLite Performance Tuning
Tuning strategy naturally depends on the balance between the volume of
insert vs. select queries and the concurrency requirements. Here we will
assume:
* The volume of inserts is much larger than selects,
* Application logic can be trusted with respecting unique constraints (i.e
the code creating data does not violate semantic constraints).
* Usage pattern consists of bulk of inserts followed by bulk of selects
(e.g. not interleaved).
Under these circumstances, the following guidelines are suggested:
* Create indices after bulk inserts not before. For instance, instead of:
.. code-block:: sql
CREATE TABLE foo ('f1' int, 'f2' int, UNIQUE(f1, f2))
-- INSERT INTO foo VALUES ...
it's more performant to say:
.. code-block:: sql
CREATE TABLE foo ('f1' int, 'f2' int)
-- INSERT INTO foo VALUES ...
CREATE UNIQUE INDEX foo_index ON foo (f1, f2)
* If there is no concern about data corruption upon application or
operating sytem crash journaling can be turned off as well, from `docs
<journaling_docs>`_:
| If the application crashes in the middle of a transaction when the
| OFF journaling mode is set, then the database file will very likely
| go corrupt.
To turn off journaling:
.. code-block:: sql
PRAGMA journaling_mode = OFF
Note that turning off journaling breaks rollbacks:
| The OFF journaling mode disables the rollback journal completely. No
| rollback journal is ever created and hence there is never a rollback
| journal to delete. The OFF journaling mode disables the atomic commit
| and rollback capabilities of SQLite.
* When a table has a unique integer key it should be declared as ``INTEGER
PRIMARY KEY`` so that it would take over the default ``rowid`` field.
This saves space (and thus a small amount of time) on both the field and
the corresponding index.
* Foreign key constraints, if enforced, slow down bulk inserts
significantly. However, by default, `foreign key checks`_ are turned off.
To turn it on:
.. code-block:: sql
PRAGMA foreign_keys = ON;
Note that this default maybe modified by compile time flags (i.e foreign
keys may be turned on by default). Furthermore, if foreign keys are
turned on, consider deferring_ foreign key enforcement to transaction
commits and and keep in mind that ``pysqlite`` (following python's
DB-API) fudges with transaction beginning and ends.
* Larger `page sizes <pagesize_docs>`_ can marginally improve read/write
performance. To increase the page size:
.. code-block:: sql
PRAGMA page_size = 65536
.. _journaling_docs: https://www.sqlite.org/pragma.html#pragma_journal_mode
.. _pagesize_docs: https://www.sqlite.org/pragma.html#pragma_page_size
.. _foreign key checks: https://www.sqlite.org/foreignkeys.html#fk_enable
.. _deferring: https://www.sqlite.org/foreignkeys.html#fk_deferred
.. rubric: References
* http://stackoverflow.com/a/1712873
* http://codereview.stackexchange.com/q/26822
* http://stackoverflow.com/q/3134900
"""
import struct
import os
import apsw
import logging
from .util import Logger
from .sequence import Alphabet, Sequence
DIGITS = '0123456789abcdefghijklmnopqrstuvwxyz'
[docs]def kmer_as_int(contents, alphabet):
"""Calculates the integer representation of a kmer by treating its
contents as digits in the base of alphabet length. For instance, in the
DNA alphabet ``AGA`` becomes :math:`(020)_4` which is 8. Note that
each kmer gives a unique integer as long as all kmers have the same
word length (which is the case here). There are two restrictions
imposed on the word length and alphabet size (enforced in
:func:`__init__`):
* The alphabet must be such that all letters can be represented by
single ASCII characters between ``[0-9a-z]`` (cf. int_). This
implies a maximum alphabet size of 36.
* The word length must be such that a single integer can store the
entire representation of a kmer. This requires that we have:
.. math::
k < \\frac{I-1}{2}
where :math:`k` is the word length and :math:`I` is the number of
bits allocated for an integer. For instance, on a 64-bit system the
maximum word length is 31.
.. _int: https://docs.python.org/2/library/functions.html#int
.. wikisection:: dev
:title: Integer Sizes
All kmers are stored as their integer representation to save on
space and processing time. Python_ is flexible with the maximum
size of integers, as integers automatically switch to longs, which
have "unlimited precision". SQLite_, too, is flexible but has a
maximum integer cap of 64-bits: integers take 2, 4, 6, or 8 bytes
per integer dependning on the size it needs.
The checks resulting from maximum integer size are performed in
:func:`KmerIndex.__init__ <KmerIndex>` which basically block kmers
taking more than 64-bit integers to represent.
.. _python: https://docs.python.org/2/library/stdtypes.html\
#numeric-types-int-float-long-complex
.. _sqlite: https://www.sqlite.org/datatype3.html#section_2
"""
assert isinstance(alphabet, Alphabet)
# TODO document dependence on word length
as_str = ''.join(DIGITS[c] for c in contents)
return int(as_str, len(alphabet))
[docs]def as_kmer_seq(seq, wordlen, mask=[]):
"""A generator for kmer hit tuples of the form ``(kmer, pos)``. Kmers
are represented in integer form (cf. :func:`kmer_as_int`).
Args:
seq (sequence.Sequence): The sequence to be scanned.
wordlen (int): Size of kmers.
mask (list): A list of sets of integers ``(i_1, ..., i_k)`` which mask
kmers (represented by ``None``) if the kmer content (set of letters
appearing in the kmer, represented as integers as in
:attr:`Sequence.contents`) matches the set.
Returns:
list: of integers representing kmers.
"""
assert isinstance(seq, Sequence)
assert all(isinstance(lets, set) for lets in mask)
kmers = []
for pos in range(len(seq) - wordlen + 1):
if mask:
lets = set(seq[pos: pos + wordlen])
if lets in mask:
kmers.append(None)
continue
kmer = kmer_as_int(seq.contents[pos: pos + wordlen], seq.alphabet)
kmers.append(kmer)
return kmers
[docs]class KmerDBWrapper(object):
"""Generic wrapper for an SQLite database for Kmers.
Attributes:
name (str): String name used as a suffix for table names.
path (str): Path to the SQLite datbase (or ``:memory:``).
alphabet (sequence.Alphabet):
The alphabet for sequences in the database.
wordlen (int): Length of kmers of interest to this index.
mask (list): A list of sets of integers which mask kmers (represented
by ``None``), cf. :func:`as_kmer_seq`.
init_script (str): SQL script to be executed upon initialization;
typically creates tables needed by the class.
"""
def __init__(self, name='', path=':memory:', alphabet=None, wordlen=None,
mask=[], log_level=logging.INFO, init_script=None):
self.name = name
assert all(isinstance(lets, set) for lets in mask)
self.mask = mask
assert isinstance(wordlen, int)
assert isinstance(alphabet, Alphabet)
assert len(alphabet) <= len(DIGITS), \
'Maximum alphabet size of %d exceeded' % len(DIGITS)
self.alphabet = alphabet
int_size = 8 * struct.calcsize('P')
assert wordlen < (int_size - 1)/2., \
'Maximum kmer length %d for %d-bit integers exceeded' % \
(wordlen, int_size)
self.wordlen = wordlen
if path == ':memory:':
self.path = path
relpath = path
else:
self.path = os.path.abspath(path)
assert os.path.exists(self.path) or \
os.access(os.path.dirname(self.path), os.W_OK), \
'Database %s is not writable' % self.path
relpath = os.path.relpath(self.path, os.getcwd())
log_header = '%d-mer (%s)' % (self.wordlen, relpath)
self._logger = Logger(log_level=log_level, header=log_header)
self.log_level = log_level
self._connection = None
self.init_script = init_script
if self.init_script:
with self.connection() as conn:
conn.cursor().execute(self.init_script)
conn.cursor().execute('PRAGMA journaling_mode = OFF;')
# FIXME compare with old-tip and figure out what the deal with resetting is
[docs] def connection(self, reset=False):
"""Provides a SQLite database connection that can be used as a context
manager. The returned object is always the same connection object
belonging to the :class:`KmerIndex` instance (otherwise in-memory
connections would reset the database contents upon every invocation).
Returns:
apsw.Connection
"""
if reset or self._connection is None:
self._connection = apsw.Connection(self.path)
return self._connection
[docs] def log(self, *args, **kwargs):
"""Wraps :class:`Logger.log`."""
self._logger.log(*args, **kwargs)
[docs]class KmerCache(KmerDBWrapper):
"""A cache backed by SQLite for representations of sequences as integer
sequences. Upon initialization the following SQL script is executed
.. code-block:: sql
CREATE TABLE seq_kmers (
'seq' VARCHAR, -- content identifier of sequence
'kmres' VARCHAR, -- comma separated representation of sequence
-- as integers encoding kmers.
);
This implies that any time only one ``KmerCache`` can exist with the same
path.
FIXME: using the same database for different word lengths / alphabets is
quietly accepted (and wrong results returned).
"""
def __init__(self, name='', **kw):
self.name = name
init_script = """
CREATE TABLE IF NOT EXISTS %s (
'seq' VARCHAR, -- content identifier of sequence
'kmers' VARCHAR -- comma separated representation of sequence
-- as integers encoding kmers.
);
""" % self.kmers_table
kw['name'] = name
super(KmerCache, self).__init__(init_script=init_script, **kw)
@property
def kmers_table(self):
"""The kmer hits table name ``seq_kmers_[name]``, cf.
:attr:`KmerDBWrapper.name`."""
return 'seq_kmers_%s' % self.name
[docs] def cached_seqs(self):
"""Returns content identifiers for all cached sequences."""
with self.connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT seq from %s' % self.kmers_table)
return [x[0] for x in cursor]
[docs] def as_kmer_seq(self, seq):
"""Return the integer representation of a given sequence.
Args:
seq (sequence.Sequence): input sequence.
Returns:
list: list of integers of length ``n-w+1`` containing kmers in
input sequence represented as an integer, cf.
:func:`as_kmer_seq`.
"""
with self.connection() as conn:
cursor = conn.cursor()
cursor.execute(
'SELECT kmers from %s WHERE seq = ?' % self.kmers_table,
(seq.content_id,)
)
for kmer_seq in cursor:
return eval(kmer_seq[0])
# cache miss, translate to kmer sequence
self.log('producing kmer representation for sequence %s' %
seq.content_id[:8])
kmer_seq = [i for i in as_kmer_seq(seq, self.wordlen, mask=self.mask)
if i is not None]
with self.connection() as conn:
q = 'INSERT INTO %s (seq, kmers) VALUES (?, ?)' % self.kmers_table
conn.cursor().execute(q, (seq.content_id, repr(kmer_seq)))
return kmer_seq
[docs]class KmerIndex(KmerDBWrapper):
"""An index backed by SQLite for occurences of kmers in a body of
sequences. Upon initialization the following script is executated:
.. code-block:: sql
CREATE TABLE kmers_[name] (
'kmer' INTEGER, -- The kmer in integer representation.
'seq' INTEGER, -- integer identifier of sequence
'pos' INTEGER -- the position of kmer in sequence.
);
CREATE TABLE IF NOT EXISTS kmer_indexed_[name] (
'seq' VARCHAR, -- content id,
'seqid' INTEGER PRIMARY KEY AUTOINCREMENT -- integer id.
);
Attributes:
name (str):
cache (KmerCache): optional :class:`KmerCache` object to use for
retrieving integer representations of sequences.
"""
def __init__(self, name='', kmer_cache=None, **kw):
self.name = name
init_script = """
CREATE TABLE IF NOT EXISTS %s (
'kmer' INTEGER, -- the kmer in integer representation.
'seqid' INTEGER, -- integer identifier of sequence
-- REFERENCES kmer_indexed(seqid), but not
-- declared to avoid integrity checks
'pos' INTEGER -- the position of kmer in sequence.
);
CREATE TABLE IF NOT EXISTS %s (
'seq' VARCHAR, -- content id,
'seqid' INTEGER PRIMARY KEY AUTOINCREMENT -- integer id.
);
""" % (self.kmers_table, self.log_table)
kw['name'] = name
super(KmerIndex, self).__init__(init_script=init_script, **kw)
if kmer_cache:
assert isinstance(kmer_cache, KmerCache)
assert kmer_cache.wordlen == self.wordlen
assert kmer_cache.alphabet == self.alphabet
self.kmer_cache = kmer_cache
else:
self.kmer_cache = None
@property
def kmers_table(self):
"""The kmer hits table name ``kmers_[name]``, cf.
:attr:`KmerDBWrapper.name`."""
return 'kmers_' + self.name
@property
def log_table(self):
"""The log table name ``kmer_indexed_[name]``, cf.
:attr:`KmerDBWrapper.name`."""
return 'kmer_indexed_' + self.name
[docs] def index_kmers(self, seq):
""" Indexes all kmers observed in the given sequence in
:attr:`kmers_table`.
Args:
seq (sequence.Sequence): The sequence just inserted into the
database.
seqid (int): The integer identifier to use for sequence.
"""
if self.kmer_cache:
kmer_seq = self.kmer_cache.as_kmer_seq(seq)
else:
kmer_seq = as_kmer_seq(seq, self.wordlen, mask=self.mask)
with self.connection() as conn:
self.log('indexing %d-mers for sequence %s (%d)' %
(self.wordlen, seq.content_id[:8], len(seq)))
cursor = conn.cursor()
q = 'SELECT seqid FROM %s WHERE seq = ?' % self.log_table
# FIXME if the two sequences are identical in content we trip over
# this and not index the "trivial" kmers. How should we respect
# this when hits are recalled? Or is the responsibility of
# SeedIndex to deal with this?
for seqid in cursor.execute(q, (seq.content_id,)):
self.log('sequence %s already indexed, skipping.' %
seq.content_id[:8])
return seqid[0]
q = """
INSERT INTO %s (seq) VALUES (?);
SELECT last_insert_rowid();
""" % self.log_table
seqid = cursor.execute(q, (seq.content_id,)).next()[0]
q = """
INSERT INTO %s (kmer, seqid, pos)
VALUES (?,?,?)
""" % self.kmers_table
cursor.executemany(q, ((kmer, seqid, pos)
for pos, kmer in enumerate(kmer_seq)
if kmer is not None))
return seqid
[docs] def create_sql_index(self):
"""Creates SQL index over the ``kmer`` column of ``kmers`` table."""
self.log('Creating SQL index for table %s.' % self.kmers_table)
with self.connection() as conn:
q = """
CREATE INDEX IF NOT EXISTS idx_%s ON %s (kmer);
""" % (self.kmers_table, self.kmers_table)
conn.cursor().execute(q)
self.log('Created SQL index for table %s.' % self.kmers_table)
[docs] def hits(self, kmer):
"""Returns all hits of a given kmer in indexed sequences.
Args:
kmer (int): kmer of interest.
Returns:
list:
A list of 2-tuples containing sequence ids (int) and positions.
"""
assert isinstance(kmer, int)
query = 'SELECT seqid, pos FROM %s WHERE kmer = ?' % self.kmers_table
with self.connection() as conn:
return list(conn.cursor().execute(query, (kmer,)))
[docs] def kmers(self):
"""Returns all observed kmers.
Returns:
list: list of kmers in integer representation.
"""
self.create_sql_index() # FIXME do we need this?
query = 'SELECT DISTINCT kmer FROM %s' % self.kmers_table
with self.connection() as conn:
cursor = conn.cursor()
cursor.execute(query)
return [x[0] for x in cursor]
[docs] def drop_data(self):
"""Drop all tables created by this object."""
with self.connection() as conn:
conn.cursor().execute('DROP TABLE %s; DROP TABLE %s;' %
(self.kmers_table, self.logs_table))