Source code for csoundengine.csoundlib

"""
This module provides miscellaneous functionality for working with csound.

This functionality includes:

* parse csound code
* generate a ``.csd`` file
* inspect the audio environment
* query different paths used by csound
* etc.

"""
from __future__ import annotations

import dataclasses
import sys
import functools as _functools
import logging as _logging
import math as _math
import os as _os
import re as _re
import shutil as _shutil
import subprocess as _subprocess
import tempfile as _tempfile
import textwrap as _textwrap
import struct

import cachetools as _cachetools
import numpy as np
from . import csounddefs

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from typing import Any, Callable, Sequence
    Curve = Callable[[float], float]
    from . import jacktools


# Module-wide logger, shared under the "csoundengine" name
logger = _logging.getLogger("csoundengine")


# Process-wide cache of values queried from csound. The entries are filled
# by _csoundGetInfoViaAPI (which also adds an 'opcodedefs' entry); None
# means "not queried yet"
_cache: dict[str, Any] = {
    'opcodes': None,
    'versionTriplet': None
}


# Default pattern used by AudioBackend to parse a device listing line.
# Groups: (1) numeric index, (2) device id (adcN / dacN), (3) the text inside
# the parentheses, (4) optional channel count given as "[ch:N]"
_audioDeviceRegex = r"(\d+):\s((?:adc|dac)\d+)\s*\((.*)\)(?:\s+\[ch:(\d+)\])?"


def midiDevices(backend='portmidi') -> tuple[list[MidiDevice], list[MidiDevice]]:
    """
    Returns input and output midi devices for the given backend

    Args:
        backend: the backend used for realtime midi (as passed to csound
            via ``-+rtmidi={backend}``)

    Returns:
        a tuple (inputdevices, outputdevices), where each of these is a
        list of MidiDevice with attributes ``deviceid`` (the value passed
        to -M), ``name`` (the name of the device) and ``kind`` (one of
        'input' or 'output')

    ========  ===========================
    Platform  Possible Backends
    ========  ===========================
    linux     portmidi, alsaseq, alsaraw
    macos     portmidi
    windows   portmidi
    ========  ===========================
    """
    import libcsound
    csound = libcsound.Csound()
    csound.setOption(f"-+rtmidi={backend}")
    csound.setOption("-odac")
    csound.start()
    try:
        inputdevs = csound.midiDevList(False)
        outputdevs = csound.midiDevList(True)
    finally:
        # The instance is only needed for the query: always stop it so that
        # the started csound process does not linger
        csound.stop()
    logger.debug(f"MIDI Inputs: {inputdevs}")
    logger.debug(f"MIDI Outputs: {outputdevs}")
    midiins = [MidiDevice(deviceid=d.deviceId, kind='input',
                          name=f"{d.interfaceName}:{d.deviceName}")
               for d in inputdevs]
    midiouts = [MidiDevice(deviceid=d.deviceId, kind='output',
                           name=f"{d.interfaceName}:{d.deviceName}")
                for d in outputdevs]
    return midiins, midiouts
