from __future__ import annotations
import copy
from dataclasses import dataclass
from functools import cache
from typing import TYPE_CHECKING, Sequence
import emlib.numpytools as nptools
import numpy as np
from ._common import EMPTYSET
from .baseschedevent import BaseSchedEvent
from .config import logger
if TYPE_CHECKING:
from .abstractrenderer import AbstractRenderer
from . import instr as _instr
[docs]
@dataclass
class SchedAutomation:
    """
    An automation line attached to a scheduled event

    Holds the parameter being automated, the breakpoint data and how
    the breakpoints are interpolated / scheduled.
    """

    param: str
    """Name of the automated parameter (either a pfield or a control)"""

    pairs: Sequence[float] | np.ndarray
    """Flat breakpoint data of the form (t0, v0, t1, v1, ...)"""

    interpolation: str = 'linear'
    """Kind of interpolation between breakpoints (linear, cos, ...)"""

    delay: float | None = None
    """When the automation starts. None means: use the same delay as
    the event this automation belongs to"""

    overtake: bool = False
    """
    If True, the current value of the parameter is used as the initial
    value, disregarding the first value of the automation line.
    The same happens if the first value in the automation line is NAN
    """
[docs]
class SchedEvent(BaseSchedEvent):
    """
    Represents a scheduled event.

    It is used to control / automate / keep track of scheduled events.
    Used as is to represent offline events, it is the base class
    for realtime events (:class:`~csoundengine.synth.Synth`)

    Args:
        instrname: the instr name of this event (if applies)
        start: start time
        dur: duration
        args: pfields starting with p4
        p1: the p1 of the scheduled event
        uniqueId: an integer unique to this event
        parent: the renderer which scheduled this event
        priority: the priority at which this event was scheduled (if applies)
        controls: the dynamic controls used to schedule this event
        controlsSlot: the slot/token assigned for dynamic controls
        username: a name given by the user to identify this event. Normally not set
    """

    __slots__ = ('uniqueId', 'parent', 'instrname', 'priority',
                 'args', 'p1', 'controlsSlot', 'automations', 'controls',
                 'username', '_instr')

    def __init__(self,
                 instrname: str = '',
                 start: float = 0.,
                 dur: float = -1,
                 args: Sequence[float | str] = (),
                 p1: float | str = 0,
                 uniqueId: int = 0,
                 parent: AbstractRenderer | None = None,
                 priority: int = 0,
                 controls: dict[str, float] | None = None,
                 controlsSlot: int = -1,
                 username: str = ''):
        if parent and instrname:
            # Sanity check only: the instr must be known to the parent renderer
            assert instrname in parent.registeredInstrs()

        super().__init__(start=start, dur=dur)

        self.p1 = p1
        """p1 of this event"""

        self.instrname: str = instrname
        """The instrument template this event was created from, if applicable"""

        self.args = args
        """Args used for this event (p4, p5, ...)"""

        self.priority: int = priority
        """The priority of this event, if applicable"""

        self.uniqueId: int = uniqueId
        """A unique id of this event, as integer"""

        self.parent: AbstractRenderer | None = parent
        """The Renderer to which this event belongs (can be None)"""

        self.controls: dict[str, float] | None = controls
        """The dynamic controls used to schedule this event"""

        self.controlsSlot: int = controlsSlot
        """The slot/token assigned for dynamic controls"""

        self.automations: list[SchedAutomation] | None = None
        """Automations collected by :meth:`automate` while this event has
        no parent. None until the first automation is added"""

        self.username = username
        """A user given name to identify this event, normally not set"""

        self._instr: _instr.Instr | None = None

    def __hash__(self) -> int:
        # Only the unique id takes part in the hash
        return hash(('SchedEvent', self.uniqueId))

    def __repr__(self):
        parts = [f"p1={self.p1}, start={self.start}, dur={self.dur}, uniqueId={self.uniqueId}"]
        # Optional attributes are only shown when they hold a truthy value
        if self.args:
            parts.append(f'args={self.args}')
        if self.instrname:
            parts.append(f'instrname={self.instrname}')
        if self.priority:
            parts.append(f'priority={self.priority}')
        if self.username:
            parts.append(f'username={self.username}')
        partsstr = ', '.join(parts)
        return f"{type(self).__name__}({partsstr})"

    def playStatus(self) -> str:
        """
        Returns the playing status of this event (offline, playing, stopped or future)

        For offline events this will always return 'offline'

        Returns:
            'playing' if currently playing, 'stopped' if this event has already stopped
            or 'future' if it has not started. For offline events always returns 'offline'
        """
        return 'offline'

    def clone(self, **kws) -> SchedEvent:
        """
        Returns a shallow copy of this event with the given attributes replaced

        The copy is shallow (``copy.copy``): mutable attributes which are
        not overridden via ``kws`` are shared with the original

        Args:
            **kws: attribute names and the values to set in the copy

        Returns:
            the cloned event
        """
        event = copy.copy(self)
        for k, v in kws.items():
            setattr(event, k, v)
        return event

    def _setTable(self, param: str, value: float, delay=0.) -> None:
        """
        Set a named control of this event through its parent renderer

        Args:
            param: the name of the control
            value: the new value
            delay: when to apply the change

        Raises:
            RuntimeError: if this event has no parent or has no controls
                slot assigned
        """
        if not self.parent:
            raise RuntimeError("This event has no parent")

        if not self.start <= delay <= self.end:
            # Best effort: an out-of-range time is reported but the
            # operation is still forwarded to the renderer
            logger.error(f"This operation's time offset ({delay}) is not within "
                         f"the time range of the event ({self.start}-{self.end})")

        # The constructor's default slot is -1, which is truthy: a plain
        # truthiness test would never detect an unassigned slot
        if not self.controlsSlot or self.controlsSlot < 0:
            raise RuntimeError("This event has no associated controls slot")

        self.parent._setNamedControl(event=self, param=param, value=value, delay=delay)

    def _setPfield(self, param: str, value: float, delay=0.) -> None:
        """
        Modify a parg of this synth (offline).

        The operation is delegated to the parent renderer. It only makes
        sense to modify a parg if a k-rate variable was assigned to this parg
        (see Renderer.setp for an example).

        Args:
            param: the parameter to modify (for example, if there is a
                line "kfreq = p6", "kfreq" refers to that parg)
            value: the new value
            delay: when to apply the change

        Raises:
            RuntimeError: if this event is not assigned to a renderer

        Example
        ~~~~~~~

            >>> from csoundengine import *
            >>> r = OfflineSession()
            >>> Instr('sine', r'''
            ... |kamp=0.1, kfreq=1000|
            ... outch 1, oscili:ar(kamp, kfreq)
            ... ''')
            >>> event = r.sched('sine', 0, dur=4, args=[0.1, 440])
            >>> event._setPfield('kfreq', 880, delay=2)
            >>> event._setPfield('kamp', 0.5, delay=3)
        """
        if self.parent is None:
            raise RuntimeError("This event is not assigned to a Renderer")
        self.parent._setPfield(self, delay=delay, param=param, value=value)

    def aliases(self) -> dict[str, str]:
        """
        The parameter aliases defined by the instr of this event

        Returns:
            the aliases of the instr, or an empty dict if this event has no
            instrument name (in that case an error is logged)

        Raises:
            RuntimeError: if this event has no parent or the parent does
                not know the instr of this event
        """
        if self.parent is None:
            raise RuntimeError("This event is not assigned to a Renderer")
        if not self.instrname:
            logger.error(f"This SchedEvent does not have an instrument assigned ({self=})")
            return {}
        instr = self.parent.getInstr(self.instrname)
        if instr is None:
            raise RuntimeError(f"The instr '{self.instrname}' does not exist for this "
                               f"event's renderer (event={self}, renderer={self.parent})")
        return instr.aliases

    @property
    def instr(self) -> _instr.Instr:
        """
        The Instr corresponding to this Event, if applicable

        The instr is looked up via the parent the first time and
        remembered for later accesses.

        Raises ValueError if this event cannot access to the Instr
        instance (if it has no parent or its instrument name is invalid)
        """
        if self._instr:
            return self._instr
        if not self.parent:
            raise ValueError(f"This event {self} has no parent")
        instr = self.parent.getInstr(self.instrname)
        if instr is None:
            raise ValueError(f"Instrument {self.instrname} not known")
        self._instr = instr
        return instr

    def paramNames(self, aliases=False) -> frozenset[str]:
        """
        The parameter names of the instr of this event

        Args:
            aliases: forwarded to the instr

        Returns:
            whatever ``Instr.paramNames`` returns for this event's instr
        """
        return self.instr.paramNames(aliases=aliases)

    def dynamicParamNames(self, aliases=False) -> frozenset[str]:
        """
        The set of all dynamic parameters accepted by this Synth

        Args:
            aliases: if True, include aliases

        Returns:
            a set of the dynamic (modifiable) parameters accepted by this synth
        """
        instr = self.instr
        return instr.dynamicParamNames(aliases=aliases) if instr else EMPTYSET

    def automate(self,
                 param: str,
                 pairs: Sequence[float] | np.ndarray | tuple[np.ndarray, np.ndarray],
                 mode="linear",
                 delay: float | None = None,
                 overtake=False,
                 ) -> float:
        """
        Automate a parameter of this event

        If this event has a parent, the automation is delegated to it.
        Otherwise the automation is stored in :attr:`automations`.

        Args:
            param: the parameter to automate (an alias can be used)
            pairs: a flat sequence of the form (t0, v0, t1, v1, ...) or a
                tuple ``(times, values)`` of numpy arrays
            mode: the interpolation kind
            delay: the delay of the automation. None means: the same as
                the event
            overtake: if True, use the current value of the parameter as
                initial value

        Returns:
            always 0

        Raises:
            KeyError: if the event has no parent and the parameter is not
                known to its instr
        """
        param = self.unaliasParam(param, param)

        if self.parent is not None:
            # NOTE(review): `overtake` is not forwarded because the signature
            # of the parent's automate method is not visible here - confirm
            # whether it accepts it and pass it along if so
            self.parent.automate(self, param=param, pairs=pairs, mode=mode, delay=delay)
            return 0

        instr = self.instr
        if param not in instr.aliases and param not in instr.paramNames():
            raise KeyError(f"Unknown parameter '{param}' for {self}. Possible parameters: {self.instr.paramNames()}")

        if isinstance(pairs, tuple) and len(pairs) == 2 and isinstance(pairs[0], np.ndarray):
            # Separate time / value arrays are merged into one flat array
            ts, values = pairs
            assert isinstance(values, np.ndarray) and isinstance(ts, np.ndarray)
            pairs = nptools.interlace(ts, values)

        # overtake must be stored with the automation, otherwise the
        # request of the caller is silently lost
        automation = SchedAutomation(param=param, pairs=pairs, interpolation=mode,
                                     delay=delay, overtake=overtake)
        if self.automations is None:
            self.automations = [automation]
        else:
            self.automations.append(automation)
        return 0

    def stop(self, delay=0.) -> None:
        """
        Stop this event

        When using this in offline mode, delay is an absolute time. If
        this event has no parent its duration is shortened so that it
        ends at the given time; a time outside the lifetime of the event
        is only logged as an error. With a parent, the event is
        unscheduled via the parent.

        Args:
            delay: when to stop
        """
        if self.parent is None:
            if self.start < delay < self.end:
                self.dur = delay - self.start
            else:
                logger.error(f"Stop time {delay} outside the lifetime of this event "
                             f"({self.start} - {self.end})")
        else:
            self.parent.unsched(self, delay=delay)

    def controlNames(self, aliases=False) -> frozenset[str]:
        """
        The names of the controls of the instr of this event

        Args:
            aliases: forwarded to the instr

        Returns:
            whatever ``Instr.controlNames`` returns for this event's instr
        """
        return self.instr.controlNames(aliases=aliases)

    def isPfield(self, param: str) -> bool:
        """
        Is the given parameter a pfield?

        Args:
            param: the name of the parameter, or an alias of it

        Returns:
            True if ``param`` is a pfield name of the instr or an alias
            which resolves to one
        """
        pfields = self.instr.pfieldNames()
        if param in pfields:
            return True
        # Not a pfield name itself, it can still be an alias of a pfield
        unaliased = self.aliases().get(param)
        return unaliased is not None and unaliased in pfields

    def pfieldNames(self, aliases=False) -> frozenset[str]:
        """
        The names of the pfields of the instr of this event

        Args:
            aliases: forwarded to the instr

        Returns:
            whatever ``Instr.pfieldNames`` returns for this event's instr
        """
        return self.instr.pfieldNames(aliases=aliases)

    def paramValue(self, param: str) -> float | str | None:
        """
        The value of the given parameter for this event

        For a pfield, the value passed in ``args`` is returned if present,
        otherwise the default value defined by the instr. For a control,
        the value passed in ``controls`` is returned if present, otherwise
        the value defined by the instr.

        Args:
            param: the name of the parameter (an alias can be used)

        Returns:
            the value of the parameter

        Raises:
            KeyError: if the parameter is neither a pfield nor a control
        """
        param = self.unaliasParam(param, param)
        instr = self.instr
        if param in self.pfieldNames(aliases=False):
            pindex = instr.pfieldIndex(param)
            # args starts at p4, so p4 lives at index 0
            argindex = pindex - 4
            if self.args and 0 <= argindex < len(self.args):
                return self.args[argindex]
            return instr.pfieldDefaultValue(pindex)
        elif param in self.controlNames(aliases=False):
            if self.controls and param in self.controls:
                return self.controls[param]
            return instr.controls[param]
        else:
            raise KeyError(f"Parameter '{param}' unknown. Possible parameters: {instr.paramNames(aliases=False)},"
                           f" (aliases={instr.aliases})")
