Skip to content
Closed
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
name: python-package-distributions
path: dist/
- name: Sign the dists with Sigstore
uses: sigstore/[email protected].0
uses: sigstore/[email protected].1
with:
inputs: >-
./dist/*.tar.gz
Expand Down
6 changes: 6 additions & 0 deletions doc/plan_stubs/redefining.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def my_plan():
yield from redefine_motor(motor, 0.)
```

By default, the {py:obj}`redefine_motor <ibex_bluesky_core.plan_stubs.redefine_motor>`
plan stub sleeps for 1 second after redefining the motor. This avoids race conditions, where a motor is moved too soon
after being redefined to a new position, and the redefined position has not yet been read back from the controller.
This behaviour can be controlled with the `sleep` keyword argument to
{py:obj}`redefine_motor <ibex_bluesky_core.plan_stubs.redefine_motor>`.

## `redefine_refl_parameter`

The {py:obj}`ibex_bluesky_core.plan_stubs.redefine_refl_parameter` plan stub can be used to redefine the current
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ classifiers = [

dependencies = [
"bluesky", # Bluesky framework
"ophyd-async[ca] == 0.10.1", # Device abstraction
"ophyd-async[ca] == 0.11", # Device abstraction
"matplotlib", # Plotting
"lmfit", # Fitting
"scipy", # Definitions of erf/erfc functions
Expand Down
225 changes: 225 additions & 0 deletions src/ibex_bluesky_core/larmor_jack_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
"""Spin-echo techniques for use on LARMOR."""

from collections.abc import Generator
from dataclasses import dataclass, field

import lmfit
import scipp as sc
from bluesky import plan_stubs as bps
from bluesky.callbacks import CallbackBase, LiveTable
from bluesky.plans import scan
from bluesky.preprocessors import subs_decorator
from bluesky.utils import Msg
from lmfit import Parameter
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from ophyd_async.plan_stubs import ensure_connected

from ibex_bluesky_core.callbacks import (
ChainedLiveFit,
HumanReadableFileCallback,
LiveFitLogger,
LivePlot,
)
from ibex_bluesky_core.devices import isis_epics_signal_rw
from ibex_bluesky_core.devices.block import BlockWriteConfig, block_rw
from ibex_bluesky_core.devices.dae import (
DaeSettingsData,
DaeTCBSettingsData,
TimeRegime,
TimeRegimeRow,
)
from ibex_bluesky_core.devices.polarisingdae import polarising_dae
from ibex_bluesky_core.fitting import DampedOsc, FitMethod
from ibex_bluesky_core.plan_stubs import call_qt_aware

DETECTORS = {
"alanis": 12,
"scruffy": 13,
}


def _get_detector_i(detector: int | str) -> int:
"""Get detector index from name."""
if isinstance(detector, int):
return detector

elif detector in DETECTORS:
return DETECTORS[detector]

else:
raise ValueError("Detector not found.")


@dataclass
class EchoScanConfig:
"""Configuration for echo scan."""

axis: str
start: float
stop: float
num_points: int = 21
wavelength_bounds: list[list[float]] = field(
default_factory=lambda: [[222, 666], [222, 370], [370, 518], [518, 666]]
# change these to angstrom instead of spectrum indicies
)
frames: int = 200
flight_path_length_m: float = 10
detector: int | str = "alanis"
monitor: int = 2
flipper: str = "IN:LARMOR:SPINFLIPPER_01:FLIPSTATE"
flipper_states: list[float] = field(default_factory=lambda: [1.0, 0.0])
dae_settings: DaeSettingsData | None = None
tcb_settings: DaeTCBSettingsData | None = None


@dataclass
class AutoTuneConfig:
"""Configuration for auto tuning."""

confidence: float = 0.5
model: FitMethod | None = None
param: str = "center"


def _callbacks_init(
config: EchoScanConfig,
model: FitMethod,
polarisation_names: list[str],
polarisation_stddev_names: list[str],
axis_dev_name: str,
axes: list[Axes],
) -> tuple[ChainedLiveFit, list[CallbackBase]]:
"""Initialise callbacks for echoscan_axis_ib."""
spinecho_cb = ChainedLiveFit(
method=model,
y=polarisation_names,
yerr=polarisation_stddev_names,
x=axis_dev_name,
ax=list(axes),
)

plots_cb = [
LivePlot(y=polarisation_names[i], x=axis_dev_name, marker="x", linestyle="none", ax=axes[i])
for i in range(len(config.wavelength_bounds))
]

measured_fields = [axis_dev_name, *polarisation_names, *polarisation_stddev_names]

table_cb = LiveTable(measured_fields)
hrfile_cb = HumanReadableFileCallback(measured_fields, output_dir=None)

lflogs_cb = [
LiveFitLogger(
livefit=spinecho_cb.live_fits[i],
x=axis_dev_name,
y=polarisation_names[i],
yerr=polarisation_stddev_names[i],
output_dir=None,
postfix=f"_band{i}",
)
for i in range(len(config.wavelength_bounds))
]

return spinecho_cb, [table_cb, hrfile_cb, *plots_cb, *lflogs_cb]


def echoscan_axis_ib(
config: EchoScanConfig, model: FitMethod, param: str
) -> Generator[Msg, None, lmfit.Parameter]:
"""Technique for doing a general echo scan."""
flipper_dev = isis_epics_signal_rw(datatype=float, read_pv=config.flipper, name="flipper")
# Would change to a blockrw but no flipper block on LARMOR
axis_dev = block_rw(float, config.axis, write_config=BlockWriteConfig(settle_time_s=0.5))

intervals = [
sc.array(dims=["tof"], values=bound, unit=sc.units.angstrom, dtype="float64")
for bound in config.wavelength_bounds
]

total_flight_path_length = sc.scalar(value=config.flight_path_length_m, unit=sc.units.m)

dae = polarising_dae(
det_pixels=[_get_detector_i(config.detector)],
frames=config.frames,
flipper=flipper_dev,
flipper_states=config.flipper_states,
intervals=intervals,
total_flight_path_length=total_flight_path_length,
monitor=config.monitor,
)

yield from call_qt_aware(plt.close, "all")

_, axes = yield from call_qt_aware(plt.subplots, len(config.wavelength_bounds))

polarisation_names = dae.reducer.polarisation_names
polarisation_stddev_names = dae.reducer.polarisation_stddev_names

spinecho_cb, callbacks = _callbacks_init(
config, model, polarisation_names, polarisation_stddev_names, axis_dev.name, axes
)

@subs_decorator([spinecho_cb, *callbacks])
def _inner() -> Generator[Msg, None, None]:
yield from ensure_connected(flipper_dev, dae, axis_dev)
yield from scan([dae], axis_dev, config.start, config.stop, num=config.num_points)

# waiting for daniel's implementation to be able to do this
# Here we want to make a backup of dae and tcb settings
# then set new dae/tcb settings from config
# e.g with_dae_tables(wiring, spectra, detector) & with_time_channels(tcbs)
yield from _inner()
# here we want to restore dae/tcb settings

# Returns the fit parameter for the last livefit in the chain
return spinecho_cb.live_fits[-1].result.params[param]


def auto_tune_ib(
scan_config: EchoScanConfig, tune_config: AutoTuneConfig | None = None
) -> Generator[Msg, None, lmfit.Parameter]:
"""Perform a more specialised version of the echoscan_axis_ib function."""
if tune_config is None:
tune_config = AutoTuneConfig()

if tune_config.model is None:
tune_config.model = DampedOsc.fit()

if scan_config.dae_settings is None: # Only change DAE/TCB settings if not already set
if scan_config.detector == DETECTORS["alanis"]:
scan_config.dae_settings = DaeSettingsData(
detector_filepath=r"C:\Instrument\Settings\config\NDXLARMOR\configurations\tables\Alanis_Detector.dat",
spectra_filepath=r"C:\Instrument\Settings\config\NDXLARMOR\configurations\tables\spectra_scanning_Alanis.dat",
wiring_filepath=r"C:\Instrument\Settings\config\NDXLARMOR\configurations\tables\Alanis_Wiring_dae3.dat",
)
elif scan_config.detector == DETECTORS["scruffy"]:
scan_config.dae_settings = DaeSettingsData(
detector_filepath=r"C:\Instrument\Settings\config\NDXLARMOR\configurations\tables\scruffy_Detector.dat",
spectra_filepath=r"C:\Instrument\Settings\config\NDXLARMOR\configurations\tables\spectra_scanning_scruffy.dat",
wiring_filepath=r"C:\Instrument\Settings\config\NDXLARMOR\configurations\tables\scruffy_Wiring_dae3.dat",
)

if scan_config.tcb_settings is None:
tr1 = TimeRegime({1: TimeRegimeRow(from_=5.0, to=100000.0, steps=100.0)})
scan_config.tcb_settings = DaeTCBSettingsData(tcb_tables={1: tr1})

# Do the scan and return the parameter we're interested in
optimal_param: Parameter = yield from echoscan_axis_ib(
scan_config, tune_config.model, tune_config.param
)

# Add null check for stderr
if optimal_param.stderr is None:
raise ValueError(f"Fit did not produce uncertainty estimate for {tune_config.param}")

if tune_config.confidence < optimal_param.stderr:
raise ValueError(
f"Error {tune_config.confidence} is less than uncertainty in "
f"optimal param {optimal_param.stderr}"
)

# Move the axis to the optimal value
bps.mv(scan_config.axis, optimal_param.value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bps.mv takes a device rather than a string - you could just make the axis_dev at this level and then pass it into echoscan_axis_ib() though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I know this won't be merged but just thinking before we copy and paste it onto LARMOR)


return optimal_param
7 changes: 6 additions & 1 deletion src/ibex_bluesky_core/plan_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ def call_qt_aware(
return cast(T, (yield Msg(CALL_QT_AWARE_MSG_KEY, func, *args, **kwargs)))


def redefine_motor(motor: Motor, position: float) -> Generator[Msg, None, None]:
def redefine_motor(
motor: Motor, position: float, *, sleep: float = 1
) -> Generator[Msg, None, None]:
"""Redefines the current positions of a motor.

Note:
Expand All @@ -97,12 +99,15 @@ def redefine_motor(motor: Motor, position: float) -> Generator[Msg, None, None]:
Args:
motor: The motor to set a position on.
position: The position to set.
sleep: An amount of time to sleep, in seconds, after redefining. Defaults to 1 second.
This avoids race conditions where a motor is redefined and then immediately moved.

"""
logger.info("Redefining motor %s to %s", motor.name, position)

def make_motor_usable() -> Generator[Msg, None, None]:
yield from bps.abs_set(motor.set_use_switch, UseSetMode.USE)
yield from bps.sleep(sleep)

def inner() -> Generator[Msg, None, None]:
yield from bps.abs_set(motor.set_use_switch, UseSetMode.SET)
Expand Down
Loading