@dataclasses.dataclass(unsafe_hash=True)
class AudioBackend:
    """
    Holds information about a csound audio backend

    Attributes:
        name: the name of this backend
        alwaysAvailable: is this backend always available?
        hasSystemSr: does this backend have a system samplerate?
        needsRealtime: the backend needs to be run in realtime
        platforms: a list of platform for which this backend is available
        longname: an alternative name for the backend. Defaults to ``name``
        defaultBufferSize: the default buffer size for this backend (-b)
        defaultNumBuffers: the number of buffers to fill a block (determines -B)
        audioDeviceRegex: a regex to grep the audio devices from csound's
            output. Defaults to the module-level pattern
        acceptsDeviceIndex: can this backend accept a device in the form
            -dac0 or -adc1?
        priority: used to select the default backend, the backend with
            the highest priority wins
    """
    name: str
    alwaysAvailable: bool = False
    needsRealtime: bool = False
    platforms: tuple[str, ...] = ('linux', 'darwin', 'win32')
    hasSystemSr: bool = False
    longname: str = ""
    defaultBufferSize: int = 1024
    defaultNumBuffers: int = 2
    audioDeviceRegex: str = ''
    acceptsDeviceIndex: bool = True
    priority: int = 0

    def __post_init__(self):
        # Fill in the defaults which depend on other values
        if not self.longname:
            self.longname = self.name
        if not self.audioDeviceRegex:
            self.audioDeviceRegex = _audioDeviceRegex

    def searchAudioDevice(self, pattern: str, kind: str) -> AudioDevice | None:
        """
        Search a certain audio device from the devices presented by this backend

        Args:
            pattern: either the exact id of a device or a regex matched
                against the device name. An exact id match has priority
            kind: 'input' to search input devices, anything else searches
                output devices

        Returns:
            the first matching device, or None if no device matches
        """
        # we get the devices via getAudioDevices to enable caching
        indevs, outdevs = getAudioDevices(self.name)
        devs = indevs if kind == 'input' else outdevs
        match = next((d for d in devs if d.id == pattern), None)
        if match:
            return match
        return next((d for d in devs if _re.search(pattern, d.name)), None)

    def isAvailable(self) -> bool:
        """
        Is this backend available?

        A backend is available if the current platform is supported and
        it is either marked as always available or it reports any device
        """
        if sys.platform not in self.platforms:
            return False
        if self.alwaysAvailable:
            return True
        indevices, outdevices = getAudioDevices(backend=self.name)
        return bool(indevices or outdevices)

    def getSystemSr(self) -> int | None:
        """
        Get the system samplerate for this backend, if available

        We use the default output device.

        Returns:
            the samplerate. If the backend has no system samplerate, 44100
            is returned. None is returned if the backend could not be
            started or csound reports no valid samplerate
        """
        if not self.hasSystemSr:
            logger.debug(f"Backend {self.name} does not have a system sr, returning default")
            return 44100
        import libcsound
        cs = libcsound.Csound()
        cs.setOption("-odac")
        cs.setOption(f"-+rtaudio={self.name}")
        ok = cs.start()
        if ok == -1:
            logger.error(f"Backend {self.name} not available")
            return None
        try:
            sr = cs.systemSr(0)
        finally:
            # Stop the started instance even if the query fails
            cs.stop()
        return int(sr) if sr > 0 else None

    def bufferSizeAndNum(self) -> tuple[int, int]:
        """
        The buffer size and number of buffers needed for this backend
        """
        return (self.defaultBufferSize, self.defaultNumBuffers)

    def audioDevices(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        Returns the (input devices, output devices) of this backend

        The base implementation queries csound via its API
        """
        return self.audioDevicesViaAPI()

    def audioDevicesViaAPI(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        Query csound for audio devices for this backend

        Returns:
            a tuple (inputDevices: list[AudioDevice], outputDevices: list[AudioDevice])
        """
        logger.info(f"Querying csound's audio devices for backend {self.name}")
        import libcsound
        cs = libcsound.Csound()
        cs.createMessageBuffer()
        try:
            for opt in ['-+rtaudio='+self.name, "-m16", "-odac", "--use-system-sr"]:
                cs.setOption(opt)
            cs.start()
            try:
                csoutdevs = cs.audioDevList(isOutput=True)
                csindevs = cs.audioDevList(isOutput=False)
                outdevs = [AudioDevice(id=d.deviceId, name=d.deviceName,
                                       backend=self.name, kind='output', index=i,
                                       numChannels=d.maxNchnls)
                           for i, d in enumerate(csoutdevs)]
                indevs = [AudioDevice(id=d.deviceId, name=d.deviceName,
                                      backend=self.name, kind='input', index=i,
                                      numChannels=d.maxNchnls)
                          for i, d in enumerate(csindevs)]
            finally:
                cs.stop()
        finally:
            # Release the message buffer also when the query fails
            cs.destroyMessageBuffer()
        return indevs, outdevs

    def defaultAudioDevices(self) -> tuple[AudioDevice | None, AudioDevice | None]:
        """
        Returns the default audio devices for this backend

        Returns:
            a tuple ``(inputDevice: AudioDevice, outputDevice: AudioDevice)`` for
            this backend. The first listed device of each kind is used; any
            of these is None if there are no devices of that kind
        """
        indevs, outdevs = getAudioDevices(self.name)
        return indevs[0] if indevs else None, outdevs[0] if outdevs else None
@_cachetools.cached(cache=_cachetools.TTLCache(1, 20))
def _jackdata() -> tuple[jacktools.JackInfo, list[jacktools.JackClient]] | None:
    """
    Query jack, returns a tuple (jack info, jack clients) or None if
    jacktools reports no info. Results are kept in a TTL cache
    """
    from . import jacktools
    info = jacktools.getInfo()
    if info is None:
        return None
    return info, jacktools.getClients()


class _JackAudioBackend(AudioBackend):
    """The jack backend. All queries are answered via _jackdata"""

    def __init__(self):
        super().__init__(name='jack',
                         priority=10,
                         alwaysAvailable=False,
                         platforms=('linux', 'darwin', 'win32'),
                         hasSystemSr=True)

    def getSystemSr(self) -> int:
        """The samplerate of jack. Raises RuntimeError if jack is not available"""
        if (data := _jackdata()) is not None:
            return data[0].samplerate
        raise RuntimeError("Jack is not available")

    def bufferSizeAndNum(self) -> tuple[int, int]:
        """
        Returns (buffer size, number of buffers), derived from jack's blocksize

        Raises RuntimeError if jack is not available
        """
        data = _jackdata()
        if not data:
            raise RuntimeError("Jack is not available")
        blocksize = data[0].blocksize
        import emlib.mathlib
        if not emlib.mathlib.ispowerof2(blocksize):
            logger.warning(f"Jack's blocksize is not a power of 2: {blocksize}!")
        # jack buf: 512 -> -B 768 -b 256
        # buffsize: 256, numbuffers: 2
        buffsize = blocksize // 2
        numbuffers = 2
        return buffsize, numbuffers

    def isAvailable(self) -> bool:
        return _jackdata() is not None

    def audioDevices(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        Returns (input devices, output devices), one device per jack client

        Raises RuntimeError if jack is not available
        """
        data = _jackdata()
        if data is None:
            raise RuntimeError("Jack is not available")
        _, clients = data
        # Kinds are swapped on purpose: a client of kind 'output' is listed
        # as an input device for csound, and vice versa
        indevs = [AudioDevice(id=f'adc:{c.regex}', name=c.name, backend=self.name,
                              kind='input', numChannels=len(c.ports),
                              index=c.firstIndex, isPhysical=c.isPhysical)
                  for c in clients if c.kind == 'output']
        outdevs = [AudioDevice(id=f'dac:{c.regex}', name=c.name, backend=self.name,
                               kind='output', numChannels=len(c.ports),
                               index=c.firstIndex, isPhysical=c.isPhysical)
                   for c in clients if c.kind == 'input']
        return indevs, outdevs

    def defaultAudioDevices(self) -> tuple[AudioDevice|None, AudioDevice|None]:
        """The default devices are the first physical devices, if any"""
        indevs, outdevs = self.audioDevices()
        defaultin = next((dev for dev in indevs if dev.isPhysical), None)
        defaultout = next((dev for dev in outdevs if dev.isPhysical), None)
        return defaultin, defaultout


class _PulseAudioBackend(AudioBackend):
    """The pulseaudio backend (linux only), queried via linuxaudio"""

    def __init__(self):
        super().__init__(name='pulse',
                         alwaysAvailable=False,
                         hasSystemSr=True,
                         defaultBufferSize=1024,
                         defaultNumBuffers=2,
                         platforms=('linux',),
                         priority=0)

    def getSystemSr(self) -> int:
        """The samplerate reported for pulseaudio, 0 if no info is available"""
        from . import linuxaudio
        info = linuxaudio.pulseaudioInfo()
        return info.sr if info else 0

    def isAvailable(self) -> bool:
        return self.getSystemSr() > 0

    def audioDevices(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        Returns one input ("adc") and one output ("dac") device

        Raises RuntimeError if pulseaudio is not available
        """
        from . import linuxaudio
        pulseinfo = linuxaudio.pulseaudioInfo()
        if pulseinfo is None:
            raise RuntimeError("PulseAudio not available")
        indevs = [AudioDevice(id="adc", name="adc", backend=self.name, kind='input',
                              index=0, numChannels=pulseinfo.numchannels)]
        outdevs = [AudioDevice(id="dac", name="dac", backend=self.name, kind='output',
                               index=0, numChannels=pulseinfo.numchannels)]
        return indevs, outdevs


class _PortaudioBackend(AudioBackend):
    """The portaudio backend, in its callback (pa_cb) or blocking (pa_bl) variant"""

    def __init__(self, kind='callback'):
        shortname = "pa_cb" if kind == 'callback' else 'pa_bl'
        longname = f"portaudio-{kind}"
        priority = 2 if kind == 'callback' else 0
        hasSystemSr = False
        if sys.platform == 'linux':
            # In linux a system sr is only reported when pipewire is running
            from . import linuxaudio
            hasSystemSr = linuxaudio.isPipewireRunning()
        super().__init__(name=shortname,
                         alwaysAvailable=True,
                         longname=longname,
                         hasSystemSr=hasSystemSr,
                         priority=priority)

    def getSystemSr(self) -> int | None:
        if sys.platform == 'linux' and self.hasSystemSr:
            from . import linuxaudio
            info = linuxaudio.pipewireInfo()
            return info.sr if info is not None else 44100
        return super().getSystemSr()

    def defaultAudioDevices(self) -> tuple[AudioDevice | None, AudioDevice | None]:
        """
        Returns the default (input, output) devices

        The defaults reported by sounddevice are matched by name against the
        devices of this backend. If sounddevice cannot be imported, the
        devices listed by csound are used instead
        """
        logger.debug("Querying default device via portaudio/sounddevice")
        try:
            import sounddevice as sd
        except ImportError:
            logger.warning('Could not initialize the sounddevice library. Falling back'
                           ' to querying via csound')
            return self._defaultAudioDevicesViaCsound()
        devices = sd.query_devices()
        defaultoutdev, defaultindev = None, None
        indevs, outdevs = self.audioDevices()
        if indevs and sd.default.device[0] is not None:
            defaultName = devices[sd.default.device[0]]['name']
            # Only the part of the name before any "[" is compared
            defaultindev = next((dev for dev in indevs
                                 if dev.name.split("[")[0].strip() == defaultName), None)
        if outdevs and sd.default.device[1] is not None:
            defaultName = devices[sd.default.device[1]]['name']
            defaultoutdev = next((dev for dev in outdevs
                                  if dev.name.split("[")[0].strip() == defaultName), None)
        return defaultindev, defaultoutdev

    def _defaultAudioDevicesViaCsound(self) -> tuple[AudioDevice | None, AudioDevice | None]:
        """
        Returns the default devices, using only the devices listed by csound

        A device with the word "default" in its name is preferred, otherwise
        the first device of each kind is used
        """
        indevs, outdevs = self.audioDevices()
        indev = next((d for d in indevs if _re.search(r"\bdefault\b", d.name)), None)
        outdev = next((d for d in outdevs if _re.search(r"\bdefault\b", d.name)), None)
        if indev is None:
            if not indevs:
                logger.warning(f"No input devices for backend {self.name}")
            else:
                indev = indevs[0]
        if outdev is None:
            if not outdevs:
                logger.warning(f"No output devices for backend {self.name}")
            else:
                outdev = outdevs[0]
        return indev, outdev


# linux priorities: jack(10), portaudio-callback(2), alsa(1), pulse(0), portaudio-blocking(0)

class _AlsaBackend(AudioBackend):
    """The alsa backend (linux only)"""

    def __init__(self):
        super().__init__(name="alsa",
                         alwaysAvailable=True,
                         platforms=('linux',),
                         audioDeviceRegex=r"([0-9]+):\s((?:adc|dac):.*)\((.*)\)",
                         acceptsDeviceIndex=False,
                         priority=1)

    def getSystemSr(self) -> int | None:
        """
        The samplerate of jack if available, otherwise the samplerate of
        pipewire if running, otherwise 44100
        """
        if (jackdata := _jackdata()) is not None:
            return jackdata[0].samplerate
        assert sys.platform == 'linux'
        from . import linuxaudio
        if linuxaudio.isPipewireRunning():
            info = linuxaudio.pipewireInfo()
            return info.sr if info else None
        else:
            return 44100


@_functools.cache
def _getAvailableAudioBackends() -> dict[str, AudioBackend]:
    """
    Returns a dict mapping backend name to backend. The result is cached

    A backend can be registered under multiple names (aliases): each
    backend is reachable via its own ``name`` and via its alias
    """
    _backendPortaudioCallback = _PortaudioBackend('callback')
    backends: dict[str, AudioBackend] = {
        'portaudio': _backendPortaudioCallback,
        'pa_cb': _backendPortaudioCallback,
        'pa_bl': _PortaudioBackend('blocking'),
    }
    if _jackdata() is not None:
        backends['jack'] = _JackAudioBackend()
    if sys.platform == 'linux':
        backends['alsa'] = _AlsaBackend()
        # The backend must be reachable via its own name ('pulse'), since
        # lookups are done with backend.name; 'pulseaudio' is an alias
        pulse = _PulseAudioBackend()
        backends['pulse'] = pulse
        backends['pulseaudio'] = pulse
    elif sys.platform == 'darwin':
        auhal = AudioBackend('auhal',
                             alwaysAvailable=True,
                             hasSystemSr=True,
                             needsRealtime=False,
                             longname="coreaudio",
                             platforms=('darwin',))
        backends['coreaudio'] = auhal
        backends['auhal'] = auhal
    return backends
def nextpow2(n: int) -> int:
    """
    Returns the power of 2 higher or equal than n

    Args:
        n: a positive number. A fractional value is rounded up first

    Returns:
        the smallest power of two which is >= n

    Raises ValueError if n is not positive
    """
    if n <= 0:
        raise ValueError(f"n must be positive, got {n}")
    # Integer arithmetic is exact, while computing the exponent through a
    # float logarithm is subject to rounding and can overshoot for values
    # which are already a power of two
    return 1 << (_math.ceil(n) - 1).bit_length()
def findCsound() -> str | None:
    """
    Locate the csound binary in the path

    Returns:
        the path of the csound binary, or None if it was not found (in
        which case an error is logged)
    """
    binary = _shutil.which("csound")
    if binary:
        return binary
    logger.error("csound is not in the path!")
    return binary
def _getVersionViaApi() -> tuple[int, int, int]:
    """
    Query the csound version, as a tuple (major, minor, patch)

    The cached version is used if present, otherwise csound is queried
    """
    cachedVersion = _cache.get('versionTriplet')
    if cachedVersion is None:
        return _csoundGetInfoViaAPI()['versionTriplet']
    return cachedVersion
@_functools.cache
def getVersion(useApi=True) -> tuple[int, int, int | str]:
    """
    Returns the csound version as tuple (major, minor, patch)

    Args:
        useApi: if True, the API is used to query the version. Otherwise
            the output of "csound --version" is parsed. Both versions
            might differ

    Returns:
        the versions as a tuple (major:int, minor:int, patch:int|str). The
        patch is an int if it is numeric (0 if missing), otherwise it is
        returned as a str (for example "beta")

    Raises IOError if csound is not present, RuntimeError if its version
    can't be parsed

    .. note:: the result is cached
    """
    if useApi:
        return _getVersionViaApi()
    csound = findCsound()
    if not csound:
        raise IOError("Csound not found")
    # The command is given as a list so that a path with spaces is kept as
    # one argument. communicate() drains the pipe while waiting, which
    # avoids blocking if csound fills the pipe buffer
    proc = _subprocess.Popen([csound, '--version'], stderr=_subprocess.PIPE)
    _, outputbytes = proc.communicate()
    if not outputbytes:
        raise RuntimeError("Could not read csounds output")
    output = outputbytes.decode('utf8', errors='replace')
    for line in output.splitlines():
        # The dot is kept outside of the patch group, otherwise a numeric
        # patch would never be recognized as such
        match = _re.search(r"Csound\s+version\s+(\d+)\.(\d+)(?:\.(\w+))?", line)
        if not match:
            continue
        major = int(match.group(1))
        minor = int(match.group(2))
        patch = match.group(3)
        if patch is None:
            return (major, minor, 0)
        return (major, minor, int(patch) if patch.isdigit() else patch)
    raise RuntimeError(f"Did not find a csound version, csound output: '{output}'")
def csoundSubproc(args: list[str], piped=True, wait=False) -> _subprocess.Popen:
    """
    Launch csound as a subprocess with the given arguments

    Args:
        args: the args passed to csound (each argument is a string)
        piped: if True, stdout and stderr are piped to the Popen object
        wait: if True, wait until csound exits

    Returns:
        the subprocess.Popen object

    Raises RuntimeError if csound is not found

    Example
    -------

        >>> from csoundengine import csoundlib
        >>> proc = csoundlib.csoundSubproc(["-+rtaudio=jack", "-odac", "myfile.csd"])

    See Also
    ~~~~~~~~

    * :func:`runCsd`
    """
    binary = findCsound()
    if not binary:
        raise RuntimeError("Csound not found")
    stream = _subprocess.PIPE if piped else None
    command = [binary]
    command += args
    process = _subprocess.Popen(command, stderr=stream, stdout=stream)
    if wait:
        process.wait()
    return process
def getSystemSr(backend: str) -> float | None:
    """
    Query the system samplerate of the given backend

    The query is delegated to the backend itself

    Args:
        backend: the name of the backend (jack, pa_cb, auhal, etc)

    Returns:
        the samplerate reported by the backend, which might be None

    Raises ValueError if the backend is not known

    See Also
    ~~~~~~~~

    * :func:`getAudioBackend`
    * :func:`getDefaultBackend`
    """
    backendDef = getAudioBackend(backend)
    if backendDef:
        return backendDef.getSystemSr()
    raise ValueError(f"backend {backend} not known")
# def _getJackSrViaClient() -> float: # import jack # c = jack.Client("query") # sr = c.samplerate # c.close() # return sr # def _getCsoundSystemSr(backend: str) -> float: # if backend not in {'jack', 'auhal'}: # raise ValueError(f"backend {backend} does not support system sr") # import libcsound # csound = libcsound.Csound() # csound.setOption(f"-+rtaudio={backend}") # csound.setOption("-odac") # csound.setOption("--use-system-sr") # csound.start() # sr = csound.sr() # csound.stop() # return sr
def getDefaultBackend() -> AudioBackend:
    """
    Get the default backend for this platform

    From all backends returned by :func:`audioBackends`, the one with the
    highest priority is selected

    ==============  =================================
    Platform        Backends (in order of priority)
    ==============  =================================
    Windows         portaudio
    macOS           auhal (coreaudio), portaudio
    linux           jack (if running), portaudio
    ==============  =================================

    Raises RuntimeError if there are no backends
    """
    candidates = audioBackends()
    if candidates:
        return max(candidates, key=lambda candidate: candidate.priority)
    raise RuntimeError("No available backends")
def runCsd(csdfile: str,
           outdev='',
           indev='',
           backend='',
           nodisplay=False,
           nomessages=False,
           comment='',
           piped=False,
           extra: list[str] | None = None
           ) -> _subprocess.Popen:
    """
    Run a .csd file in a csound subprocess

    Args:
        csdfile: the path to a .csd file
        outdev: "dac" to output to the default device, the label of the
            device (dac0, dac1, ...), or a filename to render offline
            (-o option)
        indev: the input to use (for realtime) (-i option)
        backend: the name of the backend to use. If no backend is given,
            the default for the platform is used (this is only meaningful
            if running in realtime)
        nodisplay: if True, eliminates debugging info from output (-d)
        nomessages: if True, suppress debugging messages (-m16)
        comment: if given, will be added to the generated output
            as comment metadata (when running offline)
        piped: if True, the output of the csound process is piped and can
            be accessed through the Popen object (.stdout, .stderr)
        extra: a list of extra options arguments to be passed to csound

    Returns:
        the `subprocess.Popen` object. In order to wait until
        rendering is finished in offline mode, call .wait on the
        returned process

    .. seealso:: :func:`csoundSubproc`
    """
    # Rendering is realtime only if the output is a dac device
    realtime = bool(outdev) and outdev.startswith("dac")
    if realtime and not backend:
        backend = getDefaultBackend().name

    cmdargs = []
    if outdev:
        cmdargs += ["-o", outdev]
    if backend:
        cmdargs.append(f"-+rtaudio={backend}")
    if indev:
        cmdargs.append(f"-i{indev}")
    if nodisplay:
        cmdargs.append('-d')
    if nomessages:
        cmdargs.append('-m16')
    if comment and not realtime:
        cmdargs.append(f'-+id_comment="{comment}"')
    if extra:
        cmdargs += extra
    cmdargs.append(csdfile)
    return csoundSubproc(cmdargs, piped=piped)
def joinCsd(orc: str, sco='', options: list[str] | None = None) -> str:
    """
    Joins an orc and a score (both as str), returns a csd as string

    Args:
        orc: the text of the orchestra
        sco: the text of the score
        options: any command line options to be included, one option
            per line

    Returns:
        the text of the csd. All section tags start at the beginning of
        their line, the given orc and sco are included unmodified
    """
    optionstr = "" if options is None else "\n".join(options)
    # The csd is assembled line by line instead of dedenting an indented
    # template after interpolation: the layout of the tags must not
    # depend on the indentation of the interpolated text
    sections = [
        "",
        "<CsoundSynthesizer>",
        "<CsOptions>",
        optionstr,
        "</CsOptions>",
        "<CsInstruments>",
        orc,
        "</CsInstruments>",
        "<CsScore>",
        sco,
        "</CsScore>",
        "</CsoundSynthesizer>",
        "",
    ]
    return "\n".join(sections)
@dataclasses.dataclass
class CsoundProc:
    """
    Bundles a csound subprocess with the settings it was launched with

    Attributes:
        proc: the csound subprocess
        backend: the name of the backend used
        outdev: the output device
        sr: the sample rate of the process
        nchnls: the number of channels
        csdstr: the text of the csd being run
    """
    proc: _subprocess.Popen
    backend: str
    outdev: str
    sr: int
    nchnls: int
    csdstr: str = ""
def testCsound(dur=8., nchnls=2, backend='', device="dac", sr=0, ksmps=64,
               verbose=True
               ) -> CsoundProc:
    """
    Test the current csound installation for realtime output

    Pink noise is played through each output channel in turn, switching
    channel once per second

    Args:
        dur: the duration of the test
        nchnls: the number of output channels
        backend: which backend to use. If not given, the default backend
            is used
        device: which device to use
        sr: the sample rate. Use 0 to use system sample rate if applicable,
            or a default sample rate otherwise
        ksmps: the number of samples per control cycle
        verbose: if True, the index of the channel being tested is printed
            whenever it changes

    Returns:
        a :class:`CsoundProc`
    """
    backend = backend or getDefaultBackend().name
    if not sr:
        sr = getSamplerateForBackend(backend) or 44100
    printchan = "printk2 kchn" if verbose else ""
    orc = f"""
    sr = {sr}
    ksmps = {ksmps}
    nchnls = {nchnls}

    instr 1
        iperiod = 1
        kchn init -1
        ktrig metro 1/iperiod
        kchn = (kchn + ktrig) % nchnls
        anoise pinker
        outch kchn+1, anoise
        {printchan}
    endin
    """
    sco = f"i1 0 {dur}"
    orc = _textwrap.dedent(orc)
    logger.debug(orc)
    csd = joinCsd(orc, sco=sco)
    # The file is created atomically and closed before csound is launched,
    # so that csound always reads the complete csd. It is not deleted here
    # since the csound process reads it after this function returns
    with _tempfile.NamedTemporaryFile("w", suffix=".csd", delete=False) as f:
        f.write(csd)
        csdfile = f.name
    proc = runCsd(csdfile, outdev=device, backend=backend)
    return CsoundProc(proc=proc, backend=backend, outdev=device, sr=sr,
                      nchnls=nchnls, csdstr=csd)
def installedOpcodes(cached=True, opcodedir: str = '') -> set[str]:
    """
    Returns the names of all opcodes present

    Args:
        cached: if True, results are remembered between calls
        opcodedir: if given, plugin libraries will be loaded from
            this path (option --opcode-dir in csound). In this case the
            cached result is not returned

    Returns:
        a set with the names of all available opcodes
    """
    useCache = cached and not opcodedir
    if useCache:
        known = _cache.get('opcodes')
        if known is not None:
            return known
    return _csoundGetInfoViaAPI(opcodedir=opcodedir)['opcodes']
def _csoundGetInfoViaAPI(opcodedir='') -> dict:
    """
    Query csound via its API for its version and its opcodes

    The results are also stored in the module cache

    Args:
        opcodedir: if given, it is passed to csound as --opcode-dir

    Returns:
        a dict with keys 'opcodedefs' (the opcodes as returned by csound),
        'opcodes' (a set with the name of each opcode) and 'versionTriplet'
        (a tuple (major, minor, patch))
    """
    import libcsound
    engine = libcsound.Csound()
    for option in ("-d", "--nosound"):
        engine.setOption(option)
    engine.createMessageBuffer(echo=False)
    if opcodedir:
        engine.setOption(f'--opcode-dir={opcodedir}')
    # The version is an int whose last digit is the patch, the two digits
    # before are the minor version and the rest is the major version
    digits = str(engine.version())
    patch = int(digits[-1])
    minor = int(digits[-3:-1])
    major = int(digits[:-3])
    opcodedefs = engine.getOpcodes()
    info = {
        'opcodedefs': opcodedefs,
        'opcodes': {opcodedef.name for opcodedef in opcodedefs},
        'versionTriplet': (major, minor, patch)
    }
    _cache.update(info)
    engine.stop()
    return info


# def _opcodesList(opcodedir='') -> list[str]:
#     options = ["-z"]
#     if opcodedir:
#         options.append(f'--opcode-dir={opcodedir}')
#     s = csoundSubproc(options)
#     assert s.stderr is not None
#     lines = s.stderr.readlines()
#     allopcodes = []
#     for line in lines:
#         if line.startswith(b"end of score"):
#             break
#         opcodes = line.decode('utf8').split()
#         if opcodes:
#             allopcodes.extend(opcodes)
#     return allopcodes
def saveAsGen23(data: Sequence[float] | np.ndarray,
                outfile: str,
                fmt="%.12f",
                header=''
                ) -> None:
    """
    Saves the data to a gen23 table

    .. note::
        gen23 is a 1D list of numbers in text format, separated by
        whitespace

    Args:
        data: A 1D sequence (list or array) of floats
        outfile: The path to save the data to. Recommended extension: '.gen23'
        fmt: If saving frequency tables, fmt can be "%.1f" and save space,
            for amplitude the default of "%.12f" is best
        header: If specified it is included as a comment as the first line
            (csound will skip it). It is there just to document what is in
            the table. The comment marker ("# ") is added automatically

    Example
    ~~~~~~~

    .. code-block:: python

        >>> import bpf4 as bpf
        >>> from csoundengine import csoundlib
        >>> a = bpf.linear(0, 0, 1, 10, 2, 300)
        >>> dt = 0.01
        >>> csoundlib.saveAsGen23(a[::dt].ys, "out.gen23", header=f"dt={dt}")

    In csound:

    .. code-block:: csound

        gi_tab ftgen 0, 0, 0, -23, "out.gen23"

        instr 1
          itotaldur = ftlen(gi_tab) * 0.01
          ay poscil 1, 1/itotaldur, gi_tab
        endin
    """
    # savetxt already prepends its comment marker ("# ") to the header and
    # writes no header line at all if the header is empty
    np.savetxt(outfile, data, fmt=fmt, header=header)
def _metadataAsComment(d: dict[str, Any], maxSignificantDigits=10, sep=", ") -> str:
    """
    Render a dict as one line of text of the form ``key: value, key: value``

    Args:
        d: the dict to render. Values can be int, float or str
        maxSignificantDigits: max. number of significant digits used for floats
        sep: the separator between items

    Returns:
        the rendered text. Strings are enclosed in double quotes

    Raises TypeError if a value is not an int, a float or a str
    """
    def render(value) -> str:
        # int is checked first, so any int subclass is rendered via str
        if isinstance(value, int):
            return str(value)
        if isinstance(value, float):
            return f"%.{maxSignificantDigits}g" % value
        if isinstance(value, str):
            return f'"{value}"'
        raise TypeError(f"Value should be int, float or str, got {value}")

    return sep.join([f"{key}: {render(value)}" for key, value in d.items()])
def saveMatrixAsMtx(outfile: str,
                    data: np.ndarray,
                    metadata: dict[str, str | float] | None = None,
                    encoding="float32",
                    title='',
                    sr: int = 44100) -> None:
    """
    Save `data` in wav format using the mtx extension

    This is not a real soundfile. It is used to transfer the data in binary
    form to be read by another program. To distinguish this from a normal
    wav file an extension `.mtx` is recommended. Data is saved always flat,
    and a header with the shape of `data` is included before the data.

    **Header Format**::

        headerlength, numRows, numColumns, ...

    The description of each metadata value is included as wav metadata
    at the comment key with the format::

        "headerSize: xx, numRows: xx, numColumns: xx, columns: 'headerSize numRows numColumns ...'"

    This metadata can be retrieved in csound via:

    .. code-block:: csound

        itabnum ftgen 0, 0, 0, -1, "sndfile.mtx", 0, 0, 1
        Scomment = filereadmeta("sndfile.mtx", "comment")
        imeta = dict_loadstr(Scomment)
        ScolumnNames = dict_get(imeta, "columns")
        idatastart = tab_i(0, itabnum)
        inumrows = dict_get(imeta, "numRows")
        ; inumrows can also be retrieved by reading the table at index 1
        ; inumrows = tab_i(1, itabnum)
        inumcols = tab_i(2, itabnum)
        ; The data at (krow, kcol) can be read via
        kvalue = tab(idatastart + krow*inumcols + kcol, itabnum)
        ; Alternatively an array can be created as a view:
        kArr[] memview itabnum, idatastart
        reshapearray kArr, inumrows, inumcols
        kvalue = kArr[krow][kcol]

    Args:
        outfile (str): The path where the data is written to
        data (numpy array): a 2D numpy array of shape (numRows, numColumns)
        metadata: Any numeric values here are included in the header, and
            the description of this data is included as metadata in the
            wav file. String values are only included in the wav metadata
        encoding: the data can be encoded in float32 or float64
        title: if given will be included in the output metadata
        sr: the sample rate set in the written wav file
    """
    assert isinstance(outfile, str)
    assert encoding == 'float32' or encoding == 'float64'
    if _os.path.splitext(outfile)[1] != ".mtx":
        logger.warning(f"The extension should be .mtx, but asked to save "
                       f"the matrix as {outfile}")
    import sndfileio
    header: list[int|float] = [3, data.shape[0], data.shape[1]]
    allmeta: dict[str, Any] = {
        'headerSize': 3,
        'numRows': data.shape[0],
        'numColumns': data.shape[1]
    }
    columns = ['headerSize', 'numRows', 'numColumns']
    if metadata:
        for k, v in metadata.items():
            if isinstance(v, (int, float)):
                header.append(v)
                allmeta[k] = v
                columns.append(k)
        # The size of the header is the offset at which the data starts.
        # The value in the header and the one in the metadata must agree
        headersize = len(header)
        allmeta['headerSize'] = headersize
        header[0] = headersize
        for k, v in metadata.items():
            if isinstance(v, str):
                allmeta[k] = v
    allmeta['columns'] = " ".join(columns)
    wavmeta = {'comment': _metadataAsComment(allmeta),
               'software': 'MTX1'}
    if title:
        wavmeta['title'] = title
    sndwriter = sndfileio.sndwrite_chunked(outfile=outfile, sr=sr,
                                           encoding=encoding, metadata=wavmeta,
                                           fileformat='wav')
    sndwriter.write(np.array(header, dtype=float))
    sndwriter.write(data.ravel())
    sndwriter.close()
def saveMatrixAsGen23(outfile: str,
                      mtx: np.ndarray,
                      extradata: list[float] | None = None,
                      header=True
                      ) -> None:
    """
    Write a 2D numpy array as a gen23 text file

    Values are rounded to 6 decimals. Each row of the matrix is written
    as one line of text

    Args:
        outfile (str): the path to save the data to. Suggestion: use '.gen23' as ext
        mtx (np.ndarray): a 2D array of floats
        extradata: if given, this data is appended to the header, before
            the data in `mtx`. Implies `header=True`
        header: if True, a header of the form [headersize, numrows, numcolumns]
            is prepended to the data.

    .. note::
        The format used by gen23 is a text format with numbers separated
        by any space. When read inside csound the table is of course 1D but
        can be interpreted as 2D with the provided header metadata.
    """
    rowcount, colcount = mtx.shape
    rounded = mtx.round(6)
    with open(outfile, "w") as out:
        if header or extradata:
            headervalues = [3., rowcount, colcount]
            if extradata is not None:
                headervalues += extradata
                # The first value holds the actual size of the header
                headervalues[0] = len(headervalues)
            headertext = " ".join(np.array(headervalues).astype(str))
            out.write(headertext)
            out.write("\n")
        for values in rounded:
            out.write(" ".join(values.astype(str)))
            out.write("\n")
@dataclasses.dataclass
class MidiDevice:
    """
    Information about a midi device, as listed by a given backend

    Attributes:
        deviceid: the id of the device, as listed by csound and passed to -M
        name: the name of the device
        kind: the kind of the device ('input', 'output')
    """
    deviceid: str
    name: str
    kind: str = 'input'
@dataclasses.dataclass
class AudioDevice:
    """
    Information about an audio device, as listed by a given backend

    Attributes:
        id: the device identification (dac3, adc2, etc)
        name: the name of the device
        kind: 'output' or 'input'
        backend: the name of the backend listing this device
        index: the index of this audio device, as passed to adcXX or dacXX
        numChannels: the number of channels
        isPhysical: True if this is a physical (hardware) device. Used for jack
    """
    id: str
    name: str
    kind: str
    backend: str
    index: int = -1
    numChannels: int | None = 0
    isPhysical: bool | None = None

    def info(self) -> str:
        """
        A one-line summary of this device, in the form ``id:name:<N>outs``
        for output devices and ``id:name:<N>ins`` for any other kind
        """
        direction = "outs" if self.kind == 'output' else "ins"
        return f"{self.id}:{self.name}:{self.numChannels}{direction}"
@_cachetools.cached(cache=_cachetools.TTLCache(10, 20))
def getDefaultAudioDevices(backend='') -> tuple[AudioDevice | None, AudioDevice | None]:
    """
    Query the default audio devices of a backend

    Args:
        backend: the name of the backend. If not given, the default
            backend is used

    Returns:
        a tuple (input device, output device). Any of these can be None

    Raises ValueError if the backend is not known

    .. note:: Results are cached for a period of time
    """
    backendDef = getAudioBackend(backend)
    if backendDef is not None:
        return backendDef.defaultAudioDevices()
    raise ValueError(f"Backend {backend} not known")
@_cachetools.cached(cache=_cachetools.TTLCache(10, 20))
def getAudioDevices(backend) -> tuple[list[AudioDevice], list[AudioDevice]]:
    """
    Query the audio devices of a backend

    Args:
        backend: the name of a backend supported by your installation of
            csound. If empty, the default backend for your OS is used

    Returns:
        a tuple of (input devices, output devices), where each of these
        is a list of :class:`AudioDevice`

    Raises ValueError if the backend is not supported

    .. note::

        For jack an audio device is a client. Results are cached for
        a period of time

    Some of the attributes of each returned device:

    * index: The device index
    * id: adc{index} for input devices, dac{index} for output devices.
      The id can be passed to csound directly with either the -i or the
      -o flag (``-i{id}`` or ``-o{id}``)
    * name: A description of the device
    * numChannels: the number of channels

    ========  ====  =====  ====  ==================  =======================
    Backend   OSX   Linux  Win   Multiple-Devices    Description
    ========  ====  =====  ====  ==================  =======================
    jack      x     x      -     -                   Jack
    auhal     x     -      -     x                   CoreAudio
    pa_cb     x     x      x     x                   PortAudio (Callback)
    pa_bl     x     x      x     x                   PortAudio (blocking)
    ========  ====  =====  ====  ==================  =======================
    """
    backendDef = getAudioBackend(backend)
    if backendDef is not None:
        return backendDef.audioDevices()
    raise ValueError(f"Backend '{backend}' not supported, known backends: {list(_getAvailableAudioBackends().keys())}")
def getSamplerateForBackend(backend='') -> int | None:
    """
    Query the samplerate reported by a backend

    Args:
        backend: the name of the backend to query. If not given, the
            default backend is used

    Returns:
        the samplerate for the given backend or None if failed

    Raises ValueError if the backend is not supported, RuntimeError if the
    backend is not available
    """
    backendDef = getAudioBackend(backend)
    if backendDef is None:
        registry = _getAvailableAudioBackends()
        possible = [name for name, candidate in registry.items()
                    if sys.platform in candidate.platforms]
        raise ValueError(f"Backend '{backend}' not supported. Possible backends for this platform: {possible}")
    if backendDef.isAvailable():
        return backendDef.getSystemSr()
    raise RuntimeError(f"Audiobackend {backendDef.name} is not available")
def audioBackends() -> list[AudioBackend]:
    """
    Return a list of available audio backends

    Returns:
        a list of AudioBackend. Each backend is included only once, even
        if it is registered under multiple names (for example 'portaudio'
        is an alias of 'pa_cb')

    For example, jack will not be returned in linux if the jack server
    is not running.

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> [backend.name for backend in audioBackends()]
        ['jack', 'pa_cb', 'pa_bl', 'alsa']
    """
    unique: list[AudioBackend] = []
    for backend in _getAvailableAudioBackends().values():
        # Aliases point to the same instance: compare by identity in order
        # to keep only the first occurrence, preserving the order
        if not any(backend is seen for seen in unique):
            unique.append(backend)
    return unique
def dumpAudioBackends() -> None:
    """
    Print a table with all **available** backends, sorted by name

    For each backend its name, its long name and its system samplerate
    are shown ("-" if the backend has no system samplerate)
    """
    ordered = audioBackends()
    ordered.sort(key=lambda candidate: candidate.name)
    from emlib.misc import print_table
    table = [(b.name, b.longname, str(b.getSystemSr()) if b.hasSystemSr else "-")
             for b in ordered]
    print_table(table, headers="backend longname sr".split(), showindex=False)
def getAudioBackend(name='') -> AudioBackend | None:
    """
    Look up the AudioBackend structure for a backend name

    Only available backends are considered. Some backends listed for a
    given platform might not be running and thus will not be found

    Args:
        name: the name of the backend. If empty, the default backend
            is returned

    Returns:
        the AudioBackend structure, or None if the audio backend is
        not available or unknown

    ========== =================== ======= ========= =======
    Name       Description         Linux   Windows   MacOS
    ========== =================== ======= ========= =======
    pa_cb      portaudio-callback  ✓       ✓         ✓
    pa_bl      portaudio-blocking  ✓       ✓         ✓
    portaudio  alias to pa_cb      ✓       ✓         ✓
    jack       jack                ✓       ?         ✓
    pulse      pulseaudio          ✓       ✗         ✗
    pulseaudio alias to pulse      ✓       ✗         ✗
    auhal      coreaudio           ✗       ✗         ✓
    coreaudio  alias to auhal      ✗       ✗         ✓
    ========== =================== ======= ========= =======
    """
    if name:
        return _getAvailableAudioBackends().get(name)
    return getDefaultBackend()
def getAudioBackendNames() -> list[str]:
    """
    Names of the audio backends available on this platform

    Returns:
        a list with the ``name`` of each backend returned by
        :func:`audioBackends`, in the same order

    Example
    -------

        >>> from csoundengine.csoundlib import *
        >>> getAudioBackendNames()   # in Linux
        ['jack', 'pa_cb', 'pa_bl', 'alsa', 'pulse']
    """
    backends = audioBackends()
    return [backend.name for backend in backends]
def mincer(sndfile: str,
           outfile: str,
           timecurve: float | Callable[[float], float] = 1.,
           pitchcurve: float | Callable[[float], float] = 1.,
           lock=False,
           fftsize=2048,
           ksmps=128,
           debug=False
           ) -> dict:
    """
    Stretch/Pitchshift a soundfile using csound's mincer opcode (offline)

    Both curves are sampled every 2 ms, written to temporary GEN23 text
    files and read back by a generated csound orchestra which is rendered
    by calling the ``csound`` binary.

    Args:
        sndfile: the path to a soundfile
        outfile: the path to a resulting outfile. Csound is called with ``-f``
            (float samples); further format options are derived from the
            extension of this path. The samplerate and number of channels
            match those of the input file
        timecurve: a func mapping time to playback time or a scalar
            indicating a timeratio (2 means twice as fast, 1 to leave
            unmodified). A callable is sampled between 0 and the duration
            of ``sndfile``
        pitchcurve: a func mapping time to pitchscale, or a scalar
            indicating a freqratio
        lock: should mincer be run with phase-locking?
        fftsize: the size of the fft
        ksmps: the ksmps to pass to the csound process
        debug: if True, the temporary files (gen23 tables and the generated
            .csd) are kept instead of being removed

    Returns:
        a dict with information about the process (keys: outfile, csdstr, csd).
        Unless ``debug`` is set, the file at ``csd`` has already been removed

    Raises:
        TypeError: if a curve is neither a scalar nor a callable
        ValueError: if a scalar timecurve is not positive

    .. note::

        If the mapped time excedes the bounds of the sndfile, silence is
        generated. For example, a negative time or a time exceding the
        duration of the sndfile

    Examples
    --------

    # Example 1: stretch a soundfile 2x

    >>> from csoundengine import csoundlib
    >>> import bpf4
    >>> import sndfileio
    >>> snddur = sndfileio.sndinfo("mono.wav").duration
    >>> timecurve = bpf4.linear(0, 0, snddur*2, snddur)
    >>> mincer(sndfile, "mono2.wav", timecurve=timecurve, pitchcurve=1)
    """
    import sndfileio
    info = sndfileio.sndinfo(sndfile)
    sr = info.samplerate
    nchnls = info.channels
    dt = 0.002   # sampling period of both curves, in seconds

    if isinstance(timecurve, (int, float)):
        if timecurve <= 0:
            raise ValueError(f"timecurve must be a positive ratio, got {timecurve}")
        # A ratio of 2 plays twice as fast: the output lasts duration/ratio and
        # at output time t the read position within the source is t*ratio
        t1 = info.duration / timecurve
        ts = np.arange(0, t1 + dt, dt)
        times = ts * timecurve
    elif callable(timecurve):
        # NOTE(review): the output duration cannot be derived from a callable,
        # so the curve is only sampled up to the duration of the source
        t1 = info.duration
        ts = np.arange(0, t1 + dt, dt)
        times = [timecurve(float(t)) for t in ts]
    else:
        raise TypeError("timecurve should be either a scalar or a bpf")

    if isinstance(pitchcurve, (int, float)):
        pitches = np.ones_like(ts) * pitchcurve
    elif callable(pitchcurve):
        pitches = [pitchcurve(float(t)) for t in ts]
    else:
        raise TypeError("pitchcurve should be either a scalar or a bpf")

    tempfiles: list[str] = []

    def _tempPath(**kws) -> str:
        # mkstemp returns an open descriptor; only the path is needed here,
        # so close it right away instead of leaking it
        fd, path = _tempfile.mkstemp(**kws)
        _os.close(fd)
        tempfiles.append(path)
        return path

    try:
        fmt = "%.12f"
        # The header line stores dt as the first value of each table; the
        # orchestra reads it back with `tab_i 0, gi_time`
        time_gen23 = _tempPath(prefix='time-', suffix='.gen23')
        np.savetxt(time_gen23, times, fmt=fmt, header=str(dt), comments='')
        pitch_gen23 = _tempPath(prefix='pitch-', suffix='.gen23')
        np.savetxt(pitch_gen23, pitches, fmt=fmt, header=str(dt), comments='')

        ext = _os.path.splitext(outfile)[1][1:]
        optionsstr = '\n'.join(csounddefs.csoundOptionsForOutputFormat(fmt=ext))

        csd = f"""
<CsoundSynthesizer>
<CsOptions>
-o {outfile}
{optionsstr}
</CsOptions>
<CsInstruments>

sr = {sr}
ksmps = {ksmps}
nchnls = {nchnls}
0dbfs = 1.0

gi_snd   ftgen 0, 0, 0, -1,  "{sndfile}", 0, 0, 0
gi_time  ftgen 0, 0, 0, -23, "{time_gen23}"
gi_pitch ftgen 0, 0, 0, -23, "{pitch_gen23}"

instr vartimepitch
    idt tab_i 0, gi_time
    ilock = {int(lock)}
    ifftsize = {fftsize}
    ikperiod = ksmps/sr
    isndfiledur = ftlen(gi_snd) / ftsr(gi_snd)
    isndchnls = ftchnls(gi_snd)
    ifade = ikperiod*2
    inumsamps = ftlen(gi_time)
    it1 = (inumsamps-2) * idt           ; account for idt and last value
    kt timeinsts
    aidx    linseg 1, it1, inumsamps-1
    at1     tablei aidx, gi_time, 0, 0, 0
    kpitch  tablei k(aidx), gi_pitch, 0, 0, 0
    kat1 = k(at1)
    kgate = (kat1 >= 0 && kat1 <= isndfiledur) ? 1 : 0
    agate = interp(kgate)
    aenv linseg 0, ifade, 1, it1 - (ifade*2), 1, ifade, 0
    aenv *= agate
    if isndchnls == 1 then
        a0  mincer at1, 1, kpitch, gi_snd, ilock, ifftsize, 8
        outch 1, a0*aenv
    else
        a0, a1   mincer at1, 1, kpitch, gi_snd, ilock, ifftsize, 8
        outs a0*aenv, a1*aenv
    endif

    if (kt >= it1 + ikperiod) then
        event "i", "exit", 0.1, 1
        turnoff
    endif
endin

instr exit
    puts "exiting!", 1
    exitnow
endin

</CsInstruments>
<CsScore>
i "vartimepitch" 0 -1
f 0 36000

</CsScore>
</CsoundSynthesizer>
"""
        csdfile = _tempPath(suffix=".csd")
        with open(csdfile, "w") as f:
            f.write(csd)
        returncode = _subprocess.call(["csound", "-f", csdfile])
        if returncode != 0:
            logger.error(f"csound exited with return code {returncode} "
                         f"while rendering '{outfile}'")
    finally:
        # Clean up even if csound could not be launched
        if not debug:
            for path in tempfiles:
                _os.remove(path)
    return {'outfile': outfile, 'csdstr': csd, 'csd': csdfile}
def _instr_as_orc(instrid, body, initstr, sr, ksmps, nchnls):
    """
    Build a minimal orchestra holding one instrument

    The orchestra consists of the header (sr, ksmps, nchnls, 0dbfs = 1),
    followed by ``initstr`` and by one instrument ``instrid`` whose body
    is ``body``. The resulting text is returned
    """
    return f"""
sr = {sr}
ksmps = {ksmps}
nchnls = {nchnls}
0dbfs = 1

{initstr}

instr {instrid}
    {body}
endin

"""
def recInstr(body: str,
             events: list,
             init='',
             outfile='',
             sr=44100,
             ksmps=64,
             nchnls=2,
             a4=442,
             samplefmt='float',
             dur=None
             ) -> tuple[str, _subprocess.Popen]:
    """
    Record one instrument for a given duration

    Args:
        body: the body of the instrument
        events: a list of events, where each event is a list of pargs passed
            to the instrument, beginning with p2: delay, dur, [p4, p5, ...]
        init: the initialization code (ftgens, global vars, etc)
        outfile: the generated output. If empty, a temporary .wav path
            is generated
        sr: the samplerate
        ksmps: block size
        nchnls: number of output channels
        a4: A4 frequency
        samplefmt: defines the sample format used for outfile, one of
            (16, 24, 32, 'float')
        dur: the duration of the recording. If given, an end marker is set
            at this time; if None, no end marker is set

    Returns:
        a tuple (outfile to be generated, _subprocess.Popen running csound)

    Raises:
        ValueError: if ``events`` is not a list of lists/tuples or if
            ``samplefmt`` is not one of the supported values
    """
    if not isinstance(events, list) or not all(isinstance(event, (tuple, list)) for event in events):
        raise ValueError("events should be a list, where each item is a list of pargs passed to "
                         "the instrument, beginning with p2: [delay, dur, ...]. "
                         f"Got {events} instead")

    # Validate the sample format before building anything
    fmtoption = {16: '', 24: '-3', 32: '-f', 'float': '-f'}.get(samplefmt)
    if fmtoption is None:
        raise ValueError("samplefmt should be one of 16, 24, 32, or 'float'")

    from .csd import Csd
    csd = Csd(sr=sr, ksmps=ksmps, nchnls=nchnls, a4=a4)
    if not outfile:
        outfile = _tempfile.mktemp(suffix='.wav', prefix='csdengine-rec-')

    if init:
        csd.addGlobalCode(init)

    instrnum = 100
    csd.addInstr(instrnum, body)
    for event in events:
        # Use distinct names here: reusing `dur` would clobber the `dur`
        # argument and the end marker would follow the last event instead
        start, eventdur = event[0], event[1]
        csd.addEvent(instrnum, start, eventdur, event[2:])

    if dur is not None:
        csd.setEndMarker(dur)

    if fmtoption:
        # 16 bit maps to an empty flag, there is nothing to add in that case
        csd.addOptions(fmtoption)

    renderjob = csd.run(output=outfile)
    assert renderjob.process is not None
    return outfile, renderjob.process
def _ftsaveReadBinary(path: str) -> list[tuple[dict, np.ndarray]]:
    """
    Read the tables saved by ftsave in binary format

    Each table is stored as a cropped FUNC struct followed by the table
    data (``flen + 1`` doubles, the extra item being the guard point).

    Header: sizeof(FUNC) - sizeof(MYFLT) - SSTRSIZ = 152

    where:
      sizeof(FUNC) = 1184
      MYFLT = double (8 bytes)
      SSTRSIZ = 1024

    Args:
        path: the path of the file saved by ftsave

    Returns:
        a list of tuples (tableinfo: dict, data: np.ndarray), one per table
        found in the file. The arrays are read-only views over the bytes read

    Raises:
        ValueError: if the file ends within a header or within the
            data of a table
    """
    # Format string for the 152-byte cropped FUNC struct
    # Assuming little-endian, 64-bit platform, MYFLT = double
    FUNC_FORMAT = ('<'
                   'I'      # flen                          uint32_t  4
                   'i'      # lenmask                       int32     4
                   'i'      # lobits                        int32     4
                   'i'      # lomask                        int32     4
                   'd'      # lodiv                         MYFLT     8
                   'd'      # cvtbas                        MYFLT     8
                   'd'      # cpscvt                        MYFLT     8
                   'hh'     # loopmode1, loopmode2          int16 x2  4
                   'xxxx'   # padding                                 4
                   'iiii'   # begin1, end1, begin2, end2              16
                   'ii'     # soundend, flenfrms                      8
                   'ii'     # nchanls, fno                            8
                   'd'      # sr                            MYFLT     8
                   'Q'      # *args pointer                           8
                   'i'      # argcnt                        int32_t   4
                   'xxxx'   # padding                                 4
                   '48x')   # rest of the cropped header, not decoded 48
                            # (presumably the numeric fields of gen01args
                            #  - TODO confirm against csoundCore.h)
    FIELD_NAMES = ('flen', 'lenmask', 'lobits', 'lomask',
                   'lodiv', 'cvtbas', 'cpscvt',
                   'loopmode1', 'loopmode2',
                   'begin1', 'end1', 'begin2', 'end2',
                   'soundend', 'flenfrms',
                   'nchanls', 'fno',
                   'sr',
                   'args',   # raw pointer value (not dereferenceable)
                   'argcnt')
    headersize = struct.calcsize(FUNC_FORMAT)
    # Internal invariant: the format must describe exactly the cropped struct
    assert headersize == 152, f"Expected 152 bytes, got {headersize}"

    tables: list[tuple[dict, np.ndarray]] = []
    with open(path, 'rb') as f:
        while True:
            header = f.read(headersize)
            if not header and tables:
                # Clean end of file after at least one table
                break
            if len(header) < headersize:
                raise ValueError(f"File too short: expected 152 bytes, got {len(header)}")
            tabinfo = dict(zip(FIELD_NAMES, struct.unpack(FUNC_FORMAT, header)))
            numbytes = (tabinfo['flen'] + 1) * 8
            data = f.read(numbytes)
            if len(data) < numbytes:
                raise ValueError(f"Table data truncated: expected {numbytes} bytes, "
                                 f"got {len(data)}")
            tables.append((tabinfo, np.frombuffer(data, dtype=np.float64)))
    return tables


def _ftsaveReadText(path: str) -> list[np.ndarray]:
    """
    Read the tables saved by ftsave in text format

    Each table starts with a line beginning with ``====``, followed by a
    header which must include a ``flen:`` line and ends at the line
    containing ``END OF HEADER``. After that there is one value per line,
    up to a line starting with ``---``.

    Args:
        path: the path of the file saved by ftsave

    Returns:
        a list of arrays, one per table. Each array has ``flen + 1`` items

    Raises:
        IOError: if the file does not follow the expected layout
    """
    tables = []
    # a file can have multiple tables saved
    with open(path) as f:
        lines = iter(f)
        while True:
            tablength = -1
            try:
                headerstart = next(lines)
            except StopIteration:
                # no more tables
                break
            if not headerstart.startswith("===="):
                raise IOError(f"Expecting header start, got {headerstart}")
            # Read header
            for line in lines:
                if line.startswith("flen:"):
                    tablength = int(line[5:])
                if 'END OF HEADER' in line:
                    break
            if tablength < 0:
                raise IOError("Could not read table length")
            values = np.zeros((tablength+1,), dtype=float)
            # Read data
            for i, line in enumerate(lines):
                if line.startswith("---"):
                    break
                if i > tablength:
                    raise IOError(f"Table has more values than declared (flen: {tablength})")
                values[i] = float(line)
            tables.append(values)
    return tables
def ftsaveRead(path: str, mode="") -> list[tuple[dict, np.ndarray]]:
    """
    Load the tables stored in a file written by ftsave

    Args:
        path: the path to the saved table/tables
        mode: one of "text", "binary" (or "bin"). If empty, the format is
            detected from the contents of the file

    Returns:
        a list of (tableinfo: dict, tabledata: np.ndarray). The tableinfo
        dict is only populated for files saved in binary format, for text
        files it is empty

    Raises:
        ValueError: if the given mode is not supported
    """
    if not mode:
        from . import tools
        mode = "binary" if tools.isFileBinary(path) else "text"

    if mode == "text":
        # The text format provides no metadata, pair each table with an empty dict
        return [({}, table) for table in _ftsaveReadText(path)]
    if mode in ("binary", "bin"):
        return _ftsaveReadBinary(path)
    raise ValueError(f"mode {mode} not supported")
def getNchnls(backend='',
              outpattern='',
              inpattern='',
              defaultin=2,
              defaultout=2
              ) -> tuple[int, int]:
    """
    Number of input / output channels for a backend and device

    Args:
        backend: the backend, one of 'jack', 'portaudio', etc. If empty,
            the default backend is used
        outpattern: a pattern identifying the output device, passed to the
            backend's ``searchAudioDevice``. If empty, the default output
            device of the backend is used
        inpattern: a pattern identifying the input device, passed to the
            backend's ``searchAudioDevice``. If empty, the default input
            device of the backend is used
        defaultin: value returned if the number of input channels cannot
            be determined for the given backend+device
        defaultout: value returned if the number of output channels cannot
            be determined for the given backend+device

    Returns:
        a tuple (nchnls_i, nchnls) for that backend+device combination

    Raises:
        RuntimeError: if the backend was not found
        ValueError: if a pattern was given and no device matches it
    """
    backendDef = getAudioBackend(backend)
    if not backendDef:
        raise RuntimeError(f"Backend '{backend}' not found")

    def _channelCount(device, fallback: int) -> int:
        # A missing device or one not reporting its channels yields the fallback
        if device and device.numChannels is not None:
            return device.numChannels
        return fallback

    defaultInput, defaultOutput = backendDef.defaultAudioDevices()

    outdev = defaultOutput
    if outpattern:
        outdev = backendDef.searchAudioDevice(outpattern, kind='output')
        if not outdev:
            _, outdevs = backendDef.audioDevices()
            outdevids = [d.id for d in outdevs]
            raise ValueError(f"Output device '{outpattern}' not found. Possible devices "
                             f"are: {outdevids}")
    nchnls = _channelCount(outdev, defaultout)

    indev = defaultInput
    if inpattern:
        indev = backendDef.searchAudioDevice(inpattern, kind='input')
        if not indev:
            raise ValueError(f"Input device {inpattern} not found")
    nchnlsi = _channelCount(indev, defaultin)

    return nchnlsi, nchnls
def _getNchnlsJackViaJackclient(indevice: str,
                                outdevice: str
                                ) -> tuple[int, int]:
    """
    Get the number of ports for the given clients using JACK-Client

    This is faster than csound and should give the same results

    Args:
        indevice (str): A regex pattern matching the input client, or "adc" or
            None to query the physical ports
        outdevice (str): A regex pattern matching the output client
            Use "dac" or None to query the physical ports

    Returns:
        a tuple (number of inputs, number of outputs)
    """
    import jack
    c = jack.Client("query")
    try:
        # Inputs for csound are ports which *output* audio, and viceversa
        if indevice == "adc" or not indevice:
            inports = c.get_ports(is_audio=True, is_output=True, is_physical=True)
        else:
            inports = c.get_ports(indevice, is_audio=True, is_output=True)
        if outdevice == "dac" or not outdevice:
            outports = c.get_ports(is_audio=True, is_input=True, is_physical=True)
        else:
            outports = c.get_ports(outdevice, is_audio=True, is_input=True)
    finally:
        # Do not leave the query client connected if a query fails
        c.close()
    return len(inports), len(outports)


def _parsePortaudioDeviceName(name: str) -> tuple[str, str, int, int]:
    """
    Parses a string like "HDA Intel PCH: CX20590 Analog (hw:0,0) [ALSA, 2 in, 4 out]"

    The information is read from the last bracketed section, so the
    device name itself can contain brackets.

    Args:
        name: the name of the device

    Returns:
        a tuple: ("HDA Intel PCH: CX20590 Analog (hw:0,0)", "ALSA", 2, 4).
        If the bracketed section holds only the api, the number of
        channels is returned as -1

    Raises:
        ValueError: if the name has no bracketed section or if this section
            does not have the form "api, N in, M out"
    """
    devname, sep, rest = name.rpartition("[")
    if not sep:
        raise ValueError(f"Could not parse device name '{name}': expected a section "
                         "like [ALSA, 2 in, 4 out]")
    rest = rest.replace("]", "")
    if "," in rest:
        api, inchstr, outchstr = rest.split(",")
        inch = int(inchstr.split()[0])
        outch = int(outchstr.split()[0])
    else:
        api = rest
        inch = -1
        outch = -1
    return devname.strip(), api.strip(), inch, outch
def dumpAudioInfo(backend=''):
    """
    Print information about the audio setup

    If no backend is given, a table of the available backends is printed
    first. After that the audio devices for the selected backend are
    listed (see :func:`dumpAudioDevices`)
    """
    listBackends = not backend
    if listBackends:
        dumpAudioBackends()
        print()
    dumpAudioDevices(backend=backend)
def dumpAudioDevices(backend=''):
    """
    Print the audio devices of the given backend as tables

    One table is printed for the input devices and one for the output
    devices, with one column per field of AudioDevice. If backend is not
    given, the default backend (of all available backends for the
    current platform) is chosen

    Raises:
        ValueError: if the backend is unknown or not available
    """
    backendDef = getAudioBackend(backend)
    if backendDef is None:
        raise ValueError(f"Backend '{backend}' not supported")
    from emlib.misc import print_table
    print(f"Backend: {backendDef.name}")
    indevs, outdevs = getAudioDevices(backend=backend)
    fields = [field.name for field in dataclasses.fields(AudioDevice)]
    sections = (
        ("\nInput Devices:", [dataclasses.astuple(dev) for dev in indevs], "-- No input devices"),
        ("\nOutput Devices:", [dataclasses.astuple(dev) for dev in outdevs], "-- No output devices"),
    )
    for title, rows, emptymsg in sections:
        print(title)
        if rows:
            print_table(rows, headers=fields, showindex=False)
        else:
            print(emptymsg)
def channelTypeFromValue(value: int | float | str | np.ndarray) -> str:
    """
    Channel type (k, S, a) from value

    Args:
        value: the value to query

    Returns:
        'k' for an int or float, 'S' for a str, 'a' for a numpy array

    Raises:
        TypeError: if the value has any other type
    """
    dispatch = (
        ((int, float), 'k'),
        (str, 'S'),
        (np.ndarray, 'a'),
    )
    for types, channelType in dispatch:
        if isinstance(value, types):
            return channelType
    raise TypeError(f"Value of type {type(value)} not supported")