Skip to content
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ dependencies = [
]

[dependency-groups]
dev = [
tests = [
"diff-cover>=10.2.0",
"pytest>=9.0.2",
"pytest-cases>=3.9.1",
"pytest-coverage>=0.0",
"pytest-env>=1.2.0",
"pyyaml>=6.0.3",
"ruff>=0.14.8",
]

lint = ["ruff>=0.14.8"]

dev = [
{include-group = "lint"},
{include-group = "tests"},
]

[build-system]
Expand Down
9 changes: 5 additions & 4 deletions scripts/fly-by.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
mics = ac.MicGeom()
grid = ac.RectGrid(x_min=-1, x_max=1, y_min=-1, y_max=1, z=1)

traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
traj = synth.SplineTrajectory(times=[0.0, T], locations=[[-50.0, 0.0, 1.0], [50.0, 0.0, 1.0]])
gen1 = ac.SineGenerator(freq=10, num_samples=ns, sample_freq=sf)
gen2 = ac.SineGenerator(freq=1, num_samples=ns, sample_freq=sf, amplitude=0.5, phase=-np.pi / 2)

mps1 = ac.MovingPointSource(signal=gen1, trajectory=traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=traj, mics=mics)
ac_traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
mps1 = ac.MovingPointSource(signal=gen1, trajectory=ac_traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=ac_traj, mics=mics)

mix = ac.SourceMixer(sources=[mps1, mps2])

Expand All @@ -43,7 +44,7 @@
mic = synth.Microphone()

scene = synth.Scene()
scene.environment = ac.Environment()
scene.environment = synth.Environment()
scene.microphones = [mic]
scene.sources = [source1, source2]

Expand Down
2 changes: 2 additions & 0 deletions src/scene_synthesis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
from .microphones import Microphone as Microphone
from .scene import Scene as Scene
from .sources import Source as Source
from .trajectory import SplineTrajectory as SplineTrajectory
from .trajectory import Trajectory as Trajectory
Comment thread
jtodev marked this conversation as resolved.
2 changes: 1 addition & 1 deletion src/scene_synthesis/microphones.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Microphone(HasStrictTraits):
#: self.orientation[0] = right_vec
#: self.orientation[1] = up_vec
#: self.orientation[2] = forward_vec
orientation = CArray(shape=(3, 3), desc='source orientation matrix', value=np.eye(3))
orientation = CArray(shape=(3, 3), value=np.eye(3))

#: The directivity of the microphone
directivity = Instance(Directivity)
2 changes: 1 addition & 1 deletion src/scene_synthesis/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _source_state(self, source, sending_time):
"""Return source position and velocity at one sending time."""
if source.trajectory is not None:
source_loc = np.array(source.trajectory.location(sending_time)).T
source_vel = np.array(source.trajectory.location(sending_time, der=1)).T
source_vel = np.array(source.trajectory.velocity(sending_time)).T
else:
source_loc = np.array(source.location)
source_vel = np.array([0, 0, 0])
Expand Down
16 changes: 10 additions & 6 deletions src/scene_synthesis/sources.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
"""Acoustic source definition and properties."""

import numpy as np
from acoular import SignalGenerator, Trajectory
from acoular import SignalGenerator
from traits.api import CArray, HasStrictTraits, Instance

from scene_synthesis.directivities import Directivity
from scene_synthesis.trajectory import Trajectory

Comment thread
jtodev marked this conversation as resolved.

class Source(HasStrictTraits):
"""Class representing an acoustic source.

Examples
--------
Instantiate a simple source with a sine signal and a default trajectory:
Instantiate a simple source with a sine signal and a fixed trajectory:

>>> from acoular import SineGenerator, Trajectory
>>> from scene_synthesis.sources import Source
>>> from acoular import SineGenerator
>>> from scene_synthesis import Source, SplineTrajectory
>>> signal = SineGenerator(freq=1000, sample_freq=44100, num_samples=44100)
>>> trajectory = Trajectory()
>>> trajectory = SplineTrajectory(
... times=[0.0, 1.0],
... locations=[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]],
... )
>>> source = Source(signal=signal, trajectory=trajectory)
Comment thread
jtodev marked this conversation as resolved.
"""

Expand All @@ -38,7 +42,7 @@ class Source(HasStrictTraits):
#: self.orientation[0] = right_vec
#: self.orientation[1] = up_vec
#: self.orientation[2] = forward_vec
orientation = CArray(shape=(3, 3), desc='source orientation matrix', value=np.eye(3))
orientation = CArray(shape=(3, 3), value=np.eye(3))

#: The directivity of the source.
directivity = Instance(Directivity)
175 changes: 175 additions & 0 deletions src/scene_synthesis/trajectory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Trajectory definitions for scene synthesis."""

import numpy as np
from scipy.interpolate import make_interp_spline
from traits.api import Any, Array, Callable, HasStrictTraits, Property


class Trajectory(HasStrictTraits):
"""Trajectory with independently supplied location and velocity functions.

Parameters
----------
Comment thread
jtodev marked this conversation as resolved.
location : callable
Function mapping time ``t`` to a 3D position.
velocity : callable
Function mapping time ``t`` to a 3D velocity. This does not need to be
the time derivative of ``location``.

Notes
-----
Trajectory callables must return arrays describing 3D coordinates. Scalar
times must produce shape ``(3,)`` and array-valued times must produce shape
``(3, N)``.

Examples
--------
>>> import scene_synthesis as ss
>>> location = lambda t: np.array([t, 0.0 * t, 1.0 + 0.0 * t], dtype=float)
>>> velocity = lambda t: np.array([1.0 + 0.0 * t, 0.0 * t, 0.0 * t], dtype=float)
>>> traj = ss.Trajectory(location=location, velocity=velocity)
>>> traj.location(0.5)
array([0.5, 0. , 1. ])
>>> traj.velocity(0.5)
array([1., 0., 0.])
"""

#: Time-dependent position function.
location = Property()

#: Backing trait for :attr:`location`.
_location = Callable

#: Time-dependent velocity function.
velocity = Property()

#: Backing trait for :attr:`velocity`.
_velocity = Callable

@staticmethod
def _validate_output_shape(value):
"""Validate that trajectory outputs describe 3D coordinates."""
array = np.asarray(value, dtype=float)
if array.shape == (3,):
return
if array.ndim == 2 and array.shape[0] == 3:
return

msg = f'Trajectory output must have shape (3,) or (3, N), got {array.shape}.'
raise ValueError(msg)

def _get_location(self):
return self._location

def _set_location(self, value):
self._validate_output_shape(value(0.0))
self._location = value

def _get_velocity(self):
return self._velocity

def _set_velocity(self, value):
self._validate_output_shape(value(0.0))
self._velocity = value


class SplineTrajectory(Trajectory):
"""Spline-based trajectory built from sampled times and locations.

Parameters
----------
times : array-like of float
Sample times.
locations : array-like of float
Sample positions with shape ``(N, 3)`` matching ``times``.

Notes
-----
The location spline order is chosen automatically up to cubic, which keeps
the interpolated trajectory at least :math:`C^1` whenever the available
number of samples permits it.

Examples
--------
>>> import scene_synthesis as ss
>>> trajectory = ss.SplineTrajectory(
... times=[0.0, 1.0],
... locations=[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]],
... )
>>> trajectory.location(0.5)
array([0.5, 0. , 0. ])
>>> trajectory.velocity(0.5)
array([1., 0., 0.])
"""

#: Sample times.
times = Array(dtype=float)

#: Sample locations with shape ``(N, 3)``.
locations = Array(dtype=float)

#: Internal spline objects.
_location_spline = Any
_velocity_spline = Any

def __init__(self, times, locations):
Comment thread
jtodev marked this conversation as resolved.
Outdated
self.times = np.asarray(times, dtype=float)
self.locations = np.asarray(locations, dtype=float)
self._validate_inputs()
self._prepare_samples()

order = min(3, self.times.size - 1)
self._location_spline = make_interp_spline(self.times, self.locations, k=order, axis=0)
self._velocity_spline = self._location_spline.derivative()

super().__init__(location=self._location_callable, velocity=self._velocity_callable)

@staticmethod
def _evaluate_spline(spline, t):
"""Return spline values using the trajectory output convention."""
values = np.asarray(spline(t), dtype=float)
if values.shape == (3,):
return values
return values.T

def _location_callable(self, t):
return self._evaluate_spline(self._location_spline, t)

def _velocity_callable(self, t):
return self._evaluate_spline(self._velocity_spline, t)

def _validate_inputs(self):
"""Validate spline trajectory inputs."""
if self.times.ndim != 1:
msg = f'times must be a one-dimensional array, got shape {self.times.shape}.'
raise ValueError(msg)
if self.times.size < 2:
msg = 'times must contain at least two samples.'
raise ValueError(msg)
if self.locations.shape != (self.times.size, 3):
msg = f'locations must have shape ({self.times.size}, 3), got {self.locations.shape}.'
raise ValueError(msg)

def _prepare_samples(self):
"""Sort sample times and merge exact duplicate times with identical locations."""
order = np.argsort(self.times)
sorted_times = self.times[order]
sorted_locations = self.locations[order]

unique_times = [sorted_times[0]]
unique_locations = [sorted_locations[0]]
for time, location in zip(sorted_times[1:], sorted_locations[1:], strict=True):
if time == unique_times[-1]:
if not np.allclose(location, unique_locations[-1]):
Comment thread
jtodev marked this conversation as resolved.
Outdated
msg = 'duplicate times must map to identical locations.'
raise ValueError(msg)
Comment thread
jtodev marked this conversation as resolved.
continue
unique_times.append(time)
unique_locations.append(location)

self.times = np.asarray(unique_times, dtype=float)
self.locations = np.asarray(unique_locations, dtype=float)

if self.times.size < 2:
msg = 'times must contain at least two distinct samples.'
raise ValueError(msg)
37 changes: 25 additions & 12 deletions tests/cases_trajectory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Test cases for source trajectories."""

