Source code for csoundengine.csoundlib

"""
This module provides miscellaneous functionality for working with csound.

This functionality includes:

* parse csound code
* generate a ``.csd`` file
* inspect the audio environment
* query different paths used by csound
* etc.

"""
from __future__ import annotations

import math as _math
import os as _os
import sys
import subprocess as _subprocess
import re as _re
import shutil as _shutil
import logging as _logging
import textwrap as _textwrap
import functools as _functools
import io as _io
from pathlib import Path as _Path
import tempfile as _tempfile
import dataclasses
import cachetools as _cachetools
import numpy as np

from ._common import *
from .renderjob import RenderJob
from csoundengine import jacktools
from csoundengine import linuxaudio
from csoundengine import state as _state
from csoundengine.config import config
from csoundengine import internal
import emlib.misc
import emlib.textlib
import emlib.dialogs
import emlib.mathlib
from emlib.common import runonce

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from typing import Callable, Sequence, Iterator, Any, Set
    Curve = Callable[[float], float]
    from sf2utils.sf2parse import Sf2File


# libcsound provides the ``Csound`` class used throughout this module. If the
# import fails while sphinx is loaded (documentation build), a mock object is
# used instead; otherwise a hint is printed and the original error is re-raised.
try:
    import libcsound
except Exception as e:
    if 'sphinx' in sys.modules:
        print("Called while building sphinx documentation?")
        from sphinx.ext.autodoc.mock import _MockObject
        libcsound = _MockObject()
    else:
        print("libcsound not found! Install it via 'pip install libcsound'")
        raise e

# Logger used by this module, registered under the name "csoundengine"
logger = _logging.getLogger("csoundengine")


# Module-level cache, filled in by _csoundGetInfoViaAPI:
#   'opcodes': the names of the installed opcodes (None until queried)
#   'versionTriplet': the csound version as (major, minor, patch) (None until queried)
_cache: dict[str, Any] = {
    'opcodes': None,
    'versionTriplet': None
}


# Default regex for a device line. Groups: index, device id (adcN / dacN),
# device name and an optional channel count given as "[ch:N]"
_audioDeviceRegex = r"(\d+):\s((?:adc|dac)\d+)\s*\((.*)\)(?:\s+\[ch:(\d+)\])?"


def midiDevices(backend='portmidi') -> tuple[list[MidiDevice], list[MidiDevice]]:
    """
    Query csound for the midi devices available for a given midi backend

    Args:
        backend: the backend used for realtime midi (as passed to csound
            via ``-+rtmidi={backend}``)

    Returns:
        a tuple ``(inputdevices, outputdevices)``. Each of these is a list
        of MidiDevice with attributes ``deviceid`` (the value passed to -M),
        ``name`` (the name of the device) and ``kind`` (one of 'input'
        or 'output')

    ========  ===========================
    Platform  Possible Backends
    ========  ===========================
    linux     portmidi, alsaseq, alsaraw
    macos     portmidi
    windows   portmidi
    ========  ===========================
    """
    def _wrap(devs, kind: str) -> list[MidiDevice]:
        # Convert the device records returned by csound into MidiDevice objects
        return [MidiDevice(deviceid=dev.deviceId,
                           kind=kind,
                           name=f"{dev.interfaceName}:{dev.deviceName}")
                for dev in devs]

    engine = libcsound.Csound()
    for option in (f"-+rtmidi={backend}", "-odac"):
        engine.setOption(option)
    engine.start()
    inputdevs = engine.midiDevList(False)
    outputdevs = engine.midiDevList(True)
    logger.debug(f"MIDI Inputs: {inputdevs}")
    logger.debug(f"MIDI Outputs: {outputdevs}")
    return _wrap(inputdevs, 'input'), _wrap(outputdevs, 'output')
def compressionBitrateToQuality(bitrate: int, fmt='ogg') -> float:
    """
    Convert a bitrate to a compression quality between 0-1, as passed to --vbr-quality

    The bitrate is snapped to the nearest supported bitrate (the lower one if
    it lies exactly between two). This is the inverse
    of :func:`compressionQualityToBitrate`

    Args:
        bitrate: the bitrate in kb/s, one of 64, 80, 96, 112, 128, 160, 192,
            224, 256, 320, 500
        fmt: the encoding format (ogg at the moment)

    Returns:
        the compression quality, a value between 0 and 1 in steps of 0.1

    Raises:
        ValueError: if the format is not supported
    """
    if fmt != 'ogg':
        raise ValueError(f"Format {fmt} not supported")
    # One bitrate per quality step (0.0, 0.1, ..., 1.0); this table must match
    # the one used by compressionQualityToBitrate
    bitrates = (64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 500)
    idx = min(range(len(bitrates)), key=lambda i: abs(bitrates[i] - bitrate))
    return idx / 10
def compressionQualityToBitrate(quality: float, fmt='ogg') -> int:
    """
    Convert compression quality to bitrate

    Args:
        quality: the compression quality (0-1) as passed to --vbr-quality.
            Values outside this range are clamped
        fmt: the encoding format (ogg at the moment)

    Returns:
        the resulting bit rate, in kb/s

    Raises:
        ValueError: if the format is not supported

    =======   =======
    quality   bitrate
    =======   =======
    0.0       64
    0.1       80
    0.2       96
    0.3       112
    0.4       128
    0.5       160
    0.6       192
    0.7       224
    0.8       256
    0.9       320
    1.0       500
    =======   =======
    """
    if fmt != 'ogg':
        raise ValueError(f"Format {fmt} not supported")
    bitrates = (64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 500)
    # Round to the nearest quality step
    idx = int(quality * 10 + 0.5)
    # Clamp both ends: a negative index would otherwise wrap around the table
    idx = max(0, min(idx, len(bitrates) - 1))
    return bitrates[idx]
