Acoular 24.07 documentation

Parallel processing chains – SampleSplitter buffer handling.

«  Parallel processing chains – Multithreading with the SampleSplitter.   ::   IO and signal processing examples   ::   Tools  »

Parallel processing chains – SampleSplitter buffer handling.

This example shows the different behaviour of the SampleSplitter class when the maximum size of a block buffer is reached for one of the objects obtaining data.

Three different settings are available for the buffer overflow behaviour:
  • none: no warning, no error

  • warning: a warning appears

  • error: an error is raised

import threading
from time import sleep

import acoular as ac
import numpy as np

Set up the data source. For convenience, we use synthetic white noise with a length of 1 s.

# sampling rate in Hz; fs * 1 samples give one second of signal
fs = 16000
# single-channel white noise, shaped as one column via np.newaxis
white_noise = np.random.randn(fs * 1)
ts = ac.TimeSamples(data=white_noise[:, np.newaxis], sample_freq=fs)

Connect SampleSplitter to data source. We limit the buffer size to 5 blocks.

# the splitter reads from ts; its block buffers are limited to 5 blocks
ss = ac.SampleSplitter(source=ts, buffer_size=5)

Create two objects to process the data

# two TimePower objects that both take the SampleSplitter as their source
tp1 = ac.TimePower(source=ss)
tp2 = ac.TimePower(source=ss)

# register these objects at SampleSplitter
ss.register_object(tp1, tp2)

Define some useful functions for inspecting and for reading data from the SampleSplitter buffers. Two different reader functions are defined to simulate different processing speeds (fast, slow).

def print_number_of_blocks_in_block_buffers():
    """Print the number of data blocks currently held in each SampleSplitter buffer.

    A separate buffer exists for each subsequent object; the counts are
    reported under the labels 'tp1' and 'tp2', in the order in which the
    buffers are stored in ``ss.block_buffer``.
    """
    # take a snapshot of the buffers first, then measure each of them
    snapshot = list(ss.block_buffer.values())
    counts = {}
    for label, buf in zip(['tp1', 'tp2'], [len(b) for b in snapshot]):
        counts[label] = buf
    print(f"num blocks in buffers: {counts}")


def get_data_fast(obj):  # not time consuming function
    """Read all blocks from ``obj`` quickly (0.01 s pause per block).

    Requests blocks of 2048 samples via ``obj.result`` and, for every block
    received, prints a marker line and the current buffer fill levels.
    """
    pause = 0.01
    block_stream = obj.result(2048)
    for _block in block_stream:
        print('tp1 calls sample splitter')
        print_number_of_blocks_in_block_buffers()
        sleep(pause)


def get_data_slow(obj):  # more time consuming function
    """Read all blocks from ``obj`` slowly (0.1 s pause per block).

    Requests blocks of 2048 samples via ``obj.result`` and, for every block
    received, prints a marker line and the current buffer fill levels.
    """
    pause = 0.1
    block_stream = obj.result(2048)
    for _block in block_stream:
        print('tp2 calls sample splitter')
        print_number_of_blocks_in_block_buffers()
        sleep(pause)

Prepare and start processing in threads (no warning or error when block buffer is full)

print("buffer overflow behaviour == 'none'")

# apply the same overflow treatment to both registered objects
for consumer in (tp1, tp2):
    ss.buffer_overflow_treatment[consumer] = 'none'

# one thread per consumer: tp1 is read fast, tp2 is read slowly
worker1 = threading.Thread(target=get_data_fast, args=(tp1,))
worker2 = threading.Thread(target=get_data_slow, args=(tp2,))

print('start threads')

for worker in (worker1, worker2):
    worker.start()

# wait until both readers have consumed all of their data
for worker in (worker1, worker2):
    worker.join()

print('threads finished')
buffer overflow behaviour == 'none'
start threads
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
threads finished

Prepare and start processing in threads (only warning when block buffer is full)

print("buffer overflow behaviour == 'warning'")

# apply the same overflow treatment to both registered objects
for consumer in (tp1, tp2):
    ss.buffer_overflow_treatment[consumer] = 'warning'

# one thread per consumer: tp1 is read fast, tp2 is read slowly
worker1 = threading.Thread(target=get_data_fast, args=(tp1,))
worker2 = threading.Thread(target=get_data_slow, args=(tp2,))

print('start threads')

for worker in (worker1, worker2):
    worker.start()

# wait until both readers have consumed all of their data
for worker in (worker1, worker2):
    worker.join()

print('threads finished')
buffer overflow behaviour == 'warning'
start threads
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
/home/runner/work/acoular/acoular/acoular/tprocess.py:2232: UserWarning: overfilled buffer for object: <acoular.tprocess.TimePower object at 0x7f3a6c18fb30> data will get lost
  warn('overfilled buffer for object: %s data will get lost' % obj, UserWarning, stacklevel=1)
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
threads finished

Prepare and start processing in threads (raise error when block buffer is full)

print("buffer overflow behaviour == 'error'")

# apply the same overflow treatment to both registered objects
for consumer in (tp1, tp2):
    ss.buffer_overflow_treatment[consumer] = 'error'

# one thread per consumer: tp1 is read fast, tp2 is read slowly
worker1 = threading.Thread(target=get_data_fast, args=(tp1,))
worker2 = threading.Thread(target=get_data_slow, args=(tp2,))

print('start threads')

for worker in (worker1, worker2):
    worker.start()

# wait until both reader threads have terminated
for worker in (worker1, worker2):
    worker.join()

print('threads finished')
buffer overflow behaviour == 'error'
start threads
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp2 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 0}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 1}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 2}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 3}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 4}
tp1 calls sample splitter
num blocks in buffers: {'tp1': 0, 'tp2': 5}
Exception in thread Thread-9 (get_data_fast):
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
Exception in thread Thread-10 (get_data_slow):
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/home/runner/work/acoular/acoular/examples/io_and_signal_processing_examples/example_sample_splitter_bufferhandling.py", line 71, in get_data_slow
    self.run()
  File "/opt/hostedtoolcache/Python/3.12.4/x64/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/home/runner/work/acoular/acoular/examples/io_and_signal_processing_examples/example_sample_splitter_bufferhandling.py", line 63, in get_data_fast
    for _ in obj.result(2048):  #
  File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 1417, in result
    for i in obj.result(2048):  #
  File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 1417, in result
    for temp in self.source.result(num):
  File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 2299, in result
    for temp in self.source.result(num):
  File "/home/runner/work/acoular/acoular/acoular/tprocess.py", line 2299, in result
    raise OSError(msg)
OSError: Maximum size of block buffer is reached!
    raise OSError(msg)
OSError: Maximum size of block buffer is reached!
threads finished

Total running time of the script: (0 minutes 1.358 seconds)

Gallery generated by Sphinx-Gallery

«  Parallel processing chains – Multithreading with the SampleSplitter.   ::   IO and signal processing examples   ::   Tools  »