"""
Engine class
============
An :class:`Engine` implements a simple interface to run and control a realtime
csound process.
For offline processing, see :class:`~csoundengine.offlineengine.OfflineEngine`
Example
-------
.. code::
from csoundengine import Engine
# create an engine with default options for the platform
engine = Engine()
# Compile an instrument
engine.compile(r'''
instr synth
; assign preset values for parameters
; p1 p2 p3 p4 p5 p6
pset 0, 0, 0, 60, 1, 4000 ; the first three values (p1, p2, p3) are not considered
kmidinote = p4
kamp = p5
kcutoff = p6
kfreq = mtof:k(kmidinote)
asig = vco2:a(kamp, kfreq)
asig = moogladder2(asig, kcutoff, 0.9)
asig *= linsegr:a(0, 0.1, 1, 0.1, 0)
outs asig, asig
endin
''')
# start an event with indefinite duration. This returns a unique (fractional)
# instance number, which can be used to control its parameters
# The `args` parameter expects any arguments starting with p4
p1 = engine.sched("synth", args=[67, 0.1, 3000], unique=True)
# any parameter with k-rate can be modified while running:
# change midinote
engine.setp(p1, 4, 67)
# modify cutoff
engine.setp(p1, 6, 1000, delay=4)
# stop the synth:
engine.unsched(p1)
# Parameters can also be set using their assigned names. Defaults
# can be set via the pset opcode. In this case the cutoff parameter
# will be assigned the default 4000
p1 = engine.sched("synth", kmidinote=60, kamp=0.1)
# Change the pitch
engine.setp(p1, 'kmidinote', 62, delay=2)
See also :class:`~csoundengine.session.Session` for a higher level interface:
.. code::
from csoundengine import *
session = Engine().session()
session.defInstr('mysynth', r'''
|kmidinote=60, kamp=0.1, kcutoff=3000|
kfreq = mtof:k(kmidinote)
asig = vco2:a(kamp, kfreq)
asig = moogladder2(asig, kcutoff, 0.9)
aenv = linsegr:a(0, 0.1, 1, 0.1, 0)
asig *= aenv
outs asig, asig
''')
# Session sched returns a Synth object
synth = session.sched('mysynth', kmidinote=67, kcutoff=2000)
# Change the midinote after 2 seconds
synth.setp(kmidinote=60, delay=2)
Instrument Numbers
------------------
An :class:`Engine` defines internal instruments to perform some of its
tasks (reading tables, sample playback, etc). To avoid clashes between these
internal instruments and user instruments, there are some reserved instrument
numbers: all instrument numbers from 1 to 99 are reserved for internal use, so
the first available instrument number is 100.
Configuration
-------------
Defaults for :class:`Engine` / :class:`~csoundengine.session.Session` can be
customized via::

    from csoundengine import *
    config.edit()

.. hint::

    For more information, see :ref:`Configuration<configuration>`
Interactive Use
---------------
**csoundengine** is optimized to be used interactively and particularly
within `Jupyter <https://jupyter.org/>`_. See :ref:`Csoundengine inside Jupyter<jupyternotebook>`
IPython Magic
~~~~~~~~~~~~~
**csoundengine** also defines a set of ipython/jupyter :doc:`magics <magics>`
.. figure:: assets/eventui.png
"""
from __future__ import annotations
import atexit as _atexit
import ctypes as _ctypes
import fnmatch as _fnmatch
import os.path
import queue as _queue
import re as _re
import sys as _sys
import tempfile
import textwrap as _textwrap
import threading as _threading
import time
import numpy as np
import pitchtools as pt
from emlib import iterlib
from emlib.containers import IntPool
from . import csoundparse
from . import engineorc
from . import internal
from . import state as _state
from .config import config, logger
from .enginebase import TableInfo, _EngineBase
from .engineorc import BUSKIND_AUDIO, BUSKIND_CONTROL, CONSTS
from .errors import CsoundError, TableNotFoundError
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from matplotlib.figure import Figure
import socket
from typing import Callable, Sequence
from . import csoundlib
from . import session as _session
callback_t = Callable[[str, float], None]
import libcsound as lcs
elif 'sphinx' in _sys.modules:
import socket
from typing import Callable, Sequence
callback_t = Callable[[str, float], None]
# Names exported by ``from csoundengine.engine import *``
__all__ = ['Engine', 'getEngine']

# Sentinel written into a slot of the responses table when a sync token is
# handed out (see Engine._getSyncToken)
_UNSET = float("-inf")
def _generateUniqueEngineName(prefix="engine") -> str:
    """
    Return an engine name which is not a key of ``Engine.activeEngines``

    Candidates have the form ``{prefix}{index}``, with index going from 0 to
    9998. If every candidate is taken, a random uuid4 string is returned.

    Args:
        prefix: the prefix of the generated name

    Returns:
        the generated name
    """
    candidates = (f"{prefix}{index}" for index in range(9999))
    unused = next((cand for cand in candidates if cand not in Engine.activeEngines), None)
    if unused is not None:
        return unused
    # All numbered names are in use, fall back to a random identifier
    import uuid
    return str(uuid.uuid4())
def _asEngine(e: str | Engine) -> Engine:
    """
    Coerce ``e`` to an Engine

    Args:
        e: an Engine (returned as is) or the name of an engine, which is
            looked up via ``getEngine``

    Returns:
        the corresponding Engine

    Raises:
        ValueError: if ``getEngine`` finds no engine with the given name
    """
    if not isinstance(e, Engine):
        found = getEngine(e)
        if found is None:
            raise ValueError(f"No engine found with name {e}")
        return found
    return e
[docs]
class Engine(_EngineBase):
"""
Implements a simple interface to run and control a realtime csound process.
For offline rendering, see :class:`~csoundengine.offline.OfflineSession` and
:class:`~csoundengine.offlineengine.OfflineEngine`
Args:
name: the name of the engine
sr: sample rate. If not given, the sr of the backend will be used, if possible
ksmps: samples per k-cycle
backend: passed to -+rtaudio (**"?" to select interactively**). If not given, the most
appropriate backend will be used.
outdev: the audio output device, passed to -o (**"?" to select interactively**). Leave
unset to use the default
indev: the audio input device, passed to -i (**"?" to select interactively**). Leave
unset to use the default
a4: freq of a4
nchnls: number of output channels (passed to nchnls). Leave unset to use the number of
channels defined by the backend (if known)
nchnls_i: number of input channels. Similar to nchnls. If not given it will either
fallback to the number of input channels provided by the backend, or to nchnls
buffersize: samples per buffer, corresponds to csound's -b option
numbuffers: the number of buffers to fill. Together with the buffersize determines
the latency of csound and any communication between csound and the python
host
globalcode: code to evaluate as instr0 (global variables, etc.)
includes: a list of files to include. Can be added later via :meth:`Engine.includeFile`
numAudioBuses: number of audio buses (see :ref:`Bus Opcodes<busopcodes>`)
numControlBuses: number of control buses (see :ref:`Bus Opcodes<busopcodes>`)
verbose: if True, do not suppress output of csound (-m16 -d). If you need to customize
the debugging output, set verbose to True (csound is verbose by default) and set
your own flags via commandlineOptions
udpserver: if True, start a udp server for communication (see udpport)
udpport: the udpport to use for real-time messages. 0=autoassign port
commandlineOptions: command line options passed verbatim to the
csound process when started
midiin: if given, use this device as midi input. Can be '?' to select
from a list, or 'all' to use all devices. None indicates no midi input
latency: an extra latency added when scheduling events to ensure synchronicity.
See also :meth:`Engine.lockClock` and :meth:`Engine.pushClock`
sampleAccurate: use sample-accurate scheduling
useProcessQueue: only valid for csound 6. If True, will set the process
callback to handle async tasks
suppressVersion: in csound7 a version message is printed by default
whenever a csound instance is created. Setting this flag to True
disables this message. The csound version and commit
can always be consulted via the command line or using the API directly.
Setting verbose=True sets this flag to False
.. note::
Any option with a default value of None has a corresponding slot in the
config. Default values can be configured via `config.edit()`, see
`Configuration <https://csoundengine.readthedocs.io/en/latest/config.html>`_
Example
-------
.. code::
from csoundengine import *
# create an engine with default options for the platform
engine = Engine()
engine.compile(r'''
instr synth
kmidinote = p4
kamp = p5
kcutoff = p6
asig = vco2:a(kamp, mtof:k(kmidinote))
asig = moogladder2(asig, kcutoff, 0.9)
asig *= linsegr:a(0, 0.1, 1, 0.1, 0)
outs asig, asig
endin
''')
# start a synth with indefinite duration. This returns a unique (fractional)
# instance number
p1 = engine.sched("synth", args=[67, 0.1, 3000])
# any parameter with k-rate can be modified while running
engine.setp(p1, 4, 60)
# modify cutoff
engine.setp(p1, 6, 1000, delay=4)
# stop the synth:
engine.unsched(p1)
"""
# Registry shared by all instances: start() adds the engine under its name,
# stop() removes it
activeEngines: dict[str, Engine] = {}
"Active engines mapped by name (class variable)"
# Builtin tables as defined by engineorc, looked up by name (start() uses the
# 'responses' entry to access the responses table)
_builtinTables = engineorc.BUILTIN_TABLES
def __init__(self,
             name: str = '',
             sr=0,
             ksmps: int = 0,
             backend: str = '',
             outdev: str = '',
             indev: str = '',
             a4: int = 0,
             nchnls: int | None = None,
             nchnls_i: int | None = None,
             realtime=False,
             buffersize: int = 0,
             numbuffers: int = 0,
             globalcode: str = "",
             numAudioBuses: int | None = None,
             numControlBuses: int | None = None,
             verbose=False,
             udpserver=False,
             udpport: int = 0,
             commandlineOptions: list[str] | None = None,
             includes: list[str] | None = None,
             midibackend: str = 'default',
             midiin: str | None = None,
             autosync=False,
             latency: float | None = None,
             sampleAccurate: bool = False,
             numthreads: int = 0,
             busSupport=False,
             nosound=False,
             useProcessQueue=False,
             suppressVersion=True
             ):
    """
    Resolve backend, devices and settings, set up the state and start csound

    The main parameters are documented in the class docstring. Parameters
    not covered there:

    Args:
        realtime: if True, csound is started with the ``--realtime`` option
        midibackend: the midi backend, passed to ``-+rtmidi`` when a midi
            input is used. 'default' resolves to 'portmidi'
        autosync: if True, call .sync whenever is needed
        numthreads: number of threads used in performance (csound ``-j``,
            only passed if > 1). 0 uses the value set in the config
        busSupport: if True, bus support is added when the engine starts.
            At least one audio or control bus is needed in that case
        nosound: if True, ``--nosound`` is passed to csound and no output
            device is selected

    Raises:
        KeyError: if an engine with the given name already exists
        ValueError: if no backend was selected, a device pattern matches no
            device or bus support is asked for without any bus
        RuntimeError: if no backend / device is available or selected
        CsoundError: if the resolved backend is not available
    """
    import libcsound as lcs
    from . import csoundlib

    if not name:
        name = _generateUniqueEngineName()
    elif name in Engine.activeEngines:
        raise KeyError(f"Engine '{name}' already exists")

    if backend == 'portaudio':
        backend = 'pa_cb'

    cfg = config

    # --- Backend -------------------------------------------------------
    availableBackends = csoundlib.getAudioBackendNames()
    if not backend or backend == 'default':
        backend = csoundlib.getDefaultBackend().name
    elif backend == '?':
        selection = internal.selectItem(availableBackends, title="Select Backend")
        if not selection:
            raise ValueError("No backend selected")
        backend = selection
    elif backend not in availableBackends:
        # Not fatal here: `backend` can be a comma separated list of
        # alternatives, which is resolved below
        logger.error(f"Backend {backend} unknown. Available backends: "
                     f"{availableBackends}")

    cascadingBackends = [b.strip() for b in backend.split(",")]
    resolvedBackend = internal.resolveOption(cascadingBackends, availableBackends)
    logger.debug(f"Resolved backend: {resolvedBackend}")
    if resolvedBackend is None:
        raise RuntimeError("No audio backends available")

    backendDef = csoundlib.getAudioBackend(resolvedBackend)
    if not backendDef:
        logger.error(f'Could not find any available backends for {backend}')
        logger.error(f'   Available backends: {", ".join(availableBackends)}')
        logger.error('To configure the default backends, do:\n'
                     '    from csoundengine import config\n'
                     '    config.edit()\n'
                     f'And edit the "{internal.platform}.backend" key.')
        raise CsoundError(f'Backend "{resolvedBackend}" not available')

    # --- Output device -------------------------------------------------
    indevs, outdevs = backendDef.audioDevices()
    defaultin, defaultout = backendDef.defaultAudioDevices()
    indevName, outdevName = "adc", "dac"
    if nosound:
        outdev, outdevName = '', ''
    elif not outdev:
        if not defaultout:
            raise RuntimeError(f"No output devices for backend {backendDef.name}")
        outdev, outdevName = defaultout.id, defaultout.name
        logger.debug(f"No output device given for backend {resolvedBackend}, "
                     f"using default: {outdevName}, id: {outdev}")
        if not nchnls:
            nchnls = defaultout.numChannels
    elif outdev == '?':
        if len(outdevs) == 0:
            raise RuntimeError("No output audio devices")
        logger.debug(f"Selecting output device for backend {resolvedBackend}, devices: {outdevs}")
        selected = internal.selectAudioDevice(outdevs, title="Select output device")
        if selected is None:
            raise RuntimeError("No output audio device selected")
        outdev, outdevName = selected.id, selected.name
        if not nchnls:
            nchnls = selected.numChannels
    elif isinstance(outdev, int) or _re.search(r"\bdac[0-9]+\b", outdev):
        # dac1, dac8
        if backendDef.name == 'jack':
            logger.warning(
                "This way of setting the audio device is discouraged with jack"
                ". Use a regex to select a specific client or None to connect "
                "to the default client")
        if isinstance(outdev, int):
            outdev = f"dac{outdev}"
    else:
        if backendDef.name == 'jack':
            outdevName = outdev
        else:
            # The given name is used as a glob pattern over the device names
            selected = next((d for d in outdevs if _fnmatch.fnmatch(d.name, outdev)), None)
            if not selected:
                raise ValueError(f"Output device {outdev} not known. Possible devices: "
                                 f"{outdevs}")
            outdev, outdevName = selected.id, selected.name

    # --- Input device --------------------------------------------------
    if not indev:
        if defaultin is None:
            raise RuntimeError(f"No default input device for backend {resolvedBackend}")
        indev, indevName = defaultin.id, defaultin.name
        if not nchnls_i:
            nchnls_i = defaultin.numChannels
    elif indev == '?':
        if len(indevs) == 0:
            raise RuntimeError("No input audio devices")
        selected = internal.selectAudioDevice(indevs, title="Select input device")
        if selected is None:
            raise RuntimeError("No input audio device selected")
        indev, indevName = selected.id, selected.name
    elif isinstance(indev, int) or _re.search(r"\badc[0-9]+\b", indev):
        if resolvedBackend == 'jack':
            logger.warning(
                "This way of setting the audio device is discouraged with jack"
                ". Use a regex to select a specific client or None to connect "
                "to the default client")
        if isinstance(indev, int):
            # The device index is the *input* index (this used outdev before)
            indev = f"adc{indev}"
    else:
        if resolvedBackend == 'jack':
            indevName = indev
        else:
            selected = next((d for d in indevs if _fnmatch.fnmatch(d.name, indev)), None)
            if not selected:
                raise ValueError(f"Input device {indev} not known. Possible devices: "
                                 f"{indevs}")
            indev, indevName = selected.id, selected.name

    if midibackend == 'default':
        midibackend = 'portmidi'

    # Work on a copy: options are appended below and the list passed by the
    # caller should not be modified
    commandlineOptions = list(commandlineOptions) if commandlineOptions is not None else []

    # --- Samplerate ----------------------------------------------------
    sr = sr if sr else cfg['sr']
    backendsr = csoundlib.getSamplerateForBackend(resolvedBackend)
    if sr and backendDef.hasSystemSr and sr != backendsr:
        logger.warning(f"sr requested: {sr}, but backend has a fixed sr ({backendsr})"
                       f". Using backend's sr")
        sr = backendsr
    elif not sr:
        if backendDef.hasSystemSr:
            sr = backendsr
        else:
            sr = 44100
            logger.info("Asked for system sr, but backend '%s', does not "
                        "have a fixed sr. Using sr=%d", resolvedBackend, sr)

    # --- Remaining defaults, taken from the config ---------------------
    if not a4:
        a4 = cfg['A4']
    if numthreads == 0:
        numthreads = config['numthreads']
    if not ksmps:
        ksmps = cfg['ksmps']
    if nchnls_i is None:
        nchnls_i = cfg['nchnls_i']
    if nchnls is None:
        nchnls = cfg['nchnls']

    # A value of -1 means: use the number of channels of the device
    inchnls, outchnls = csoundlib.getNchnls(resolvedBackend,
                                            outpattern=outdev, inpattern=indev)
    if nchnls == -1:
        nchnls = outchnls
    if nchnls_i == -1:
        nchnls_i = inchnls
    assert nchnls is not None and nchnls > 0, f"{nchnls=}, {outchnls=}, {inchnls=}, {nchnls_i=}, {cfg=}"
    assert nchnls_i is not None and nchnls_i >= 0

    # --- Command line options ------------------------------------------
    if nosound and '--nosound' not in commandlineOptions:
        commandlineOptions.append('--nosound')

    if not verbose:
        commandlineOptions.append('-m0')
        commandlineOptions.append('-d')
    else:
        # When debugging we force print version
        suppressVersion = False

    if suppressVersion and lcs.VERSION >= 7000:
        commandlineOptions.append('--suppress-version')

    if sampleAccurate:
        commandlineOptions.append('--sample-accurate')

    self.name = name
    "Name of this Engine"

    assert sr is not None and sr > 0
    assert isinstance(ksmps, int) and ksmps > 0

    if numAudioBuses is None:
        numAudioBuses = int(config['num_audio_buses'])
    if numControlBuses is None:
        numControlBuses = int(config['num_control_buses'])
    if busSupport:
        if not numAudioBuses and not numControlBuses:
            raise ValueError("At least one audio or control bus must be enabled")

    super().__init__(sr=sr,
                     ksmps=ksmps,
                     nchnls=nchnls,
                     numAudioBuses=numAudioBuses,
                     numControlBuses=numControlBuses,
                     sampleAccurate=sampleAccurate,
                     a4=a4)

    self.backend = resolvedBackend
    "Name of the backend used (jack, portaudio, etc)"

    self.outdev = outdev
    "Output device used"

    self.outdevName = outdevName
    "Long name of the output device"

    self.indev = indev
    "Input device"

    self.indevName = indevName
    "Long name of the input device"

    self.nchnls_i: int = nchnls_i
    "Number of input channels"

    self.globalCode: str = globalcode
    "Global (init) code to execute at the start of the Engine"

    self.started = False
    "Is this engine started?"

    self.extraOptions: list[str] = commandlineOptions
    "Extra options passed to csound"

    self.commandlineOptions: list[str] = []
    """All command line options used to start the engine"""

    self.includes: list[str] = includes if includes is not None else []
    "List of include files"

    self.extraLatency: float = latency if latency is not None else config['sched_latency']
    "Added latency for better synch"

    self.udpPort = 0
    "UDP port used (0 if no udp port is active)"

    self.csound: lcs.Csound = lcs.Csound()
    "The csound object"

    self.autosync: bool = autosync
    """If True, call .sync whenever is needed"""

    assert isinstance(busSupport, bool)
    self._addBusSupport: bool = busSupport
    """Should this engine add bus support when started?"""

    self._hasBusSupport: bool = False
    """Has bus support been added?"""

    # Explicit argument > backend's own value > config > fallback
    backendBufferSize, backendNumBuffers = backendDef.bufferSizeAndNum()
    buffersize = (buffersize or backendBufferSize or config['buffersize'] or 256)
    buffersize = max(self.ksmps * 2, buffersize)
    numbuffers = (numbuffers or backendNumBuffers or config['numbuffers'] or
                  internal.determineNumbuffers(self.backend or "portaudio", buffersize=buffersize))

    self.bufferSize: int = buffersize
    "Buffer size"

    self.numBuffers: int = numbuffers
    "Number of buffers to fill"

    self.midiBackend: None | str = midibackend
    "Midi backend used"

    self.numthreads: int = numthreads
    """Number of threads to use in performance (corresponds to csound -j N)"""

    if midiin == 'all':
        midiindev = csoundlib.MidiDevice(deviceid='all', name='all')
    elif midiin == '?':
        midiindevs, midioutdevs = csoundlib.midiDevices(self.midiBackend)
        midiindevs.append(csoundlib.MidiDevice('all', 'all'))
        selecteddev = internal.selectMidiDevice(midiindevs)
        if selecteddev is not None:
            midiindev = selecteddev
        else:
            raise RuntimeError("No MIDI device selected")
    elif midiin:
        midiindev = csoundlib.MidiDevice(deviceid=midiin, name='')
    else:
        midiindev = None

    self.midiin: csoundlib.MidiDevice | None = midiindev
    "Midi input device"

    # Always define the socket attribute (it was misspelled as _uddocket, so
    # _udpSocket only existed when udp was enabled)
    self._udpSocket: None | socket.socket = None
    self._sendAddr: None | tuple[str, int] = None
    if udpserver or udpport > 0:
        import emlib.net
        self.udpPort = udpport or emlib.net.findport()
        self._udpSocket = emlib.net.udpsocket()
        self._sendAddr = ("127.0.0.1", self.udpPort)

    self._perfThread: lcs.PerformanceThread
    self._fracnumdigits = 4        # number of fractional digits used for unique instances
    self._exited = False           # are we still running?
    self._realtime = realtime
    self._useProcessQueue = useProcessQueue if lcs.VERSION < 7000 else False

    # counters to create unique instances for each instrument
    self._instanceCounters: dict[int, int] = {}

    # Maps instrname/number: code
    self._instrRegistry: dict[str | int, str] = {}

    # Keeps parsed instr bodies, used to analyze pfields, defaults, etc.
    self._parsedInstrs: dict[str, csoundparse.ParsedInstrBody] = {}

    # a dict of callbacks, reacting to outvalue opcodes
    self._outvalueCallbacks: dict[bytes, callback_t] = {}

    # Maps used for strSet / strGet
    self._indexToStr: dict[int, str] = {}
    self._strToIndex: dict[str, int] = {}
    self._strLastIndex = 20

    # Marks the last modification to the state of the engine, to track sync
    self._lastModification = 0.

    # global code added to this engine
    self._globalCode: dict[str, str] = {}

    # This will be a numpy array pointing to a csound table of
    # numTokens size. When an instrument wants to return a value to the
    # host, the host sends a token, the instr sets table[token] = value
    # and calls 'outvalue "__sync__", token' to signal that an answer is
    # ready
    self._responsesTable: np.ndarray

    # tokens start at 1, leave token 0 to signal that no sync is needed
    # tokens are used as indices to _responsesTable, which is an alias of
    # gi__responses
    self._tokens = list(range(1, CONSTS['numTokens']))

    # a pool of reserved table numbers
    reservedTablesStart = CONSTS['reservedTablesStart']
    self._tablePool = IntPool(CONSTS['numReservedTables'], start=reservedTablesStart)

    # a dict of token:callback, used to register callbacks when asking for
    # feedback from csound
    self._responseCallbacks: dict[int, Callable] = {}

    self._tableCache: dict[int, np.ndarray] = {}
    self._tableInfo: dict[int, TableInfo] = {}

    self._channelPointers: dict[str, np.ndarray] = {}

    self._instrNumCache: dict[str, int] = {}

    self._session: None | _session.Session = None
    self._busTokenCountPtr: np.ndarray = np.empty((1,), dtype=float)
    self._soundfontPresetCountPtr: np.ndarray = np.empty((1,), dtype=float)
    self._kbusTable: np.ndarray | None = None
    self._busIndexes: dict[int, int] = {}
    self._busTokenToKind: dict[int, str] = {}
    self._soundfontPresets: dict[tuple[str, int, int], int] = {}
    self._soundfontPresetCount = 0
    self._startTime = 0.
    self._lockedElapsedTime = 0.
    self._realElapsedTime = (0., -float('inf'))

    # A stack holding locked states
    self._clockStatesStack: list[tuple[bool, float]] = []

    self._reservedInstrnums: set[int] = set()
    self._reservedInstrnumRanges: list[tuple[str, int, int]] = [
        ('builtinorc', CONSTS['reservedInstrsStart'], CONSTS['userInstrsStart']-1)
    ]

    self._minCyclesForAbsoluteMode = 4

    self._fltptr = _ctypes.POINTER(lcs.MYFLT)

    self.version = lcs.VERSION
    """Csound version as integer (6.18 = 6180)"""

    self.start()