@dataclasses.dataclass(unsafe_hash=True)
class AudioBackend:
    """
    Holds information about a csound audio backend

    Attributes:
        name: the name of this backend
        alwaysAvailable: is this backend always available?
        hasSystemSr: does this backend have a system samplerate?
        needsRealtime: the backend needs to be run in realtime
        platforms: a list of platform for which this backend is available
        longname: an alternative name for the backend. Defaults to ``name``
        defaultBufferSize: the default buffer size for this backend (-b)
        defaultNumBuffers: the number of buffers to fill a block (determines -B)
        audioDeviceRegex: a regex to grep the audio devices from csound's output.
            Defaults to the module-wide device regex
        acceptsDeviceIndex: can this backend accept a device in the form -dac0 or -adc1?
    """
    name: str
    alwaysAvailable: bool = False
    needsRealtime: bool = False
    platforms: tuple[str, ...] = ('linux', 'darwin', 'win32')
    hasSystemSr: bool = False
    longname: str = ""
    defaultBufferSize: int = 1024
    defaultNumBuffers: int = 2
    audioDeviceRegex: str = ''
    acceptsDeviceIndex: bool = True

    def __post_init__(self):
        # Fill in the fields which fall back to another value when left empty
        if not self.longname:
            self.longname = self.name
        if not self.audioDeviceRegex:
            self.audioDeviceRegex = _audioDeviceRegex

    def searchAudioDevice(self, pattern: str, kind: str) -> AudioDevice | None:
        """
        Search a certain audio device from the devices presented by this backend

        Args:
            pattern: the id of the device or a regex matching its name. An
                exact match of the id takes precedence
            kind: 'input' to search within input devices, any other value
                searches within output devices

        Returns:
            the first matching device, or None if no device matches
        """
        # we get the devices via getAudioDevices to enable caching
        indevs, outdevs = getAudioDevices(self.name)
        devs = indevs if kind == 'input' else outdevs
        match = next((d for d in devs if d.id == pattern), None)
        if match:
            return match
        return next((d for d in devs if _re.search(pattern, d.name)), None)

    def isAvailable(self) -> bool:
        """
        Is this backend available?

        A backend is available if it supports the current platform and is
        either marked as always available or presents at least one device
        """
        if sys.platform not in self.platforms:
            return False
        if self.alwaysAvailable:
            return True
        indevices, outdevices = getAudioDevices(backend=self.name)
        return bool(indevices or outdevices)

    def getSystemSr(self) -> int | None:
        """
        Get the system samplerate for this backend, if available

        We use the default output device.

        Returns:
            the system samplerate. If the backend does not have a system
            samplerate, 44100 is returned. Returns None if csound could not
            be started with this backend or no valid samplerate was reported
        """
        if not self.hasSystemSr:
            logger.debug(f"Backend {self.name} does not have a system sr, returning default")
            return 44100
        cs = libcsound.Csound()
        cs.setOption("-odac")
        cs.setOption(f"-+rtaudio={self.name}")
        ok = cs.start()
        if ok == -1:
            logger.error(f"Backend {self.name} not available")
            return None
        sr = cs.systemSr(0)
        cs.stop()
        return int(sr) if sr > 0 else None

    def bufferSizeAndNum(self) -> tuple[int, int]:
        """
        The buffer size and number of buffers needed for this backend

        Returns:
            a tuple ``(defaultBufferSize, defaultNumBuffers)``
        """
        return (self.defaultBufferSize, self.defaultNumBuffers)

    def audioDevices(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        The audio devices presented by this backend

        Returns:
            a tuple (inputDevices: list[AudioDevice], outputDevices: list[AudioDevice])
        """
        return self.audioDevicesViaAPI()

    def audioDevicesViaAPI(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        Query csound for audio devices for this backend

        Returns:
            a tuple (inputDevices: list[AudioDevice], outputDevices: list[AudioDevice])
        """
        logger.info(f"Querying csound's audio devices for backend {self.name}")
        cs = libcsound.Csound()
        cs.createMessageBuffer()
        for opt in ['-+rtaudio='+self.name, "-m16", "-odac", "--use-system-sr"]:
            cs.setOption(opt)
        cs.start()
        csoutdevs = cs.audioDevList(isOutput=True)
        csindevs = cs.audioDevList(isOutput=False)
        outdevs = [AudioDevice(id=d.deviceId,
                               name=d.deviceName,
                               kind='output',
                               index=i,
                               numChannels=d.maxNchnls)
                   for i, d in enumerate(csoutdevs)]
        indevs = [AudioDevice(id=d.deviceId,
                              name=d.deviceName,
                              kind='input',
                              index=i,
                              numChannels=d.maxNchnls)
                  for i, d in enumerate(csindevs)]
        cs.stop()
        cs.destroyMessageBuffer()
        return indevs, outdevs

    def defaultAudioDevices(self) -> tuple[AudioDevice | None, AudioDevice | None]:
        """
        Returns the default audio devices for this backend

        The first device of each kind is taken as the default

        Returns:
            a tuple ``(inputDevice: AudioDevice, outputDevice: AudioDevice)`` for this backend.
            Any of these is None if there are no devices of that kind
        """
        indevs, outdevs = getAudioDevices(self.name)
        return indevs[0] if indevs else None, outdevs[0] if outdevs else None
class _JackAudioBackend(AudioBackend):
    """
    The jack backend. Information is queried through jacktools
    """
    def __init__(self):
        super().__init__(name='jack',
                         alwaysAvailable=False,
                         platforms=('linux', 'darwin', 'win32'),
                         hasSystemSr=True)

    def getSystemSr(self) -> int:
        """The samplerate reported by jack, or 0 if no jack info is available"""
        info = jacktools.getInfo()
        return info.samplerate if info else 0

    def bufferSizeAndNum(self) -> tuple[int, int]:
        """
        The buffer size and number of buffers, derived from jack's blocksize

        Returns:
            a tuple (buffersize, numbuffers), or (0, 0) if no jack info is available
        """
        info = jacktools.getInfo()
        if not info:
            return 0, 0
        blocksize = info.blocksize
        if not emlib.mathlib.ispowerof2(blocksize):
            logger.warning(f"Jack's blocksize is not a power of 2: {blocksize}")
            return 256, _math.ceil(blocksize / 256)
        else:
            # jack buf: 512 -> -B 1024 -b 256
            periodsize = blocksize // 2
            numbuffers = 4
            return periodsize, numbuffers

    def isAvailable(self) -> bool:
        """This backend is available if jack is running"""
        return jacktools.isJackRunning()

    def audioDevices(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        The jack clients, presented as audio devices

        Returns:
            a tuple (inputDevices, outputDevices)
        """
        clients = jacktools.getClients()
        # A jack client of kind 'output' is listed as an input device (adc)
        # and a client of kind 'input' as an output device (dac)
        indevs = [AudioDevice(id=f'adc:{c.regex}',
                              name=c.name,
                              kind='input',
                              numChannels=len(c.ports),
                              index=c.firstIndex,
                              isPhysical=c.isPhysical)
                  for c in clients if c.kind == 'output']
        outdevs = [AudioDevice(id=f'dac:{c.regex}',
                               name=c.name,
                               kind='output',
                               numChannels=len(c.ports),
                               index=c.firstIndex,
                               isPhysical=c.isPhysical)
                   for c in clients if c.kind == 'input']
        return indevs, outdevs

    def defaultAudioDevices(self) -> tuple[AudioDevice|None, AudioDevice|None]:
        """
        The default devices: the first physical device of each kind, if any

        Returns:
            a tuple (inputDevice, outputDevice), where any of these can be None
        """
        indevs, outdevs = self.audioDevices()
        defaultin = next((dev for dev in indevs if dev.isPhysical), None)
        defaultout = next((dev for dev in outdevs if dev.isPhysical), None)
        return defaultin, defaultout


class _PulseAudioBackend(AudioBackend):
    """
    The pulseaudio backend. Information is queried through linuxaudio
    """
    def __init__(self):
        super().__init__(name='pulse',
                         alwaysAvailable=False,
                         hasSystemSr=True,
                         defaultBufferSize=1024,
                         defaultNumBuffers=2)

    def getSystemSr(self) -> int:
        """The samplerate reported by pulseaudio, or 0 if no info is available"""
        info = linuxaudio.pulseaudioInfo()
        return info.sr if info else 0

    def isAvailable(self) -> bool:
        """Available if pulseaudio is running or pipewire acts as a pulse server"""
        if linuxaudio.isPulseaudioRunning():
            return True
        pwinfo = linuxaudio.pipewireInfo()
        return pwinfo is not None and pwinfo.isPulseServer

    def audioDevices(self) -> tuple[list[AudioDevice], list[AudioDevice]]:
        """
        The devices for this backend: exactly one input (adc) and one output (dac)

        Returns:
            a tuple (inputDevices, outputDevices)
        """
        pulseinfo = linuxaudio.pulseaudioInfo()
        assert pulseinfo is not None
        indevs = [AudioDevice(id="adc", name="adc", kind='input', index=0,
                              numChannels=pulseinfo.numchannels)]
        outdevs = [AudioDevice(id="dac", name="dac", kind='output', index=0,
                               numChannels=pulseinfo.numchannels)]
        return indevs, outdevs


class _PortaudioBackend(AudioBackend):
    """
    The portaudio backend, in its callback (pa_cb) or blocking (pa_bl) variant

    Args:
        kind: 'callback' selects pa_cb, any other value selects pa_bl
    """
    def __init__(self, kind='callback'):
        shortname = "pa_cb" if kind == 'callback' else 'pa_bl'
        longname = f"portaudio-{kind}"
        # A system samplerate is only reported when running under pipewire
        if sys.platform == 'linux' and linuxaudio.isPipewireRunning():
            hasSystemSr = True
        else:
            hasSystemSr = False
        super().__init__(name=shortname,
                         alwaysAvailable=True,
                         longname=longname,
                         hasSystemSr=hasSystemSr)

    def getSystemSr(self) -> int | None:
        """The samplerate of pipewire if running (linux), else the generic query"""
        if sys.platform == 'linux' and linuxaudio.isPipewireRunning():
            info = linuxaudio.pipewireInfo()
            assert info is not None
            return info.sr
        return super().getSystemSr()

    # NOTE: the result is cached per instance for the lifetime of the process
    @_functools.cache
    def defaultAudioDevices(self) -> tuple[AudioDevice | None, AudioDevice | None]:
        """
        The default devices, as reported by the sounddevice library

        If sounddevice cannot be imported, the devices are queried via csound

        Returns:
            a tuple (inputDevice, outputDevice), where any of these can be None
        """
        logger.debug("Querying default device via portaudio/sounddevice")
        try:
            import sounddevice as sd
        except ImportError:
            logger.warning('Could not initialize the sounddevice library. Falling back'
                           ' to querying via csound')
            return self._defaultAudioDevicesViaCsound()

        devices = sd.query_devices()
        defaultoutdev, defaultindev = None, None
        indevs, outdevs = self.audioDevices()
        # Device names are compared without the part starting at "["
        if indevs and sd.default.device[0] is not None:
            defaultName = devices[sd.default.device[0]]['name']
            defaultindev = next((dev for dev in indevs
                                 if dev.name.split("[")[0].strip() == defaultName), None)
        if outdevs and sd.default.device[1] is not None:
            defaultName = devices[sd.default.device[1]]['name']
            defaultoutdev = next((dev for dev in outdevs
                                  if dev.name.split("[")[0].strip() == defaultName), None)
        return defaultindev, defaultoutdev

    def _defaultAudioDevicesViaCsound(self) -> tuple[AudioDevice | None, AudioDevice | None]:
        """
        The default devices, picked from the devices listed for this backend

        A device with the word "default" in its name is preferred, otherwise
        the first device of each kind is used
        """
        indevs, outdevs = self.audioDevices()
        indev = next((d for d in indevs if _re.search(r"\bdefault\b", d.name)), None)
        outdev = next((d for d in outdevs if _re.search(r"\bdefault\b", d.name)), None)
        if indev is None:
            if not indevs:
                logger.warning(f"No input devices for backend {self.name}")
            else:
                indev = indevs[0]
        if outdev is None:
            if not outdevs:
                logger.warning(f"No output devices for backend {self.name}")
            else:
                outdev = outdevs[0]
        return indev, outdev


class _AlsaBackend(AudioBackend):
    """
    The alsa backend (linux only)
    """
    def __init__(self):
        super().__init__(name="alsa",
                         alwaysAvailable=True,
                         platforms=('linux',),
                         audioDeviceRegex=r"([0-9]+):\s((?:adc|dac):.*)\((.*)\)",
                         acceptsDeviceIndex=False)

    def getSystemSr(self) -> int | None:
        """The samplerate of jack or pipewire if any of them is running, else 44100"""
        if (jackinfo := jacktools.getInfo()) is not None:
            return jackinfo.samplerate
        if linuxaudio.isPipewireRunning():
            info = linuxaudio.pipewireInfo()
            return info.sr if info else None
        return 44100


_backendJack = _JackAudioBackend()
_backendPulseaudio = _PulseAudioBackend()
_backendPortaudioBlocking = _PortaudioBackend('blocking')
_backendPortaudioCallback = _PortaudioBackend('callback')
_backendAlsa = _AlsaBackend()
_backendCoreaudio = AudioBackend('auhal',
                                 alwaysAvailable=True,
                                 hasSystemSr=True,
                                 needsRealtime=False,
                                 longname="coreaudio",
                                 platforms=('darwin',))
# Every name under which a backend can be requested (short names and aliases)
_allAudioBackends: dict[str, AudioBackend] = dict(
    jack=_backendJack,
    auhal=_backendCoreaudio,
    coreaudio=_backendCoreaudio,
    portaudio=_backendPortaudioCallback,
    pa_cb=_backendPortaudioCallback,
    pa_bl=_backendPortaudioBlocking,
    pulse=_backendPulseaudio,
    pulseaudio=_backendPulseaudio,
    alsa=_backendAlsa,
)

# The backends listed for each platform (keys are values of sys.platform)
_backendsByPlatform: dict[str, list[AudioBackend]] = dict(
    linux=[_backendJack,
           _backendPortaudioCallback,
           _backendAlsa,
           _backendPortaudioBlocking,
           _backendPulseaudio],
    darwin=[_backendJack,
            _backendCoreaudio,
            _backendPortaudioCallback],
    win32=[_backendPortaudioCallback,
           _backendPortaudioBlocking],
)
def nextpow2(n: int) -> int:
    """
    Returns the power of 2 higher or equal than n

    The result is calculated with integer arithmetic, so it is exact
    for any size of n

    Args:
        n: a positive number. A fractional value is rounded up first

    Returns:
        the smallest power of 2 which is >= n

    Raises:
        ValueError: if n is not positive
    """
    if n <= 0:
        raise ValueError(f"n should be positive, got {n}")
    # (k - 1).bit_length() is the exponent of the smallest power of 2 >= k
    return 1 << (_math.ceil(n) - 1).bit_length()
@runonce
def findCsound() -> str | None:
    """
    Locate the csound binary in the path

    Returns:
        the path of the csound binary, or None if it was not found (an error
        is logged in that case)
    """
    path = _shutil.which("csound")
    if path:
        return path
    logger.error("csound is not in the path!")
    return path


def _getVersionViaApi() -> tuple[int, int, int]:
    """
    The csound version as tuple (major, minor, patch), queried via the API

    The cached version is used if present
    """
    cached = _cache.get('versionTriplet')
    if cached is None:
        return _csoundGetInfoViaAPI()['versionTriplet']
    return cached
def getVersion(useApi=True) -> tuple[int, int, int | str]:
    """
    Returns the csound version as tuple (major, minor, patch)

    Args:
        useApi: if True, the API is used to query the version. Otherwise
            the output of "csound --version" is parsed. Both versions might
            differ

    Returns:
        the versions as a tuple (major:int, minor:int, patch:int|str). The
        patch is 0 if csound does not report one and it is returned as a str
        if it is not a number

    Raises:
        IOError: if the csound binary is not found
        RuntimeError: if the output of csound is empty or the version
            can't be parsed
    """
    if useApi:
        return _getVersionViaApi()
    csound = findCsound()
    if not csound:
        raise IOError("Csound not found")
    # The command is passed as a list so that a path with spaces is kept
    # as one argument
    proc = _subprocess.Popen([csound, '--version'], stderr=_subprocess.PIPE)
    # communicate() drains the pipe while waiting, avoiding a deadlock
    # if the pipe buffer fills up
    _, outputbytes = proc.communicate()
    if not outputbytes:
        raise RuntimeError("Could not read csounds output")
    output = outputbytes.decode('utf8')
    for line in output.splitlines():
        # The dot before the patch is kept out of the captured group
        match = _re.search(r"Csound\s+version\s+(\d+)\.(\d+)(?:\.(\w+))?", line)
        if not match:
            continue
        major = int(match.group(1))
        minor = int(match.group(2))
        patch: int | str | None = match.group(3)
        if patch is None:
            patch = 0
        elif patch.isdigit():
            patch = int(patch)
        return (major, minor, patch)
    raise RuntimeError(f"Did not find a csound version, csound output: '{output}'")
def csoundSubproc(args: list[str], piped=True, wait=False) -> _subprocess.Popen:
    """
    Run the csound binary as a subprocess with the given arguments

    Args:
        args: the args passed to csound (each argument is a string)
        piped: if True, stdout and stderr are piped to the Popen object
        wait: if True, wait until csound exits

    Returns:
        the subprocess.Popen object

    Raises:
        RuntimeError: if csound is not found

    Example
    -------

        >>> from csoundengine import csoundlib
        >>> proc = csoundlib.csoundSubproc(["-+rtaudio=jack", "-odac", "myfile.csd"])

    See Also
    ~~~~~~~~

    * :func:`runCsd`
    """
    binary = findCsound()
    if not binary:
        raise RuntimeError("Csound not found")
    stream = _subprocess.PIPE if piped else None
    process = _subprocess.Popen([binary, *args], stderr=stream, stdout=stream)
    if wait:
        process.wait()
    return process
def getSystemSr(backend: str) -> float | None:
    """
    Query the system samplerate of the given backend

    Args:
        backend: the name of the backend (jack, pa_cb, auhal, etc)

    Returns:
        the samplerate as reported by the ``getSystemSr`` method of the
        backend. This can be None

    Raises:
        ValueError: if the backend is not known

    See Also
    ~~~~~~~~

    * :func:`getAudioBackend`
    * :func:`getDefaultBackend`
    """
    audiobackend = getAudioBackend(backend)
    if audiobackend:
        return audiobackend.getSystemSr()
    raise ValueError(f"backend {backend} not known")
def _getJackSrViaClient() -> float:
    """
    The samplerate of jack, queried by opening a client named "query"
    """
    import jack
    client = jack.Client("query")
    samplerate = client.samplerate
    client.close()
    return samplerate


def _getCsoundSystemSr(backend: str) -> float:
    """
    The samplerate csound runs at when started with --use-system-sr

    Args:
        backend: the audio backend, one of 'jack' or 'auhal'

    Returns:
        the samplerate reported by csound

    Raises:
        ValueError: if the backend is not one of the supported backends
    """
    if backend not in {'jack', 'auhal'}:
        raise ValueError(f"backend {backend} does not support system sr")
    engine = libcsound.Csound()
    for option in (f"-+rtaudio={backend}", "-odac", "--use-system-sr"):
        engine.setOption(option)
    engine.start()
    samplerate = engine.sr()
    engine.stop()
    return samplerate
def getDefaultBackend() -> AudioBackend:
    """
    The default backend for this platform: the first available backend

    Backends which are not available at the moment are discarded

    ==============  =================================
    Platform        Backends (in order of priority)
    ==============  =================================
    Windows         portaudio
    macOS           auhal (coreaudio), portaudio
    linux           jack (if running), portaudio
    ==============  =================================

    Raises:
        RuntimeError: if there are no available backends
    """
    available = audioBackends(available=True)
    if available:
        return available[0]
    raise RuntimeError("No available backends")
# User plugins folders, indexed by api version and then by platform (the
# values of sys.platform). The 'float32' entry holds the folders for
# 32-bit plugins of the 6.0 api
_pluginsFolders = {
    '6.0': {
        'linux': '$HOME/.local/lib/csound/6.0/plugins64',
        'darwin': '$HOME/Library/csound/6.0/plugins64',
        'win32': '%LOCALAPPDATA%/csound/6.0/plugins64'
    },
    '7.0': {
        'linux': '$HOME/.local/lib/csound/7.0/plugins64',
        'darwin': '$HOME/Library/csound/7.0/plugins64',
        'win32': '%LOCALAPPDATA%/csound/7.0/plugins64'
    },
    'float32': {
        'linux': '$HOME/.local/lib/csound/6.0/plugins',
        'darwin': '$HOME/Library/csound/6.0/plugins',
        'win32': '%LOCALAPPDATA%/csound/6.0/plugins'
    }
}


def userPluginsFolder(float64=True, apiversion='6.0') -> str:
    """
    Returns the user plugins folder for this platform

    This is the folder where csound will search for user-installed
    plugins. The returned folder is always an absolute path. It is not
    checked if the folder actually exists.

    Args:
        float64: if True, report the folder for 64-bit plugins
        apiversion: 6.0 or 7.0

    Returns:
        the user plugins folder for this platform

    Raises:
        KeyError: if float64 is True and the api version is not known
        RuntimeError: if the platform is not known

    **Folders for 64-bit plugins**:

    ======== ======================================================
     OS       Plugins folder
    ======== ======================================================
     Linux    ``~/.local/lib/csound/6.0/plugins64``
     macOS    ``~/Library/csound/6.0/plugins64``
     windows  ``C:/Users/<User>/AppData/Local/csound/6.0/plugins64``
    ======== ======================================================

    For 32-bit plugins the folder is the same, without the '64' ending
    (``.../plugins``)
    """
    if float64:
        folders = _pluginsFolders[apiversion]
    else:
        # The 32-bit folder is the 64-bit folder of the same api version
        # without the '64' ending. An unknown api version falls back to
        # the 'float32' entry
        folders64 = _pluginsFolders.get(apiversion, _pluginsFolders['float32'])
        folders = {platform: path.removesuffix('64')
                   for platform, path in folders64.items()}
    if sys.platform not in folders:
        raise RuntimeError(f"Platform {sys.platform} not known")
    folder = folders[sys.platform]
    return _os.path.abspath(_os.path.expandvars(folder))
def runCsd(csdfile: str,
           outdev='',
           indev='',
           backend='',
           nodisplay=False,
           nomessages=False,
           comment='',
           piped=False,
           extra: list[str] | None = None
           ) -> _subprocess.Popen:
    """
    Run the given .csd as a csound subprocess

    Args:
        csdfile: the path to a .csd file
        outdev: "dac" to output to the default device, the label of the
            device (dac0, dac1, ...), or a filename to render offline
            (-o option)
        indev: The input to use (for realtime) (-i option)
        backend: The name of the backend to use. If no backend is given,
            the default for the platform is used (this is only meaningful
            if running in realtime)
        nodisplay: if True, eliminates debugging info from output
        nomessages: if True, suppress debugging messages
        piped: if True, the output of the csound process is piped and can
            be accessed through the Popen object (.stdout, .stderr)
        extra: a list of extra arguments to be passed to csound
        comment: if given, will be added to the generated output
            as comment metadata (when running offline)

    Returns:
        the `subprocess.Popen` object. In order to wait until
        rendering is finished in offline mode, call .wait on the
        returned process

    See Also
    ~~~~~~~~

    * :func:`csoundSubproc`
    """
    args = []
    # Csound runs in realtime only if the output is a "dac" device
    offline = True
    if outdev:
        args.extend(["-o", outdev])
        if outdev.startswith("dac"):
            offline = False
    if not offline and not backend:
        backend = getDefaultBackend().name
    if backend:
        args.append(f"-+rtaudio={backend}")
    if indev:
        # Flag and value are separate arguments, as with -o. Passed as one
        # argument, the value would include the space after the flag
        args.extend(["-i", indev])
    if nodisplay:
        args.append('-d')
    if nomessages:
        args.append('-m16')
    if comment and offline:
        args.append(f'-+id_comment="{comment}"')
    if extra:
        args.extend(extra)
    args.append(csdfile)
    return csoundSubproc(args, piped=piped)
def joinCsd(orc: str, sco='', options: list[str] | None = None) -> str:
    """
    Build the text of a csd out of an orchestra, a score and options

    Args:
        orc: the text of the orchestra
        sco: the text of the score
        options: any command line options to be included. Each option
            is placed in its own line

    Returns:
        the text of the csd
    """
    optionstr = "\n".join(options) if options is not None else ""
    # Any indentation common to all lines is removed from the result
    return _textwrap.dedent(rf"""
    <CsoundSynthesizer>
    <CsOptions>
    {optionstr}
    </CsOptions>
    <CsInstruments>

    {orc}

    </CsInstruments>
    <CsScore>

    {sco}

    </CsScore>
    </CsoundSynthesizer>
    """)
@dataclasses.dataclass
class CsoundProc:
    """
    Bundles a csound subprocess with the settings it was started with

    Attributes:
        proc: the running csound subprocess
        backend: the backend used
        outdev: the output device
        sr: the sample rate of the running process
        nchnls: the number of channels
        csdstr: the csd being run, as a str (empty by default)
    """
    proc: _subprocess.Popen
    backend: str
    outdev: str
    sr: int
    nchnls: int
    csdstr: str = ""
def testCsound(dur=8., nchnls=2, backend='', device="dac", sr=0,
               verbose=True
               ) -> CsoundProc:
    """
    Test the current csound installation for realtime output

    Pink noise is sent to each output channel in turn, changing channel
    once per second

    Args:
        dur: the duration of the test
        nchnls: the number of output channels
        backend: which backend to use. If not given, the default backend
            is used
        device: which device to use
        sr: the sample rate. Use 0 to use system sample rate if applicable,
            or a default sample rate otherwise
        verbose: if True, the current channel is printed whenever it changes

    Returns:
        a :class:`CsoundProc`
    """
    backend = backend or getDefaultBackend().name
    if not sr:
        sr = getSamplerateForBackend(backend) or 44100
    printchan = "printk2 kchn" if verbose else ""
    orc = f"""
    sr = {sr}
    ksmps = 128
    nchnls = {nchnls}

    instr 1
        iperiod = 1
        kchn init -1
        ktrig metro 1/iperiod
        kchn = (kchn + ktrig) % nchnls
        anoise pinker
        outch kchn+1, anoise
        {printchan}
    endin
    """
    sco = f"i1 0 {dur}"
    orc = _textwrap.dedent(orc)
    logger.debug(orc)
    csd = joinCsd(orc, sco=sco)
    # The file is created atomically and closed (flushed) before csound reads
    # it. It is not deleted, since csound needs it after this function returns
    with _tempfile.NamedTemporaryFile(mode="w", suffix=".csd", delete=False) as f:
        f.write(csd)
        tmp = f.name
    proc = runCsd(tmp, outdev=device, backend=backend)
    return CsoundProc(proc=proc, backend=backend, outdev=device, sr=sr,
                      nchnls=nchnls, csdstr=csd)
def installedOpcodes(cached=True, opcodedir: str = '') -> set[str]:
    """
    The names of the opcodes present in this csound installation

    Args:
        cached: if True, results are remembered between calls
        opcodedir: if given, plugin libraries will be loaded from this path
            (option --opcode-dir in csound). In this case the cache is
            not used

    Returns:
        a set with the names of all available opcodes
    """
    if cached and not opcodedir:
        names = _cache.get('opcodes')
        if names is not None:
            return names
    return _csoundGetInfoViaAPI(opcodedir=opcodedir)['opcodes']
def _csoundGetInfoViaAPI(opcodedir='') -> dict:
    """
    Query the csound version and the installed opcodes via the API

    The results are also stored in the module cache

    Args:
        opcodedir: if given, it is passed to csound as --opcode-dir

    Returns:
        a dict with keys 'opcodedefs' (the opcode definitions as returned
        by csound), 'opcodes' (a set with the opcode names)
        and 'versionTriplet' (a tuple (major, minor, patch))
    """
    engine = libcsound.Csound()
    for option in ("-d", "--nosound"):
        engine.setOption(option)
    engine.createMessageBuffer(echo=False)
    if opcodedir:
        engine.setOption(f'--opcode-dir={opcodedir}')
    # The version is a number whose last digit is the patch, the two digits
    # before it the minor version and the remaining digits the major version
    digits = str(engine.version())
    versionTriplet = (int(digits[:-3]), int(digits[-3:-1]), int(digits[-1]))
    opcodedefs = engine.getOpcodes()
    opcodenames = {opcodedef.name for opcodedef in opcodedefs}
    _cache.update(versionTriplet=versionTriplet,
                  opcodes=opcodenames,
                  opcodedefs=opcodedefs)
    return {'opcodedefs': opcodedefs,
            'opcodes': opcodenames,
            'versionTriplet': versionTriplet}


def _opcodesList(opcodedir='') -> list[str]:
    """
    The opcodes listed by running "csound -z" as a subprocess

    Args:
        opcodedir: if given, it is passed to csound as --opcode-dir

    Returns:
        the words printed by csound to stderr, up to the first line
        starting with "end of score"
    """
    cmdargs = ["-z"]
    if opcodedir:
        cmdargs.append(f'--opcode-dir={opcodedir}')
    proc = csoundSubproc(cmdargs)
    assert proc.stderr is not None
    found: list[str] = []
    for rawline in proc.stderr.readlines():
        if rawline.startswith(b"end of score"):
            break
        found.extend(rawline.decode('utf8').split())
    return found
def saveAsGen23(data: Sequence[float] | np.ndarray,
                outfile: str,
                fmt="%.12f",
                header=''
                ) -> None:
    """
    Saves the data to a gen23 table

    .. note::
        gen23 is a 1D list of numbers in text format, separated by a space

    Args:
        data: A 1D sequence (list or array) of floats
        outfile: The path to save the data to. Recommended extension: '.gen23'
        fmt: If saving frequency tables, fmt can be "%.1f" and save space,
            for amplitude the default of "%.12f" is best
        header: If specified it is included as a comment as the first line
            (csound will skip it). It is there just to document what is
            in the table

    Example
    ~~~~~~~

    .. code-block:: python

        import bpf4 as bpf
        from csoundengine import csoundlib
        a = bpf.linear(0, 0, 1, 10, 2, 300)
        dt = 0.01
        csoundlib.saveAsGen23(a[::dt].ys, "out.gen23", header=f"dt={dt}")

    In csound:

    .. code-block:: csound

        gi_tab ftgen 0, 0, 0, -23, "out.gen23"

        instr 1
          itotaldur = ftlen(gi_tab) * 0.01
          ay poscil 1, 1/itotaldur, gi_tab
        endin
    """
    if header:
        # np.savetxt prepends the comment marker "# " to the header itself
        np.savetxt(outfile, data, fmt=fmt, header=header)
    else:
        np.savetxt(outfile, data, fmt=fmt)
def _metadataAsComment(d: dict[str, Any], maxSignificantDigits=10, sep=", ") -> str: fmt = f"%.{maxSignificantDigits}g" parts = [] for key, val in d.items(): if isinstance(val, int): valstr = str(val) elif isinstance(val, float): valstr = fmt % val elif isinstance(val, str): valstr = f'"{val}"' else: raise TypeError(f"Value should be int, float or str, got {val}") parts.append(f"{key}: {valstr}") return sep.join(parts)
def saveMatrixAsMtx(outfile: str,
                    data: np.ndarray,
                    metadata: dict[str, str | float] | None = None,
                    encoding="float32",
                    title='',
                    sr: int = 44100
                    ) -> None:
    """
    Save `data` in wav format using the mtx extension

    This is not a real soundfile. It is used to transfer the data in binary
    form to be read by another program. To distinguish this from a normal wav
    file an extension `.mtx` is recommended. Data is always saved flat, and
    a header with the shape of `data` is written before the data.

    **Header Format**::

        headerSize, numRows, numColumns, ...

    The description of each header value is included as wav metadata
    at the comment key with the format::

        "headerSize: xx, numRows: xx, numColumns: xx, columns: 'headerSize numRows numColumns ...'"

    This metadata can be retrieved in csound via:

    .. code-block:: csound

        itabnum ftgen 0, 0, 0, -1, "sndfile.mtx", 0, 0, 1
        Scomment = filereadmeta("sndfile.mtx", "comment")
        imeta = dict_loadstr(Scomment)
        ScolumnNames = dict_get(imeta, "columns")
        idatastart = tab_i(0, itabnum)
        inumrows = dict_get(imeta, "numRows")
        ; inumrows can also be retrieved by reading the table at index 1
        ; inumrows = tab_i(1, itabnum)
        inumcols = tab_i(2, itabnum)
        ; The data at (krow, kcol) can be read via
        kvalue = tab(idatastart + krow*inumcols + kcol, itabnum)
        ; Alternatively an array can be created as a view:
        kArr[] memview itabnum, idatastart
        reshapearray kArr, inumrows, inumcols
        kvalue = kArr[krow][kcol]

    Args:
        outfile (str): The path where the data is written to
        data (numpy array): a 2D numpy array of shape (numRows, numColumns)
        metadata: Any numeric values here are appended to the header (in
            iteration order) and listed in the ``columns`` metadata entry.
            String values are only included in the wav comment
        encoding: the data can be encoded in float32 or float64
        title: if given will be included in the output metadata
        sr: the sample rate declared in the written file
    """
    assert isinstance(outfile, str)
    assert encoding == 'float32' or encoding == 'float64'
    if _os.path.splitext(outfile)[1] != ".mtx":
        logger.warning(f"The extension should be .mtx, but asked to save "
                       f"the matrix as {outfile}")
    import sndfileio
    header: list[int | float] = [3, data.shape[0], data.shape[1]]
    allmeta: dict[str, Any] = {
        'headerSize': 3,
        'numRows': data.shape[0],
        'numColumns': data.shape[1]
    }
    columns = ['headerSize', 'numRows', 'numColumns']
    if metadata:
        for k, v in metadata.items():
            if isinstance(v, (int, float)):
                header.append(v)
                allmeta[k] = v
                columns.append(k)
        # The first header value is the data offset. It must agree with
        # the 'headerSize' entry written to the wav comment
        headersize = len(header)
        allmeta['headerSize'] = headersize
        header[0] = headersize
        for k, v in metadata.items():
            if isinstance(v, str):
                allmeta[k] = v
    allmeta['columns'] = " ".join(columns)
    wavmeta = {'comment': _metadataAsComment(allmeta),
               'software': 'MTX1'}
    if title:
        wavmeta['title'] = title
    sndwriter = sndfileio.sndwrite_chunked(outfile=outfile, sr=sr,
                                           encoding=encoding,
                                           metadata=wavmeta,
                                           fileformat='wav')
    try:
        sndwriter.write(np.array(header, dtype=float))
        sndwriter.write(data.ravel())
    finally:
        # Close the file even if writing fails
        sndwriter.close()
[docs] def saveMatrixAsGen23(outfile: str, mtx: np.ndarray, extradata: list[float] | None = None, header=True ) -> None: """ Save a numpy 2D array as gen23 Args: outfile (str): the path to save the data to. Suggestion: use '.gen23' as ext mtx (np.ndarray): a 2D array of floats extradata: if given, this data will be prependedto the data in `mtx`. Implies `include_header=True` header: if True, a header of the form [headersize, numrows, numcolumns] is prepended to the data. .. note:: The format used by gen23 is a text format with numbers separated by any space. When read inside csound the table is of course 1D but can be interpreted as 2D with the provided header metadata. """ numrows, numcols = mtx.shape mtx = mtx.round(6) with open(outfile, "w") as f: if header or extradata: headerrow = [3., numrows, numcols] if extradata is not None: headerrow.extend(extradata) headerrow[0] = len(headerrow) f.write(" ".join(np.array(headerrow).astype(str))) f.write("\n") for row in mtx: rowstr = " ".join(row.astype(str)) f.write(rowstr) f.write("\n")
@dataclasses.dataclass
class MidiDevice:
    """
    A MidiDevice holds information about a midi device for a given backend

    Attributes:
        deviceid: the id of the device, as listed by csound and passed to -M
        name: the name of the device
        kind: the kind of the device ('input', 'output')
    """
    deviceid: str
    name: str
    kind: str = 'input'
@dataclasses.dataclass
class AudioDevice:
    """
    An AudioDevice holds information about an audio device for a given backend

    Attributes:
        id: the device identification (dac3, adc2, etc)
        name: the name of the device
        kind: 'output' or 'input'
        index: the index of this audio device, as passed to adcXX or dacXX
        numChannels: the number of channels
        isPhysical: True if this is a physical (hardware) device. Used for jack
    """
    id: str
    name: str
    kind: str
    index: int = -1
    numChannels: int | None = 0
    isPhysical: bool | None = None

    def info(self) -> str:
        """
        Returns a one-line summary of this device

        The summary has the form ``id:name:<numChannels>outs`` for output
        devices and ``id:name:<numChannels>ins`` for any other kind
        """
        direction = "outs" if self.kind == 'output' else "ins"
        return f"{self.id}:{self.name}:{self.numChannels}{direction}"
@_cachetools.cached(cache=_cachetools.TTLCache(10, 20))
def getDefaultAudioDevices(backend='') -> tuple[AudioDevice | None, AudioDevice | None]:
    """
    Returns the default audio devices for a given backend

    Args:
        backend: the name of the backend. An empty string selects the
            default backend

    Returns:
        a tuple (default input device, default output device), as returned
        by the backend

    Raises:
        ValueError: if the backend is not known

    .. note::

        Results are cached for a period of time (a TTL cache created
        with maxsize 10 and ttl 20)
    """
    backendDef = getAudioBackend(backend)
    if backendDef is not None:
        return backendDef.defaultAudioDevices()
    raise ValueError(f"Backend {backend} not known")
@_cachetools.cached(cache=_cachetools.TTLCache(10, 20))
def getAudioDevices(backend) -> tuple[list[AudioDevice], list[AudioDevice]]:
    """
    Returns (indevices, outdevices), where each of these is a list of AudioDevice

    Args:
        backend: specify a backend supported by your installation of csound.
            A falsy value selects the default backend

    Returns:
        a tuple of (input devices, output devices), as reported by the backend

    Raises:
        ValueError: if the backend is not supported

    .. note::

        For jack an audio device is a client. Results are cached for
        a period of time

    See :class:`AudioDevice` for the attributes of each returned device. The
    ``id`` of a device (``adc{index}`` for input devices, ``dac{index}`` for
    output devices) can be passed to csound directly with either the -i or
    the -o flag (``-i{id}`` or ``-o{id}``)

    ======== ==== ===== ==== ================== =======================
    Backend  OSX  Linux Win  Multiple-Devices   Description
    ======== ==== ===== ==== ================== =======================
    jack     x    x     -    -                  Jack
    auhal    x    -     -    x                  CoreAudio
    pa_cb    x    x     x    x                  PortAudio (Callback)
    pa_bl    x    x     x    x                  PortAudio (blocking)
    ======== ==== ===== ==== ================== =======================
    """
    backendDef = getAudioBackend(backend)
    if backendDef is not None:
        return backendDef.audioDevices()
    raise ValueError(f"Backend '{backend}' not supported")
def getSamplerateForBackend(backend='') -> int | None:
    """
    Returns the samplerate reported by the given backend

    Args:
        backend: the backend to query. An empty string selects the
            default backend

    Returns:
        the system samplerate as reported by the backend

    Raises:
        ValueError: if the backend is not known. The message lists the
            backends defined for the current platform
        RuntimeError: if the backend is known but not available
    """
    backendDef = getAudioBackend(backend)
    if backendDef is None:
        possible = [name for name, definition in _allAudioBackends.items()
                    if sys.platform in definition.platforms]
        raise ValueError(f"Backend '{backend}' not supported. Possible backends for this platform: {possible}")
    if backendDef.isAvailable():
        return backendDef.getSystemSr()
    raise RuntimeError(f"Audiobackend {backendDef.name} is not available")
def _csoundTestJackRunning():
    """
    Returns True if csound can connect to a JACK server

    Runs csound with the jack backend and ``--get-system-sr`` and checks that
    its stderr does not contain the "could not connect to JACK server" message
    """
    args = ['-+rtaudio=jack', '-odac', '--get-system-sr']
    probe = csoundSubproc(args, wait=True, piped=True)
    assert probe.stderr is not None
    errors = probe.stderr.read()
    return b'could not connect to JACK server' not in errors
def audioBackends(available=False, platform='') -> list[AudioBackend]:
    """
    Return a list of audio backends for the given platform

    Args:
        available: if True, only available backends are returned. In this
            case the current platform is always queried, whatever the value
            of `platform`
        platform: defaults to the current platform. Possible values: 'linux',
            'macos', 'windows', but also any value returned by sys.platform

    Returns:
        a list of AudioBackend

    If available is True, only those backends supported for the current
    platform and currently available are returned. For example, jack will
    not be returned in linux if the jack server is not running.

    Example
    ~~~~~~~

        >>> from csoundengine import *
        >>> [backend.name for backend in audioBackends(available=True)]
        ['jack', 'pa_cb', 'pa_bl', 'alsa']
    """
    if platform:
        # Always normalized first, so an invalid name is handled the same
        # way whether or not `available` is set
        platform = internal.normalizePlatform(platform)
    if available or not platform:
        platform = sys.platform
    candidates = _backendsByPlatform[platform]
    if not available:
        return candidates
    return [candidate for candidate in candidates if candidate.isAvailable()]
def dumpAudioBackends() -> None:
    """
    Prints all **available** backends and their properties as a table

    The table is sorted by name and has the columns name, longname and sr.
    The sr column shows "-" for backends without a system samplerate
    """
    backends = sorted(audioBackends(available=True),
                      key=lambda backend: backend.name)
    rows = [(b.name, b.longname, str(b.getSystemSr()) if b.hasSystemSr else "-")
            for b in backends]
    from emlib.misc import print_table
    print_table(rows, headers=["name", "longname", "sr"], showindex=False)
def getAudioBackend(name='') -> AudioBackend | None:
    """
    Given the name of the backend, return the AudioBackend structure

    Args:
        name: the name of the backend. If empty, the default backend
            is returned

    Returns:
        the AudioBackend registered under the given name, or None if
        there is no backend with that name

    ==========  ===================  =======  =========  =======
    Name        Description          Linux    Windows    MacOS
    ==========  ===================  =======  =========  =======
    pa_cb       portaudio-callback   ✓        ✓          ✓
    pa_bl       portaudio-blocking   ✓        ✓          ✓
    portaudio   alias to pa_cb       ✓        ✓          ✓
    jack        jack                 ✓        ?          ✓
    pulse       pulseaudio           ✓        ✗          ✗
    pulseaudio  alias to pulse       ✓        ✗          ✗
    auhal       coreaudio            ✗        ✗          ✓
    coreaudio   alias to auhal       ✗        ✗          ✓
    ==========  ===================  =======  =========  =======
    """
    return _allAudioBackends.get(name) if name else getDefaultBackend()
def getAudioBackendNames(available=False, platform='') -> list[str]:
    """
    Returns a list with the names of the audio backends for a given platform

    Both arguments are passed as is to :func:`audioBackends`

    Args:
        available: if True, return the names for only those
            backends which are currently available
        platform: if given, return only names for those backends
            present in the given platform

    Returns:
        a list with the names of the backends

    Example
    =======

        >>> from csoundengine.csoundlib import *
        >>> getAudioBackendNames()     # in Linux
        ['jack', 'pa_cb', 'pa_bl', 'alsa', 'pulse']
        >>> getAudioBackendNames(platform='macos')
        ['pa_cb', 'pa_bl', 'auhal']
        # In linux with pulseaudio disabled
        >>> getAudioBackendNames(available=True)
        ['jack', 'pa_cb', 'pa_bl', 'alsa']
    """
    return [backend.name
            for backend in audioBackends(available=available, platform=platform)]
def _quoteIfNeeded(arg: float | int | str) -> float | int | str:
    """Pass strings through emlib's quoteIfNeeded, return anything else as is"""
    return emlib.textlib.quoteIfNeeded(arg) if isinstance(arg, str) else arg


# Replaces ".", ":" and " " by an underscore
_normalizer = emlib.textlib.makeReplacer(dict.fromkeys((".", ":", " "), "_"))
def normalizeInstrumentName(name: str) -> str:
    """
    Transform name so that it can be accepted as an instrument name

    Args:
        name: the name to normalize

    Returns:
        the name as transformed by the module-level ``_normalizer`` replacer
    """
    normalized = _normalizer(name)
    return normalized
_fmtoptions = { 'pcm16': '', 'pcm24': '--format=24bit', 'float32': '--format=float', # also -f 'float64': '--format=double', 'vorbis': '--format=vorbis' } _optionForSampleFormat = { 'wav': '--format=wav', # could also be --wave 'aif': '--format=aiff', 'aiff': '--format=aiff', 'flac': '--format=flac', 'ogg': '--format=ogg' } _csoundFormatOptions = {'-3', '-f', '--format=24bit', '--format=float', '--format=double', '--format=long', '--format=vorbis', '--format=short'} _defaultEncodingForFormat = { 'wav': 'float32', 'flac': 'pcm24', 'aif': 'float32', 'aiff': 'float32', 'ogg': 'vorbis' } def _normalizeArgs(args, quote=True) -> list[float | str]: out = [] for arg in args: if isinstance(arg, str): if quote: arg = emlib.textlib.quoteIfNeeded(arg) out.append(arg) elif isinstance(arg, (int, float)): out.append(arg) else: try: out.append(float(arg)) except ValueError as e: raise ValueError(f"Could not interpret {arg} as float: {e}") return out
def csoundOptionsForOutputFormat(fmt='wav',
                                 encoding=''
                                 ) -> list[str]:
    """
    Returns the command-line options for the given format+encoding

    Args:
        fmt: the format of the output file ('wav', 'flac', 'aif', etc)
        encoding: the encoding ('pcm16', 'pcm24', 'float32', etc). If not given,
            the default encoding for the given format is chosen

    Returns:
        a list holding the command-line option for the format, followed by
        the option for the encoding if that encoding needs one

    Raises:
        ValueError: if the encoding is not known

    Example
    ~~~~~~~

        >>> csoundOptionsForOutputFormat('flac')
        ['--format=flac', '--format=24bit']
        >>> csoundOptionsForOutputFormat('wav', 'float32')
        ['--format=wav', '--format=float']
        >>> csoundOptionsForOutputFormat('aif', 'pcm16')
        ['--format=aiff']

    .. seealso:: :func:`csoundOptionForSampleEncoding`
    """
    assert fmt in _defaultEncodingForFormat, f"Unknown format: {fmt}, possible formats are: " \
                                             f"{_defaultEncodingForFormat.keys()}"
    chosen = encoding or _defaultEncodingForFormat.get(fmt)
    if not chosen:
        raise ValueError(f"Default encoding unknown for format {fmt}")
    encodingFlag = csoundOptionForSampleEncoding(chosen)
    formatFlag = _optionForSampleFormat[fmt]
    # Some encodings (pcm16) map to an empty option, which is left out
    return [formatFlag, encodingFlag] if encodingFlag else [formatFlag]
def csoundOptionForSampleEncoding(encoding: str) -> str:
    """
    Returns the command-line option for the given sample encoding.

    Given a sample encoding of the form pcmXX or floatXX, where
    XX is the number of bits, returns the corresponding command-line option
    for csound

    Args:
        encoding (str): the desired sample encoding. Either pcmXX, floatXX or
            vorbis, where XX stands for the number of bits per sample
            (pcm24, float32, etc)

    Returns:
        the csound command line option corresponding to the given encoding.
        This is an empty string for 'pcm16'

    Raises:
        ValueError: if the encoding is not known

    Example
    =======

        >>> csoundOptionForSampleEncoding("pcm24")
        --format=24bit
        >>> csoundOptionForSampleEncoding("float64")
        --format=double

    .. seealso:: :func:`csoundOptionsForOutputFormat`

    """
    option = _fmtoptions.get(encoding)
    if option is None:
        raise ValueError(f'format {encoding} not known. Possible values: '
                         f'{_fmtoptions.keys()}')
    return option
def mincer(sndfile: str,
           outfile: str,
           timecurve: Curve | float,
           pitchcurve: Curve | float,
           dt=0.002,
           lock=False,
           fftsize=2048,
           ksmps=128,
           debug=False
           ) -> dict:
    """
    Stretch/Pitchshift a soundfile using csound's mincer opcode (offline)

    Args:
        sndfile: the path to a soundfile
        outfile: the path to a resulting outfile. The resulting file is always
            a 32-bit float .wav file. The samplerate and number of channels
            match those of the input file
        timecurve: a bpf mapping time to playback time or a scalar
            indicating a timeratio (2 means twice as fast, 1 to leave unmodified)
        pitchcurve: a bpf mapping time to pitchscale, or a scalar indicating
            a freqratio
        dt: the sampling period to sample the curves
        lock: should mincer be run with phase-locking?
        fftsize: the size of the fft
        ksmps: the ksmps to pass to the csound process
        debug: if True, the temporary files created (the sampled curves and
            the .csd file) are not removed

    Returns:
        a dict with information about the process (keys: outfile, csdstr, csd).
        The file at ``csd`` only exists afterwards if `debug` is True

    Raises:
        TypeError: if timecurve is neither a scalar nor a bpf

    .. note::

        If the mapped time exceeds the bounds of the sndfile, silence is
        generated. For example, a negative time or a time exceeding the
        duration of the sndfile

    Examples
    ========

        # Example 1: stretch a soundfile 2x

        >>> from csoundengine import csoundlib
        >>> import bpf4
        >>> import sndfileio
        >>> snddur = sndfileio.sndinfo("mono.wav").duration
        >>> timecurve = bpf4.linear(0, 0, snddur*2, snddur)
        >>> mincer("mono.wav", "mono2.wav", timecurve=timecurve, pitchcurve=1)
    """
    import bpf4 as bpf
    import sndfileio

    info = sndfileio.sndinfo(sndfile)
    sr = info.samplerate
    nchnls = info.channels
    pitchbpf = bpf.util.asbpf(pitchcurve)

    if isinstance(timecurve, (int, float)):
        t0, t1 = 0, info.duration / timecurve
        timebpf = bpf.linear(0, 0, t1, info.duration)
    elif isinstance(timecurve, bpf.core.BpfInterface):
        t0, t1 = timecurve.bounds()
        timebpf = timecurve
    else:
        raise TypeError("timecurve should be either a scalar or a bpf")

    assert isinstance(pitchcurve, (int, float, bpf.core.BpfInterface))
    ts = np.arange(t0, t1+dt, dt)
    fmt = "%.12f"

    # mkstemp returns an open file descriptor. It is closed right away since
    # the files are written by path. Otherwise each call leaks a descriptor
    fd, time_gen23 = _tempfile.mkstemp(prefix='time-', suffix='.gen23')
    _os.close(fd)
    # The first value of each table is dt, read by the instrument as idt
    np.savetxt(time_gen23, timebpf.map(ts), fmt=fmt, header=str(dt), comments='')

    fd, pitch_gen23 = _tempfile.mkstemp(prefix='pitch-', suffix='.gen23')
    _os.close(fd)
    np.savetxt(pitch_gen23, pitchbpf.map(ts), fmt=fmt, header=str(dt), comments='')

    csd = f"""
<CsoundSynthesizer>
<CsOptions>
-o {outfile}
</CsOptions>
<CsInstruments>

sr = {sr}
ksmps = {ksmps}
nchnls = {nchnls}
0dbfs = 1.0

gi_snd ftgen 0, 0, 0, -1, "{sndfile}", 0, 0, 0
gi_time ftgen 0, 0, 0, -23, "{time_gen23}"
gi_pitch ftgen 0, 0, 0, -23, "{pitch_gen23}"

instr vartimepitch
    idt tab_i 0, gi_time
    ilock = {int(lock)}
    ifftsize = {fftsize}
    ikperiod = ksmps/sr
    isndfiledur = ftlen(gi_snd) / ftsr(gi_snd)
    isndchnls = ftchnls(gi_snd)
    ifade = ikperiod*2
    inumsamps = ftlen(gi_time)
    it1 = (inumsamps-2) * idt ; account for idt and last value
    kt timeinsts
    aidx linseg 1, it1, inumsamps-1
    at1 tablei aidx, gi_time, 0, 0, 0
    kpitch tablei k(aidx), gi_pitch, 0, 0, 0
    kat1 = k(at1)
    kgate = (kat1 >= 0 && kat1 <= isndfiledur) ? 1 : 0
    agate = interp(kgate)
    aenv linseg 0, ifade, 1, it1 - (ifade*2), 1, ifade, 0
    aenv *= agate
    if isndchnls == 1 then
        a0 mincer at1, 1, kpitch, gi_snd, ilock, ifftsize, 8
        outch 1, a0*aenv
    else
        a0, a1 mincer at1, 1, kpitch, gi_snd, ilock, ifftsize, 8
        outs a0*aenv, a1*aenv
    endif

    if (kt >= it1 + ikperiod) then
        event "i", "exit", 0.1, 1
        turnoff
    endif
endin

instr exit
    puts "exiting!", 1
    exitnow
endin

</CsInstruments>
<CsScore>
i "vartimepitch" 0 -1
f 0 36000

</CsScore>
</CsoundSynthesizer>
"""
    fd, csdfile = _tempfile.mkstemp(suffix=".csd")
    # Reuse the descriptor for writing, so that it is closed with the file
    with _os.fdopen(fd, "w") as f:
        f.write(csd)
    _subprocess.call(["csound", "-f", csdfile])
    if not debug:
        _os.remove(time_gen23)
        _os.remove(pitch_gen23)
        _os.remove(csdfile)
    return {'outfile': outfile, 'csdstr': csd, 'csd': csdfile}
def _instr_as_orc(instrid, body, initstr, sr, ksmps, nchnls): orc = """ sr = {sr} ksmps = {ksmps} nchnls = {nchnls} 0dbfs = 1 {initstr} instr {instrid} {body} endin """.format(sr=sr, ksmps=ksmps, instrid=instrid, body=body, nchnls=nchnls, initstr=initstr) return orc
def recInstr(body: str,
             events: list,
             init='',
             outfile='',
             sr=44100,
             ksmps=64,
             nchnls=2,
             a4=442,
             samplefmt='float',
             dur=None
             ) -> tuple[str, _subprocess.Popen]:
    """
    Record one instrument for a given duration

    Args:
        body: the body of the instrument
        events: a list of events, where each event is a list of pargs passed
            to the instrument, beginning with p2: delay, dur, [p4, p5, ...]
        init: the initialization code (ftgens, global vars, etc)
        outfile: the generated output. If not given, a temporary file name
            is generated
        sr: the samplerate
        ksmps: block size
        nchnls: number of output channels
        a4: A4 frequency
        samplefmt: defines the sample format used for outfile, one of
            (16, 24, 32, 'float')
        dur: the duration of the recording. If given, an end marker is set
            at this time

    Returns:
        a tuple (outfile to be generated, _subprocess.Popen running csound)

    Raises:
        ValueError: if events is not a list of lists/tuples or if samplefmt
            is not one of the supported values
    """
    if not isinstance(events, list) or not all(isinstance(event, (tuple, list)) for event in events):
        raise ValueError("events should be a list, where each item is a list or tuple "
                         "of pargs passed to the instrument, beginning with p2: "
                         f"[delay, dur, ...]. Got {events} instead")

    from .csd import Csd
    csd = Csd(sr=sr, ksmps=ksmps, nchnls=nchnls, a4=a4)
    if not outfile:
        outfile = _tempfile.mktemp(suffix='.wav', prefix='csdengine-rec-')

    if init:
        csd.addGlobalCode(init)

    instrnum = 100
    csd.addInstr(instrnum, body)
    for event in events:
        # Use names of their own: rebinding `dur` here would replace the
        # recording duration with the duration of the last event
        eventstart, eventdur = event[0], event[1]
        csd.addEvent(instrnum, eventstart, eventdur, event[2:])

    if dur is not None:
        csd.setEndMarker(dur)

    fmtoption = {16: '', 24: '-3', 32: '-f', 'float': '-f'}.get(samplefmt)
    if fmtoption is None:
        raise ValueError("samplefmt should be one of 16, 24, 32, or 'float'")
    csd.addOptions(fmtoption)

    renderjob = csd.run(output=outfile)
    assert renderjob.process is not None
    return outfile, renderjob.process
def _ftsaveReadText(path: str) -> list[np.ndarray]: # a file can have multiple tables saved lines = iter(open(path)) tables = [] while True: tablength = -1 try: headerstart = next(lines) if not headerstart.startswith("===="): raise IOError(f"Expecting header start, got {headerstart}") except StopIteration: # no more tables break # Read header for line in lines: if line.startswith("flen:"): tablength = int(line[5:]) if 'END OF HEADER' in line: break if tablength < 0: raise IOError("Could not read table length") values = np.zeros((tablength+1,), dtype=float) # Read data for i, line in enumerate(lines): if line.startswith("---"): break values[i] = float(line) tables.append(values) return tables
def ftsaveRead(path, mode="text") -> list[np.ndarray]:
    """
    Read a file saved by ftsave, returns a list of tables

    Args:
        path: the path of the file saved by ftsave
        mode: the format of the file. Only "text" is supported

    Returns:
        a list of arrays, one for each table found in the file

    Raises:
        ValueError: if the mode is not supported
    """
    if mode != "text":
        raise ValueError(f"mode {mode} not supported")
    return _ftsaveReadText(path)
def getNchnls(backend='',
              outpattern='',
              inpattern='',
              defaultin=2,
              defaultout=2
              ) -> tuple[int, int]:
    """
    Get the default number of channels for a given device

    Args:
        backend: the backend, one of 'jack', 'portaudio', etc. If empty,
            the default backend is used
        outpattern: the output device. If empty, the default output device of
            the backend is used. Otherwise either the device id ("dac0") or a
            regex pattern matching the long name of the device
        inpattern: the input device. If empty, the default input device of
            the backend is used. Otherwise either the device id ("adc0") or a
            regex pattern matching the long name of the device
        defaultin: default value returned if it is not possible to determine
            the number of channels for given backend+device
        defaultout: default value returned if it is not possible to determine
            the number of channels for given backend/device

    Returns:
        a tuple (nchnls_i, nchnls) for that backend+device combination

    Raises:
        RuntimeError: if the backend is not found
        ValueError: if a pattern is given and no device matches it
    """
    backendDef = getAudioBackend(backend)
    if not backendDef:
        raise RuntimeError(f"Backend '{backend}' not found")

    def numChannelsOr(device, fallback):
        # The fallback applies when there is no device or when the device
        # does not declare its number of channels
        if device and device.numChannels is not None:
            return device.numChannels
        return fallback

    defaultInput, defaultOutput = backendDef.defaultAudioDevices()

    if outpattern:
        outdev = backendDef.searchAudioDevice(outpattern, kind='output')
        if not outdev:
            _, outdevs = backendDef.audioDevices()
            outdevids = [d.id for d in outdevs]
            raise ValueError(f"Output device '{outpattern}' not found. Possible devices "
                             f"are: {outdevids}")
    else:
        outdev = defaultOutput
    nchnls = numChannelsOr(outdev, defaultout)

    if inpattern:
        indev = backendDef.searchAudioDevice(inpattern, kind='input')
        if not indev:
            raise ValueError(f"Input device {inpattern} not found")
    else:
        indev = defaultInput
    nchnlsi = numChannelsOr(indev, defaultin)
    return nchnlsi, nchnls
def _getNchnlsJackViaJackclient(indevice: str, outdevice: str ) -> tuple[int, int]: """ Get the number of ports for the given clients using JACK-Client This is faster than csound and should give the same results Args: indevice (str): A regex pattern matching the input client, or "adc" or None to query the physical ports outdevice (str): A regex pattern matching the output client Use "dac" or None to query the physical ports Returns: a tuple (number of inputs, number of outputs) """ import jack c = jack.Client("query") if indevice == "adc" or not indevice: inports = [p for p in c.get_ports(is_audio=True, is_output=True, is_physical=True)] else: inports = c.get_ports(indevice, is_audio=True, is_output=True) if outdevice == "dac" or not outdevice: outports = [p for p in c.get_ports(is_audio=True, is_input=True, is_physical=True)] else: outports = c.get_ports(outdevice, is_audio=True, is_input=True) c.close() return len(inports), len(outports) def _parsePortaudioDeviceName(name: str) -> tuple[str, str, int, int]: """ Parses a string like "HDA Intel PCH: CX20590 Analog (hw:0,0) [ALSA, 2 in, 4 out]" Args: name: the name of the device Returns: a tuple: ("HDA Intel PCH: CX20590 Analog (hw:0,0)", "ALSA", 2, 4) """ devname, rest = name.split("[") rest = rest.replace("]", "") if "," in rest: api, inchstr, outchstr = rest.split(",") inch = int(inchstr.split()[0]) outch = int(outchstr.split()[0]) else: api = rest inch = -1 outch = -1 return devname.strip(), api, inch, outch
def dumpAudioInfo(backend=''):
    """
    Dump information about audio backends and audio devices

    If no backend is given, the available backends are printed first. The
    audio devices are then printed for the given backend (the default
    backend if none was given)

    Args:
        backend: the name of the backend
    """
    backendGiven = bool(backend)
    if not backendGiven:
        dumpAudioBackends()
        print()
    dumpAudioDevices(backend=backend)
def dumpAudioDevices(backend=''):
    """
    Print a list of audio devices for the given backend.

    If backend is not given, the default backend (of all available backends
    for the current platform) is chosen

    Args:
        backend: the name of the backend

    Raises:
        ValueError: if the backend is not supported
    """
    backendDef = getAudioBackend(backend)
    if backendDef is None:
        raise ValueError(f"Backend '{backend}' not supported")
    from emlib.misc import print_table
    print(f"Backend: {backendDef.name}")
    indevs, outdevs = getAudioDevices(backend=backend)
    fields = [field.name for field in dataclasses.fields(AudioDevice)]
    sections = (
        ("\nInput Devices:", "-- No input devices", indevs),
        ("\nOutput Devices:", "-- No output devices", outdevs),
    )
    for title, emptymsg, devices in sections:
        rows = [dataclasses.astuple(device) for device in devices]
        print(title)
        if rows:
            print_table(rows, headers=fields, showindex=False)
        else:
            print(emptymsg)
def instrNames(instrdef: str) -> list[int | str]:
    """
    Returns the list of names/instrument numbers in the instrument definition.

    Most of the time this list will have one single element, either an
    instrument number or a name

    Args:
        instrdef: the code defining an instrument

    Returns:
        a list of names/instrument numbers. Numbers are returned as int. An
        empty list is returned if the code has no line starting an instrument

    Raises:
        ValueError: if the code holds more than one instrument definition

    Example
    ~~~~~~~

        >>> instr = r'''
        ... instr 10, foo
        ...     outch 1, oscili:a(0.1, 440)
        ... endin
        ... '''
        >>> instrNames(instr)
        [10, "foo"]
    """
    matches = [line for line in instrdef.splitlines()
               if _re.match(r"^[\ \t]*\binstr\b", line)]
    if len(matches) > 1:
        raise ValueError(f"Expected only one instrument definition, got {matches}")
    if not matches:
        return []
    # Skip the "instr" keyword and the character following it
    declared = matches[0].strip()[6:]
    tokens = (token.strip() for token in declared.split(","))
    return [int(token) if token.isdigit() else token for token in tokens]
@dataclasses.dataclass
class ParsedBlock:
    """
    A ParsedBlock represents a block (an instr, an opcode, etc) in an orchestra

    Used by :func:`parseOrc` to split an orchestra in individual blocks

    Attributes:
        kind: the kind of block ('instr', 'opcode', 'header', 'include',
            'instr0', 'comment')
        lines: the lines of the block. The text of the block is available
            via the `text` property
        startLine: where does this block start within the parsed orchestra
        endLine: where does this block end. If not given it is set to startLine
        name: name of the block
        attrs: some blocks need extra information. Opcodes define attrs
            'outargs' and 'inargs', header blocks have a 'value' attr
    """
    kind: str
    lines: list[str]
    startLine: int
    endLine: int = -1
    name: str = ''
    attrs: dict[str, str] | None = None

    def __post_init__(self):
        # 'comment' must be accepted: parseOrc generates comment blocks
        # whenever it is asked to keep comments
        assert self.kind in ('instr', 'opcode', 'header', 'include', 'instr0', 'comment')
        if self.endLine == -1:
            self.endLine = self.startLine

    @property
    def text(self):
        """The text of this block, with its lines joined by a newline"""
        return self.lines[0] if len(self.lines) == 1 else '\n'.join(self.lines)
@dataclasses.dataclass
class _OrcBlock:
    """
    A block (an instr or an opcode) being collected while parsing an orchestra

    Used by parseOrc to accumulate the lines of a block until the
    line closing it is found
    """
    # Name of the instr / opcode
    name: str
    # Index of the line where the block starts
    startLine: int
    # The lines collected for this block
    lines: list[str]
    # Index of the line where the block ends
    endLine: int = 0
    # Output / input argument types, only set for opcodes
    outargs: str = ""
    inargs: str = ""
def parseOrc(code: str, keepComments=True) -> list[ParsedBlock]:
    """
    Parse orchestra code into blocks

    Each block is either an instr, an opcode, a header line, a comment,
    an include or an instr0 line. Empty lines are skipped

    Args:
        code: the orchestra code to parse
        keepComments: if True, comment lines found outside instruments and
            opcodes generate blocks of kind 'comment'. Otherwise these
            lines are discarded

    Returns:
        a list of :class:`ParsedBlock`. The ``startLine`` and ``endLine`` of
        each block are 0-based indexes into the lines of `code`

    Example
    -------

    .. code-block:: python

        >>> from csoundengine import csoundlib
        >>> orc = r'''
        ... sr = 44100
        ... nchnls = 2
        ... ksmps = 32
        ... 0dbfs = 1
        ... seed 0
        ...
        ... opcode Gain,a,ak
        ...   asig, kgain xin
        ...   xout asig * kgain
        ... endop
        ...
        ... giAmps[] fillarray 1, 2/3, 1, 1.8, 8/3, 5/3, 1.46, 4/3, 4/3, 1, 4/3
        ...
        ... instr Risset_Bell
        ...   ibasfreq = p4
        ...   iamp = ampdb(p5)
        ...   out Gain(oscili:a(iamp, ibasfreq), 0.5)
        ... endin
        ... '''
        >>> for block in csoundlib.parseOrc(orc):
        ...     print(block.kind, block.name, block.startLine, block.endLine, block.attrs)
        header sr 1 1 {'value': '44100'}
        header nchnls 2 2 {'value': '2'}
        header ksmps 3 3 {'value': '32'}
        header 0dbfs 4 4 {'value': '1'}
        instr0  5 5 None
        opcode Gain 7 10 {'outargs': 'a', 'inargs': 'ak'}
        instr0  12 12 None
        instr Risset_Bell 14 18 None
    """
    context = []
    blocks: list[ParsedBlock] = []
    block = _OrcBlock("", 0, [])
    for i, line in enumerate(code.splitlines()):
        strippedline = line.strip()
        if not strippedline:
            continue
        # The name is a number or an identifier. \w* (instead of \w+) lets
        # names of one single character, like "instr a", be recognized
        if match := _re.search(r"\binstr\s+(\d+|[a-zA-Z_]\w*)", line):
            context.append('instr')
            block = _OrcBlock(name=match.group(1),
                              startLine=i,
                              lines=[line])
        elif strippedline == "endin":
            assert context[-1] == "instr"
            context.pop()
            assert block.name
            block.endLine = i
            block.lines.append(line)
            blocks.append(ParsedBlock(kind='instr',
                                      lines=block.lines,
                                      startLine=block.startLine,
                                      endLine=block.endLine,
                                      name=block.name))
        elif strippedline == 'endop':
            assert context[-1] == "opcode"
            context.pop()
            block.endLine = i
            block.lines.append(line)
            blocks.append(ParsedBlock(kind='opcode',
                                      lines=block.lines,
                                      startLine=block.startLine,
                                      endLine=block.endLine,
                                      name=block.name,
                                      attrs={'outargs': block.outargs,
                                             'inargs': block.inargs}))
        elif context and context[-1] in {'instr', 'opcode'}:
            # Within an instr / opcode: the line belongs to the current block
            block.lines.append(line)
        elif match := _re.search(r"^\s*(sr|ksmps|kr|A4|0dbfs|nchnls|nchnls_i)\s*=\s*(\d+)", line):
            blocks.append(ParsedBlock(kind='header',
                                      lines=[line],
                                      name=match.group(1),
                                      startLine=i,
                                      attrs={'value': match.group(2)}))
        elif _re.search(r"^\s*(;|\/\/)", line):
            if keepComments:
                blocks.append(ParsedBlock(kind='comment',
                                          startLine=i,
                                          lines=[line]))
        elif match := _re.search(r"^\s*opcode\s+(\w+)\s*,\s*([0ika\[\]]*),\s*([0ikaoOjJpP\[\]]*)", line):
            context.append('opcode')
            block = _OrcBlock(name=match.group(1),
                              startLine=i,
                              lines=[line],
                              outargs=match.group(2),
                              inargs=match.group(3))
        elif strippedline.startswith('#include'):
            blocks.append(ParsedBlock(kind='include',
                                      startLine=i,
                                      lines=[line]))
        else:
            # Any other line outside an instr / opcode is instr0 code
            blocks.append(ParsedBlock(kind='instr0',
                                      startLine=i,
                                      lines=[line]))
    return blocks
def _hashdict(d: dict) -> int: return hash((frozenset(d.keys()), frozenset(d.values())))
[docs] @dataclasses.dataclass class ParsedInstrBody: """ The result of parsing the body of an instrument This is used by :func:`instrParseBody` """ pfieldIndexToName: dict[int, str] """Maps pfield index to assigned name""" pfieldLines: Sequence[str] """List of lines where pfields are defined""" body: str """The body parsed""" lines: Sequence[str] """The body, split into lines""" pfieldIndexToValue: dict[int, float] | None = None "Default values of the pfields, by pfield index" pfieldsUsed: set[int] | None = None "Which pfields are accessed" outChannels: set[int] | None = None "Which output channels are used" @_functools.cached_property def pfieldsText(self) -> str: """The text containing pfield definitions""" return "\n".join(self.pfieldLines) @_functools.cached_property def pfieldNameToIndex(self): """Maps pfield name to its index""" return {name: idx for idx, name in self.pfieldIndexToName.items()}
[docs] def numPfields(self) -> int: """ Returns the number of pfields in this instrument """ return 3 if not self.pfieldsUsed else max(self.pfieldsUsed)
@_functools.cached_property def pfieldNameToValue(self) -> dict[str, float]: """ Dict mapping pfield name to default value If a pfield has no explicit name assigned, p## is used. If it has no explicit value, 0. is used Example ~~~~~~~ Given a csound instr: >>> parsed = instrParseBody(r''' ... pset 0, 0, 0, 0.1, 400, 0.5 ... iamp = p4 ... kfreq = p5 ... ''') >>> parsed.pfieldNameToValue {'iamp': 0.1, 'kfreq': 400, 'p6': 0.5} """ if not self.pfieldNameToIndex: return EMPTYDICT if self.pfieldIndexToValue is not None: out1 = {(self.pfieldIndexToName.get(idx) or f"p{idx}"): value for idx, value in self.pfieldIndexToValue.items()} else: out1 = {} if self.pfieldIndexToName is not None: assert self.pfieldIndexToValue is not None out2 = {name: self.pfieldIndexToValue.get(idx, 0.) for idx, name in self.pfieldIndexToName.items()} else: out2 = {} out1.update(out2) return out1
[docs] def lastAssignmentToVariable(varname: str, lines: list[str]) -> int | None: """ Line of the last assignment to a variable Given a piece of code (normally the body of an instrument) find the line in which the given variable has its **last** assignment Args: varname: the name of the variable lines: the lines which make the instrument body. We need to split the body into lines within the function itself and since the user might need to split the code anyway afterwards, we already ask for the lines instead. Returns: the line number of the last assignment, or None if there is no assignment to the given variable Possible matches:: aout oscili 0.1, 1000 aout, aout2 pan2 ... aout = ... aout=... aout += ... aout2, aout = ... Example ~~~~~~~ >>> lastAssignmentToVariable("aout", r''' ... aout oscili:a(0.1, 1000) ... aout *= linen:a(...) ... aout = aout + 10 ... outch 1, aout ... '''.splitlines()) 3 """ rgxs = [ _re.compile(rf'^\s*({varname})\s*(=|\*=|-=|\+=|\/=)'), _re.compile(rf'^\s*({varname})\s*,'), _re.compile(rf'^\s*({varname})\s+[A-Za-z]\w*'), _re.compile(rf'^\s*(?:\w*,\s*)+\b({varname})\b') ] for i, l in enumerate(reversed(lines)): for rgx in rgxs: if rgx.search(l): return len(lines) - 1 - i return None
[docs] def locateDocstring(lines: list[str]) -> tuple[int | None, int]: """ Locate the docstring in this instr code Args: lines: the code to analyze, tipically the code inside an instr (between instr/endin), split into lines Returns: a tuple (firstline, lastline) indicating the location of the docstring within the given text. firstline will be None if no docstring was found """ assert isinstance(lines, list) docstringStart = None docstringEnd = 0 docstringKind = '' for i, line in enumerate(lines): line = line.strip() if not line: continue if docstringStart is None: if _re.search(r'(;|\/\/|\/\*)', line): docstringStart = i docstringKind = ';' if line[0] == ';' else line[:2] continue else: # Not a docstring, so stop looking break else: # inside docstring if docstringKind == '/*': # TODO pass elif line.startswith(docstringKind): docstringEnd = i+1 else: break if docstringStart is not None and docstringEnd < docstringStart: docstringEnd = docstringStart + 1 return docstringStart, docstringEnd
def splitDocstring(body: str | list[str]) -> tuple[str, str]:
    """
    Split instrument code into its docstring and the remaining code

    Args:
        body: the code, either as one string or already split into lines

    Returns:
        a tuple (docstring, rest), both as strings. If no docstring is
        found, docstring is an empty string and rest is the whole code
    """
    lines = body.splitlines() if isinstance(body, str) else body
    start, end = locateDocstring(lines)
    if start is None:
        # No docstring: return the code untouched
        unchanged = body if isinstance(body, str) else '\n'.join(lines)
        return '', unchanged
    return '\n'.join(lines[start:end]), '\n'.join(lines[end:])
def instrGetBody(textOrLines: str | list[str]) -> str:
    """
    Get the body of an instrument, without the instr / endin lines

    Args:
        textOrLines: the complete instrument definition, as a string or
            as a list of lines. The first line must start with ``instr``
            and the last non-empty line with ``endin``

    Returns:
        the lines between ``instr`` and ``endin``, joined as one string
    """
    if isinstance(textOrLines, str):
        lines = textOrLines.splitlines()
    else:
        # A list of lines is accepted as well; copy it so that the
        # caller's list is never modified
        lines = list(textOrLines)
    lines = internal.stripTrailingEmptyLines(lines)
    assert lines[0].startswith('instr') and lines[-1].startswith('endin')
    return '\n'.join(lines[1:-1])
@_functools.cache
def instrParseBody(body: str) -> ParsedInstrBody:
    """
    Parses the body of an instrument, returns pfields used, output channels, etc.

    Lines which define pfields (``ival = p4``, ``ival init p4``,
    ``ia, ib passign 4``) are collected separately and removed from the
    parsed body, ``pset`` lines are turned into default values, and comments
    are kept in the body. Entries for p1-p3 are dropped from the resulting
    names and default values. The result is cached per body.

    Args:
        body (str): the body of the instr (between instr/endin)

    Returns:
        a ParsedInstrBody

    Example
    -------

        >>> from csoundengine import csoundlib
        >>> body = r'''
        ... pset 0, 0, 0, 1, 1000
        ... ibus = p4
        ... kfreq = p5
        ... a0 = busin(ibus)
        ... a1 = oscili:a(0.5, kfreq) * a0
        ... outch 1, a1
        ... '''
        >>> parsed = csoundlib.instrParseBody(body)
        >>> parsed.pfieldIndexToName
        {4: 'ibus', 5: 'kfreq'}
        >>> parsed.pfieldIndexToValue
        {4: 1.0, 5: 1000.0}
        >>> parsed.pfieldLines
        ['ibus = p4', 'kfreq = p5']
        >>> parsed.outChannels
        {1}
    """
    if not body.strip():
        return ParsedInstrBody(pfieldIndexToValue={},
                               pfieldLines=(),
                               body='',
                               lines=(),
                               pfieldIndexToName={})

    pfieldLines: list[str] = []
    bodyLines: list[str] = []
    pfieldIndexToValue: dict[int, float] = {}
    pfieldIndexToName: dict[int, str] = {}
    pfieldsUsed: set[int] = set()
    outchannels: set[int] = set()
    insideComment = False
    lines = body.splitlines()

    for line in lines:
        if insideComment:
            # Inside a /* ... */ block: the closing marker can be anywhere
            # in the line, not only at its very beginning
            bodyLines.append(line)
            if "*/" in line:
                insideComment = False
            continue

        if _re.match(r"^\s*(;|\/\/)", line):
            # A line comment
            bodyLines.append(line)
            continue

        pfieldsUsed.update(int(p[1:]) for p in _re.findall(r"\bp\d+", line))

        if _re.match(r"^\s*\/\*", line):
            # Start of a block comment. It only stays open if it is not
            # closed within this same line
            bodyLines.append(line)
            opening = line.find("/*")
            insideComment = line.find("*/", opening + 2) < 0
        elif m := _re.search(r"\bpassign\s+(\d+)", line):
            if "[" in line:
                # array form, iarr[] passign 4, 6
                bodyLines.append(line)
            else:
                pfieldLines.append(line)
                pstart = int(m.group(1))
                # Names are at the left of the opcode: ia, ib passign 4
                argsstr = line.partition("passign")[0]
                for j, name in enumerate(argsstr.split(","), start=pstart):
                    pfieldsUsed.add(j)
                    pfieldIndexToName[j] = name.strip()
        elif _re.search(r"^\s*\bpset\b", line):
            # Drop the opcode and any trailing line comment
            s = _re.split(r";|//", line.strip()[4:], maxsplit=1)[0]
            for j, v in enumerate(s.split(","), start=1):
                v = v.strip()
                # Only numeric literals define a default; anything else
                # keeps its position but is skipped
                if _re.fullmatch(r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?", v):
                    pfieldIndexToValue[j] = float(v)
        elif m := _re.search(r"^\s*\b(\w+)\s*(=|init\s)\s*p(\d+)", line):
            # 'ival = p4' / kval = p4 or 'ival init p4'
            pname = m.group(1)
            pfieldIndex = int(m.group(3))
            pfieldLines.append(line)
            pfieldIndexToName[pfieldIndex] = pname.strip()
            pfieldsUsed.add(pfieldIndex)
        else:
            if _re.search(r"\bouts\s+", line):
                outchannels.update((1, 2))
            elif _re.search(r"\bout\b", line):
                outchannels.add(1)
            elif m := _re.search(r"\boutch\b(.*)", line):
                # outch ichan1, asig1, ichan2, asig2, ...
                # Only channels given as literal integers can be detected
                for chan in m.group(1).split(",")[::2]:
                    chan = chan.strip()
                    if chan.isdecimal():
                        outchannels.add(int(chan))
            bodyLines.append(line)

    # p1, p2 and p3 are never reported as named pfields or defaults
    for pidx in range(1, 4):
        pfieldIndexToValue.pop(pidx, None)
        pfieldIndexToName.pop(pidx, None)

    bodyLines = [line for line in bodyLines if line.strip()]

    return ParsedInstrBody(pfieldIndexToValue=pfieldIndexToValue,
                           pfieldIndexToName=pfieldIndexToName,
                           pfieldsUsed=pfieldsUsed,
                           outChannels=outchannels,
                           pfieldLines=pfieldLines,
                           body="\n".join(bodyLines),
                           lines=lines)
def bestSampleEncodingForExtension(ext: str) -> str:
    """
    Given an extension, return the best sample encoding.

    .. note::

        float64 is not considered necessary for holding sound information

    Args:
        ext (str): the extension of the file will determine the format.
            A leading dot is accepted and the case is ignored

    Returns:
        a sample format of the form "pcmXX" or "floatXX", where XX determines
        the bit rate ("pcm16", "float32", etc), or "vorbis" for ogg

    Raises:
        ValueError: if the extension is empty or not supported (this
            includes mp3)

    ========== ================
    Extension  Sample Format
    ========== ================
    wav        float32
    aif, aiff  float32
    flac       pcm24
    ogg        vorbis
    ========== ================
    """
    if ext.startswith("."):
        ext = ext[1:]

    encodings = {
        "wav": "float32",
        "aif": "float32",
        "aiff": "float32",
        "flac": "pcm24",
        "ogg": "vorbis",
    }
    encoding = encodings.get(ext.lower())
    if encoding is None:
        raise ValueError(f"Format {ext} not supported. Formats supported: wav, aiff, flac and ogg")
    return encoding
def _parsePresetSflistprograms(line: str) -> tuple[str, int, int] | None: # 012345678 # xxx:yyy zzzzzzzzzz bank = int(line[:3]) num = int(line[4:7]) name = line[8:].strip() return (name, bank, num) def _parsePreset(line: str) -> tuple[str, int, int] | None: match = _re.search(r">> Bank: (\d+)\s+Preset:\s+(\d+)\s+Name:\s*(.+)", line) if not match: return None name = match.group(3).strip() bank = int(match.group(1)) presetnum = int(match.group(2)) return (name, bank, presetnum)
class SoundFontIndex:
    """
    Creates an index of presets for a given soundfont

    Attributes:
        instrs: a list of instruments, where each instrument is a tuple (instr. index, name)
        presets: a list of presets, where each preset is a tuple (bank, num, name)
        nameToIndex: a dict mapping instr name to index
        indexToName: a dict mapping instr idx to name
        nameToPreset: a dict mapping preset name to (bank, num)
        presetToName: a dict mapping (bank, num) to preset name
    """
    def __init__(self, soundfont: str):
        assert _os.path.exists(soundfont)
        self.soundfont = soundfont
        instrs, presets = _soundfontInstrumentsAndPresets(soundfont)
        self.instrs: list[tuple[int, str]] = instrs
        self.presets: list[tuple[int, int, str]] = presets

        # Lookup tables in both directions. When a name is repeated, the
        # last entry with that name is the one kept
        self.nameToIndex: dict[str, int] = {}
        self.indexToName: dict[int, str] = {}
        for instrIndex, instrName in self.instrs:
            self.nameToIndex[instrName] = instrIndex
            self.indexToName[instrIndex] = instrName

        self.nameToPreset: dict[str, tuple[int, int]] = {}
        self.presetToName: dict[tuple[int, int], str] = {}
        for bank, num, presetName in self.presets:
            self.nameToPreset[presetName] = (bank, num)
            self.presetToName[(bank, num)] = presetName
@_functools.cache
def soundfontIndex(sfpath: str) -> SoundFontIndex:
    """
    Make a SoundFontIndex for the given soundfont

    The index is cached, so asking again for the same path returns
    the same object

    Args:
        sfpath: the path to a soundfont (.sf2) file

    Returns:
        a SoundFontIndex

    Example
    ~~~~~~~

        >>> from csoundengine import csoundlib
        >>> idx = csoundlib.soundfontIndex("/path/to/piano.sf2")
        >>> idx.nameToPreset
        {'piano': (0, 0)}
        >>> idx.nameToIndex
        {'piano': 0}
    """
    index = SoundFontIndex(sfpath)
    return index
@_functools.cache
def _sf2file(path: str) -> Sf2File:
    """
    Open and parse a soundfont file. The parsed object is cached per path
    """
    from sf2utils.sf2parse import Sf2File
    # NOTE(review): the file handle is never closed explicitly; presumably
    # Sf2File keeps reading from it after construction -- confirm
    handle = open(path, 'rb')
    return Sf2File(handle)


@_functools.cache
def _soundfontInstrumentsAndPresets(sfpath: str
                                    ) -> tuple[list[tuple[int, str]], list[tuple[int, int, str]]]:
    """
    Returns a tuple (instruments, presets)

    Entries without a name and the terminal records ('EOI' for instruments,
    'EOP' for presets) are left out. Presets are returned sorted.

    Args:
        sfpath: the path to the soundfont

    Returns:
        a tuple (instruments, presets), where instruments is a list
        of tuples (instrindex, instrname) and presets is a list of tuples
        (bank, presetindex, name)
    """
    sf = _sf2file(sfpath)

    instruments: list[tuple[int, str]] = []
    for num, instr in enumerate(sf.instruments):
        if not instr.name or instr.name == 'EOI':
            continue
        instruments.append((num, instr.name.strip()))

    unsorted: list[tuple[int, int, str]] = []
    for p in sf.presets:
        if not p.name or p.name == 'EOP':
            continue
        unsorted.append((p.bank, p.preset, p.name.strip()))
    presets = sorted(unsorted)

    return instruments, presets
def soundfontInstruments(sfpath: str) -> list[tuple[int, str]]:
    """
    Get instruments for a soundfont

    The instrument index is used by csound opcodes like `sfinstr`. These
    are different from soundfont programs, which are ordered in
    banks/presets

    Args:
        sfpath: the path to the soundfont. "?" to open a file-browser dialog

    Returns:
        list[tuple[int,str]] - a list of tuples, where each tuple has the form
        (index: int, instrname: str)
    """
    path = _state.openSoundfont(ensureSelection=True) if sfpath == "?" else sfpath
    return _soundfontInstrumentsAndPresets(path)[0]
def soundfontPresets(sfpath: str) -> list[tuple[int, int, str]]:
    """
    Get presets from a soundfont

    Args:
        sfpath: the path to the soundfont. "?" to open a file-browser dialog

    Returns:
        a list of tuples ``(bank:int, presetnum:int, name: str)``
    """
    path = _state.openSoundfont(ensureSelection=True) if sfpath == "?" else sfpath
    return _soundfontInstrumentsAndPresets(path)[1]
def soundfontSelectPreset(sfpath: str
                          ) -> tuple[str, int, int] | None:
    """
    Select a preset from a soundfont interactively

    Args:
        sfpath: the path to the soundfont, passed as is
            to :func:`soundfontPresets`

    Returns:
        a tuple (preset name, bank, preset number) if a selection was made, None
        otherwise

    .. figure:: ../assets/select-preset.png
    """
    presets = soundfontPresets(sfpath)
    items = []
    for bank, pnum, name in presets:
        items.append(f'{bank:03d}:{pnum:03d}:{name}')
    chosen = emlib.dialogs.selectItem(items, ensureSelection=True)
    if chosen is None:
        return None
    # Map the selected label back to its preset
    bank, pnum, name = presets[items.index(chosen)]
    return (name, bank, pnum)
def soundfontInstrument(sfpath: str, name: str) -> int | None:
    """
    Get the instrument number from an instrument name

    The returned instrument number can be used with csound opcodes like `sfinstr`
    or `sfinstr3`

    Args:
        sfpath: the path to a .sf2 file. "?" to open a file-browser dialog
        name: the instrument name

    Returns:
        the instrument index if there is an instrument with this name,
        None otherwise
    """
    path = _state.openSoundfont(ensureSelection=True) if sfpath == "?" else sfpath
    return soundfontIndex(path).nameToIndex.get(name)
@_functools.cache
def soundfontKeyrange(sfpath: str, preset: tuple[int, int]) -> tuple[int, int] | None:
    """
    Key range of a soundfont preset

    Args:
        sfpath: the path to the soundfont
        preset: a tuple (bank, presetnum)

    Returns:
        the start and stop of the key range of the first preset matching
        the given bank and number, or None if no preset matches
    """
    for candidate in _sf2file(sfpath).presets:
        if candidate.bank != preset[0] or candidate.preset != preset[1]:
            continue
        keyrange = candidate.key_range
        return keyrange.start, keyrange.stop
    return None
def soundfontPeak(sfpath: str, preset: tuple[int, int],
                  pitches: tuple[int, int] | None = None, dur=0.05
                  ) -> float:
    """
    Measure the peak of a soundfont preset by rendering it offline

    Two notes are played at full velocity with the given preset and the
    maximum of their peaks is written to the "sfpeak" channel, which is
    read back once the rendering is done.

    Args:
        sfpath: the path to the soundfont
        preset: a tuple (bank, presetnum)
        pitches: the two pitches to play. If not given, pitches at 20% and
            80% of the key range of the preset are used
        dur: the duration of the rendered event

    Returns:
        the value of the "sfpeak" channel after rendering

    Raises:
        ValueError: if no pitches are given and the preset has no
            key range
    """
    from csoundengine.offline import OfflineEngine
    engine = OfflineEngine(nchnls=0, ksmps=128, numAudioBuses=0, numControlBuses=0)
    bank, prog = preset
    presetnum = 1
    if pitches is None:
        keyrange = soundfontKeyrange(sfpath, preset)
        if not keyrange:
            raise ValueError(f"No defined key range for preset {preset} in soundfont {sfpath}")
        minpitch, maxpitch = keyrange
        span = maxpitch - minpitch
        pitches = (int(span * 0.2 + minpitch), int(span * 0.8 + minpitch))

    engine.compile(fr'''
    gi_sfhandle sfload "{sfpath}"
    gi_presetindex sfpreset {prog}, {bank}, gi_sfhandle, {presetnum}
    chnset 0, "sfpeak"

    instr sfpeak
        ipreset = p4
        ipitch1 = p5
        ipitch2 = p6
        kmax0 init 0
        a1 sfplaym 127, ipitch1, 1, 1, ipreset, 0
        a2 sfplaym 127, ipitch2, 1, 1, ipreset, 0
        kmax1 peak a1
        kmax2 peak a2
        kmax = max(kmax1, kmax2)
        if kmax > kmax0 then
            chnset kmax, "sfpeak"
        endif
        kmax0 = kmax
    endin
    ''')
    lowpitch, highpitch = pitches[0], pitches[1]
    engine.sched('sfpeak', 0, dur, (presetnum, lowpitch, highpitch))
    engine.perform(extratime=0.1)
    assert engine.csound is not None
    # controlChannel returns a (value, error) pair; only the value is used
    value, error = engine.csound.controlChannel("sfpeak")
    engine.stop()
    return float(value)
[docs] def splitScoreLine(line: str, quote=False) -> list[float | str]: # i "instr" 1 2 3 "foo bar" 0.5 "foofi" kind = line[0] assert kind in 'ife' rest = line[1:] allparts: list[str | int | float] = [kind] # even parts are not quoted strings, odd parts are quoted strings for i, part in enumerate(rest.split('"')): if i % 2 == 0: allparts.extend(float(sub.strip()) for sub in part.split()) else: allparts.append(f'"{part}"' if quote else part) return allparts
def splitInclude(line: str) -> str:
    """
    Given an include line it splits the include path

    Args:
        line: a line of the form ``#include "path"``

    Returns:
        the included path, without quotation marks

    Raises:
        ValueError: if the line is not a quoted include directive

    Example
    ~~~~~~~

        >>> splitInclude(r'   #include "foo/bar" ')
        foo/bar

    NB: the quotation marks are not included
    """
    # The path is whatever lies between the first pair of quotation marks
    match = _re.search(r'#include\s+"([^"]+)"', line)
    if not match:
        raise ValueError("Could not parse include")
    return match.group(1)
def makeIncludeLine(include: str) -> str:
    """
    Given a path, creates the #include directive

    In particular, it checks the need for quotation marks

    Args:
        include: path to include. Surrounding whitespace is removed

    Returns:
        the ``#include`` line
    """
    path = include.strip()
    quoted = emlib.textlib.quoteIfNeeded(path)
    return f'#include {quoted}'
@_functools.cache
def _pygmentsOrcLexer():
    """
    The pygments lexer for csound orchestra code, created once and cached
    """
    # Imported here so that pygments is only needed when highlighting
    from pygments.lexers.csound import CsoundOrchestraLexer
    return CsoundOrchestraLexer()
def highlightCsoundOrc(code: str, theme='') -> str:
    """
    Converts csound code to html with syntax highlighting

    Args:
        code: the code to highlight
        theme: the theme used, one of 'light', 'dark'. If not given, a default
            is used (see config['html_theme']). Any theme other than 'light'
            uses the 'fruity' pygments style

    Returns:
        the corresponding html
    """
    if not theme:
        from .config import config
        theme = config['html_theme']

    import pygments
    # ``import pygments`` alone does not load the formatters subpackage,
    # so the formatter is imported explicitly
    from pygments.formatters import HtmlFormatter

    formatterArgs = {'noclasses': True, 'wrapcode': True}
    if theme != 'light':
        formatterArgs['style'] = 'fruity'
    htmlfmt = HtmlFormatter(**formatterArgs)
    return pygments.highlight(code, lexer=_pygmentsOrcLexer(), formatter=htmlfmt)
[docs] def channelTypeFromValue(value: int | float | str) -> str: """ Channel type (k, S, a) from value """ if isinstance(value, (int, float)): return 'k' elif isinstance(value, str): return 'S' elif isinstance(value, np.ndarray): return 'a' else: raise TypeError(f"Value of type {type(value)} not supported")
def isPfield(name: str) -> bool:
    """
    Is name a pfield?

    A pfield has the form ``p<n>``, where n is a positive integer without
    leading zeros (``p4``, ``p12``). The whole name must have this form:
    text which merely starts with a pfield (``"p4 + 1"``) is rejected.

    Args:
        name: the name to check

    Returns:
        True if name is a pfield
    """
    return _re.fullmatch(r'p[1-9][0-9]*', name) is not None
[docs] def fillPfields(args: list[float | str], namedpargs: dict[int, float], defaults: dict[int, float] | None) -> list[float | str]: if not defaults: if namedpargs and not args: maxp = max(namedpargs.keys()) out = [0.] * (maxp - 3) for idx, value in namedpargs.items(): out[idx - 4] = value return out elif namedpargs and args: maxp = max(len(args) + 3, max(namedpargs.keys())) out = [0.] * (maxp - 3) for i, arg in enumerate(args): out[i] = arg for idx, value in namedpargs.items(): out[idx-4] = value return out elif args: return args else: # no args at all raise ValueError("No args or namedargs given and no default values defined") # with defaults if namedpargs and not args: maxp = max(max(defaults.keys()), max(namedpargs.keys())) out = [0.] * (maxp - 3) for idx, value in defaults.items(): out[idx - 4] = value for idx, value in namedpargs.items(): out[idx - 4] = value return out elif namedpargs and args: maxp = max(len(args)+3, max(defaults.keys()), max(namedpargs.keys())) out = [0.] * (maxp - 3) for idx, value in defaults.items(): out[idx - 4] = value out[:len(args)] = args for idx, value in namedpargs.items(): out[idx - 4] = value return out elif args: maxp = max(len(args) + 3, max(defaults.keys())) out = [0.] * (maxp - 3) for idx, value in defaults.items(): out[idx - 4] = value out[:len(args)] = args return out else: # only defaults maxp = max(defaults.keys()) out = [0.] * (maxp - 3) for idx, value in defaults.items(): out[idx - 4] = value return out
[docs] def normalizeNamedPfields(pfields: dict[str, float], namesToIndexes: dict[str, int] | None = None ) -> dict[int, float]: """ Given a dict mapping pfield as str to value, return a dict mapping pfield index to value Args: pfields: a dict of the form {pfield: value} where pfield can be a key like 'p<n>' or a variable name which was assigned to this pfield (like ``ifreq = p4`` namesToIndexes: a dict mapping variable names to indexes Returns: a dict of the form {pfieldindex: value} Example ~~~~~~~ >>> normalizeNamedPfields({'p4': 0.5, 'ifreq': 2000}, {'ifreq': 5}) {4: 0.5, 5: 2000} """ out: dict[int, float] = {} for k, value in pfields.items(): if k.startswith('p'): out[int(k[1:])] = value elif namesToIndexes: assert k.startswith('k') or k.startswith('i') idx = namesToIndexes.get(k) if idx is None: raise KeyError(f"Keyword pfield not known: {k}") out[idx] = value else: raise KeyError(f"Keyword pfield not known: {k}") return out