import acoular as ac
import numpy as np
import scene_synthesis as ss


class Trajectories:
"""Test cases for all :class:`acoular.trajectories.Trajectory`-derived classes.
"""Test cases for :class:`scene_synthesis.trajectory.SplineTrajectory` objects.

Also goes over ``None`` (no trajectory).

New trajectories should be added here.
"""
Expand All @@ -16,26 +18,37 @@ def case_none(self):

def case_static(self):
"""Static trajectory test case."""
points = {0.0: (1.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[1.0, 0.0, 0.0], [1.0, 0.0, 0.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_linear_pass(self):
"""Linear pass trajectory (fly by) test case."""
points = {0.0: (-1.0, -1.0, 1.0), 1.0: (1.0, 1.0, 1.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[-1.0, -1.0, 1.0], [1.0, 1.0, 1.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_linear_approach(self):
"""Linear approach trajectory (fly at) test case."""
points = {0.0: (5.0, 0.0, 0.0), 1.0: (0.5, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[5.0, 0.0, 0.0], [0.5, 0.0, 0.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_circular(self):
"""Circular trajectory (fly around) test case."""
n = 3600
points = {i / n: (1.0 * np.cos(2 * np.pi * i / n), 1.0 * np.sin(2 * np.pi * i / n), 0.0) for i in range(n + 1)}
return [ac.Trajectory(points=points)]
times = np.linspace(0.0, 1.0, n + 1)
locations = np.column_stack(
[
np.cos(2 * np.pi * times),
np.sin(2 * np.pi * times),
np.zeros_like(times),
]
)
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_static_array(self):
"""Static array trajectory test case."""
points_array = [{0.0: (x, y, 1.0), 1.0: (x, y, 1.0)} for x, y in [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0)]]
return [ac.Trajectory(points=points) for points in points_array]
times = [0.0, 1.0]
static_points = [(1.0, 1.0, 1.0), (1.0, 0.0, 1.0), (0.0, 0.0, 1.0)]
return [ss.SplineTrajectory(times=times, locations=[point, point]) for point in static_points]
Loading
Loading