[docs]
class SchedEventGroup(BaseSchedEvent):
    """
    Represents a group of scheduled events

    These events can be controlled together, similar to a SynthGroup.
    The group spans from the earliest start to the latest end of its
    events.

    Args:
        events: the events in this group. Cannot be empty
    """

    def __init__(self, events: list[SchedEvent]):
        if not events:
            raise ValueError("No events given")
        start = min(ev.start for ev in events)
        end = max(ev.end for ev in events)
        super().__init__(start=start, dur=end - start)
        self.events = events

    def __iter__(self):
        return iter(self.events)

    def __getitem__(self, item):
        return self.events.__getitem__(item)

    def __len__(self):
        return len(self.events)

    def __hash__(self):
        return hash(tuple(hash(ev) for ev in self))

    def stop(self, delay=0.) -> None:
        """
        Stop all events in this group

        Args:
            delay: when to stop, passed to each event
        """
        for ev in self:
            ev.stop(delay=delay)

    def _setPfield(self, param: str, value: float, delay=0.) -> None:
        """
        Set a pfield for each event in this group which defines it

        Args:
            param: the name of the pfield (or an alias)
            value: the new value
            delay: when to apply the change

        Raises:
            KeyError: if no event in this group has the given pfield
        """
        count = 0
        for ev in self:
            if param in ev.pfieldNames(aliases=True):
                ev._setPfield(delay=delay, param=param, value=value)
                count += 1
        if count == 0:
            raise KeyError(f"Parameter '{param}' unknown. "
                           f"Possible parameters: {self.dynamicParamNames(aliases=True)}")

    def _setTable(self, param: str, value: float, delay=0.) -> None:
        """
        Set a named control for each event in this group which defines it

        Args:
            param: the name of the control (or an alias)
            value: the new value
            delay: when to apply the change

        Raises:
            KeyError: if no event in this group has the given control
        """
        count = 0
        for ev in self:
            if param in ev.controlNames(aliases=True):
                ev._setTable(param=param, value=value, delay=delay)
                count += 1
        if count == 0:
            raise KeyError(f"Parameter '{param}' unknown. "
                           f"Possible parameters: {self.dynamicParamNames()}, "
                           f"aliases: {self.aliases()}")

    # paramNames, dynamicParamNames and controlNames are computed on demand.
    # Decorating them with functools.cache would key a module-level cache on
    # the group itself, keeping each group (and its events) alive for as
    # long as the process runs.

    def paramNames(self, aliases=True) -> frozenset[str]:
        """
        The union of the parameter names of all events in this group

        Args:
            aliases: passed to each event

        Returns:
            a set with all parameter names
        """
        allparams: set[str] = set()
        for ev in self:
            allparams.update(ev.paramNames(aliases=aliases))
        return frozenset(allparams)

    def paramValue(self, param: str) -> float | str | None:
        """
        Returns the parameter value for the given parameter

        Within a group the first synth which has the given parameter
        will be used to determine the parameter value

        Args:
            param: the name of the parameter

        Returns:
            the first value returned by an event which is not None, or
            None if no event returned a value

        Raises:
            KeyError: if the parameter is not known to this group
        """
        if param not in self.paramNames():
            raise KeyError(f"Parameter '{param}' not known. Possible parameters: "
                           f"{self.paramNames()}")
        for ev in self:
            value = ev.paramValue(param)
            if value is not None:
                return value
        return None

    def dynamicParamNames(self, aliases=False) -> frozenset[str]:
        """
        The union of the dynamic parameters of all events in this group

        Args:
            aliases: passed to each event

        Returns:
            a set with all dynamic parameter names
        """
        params: set[str] = set()
        for ev in self:
            params.update(ev.dynamicParamNames(aliases=aliases))
        return frozenset(params)

    def controlNames(self, aliases=True) -> frozenset[str]:
        """
        Returns a set of available table named parameters for this group

        Args:
            aliases: passed to each event

        Returns:
            the union of the control names of all events
        """
        allparams: set[str] = set()
        for event in self:
            params = event.controlNames(aliases=aliases)
            if params:
                allparams.update(params)
        return frozenset(allparams)

    def set(self, param='', value: float = 0., delay=0., **kws) -> None:
        """
        Set a dynamic parameter for all events in this group which define it

        Parameters can be given as ``param`` / ``value`` and / or as
        keywords; each keyword is applied as a separate call

        Args:
            param: the name of the parameter
            value: the new value
            delay: when to apply the change
            **kws: further parameters, as ``name=value``

        Raises:
            KeyError: if a parameter is not known by any event in this group
        """
        if kws:
            for k, v in kws.items():
                self.set(param=k, value=v, delay=delay)

        if param:
            count = 0
            # Collected along the way, only used for the error message
            allparams: set[str] = set()
            for ev in self:
                evparams = ev.dynamicParamNames(aliases=True)
                allparams.update(evparams)
                if param in evparams:
                    count += 1
                    ev.set(param=param, value=value, delay=delay)
            if count == 0:
                raise KeyError(f"Param '{param}' not known by any events in this group. "
                               f"Possible parameters: {allparams}")

    def aliases(self) -> dict[str, str]:
        """
        The aliases of all events in this group, merged in one dict

        If two events define the same alias, the event which comes later
        in the group wins

        Returns:
            a dict merging the aliases of each event
        """
        out: dict[str, str] = {}
        for ev in self.events:
            out.update(ev.aliases())
        return out

    def automate(self,
                 param: str,
                 pairs: Sequence[float] | np.ndarray | tuple[np.ndarray, np.ndarray],
                 mode="linear",
                 delay: float | None = None,
                 overtake=False,
                 ) -> float:
        """
        Automate a parameter for all events in this group which define it

        Args:
            param: the parameter to automate
            pairs: the automation data, passed unchanged to each event
            mode: the interpolation kind
            delay: the delay of the automation
            overtake: passed unchanged to each event

        Returns:
            always 0

        Raises:
            KeyError: if the parameter is not known by any event in this group
        """
        count = 0
        for ev in self:
            if param in ev.dynamicParamNames(aliases=True):
                count += 1
                ev.automate(param=param, pairs=pairs, mode=mode,
                            delay=delay, overtake=overtake)
        if count == 0:
            raise KeyError(f"Param '{param}' not known by any events in this group. "
                           f"Possible parameters: {self.dynamicParamNames()}, aliases: {self.aliases()}")
        return 0.