# Source code for auditok.core

"""
This module gathers processing (i.e. tokenization) classes.

Class summary
=============

.. autosummary::

        StreamTokenizer
"""

from auditok.util import DataValidator

__all__ = ["StreamTokenizer"]


class StreamTokenizer():
    """
    Class for stream tokenizers. It implements a 4-state automaton scheme
    to extract sub-sequences of interest on the fly.

    :Parameters:

        `validator` :
            instance of `DataValidator` that implements `is_valid` method.

        `min_length` : *(int)*
            Minimum number of frames of a valid token. This includes all
            tolerated non valid frames within the token.

        `max_length` : *(int)*
            Maximum number of frames of a valid token. This includes all
            tolerated non valid frames within the token.

        `max_continuous_silence` : *(int)*
            Maximum number of consecutive non-valid frames within a token.
            Note that, within a valid token, there may be many tolerated
            *silent* regions that contain each a number of non valid frames
            up to `max_continuous_silence`

        `init_min` : *(int, default=0)*
            Minimum number of consecutive valid frames that must be
            **initially** gathered before any sequence of non valid frames
            can be tolerated. This option is not always needed, it can be
            used to drop non-valid tokens as early as possible.
            **Default = 0** means that the option is by default ineffective.

        `init_max_silence` : *(int, default=0)*
            Maximum number of tolerated consecutive non-valid frames if the
            number already gathered valid frames has not yet reached
            'init_min'. This argument is normally used if `init_min` is
            used. **Default = 0**, by default this argument is not taken
            into consideration.

        `mode` : *(int, default=0)*
            `mode` can be:

        1. `StreamTokenizer.STRICT_MIN_LENGTH`:
        if token *i* is delivered because `max_length` is reached, and
        token *i+1* is immediately adjacent to token *i* (i.e. token *i*
        ends at frame *k* and token *i+1* starts at frame *k+1*) then accept
        token *i+1* only if it has a size of at least `min_length`. The
        default behavior is to accept token *i+1* even if it is shorter than
        `min_length` (given that the above conditions are fulfilled of
        course).

        :Examples:

        In the following code, without `STRICT_MIN_LENGTH`, the 'BB' token
        is accepted although it is shorter than `min_length` (3), because it
        immediately follows the latest delivered token:

        .. code:: python

            from auditok import StreamTokenizer, StringDataSource, DataValidator

            class UpperCaseChecker(DataValidator):
                def is_valid(self, frame):
                    return frame.isupper()

            dsource = StringDataSource("aaaAAAABBbbb")
            tokenizer = StreamTokenizer(validator=UpperCaseChecker(),
                                        min_length=3,
                                        max_length=4,
                                        max_continuous_silence=0)

            tokenizer.tokenize(dsource)

        :output:

        .. code:: python

            [(['A', 'A', 'A', 'A'], 3, 6), (['B', 'B'], 7, 8)]

        The following tokenizer will however reject the 'BB' token:

        .. code:: python

            dsource = StringDataSource("aaaAAAABBbbb")
            tokenizer = StreamTokenizer(validator=UpperCaseChecker(),
                                        min_length=3, max_length=4,
                                        max_continuous_silence=0,
                                        mode=StreamTokenizer.STRICT_MIN_LENGTH)
            tokenizer.tokenize(dsource)

        :output:

        .. code:: python

            [(['A', 'A', 'A', 'A'], 3, 6)]

        2. `StreamTokenizer.DROP_TRAILING_SILENCE`:
        drop all trailing non-valid frames from a token to be delivered if
        and only if it is not **truncated**. This can be a bit tricky. A
        token is actually delivered if:

        - a. `max_continuous_silence` is reached

        :or:

        - b. Its length reaches `max_length`. This is called a **truncated**
          token

        In the current implementation, a `StreamTokenizer`'s decision is
        only based on already seen data and on incoming data. Thus, if a
        token is truncated at a non-valid but tolerated frame (`max_length`
        is reached but `max_continuous_silence` not yet) any trailing
        silence will be kept because it can potentially be part of valid
        token (if `max_length` was bigger). But if `max_continuous_silence`
        is reached before `max_length`, the delivered token will not be
        considered as truncated but a result of *normal* end of detection
        (i.e. no more valid data). In that case the trailing silence can be
        removed if you use the `StreamTokenizer.DROP_TRAILING_SILENCE` mode.

        :Example:

        .. code:: python

            tokenizer = StreamTokenizer(validator=UpperCaseChecker(),
                                        min_length=3, max_length=6,
                                        max_continuous_silence=3,
                                        mode=StreamTokenizer.DROP_TRAILING_SILENCE)

            dsource = StringDataSource("aaaAAAaaaBBbbbb")
            tokenizer.tokenize(dsource)

        :output:

        .. code:: python

            [(['A', 'A', 'A', 'a', 'a', 'a'], 3, 8), (['B', 'B'], 9, 10)]

        The first token is delivered with its trailing silence because it is
        truncated while the second one has its trailing frames removed.

        Without `StreamTokenizer.DROP_TRAILING_SILENCE` the output would be:

        .. code:: python

            [(['A', 'A', 'A', 'a', 'a', 'a'], 3, 8), (['B', 'B', 'b', 'b', 'b'], 9, 13)]

        3. `StreamTokenizer.STRICT_MIN_LENGTH | StreamTokenizer.DROP_TRAILING_SILENCE`:
        use both options. That means: first remove trailing silence, then
        check if the token still has at least a length of `min_length`.

    :Raises:

        `TypeError` if `validator` is not a `DataValidator` instance;
        `ValueError` if a length argument is out of range or `mode` is not
        one of the accepted values.
    """

    # States of the automaton.
    SILENCE = 0
    POSSIBLE_SILENCE = 1
    POSSIBLE_NOISE = 2
    NOISE = 3

    # Mode flags; they can be combined with the bitwise `|` operator.
    STRICT_MIN_LENGTH = 2
    DROP_TRAILING_SILENCE = 4
    # alias
    DROP_TAILING_SILENCE = 4

    def __init__(self, validator,
                 min_length, max_length, max_continuous_silence,
                 init_min=0, init_max_silence=0,
                 mode=0):

        if not isinstance(validator, DataValidator):
            raise TypeError("'validator' must be an instance of 'DataValidator'")

        if max_length <= 0:
            raise ValueError("'max_length' must be > 0 (value={0})".format(max_length))

        if min_length <= 0 or min_length > max_length:
            raise ValueError("'min_length' must be > 0 and <= 'max_length' (value={0})".format(min_length))

        if max_continuous_silence >= max_length:
            raise ValueError("'max_continuous_silence' must be < 'max_length' (value={0})".format(max_continuous_silence))

        if init_min >= max_length:
            # report the offending argument itself, not max_continuous_silence
            raise ValueError("'init_min' must be < 'max_length' (value={0})".format(init_min))

        self.validator = validator
        self.min_length = min_length
        self.max_length = max_length
        self.max_continuous_silence = max_continuous_silence
        self.init_min = init_min
        self.init_max_silent = init_max_silence

        # set_mode validates `mode` and derives the two boolean flags
        # (_strict_min_length and _drop_tailing_silence) from it.
        self._mode = None
        self.set_mode(mode)

        self._deliver = None
        self._tokens = None
        self._state = None
        self._data = None
        self._contiguous_token = False

        self._init_count = 0
        self._silence_length = 0
        self._start_frame = 0
        self._current_frame = 0

    def set_mode(self, mode):
        """
        :Parameters:

            `mode` : *(int)*
                New mode, must be one of:

            - `StreamTokenizer.STRICT_MIN_LENGTH`

            - `StreamTokenizer.DROP_TRAILING_SILENCE`

            - `StreamTokenizer.STRICT_MIN_LENGTH | StreamTokenizer.DROP_TRAILING_SILENCE`

            - `0`

        :Raises:

            `ValueError` if `mode` is none of the values above.

        See `StreamTokenizer.__init__` for more information about the mode.
        """

        if mode not in (self.STRICT_MIN_LENGTH,
                        self.DROP_TRAILING_SILENCE,
                        self.STRICT_MIN_LENGTH | self.DROP_TRAILING_SILENCE,
                        0):
            raise ValueError("Wrong value for mode")

        self._mode = mode
        self._strict_min_length = (mode & self.STRICT_MIN_LENGTH) != 0
        self._drop_tailing_silence = (mode & self.DROP_TRAILING_SILENCE) != 0

    def get_mode(self):
        """
        Return the current mode. To check whether a specific mode is
        activated use the bitwise 'and' operator `&`. Example:

        .. code:: python

            if mode & self.STRICT_MIN_LENGTH != 0:
                do_something()
        """
        return self._mode

    def _reinitialize(self):
        # Reset the per-run state so that the same tokenizer can be used
        # for several successive calls to `tokenize`.
        self._contiguous_token = False
        self._data = []
        self._tokens = []
        self._state = self.SILENCE
        # incremented before each frame is processed, so the first frame
        # gets index 0
        self._current_frame = -1
        self._deliver = self._append_token

    def tokenize(self, data_source, callback=None):
        """
        Read data from `data_source`, one frame a time, and process the read
        frames in order to detect sequences of frames that make up valid
        tokens.

        :Parameters:

            `data_source` : instance of the `DataSource` class that
                implements a 'read' method. 'read' should return a slice of
                signal, i.e. frame (of whatever type as long as it can be
                processed by validator) and None if there is no more signal.

            `callback` : an optional 3-argument function.
                If a `callback` function is given, it will be called each
                time a valid token is found.

        :Returns:

            A list of tokens if `callback` is None (nothing is returned
            otherwise). Each token is tuple with the following elements:

            .. code:: python

                (data, start, end)

            where `data` is a list of read frames, `start`: index of the
            first frame in the original data and `end` : index of the last
            frame.
        """

        self._reinitialize()

        if callback is not None:
            self._deliver = callback

        while True:
            frame = data_source.read()
            # Identity test: frames may overload `==` (array-like frames
            # compare elementwise), so `frame == None` is not reliable.
            if frame is None:
                break
            self._current_frame += 1
            self._process(frame)

        self._post_process()

        if callback is None:
            _ret = self._tokens
            self._tokens = None
            return _ret

    def _process(self, frame):
        # Feed one frame to the automaton and update state, buffered data
        # and counters accordingly; deliver a token when one is complete.

        frame_is_valid = self.validator.is_valid(frame)

        if self._state == self.SILENCE:

            if frame_is_valid:
                # seems we got a valid frame after a silence
                self._init_count = 1
                self._silence_length = 0
                self._start_frame = self._current_frame
                self._data.append(frame)

                if self._init_count >= self.init_min:
                    self._state = self.NOISE
                    if len(self._data) >= self.max_length:
                        self._process_end_of_detection(True)
                else:
                    self._state = self.POSSIBLE_NOISE

        elif self._state == self.POSSIBLE_NOISE:

            if frame_is_valid:
                self._silence_length = 0
                self._init_count += 1
                self._data.append(frame)
                if self._init_count >= self.init_min:
                    self._state = self.NOISE
                    if len(self._data) >= self.max_length:
                        self._process_end_of_detection(True)

            else:
                self._silence_length += 1
                if (self._silence_length > self.init_max_silent or
                        len(self._data) + 1 >= self.max_length):
                    # either init_max_silent or max_length is reached
                    # before _init_count, back to silence
                    self._data = []
                    self._state = self.SILENCE
                else:
                    self._data.append(frame)

        elif self._state == self.NOISE:

            if frame_is_valid:
                self._data.append(frame)
                if len(self._data) >= self.max_length:
                    self._process_end_of_detection(True)

            elif self.max_continuous_silence <= 0:
                # max token reached at this frame will _deliver if
                # _contiguous_token and not _strict_min_length
                self._process_end_of_detection()
                self._state = self.SILENCE

            else:
                # this is the first silent frame following a valid one
                # and it is tolerated
                self._silence_length = 1
                self._data.append(frame)
                self._state = self.POSSIBLE_SILENCE
                if len(self._data) == self.max_length:
                    self._process_end_of_detection(True)
                    # don't reset _silence_length because we still
                    # need to know the total number of silent frames

        elif self._state == self.POSSIBLE_SILENCE:

            if frame_is_valid:
                self._data.append(frame)
                self._silence_length = 0
                self._state = self.NOISE
                if len(self._data) >= self.max_length:
                    self._process_end_of_detection(True)

            else:
                if self._silence_length >= self.max_continuous_silence:
                    if self._silence_length < len(self._data):
                        # _deliver only if gathered frames aren't all silent
                        self._process_end_of_detection()
                    else:
                        self._data = []
                    self._state = self.SILENCE
                    self._silence_length = 0
                else:
                    self._data.append(frame)
                    self._silence_length += 1
                    if len(self._data) >= self.max_length:
                        self._process_end_of_detection(True)
                        # don't reset _silence_length because we still
                        # need to know the total number of silent frames

    def _post_process(self):
        # Called once the data source is exhausted: flush the pending token
        # unless the buffer is empty or holds no more frames than the
        # current count of silent frames.
        if self._state == self.NOISE or self._state == self.POSSIBLE_SILENCE:
            if len(self._data) > 0 and len(self._data) > self._silence_length:
                self._process_end_of_detection()

    def _process_end_of_detection(self, truncated=False):
        # Decide whether the buffered frames make up a deliverable token,
        # deliver it if so, then empty the buffer. `truncated` is True when
        # the token ends only because `max_length` was reached.

        if not truncated and self._drop_tailing_silence and self._silence_length > 0:
            # happens if max_continuous_silence is reached
            # or max_length is reached at a silent frame
            self._data = self._data[0: - self._silence_length]

        if (len(self._data) >= self.min_length) or (
                len(self._data) > 0 and
                not self._strict_min_length and self._contiguous_token):

            _end_frame = self._start_frame + len(self._data) - 1
            self._deliver(self._data, self._start_frame, _end_frame)

            if truncated:
                # next token (if any) will start at _current_frame + 1
                self._start_frame = self._current_frame + 1
                # remember that it is contiguous with the just delivered one
                self._contiguous_token = True
            else:
                self._contiguous_token = False
        else:
            self._contiguous_token = False

        self._data = []

    def _append_token(self, data, start, end):
        # Default delivery function: collect tokens in the list that
        # `tokenize` returns when no callback is given.
        self._tokens.append((data, start, end))