[docs]
def reservedInstrRanges(self) -> list[tuple[str, int, int]]:
"""
A dict containing reserved instrument number ranges
An Engine has some internal instruments for performing tasks like
automation, bus support, etc. Moreover, if an Engine has an attached
Session, the session will declare a range of instrument numbers
as reserved.
This method returns all those reserved ranges in the form of a
list of tuples, where each tuple represents a reserved range.
Each tuple has the form ``(rangename: str, minInstrNumber: int, maxInstrNumber: int)``,
where ``rangename`` is the name of the range, ``minInstrNumber`` and ``maxInstrNumber``
represent the instr numbers reserved
Any instr number outside of this range can be used.
"""
return self._reservedInstrnumRanges
[docs]
def userInstrumentsRange(self) -> tuple[int, int]:
    """
    The range of instrument numbers available for user instruments

    Notice that the first instrument numbers are reserved for internal instruments.
    If this Engine has an attached Session, the session itself will reserve
    a range of numbers for its events

    Returns:
        a tuple (mininstr: int, maxinstr: int). mininstr is
        ``CONSTS['userInstrsStart']``; maxinstr is the start of the second
        reserved range if there is one, ``CONSTS['maxNumInstrs']`` otherwise
    """
    fallbackMax = CONSTS['maxNumInstrs']
    reserved = self._reservedInstrnumRanges
    upper = reserved[1][1] if len(reserved) > 1 else fallbackMax
    return CONSTS['userInstrsStart'], upper
@property
def blockSize(self) -> int:
"""
The size of the processing block in samples.
csound defines two variables to control its communication with the audio
backend, a hardware buffer (-B option) and a software buffer (-b option).
With each audio backend these values are interpreted somewhat differently.
In general it can be said that ksmps must divide the software buffer (-b)
and the software buffer itself must divide the hardware buffer (-B).
Common values for these are: ksmps=64, software buffer=256, hardware
buffer=512
"""
return self.bufferSize * self.numBuffers
def __repr__(self):
outdev = self.outdev
if _re.search(r'\bdac([0-9]+)\b', outdev):
outdev += f" ({self.outdevName})"
indev = self.indev
if _re.search(r'\badc([0-9]+)\b', indev):
indev += f" ({self.indevName})"
parts = [f'name={self.name}, sr={self.sr}, backend={self.backend}, outdev={outdev}'
f', nchnls={self.nchnls}']
if self.nchnls_i > 0:
parts.append(f'indev={indev}, nchnls_i={self.nchnls_i}')
parts.append(f'bufferSize={self.bufferSize}')
return f"Engine({', '.join(parts)})"
def __del__(self):
self.stop()
def _getSyncToken(self) -> int:
    """
    Take a token from the pool, to pass to csound for a sync response

    The slot of the responses table indexed by the token is reset to
    ``_UNSET`` before the token is returned.

    Returns:
        the token (an index into the responses table)
    """
    syncToken = self._tokens.pop()
    self._responsesTable[syncToken] = _UNSET
    return syncToken
