from __future__ import annotations
import math
import os
import tempfile
import textwrap
import contextlib
from dataclasses import dataclass
from functools import cache
import numpy as np
from . import (
csoundparse,
engineorc,
internal,
tools,
csounddefs
)
from .config import config, logger
from .enginebase import TableInfo, _EngineBase
from .engineorc import BUSKIND_AUDIO, BUSKIND_CONTROL
from .errors import CsoundError
from .renderjob import RenderJob
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from csoundengine.csd import Csd
import libcsound
from typing import Sequence
__all__ = [
'OfflineEngine'
]
class _OfflineComponent:
"""
Base class for keeping track of offline events
The history can then be used to generate a csd file from
an offline engine
"""
def apply(self, csd: Csd) -> None:
pass
@dataclass
class _CompileEvent(_OfflineComponent):
code: str
def apply(self, csd: Csd) -> None:
csd.addGlobalCode(self.code)
@dataclass
class _ScoreEvent(_OfflineComponent):
kind: str
pfields: list[float | str]
comment: str = ''
def __post_init__(self):
assert self.kind in 'ifCed', f"Invalid event kind, got {self.kind}"
def apply(self, csd: Csd) -> None:
if self.kind == 'i':
start = self.pfields[1]
dur = self.pfields[2]
assert isinstance(start, (int, float)) and isinstance(dur, (int, float))
csd.addEvent(instr=self.pfields[0], start=start, dur=dur,
args=self.pfields[3:], comment=self.comment)
else:
pfields = [self.kind]
pfields.extend(f'"{p}"' if isinstance(p, str) else str(p) for p in self.pfields)
line = ' '.join(pfields)
csd.addScoreLine(line)
@dataclass
class _SchedEvent(_OfflineComponent):
instr: int | float | str
delay: float
dur: float
args: list
eventid: float
comment: str = ''
def apply(self, csd: Csd) -> None:
csd.addEvent(instr=self.instr,
start=self.delay,
dur=self.dur,
args=self.args,
comment=self.comment)
@dataclass
class _TableDataEvent(_OfflineComponent):
data: np.ndarray
tabnum: int
delay: float = 0.
sr: int = 0
def apply(self, csd: Csd) -> None:
csd.addTableFromData(data=self.data, tabnum=self.tabnum, start=self.delay, sr=self.sr)
@dataclass
class _ReadSoundfileEvent(_OfflineComponent):
path: str
tabnum: int
delay: float = 0.
skiptime: float = 0.
chan: int = 0
def apply(self, csd: Csd) -> None:
csd.addSndfile(sndfile=self.path, tabnum=self.tabnum, start=self.delay,
skiptime=self.skiptime, chan=self.chan)
@dataclass
class _SetParamEvent(_OfflineComponent):
p1: int | float | str
pindex: int
value: int | float
delay: float
def apply(self, csd: Csd) -> None:
csd.setPfield(p1=self.p1, pindex=self.pindex, value=self.value, start=self.delay)
@dataclass
class _AutomateEvent(_OfflineComponent):
p1: int | float | str
pfield: int | str
pairs: Sequence[float] | np.ndarray
delay: float = 0.
overtake: bool = False
def apply(self, csd: Csd) -> None:
assert isinstance(self.pfield, int), f"Pfield must be an integer, got {self.pfield}"
pairs = self.pairs.tolist() if isinstance(self.pairs, np.ndarray) else self.pairs
csd.automatePfield(p1=self.p1,
pindex=self.pfield,
pairs=pairs,
start=self.delay)
[docs]
class OfflineEngine(_EngineBase):
"""
Non-real-time engine using the csound API
This is similar to an :class:`csoundengine.engine.Engine` but
it renders offline to a soundfile
Args:
sr: sample rate
ksmps: samples per cycle
outfile: soundfile to render to. If not given a tempfile is used
nchnls: number of output channels
a4: reference frequency
numAudioBuses: number of audio buses (see :ref:`Bus Opcodes<busopcodes>`)
numControlBuses: number of control buses (see :ref:`Bus Opcodes<busopcodes>`)
busSupport: if True, add bus support at creation time. This should be True
if you are using the bus opcodes from csound code before creating
any bus from python. Otherwise, the creation of any bus (via assignBus)
adds bus support automatically
quiet: if True, suppress output of csound (-m 0)
includes: a list of files to include. Can be added later via :meth:`~OfflineEngine.includeFile`
sampleAccurate: use sample-accurate scheduling
commandlineOptions: command line options passed verbatim to the
csound process when started
encoding: the sample encoding of the rendered file, given as 'pcm<bits>' or 'float<bits>',
where bits presents the bit-depth (for example, 'pcm16', 'pcm24', 'float32', etc).
If no encoding is given a suitable default for the sample format is chosen.
"""
def __init__(self,
sr=44100,
ksmps: int = 64,
outfile: str = '',
nchnls=2,
a4: int = 0,
globalcode='',
busSupport=False,
numAudioBuses: int | None = None,
numControlBuses: int | None = None,
quiet=True,
includes: list[str] | None = None,
sampleAccurate=False,
encoding='',
nosound=False,
commandlineOptions: list[str] | None = None):
super().__init__(sr=sr,
ksmps=ksmps,
a4=a4 or config['A4'],
nchnls=nchnls,
numAudioBuses=numAudioBuses if numAudioBuses is not None else config['num_audio_buses'],
numControlBuses=numControlBuses if numControlBuses is not None else config['num_control_buses'],
sampleAccurate=sampleAccurate)
if outfile:
outfile = internal.normalizePath(outfile)
self.outfile = outfile or tempfile.mktemp(prefix='csoundengine-', suffix='.wav') if not nosound else ''
self.globalcode = globalcode
self.numAudioBuses = numAudioBuses if numAudioBuses is not None else config['num_audio_buses']
self.numControlBuses = numControlBuses if numControlBuses is not None else config['num_control_buses']
self.includes = includes if includes is not None else []
self.encoding = encoding or csounddefs.bestSampleEncodingForExtension(os.path.splitext(self.outfile)[1][1:])
self.version = 0
self._renderjob: RenderJob | None = None
self._strToIndex: dict[str, int] = {}
self._indexToStr: dict[int, str] = {}
self._tableInfo: dict[int, TableInfo] = {}
self._soundfilesLoaded: dict[tuple[str, int, float], int] = {}
self._parsedInstrs: dict[str, csoundparse.ParsedInstrBody] = {}
# maps (path, chan, skiptime) -> tablenumber
self._instanceCounters: dict[int, int] = {}
self._fracnumdigits = 4 # number of fractional digits used for unique instances
self._strLastIndex = 20
self._stopped = False
self._history: list[_OfflineComponent] = []
self._trackHistory = True
self._usesBuses = False # does any instr uses the bus system?
self._hasBusSupport = busSupport
self._reservedInstrnums: set[int] = set()
self._reservedInstrnumRanges: list[tuple[str, int, int]] = [
('builtinorc', engineorc.CONSTS['reservedInstrsStart'], engineorc.CONSTS['userInstrsStart'] - 1)]
self._shouldPerform = False
self._endtime = 0.
self._tableCounter = engineorc.CONSTS['reservedTablesStart']
self.nosound = nosound
self.options = ["-d"]
if quiet:
self.options.extend(["--messagelevel=0", "--m-amps=0", "--m-range=0"])
if nchnls == 0 or nosound:
self.options.append('--nosound')
self.nosound = True
if sampleAccurate:
self.options.append('--sample-accurate')
if commandlineOptions:
self.options.extend(commandlineOptions)
self.csound: libcsound.Csound
self._start()
for s in ["cos", "linear", "smooth", "smoother"]:
self.strSet(s)
@property
def endtime(self) -> float:
return self._endtime
def _makeIncludeBlock(self) -> str:
if not self.includes:
return ''
includelines = [f'#include "{include}"' for include in self.includes]
return "\n".join(includelines)
def _compile(self, code: str) -> None:
err = self.csound.compileOrc(code)
if err:
logger.error(internal.addLineNumbers(code))
raise CsoundError(f"Error compiling base ochestra, error: {err}")
self._history.append(_CompileEvent(code))
def _start(self) -> None:
import libcsound
self.version = libcsound.VERSION
self.csound = csound = libcsound.Csound()
for option in self.options:
csound.setOption(option)
if not self.nosound:
csound.setOption(f'-o{self.outfile}')
header = engineorc.makeOrcHeader(sr=self.sr, ksmps=self.ksmps, nchnls=self.nchnls, nchnls_i=0, a4=self.a4)
csound.compileOrc(header)
orc, instrmap = engineorc.makeOrc(globalcode=self.globalcode,
includestr=self._makeIncludeBlock())
self._builtinInstrs = instrmap
self._reservedInstrnums.update(set(instrmap.values()))
self._compile(orc)
csound.start()
if self._hasBusSupport:
self.addBusSupport()
def evalCode(self, code: str) -> float:
return self.csound.evalCode(code)
def addBusSupport(self, numAudioBuses: int|None = None, numControlBuses: int|None = None) -> None:
numAudioBuses = numAudioBuses if numAudioBuses is not None else self.numAudioBuses
numControlBuses = numControlBuses if numControlBuses is not None else self.numControlBuses
startInstr = max(instrnum for instrnum in self._builtinInstrs.values() if instrnum < engineorc.CONSTS['postProcInstrnum']) + 1
postInstrnum = 1 + max(max(self._builtinInstrs.values()), engineorc.CONSTS['postProcInstrnum'])
busorc, businstrs = engineorc.makeBusOrc(numAudioBuses=self.numAudioBuses,
numControlBuses=self.numControlBuses,
startInstr=startInstr,
postInstr=postInstrnum)
self._builtinInstrs.update(businstrs)
self._reservedInstrnumRanges.append(('busorc', min(businstrs.values()), max(businstrs.values())))
self._hasBusSupport = True
self.numAudioBuses = numAudioBuses
self.numControlBuses = numControlBuses
chanptr, error = self.csound.channelPtr("_busTokenCount", kind='control', mode='rw')
if error:
raise RuntimeError(f"Error in csound.channelPtr: {error}")
assert isinstance(chanptr, np.ndarray)
self._busTokenCountPtr = chanptr
self._compile(busorc)
kbustable = int(self.csound.evalCode("return gi__bustable"))
self._kbusTable = self.tableData(kbustable)
[docs]
def hasBusSupport(self) -> bool:
"""
Returns True if this Engine was started with bus support
.. seealso::
:meth:`~csoundengine.engine.Engine.assignBus`
:meth:`~csoundengine.engine.Engine.writeBus`
:meth:`~csoundengine.engine.Engine.readBus`
"""
return self._hasBusSupport and (self.numAudioBuses > 0 or self.numControlBuses > 0)
[docs]
def compile(self, code: str) -> None:
"""
Send orchestra code to csound
The code sent can be any orchestra code
Args:
code: the code to compile
"""
if not self.csound:
raise RuntimeError("This OfflineEngine does not have an associated csound process")
codeblocks = csoundparse.parseOrc(code)
for codeblock in codeblocks:
if codeblock.kind == 'instr':
parsedbody = csoundparse.instrParseBody(csoundparse.instrGetBody(codeblock.lines))
self._parsedInstrs[codeblock.name] = parsedbody
if codeblock.name[0].isdigit():
instrnum = int(codeblock.name)
for rangename, mininstr, maxinstr in self._reservedInstrnumRanges:
if mininstr <= instrnum < maxinstr:
logger.error(f"Instrument number {instrnum} is reserved. Code:")
logger.error("\n" + textwrap.indent(codeblock.text, " "))
raise ValueError(f"Cannot use instrument number {instrnum}, "
f"the range {mininstr} - {maxinstr} is reserved for '{rangename}'")
if instrnum in self._reservedInstrnums:
raise ValueError("Cannot compile instrument with number "
f"{instrnum}: this is a reserved instr and "
f"cannot be redefined. Reserved instrs: "
f"{sorted(self._reservedInstrnums)}")
self.csound.compileOrc(code)
self._shouldPerform = True
self._addHistory(_CompileEvent(code))
def _addHistory(self, component: _OfflineComponent) -> None:
if self._trackHistory:
self._history.append(component)
@cache
def instrNum(self, name: str) -> int:
assert not self._stopped
return int(self.csound.evalCode(f'return nstrnum("{name}")'))
def assignInstanceNum(self, instr: int | str) -> int:
if isinstance(instr, str):
instr = self.instrNum(instr)
c = self._instanceCounters.get(instr, 0) + 1
self._instanceCounters[instr] = c
instancenum = (c % int(10 ** self._fracnumdigits - 2)) + 1
return instancenum
[docs]
def assignEventId(self, instr: int | str) -> float:
"""
Assign a unique p1 value for the given instrument number
This is used internally to assign a fractional p1 when
called sched with unique=True, but it can also be used
to generate unique p1 for :meth:`OfflineEngine.inputMessage`
or when using csound's ``schedule`` with :meth:`OfflineEngine.compile`
Args:
instr: the instrument number
Returns:
a unique fractional p1
"""
instancenum = self.assignInstanceNum(instr=instr)
frac = (instancenum / (10 ** self._fracnumdigits)) % 1
return (instr if isinstance(instr, int) else self.instrNum(instr)) + frac
[docs]
def sched(self,
instr: int | float | str,
delay=0.,
dur=-1.,
*pfields,
args: np.ndarray | list[float | str] | None = None,
unique=False,
comment='',
relative=True,
**namedpfields
) -> float | str:
"""
Schedule an instrument
Args:
instr : the instrument number/name. If it is a fractional number,
that value will be used as the instance number.
An integer or a string will result in a unique instance assigned
by csound if unique is True. Named instruments
with a fractional number can also be scheduled (for example,
for an instrument named "myinstr" you canuse "myinstr.001")
delay: start time, relative to the elapsed time. (see :attr:`OfflineEngine.now`)
dur: duration of the event
args: any other args expected by the instrument, starting with p4
(as a list of floats/strings, or a numpy array). Any
string arguments will be converted to a string index via strSet. These
can be retrieved in csound via strget
unique: if True, assign a unique p1
namedpfields: pfields can be given as keyword arguments of the form p4=..., p6=...
Defaults are filled with values defined via ``pset``
Returns:
a fractional p1 of the instr started, which identifies this event.
If instr is a fractional named instr, like "synth.01", then this
same instr is returned as eventid (as a string).
Example
-------
.. code-block :: python
from csoundengine import *
e = Engine()
e.compile(r'''
instr 100
pset 0, 0, 0, 1000, 2000, 0, 9.0
kfreq = p4
kcutoff = p5
Smode strget p6
iq = p7
asig vco2 0.1, kfreq
if strcmp(Smode, "lowpass") == 0 then
asig moogladder2 asig, kcutoff, 0.95
else
asig K35_hpf asig, kcutoff, iq
endif
outch 1, asig
endin
''')
eventid = e.sched(100, 2, args=[200, 400, "lowpass"])
# simple automation in python
for cutoff in range(400, 3000, 10):
e.setp(eventid, 5, cutoff)
time.sleep(0.01)
e.unsched(eventid)
#
# To ensure simultaneity between events:
now = e.elapsedTime()
for t in np.arange(2, 4, 0.2):
e.sched(100, t+now, 0.2, relative=False, p7=9.8)
.. seealso:: :meth:`~csoundengine.engine.Engine.unschedAll`
"""
if not self.csound:
raise RuntimeError("This OfflineEngine has no associated csound process")
if pfields and args:
raise ValueError("Either pfields as positional arguments or args can be given, "
"got both")
elif pfields:
args = pfields
if namedpfields:
instrdef = self._parsedInstrs.get(str(int(instr)) if isinstance(instr, (int, float)) else instr)
if instrdef:
kwargs = csoundparse.normalizeNamedPfields(namedpfields, instrdef.pfieldNameToIndex)
else:
assert all(csoundparse.isPfield(key) for key in namedpfields)
kwargs = {int(key[1:]):value for key, value in namedpfields.items()}
args = csoundparse.fillPfields(args, kwargs, defaults=instrdef.pfieldIndexToValue if instrdef else None)
if unique:
if isinstance(instr, str):
p1 = self.assignEventId(instr)
frac = round(math.modf(p1)[0], self._fracnumdigits)
instr = instr + '.' + str(frac).split('.')[1]
elif isinstance(instr, int):
instr = p1 = self.assignEventId(instr)
elif isinstance(instr, float) and int(instr) == instr:
instr = p1 = self.assignEventId(int(instr))
else:
raise TypeError(f"Expected an instrument number of name, got {instr}")
else:
p1 = instr if not isinstance(instr, str) else self.instrNum(instr)
now = self.elapsedTime()
if not relative:
# We always convert to relative
delay = delay - now
if delay < 0:
raise ValueError(f"The time offset is in the pase, elapsed time: {now}, absolute offset given: {delay + now}")
if dur >= 0 and delay + dur > self._endtime:
self._endtime = delay + dur
elif delay > self._endtime:
self._endtime = delay
if isinstance(args, np.ndarray):
pfields = np.empty((len(args) + 3,), dtype=float)
pfields[0] = p1
pfields[1] = delay
pfields[2] = dur
pfields[3:] = args
else:
pfields = [p1, delay, dur]
if args:
with self.nohistory():
# do not keep track of strsets for scheduling events in the history
# since these end up in the score and csound does the string handling for us
pfields.extend(float(a) if not isinstance(a, str) else self.strSet(a) for a in args)
self.csound.scoreEvent("i", pfields=pfields)
# self.csound.scoreEventAbsolute(type_='i', pFields=pfields, timeOffset=timeOffset)
self._addHistory(_SchedEvent(instr=instr, delay=delay+now, dur=dur, args=args,
eventid=p1, comment=comment))
self._shouldPerform = True
return instr
[docs]
def unsched(self, p1: float | str, delay: float = 0) -> None:
"""
Stop a playing event
If p1 is a round number, all events with the given number
are unscheduled. Otherwise only an exact matching event
is unscheduled, if it exists
Args:
p1: the instrument number/name to stop
delay: absolute time to turnoff the given event. A value of 0 means to
unschedule it at the current time
Example
~~~~~~~
>>> from csoundengine import *
>>> e = Engine(...)
>>> e.compile(r'''
... instr sine
... a0 oscili 0.1, 1000
... outch 1, a0
... endin
... ''')
>>> # sched an event with indefinite duration
>>> eventid = e.sched(10, 0, -1)
>>> e.unsched(eventid, 10)
.. seealso:: :meth:`~Engine.unschedAll`
"""
if (isinstance(p1, int) and int(p1) != p1) or (isinstance(p1, str) and "." in p1):
mode = 4
else:
mode = 0
self.sched(self._builtinInstrs['turnoff'], delay, 0, p1, mode)
[docs]
@contextlib.contextmanager
def nohistory(self):
"""
A context manager to suppress tracking history
"""
try:
self._trackHistory = False
yield self
finally:
self._trackHistory = True
[docs]
def setEndMarker(self, time: float) -> None:
"""
Set the end marker
When calling :meth:`OfflineEngine.perform`, performance
will be advanced up to this absolute time
The current end time can be queried via the :attr:`OfflineEngine.endtime`
attribute
"""
self._endtime = time
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._endtime and self._shouldPerform:
self.perform()
@property
def renderjob(self) -> RenderJob | None:
if not self._renderjob:
info = tools.sndfileInfo(self.outfile)
if info.duration == 0:
return None
renderjob = RenderJob(outfile=self.outfile, samplerate=self.sr, encoding=self.encoding)
self._renderjob = renderjob
return self._renderjob
[docs]
def cancel(self, remove=True) -> None:
"""
Cancel performance of this engine
Args:
remove: if True, remove the generated outfile
"""
if self._stopped:
logger.info("This engine is already stopped")
return
self.csound.stop()
self.csound.cleanup()
if remove and os.path.exists(self.outfile):
logger.debug(f"Removing outfile '{self.outfile}'")
os.remove(self.outfile)
[docs]
def stop(self, render=True, extratime=0.) -> RenderJob | None:
"""
Stop this csound process, optionally rendering any pending events
Args:
render: if True, advance the clock to render any pending scheduled
event.
extratime: if render is True, add extra render time to allow for
decays
Returns:
a RenderJob containing information about the rendered file, or None
if no render took place.
"""
if self._stopped:
return None
if self._shouldPerform and render:
self.perform(extratime=extratime)
self.csound.stop()
self.csound.reset()
self._stopped = True
if not os.path.exists(self.outfile):
raise RuntimeError(f"Did not find rendered file '{self.outfile}'")
info = tools.sndfileInfo(self.outfile)
if info.duration > 0:
renderjob = RenderJob(outfile=self.outfile, samplerate=self.sr, encoding=self.encoding)
self._renderjob = renderjob
return renderjob
else:
return None
def __del__(self):
if self.csound and self._shouldPerform:
self.stop(render=True)
[docs]
def strSet(self, s: str) -> int:
"""
Assign a numeric index to a string to be used inside csound
Args:
s: the string to set
Returns:
the index associated with ``s``. When passed to a csound instrument
it can be retrieved via ``strget``.
"""
stringIndex = self._strToIndex.get(s)
if stringIndex:
return stringIndex
stringIndex = self._strLastIndex
self._strLastIndex += 1
self._strToIndex[s] = stringIndex
self._indexToStr[stringIndex] = s
self.compile(fr'strset {stringIndex}, "{s}"')
return stringIndex
def _assignTableNumber(self) -> int:
"""
Return a free table number and mark that as being used.
To release the table, call unassignTable
Returns:
the table number (an integer)
"""
self._tableCounter += 1
return self._tableCounter - 1
[docs]
def readSoundfile(self, path: str, tabnum=0, chan=0, skiptime=0., delay=0., unique=True
) -> int:
"""
Read a soundfile into a table (via GEN1), returns the table number
Args:
path: the path to the output -- **"?" to open file interactively**
tabnum: if given, a table index. If None, an index is
autoassigned
chan: the channel to read. 0=read all channels
skiptime: time to skip at the beginning of the soundfile.
delay: performance time at which to read the soundfile
unique: if False and the same file with the same params is already loaded,
the existing table is returned
Returns:
the table number. Information about the read soundfile can be obtained
via :meth:`OfflineEngine.tableInfo`
"""
assert not self._stopped
if not unique and tabnum == 0:
if existingtab := self._soundfilesLoaded.get((path, chan, skiptime)):
return existingtab
if not tabnum:
tabnum = self._assignTableNumber()
self._tableInfo[tabnum] = TableInfo.get(path)
self._soundfilesLoaded[(path, chan, skiptime)] = tabnum
self.csound.compileOrc(fr'itab ftgen {tabnum}, {delay}, 0, -1, "{path}", {skiptime}, 0, {chan}')
self._addHistory(_ReadSoundfileEvent(path=path, tabnum=tabnum, delay=delay, skiptime=skiptime, chan=chan))
return tabnum
[docs]
def queryVariable(self, variable: str) -> float:
"""
Query the value of a csound variable via the ``return`` opcode
Example
~~~~~~~
>>> engine.compile(r'''
... gkfoo init 0
... instr 100
... gkfoo = p4
... endin
... ''')
>>> engine.sched(100, 0, 0, 314)
>>> engine.perform()
>>> engine.queryVariable('gkfoo')
314.0
"""
assert not self._stopped
return self.csound.evalCode(fr'return {variable}\n')
[docs]
def call(self, func: str) -> float:
"""
Call an init-time csound function, returns the result
Args:
func: the function to call, like 'ftgen(200, 0, 100, 2, 0)'. It must be
an init-time function returning one init value
Returns:
the value returned by the function
Example
~~~~~~~
# Create an empty table manually with size 100
>>> engine.compile(r'i0 = ftgen(200, 0, 100, 2, 0)')
# Check that the size is correct
>>> engine.call('ftlen(200)')
100
"""
assert not self._stopped
return self.csound.evalCode(fr'return {func}')
def tableInfo(self, tabnum: int, cache=True) -> TableInfo:
info = self._tableInfo.get(tabnum)
if info and cache:
return info
assert not self._stopped
sr = self.csound.evalCode(f'return ftsr({tabnum})')
numchannels = self.csound.evalCode(f'return ftchnls({tabnum})')
tablen = self.csound.evalCode(f'return ftlen({tabnum}')
return TableInfo(sr=int(sr), size=int(tablen), nchnls=int(numchannels))
[docs]
def makeEmptyTable(self, size: int, numchannels=1, sr=0, delay=0.) -> int:
"""
Create an empty table, returns the index of the created table
Args:
size: the size of the table
numchannels: if the table will be used to hold audio, the
number of channels of the audio
sr: the samplerate of the audio, if the table is used to hold audio
delay: when to create the table
Returns:
the table number
"""
tabnum = self._assignTableNumber()
self._tableInfo[tabnum] = TableInfo(sr=sr, size=size, nchnls=numchannels)
self.compile(f'itab ftgen {tabnum}, {delay}, {-size}, -2, 0')
if sr > 0:
self.sched(instr=self._builtinInstrs['ftsetparams'], delay=delay, dur=0,
args=[tabnum, sr, numchannels])
return tabnum
def tableData(self, idx: int) -> np.ndarray:
assert not self._stopped
arr = self.csound.table(idx)
if arr is None:
raise ValueError(f"Table {idx} does not exist")
return arr
[docs]
def channelPointer(self, channel: str, kind='control', mode='rw') -> np.ndarray:
if kind != 'control' and kind != 'audio':
raise NotImplementedError("Only kind 'control' and 'audio' are implemented "
"at the moment")
assert not self._stopped
ptr, err = self.csound.channelPtr(channel, kind, mode)
if err:
raise RuntimeError(f"Error while trying to retrieve/create a channel pointer: {err}")
assert isinstance(ptr, np.ndarray)
return ptr
[docs]
def initChannel(self,
channel: str,
value: float | str | np.ndarray = 0,
kind='',
mode="r") -> None:
modei = {"r": 1, "w": 2, "rw": 3, "wr": 3}[mode]
if not kind:
if isinstance(value, (int, float)):
kind = 'k'
elif isinstance(value, str):
kind = 'S'
elif isinstance(value, np.ndarray):
kind = 'a'
if kind == 'k':
self.compile(f'chn_k "{channel}", {modei}\n')
self.setChannel(channel, value)
elif kind == 'a':
self.compile(f'chn_a "{channel}", {modei}')
if value:
self.setChannel(channel, value)
elif kind == 'S':
self.compile(f'chn_S "{channel}", {modei}\n')
self.setChannel(channel, value)
else:
raise TypeError(f"Expected an initial value of type float or string, got {value}")
[docs]
def channelValue(self, channel: str) -> float:
assert not self._stopped
value, err = self.csound.controlChannel(channel)
if err != 0:
raise KeyError(f"Control channel '{channel}' not found, error: {err}, value: {value}")
return value
def setChannel(self, channel: str, value: float | str | np.ndarray, delay=0.):
assert not self._stopped
if delay == 0.:
if isinstance(value, (int, float)):
self.csound.setControlChannel(channel, value)
elif isinstance(value, str):
self.csound.setStringChannel(channel, value)
else:
self.csound.setAudioChannel(channel, value)
else:
if isinstance(value, (int, float)):
instrnum = self._builtinInstrs['chnset']
self.sched(instrnum, delay, 0, args=[channel, value])
elif isinstance(value, str):
instrnum = self._builtinInstrs['chnsets']
self.sched(instrnum, delay, 0, args=[channel, value])
else:
raise TypeError(f"Expected a number or a str as value, got {value}")
[docs]
def fillTable(self, tabnum: int, data: np.ndarray | Sequence[float]) -> None:
"""
Fill an existing table with data
If data is bigger than the table itself, then only the
data which fits in the table is copied. If, on the
contrary, the data is smaller than the table, then
the rest of the table is left unmodified. For
more control considere using :meth:`~OfflineEngine.getTableData`
Args:
tabnum: the table number of an already existing table
data: the data to put into the table
"""
tablearray = self.tableData(tabnum)
maxidx = min(len(tablearray), len(data))
tablearray[:maxidx] = data[:maxidx]
[docs]
def makeTable(self,
data: np.ndarray | Sequence[float],
sr: int = 0,
tabnum: int = -1,
delay=0.
) -> int:
"""
Create a new table and fill it with data.
Args:
data: the data used to fill the table
tabnum: the table number. If -1, a number is assigned by the engine.
If 0, a number is assigned by csound (this operation will be blocking
if no callback was given)
sr: only needed if filling sample data. If given, it is used to fill the
table metadata in csound, as if this table had been read via gen01
delay: when to allocate the table
Returns:
the index of the new table
"""
if not self.csound:
raise RuntimeError("This OfflineEngine does not have an associated csound process")
if tabnum == -1:
tabnum = self._assignTableNumber()
if not isinstance(data, np.ndarray):
data = np.asarray(data)
numchannels = 1 if len(data.shape) == 1 else data.shape[1]
flatdata = data if numchannels == 1 else data.flatten()
arr = np.zeros((len(flatdata)+4,), dtype=float)
if delay > 0 and tabnum == 0:
raise ValueError("Cannot schedule a table in the future without assigning a "
"table number. Set tabnum=-1 to autoassign a number")
if delay > 0 and sr == 0 and numchannels == 1:
arr[0:4] = [tabnum, delay, len(flatdata), -2]
arr[4:] = flatdata
self.csound.scoreEvent("f", arr)
else:
if delay > 0:
logger.warning("delay will be ignored")
if tabnum == 0:
tabnum = int(self.csound.evalCode(fr'return ftgen(0, 0, {len(flatdata)}, -2, 0)'))
else:
self.csound.compileOrc(fr'i0_ ftgen {tabnum}, 0, {len(flatdata)}, -2, 0')
tabptr = self.csound.table(tabnum)
assert tabptr is not None
tabptr[:] = flatdata
if sr > 0 or numchannels > 0:
self.csound.compileOrc(fr'ftsetparams {tabnum}, {sr}, {numchannels}')
self._tableInfo[tabnum] = TableInfo(sr=sr, size=len(flatdata), nchnls=numchannels)
self._addHistory(_TableDataEvent(data=data, delay=delay, tabnum=tabnum, sr=sr))
return int(tabnum)
def freeTable(self, tableindex: int, delay=0.) -> None:
self.sched(self._builtinInstrs['freetable'], delay, 0., tableindex)
@property
def now(self) -> float:
"""
The current elapsed time
This is the same as :meth:`OfflineEngine.elapsedTime`
"""
return self.elapsedTime()
[docs]
def elapsedTime(self) -> float:
"""
Reports the logical elapsed time of this engine
"""
return self.csound.currentTimeSamples() / self.sr
[docs]
def playSample(self, tabnum: int, delay=0., chan=1, speed=1., gain=1., fade=0.,
starttime=0., lagtime=0.01, dur=-1.
) -> float:
"""
Play a sample already loaded into a table.
Speed and gain can be modified via setp while playing
Args:
tabnum: the table where the sample data was loaded
delay: when to start playback
chan: the first channel to send output to (channels start with 1)
speed: the playback speed
gain: a gain applied to this sample
fade: fadein/fadeout time in seconds
starttime: playback can be started from anywhere within the table
lagtime: a lag value for dynamic pfields (see below)
dur: the duration of playback. Use -1 to play until the end
Returns:
the instance number of the playing instrument.
.. admonition:: Dynamic Fields
:class: important
- **p4**: `gain`
- **p5**: `speed`
Example
~~~~~~~
>>> from csoundengine import *
>>> e = Engine()
>>> import sndfileio
>>> sample, sr = sndfileio.sndread("stereo.wav")
>>> # modify the samples in python
>>> sample *= 0.5
>>> table = e.makeTable(sample, sr=sr, block=True)
>>> eventid = e.playSample(table)
... # gain (p4) and speed (p5) can be modified while playing
>>> e.setp(eventid, 5, 0.5) # Play at half speed
"""
info = self._tableInfo.get(tabnum)
if not info:
raise ValueError(f"Invalid table number {tabnum}, available tables "
f"are {self._tableInfo.keys()}")
if dur < 0:
if info.sr == 0:
sr = self.sr
else:
sr = self.sr
sampledur = info.numFrames / sr
estimatedDuration = sampledur / speed
endtime = delay + estimatedDuration
if self._endtime < endtime:
self._endtime = endtime
args = [gain, speed, tabnum, chan, fade, starttime, lagtime]
eventid = self.sched(self._builtinInstrs['playgen1'], delay=delay, dur=dur,
args=args, unique=True)
assert isinstance(eventid, (int, float))
return eventid
[docs]
def automatep(self,
p1: float | str,
pfield: int | str,
pairs: Sequence[float] | np.ndarray,
mode='linear',
delay=0.,
overtake=False
) -> float:
"""
Automate a pfield of a scheduled event
Args:
p1: the fractional instr number of a running event, or an int number
to modify all running instances of that instr
pfield: the pfield index. For example, if the pfield to modify if p4,
pidx should be 4. Values of 1, 2, and 3 are not allowed.
pairs: the automation data is given as a flat data. of pairs (time, value).
Times are relative to the start of the automation event
mode: one of 'linear', 'cos', 'expon(xx)', 'smooth'. See the csound opcode
`interp1d` for more information
(https://csound-plugins.github.io/csound-plugins/opcodes/interp1d.html)
delay: the time delay to start the automation, relative to the elapsed time
overtake: if True, the first value of pairs is replaced with
the current value in the running instance
Returns:
the p1 associated with the automation synth
Example
~~~~~~~
>>> e = OfflineEngine()
>>> e.compile(r'''
... instr 100
... kfreq = p4
... outch 1, oscili:a(0.1, kfreq)
... endin
... ''')
>>> eventid = e.sched(100, 0, 10, args=(1000,))
>>> e.automatep(eventid, 4, [0, 1000, 3, 200, 5, 200])
.. seealso:: :meth:`~Engine.setp`, :meth:`~Engine.automateTable`
"""
if math.isnan(pairs[1]):
overtake = True
if len(pairs) % 2 != 0:
raise ValueError(f"Pairs needs to be a flat sequence of floats with the form"
f" t0, value0, t1, value1, ..., with an even number of "
f"total elements. Got {len(pairs)} items: {pairs}")
if isinstance(pfield, str):
instrdef = self._parsedInstrs.get(str(int(p1)) if isinstance(p1, (int, float)) else p1.split(".")[0])
if not instrdef:
raise ValueError(f"Could not find definition for instr '{p1}'")
pidx = instrdef.pfieldNameToIndex.get(pfield)
if pidx is None:
raise ValueError(f"Invalid pfield name '{pfield}'. Valid pfields: {instrdef.pfieldNameToIndex.keys()}")
else:
pidx = pfield
self._addHistory(_AutomateEvent(p1, pfield=pfield, pairs=pairs, delay=delay, overtake=overtake))
if self.version >= 7000 or len(pairs) <= 1900:
if isinstance(pairs, np.ndarray):
pairs = pairs.tolist()
p1IsString = int(isinstance(p1, str))
args = [p1, pidx, mode, int(overtake), len(pairs), p1IsString, *pairs]
eventid = self.sched(self._builtinInstrs['automatePargViaPargs'],
delay=delay,
dur=pairs[-2] + self.onecycle,
args=args)
assert isinstance(eventid, (int, float))
return eventid
else:
if isinstance(pairs, np.ndarray):
pairs = list(pairs)
events = [self.automatep(p1=p1, pfield=pfield, pairs=subgroup, mode=mode, delay=delay + subdelay,
overtake=overtake)
for subdelay, subgroup in internal.splitAutomation(pairs, 1900 // 2)]
return events[0]
def renderHistory(self, outfile='') -> RenderJob:
csd = self.generateCsd()
return csd.render(outfile=outfile)
[docs]
def rewind(self, offset=0.) -> None:
"""
Unschedule future events and set the time pointer to the given offset
"""
if offset > 0:
self.csound.setScoreOffsetSeconds(offset)
self.csound.rewindScore()
[docs]
def generateCsd(self) -> Csd:
"""
Generate a :class:`~csoundengine.csd.Csd` from this OfflineEngine
This can be used to export a project file and use the csound
binary to render it. It might also be useful for debugging
Returns:
the generated :class:`~csoundengine.csd.Csd`
"""
from csoundengine.csd import Csd
csd = Csd(sr=self.sr, ksmps=self.ksmps, nchnls=self.nchnls, a4=self.a4,
options=self.options, nchnls_i=0)
csd.setSampleEncoding(self.encoding)
for event in self._history:
event.apply(csd)
return csd
[docs]
def write(self, outfile: str) -> None:
"""
Dump this OfflineEngine as a .csd
This will include all code, instruments and opcodes compiled until now
as well as any events scheduled, etc.
Any data files added are written to a folder ``'<csdfile>.assets'`` besides the
generated .csd file.
Args:
outfile: the path of the generated .csd file
"""
self.generateCsd().write(outfile)
[docs]
def includeFile(self, include: str) -> None:
"""
Adds an #include file to this engine and evaluates the file contents
Args:
include: the path to the file to be included and evaluated
"""
abspath = os.path.abspath(include)
for f in self.includes:
if abspath == f:
return
self.includes.append(abspath)
code = open(abspath).read()
self.compile(code)
[docs]
def playSoundFromDisk(self, path: str, delay=0., dur=-1, chan=0, speed=1., fade=0.01
) -> float:
"""
Play a soundfile from disk via diskin2
Args:
path: the path to the output
delay: time offset to start playing
dur: duration of playback, use -1 to play until the
end of the file
chan: first channel to output to
speed: playback speed (2.0 will sound an octave higher)
fade: fadein/out in seconds
Returns:
the instance number of the scheduled event
.. seealso::
* :meth:`~OfflineEngine.readSoundfile`
* :meth:`~OfflineEngine.playSample`
"""
assert not self._stopped
if dur < 0:
info = tools.sndfileInfo(path)
sampledur = info.duration
estimatedDuration = sampledur / speed
endtime = delay + estimatedDuration
if self._endtime < endtime:
self._endtime = endtime
eventid = self.sched(instr=self._builtinInstrs['playsndfile'],
delay=delay,
dur=dur,
args=[path, chan, speed, fade],
unique=True)
assert isinstance(eventid, float)
return eventid
[docs]
def setp(self, p1: int | float | str, *pairs, delay=0.) -> None:
"""
Modify a pfield of an active note
Multiple pfields can be modified simultaneously. It only makes sense to
modify a pfield if a control-rate (k) variable was assigned to this pfield
(see example)
Args:
p1: the p1 of the instrument to automate. A float or a "<name>.<instanceid>" will set
the value for a specific instance, an int or a unqualified name will set
the value of the given parameter for all instances
*pairs: each pair consists of a pfield index and a value. The index is an int,
matching the pfield number (4=p4, 5=p5, etc), the value can be a number
(string values are not supported)
delay: when to start the automation
.. rubric:: Example
.. code-block:: python
>>> engine = OfflineEngine(...)
>>> engine.compile(r'''
... instr foo
... kamp = p5
... kfreq = p6
... a0 oscili kamp, kfreq
... outch 1, a0
... endin
... ''')
>>> p1 = engine.sched('foo', args=[0.1, 440], unique=True)
>>> p1
'foo.0001'
>>> engine.setp(p1, 5, 0.2, delay=0.5)
"""
numpairs = len(pairs) // 2
if len(pairs) % 2 == 1:
raise ValueError(f"Pairs needs to be even, got {pairs}")
if numpairs > 5:
# split and schedule the parts
# TODO
raise ValueError(f"Only up to 5 pairs supported, got {pairs}")
args = [1 if isinstance(p1, str) else 0, p1, numpairs]
args.extend(pairs)
instr = self._builtinInstrs['pwrite']
self.sched(instr, delay=delay, dur=0, args=args)
for pair in range(numpairs):
self._addHistory(_SetParamEvent(p1=p1, pindex=pairs[pair*2], value=pairs[pair*2+1], delay=delay))
def _getBusIndex(self, bus: int) -> int | None:
bus = int(bus)
if (index := self._busIndexes.get(bus)) is not None:
return index
busindex = int(self.csound.evalCode(f'return dict_get:i(gi__bustoken2num, {bus})'))
if busindex < 0:
return None
self._busIndexes[bus] = busindex
return busindex
[docs]
def assignBus(self,
kind='',
value: float | None = None,
persist=True
) -> int:
"""
Assign one audio/control bus, returns the bus number.
Audio buses are always mono.
Args:
kind: the kind of bus, "audio" or "control". If left unset and value
is not given it defaults to an audio bus. Otherwise, if value
is given a control bus is created. Explicitely asking for an
audio bus and setting an initial value will raise an expection
value: for control buses it is possible to set an initial value for
the bus. If a value is given the bus is created as a control
bus. For audio buses this should be left as None
persist: if True the bus created is kept alive until the user
calls :meth:`~OfflineEngine.releaseBus`. Otherwise, the bus is
reference counted and is released after the last
user releases it.
Returns:
the bus token, can be passed to any instrument expecting a bus
to be used with the built-in opcodes :ref:`busin`, :ref:`busout`, etc.
A bus created here can be used together with the built-in opcodes :ref:`busout`,
:ref:`busin` and :ref:`busmix`. A bus can also be created directly in csound by
calling :ref:`busassign`
A non-persistent bus is reference counted: it is kept alive as long as there
are clients using it and it is released when it is not used anymore. At
creation the bus is "parked", waiting to be used by any client.
As long as no clients use it, the bus stays in this state and is ready to
be used. A persistent bus stays alive until it is freed via :meth:`OfflineEngine.releaseBus`
Order of evaluation is important: **audio buses are cleared at the end of each
performance cycle** and can only be used to communicate from a low
priority to a high priority instrument.
For more information, see :ref:`Bus Opcodes<busopcodes>`
Example
~~~~~~~
Pass audio from one instrument to another. The bus will be released after the events
are finished.
>>> e = OfflineEngine(...)
>>> e.compile(r'''
... instr 100
... ibus = p4
... kfreq = 1000
... asig vco2 0.1, kfreq
... busout(ibus, asig)
... endin
... ''')
>>> e.compile(r'''
... instr 110
... ibus = p4
... asig = busin(ibus)
... ; do something with asig
... asig *= 0.5
... outch 1, asig
... endin
... ''')
>>> bus = e.assignBus("audio")
>>> s1 = e.sched(100, 0, 4, (bus,))
>>> s2 = e.sched(110, 0, 4, (bus,))
>>> e.perform()
Modulate one instr with another, at k-rate. **NB: control buses act like global
variables, the are not cleared at the end of each cycle**.
>>> e = OfflineEngine(...)
>>> e.compile(r'''
... instr 130
... ibus = p4
... ; lfo between -0.5 and 0 at 6 Hz
... kvibr = linlin(lfo:k(1, 6), -0.5, 0, -1, 1)
... busout(ibus, kvibr)
... endin
...
... instr 140
... itranspbus = p4
... kpitch = p5
... ktransp = busin:k(itranspbus, 0)
... kpitch += ktransp
... asig vco2 0.1, mtof(kpitch)
... outch 1, asig
... endin
... ''')
>>> bus = e.assignBus()
>>> s1 = e.sched(140, 0, -1, (bus, 67))
>>> s2 = e.sched(130, 0, -1, (bus,)) # start moulation
>>> e.unsched(s2) # remove modulation
>>> e.writeBus(bus, 0) # reset value
>>> e.unschedAll()
.. seealso:: :meth:`~Engine.writeBus`, :meth:`~Engine.readBus`, :meth:`~Engine.releaseBus`
"""
if not self.hasBusSupport():
raise RuntimeError("This Engine was created without bus support")
if kind:
if value is not None and kind == 'audio':
raise ValueError("Audio buses cannot be given an initial value")
else:
kind = 'audio' if value is None else 'control'
bustoken = int(self._busTokenCountPtr[0])
assert isinstance(bustoken, int) and (kind == 'audio' or kind == 'control')
self._busTokenToKind[bustoken] = kind
ikind = BUSKIND_AUDIO if kind == 'audio' else BUSKIND_CONTROL
ivalue = value if value is not None else 0.
self._busTokenCountPtr[0] = bustoken + 1
synctoken = 0
pfields = [synctoken, bustoken, ikind, int(persist), ivalue]
self.sched(self._builtinInstrs['busassign'], delay=self.elapsedTime(), dur=0, args=pfields)
self._usesBuses = True
return bustoken
[docs]
def releaseBus(self, bus: int, delay: float = 0.) -> None:
"""
Release a persistent bus
Args:
bus: the bus to release, as returned by :meth:`OfflineEngine.assignBus`
delay: when to release the bus (realtive time).
.. seealso:: :meth:`~OfflineEngine.assignBus`
"""
# bus is the bustoken
if not self.hasBusSupport():
raise RuntimeError("This OfflineEngine was created without bus support")
self.sched(self._builtinInstrs['busrelease'], delay, 0, int(bus))
[docs]
def writeBus(self, bus: int, value: float, delay=0.) -> None:
"""
Set the value of a control bus
Args:
bus: the bus token, as returned by ``.assignBus``
value: the new value of the bus
delay: when to set the value, relative to the elapsed time. A value of
0 indicates direct writing to the bus, bypassing the scheduler.
"""
if not self.hasBusSupport():
raise RuntimeError("This engine does not have bus support")
bus = int(bus)
kind = self._busTokenToKind.get(bus)
if not kind:
logger.warning(f"Bus token {bus} not known")
elif kind != 'control':
raise ValueError(f"Only control buses can be written to, got {kind}")
if (delay == 0) and (busindex := self._getBusIndex(bus)) is not None:
assert self._kbusTable is not None
self._kbusTable[busindex] = value
else:
self.sched(self._builtinInstrs['busoutk'], delay=delay, dur=self.onecycle*2, args=[bus, value])
[docs]
def openOutfile(self, app='') -> None:
"""
Open the generated soundfile in an external app
"""
if not self.renderjob:
raise RuntimeError("No render job found")
if not os.path.exists(self.renderjob.outfile):
raise FileNotFoundError(f"The rendered outfile '{self.renderjob.outfile}' "
f"does not exists")
self.renderjob.openOutfile(app=app)