def _releaseToken(self, token: int) -> None:
""" Release token back to pool when done """
self._tokens.append(token)
def _assignTableNumber(self) -> int:
"""
Return a free table number and mark that as being used.
To release the table, call unassignTable
Returns:
the table number (an integer)
"""
if len(self._tablePool) == 0:
raise RuntimeError("Table pool is empty")
tabnum = self._tablePool.pop()
self._tableCache.pop(tabnum, None)
return tabnum
def _assignEventId(self, instrnum: int | str) -> float:
"""
Assign an eventid (fractional instr number) for this instr
This is not really a unique instance, there might be conflicts
with a previously scheduled event. To really generate a unique instance
we would need to call uniqinstance, which creates a roundtrip to csound
Args:
instrnum (int): the instrument number
Returns:
the eventid assigned, a float
"""
if isinstance(instrnum, str):
instrnum = self.queryNamedInstr(instrnum)
c = self._instanceCounters.get(instrnum, 0)
c += 1
self._instanceCounters[instrnum] = c
instancenum = (c % int(10 ** self._fracnumdigits - 2)) + 1
return self._makeEventId(instrnum, instancenum)
def _makeEventId(self, num: int, instance: int) -> float:
frac = (instance / (10**self._fracnumdigits)) % 1
return num + frac
def _busGlobalInstrs(self) -> None:
"""
Schedule the builtin 'clearbuses_post' instr if this engine has bus support

The event is sent through the performance thread with start=0 and dur=-1
"""
# NOTE(review): the indentation of this copy of the file is lost. Presumably
# _modified() belongs inside the `if`, together with the scoreEvent call;
# confirm against the original file
if self.hasBusSupport():
self._perfThread.scoreEvent(False, "i", [self._builtinInstrs['clearbuses_post'], 0, -1])
self._modified()
def _setupGlobalInstrs(self):
self._busGlobalInstrs()
[docs]
def stop(self):
"""
Stop this Engine

Does nothing if the engine was never fully initialized (it has no ``name``
attribute), is not started, has already exited or has no csound instance.

After csound is stopped and cleaned up the engine is marked as exited,
its instance counters and instr registry are cleared and it is removed
from ``activeEngines``
"""
# stop() is also called from __del__, possibly on an object whose __init__
# failed before any attribute was set
if not hasattr(self, "name"):
return
elif not self.started or self._exited or (hasattr(self, "csound") and self.csound is None):
logger.debug(f"Engine {self.name} was not running, so can't stop it")
return
logger.debug(f"stopping Engine {self.name}")
# NOTE(review): the indentation of this copy of the file is lost, so it is
# not possible to tell how many of the statements after `else:` belong to
# that branch (only the perfThread.stop() call or the whole teardown).
# Confirm against the original file before restructuring
if self._session is not None:
self._session.stop()
else:
self._perfThread.stop()
# time.sleep(0.1)
logger.debug("... stopping csound")
self.csound.stop()
# time.sleep(0.1)
logger.debug("... cleaning up")
self.csound.cleanup()
# start() checks _exited to decide if a new csound instance is needed
self._exited = True
self._instanceCounters.clear()
self._instrRegistry.clear()
self.activeEngines.pop(self.name, None)
self.started = False
[docs]
def start(self):
    """
    Start this engine.

    The call to .start() is performed as part of the init process and
    only needs to be called explicitly if the engine was previously
    stopped. If the engine has already been started this method
    does nothing

    Raises:
        RuntimeError: if jack is the backend but it is not running, if the
            csound version is older than 6.18 or if the responses table
            cannot be accessed
        CsoundError: if the base orchestra fails to compile or csound
            fails to start
    """
    if self.started:
        logger.debug(f"start: Engine {self.name} already started")
        return

    if priorengine := self.activeEngines.get(self.name):
        logger.debug(f"Stopping prior engine with same name ('{self.name}')")
        priorengine.stop()

    logger.debug("Starting engine '%s'", self.name)
    import libcsound as lcs

    buffersize = self.bufferSize
    optB = buffersize * self.numBuffers
    if self.backend == 'jack':
        # Resulting -B is always one buffer size smaller, so if we pass -B512 and -b128
        # the resulting -B will be 384. We need to account for that
        optB = buffersize * (self.numBuffers + 1)
        from . import jacktools
        jackinfo = jacktools.getInfo()
        if jackinfo is None:
            logger.error("Asked to use jack as backend, but jack is not running")
            raise RuntimeError("jack is not running")
        # With jack the samplerate is dictated by the server
        self.sr = jackinfo.samplerate
        minB = jackinfo.blocksize if jackinfo.onPipewire else jackinfo.blocksize*2
        if optB <= minB:
            optB = minB
            self.numBuffers = (optB // self.bufferSize)
            optB = self.bufferSize * (self.numBuffers + 1)
            logger.warning(f"csoundengine: Using -b {self.bufferSize}, -B {optB} "
                           f"(numBuffers: {self.numBuffers}, "
                           f"jack's blocksize: {jackinfo.blocksize})")

    options = ["-d",   # suppress all displays
               f"-+rtaudio={self.backend}",
               f"-b{self.bufferSize}",
               f"-B{optB}"]
    options.append(f'-o"{self.outdev}"')
    options.append(f'-i"{self.indev}"')

    if self.numthreads > 1:
        options.append(f'-j {self.numthreads}')

    if self.midiin is not None:
        if self.midiBackend:
            options.append(f"-+rtmidi={self.midiBackend}")
        options.append(f"-M{self.midiin.deviceid}")

    if self._realtime:
        options.append("--realtime")

    if self.extraOptions:
        options.extend(self.extraOptions)

    if self.backend == 'jack':
        if self.name is not None:
            clientname = self.name.strip().replace(" ", "_")
            options.append(f'-+jack_client=csoundengine.{clientname}')

    if self.udpPort:
        options.append(f"--port={self.udpPort}")

    if config['disable_signals']:
        lcs.csoundInitialize(atExitHandler=False, signalHandler=False)
    else:
        logger.debug("Starting a csound instance without disabling signals...")

    # A stopped engine has cleaned up its csound instance, a new one is needed
    if self._exited:
        self.csound = lcs.Csound()
    cs = self.csound

    csversion = cs.version()
    if csversion < 6180:
        # The version is an integer (6.18 = 6180)
        raise RuntimeError(f"Csound's version should be >= 6.18, got {csversion / 1000:.2f}")

    # Options can be repeated (for example -d), pass each of them only once
    options = list(iterlib.unique(options))
    for opt in options:
        cs.setOption(opt)
    self.commandlineOptions = options

    if self.includes:
        includelines = [f'#include "{include}"' for include in self.includes]
        includestr = "\n".join(includelines)
    else:
        includestr = ""

    assert self.sr > 0
    orcheader = engineorc.makeOrcHeader(sr=self.sr, ksmps=self.ksmps, nchnls=self.nchnls,
                                        nchnls_i=self.nchnls_i, a4=self.a4)
    orc, instrmap = engineorc.makeOrc(globalcode=self.globalCode,
                                      includestr=includestr)
    self._builtinInstrs = instrmap
    self._reservedInstrnums = set(instrmap.values())

    cs.compileOrc(orcheader)
    err = cs.compileOrc(orc)
    if err:
        # Keep a copy of the failing orchestra for inspection. The file is
        # created together with its name (no mktemp race) and is not deleted
        with tempfile.NamedTemporaryFile("w", prefix="csoundengine-", suffix=".orc",
                                         delete=False) as f:
            f.write(orcheader)
            f.write(orc)
            tmporc = f.name
        logger.error(f"Error compiling base orchestra. A copy of the orchestra"
                     f" has been saved to {tmporc}")
        logger.error(internal.addLineNumbers(orc))
        raise CsoundError(f"Error compiling base ochestra, error: {err}")

    logger.debug("Starting csound with options: %s", options)
    err = cs.start()
    if err != 0:
        logger.error(f"\nCsound failed to start, options used: {options}\n")
        raise CsoundError(f"Could not start csound (error code: {err})")

    responsesTable = cs.table(self._builtinTables['responses'])
    if responsesTable is None:
        raise RuntimeError("Could not create responses table")
    self._responsesTable = responsesTable

    self._startTime = time.time()
    self._orc = orc

    if config['set_sigint_handler']:
        internal.setSigintHandler()

    # Bidirectional channel to keep track of soundfont presets
    chanptr, err = cs.channelPtr("_soundfontPresetCount", kind='control', mode='rw')
    assert isinstance(chanptr, np.ndarray), f"_soundfontPresetCount channel is not set: {err}"
    self._soundfontPresetCountPtr = chanptr

    if self._addBusSupport:
        self.addBusSupport()

    perfThread = cs.performanceThread()
    # The process queue is only set up for csound 6
    if csversion < 7000 and self._useProcessQueue:
        perfThread.setProcessQueue()
    perfThread.play()
    self._perfThread = perfThread

    self.started = True
    # The engine is running again. Without this, a restarted engine would
    # still count as exited and stop() would refuse to stop it
    self._exited = False

    self._setupCallbacks()
    self._setupGlobalInstrs()
    self.activeEngines[self.name] = self

    strsets = ["cos", "linear", "smooth", "smoother"]
    for s in strsets:
        self.strSet(s)

    self._modified()
    self.sync()
[docs]
def restart(self, wait=0.) -> None:
""" Restart this engine. All defined instrs / tables are removed"""
self.stop()
if wait:
import time
time.sleep(wait)
self.start()
def _outvalueCallback(self, _, channelName, valptr, chantypeptr):
func = self._outvalueCallbacks.get(channelName)
if not func:
logger.error(f"outvalue: callback not set for channel {channelName}")
return
if valptr is not None:
val = _ctypes.cast(valptr, self._fltptr).contents.value
func(channelName, val)
else:
logger.warning(f"outvalueCallback: {channelName=} called with null pointer, skipping")
def _setupCallbacks(self) -> None:
assert self.started
def _syncCallback(_, token):
""" Called with outvalue __sync__, the value is put
in gi__responses at token idx, then __sync__ is
called with token to signal that a response is
waiting. The value can be retrieved via self._responsesTable[token]
"""
token = int(token)
callback = self._responseCallbacks.get(token)
if callback:
callback(token)
self._releaseToken(token)
del self._responseCallbacks[token]
else:
logger.error(f"Unknown sync token: {token}")
self.registerOutvalueCallback("__sync__", _syncCallback)
self.csound.setOutputChannelCallback(self._outvalueCallback)
[docs]
def registerOutvalueCallback(self, chan: str, func: callback_t) -> None:
"""
Set a callback to be fired when "outvalue" is used in csound
Register a function ``func(channelname:str, newvalue: float) -> None``,
which will be called whenever the given channel is modified via
the "outvalue" opcode. Multiple functions per channel can be registered
Args:
chan: the name of a channel
func: a function of the form ``func(chan:str, newvalue: float) -> None``
"""
key = bytes(chan, "ascii")
previousCallback = self._outvalueCallbacks.get(key)
if chan.startswith("__"):
if previousCallback:
logger.warning("Attempting to set a reserved callback, but one "
"is already present. The new one will replace the old one")
self._outvalueCallbacks[key] = func
else:
if previousCallback:
logger.warning(f"Callback for channel {chan} already set, replacing it")
self._outvalueCallbacks[key] = func
[docs]
def bufferLatency(self) -> float:
    """
    The latency of the communication to the csound process.

    ::

        bufferLatencySeconds = buffersize * numbuffers / sr

    This latency depends on the buffersize and number of buffers.

    Returns:
        the latency, in seconds
    """
    secondsPerBuffer = self.bufferSize/self.sr
    return secondsPerBuffer * self.numBuffers
[docs]
def controlLatency(self) -> float:
    """
    Time latency between a scheduled action and its response.

    This is normally ``ksmps/sr * 2`` but the actual latency varies if
    the engine is being run in realtime (in that case init-pass is done
    async, which might result in longer latency).

    Returns:
        the nominal latency (two control cycles), in seconds
    """
    cycleDuration = self.ksmps/self.sr
    return cycleDuration * 2
[docs]
def sync(self, timeout=0., force=False, threshold=2.) -> bool:
    """
    Block until csound has processed its immediate events

    Args:
        timeout: a timeout in seconds; 0. = use default timeout as defined
            in the configuration (TODO: add link to configuration docs)
        force: if True, sync even if not needed
        threshold: if time since last modification is longer than this
            threshold assume that sync is not needed

    Returns:
        True if it synced, False if no sync was performed

    Raises TimeoutError if the sync operation takes too long

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> e = Engine(...)
        >>> tables = [e.makeEmptyTable(size=1000) for _ in range(10)]
        >>> e.sync()
        >>> # do something with the tables
    """
    # The threshold must be forwarded: needsSync has its own default, so
    # without this the documented `threshold` argument would have no effect
    if not force and not self.needsSync(threshold=threshold):
        return False
    if self.version >= 7000:
        self._perfThread.flushMessageQueue()
    elif self._perfThread._processQueue:
        self._perfThread.flushProcessQueue()
    else:
        # No queue to flush: do a roundtrip through the csound scheduler
        self.pingback(timeout=timeout)
    self._lastModification = 0
    return True
def pingback(self, timeout: float = 0.):
    """
    Schedule the builtin 'pingback' instr and wait for its response

    Args:
        timeout: passed to ``self._eventWait``
    """
    synctoken = self._getSyncToken()
    pargs = [self._builtinInstrs['pingback'], 0, 0, synctoken]
    self._eventWait(synctoken, pargs, timeout=timeout)
def _compileCode(self, code: str, block=False) -> None:
    """
    Send orchestra code to csound using the available transport

    Args:
        code: the orchestra code to compile
        block: if True, wait until the code has been compiled. Blocking
            compilation is never sent via udp

    Raises :class:`CsoundError` if a blocking compilation through the
    process queue reports an error
    """
    if not block and self.udpPort and config['prefer_udp']:
        # Fixed: the message was missing the f-prefix and logged "{code}" literally
        logger.debug(f"Sending code via udp: \n{code}")
        self._udpSend(code)
        self._modified()
        return

    if self.version >= 7000:
        self._perfThread.compileOrc(code)
        if block:
            self._perfThread.flushMessageQueue()
    elif self._perfThread._processQueue is None:
        self.csound.compileOrc(code, block=block)
    elif not block:
        self._perfThread.processQueueTask(lambda cs: cs.compileOrc(code))
    else:
        # Run the compilation within the process queue and wait for its result
        q = _queue.SimpleQueue()
        self._perfThread.processQueueTask(lambda cs, q=q: q.put(cs.compileOrc(code)))
        err = q.get()
        if err:
            logger.error("compileOrc error: ")
            logger.error(internal.addLineNumbers(code))
            raise CsoundError("Could not compile code")
    self._modified()
def _modified(self, status=True) -> None:
    """
    Mark this engine as modified (or not)

    Args:
        status: if True, the current time is stored as the time of the
            last modification; otherwise the mark is cleared (set to 0)
    """
    if status:
        self._lastModification = time.time()
    else:
        self._lastModification = 0
[docs]
def needsSync(self, threshold=1.) -> bool:
    """
    True if this engine has been modified recently

    Actions that modify an engine: code compilation, table allocation, ...

    Args:
        threshold: if the last modification is older than this (in seconds),
            the modification mark is cleared and False is returned
    """
    lastmod = self._lastModification
    if lastmod == 0:
        return False
    elapsed = time.time() - lastmod
    if elapsed > threshold:
        # Too old: forget the modification
        self._lastModification = 0
        return False
    return True
def _compileInstr(self, instrname: str|int, code: str, block=False) -> None:
    """
    Compile the code of one instrument and register it

    Args:
        instrname: name or number of the instrument; used as key in
            ``self._instrRegistry``
        code: the code defining the instrument
        block: passed to ``self._compileCode``
    """
    self._instrRegistry[instrname] = code
    self._compileCode(code, block=block)
    if not isinstance(instrname, str):
        return
    # if named instrs are defined we sync in order to avoid assigning
    # the same number to different instrs. This should be taken
    # care by csound by locking but until this is in place, we
    # need to sync
    isknown = instrname in self._instrNumCache
    if not block and not isknown:
        self.sync()
    querytimeout = 0.1 if block else 0
    self._queryNamedInstrs([instrname], timeout=querytimeout)
[docs]
def compile(self, code: str, block=False) -> None:
    """
    Send orchestra code to the running csound instance.

    The code sent can be any orchestra code

    Args:
        code: the code to compile
        block: if True, this method will block until the code
            has been compiled

    Raises :class:`CsoundError` if the compilation failed. Raises ValueError
    if the code defines an instrument with a reserved number

    .. note::

        If this instance has been started with a UDP port and
        the config option 'prefer_udp' is true, the code will be sent
        via udp. Otherwise the API is used. This might have an impact in
        the resulting latency of the operation, since using the API when
        running a performance thread can cause delays under certain
        circumstances

    Example
    ~~~~~~~

        >>> e = Engine()
        >>> e.compile("giMelody[] fillarray 60, 62, 64, 65, 67, 69, 71")
        >>> code = open("myopcodes.udo").read()
        >>> e.compile(code)
    """
    codeblocks = csoundparse.parseOrc(code, keepComments=False)
    for codeblock in codeblocks:
        if codeblock.kind == 'include':
            includepath = csoundparse.splitInclude(codeblock.text)
            if not os.path.exists(includepath):
                logger.warning(f"Include path not found '{includepath}'")
            self.includes.append(includepath)
        elif codeblock.kind == 'instr':
            parsedbody = csoundparse.instrParseBody(csoundparse.instrGetBody(codeblock.lines))
            self._parsedInstrs[codeblock.name] = parsedbody
            self._instrRegistry[codeblock.name] = codeblock.text
            if codeblock.name[0].isdigit():
                # Numbered instrs are not allowed to use reserved numbers
                instrnum = int(codeblock.name)
                for rangename, mininstr, maxinstr in self._reservedInstrnumRanges:
                    if mininstr <= instrnum < maxinstr:
                        logger.error(f"Instrument number {instrnum} is reserved. Code:")
                        logger.error("\n" + _textwrap.indent(codeblock.text, "    "))
                        raise ValueError(f"Cannot use instrument number {instrnum}, "
                                         f"the range {mininstr} - {maxinstr} is reserved for '{rangename}'")
                if instrnum in self._reservedInstrnums:
                    raise ValueError("Cannot compile instrument with number "
                                     f"{instrnum}: this is a reserved instr and "
                                     f"cannot be redefined. Reserved instrs: "
                                     f"{sorted(self._reservedInstrnums)}")

    # `block` must be forwarded: otherwise the compilation is never done
    # synchronously and a failed compilation cannot raise CsoundError,
    # contrary to what is documented
    self._compileCode(code, block=block)

    names = [b.name for b in codeblocks if b.kind == 'instr' and b.name[0].isalpha()]
    if names:
        # if named instrs are defined we sync in order to avoid assigning
        # the same number to different instrs. This should be taken
        # care by csound by locking but until this is in place, we
        # need to sync
        if not block and any(name not in self._instrNumCache for name in names):
            self.sync()
        namesToRegister = [n for n in names if n not in self._instrNumCache]
        if namesToRegister:
            self._queryNamedInstrs(namesToRegister, timeout=0.1 if block else 0, delay=self.bufferLatency())
[docs]
def evalCode(self, code: str) -> float:
    """
    Evaluate code, return the result of the evaluation

    The expression to evaluate must be prefixed by ``return``

    Args:
        code (str): the code to evaluate. Usually an expression returning
            a float value (see example)

    Returns:
        the result of the evaluation

    Example
    ~~~~~~~

        >>> e = Engine()
        >>> e.compile(r'''
        ... instr foo
        ...     outch 1, oscili(0.1, p4)
        ... endin''')
        >>> e.evalCode('return nstrnum("foo")')
    """
    assert self.started and self.csound is not None
    if self.version >= 7000:
        result = self._perfThread.evalCode(code)
    elif not self._useProcessQueue:
        result = self.csound.evalCode(code)
    else:
        # Evaluate within the process queue and wait for the answer
        reply = _queue.SimpleQueue()
        self._perfThread.processQueueTask(lambda cs, q=reply: q.put(cs.evalCode(code)))
        result = reply.get()
    # Evaluating code clears the modified mark
    self._modified(False)
    return result
[docs]
def tableWrite(self, tabnum: int, idx: int, value: float, delay=0.) -> None:
    """
    Write to a specific index of a table

    Args:
        tabnum (int): the table number
        idx (int): the index to modify
        value (float): the new value
        delay (float): delay time in seconds. Use 0 to write synchronously

    Raises ValueError if writing synchronously and the table data is
    not available

    .. seealso::

        :meth:`~Engine.getTableData`
        :meth:`~Engine.fillTable`
    """
    assert self.started
    if delay != 0:
        # Deferred write: let csound do it via the builtin 'tabwrite' instr
        event = [self._builtinInstrs['tabwrite'], delay, 0, tabnum, idx, value]
        self._perfThread.scoreEvent(False, "i", event)
        return
    # Immediate write: modify the table memory directly
    tabledata = self.tableData(tabnum)
    if tabledata is None:
        raise ValueError(f"table {tabnum} not found")
    tabledata[idx] = value
[docs]
def tableData(self, idx: int, flat=False) -> np.ndarray:
    """
    Returns a numpy array pointing to the data of the table.

    Any modifications to this array will modify the table itself.

    .. note::

        Multichannel audio is loaded into a csound table as a flat
        array with samples interleaved.

    Args:
        idx: the table index
        flat: if True, the data will be returned as a flat (1D) array
            even if the table holds a multi-channel sample.

    Returns:
        a numpy array pointing to the data array of the table. Raises
        ValueError if csound has no data for the table and IndexError if
        there is no table info for it
    """
    raw: np.ndarray | None = self._tableCache.get(idx)
    if raw is None:
        if self.version >= 7000 or self._perfThread._processQueue is None:
            # Accessing the table directly is faster than using requestCallback
            raw = self.csound.table(idx)
        else:
            q = _queue.SimpleQueue()
            self._perfThread.processQueueTask(lambda cs, q=q, idx=idx: q.put(cs.table(idx)))
            raw = q.get()
        if raw is None:
            raise ValueError(f"Table {idx} does not exist")
        # Only the array as delivered by csound is cached. Shaped arrays are
        # views created on demand, so the cached entry never depends on the
        # value of `flat` used in a previous call
        self._tableCache[idx] = raw

    if flat:
        return raw if raw.ndim == 1 else raw.reshape(-1)

    if raw.ndim != 1:
        # Already shaped (placed in the cache by someone else)
        return raw

    tabinfo = self.tableInfo(idx)
    if not tabinfo:
        raise IndexError(f"Table {idx} not found")
    if tabinfo.nchnls <= 1:
        return raw

    # Drop the extra last item when the table is one item larger than
    # the frames it holds
    if tabinfo.size == tabinfo.numFrames*tabinfo.nchnls+1:
        frames = raw[:-1]
    else:
        frames = raw.view()
    # Assigning the shape of a view never copies and leaves `raw` untouched
    frames.shape = (tabinfo.numFrames, tabinfo.nchnls)
    return frames
[docs]
def realElapsedTime(self, threshold=0.1) -> float:
    """
    Reports the elapsed time of the engine, independent of any locking

    Args:
        threshold: the reporting threshold. If this method is called multiple times
            during this time interval the engine time is extrapolated from the
            time reported by python and no call to csound is made

    Returns:
        the time elapsed since start of the engine.

    .. seealso:: :meth:`Engine.elapsedTime`
    """
    lastReported, lastQueried = self._realElapsedTime
    now = time.time()
    sinceQuery = now - lastQueried
    if sinceQuery > threshold:
        # The cached measurement is too old: ask csound and store the new one
        measured = self.csound.currentTimeSamples() / self.sr
        self._realElapsedTime = (measured, now)
        return measured
    # Recent enough: extrapolate using the python clock
    return lastReported + sinceQuery
[docs]
def elapsedTime(self) -> float:
    """
    Returns the elapsed time since start of the engine

    This time is used as a reference when scheduling events. Since scheduling
    itself takes a small but not negligible amount of time, when scheduling
    a great number of events, these will fall out of sync. For this reason
    the elapsed time can be used as a reference to schedule events in
    absolute time. Moreover, the elapsed time stays unmodified
    as long as the engine's clock is locked for scheduling (see example)

    Example
    ~~~~~~~

        >>> from csoundengine import Engine
        >>> import numpy as np
        >>> e = Engine()
        >>> e.compile(r'''
        ... instr 100
        ...   ifreq = p4
        ...   outch 1, oscili:a(0.1, ifreq) * linseg:a(0, 0.01, 1, 0.1, 0)
        ... endin
        ... ''')
        >>> now = e.elapsedTime()
        >>> for t in np.arange(0, 60, 0.2):
        ...     e.sched(100, t+now, 0.15, args=[1000], relative=False)
        ...     e.sched(100, t+now, 0.15, args=[800], relative=False)
        >>> # The same result can be achieved by locking the elapsed-time clock:
        >>> with e.lockedClock():
        ...     for t in np.arange(0, 10, 0.2):
        ...         e.sched(100, t, 0.15, args=[1000])
        ...         e.sched(100, t, 0.15, args=[800])
    """
    # A non-zero locked time takes precedence over the real clock
    lockedTime = self._lockedElapsedTime
    if lockedTime:
        return lockedTime
    return self.realElapsedTime()
[docs]
def lockClock(self, lock=True):
    """
    Lock the elapsed time clock

    This ensures that events scheduled while the clock is locked will run
    in sync. For this to work all events scheduled must have some latency (they
    must run in the future)

    Args:
        lock: True to lock the clock at the current csound time, False to
            unlock it

    Example
    ~~~~~~~

        >>> from csoundengine import Engine
        >>> import numpy as np
        >>> e = Engine()
        >>> e.compile(r'''
        ... instr 100
        ...   ifreq = p4
        ...   outch 1, oscili:a(0.1, ifreq) * linseg:a(0, 0.01, 1, 0.1, 0)
        ... endin
        ... ''')
        >>> e.lockClock()
        >>> for t in np.arange(0, 10, 0.2):
        ...     e.sched(100, t, 0.15, args=[1000])
        ...     e.sched(100, t, 0.15, args=[800])
        >>> e.lockClock(False)

    .. seealso:: :meth:`Engine.elapsedTime`, :meth:`Engine.lockedClock`
    """
    if not lock:
        if not self._lockedElapsedTime:
            logger.debug("Asked to unlock the elapsed time clock, but it was not locked")
        # 0 means unlocked
        self._lockedElapsedTime = 0
        return
    if self.isClockLocked():
        logger.debug("The elapsed time clock is already locked")
        return
    # Freeze the clock at the time reported by csound
    self._lockedElapsedTime = self.csound.currentTimeSamples()/self.sr
[docs]
def isClockLocked(self) -> bool:
    """
    Returns True if the clock is locked

    The clock is considered locked when the locked elapsed time is
    greater than 0
    """
    lockedTime = self._lockedElapsedTime
    return lockedTime > 0
[docs]
def pushLock(self, latency: float | None = None):
"""
Lock the clock of this engine

Allows for recursive locking, so users do not need to
check what the current state of the lock is. The state found when
calling this method (whether the clock was locked, and the current
``extraLatency``) is pushed onto ``self._clockStatesStack``

Args:
latency: if given, it replaces ``self.extraLatency``

.. seealso:: :meth:`Engine.popLock`
"""
islocked = self.isClockLocked()
oldlatency = self.extraLatency
# Remember the state found, as a tuple (waslocked, previous extra latency)
self._clockStatesStack.append((islocked, oldlatency))
# Only lock if not locked already
if not islocked:
self.lockClock(True)
if latency is not None:
self.extraLatency = latency
logger.debug(f"Locking this engine's clock with latency {self.extraLatency}")
[docs]
def popLock(self):
    """
    Reverts the action of pushLock, unlocking the clock

    The last saved state is removed from ``self._clockStatesStack``. The
    clock is unlocked only if it was not locked when that state was saved;
    the extra latency saved with it is restored. If the stack is empty a
    warning is logged and nothing is done

    .. seealso:: :meth:`Engine.pushLock`
    """
    stack = self._clockStatesStack
    if not stack:
        logger.warning("Clock stack is empty, ignoring")
        return
    waslocked, previousLatency = stack.pop()
    if not waslocked:
        self.lockClock(False)
    if previousLatency is not None:
        self.extraLatency = previousLatency
def __enter__(self):
    """
    Context manager entry: push a clock lock and return the engine itself
    """
    self.pushLock()
    return self
def __exit__(self, *args, **kws):
    """
    Context manager exit: pop the clock lock pushed on entry

    Exception information is ignored; nothing is returned, so any exception
    raised within the context is not suppressed
    """
    self.popLock()
[docs]
def lockedClock(self, latency: float | None = None) -> Engine:
    """
    Context manager, locks and unlocks the reference time

    By locking the reference time it is possible to ensure that
    events which are supposed to be in sync are scheduled correctly
    into the future.

    Args:
        latency: NOTE(review): this argument is not used by this method,
            the engine itself is returned and acts as the context manager

    Returns:
        this engine, to be used in a ``with`` statement

    .. note::

        A shortcut for this is to just use the engine as context manager::

            with engine:
                engine.sched(...)
                engine.sched(...)
                engine.session().sched(...)
                ...

    Example
    ~~~~~~~

        >>> from csoundengine import Engine
        >>> import numpy as np
        >>> e = Engine()
        >>> e.compile(r'''
        ... instr 100
        ...   ifreq = p4
        ...   outch 1, oscili:a(0.1, ifreq) * linseg:a(0, 0.01, 1, 0.1, 0)
        ... endin
        ... ''')
        >>> with e.lockedClock():
        ...     for t in np.arange(0, 10, 0.2):
        ...         e.sched(100, t, 0.15, args=[1000])
        ...         e.sched(100, t, 0.15, args=[800])
    """
    # The locking itself takes place in __enter__ / __exit__
    engine = self
    return engine
def _presched(self, delay: float, relative: bool) -> tuple[bool, float]:
    """
    Decide if an event is scheduled in absolute or relative time

    A relative delay is converted to absolute time if it is longer than
    ``self.onecycle * self._minCyclesForAbsoluteMode`` or if the clock
    is locked. In that case the elapsed time and ``self.extraLatency``
    are added to the delay

    Args:
        delay: time offset
        relative: is this offset relative?

    Returns:
        tuple (isabsolute: bool, delay: float)
    """
    if not relative:
        return True, delay
    exceedsMinimum = delay > self.onecycle * self._minCyclesForAbsoluteMode
    if exceedsMinimum or self._lockedElapsedTime:
        reftime = self.elapsedTime()
        return True, reftime + delay + self.extraLatency
    return False, delay
[docs]
def sched(self,
          instr: int | float | str,
          delay=0.,
          dur=-1.,
          *pfields,
          args: np.ndarray | Sequence[float | str] = (),
          relative=True,
          unique=False,
          **namedpfields
          ) -> float:
    """
    Schedule an instrument

    Args:
        instr : the instrument number/name. If it is a fractional number,
            that value will be used as the instance number.
            An integer or a string will result in a unique instance assigned
            by csound if unique is True. Named instruments
            with a fractional number can also be scheduled (for example,
            for an instrument named "myinstr" you can use "myinstr.001")
        delay: time to wait before instrument is started. If relative is False,
            this represents the time since start of the engine (see examples)
        dur: duration of the event
        pfields: args can also be given as positional arguments. It is an
            error to give both pfields and args
        args: any other args expected by the instrument, starting with p4
            (as a list of floats/strings, or a numpy array). Any
            string arguments will be converted to a string index via strSet. These
            can be retrieved in csound via strget
        relative: if True, delay is relative to the scheduling time,
            otherwise it is relative to the start time of the engine.
            To get an absolute time since start of the engine, call
            `engine.elapsedTime()`
        unique: if True, assign a unique p1
        namedpfields: pfields can be given as keyword arguments of the form p4=..., p6=...
            Defaults are filled with values defined via ``pset``

    Returns:
        a fractional p1 of the instr started, which identifies this event.
        If the number of a named instr is not known yet, the event is sent
        as a score message and 0 is returned

    Raises ValueError if both pfields and args are given, TypeError if
    instr or args have an unsupported type

    Example
    -------

    .. code-block :: python

        from csoundengine import *
        e = Engine()
        e.compile(r'''
          instr 100
            kfreq = p4
            kcutoff = p5
            Smode strget p6
            asig vco2 0.1, kfreq
            if strcmp(Smode, "lowpass") == 0 then
              asig moogladder2 asig, kcutoff, 0.95
            else
              asig K35_hpf asig, kcutoff, 9.0
            endif
            outch 1, asig
          endin
        ''')
        eventid = e.sched(100, 2, args=[200, 400, "lowpass"])
        # simple automation in python
        for cutoff in range(400, 3000, 10):
            e.setp(eventid, 5, cutoff)
            time.sleep(0.01)
        e.unsched(eventid)
        #
        # To ensure simultaneity between events:
        now = e.elapsedTime()
        for t in np.arange(2, 4, 0.2):
            e.sched(100, t+now, 0.2, relative=False)

    .. seealso:: :meth:`~csoundengine.engine.Engine.unschedAll`
    """
    assert self.started
    isabsolute, delay = self._presched(delay=delay, relative=relative)
    if self.autosync and self.needsSync():
        self.sync()

    if pfields and args:
        raise ValueError("Either pfields as positional arguments or args can be given, "
                         "got both")
    elif pfields:
        args = pfields

    if namedpfields:
        instrdef = self._parsedInstrs.get(instr if isinstance(instr, str) else str(int(instr)))
        if instrdef:
            kwargs = csoundparse.normalizeNamedPfields(namedpfields, instrdef.pfieldNameToIndex)
        else:
            # Without a parsed definition only keys of the form pN can be resolved
            assert all(csoundparse.isPfield(key) for key in namedpfields)
            kwargs = {int(key[1:]):value for key, value in namedpfields.items()}
        args = csoundparse.fillPfields(args, kwargs, defaults=instrdef.pfieldIndexToValue if instrdef else None)

    def _schedViaMessage() -> float:
        # Used when the number assigned to a named instr is not known yet:
        # the event is sent as a score line and no event id is available.
        # Fixed: each field is separated by a space (previously the first
        # arg was glued to the duration). len() is used instead of a truth
        # test so that numpy arrays are accepted
        fields = [f'i {instr} {delay} {dur}']
        if len(args) > 0:
            fields.extend(map(str, args))
        self._perfThread.inputMessage(' '.join(fields))
        return 0

    if isinstance(instr, int):
        instrfrac = self._assignEventId(instr) if unique else instr
    elif isinstance(instr, float):
        if unique and int(instr) == instr:
            instrfrac = self._assignEventId(int(instr))
        else:
            instrfrac = instr
    elif isinstance(instr, str):
        if not unique and "." not in instr:
            instrnum = self._instrNumCache.get(instr)
            if not instrnum:
                return _schedViaMessage()
            instrfrac = instrnum
        # NOTE(review): this is a separate `if`, so for a name without "." the
        # else branch below always runs and replaces the instrfrac assigned
        # above. Kept as found; confirm whether `elif` was intended
        if "." in instr:
            name, fractionstr = instr.split(".")
            instrnum = self._instrNumCache.get(name)
            if instrnum:
                instrfrac = instrnum+float("."+fractionstr)
            else:
                return _schedViaMessage()
        else:
            instrfrac = self._assignEventId(instr)
    else:
        raise TypeError(f"Expected a float, an int or a str as instr, "
                        f"got {instr} (type {type(instr)})")

    if isinstance(args, np.ndarray):
        pargsnp = np.empty((len(args)+3,), dtype=float)
        pargsnp[0] = instrfrac
        pargsnp[1] = delay
        pargsnp[2] = dur
        pargsnp[3:] = args
        self._perfThread.scoreEvent(isabsolute, "i", pargsnp)
    elif not args:
        pargs = [instrfrac, delay, dur]
        self._perfThread.scoreEvent(isabsolute, "i", pargs)
    elif isinstance(args, (list, tuple)):
        pargs = [instrfrac, delay, dur]
        # Strings are replaced by their index, as assigned by strSet
        pargs.extend(float(a) if not isinstance(a, str) else self.strSet(a) for a in args)
        self._perfThread.scoreEvent(isabsolute, "i", pargs)
    else:
        raise TypeError(f"Expected a sequence or array, got {args}")
    return instrfrac
def _queryNamedInstrs(self, names: list[str], timeout=0.1, callback=None, delay=0.
                      ) -> None:
    """
    Query assigned instr numbers

    This operation is async if timeout is 0 or a callback is given.
    Otherwise it blocks until the performance thread has answered. The
    results are placed in the internal cache

    Args:
        names: the names to query
        timeout: if 0, the operation is async. Any other value selects the
            blocking mode (the wait itself is not limited by this value)
        callback: a func of the form `func(name2instr:dict[str, int])` which will be
            called when all instrs have an assigned instr number
        delay: only used in async mode, passed to ``_queryNamedInstrAsync``
    """
    if callback:
        # async, notify once all names have been answered
        collected: dict[str, int] = {}

        def _collect(name, instrnum, n=len(names), results=collected, callback=callback):
            results[name] = instrnum
            if len(results) == n:
                callback(results)

        for name in names:
            self._queryNamedInstrAsync(name, delay=delay, callback=_collect)
        return

    if not timeout:
        # async, results are only placed in the cache
        for name in names:
            self._queryNamedInstrAsync(name, delay=delay)
        return

    # blocking: evaluate all queries within the performance thread
    reply = _queue.SimpleQueue()

    def _evalAll(csound, names=names, q=reply):
        q.put([csound.evalCode(f'return nametoinstrnum "{name}"') for name in names])

    self._perfThread.requestCallback(_evalAll)
    for num, name in zip(reply.get(), names):
        if num <= 0:
            # no number assigned to this name
            continue
        self._instrNumCache[name] = int(num)
        if (body := self._instrRegistry.get(name)) is not None:
            self._instrRegistry[num] = body
def _queryNamedInstrAsync(self, name: str, delay=0., callback=None) -> None:
    """
    Query the assigned instr number async

    The result is put in the cache and, if given, callback is called
    as `callback(name: str, instrnum: int)`

    Args:
        name: the name of the instrument
        delay: when to perform the query. Only used if the version is
            lower than 7000
        callback: called with the name and the instr number
    """
    if self.version < 7000:
        synctoken = self._getSyncToken()
        msg = f'i {self._builtinInstrs["nstrnum"]} {delay} 0 {synctoken} "{name}"'

        def _onResponse(synctoken, instrname=name, func=callback):
            # The answer is read from the responses table, at the token index
            instrnum = int(self._responsesTable[synctoken])
            self._instrNumCache[instrname] = instrnum
            if (body := self._instrRegistry.get(instrname)):
                self._instrRegistry[instrnum] = body
            if func:
                func(name, instrnum)

        self._inputMessageWithCallback(synctoken, msg, _onResponse)
        return

    def _evalInPerfThread(cs, f=callback, name=name):
        num = int(cs.evalCode(f'return nametoinstrnum:i("{name}")'))
        self._instrNumCache[name] = num
        if f:
            f(name, num)

    self._perfThread.requestCallback(_evalInPerfThread)
[docs]
def queryNamedInstr(self, instrname: str, cached=True, callback=None) -> int:
    """
    Find the instrument number corresponding to instrument name

    Args:
        instrname: the name of the instrument
        cached: if True, a previously cached result is used if available
        callback: if given, the operation is async and the callback will
            be called when the result is available. Callback is of
            the form ``func(instrname: str, instrnum: int) -> None``. If no
            callback is given this method blocks until the instrument number
            is returned

    Returns:
        the instr number if called without callback or if the number was
        found in the cache, 0 if the query was made async. If the
        instrument was not found (either because it was never compiled or
        the compilation is not ready yet) a non-positive value is returned,
        or RuntimeError is raised if the query was made via a score message
    """
    if cached and (instrnum := self._instrNumCache.get(instrname, 0)) > 0:
        if callback:
            callback(instrname, instrnum)
        return instrnum

    if callback:
        self._queryNamedInstrAsync(instrname, delay=0, callback=callback)
        return 0

    # Block!
    if self.version >= 7000 or self._perfThread._processQueue is not None:
        instrnum = int(self._perfThread.evalCode(f'return nametoinstrnum:i("{instrname}")'))
    else:
        token = self._getSyncToken()
        msg = f'i {self._builtinInstrs["nstrnum"]} 0 0 {token} "{instrname}"'
        out = self._inputMessageWait(token, msg)
        if out is None or out <= 0:
            raise RuntimeError(f"Could not query the instrument number for '{instrname}'")
        instrnum = int(out)

    # Only valid numbers are cached. A "not found" result must not be
    # remembered: the instr might be compiled later and other code only
    # tests if the name is present in the cache
    if instrnum > 0:
        self._instrNumCache[instrname] = instrnum
    return instrnum
[docs]
def print(self, msg: str, delay=0.) -> None:
    """
    Print a message via csound

    The message is printed by scheduling the builtin 'print' instr

    Args:
        msg: the message to print
        delay: when to print it
    """
    printinstr = self._builtinInstrs['print']
    event = f'i {printinstr} {delay} 0. "{msg}"'
    self._perfThread.inputMessage(event)
[docs]
def unsched(self, p1: float | str, delay: float = 0, future=False) -> None:
    """
    Stop a playing event

    If p1 is a round number, all events with the given number
    are unscheduled. Otherwise only an exact matching event
    is unscheduled, if it exists

    Args:
        p1: the instrument number/name to stop
        delay: if 0, remove the instance as soon as possible
        future: if True, unsched an event in the future

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> e = Engine(...)
        >>> e.compile(r'''
        ... instr sine
        ...   a0 oscili 0.1, 1000
        ...   outch 1, a0
        ... endin
        ... ''')
        >>> # sched an event with indefinite duration
        >>> eventid = e.sched(10, 0, -1)
        >>> e.unsched(eventid, 10)

    .. seealso:: :meth:`~Engine.unschedAll`
    """
    # mode and dur are passed to the builtin 'turnoff' instr
    if future:
        mode, dur = -1, 0.01
    else:
        if isinstance(p1, float):
            isfractional = int(p1) != p1
        else:
            isfractional = isinstance(p1, str) and "." in p1
        # 4 for a fractional p1 (one specific event), 0 otherwise
        mode, dur = (4 if isfractional else 0), 0
    turnoffEvent = [self._builtinInstrs['turnoff'], delay, dur, p1, mode]
    self._perfThread.scoreEvent(False, "i", turnoffEvent)
[docs]
def unschedAll(self) -> None:
    """
    Remove all playing and future events

    The score is rewound and the global instruments are set up again

    .. seealso:: :meth:`~csoundengine.engine.Engine.unsched`, :meth:`~csoundengine.engine.Engine.unschedFuture`
    """
    csound = self.csound
    csound.rewindScore()
    self._setupGlobalInstrs()
[docs]
def session(self,
            priorities: int | None = None,
            maxControlsPerInstr: int | None = None,
            numControlSlots: int | None = None
            ) -> _session.Session:
    """
    Return the Session corresponding to this Engine

    Since each Engine can have only one associated Session,
    the parameters passed are only valid for the creation of
    the Session. Any subsequent call to this method returns the
    already created Session, and the arguments passed are not
    taken into consideration (a message is logged if they differ
    from the values of the existing Session).

    Args:
        priorities: the max. number of priorities for scheduled instrs
        numControlSlots: the total number of slots allocated for
            dynamic args. The default is determined by the config
            'dynamic_args_num_slots'
        maxControlsPerInstr: the max. number of dynamic args per instr
            (the default is set in the config 'max_dynamic_args_per_instr')

    Returns:
        the corresponding Session

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> session = Engine().session()
        >>> session.defInstr("synth", r'''
        ... kamp = p5    ; notice that p4 is reserved
        ... kmidi = p6
        ... asig vco2 kamp, mtof:k(kmidi)
        ... chnmix asig, "mix1"
        ... ''')
        >>> session.defInstr("post", r'''
        ... a1 chnget "mix1"
        ... a2 chnget "mix2"
        ... aL, aR reverbsc a1, a2, 0.85, 12000, sr, 0.5, 1
        ... outch 1, aL, 2, aR
        ... chnclear "mix1", "mix2"
        ... ''')
        >>> session.sched("post", priority=2)
        >>> for i, midi in enumerate([60, 64, 67]):
        ...     session.sched("synth", delay=i, dur=4, kamp=0.1, kmidi=midi)
    """
    if self._session is None:
        # First call: create the session with the given parameters
        from .session import Session
        self._session = Session(engine=self,
                                priorities=priorities,
                                numControlSlots=numControlSlots,
                                maxControlsPerInstr=maxControlsPerInstr)
        return self._session

    # A session exists already: report any parameter which cannot be honoured
    current = self._session
    if maxControlsPerInstr is not None and maxControlsPerInstr != current.maxDynamicArgs:
        logger.info(f"Asking to create a session with dynamicArgsPerInstr={maxControlsPerInstr}, "
                    f"which differs from the value of the current session "
                    f"({self._session.maxDynamicArgs}). The old value will be kept")
    if numControlSlots is not None and numControlSlots != current._dynargsNumSlots:
        logger.info(f"Asking to create a session with dynamicArgsSlices={numControlSlots}, "
                    f"which differs from the value of the current session "
                    f"({self._session._dynargsNumSlots}). The old value will be kept")
    if priorities is not None and priorities != current.numPriorities:
        logger.info(f"Asking to create a session with priorites={priorities}, "
                    f"which differs from the value of the current session "
                    f"({self._session.numPriorities}). The old value will be kept")
    return current
def _reserveInstrRange(self, name: str, mininstrnum: int, maxinstrnum: int) -> None:
    """
    Declares the instrument numbers in the given range as reserved

    Instrument numbers within this range will not be allocated when using
    named instruments. This is used internally by a :class:`~csoundengine.session.Session`
    to reserve instrument numbers to be assigned to the different priorities.

    Args:
        name: the name of the reserved block
        mininstrnum: lowest instrument number to reserve
        maxinstrnum: highest instrument number to reserve (not included in the range)
    """
    reserved = (name, mininstrnum, maxinstrnum)
    self._reservedInstrnumRanges.append(reserved)
[docs]
def makeEmptyTable(self, size, numchannels=1, sr=0, delay=0., block=False
                   ) -> int:
    """
    Create an empty table, returns the index of the created table

    Args:
        size: the size of the table
        numchannels: if the table will be used to hold audio, the
            number of channels of the audio
        sr: the samplerate of the audio, if the table is used to hold audio
        delay: when to create the table
        block: if True, wait until the table has been created. Cannot be
            combined with a delay

    Returns:
        the table number

    Raises ValueError if both block and a positive delay are given

    Example
    ~~~~~~~

    Use a table as an array of buses

        >>> from csoundengine import *
        >>> engine = Engine()
        >>> source = engine.makeEmptyTable(128)
        >>> engine.compile(r'''
        ... instr 100
        ...   imidi = p4
        ...   iamptab = p5
        ...   islot = p6
        ...   kamp table islot, iamptab
        ...   asig = oscili:a(interp(kamp), mtof(imidi))
        ...   outch 1, asig
        ... endin
        ... ''')
        >>> tabarray = engine.getTableData(source)
        >>> tabarray[0] = 0.5
        >>> eventid = engine.sched(100, args=[67, source, 0])

    .. seealso::

        :meth:`~Engine.makeTable`
        :meth:`~Engine.fillTable`
        :meth:`~Engine.automateTable`
    """
    if block and delay > 0:
        raise ValueError(f"Either block=True or delay > 0, got {block=}, {delay=}")

    tabnum = self._assignTableNumber()
    self._tableInfo[tabnum] = TableInfo(sr=sr, size=size, nchnls=numchannels)
    perfthread = self._perfThread

    if block or self.version >= 7000:
        if sr != 0 or numchannels != 1:
            # Metadata needs to be set: create the table via orchestra code
            self._compileCode(fr'''
i__tabnum__ ftgen {tabnum}, 0, {size}, -2, 0
ftsetparams {tabnum}, {sr}, {numchannels}
''', block=block)
        else:
            perfthread.scoreEvent(False, "f", [tabnum, delay, -size, -2, 0])
            if block:
                perfthread.flushMessageQueue()
    elif sr == 0:
        perfthread.scoreEvent(False, "f", [tabnum, delay, -size, -2, 0])
    else:
        itoken = 0  # we don't need a notification
        iempty = 1
        makeTableEvent = [self._builtinInstrs['maketable'], delay,
                          0, itoken, tabnum, size, iempty, sr, numchannels]
        perfthread.scoreEvent(False, "i", makeTableEvent)
    self._modified()
    return tabnum
[docs]
def makeTable(self,
              data: Sequence[float] | np.ndarray,
              sr: int = 0,
              tabnum: int = -1,
              block=True,
              callback=None,
              ) -> int:
    """
    Create a new table and fill it with data.

    Args:
        data: the data used to fill the table
        tabnum: the table number. If -1, a number is assigned by the engine.
            If 0, the number is assigned via ``_makeTableNotify``
        block: wait until the table is actually created. Not used if a
            callback is given
        callback: call this function when ready. If given, this method
            does not block
        sr: only needed if filling sample data. If given, it is used to fill the
            table metadata in csound, as if this table had been read via gen01

    Returns:
        the index of the new table

    Example
    ~~~~~~~

    .. code-block:: python

        from csoundengine import *
        e = Engine()
        import sndfileio
        sample, sr = sndfileio.sndread("stereo.wav")
        # modify the sample in python
        sample *= 0.5
        source = e.makeTable(sample, sr=sr, block=True)
        e.playSample(source)

    .. seealso:: :meth:`~csoundengine.engine.Engine.readSoundfile`, :meth:`~csoundengine.engine.Engine.fillTable`
    """
    dataarr = np.asarray(data)
    nchnls = internal.arrayNumChannels(dataarr)
    numitems = len(dataarr) * nchnls
    flatdata = dataarr if nchnls == 1 else dataarr.flatten()

    if tabnum == -1:
        tabnum = self._assignTableNumber()
    elif tabnum == 0:
        tabnum = self._makeTableNotify(data=data, tabnum=0, sr=sr, numchannels=nchnls)
        self._tableInfo[tabnum] = TableInfo(sr=sr, size=len(data), nchnls=nchnls)
        self._modified()
        return tabnum

    # Any cached data for this table number is not valid anymore
    self._tableCache.pop(int(tabnum), None)

    if self.version < 7000 and not self._perfThread._processQueue:
        if callback:
            self._makeTableNotify(data=data, sr=sr, tabnum=tabnum, callback=callback)
        elif not block:
            self._makeTableAsync(tabnum=tabnum, data=flatdata, sr=sr, numchannels=nchnls)
        else:
            # block
            q = _queue.SimpleQueue()
            # Positional args passed to the callback are discarded. Without
            # `*_` the first one would rebind `q`
            self._makeTableNotify(data=data, sr=sr, tabnum=tabnum,
                                  callback=lambda *_, q=q: q.put(True))
            _ = q.get()
    else:
        # As in the branch above, a callback makes the operation async.
        # Previously an event was created whenever block was True but it was
        # never set if a callback was given, blocking forever
        ev = _threading.Event() if (block and not callback) else None

        def task(cs, func=callback, ev=ev):
            try:
                lines = [f'i__tabnum ftgen {tabnum}, 0, {numitems}, -2, 0']
                if sr > 0 or nchnls > 1:
                    # ftsetparams ift, isamplerate, inumchannels, iloopstart=0, ibasenote=60
                    lines.append(f'ftsetparams {tabnum}, {sr}, {nchnls}')
                cs.compileOrc('\n'.join(lines))
                tabptr = cs.table(tabnum)
                if tabptr is None:
                    raise RuntimeError(f"Could not create csound table {tabnum}")
                self._tableCache[tabnum] = tabptr
                tabptr[:] = flatdata
                if func:
                    func(tabnum)
            finally:
                # Always release the caller, also if the task failed
                if ev is not None:
                    ev.set()

        if lcs.VERSION >= 7000:
            self._perfThread.requestCallback(task)
        else:
            self._perfThread.processQueueTask(task)
        if ev is not None:
            ev.wait()

    self._tableInfo[tabnum] = TableInfo(sr=sr, size=len(data), nchnls=nchnls)
    self._modified()
    assert tabnum > 0
    return tabnum
def _makeTableAsync(self, tabnum: int, data: np.ndarray, sr=0, numchannels=1, delay=0.):
    """
    Create a table async, without process queue or csound 7's requestCallback

    The table is created via an "f" score event holding the data itself.
    If a samplerate or more than one channel is given, the builtin
    'ftsetparams' instr is scheduled to set the metadata of the table

    Args:
        tabnum: the table number
        data: the data to fill the table with, a 1D array
        sr: the samplerate of the data
        numchannels: the number of channels of the data
        delay: when to create the table
    """
    assert len(data.shape) == 1, "Only 1D arrays accepted here"
    numitems = len(data)
    # Layout: table number, time, size, gen number (-2), followed by the data
    fevent = np.zeros((numitems+4,), dtype=float)
    fevent[:4] = (tabnum, delay, numitems, -2)
    fevent[4:] = data
    self._perfThread.scoreEvent(False, "f", fevent)
    needsParams = sr > 0 or numchannels > 1
    if needsParams:
        paramsEvent = [self._builtinInstrs['ftsetparams'], delay, 0., tabnum, sr, numchannels]
        self._perfThread.scoreEvent(False, "i", paramsEvent)
[docs]
def tableExists(self, tabnum: int) -> bool:
    """
    Returns True if a table with the given number exists

    A table is considered to exist if ``self.tableInfo`` does not raise
    ``TableNotFoundError`` for it
    """
    # TODO: really check if the table exists
    try:
        self.tableInfo(tabnum)
    except TableNotFoundError:
        return False
    else:
        return True
def _registerSync(self, token: int) -> _queue.SimpleQueue:
    """
    Register a token for a __sync__ callback

    A response callback is stored under the given token. When called, it
    reads the slot indexed by the token from the responses table and puts
    that value, as a float, in the returned queue.

    Args:
        token: an int token, as returned via _getSyncToken

    Returns:
        a queue which can be waited on; the value obtained is the
        value found in the responses table for this token
    """
    responses: _queue.SimpleQueue[float] = _queue.SimpleQueue()
    table = self._responsesTable

    def notify(token, q=responses, table=table):
        q.put(float(table[token]))

    self._responseCallbacks[token] = notify
    return responses
[docs]
def callLater(self, delay: float, callback: Callable) -> None:
    """
    Call callback after delay, triggered by csound scheduler

    A delay of 0 calls the callback right away, without involving csound.
    Otherwise an event for the builtin 'pingback' instrument is scheduled
    and the callback is called when that event notifies back.

    Args:
        delay: the delay time, in seconds
        callback: the callback, a function of the sort ``() -> None``

    The callback will be called after the given delay, plus some jitter
    (~ 2/3 k-cycles after, never before)

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> import time
        >>> e = Engine()
        >>> start = time.time()
        >>> e.callLater(2, lambda:print(f"Elapsed time: {time.time() - start:.4f}"))
        Elapsed time: 2.0018
    """
    if delay == 0:
        callback()
        return
    token = self._getSyncToken()
    pingback = self._builtinInstrs['pingback']
    # The value passed by the notification is discarded, since the
    # user callback takes no arguments
    self._eventWithCallback(token, [pingback, delay, 0.01, token],
                            lambda token: callback())
def _eventWait(self, token: int, pargs: Sequence[float], timeout: float = 0.
) -> float | None:
if not timeout:
timeout = config['timeout']
else:
assert timeout > 0
q = self._registerSync(token)
self._perfThread.scoreEvent(False, "i", pargs)
try:
outvalue = q.get(block=True, timeout=timeout)
self._modified(False)
return outvalue if outvalue != _UNSET else None
except _queue.Empty:
raise TimeoutError(f"{token=}, {pargs=}")
[docs]
def plotTableSpectrogram(self,
                         tabnum: int,
                         fftsize=2048,
                         mindb=-90,
                         maxfreq: int = 0,
                         overlap: int = 4,
                         minfreq: int = 0,
                         sr: int = 44100,
                         chan=0
                         ) -> None:
    """
    Plot a spectrogram of the audio data in the given table

    The samplerate of the table is used if known (for example if the
    table was read via gen01 / .readSoundfile, or if it was set via
    setTableMetadata). Otherwise the `sr` argument is used as fallback.

    Args:
        tabnum: the table to plot
        fftsize (int): the size of the fft
        mindb (int): the min. dB to plot
        maxfreq (int): the max. frequency to plot
        overlap (int): the number of overlaps per window
        minfreq (int): the min. frequency to plot
        sr: the fallback samplerate, used when the table has no samplerate
            information of its own
        chan: which channel to plot if the table is multichannel

    Raises:
        ValueError: if no data could be retrieved for the table

    Example
    -------

        >>> from csoundengine import *
        >>> e = Engine()
        >>> source = e.readSoundfile("mono.wav", block=True)
        >>> e.plotTableSpectrogram(source)

    .. image:: assets/tableproxy-plotspectrogram.png
    """
    from . import plotting
    data = self.tableData(tabnum)
    if data is None:
        # Same error as plotTable for a table without data
        raise ValueError(f"Table {tabnum} is invalid")
    if internal.arrayNumChannels(data) > 1:
        data = data[:, chan]
    tabinfo = self.tableInfo(tabnum)
    # tableInfo can return None; in that case keep the fallback
    # samplerate instead of failing
    if tabinfo is not None and tabinfo.sr > 0:
        sr = tabinfo.sr
    plotting.plotSpectrogram(data, sr, fftsize=fftsize, mindb=mindb,
                             maxfreq=maxfreq, minfreq=minfreq, overlap=overlap,
                             show=True)
[docs]
def plotTable(self, tabnum: int, sr: int = 0) -> Figure:
    """
    Plot the content of the table via matplotlib.pyplot

    If the sr is known the table is plotted as a waveform, with time as the
    x-coord. Otherwise the raw data of the table is plotted, with the index
    as the x-coord.

    The samplerate will be known if the table was created via
    :meth:`Engine.readSoundfile` or read via GEN1. The sr can also be passed
    explicitly as a parameter.

    Args:
        tabnum: the table to plot
        sr: the samplerate of the data. Needed to plot as a waveform if the
            table was not loaded via GEN1 (or via :meth:`Engine.readSoundfile`).

    Returns:
        the figure holding the plot

    .. code::

        from csoundengine import *
        e = Engine()
        source = e.readSoundfile("mono.wav", block=True)
        # no sr needed here since the table was read via readSoundfile
        e.plotTable(source)

    .. figure:: assets/tableproxy-plot.png

    .. code::

        import sndfileio
        data, sr = sndfileio.sndread("stereo.wav")
        tabnum2 = e.makeTable(data, sr=sr)
        e.plotTable(tabnum2)

    .. figure:: assets/tableplot-stereo.png

    .. code::

        e = Engine()
        xs = np.linspace(0, 6.28, 1000)
        ys = np.sin(xs)
        source = e.makeEmptyTable(len(ys))
        e.fillTable(source, data=ys)
        e.plotTable(source)

    .. image:: assets/tableplot-sine.png

    .. seealso::

        * :meth:`~csoundengine.engine.Engine.readSoundfile`
        * :meth:`~csoundengine.engine.Engine.makeTable`
    """
    from csoundengine import plotting
    assert isinstance(tabnum, int) and tabnum > 0
    samples = self.tableData(tabnum, flat=False)
    info = self.tableInfo(tabnum)
    if not sr and info is not None and info.sr > 0:
        # No explicit samplerate given: use the one known for the table
        sr = info.sr
    if samples is None:
        raise ValueError(f"Table {tabnum} is invalid")
    if sr:
        return plotting.plotSamples(samples, samplerate=sr, show=True)
    # Unknown samplerate: plot the raw values against their index
    import matplotlib.pyplot as plt
    fig, axes = plt.subplots()
    axes.plot(samples)
    fig.tight_layout(pad=0.1)
    if not plotting.matplotlibIsInline():
        plt.show()
    return fig
[docs]
def schedSync(self,
instr: int | float | str,
delay: float = 0,
dur: float = -1,
args: np.ndarray | Sequence[float | str] | None = None,
timeout=-1
) -> tuple[float, float | None]:
"""
Schedule an instr, wait for a sync message
Similar to :meth:`~Engine.sched` but waits until the instrument sends a
sync message. The instrument should expect a sync token at p4 (see example)
and send an answer via 'sendsync' (``sendsync itoken, ivalue=0``)
.. note::
args should start with p5 since the sync token is sent as p4
Args:
instr: the instrument number/name. If it is a fractional number,
that value will be used as the instance number.
delay: time to wait before instrument is started
dur: duration of the event
args: any other args expected by the instrument, starting with p5
(as a list of floats/strings, or as a numpy float array). Any
string arguments will be converted to a string index via strSet. These
can be retrieved via strget in the csound instrument
timeout: if a non-negative number is given, this function will block
at most this time and then raise a TimeoutError
Returns:
the fractional p1 of the scheduled note, the sync return value (see example) or
None if the instrument does not return any value. Raises TimeoutError
if the operation times out
Example
~~~~~~~
>>> from csoundengine import *
>>> e = Engine()
>>> e.compile(r'''
... instr readsound
... itoken = p4
... Spath strget p5
... itab ftgen ftgen 0, 0, 0, -1, Spath, 0, 0, 0
... sendsync(itoken, itab)
... turnoff
... endin
... ''')
>>> eventid, tabnum = e.schedSync('readsound', args=['/path/to/sound.wav'])
"""
assert self.started
instrfrac = instr if isinstance(instr, float) else self._assignEventId(instr)
token = self._getSyncToken()
q = self._registerSync(token)
if not args:
pargs = [instrfrac, delay, dur, token]
self._perfThread.scoreEvent(False, "i", pargs)
elif isinstance(args, np.ndarray):
pargsnp = np.empty((len(args)+4,), dtype=float)
pargsnp[0] = instrfrac
pargsnp[1] = delay
pargsnp[2] = dur
pargsnp[3] = token
pargsnp[4:] = args
self._perfThread.scoreEvent(False, "i", pargsnp)
else:
pargs = [instrfrac, delay, dur, token]
pargs.extend(a if not isinstance(a, str) else self.strSet(a) for a in args)
self._perfThread.scoreEvent(False, "i", pargs)
try:
outvalue = q.get(block=True, timeout=timeout)
if outvalue == _UNSET:
outvalue = None
return instrfrac, outvalue
except _queue.Empty:
raise TimeoutError(f"{token=}, {instr=}")
def _eventWithCallback(self, token: int, pargs, callback) -> None:
    """
    Schedule an "i" event which can notify back through a callback

    The event receives the token as p4. The instrument can set a
    return value and notify back by doing:

    .. code-block:: csound

        itoken = p4
        tabw kreturnValue, itoken, gi__responses
        ; or tabw_i ireturnValue, itoken, gi__responses
        outvalue "__sync__", itoken

    Args:
        token: a sync token; it must be an int and match pargs[3] (p4)
        pargs: the pfields passed to the event (beginning by p1)
        callback: A function (returnValue) -> None. When the response
            callback registered here is called, it calls this function
            with the value stored in the responses table for the token
    """
    assert token == pargs[3] and isinstance(token, int)
    responses = self._responsesTable

    def notify(tok, t=responses, c=callback):
        return c(t[tok])

    self._responseCallbacks[token] = notify
    self._perfThread.scoreEvent(False, "i", pargs)
    return None
def _inputMessageWait(self,
token: int,
inputMessage: str,
timeout: float | None = None
) -> float | None:
"""
This function passes the str `inputMessage` to csound and waits for
the instr to notify back via a "__sync__" outvalue
If the instr returned a value via gi__responses, this value
is returned. Otherwise, None is returned
The input message should pass the token:
.. code-block:: csound
itoken = p4
tabw kreturnValue, itoken, gi__responses
; or tabw_i ireturnValue, itoken, gi__responses
outvalue "__sync__", itoken
Args:
token: a sync token, assigned via _getSyncToken
inputMessage: the input message passed to csound
timeout: a timeout for blocking
Return:
a float response, or None if the instrument did not set a response value
"""
if timeout is None:
timeout = config['timeout']
q = self._registerSync(token)
self._perfThread.inputMessage(inputMessage)
try:
value = q.get(block=True, timeout=timeout)
self._modified(False)
return value if value != _UNSET else None
except _queue.Empty:
raise TimeoutError(f"{token=}, {inputMessage=}")
def _inputMessageWithCallback(self, token:int, inputMessage:str, callback) -> None:
    """
    Register a callback for a token, then send an input message to csound

    The callback is stored as the response callback for the token before
    the message is sent. This method itself never blocks.

    Args:
        token: a sync token, assigned via _getSyncToken
        inputMessage: the input message passed to csound
        callback: the function stored as response callback for this
            token. It should be of kind `(token:int) -> None`, to be called
            when the instrument notifies us via `outvalue "__sync__", token`
    """
    callbacks = self._responseCallbacks
    callbacks[token] = callback
    perfthread = self._perfThread
    perfthread.inputMessage(inputMessage)
def _makeTableNotify(self,
data: Sequence[float] | np.ndarray | None = None,
size=0,
tabnum=0,
callback=None,
sr: int = 0,
numchannels=1) -> int:
"""
Create a table with data (or an empty table of the given size).
Lets csound generate a table index if needed.
Args:
data: the data to put in the table
size: if no data is given, size must be set
tabnum: the table number to create, 0 to let csound generate
a table number
callback: a callback func(source) -> None
If no callback is given this method will block until csound notifies
that the table has been created and returns the table number
sr: only needed if filling sample data. If given, it is used to fill
metadata in csound, as if this table had been read via gen01
numchannels: only needed if no data is given (only size). Size always
determines the size of the table, not the number of frames
Returns:
the table number
"""
token = self._getSyncToken()
maketableInstrnum = self._builtinInstrs['maketable']
delay = 0
assert tabnum >= 0
if data is None:
assert size > 1
# create an empty table of the given size
empty = 1
sr = 0
pargs = [maketableInstrnum, delay, 0, token, tabnum, size, empty,
sr, numchannels]
else:
# Create table with data
if not isinstance(data, np.ndarray):
data = np.asarray(data)
numchannels = internal.arrayNumChannels(data)
numitems = len(data) * numchannels
if numchannels > 1:
data = data.flatten()
# data = data.ravel()
if numitems < 1900:
# create a table with the given data
# if the table is small we can create it and fill it in one go
empty = 0
numchannels = internal.arrayNumChannels(data)
if numchannels > 1:
data = data.flatten()
pargs = [maketableInstrnum, delay, 0., token, tabnum, numitems, empty,
sr, numchannels]
pargs.extend(data)
else:
# create an empty table (blocking), fill it via a pointer
empty = 1
pargs = [maketableInstrnum, delay, 0., token, tabnum, numitems, empty,
sr, numchannels]
if not callback:
# the next line blocks until the table is created
response = self._eventWait(token, pargs)
if response is None:
raise RuntimeError(f"Failed to make table with args: {pargs}")
tabnum = int(response)
self.fillTable(tabnum, data=data)
self._tableInfo[tabnum] = TableInfo(sr=sr, size=numitems,
nchnls=numchannels)
else:
def callback2(tabnum, self=self, data=data, callback=callback):
self.fillTable(tabnum, data=data)
callback()
self._eventWithCallback(token, pargs, callback2)
return tabnum
if callback:
self._eventWithCallback(token, pargs, callback)
else:
response = self._eventWait(token, pargs)
if response is None:
raise RuntimeError(f"Failed to create table with args {pargs}")
tabnum = int(response)
assert tabnum > 0
self._tableInfo[tabnum] = TableInfo(sr=sr, size=size, nchnls=numchannels)
return tabnum
[docs]
def channelPointer(self, channel: str, kind='control', mode='rw') -> np.ndarray:
    """
    Get a pointer to a csound channel

    Pointers are cached by channel name: the pointer is only requested
    from csound the first time a channel is queried.

    Args:
        channel: the name of the channel
        kind: one of 'control' or 'audio'
        mode: one of 'r' (input), 'w' (output), 'rw' (input/output)

    Returns:
        the pointer to the data as a numpy array
    """
    if kind not in ('control', 'audio'):
        raise NotImplementedError("Only kind 'control' and 'audio' are implemented at the moment")
    cached = self._channelPointers.get(channel)
    if cached is not None:
        return cached
    ptr, err = self.csound.channelPtr(channel, kind=kind, mode=mode)
    if err:
        raise RuntimeError(f"Error while trying to retrieve/create a channel pointer: {err}")
    assert isinstance(ptr, np.ndarray)
    self._channelPointers[channel] = ptr
    return ptr
[docs]
def setChannel(self, channel: str, value: float | str | np.ndarray,
method='', delay=0.
) -> None:
"""
Set the value of a software channel (threadsafe)
Args:
channel: the name of the channel
value: the new value, should match the type of the channel (a float for
a control channel, a string for a string channel or a numpy array
for an audio channel)
method: one of ``'api'``, ``'score'``, ``'udp'``. An empty str will choose the most appropriate
method for the current engine/args
delay: a delay to set the channel
Example
~~~~~~~
>>> from csoundengine import *
>>> e = Engine()
>>> e.initChannel("mastergain", 1.0)
>>> e.compile(r'''
... instr 100
... asig oscili 0.1, 1000
... kmastergain = chnget:k("mastergain")
... asig *= intrp(kmastergain)
... endin
... ''')
>>> eventid = e.sched(100)
>>> e.setChannel("mastergain", 0.5)
"""
isaudio = isinstance(value, np.ndarray)
if delay > 0:
method = "score"
elif isaudio or not method:
method = "api"
if method == 'api':
if isinstance(value, (int, float)):
self.csound.setControlChannel(channel, value)
elif isinstance(value, str):
self.csound.setStringChannel(channel, value)
else:
self.csound.setAudioChannel(channel, value)
elif method == 'score':
if isinstance(value, (int, float)):
instrnum = self._builtinInstrs['chnset']
s = f'i {instrnum} {delay} 0 "{channel}" {value}'
self._perfThread.inputMessage(s)
else:
instrnum = self._builtinInstrs['chnsets']
s = f'i {instrnum} {delay} 0 "{channel}" "{value}"'
self._perfThread.inputMessage(s)
self._modified()
elif method == 'pointer':
if isinstance(value, str):
raise ValueError("Method 'pointer' not available for string channels")
ptr = self.channelPointer(channel)
if isinstance(value, float):
ptr[0] = value
elif isinstance(value, np.ndarray):
assert len(value) == self.ksmps
ptr[:] = value
else:
raise ValueError(f"method {method} not supported "
f"(choices: 'api', 'score', 'udp')")
[docs]
def initChannel(self,
channel: str,
value: float | str | np.ndarray = 0,
kind='',
mode="r") -> None:
"""
Create a channel and set its initial value
Args:
channel: the name of the channel
value: the initial value of the channel,
will also determine the type (k, a, S)
kind: One of 'k', 'S', 'a'. Use None to auto determine the channel type.
mode: r for read, w for write, rw for both.
.. note::
the `mode` is set from the perspective of csound. A read (input)
channel is a channel which can be written to by the api and read
from csound. An write channel (output) can be written by csound
and read from the api
Example
~~~~~~~
>>> from csoundengine import *
>>> e = Engine()
>>> e.initChannel("mastergain", 1.0)
>>> e.compile(r'''
... instr 100
... asig oscili 0.1, 1000
... kmastergain = chnget:k("mastergain")
... asig *= intrp(kmastergain)
... endin
... ''')
>>> eventid = e.sched(100)
>>> e.setChannel("mastergain", 0.5)
"""
modei = {
"r": 1,
"w": 2,
"rw": 3
}[mode]
if not kind:
if isinstance(value, (int, float)):
kind = 'k'
elif isinstance(value, str):
kind = 'S'
elif isinstance(value, np.ndarray):
kind = 'a'
if kind == 'k':
self.compile(f'chn_k "{channel}", {modei}\n')
self.setChannel(channel, value, method="score")
elif kind == 'a':
self.compile(f'chn_a "{channel}", {modei}', block=True)
if value:
self.setChannel(channel, value)
elif kind == 'S':
self.compile(f'chn_S "{channel}", {modei}\n', block=True)
self.setChannel(channel, value)
else:
raise TypeError("Expected an initial value of type float or string")
[docs]
def channelValue(self, channel: str) -> float:
    """
    Get the value of a control channel

    Args:
        channel: the name of the channel

    Returns:
        the value of the channel. Raises KeyError if csound reports an
        error when querying the channel.

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> import time
        >>> e = Engine()
        >>> e.initChannel("freq", mode="w")
        >>> e.compile('''
        ... instr pitchtrack
        ...   asig inch 1
        ...   afreq, alock plltrack asig, 0.25, 20, 0.33, 50, 800
        ...   kfreq = k(afreq)
        ...   chnset kfreq, "freq"
        ... endin
        ... ''')
        >>> eventid = e.sched("pitchtrack")
        >>> while True:
        ...     freq = e.channelValue("freq")
        ...     print(f"freq: {freq:.1f}")
        ...     time.sleep(0.1)
    """
    result = self.csound.controlChannel(channel)
    value, errorCode = result
    if errorCode == 0:
        return value
    raise KeyError(f"control channel {channel} not found, error: {errorCode}, value: {value}")
[docs]
def fillTable(self, tabnum: int, data: Sequence[float] | np.ndarray) -> None:
"""
Fill an existing table with data
This is a blocking operation
Args:
tabnum: the table number
data: the data to put into the table
block: if True, block while performing the operation
Example
-------
>>> from csoundengine import *
>>> import numpy as np
>>> e = Engine()
>>> xs = np.linspace(0, 6.28, 1000)
>>> ys = np.sin(xs)
>>> source = e.makeEmptyTable(len(ys))
>>> e.fillTable(source, data=ys)
>>> e.plotTable(source)
.. figure:: assets/tableplot-sine.png
.. seealso::
* :meth:`~Engine.makeTable`
* :meth:`~Engine.plotTable`
* :meth:`~Engine.readSoundfile`
"""
if not isinstance(tabnum, int) or tabnum <= 0:
raise ValueError(f"tabnum should be an int > 0, got {tabnum}")
arr = data if isinstance(data, np.ndarray) else np.asarray(data)
if len(arr.shape) == 2:
arr = arr.flatten()
elif len(arr.shape) > 2:
raise ValueError(f"data should be a 1D or 2D array, got shape {arr.shape}")
numpyptr = self.tableData(tabnum)
if numpyptr is None:
raise IndexError(f"Table {tabnum} does not exist")
size = len(numpyptr)
numpyptr[:] = data if size < len(data) else data[:size]
[docs]
def tableInfo(self, tabnum: int, cache=True) -> TableInfo | None:
    """
    Retrieve information about the given table

    If the info is not cached (or `cache` is False) an event for the
    builtin 'tableInfo' instrument is scheduled and this method blocks
    until it answers.

    Args:
        tabnum: the table number
        cache: if True, query the cache to see if info for this table
            has already been requested

    Returns:
        a TableInfo, built from the samplerate, the number of channels and
        the size reported for the table. Returns None if the reported
        samplerate is not positive, which is how a table which was not
        found is signaled

    .. note::
        Some information, like *sr*, is only available for tables
        allocated via ``GEN01`` (for example, using :meth:`~Engine.readSoundfile`).
        This data can also be set explicitly via :meth:`~Engine.setTableMetadata`

    .. rubric:: Example

    .. code-block:: python

        >>> from csoundengine import *
        >>> e = Engine()
        >>> source = e.readSoundfile("stereo.wav", block=True)
        >>> info = e.tableInfo(source)

    .. seealso::

        * :meth:`~Engine.readSoundfile`
        * :meth:`~Engine.plotTable`
        * :meth:`~Engine.getTableData`
    """
    cached = self._tableInfo.get(tabnum)
    if cached and cache:
        # A table created via .readSoundfile is registered with only the
        # path of the soundfile; the actual info is read on first query
        pending = cached.path and cached.sr == 0 and cached.size == 0
        if pending:
            cached = TableInfo.get(cached.path)
            self._tableInfo[tabnum] = cached
        return cached

    # Four tokens, one for each value sent back by the instrument
    toks = [self._getSyncToken() for _ in range(4)]
    pargs = [self._builtinInstrs['tableInfo'], 0, 0., tabnum, *toks]
    q: _queue.SimpleQueue[list[float]] = _queue.SimpleQueue()
    table = self._responsesTable

    def callback(tok0, _q=q, t=table, _toks=toks):
        _q.put([float(t[_tok]) for _tok in _toks])

    # Only the first token is used for the notification itself
    self._responseCallbacks[toks[0]] = callback
    self._perfThread.scoreEvent(False, "i", pargs)
    vals = q.get(block=True)
    for tok in toks:
        self._releaseToken(tok)
    if vals[0] <= 0:
        return None
    tabinfo = TableInfo(sr=int(vals[0]), nchnls=int(vals[1]), size=int(vals[3]))
    self._tableInfo[tabnum] = tabinfo
    return tabinfo
[docs]
def includeFile(self, include: str) -> None:
    """
    Add an #include file to this Engine

    The code in the file is compiled the first time the file is included;
    including the same file again is a no-op.

    Args:
        include: the path to the include file

    Raises:
        OSError: if the file cannot be read. In that case the file is not
            registered as included, so the include can be retried
    """
    abspath = os.path.abspath(include)
    if abspath in self.includes:
        return
    # Read before registering: a file which cannot be read must not
    # be marked as already included
    with open(abspath) as f:
        code = f.read()
    self.includes.append(abspath)
    self.compile(code)
[docs]
def readSoundfile(self, path="?", tabnum: int | None = None, chan=0,
callback=None, block=False, skiptime=0.
) -> int:
"""
Read a soundfile into a table (via GEN1), returns the table number
Args:
path: the path to the output -- **"?" to open file interactively**
tabnum: if given, a table index. If None, an index is
autoassigned
chan: the channel to read. 0=read all channels
block: if True, wait until output is read, then return
callback: if given, this function () -> None, will be called when
output has been read.
skiptime: time to skip at the beginning of the soundfile.
Returns:
the index of the created table
>>> from csoundengine import *
>>> engine = Engine()
>>> source = engine.readSoundfile("stereo.wav", block=True)
>>> eventid = engine.playSample(source)
>>> # Reduce the gain to 0.8 after 2 seconds
>>> engine.setp(eventid, 4, 0.8, delay=2)
.. seealso::
* :meth:`~Engine.playSample`
* :meth:`~Engine.makeTable`
* :meth:`~Engine.fillTable`
* :meth:`~Engine.getTableData`
"""
if block and callback:
raise ValueError("blocking mode not supported when a callback is given")
if path == "?":
path = _state.openSoundfile(ensureSelection=True)
if not block and not callback:
return self._readSoundfileAsync(path=path, tabnum=tabnum, chan=chan)
if tabnum is None:
tabnum = self._assignTableNumber()
elif tabnum == 0 and not callback and not block:
logger.debug("readSoundfile: tabnum==0 indicates that csound must assign"
"a table number. This operation will block until the soundfile"
"is read. To avoid this, set tabnum to None; in this case"
"csoundengine will assign a table number itself and the"
"operation can be non-blocking")
block = True
if block and tabnum:
self._compileCode(f'i__tab__ ftgen {tabnum},0,0,-1, "{path}", {skiptime}, 0, {chan}', block=True)
else:
token = self._getSyncToken() if (block or callback) else 0
p1 = self._builtinInstrs['readSndfile']
msg = f'i {p1} 0 0.01 {token} "{path}" {tabnum} {chan} {skiptime}'
if callback:
self._inputMessageWithCallback(token, msg, lambda *args: callback())
elif block:
self._inputMessageWait(token, msg)
else:
self._perfThread.inputMessage(msg)
self._tableInfo[tabnum] = TableInfo(sr=0, size=0, nchnls=1, path=path)
return tabnum
[docs]
def soundfontPlay(self, index: int, pitch: float, amp=0.7, delay=0.,
                  dur=-1., vel: int = 0, chan=1
                  ) -> float:
    """
    Play a note of a previously loaded soundfont

    The soundfont's program (bank, preset) must have been read before
    via :meth:`Engine.soundfontPreparePreset`

    Args:
        index: as returned via :meth:`~Engine.soundfontPreparePreset`
        pitch: the pitch of the played note, as a midinote (can
            be fractional)
        amp: the amplitude. If vel (velocity) is left as 0, this
            is used to determine the velocity. Otherwise, set the velocity
            (this might be used by the soundfont to play the correct sample)
            and the amplitude is used to scale the output
        vel: the velocity of the played note, used internally to determine
            which sample/layer to play. If not given, a velocity is calculated
            from the given amplitude (``int(amp * 127)``)
        chan: the first channel to send output to (channels start with 1)
        delay: when to start playback
        dur: the duration of playback. Use -1 to play until the end
            (the note will be stopped when the soundfont playback detects the
            end of the sample)

    Returns:
        the instance number of the playing instrument.

    .. important::

        **Dynamic Fields**

        - **p4**: `kpitch`
        - **p5**: `kamp`

    Example
    ~~~~~~~

    .. code::

        from csoundengine import *
        e = Engine()

        # Since the preset is not specified, this will launch a gui dialog
        # to select a preset from a list of available presets
        idx = e.soundfontPreparePreset('Orgue-de-salon.sf2')
        event = e.soundfontPlay(idx, 60)

        # Automate kpitch (p4) a major 3rd glissando from the current pitch,
        offset, glissdur = 2, 8
        e.automatep(event, 4, [offset, 60, offset+glissdur, 64])

    .. figure:: assets/select-preset.png

    .. seealso::

        * :meth:`~Engine.soundfontPreparePreset`
        * :meth:`~Engine.playSample`
    """
    assert index in self._soundfontPresets.values()
    velocity = vel if vel else int(amp * 127)
    instrnum = self._builtinInstrs['soundfontPlay']
    return self.sched(instrnum, delay=delay, dur=dur,
                      args=[pitch, amp, index, velocity, chan])
[docs]
def soundfontPreparePreset(self,
sf2path: str,
preset: tuple[int, int] | None = None
) -> int:
"""
Prepare a soundfont's preset to be used
Assigns an index to a soundfont bank:preset to be used with sfplay or via
:meth:`~Engine.soundfontPlay`
The soundfont is loaded if it was not loaded before
.. figure:: assets/select-preset.png
Args:
sf2path: the path to a sf2 file -- **Use "?" to select a file interactively**
preset: a tuple (bank, presetnum), where both bank and presetnum
are ints in the range (0-127). None to select a preset interactively
Returns:
an index assigned to this preset, which can be used with
sfplay/sfplay3 or with :meth:``~Engine.soundfontPlay``
.. seealso::
* :meth:`~Engine.soundfontPlay`
* :meth:`~Engine.playSample`
"""
if sf2path == "?":
sf2path = _state.openSoundfont(ensureSelection=True)
if preset is None:
from . import sftools
item = sftools.soundfontSelectPreset(sf2path)
if item is None:
return 0
presetname, bank, presetnum = item
else:
bank, presetnum = preset
tup = (sf2path, bank, presetnum)
idxnum = self._soundfontPresets.get(tup)
if idxnum is not None:
return idxnum
idx = self._soundfontPresetCountPtr[0]
self._soundfontPresetCountPtr[0] += 1
self._soundfontPresets[tup] = idx
instrnum = self._builtinInstrs['sfPresetAssignIndex']
s = f'i {instrnum} 0 0 "{sf2path}" {bank} {presetnum} {idx}'
self._perfThread.inputMessage(s)
return idx
def _readSoundfileAsync(self,
path: str,
tabnum: int | None = None,
chan=0) -> int:
assert self.started
if tabnum is None:
tabnum = self._assignTableNumber()
else:
assert tabnum > 0
s = f'f {tabnum} 0 0 -1 "{path}" 0 0 {chan}'
self._perfThread.inputMessage(s)
self._tableInfo[tabnum] = TableInfo(sr=0, size=0, nchnls=1, path=path)
return tabnum
[docs]
def uniqueInstrInstance(self, instr: int | str) -> float:
"""
Returns a unique instance number (a float p1) for `instr`
The instance itself is not created
Args:
instr (int|str): an already defined csound instrument
Returns:
a unique p1.
"""
if isinstance(instr, int):
token = self._getSyncToken()
pargs = [self._builtinInstrs['uniqinstance'], 0, 0.01, token, instr]
uniqinstr = self._eventWait(token, pargs)
if uniqinstr is None:
raise RuntimeError("failed to get unique instance")
return uniqinstr
else:
raise NotImplementedError("str instrs not implemented yet")
[docs]
def playSample(self, tabnum: int, delay=0., chan=1, speed=1., gain=1., fade=0.,
               starttime=0., lagtime=0.01, dur=-1.
               ) -> float:
    """
    Play a sample already loaded into a table.

    Speed and gain can be modified via setp while playing

    Args:
        tabnum: the table where the sample data was loaded
        delay: when to start playback
        chan: the first channel to send output to (channels start with 1)
        speed: the playback speed
        gain: a gain applied to this sample
        fade: fadein/fadeout time in seconds
        starttime: playback can be started from anywhere within the table
        lagtime: a lag value for dynamic pfields (see below)
        dur: the duration of playback. Use -1 to play until the end

    Returns:
        the instance number of the playing instrument.

    Raises:
        TypeError: if `tabnum` is not an int

    .. admonition:: Dynamic Fields
        :class: important

        - **p4**: `gain`
        - **p5**: `speed`

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> e = Engine()
        >>> import sndfileio
        >>> sample, sr = sndfileio.sndread("stereo.wav")
        >>> # modify the samples in python
        >>> sample *= 0.5
        >>> table = e.makeTable(sample, sr=sr, block=True)
        >>> eventid = e.playSample(table)
        ... # gain (p4) and speed (p5) can be modified while playing
        >>> e.setp(eventid, 5, 0.5)   # Play at half speed

    .. seealso::

        * :meth:`~Engine.playSoundFromDisk`
        * :meth:`~Engine.makeTable`
        * :meth:`~Engine.readSoundfile`
        * :meth:`~Engine.soundfontPlay`
    """
    if not isinstance(tabnum, int):
        raise TypeError(f"Expected a table number as integer, got {tabnum}")
    instrnum = self._builtinInstrs['playgen1']
    # pfields, starting with p4
    pfields = [gain, speed, tabnum, chan, fade, starttime, lagtime]
    return self.sched(instrnum, delay=delay, dur=dur, args=pfields)
[docs]
def playSoundFromDisk(self, path: str, delay=0., dur=-1, chan=1, speed=1., fade=0.01,
                      gain=1.
                      ) -> float:
    """
    Play a soundfile from disk via diskin2

    Args:
        path: the path to the soundfile
        delay: time offset to start playing
        dur: duration of playback, use -1 to play until the
            end of the file
        chan: first channel to output to (channels start with 1)
        speed: playback speed (2.0 will sound an octave higher)
        fade: fadein/out in seconds
        gain: a gain applied to the sound

    Returns:
        the instance number of the scheduled event

    Raises:
        ValueError: if `chan` is lower than 1

    **Dynamic parameters**

    * p5: kgain - Modifies the gain
    * p6: kspeed - Modifies the playback speed

    .. seealso:: :py:meth:`readSoundfile`, :py:meth:`playSample`
    """
    assert self.started
    instrnum = self._builtinInstrs['playsndfile']
    p1 = self._assignEventId(instrnum)
    if chan <= 0:
        raise ValueError("chan must be >= 1")
    # The path is sent as p4, followed by gain (p5), speed (p6), chan and fade
    self._perfThread.inputMessage(
        f'i {p1} {delay} {dur} "{path}" {gain} {speed} {chan} {fade}')
    return p1
[docs]
def setp(self, p1: int | float | str, *pairs, delay=0.) -> None:
"""
Modify a pfield of an active note
Multiple pfields can be modified simultaneously. It only makes sense to
modify a pfield if a control-rate (k) variable was assigned to this pfield
(see example)
Args:
p1: the p1 of the instrument to automate. A float or a "<name>.<instanceid>" will set
the value for a specific instance, an int or a unqualified name will set
the value of the given parameter for all instances
*pairs: each pair consists of a pfield index and a value. The index is an int,
matching the pfield number (4=p4, 5=p5, etc), the value can be a number
(string values are not supported)
delay: when to start the automation
Example
~~~~~~~
>>> engine = Engine(...)
>>> engine.compile(r'''
... instr 100
... kamp = p5
... kfreq = p6
... a0 oscili kamp, kfreq
... outch 1, a0
... endin
... ''')
>>> p1 = engine.sched(100, args=[0.1, 440])
>>> engine.setp(p1, 5, 0.2, delay=0.5)
.. seealso:: :meth:`~Engine.automatep`
"""
if isinstance(p1, str):
raise TypeError("named instrs not supported yet")
numpairs = len(pairs) // 2
assert len(pairs) % 2 == 0 and numpairs <= 5
# this limit is just the limit of the pwrite instr, not of the opcode
args = [0, p1, numpairs]
args.extend(pairs)
self.sched(self._builtinInstrs['pwrite'], delay=delay, dur=0, args=args)
def _getp(self, eventid: float, idx: int) -> float | None:
"""
Get the current pfield value of an active note.
.. note::
This action has always a certain latency, since it implies
scheduling an internal event to read the value and send it
back to python. The action is blocking
Args:
eventid: the (fractional) id (a.k.a p1) of the event
idx: the index of the p-field, starting with 1 (4=p4)
Returns:
the current value of the given pfield
Example
~~~~~~~
TODO
.. seealso::
:meth:`~Engine.setp`
"""
if self.version >= 7000:
value = self.csound.evalCode(f'return pread({eventid}, {idx})')
else:
token = self._getSyncToken()
notify = 1
pargs = [self._builtinInstrs['pread'], 0, 0, token, eventid, idx, notify]
value = self._eventWait(token, pargs)
return value
[docs]
def automateTable(self,
                  tabnum: int,
                  idx: int,
                  pairs: Sequence[float] | np.ndarray,
                  mode='linear',
                  delay=0.,
                  overtake=False) -> float:
    """
    Automate one slot of a table

    Args:
        tabnum: the table number
        idx: the index of the slot to automate
        pairs: a flat sequence of the form (time0, value0, time1, value1, ...).
            Times are relative to the start of the automation event.
        mode: interpolation mode, one of 'linear', 'cos', 'expon(xx)', 'smooth'.
            See the opcode `interp1d` for more information
        delay: when to start the automation
        overtake: if True, the first value of pairs is replaced with
            the current value in the param table of the running instance

    Returns:
        the eventid of the instance performing the automation. If the data
        had to be split into multiple events, the eventid of the first one

    Example
    ~~~~~~~

    >>> engine = Engine(...)
    >>> engine.compile(r'''
    ... instr 100
    ...   itab = p4
    ...   kamp  table 0, itab
    ...   kfreq table 1, itab
    ...   outch 1, oscili:a(0.1, kfreq)
    ...   ftfree itab, 1  ; free the table when finished
    ... endin
    ... ''')
    >>> source = engine.makeTable([0.1, 1000])
    >>> eventid = engine.sched(100, 0, 10, args=(source,))
    >>> # automate the frequency (slot 1)
    >>> engine.automateTable(source, 1, [0, 1000, 3, 200, 5, 200])
    >>> # Automate from the current value, will produce a fade-out
    >>> engine.automateTable(source, 0, [0, -1, 2, 0], overtake=True, delay=5)

    .. seealso::

        * :meth:`~Engine.setp`
        * :meth:`~Engine.automatep`
    """
    maxDataLen = 1900
    if self.version < 7000 and len(pairs) > maxDataLen:
        # Too much data for one event in versions < 7000: split the
        # automation into shorter events, each with its own start offset
        chunks = internal.splitAutomation(pairs, maxDataLen // 2)
        eventids = [self.automateTable(tabnum=tabnum, idx=idx, pairs=chunk,
                                       mode=mode, delay=delay + offset,
                                       overtake=overtake)
                    for offset, chunk in chunks]
        return eventids[0]

    data = pairs.tolist() if isinstance(pairs, np.ndarray) else pairs
    # p4=iargtab, p5=iargidx, p6=imode, p7=iovertake, p8=ilenpairs, then the data
    pfields: list[float | int] = [tabnum, idx, self.strSet(mode), int(overtake),
                                  len(pairs), *data]
    # The second-to-last pfield is the time of the last breakpoint; the
    # event lasts until then plus one control period
    lasttime = pfields[-2]
    return self.sched(self._builtinInstrs['automateTableViaPargs'],
                      delay=delay,
                      dur=lasttime + self.ksmps / self.sr,
                      args=pfields)
[docs]
def automatep(self,
              p1: float | str,
              pidx: int,
              pairs: Sequence[float] | np.ndarray,
              mode='linear',
              delay=0.,
              overtake=False
              ) -> float:
    """
    Automate a pfield of a running event

    The automation is performed by another csound event, so it happens
    within the "csound" realm and thus is assured to stay in sync

    Args:
        p1: the fractional instr number of a running event, or an int number
            to modify all running instances of that instr. A named instr with
            an optional fractional part is also accepted
        pidx: the pfield index. For example, if the pfield to modify is p4,
            pidx should be 4. Values of 1, 2, and 3 are not allowed.
        pairs: the automation data, given as a flat sequence of pairs
            (time, value). Times are relative to the start of the
            automation event
        mode: one of 'linear', 'cos', 'expon(xx)', 'smooth'. See the csound opcode
            `interp1d` for more information
            (https://csound-plugins.github.io/csound-plugins/opcodes/interp1d.html)
        delay: the time delay to start the automation.
        overtake: if True, the first value of pairs is replaced with
            the current value in the running instance

    Returns:
        the p1 associated with the automation synth. If the data had to
        be split into multiple events, the p1 of the first one

    Example
    ~~~~~~~

    >>> e = Engine()
    >>> e.compile(r'''
    ... instr 100
    ...   kfreq = p4
    ...   outch 1, oscili:a(0.1, kfreq)
    ... endin
    ... ''')
    >>> eventid = e.sched(100, 0, 10, args=(1000,))
    >>> e.automatep(eventid, 4, [0, 1000, 3, 200, 5, 200])

    .. seealso:: :meth:`~Engine.setp`, :meth:`~Engine.automateTable`
    """
    sizeLimit = 1900
    if self.version < 7000 and len(pairs) > sizeLimit:
        # Too much data for one event in versions < 7000: split the
        # automation into shorter events, each with its own start offset
        parts = internal.splitAutomation(pairs, sizeLimit // 2)
        eventids = [self.automatep(p1=p1, pidx=pidx, pairs=part, mode=mode,
                                   delay=delay + offset, overtake=overtake)
                    for offset, part in parts]
        return eventids[0]

    if isinstance(p1, str):
        # A named target is passed as a string index; the trailing flag is 1
        target, namedFlag = float(self.strSet(p1)), 1.
    else:
        target, namedFlag = p1, 0.
    args = [target, float(pidx), float(self.strSet(mode)), float(overtake),
            float(len(pairs)), namedFlag]
    args.extend(pairs.tolist() if isinstance(pairs, np.ndarray) else pairs)
    # pairs[-2] is the time of the last breakpoint; the event lasts
    # until then plus one control period
    return self.sched(self._builtinInstrs['automatePargViaPargs'],
                      delay=delay,
                      dur=pairs[-2] + self.ksmps / self.sr,
                      args=args)
def strSet(self, s: str) -> int:
    """
    Assign a numeric index to a string, to be used inside csound

    If the string was already registered, its index is returned and
    nothing is sent to csound. Otherwise a new index is reserved and
    a ``strset`` statement is compiled

    Args:
        s: the string to set

    Returns:
        the index associated with *s*

    .. seealso:: :meth:`~Engine.strGet`
    """
    known = self._strToIndex.get(s)
    if known:
        return known

    newindex = self._getStrIndex()
    self._strToIndex[s] = newindex
    self._indexToStr[newindex] = s

    code = f'strset {newindex}, "{s}"'
    if self.version >= 7000:
        self._perfThread.compileOrc(code)
    elif not self._useProcessQueue:
        self.csound.compileOrc(code)
    else:
        # Compilation is delegated to the performance thread's task queue
        def compiletask(cs):
            cs.compileOrc(code)
        self._perfThread.processQueueTask(compiletask)
    self._modified()
    return newindex
def _strSet(self, s: str, sync=False) -> int:
    """
    Register a string within csound, returns its numeric index

    Differently from :meth:`~Engine.strSet`, the string is set by
    scheduling the builtin 'strset' instr

    Args:
        s: the string to set (cannot be empty)
        sync: if True, block if needed until the csound process receives the message

    Returns:
        the index associated with *s*. When passed to a csound instrument
        it can be used to retrieve the original string via
        ``Sstr = strget(idx)``

    .. seealso:: :meth:`~Engine.strGet`
    """
    assert self.started and s
    cached = self._strToIndex.get(s)
    if cached:
        return cached

    idx = self._getStrIndex()
    self._strToIndex[s] = idx
    self._indexToStr[idx] = s

    strsetInstr = self._builtinInstrs['strset']
    self._perfThread.inputMessage(f'i {strsetInstr} 0 0 "{s}" {idx}')
    # Either wait for csound right away or just flag the pending change
    finish = self.sync if sync else self._modified
    finish()
    return idx
[docs]
def definedStrings(self) -> dict[str, int]:
    """
    The strings defined through this Engine, with their indexes

    Only strings set via :meth:`~Engine.strSet` are included; strings
    defined within csound itself are not tracked here

    .. warning::

        Using ``strset`` within an instrument or as global code will
        probably result in conflicts with the strings defined via the
        Engine using :meth:`~Engine.strSet`

    Returns:
        a dict mapping each defined string to its index. This is the
        Engine's own registry, not a copy
    """
    registry = self._strToIndex
    return registry
[docs]
def strGet(self, index: int) -> str | None:
    """
    Retrieve the string assigned to the given index

    Only strings set via :meth:`~Engine.strSet` can be retrieved; a
    string set internally via the `strset` opcode is not known here

    Args:
        index: the index, as returned by :meth:`~Engine.strSet`

    Returns:
        the string, or None if no string was set with this index

    Example
    ~~~~~~~

    >>> e = Engine(...)
    >>> idx = e.strSet("foo")
    >>> e.strGet(idx)
    foo

    .. seealso:: :meth:`~Engine.strSet`
    """
    lookup = self._indexToStr
    return lookup.get(index)
def _getStrIndex(self) -> int:
    """Reserve the next string index: returns the counter, then advances it"""
    reserved = self._strLastIndex
    self._strLastIndex = reserved + 1
    return reserved
def _releaseTableNumber(self, tableindex: int) -> None:
    """
    Return a table number to the pool so that it can be handed out again

    The table itself is assumed to have been deallocated already. If the
    number is already present in the pool a warning is logged and the
    pool is left untouched.
    """
    pool = self._tablePool
    if tableindex in pool:
        logger.warning(f"Table number {tableindex} was not assigned by csoundengine")
        return
    pool.push(tableindex)
[docs]
def freeTable(self, tableindex: int, delay=0.) -> None:
    """
    Free a table

    The table number is released right away on the python side; the
    deallocation itself is done by a scheduled event

    Args:
        tableindex: the number of the table to free
        delay: when to free it (0=right now)

    .. seealso:: :meth:`~Engine.makeTable`
    """
    logger.debug(f"Freeing table {tableindex}")
    self._releaseTableNumber(tableindex)
    freeInstr = self._builtinInstrs['freetable']
    self._perfThread.scoreEvent(False, "i", [freeInstr, delay, 0., tableindex])
[docs]
def testAudio(self, dur=4., delay=0., period=1., mode='pink',
gaindb=-6., echo=True) -> float:
"""
Test this engine's output
Args:
dur: the duration of the test
delay: when to start the test
period: the duration of sound output on each channel
mode: the test mode, one of 'pink', 'sine'
gaindb: the gain of the output, in dB
echo: if True, csound prints the channel to which it is outputting audio
Returns:
the p1 of the scheduled event
"""
assert self.started
modeid = {
'pink': 0,
'sine': 1
}.get(mode)
if modeid is None:
raise ValueError(f"mode must be one of 'pink', 'sine', got {mode}")
return self.sched(self._builtinInstrs['testaudio'], dur=dur, delay=delay,
args=[modeid, period, pt.db2amp(gaindb), int(echo)])
def _udpSend(self, code: str) -> None:
    """
    Send *code*, ascii encoded, to the udp address of this engine

    If the engine has no udp port, a warning is logged and nothing is sent
    """
    if self.udpPort:
        assert self._sendAddr is not None
        payload = code.encode("ascii")
        logger.debug(f"_udpSend: {code}")
        self._udpSocket.sendto(payload, self._sendAddr)
    else:
        logger.warning("This csound instance was started without udp")
[docs]
def udpSendOrc(self, code: str) -> None:
    """
    Send orchestra code via UDP.

    Code which does not fit in one datagram (60000 bytes or more after
    encoding) is split and sent as multiple packets, the first one
    prefixed with ``{{ `` and the last one ending with `` }}``.

    Args:
        code (str): the code to send. It is encoded as ascii

    Raises:
        RuntimeError: if the engine has no udp port assigned

    .. seealso::

        :meth:`~Engine.udpSendScoreline`
        :meth:`~Engine.udpSetChannel`
    """
    if self.udpPort is None:
        raise RuntimeError("This engine has no udp port assigned")
    assert self._sendAddr is not None
    maxPacketSize = 60000
    msg = code.encode("ascii")
    if len(msg) < maxPacketSize:
        self._udpSocket.sendto(msg, self._sendAddr)
    else:
        chunks = internal.splitBytes(msg, maxPacketSize)
        if len(chunks) == 1:
            # Guard in case the split yields a single chunk: send it as is
            # instead of emitting it twice with unbalanced delimiters
            self._udpSocket.sendto(chunks[0], self._sendAddr)
        else:
            self._udpSocket.sendto(b"{{ " + chunks[0], self._sendAddr)
            for chunk in chunks[1:-1]:
                self._udpSocket.sendto(chunk, self._sendAddr)
            self._udpSocket.sendto(chunks[-1] + b" }}", self._sendAddr)
    # Sending code modifies the engine no matter how many packets were
    # needed, so the engine is flagged as modified in both cases
    self._modified()
[docs]
def udpSendScoreline(self, scoreline: str) -> None:
    """
    Send a score line to csound via udp

    The line is sent prefixed with ``& `` and terminated with a newline

    Args:
        scoreline: the score line to send

    Example
    ~~~~~~~

    >>> e = Engine(udpserver=True)
    >>> e.compile(r'''
    ... instr 100
    ...   ifreq = p4
    ...   outch 1, oscili:a(0.1, ifreq)
    ... endin
    ... ''')
    >>> e.udpSendScoreline("i 100 0 4 440")

    .. seealso::

        :meth:`~Engine.udpSetChannel`
        :meth:`~Engine.udpSendOrc`
    """
    message = f"& {scoreline}\n"
    self._udpSend(message)
[docs]
def udpSetChannel(self, channel: str, value: float | str) -> None:
    """
    Set a channel via UDP

    The type of the value determines the message sent: numbers are
    sent with the ``@`` prefix, anything else with the ``%`` prefix

    Args:
        channel: the channel name
        value: the new value

    .. seealso::

        :meth:`~Engine.udpSendScoreline`
        :meth:`~Engine.udpSendOrc`
    """
    prefix = "@" if isinstance(value, (int, float)) else "%"
    self._udpSend(f"{prefix}{channel} {value}")
[docs]
def writeBus(self, bus: int, value: float, delay=0.) -> None:
    """
    Set the value of a control bus

    If the write is immediate (the delay is at most one cycle) and the
    index of the bus is already known, the value is written directly to
    the bus table. Otherwise the write is performed by a scheduled event.

    A token not known to this Engine only produces a warning; the write
    is still scheduled.

    Args:
        bus: the bus token, as returned via :meth:`Engine.assignBus`
        value: the new value
        delay: if given, the modification is scheduled in the future

    Raises:
        RuntimeError: if the engine has no bus support
        ValueError: if the bus is known and is not a control bus

    .. seealso::

        :meth:`~Engine.readBus`
        :meth:`~Engine.assignBus`
        :meth:`~Engine.automateBus`

    Example
    ~~~~~~~

    >>> e = Engine(...)
    >>> e.compile(r'''
    ... instr 100
    ...   ifreqbus = p4
    ...   kfreq = busin:k(ifreqbus)
    ...   outch 1, vco2:a(0.1, kfreq)
    ... endin
    ... ''')
    >>> freqbus = e.assignBus(value=1000)
    >>> e.sched(100, 0, 4, args=[freqbus])
    >>> e.writeBus(freqbus, 500, delay=0.5)
    """
    if not self.hasBusSupport():
        raise RuntimeError("This engine does not have bus support")

    bus = int(bus)
    kind = self._busTokenToKind.get(bus)
    if not kind:
        logger.warning(f"Bus token {bus} not known")
    elif kind != 'control':
        raise ValueError(f"Only control buses can be written to, got {kind}")

    immediate = delay <= self.onecycle
    if immediate:
        busindex = self._busIndexes.get(bus)
        if busindex is not None:
            # Fast path: the index is known, write directly to the table
            assert self._kbusTable is not None
            self._kbusTable[busindex] = value
            return

    self.sched(self._builtinInstrs['busoutk'], delay=delay,
               dur=self.onecycle * 2, args=(bus, value))

    if immediate and kind:
        # Ask for the bus index without blocking, so that later writes
        # can use the fast path. This is skipped for unknown tokens, for
        # which _getBusIndex raises ValueError: the warning above would
        # otherwise be followed by an exception after the write was
        # already scheduled
        self._getBusIndex(bus, blocking=False)
[docs]
def readBus(self, bus: int, default: float | None = None) -> float | None:
    """
    Read the current value of a control bus

    Buses can be used to allow communication between instruments, or
    between a running csound instrument and python. Buses are useful
    for continuous streams; when using buses to communicate discrete
    values with python an opcode like trighold might be necessary.
    In general for discrete events it might be better to use other
    communication means, like OSC, which provide buffering.

    Args:
        bus: the bus number, as returned by assignBus
        default: the value returned if the bus does not exist

    Returns:
        the current value of the bus, or `default` if the bus does not exist

    Raises:
        RuntimeError: if the engine has no bus support or the bus index
            could not be determined

    .. seealso::

        :meth:`~Engine.assignBus`
        :meth:`~Engine.writeBus`

    Example
    ~~~~~~~

    >>> e = Engine()
    >>> e.compile(r'''
    ... instr 100
    ...   irmsbus = p4
    ...   asig inch 1
    ...   krms = rms:k(asig)
    ...   busout irmsbus, krms
    ... endin
    ... ''')
    >>> rmsbus = e.assignBus(kind='control')
    >>> event = e.sched(100, 0, args=[rmsbus])
    >>> while True:
    ...     rmsvalue = e.readBus(rmsbus)
    ...     print(f"Rms value: {rmsvalue}")
    ...     time.sleep(0.1)
    """
    if not self.hasBusSupport():
        raise RuntimeError("This engine does not have bus support")
    # Converting to int allows to pass a Bus object created via
    # Session.assignBus, see busproxy.Bus.__int__
    token = int(bus)
    slot = self._getBusIndex(token, blocking=True)
    if slot is None:
        raise RuntimeError(f"Could not get the actual csound bus for token {bus}")
    if slot >= 0:
        assert self._kbusTable is not None
        return self._kbusTable[slot]
    # A negative index is answered with the default value
    return default
def _getBusIndex(self, bus: int, blocking=True, whendone: Callable | None = None
                 ) -> int | None:
    """
    Resolve a bus token to its bus index

    Indexes already resolved are cached and answered right away.
    Otherwise the builtin 'busassign' instr is scheduled to query
    the index.

    Args:
        bus: the bus token, as returned by assignBus
        blocking: if True, block execution until csound has responded. Returns
            the bus index
        whendone: a callable of the form (busindex) -> None, will be called with
            the bus index corresponding to the bus token. Passing a callback
            will make this method async

    Returns:
        the bus index if it was cached or if blocking, None otherwise

    Raises:
        ValueError: if the token is not known or is not a control bus
        RuntimeError: if blocking and no index was received
    """
    assert isinstance(bus, int)
    kind = self._busTokenToKind.get(bus)
    if not kind:
        raise ValueError(f"Bus not found, token: {bus}")
    if kind != 'control':
        raise ValueError("Only control buses are supported here")

    bus = int(bus)
    cached = self._busIndexes.get(bus)
    if cached is not None:
        if whendone:
            whendone(cached)
        return cached

    synctoken = self._getSyncToken()
    #          p1                              p2 p3            p4         p5   p6               p7 p8
    pfields = [self._builtinInstrs['busassign'], 0, self.onecycle, synctoken, bus, BUSKIND_CONTROL, 0, 0]

    # A callback always makes the query asynchronous
    if blocking and not whendone:
        reply = self._eventWait(synctoken, pfields)
        if reply is None:
            raise RuntimeError(f"Could not fetch the bus index for token {bus}")
        resolved = int(reply)
        self._busIndexes[bus] = resolved
        return resolved

    def onreply(busnum):
        # Cache the index before notifying the user callback, if any
        busnum = int(busnum)
        self._busIndexes[bus] = busnum
        if whendone:
            whendone(busnum)

    self._eventWithCallback(token=synctoken, pargs=pfields, callback=onreply)
    return None
[docs]
def releaseBus(self, bus: int, delay: float | None = None) -> None:
    """
    Release a persistent bus

    Args:
        bus: the bus to release (the token returned by :py:meth:`assignBus`)
        delay: when to release the bus (relative time). None means now

    Raises:
        RuntimeError: if the engine has no bus support

    .. seealso:: :meth:`~Engine.assignBus`
    """
    if not self.hasBusSupport():
        raise RuntimeError("This Engine was created without bus support")
    # Forget the cached index for this token, if any
    self._busIndexes.pop(bus, None)
    when = delay or 0.
    event = [self._builtinInstrs['busrelease'], when, 0, int(bus)]
    self._perfThread.scoreEvent(False, "i", event)
[docs]
def hasBusSupport(self) -> bool:
    """
    Does this engine have bus support?

    Returns:
        True if bus support was set up and there is at least one
        audio or control bus
    """
    if not self._hasBusSupport:
        return self._hasBusSupport
    return self.numAudioBuses > 0 or self.numControlBuses > 0
def addBusSupport(self, numAudioBuses: int|None = None, numControlBuses: int|None = None) -> None:
    """
    Add bus support to this engine. Does nothing if buses are already supported

    Args:
        numAudioBuses: the number of audio buses. If not given, the value
            set in the engine is used, falling back to the config
            ('num_audio_buses')
        numControlBuses: the number of control buses. If not given, the
            value set in the engine is used, falling back to the config
            ('num_control_buses')
    """
    if self.hasBusSupport():
        return

    if numAudioBuses is None:
        numAudioBuses = self.numAudioBuses or config['num_audio_buses']
    if numControlBuses is None:
        numControlBuses = self.numControlBuses or config['num_control_buses']
    logger.debug(f"Adding bus support, {numAudioBuses=}, {numControlBuses=}")

    # The bus instrs are placed after the highest builtin instr below the
    # post-processing range; the post instr goes after everything else
    postProcStart = CONSTS['postProcInstrnum']
    usedInstrnums = self._builtinInstrs.values()
    startInstr = max(num for num in usedInstrnums if num < postProcStart) + 1
    postInstrnum = max(max(usedInstrnums), postProcStart) + 1

    busorc, businstrs = engineorc.makeBusOrc(numAudioBuses=numAudioBuses,
                                             numControlBuses=numControlBuses,
                                             startInstr=startInstr,
                                             postInstr=postInstrnum)
    self._builtinInstrs.update(businstrs)
    businstrnums = businstrs.values()
    self._reservedInstrnumRanges.append(('busorc', min(businstrnums), max(businstrnums)))

    self._hasBusSupport = True
    self.numAudioBuses = numAudioBuses
    self.numControlBuses = numControlBuses

    chanptr, _ = self.csound.channelPtr("_busTokenCount", kind='control', mode='rw')
    assert isinstance(chanptr, np.ndarray)
    self._busTokenCountPtr = chanptr

    if not self.started:
        # Not running yet: compile and query directly through csound
        self.csound.compileOrc(busorc)
        tabnum = int(self.csound.evalCode("return gi__bustable"))
        self._kbusTable = self.csound.table(tabnum)
        assert self._kbusTable is not None
    else:
        self._compileCode(busorc, block=True)
        tabnum = int(self.evalCode("return gi__bustable"))
        self._kbusTable = self.tableData(tabnum)
        self._busGlobalInstrs()
[docs]
def assignBus(self, kind='', value: float | None = None, persist=True
              ) -> int:
    """
    Assign one audio/control bus, returns the bus number.

    Audio buses are always mono.

    Args:
        kind: the kind of bus, "audio" or "control". If left unset and value
            is not given it defaults to an audio bus. Otherwise, if value
            is given a control bus is created. Explicitly asking for an
            audio bus and setting an initial value will raise an exception
        value: for control buses it is possible to set an initial value for
            the bus. If a value is given the bus is created as a control
            bus. For audio buses this should be left as None
        persist: if True the bus created is kept alive until the user
            calls :meth:`~Engine.releaseBus`. Otherwise, the bus is
            reference counted and is released after the last
            user releases it.

    Returns:
        the bus token, can be passed to any instrument expecting a bus
        to be used with the built-in opcodes :ref:`busin`, :ref:`busout`, etc.

    Raises:
        RuntimeError: if the engine has no bus support
        ValueError: if *kind* is neither 'audio' nor 'control', or if an
            initial value is given for an audio bus

    A bus created here can be used together with the built-in opcodes :ref:`busout`,
    :ref:`busin` and :ref:`busmix`. A bus can also be created directly in csound by
    calling :ref:`busassign`

    A bus is reference counted and is collected when there are no more clients
    using it. At creation the bus is "parked", waiting to be used by any client.
    As long as no clients use it, the bus stays in this state and is ready to
    be used.

    Multiple clients can use a bus and the bus is kept alive as long as there
    are clients using it or if the bus was created as *persistent*.
    When each client starts using the bus via any of the bus opcodes, like :ref:`busin`,
    the reference count of the bus is increased. After a client has finished
    using it the reference count is automatically decreased and if it reaches
    0 the bus is collected.

    Order of evaluation is important: **audio buses are cleared at the end of each
    performance cycle** and can only be used to communicate from a low
    priority to a high priority instrument.

    For more information, see :ref:`Bus Opcodes<busopcodes>`

    Example
    ~~~~~~~

    Pass audio from one instrument to another. The bus will be released after the events
    are finished.

    >>> e = Engine(...)
    >>> e.compile(r'''
    ... instr 100
    ...   ibus = p4
    ...   kfreq = 1000
    ...   asig vco2 0.1, kfreq
    ...   busout(ibus, asig)
    ... endin
    ... ''')
    >>> e.compile(r'''
    ... instr 110
    ...   ibus = p4
    ...   asig = busin(ibus)
    ...   ; do something with asig
    ...   asig *= 0.5
    ...   outch 1, asig
    ... endin
    ... ''')
    >>> bus = e.assignBus("audio")
    >>> s1 = e.sched(100, 0, 4, (bus,))
    >>> s2 = e.sched(110, 0, 4, (bus,))

    Modulate one instr with another, at k-rate. **NB: control buses act like global
    variables, they are not cleared at the end of each cycle**.

    >>> e = Engine(...)
    >>> e.compile(r'''
    ... instr 130
    ...   ibus = p4
    ...   ; lfo between -0.5 and 0 at 6 Hz
    ...   kvibr = linlin(lfo:k(1, 6), -0.5, 0, -1, 1)
    ...   busout(ibus, kvibr)
    ... endin
    ...
    ... instr 140
    ...   itranspbus = p4
    ...   kpitch = p5
    ...   ktransp = busin:k(itranspbus, 0)
    ...   kpitch += ktransp
    ...   asig vco2 0.1, mtof(kpitch)
    ...   outch 1, asig
    ... endin
    ... ''')
    >>> bus = e.assignBus("control")
    >>> s1 = e.sched(140, 0, -1, (bus, 67))
    >>> s2 = e.sched(130, 0, -1, (bus,))  # start modulation
    >>> e.unsched(s2)        # remove modulation
    >>> e.writeBus(bus, 0)   # reset value
    >>> e.unschedAll()

    .. seealso:: :meth:`~Engine.writeBus`, :meth:`~Engine.readBus`, :meth:`~Engine.releaseBus`
    """
    if not self.hasBusSupport():
        raise RuntimeError("This Engine was created without bus support")

    # Validate the arguments before touching any state. Explicit
    # exceptions are used since assertions are removed when running
    # python with -O, which would let an invalid kind through
    if not kind:
        kind = 'audio' if value is None else 'control'
    elif kind not in ('audio', 'control'):
        raise ValueError(f"kind must be 'audio' or 'control', got {kind!r}")
    elif kind == 'audio' and value is not None:
        raise ValueError(f"You asked to assign an audio bus but gave an initial "
                         f"value ({value})")

    # The next free token is read from, and advanced through, the shared
    # token counter
    bustoken = int(self._busTokenCountPtr[0])
    self._busTokenToKind[bustoken] = kind
    ikind = BUSKIND_AUDIO if kind == 'audio' else BUSKIND_CONTROL
    ivalue = value if value is not None else 0.
    self._busTokenCountPtr[0] = bustoken + 1

    # We call busassign to assign a new bus. We are not interested, at this point,
    # to get the actual bus index for the given token. Getting the actual index
    # is only relevant if we want to read/write to it.
    synctoken = 0
    #          p1                               p2 p3 p4         p5        p6     p7            p8
    pfields = [self._builtinInstrs['busassign'], 0, 0, synctoken, bustoken, ikind, int(persist), ivalue]
    self._perfThread.scoreEvent(False, "i", pfields)
    return bustoken
[docs]
def busSystemStatus(self) -> dict:
    """
    Collect information about the status of the bus system

    This is only provided for debugging

    Returns:
        a dict with the keys 'audioBusesFree', 'controlBusesFree',
        'numAudioBuses', 'numControlBuses' and 'busIndexes'

    Raises:
        RuntimeError: if the engine has no bus support
    """
    if not self.hasBusSupport():
        raise RuntimeError("This Engine has no bus support")
    # The number of free buses is the size of the corresponding pool
    freeControl = int(self.evalCode('return pool_size:i(gi__buspoolk)'))
    freeAudio = int(self.evalCode('return pool_size:i(gi__buspool)'))
    status = {}
    status['audioBusesFree'] = freeAudio
    status['controlBusesFree'] = freeControl
    status['numAudioBuses'] = self.numAudioBuses
    status['numControlBuses'] = self.numControlBuses
    status['busIndexes'] = self._busIndexes
    return status
[docs]
def eventUI(self, eventid: float, **pargs: tuple[float, float]) -> None:
    """
    Modify pfields through an interactive user-interface

    If run inside a jupyter notebook, this method will create embedded widgets
    to control the values of the dynamic pfields of an event

    Args:
        eventid: p1 of the event to modify
        **pargs: each keyword is a pfield (like ``p4``, or the name of the
            variable assigned to it) with a tuple (minvalue, maxvalue)
            as value

    Raises:
        KeyError: if a pfield cannot be resolved to an index

    Example
    ~~~~~~~

    .. code::

        from csoundengine import *
        e = Engine()
        e.compile(r'''
        instr synth
          kmidinote = p4
          kampdb = p5
          kcutoff = p6
          kres = p7
          kfreq = mtof:k(kmidinote)
          asig = vco2:a(ampdb(kampdb), kfreq)
          asig = moogladder2(asig, kcutoff, kres)
          asig *= linsegr:a(0, 0.1, 1, 0.1, 0)
          outs asig, asig
        endin
        ''')
        ev = e.sched('synth', args=[67, -12, 3000, 0.9])
        e.eventUI(ev, p4=(0, 127), p5=(-48, 0), kcutoff=(200, 5000), kres=(0.1, 1))

    .. figure:: assets/eventui.png
    """
    from . import interact

    # The body of the instr is looked up by number first, then by name;
    # it is needed to resolve named pfields to their index
    registry = self._instrRegistry
    body = registry.get(int(eventid)) or registry.get(internal.instrNameFromP1(eventid))
    if body:
        nameToIndex = csoundparse.instrParseBody(body).pfieldNameToIndex
    else:
        nameToIndex = None

    specs: dict[int|str, interact.ParamSpec] = {}
    for pfield, (minval, maxval) in pargs.items():
        idx = internal.resolvePfieldIndex(pfield, nameToIndex)
        if not idx:
            raise KeyError(f"pfield {pfield} not understood")
        # Each widget starts at the current value of its pfield
        current = self._getp(eventid, idx)
        startvalue = current if current is not None else 0.
        specs[idx] = interact.ParamSpec(pfield,
                                        minvalue=minval, maxvalue=maxval,
                                        startvalue=startvalue,
                                        widgetHint='slider')
    return interact.interactPargs(self, eventid, specs=specs)
@_atexit.register
def _cleanup() -> None:
    """Stop all active engines; registered to run when python exits"""
    active = list(Engine.activeEngines.values())
    if not active:
        return
    logger.debug("Exiting python, closing all active engines")
    for engine in active:
        logger.debug(f"... stopping {engine.name}")
        engine.stop()
[docs]
def getEngine(name: str) -> Engine | None:
    """
    Find an active engine by its name

    Args:
        name: the name of the engine

    Returns:
        the engine, or None if there is no active engine with this name

    Example
    ~~~~~~~

    >>> import csoundengine as ce
    >>> ce.Engine(name='old', a4=435)
    >>> getEngine('old')
    Engine(name=old, sr=44100, backend=jack, outdev=dac, nchnls=2, indev=adc, nchnls_i=2,
           bufferSize=256)
    >>> getEngine('foo') is None
    True
    """
    registry = Engine.activeEngines
    return registry.get(name)