Skip to content

Commit 179465a

Browse files
committed
Added new injection script and some hotfixes after production tests
1 parent 4a9ff98 commit 179465a

File tree

5 files changed

+256
-12
lines changed

5 files changed

+256
-12
lines changed

T2/audit.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,8 @@ def __init__(
331331
):
332332
self.audit_dir = Path(audit_dir) if audit_dir else DEFAULT_AUDIT_DIR
333333
self.audit_dir.mkdir(parents=True, exist_ok=True)
334+
self._source_file_path: Optional[str] = None
335+
self._source_mtime: Optional[float] = None
334336

335337
self.persist_json = bool(persist_json)
336338

@@ -356,7 +358,9 @@ def __init__(
356358
h.setFormatter(logging.Formatter(fmt=fmt, datefmt=dfmt))
357359
self.log.addHandler(h)
358360
self.log.setLevel(logging.INFO)
359-
361+
362+
#self._ensure_audit_header()
363+
#self._write_injections_csv()
360364
self.log.info(
361365
f"[AUDIT] init audit_dir={self.audit_dir} "
362366
f"persist_json={self.persist_json} "
@@ -372,6 +376,38 @@ def __init__(
372376
f"beam_window={self.beam_window}"
373377
)
374378

379+
@staticmethod
380+
def _parse_injection_file(path: str):
381+
"""
382+
Yield dicts from
383+
`injections_for_audit.txt`. Skips lines starting with '#'.
384+
"""
385+
if not os.path.exists(path):
386+
return
387+
with open(path, "r") as fh:
388+
for ln in fh:
389+
ln = ln.strip()
390+
if not ln or ln.startswith("#"):
391+
continue
392+
# Expect 7 tokens: MJD Beam DM SNR Width_fwhm spec_ind FRBno
393+
toks = ln.split()
394+
if len(toks) < 7:
395+
continue
396+
try:
397+
mjd = float(toks[0])
398+
beam = int(toks[1])
399+
dm = float(toks[2])
400+
snr = float(toks[3])
401+
width_fwhm = float(toks[4])
402+
spec_ind = float(toks[5])
403+
frbno = str(toks[6])
404+
except Exception:
405+
continue
406+
yield dict(
407+
MJD=mjd, Beam=beam, DM=dm, SNR=snr,
408+
Width_fwhm=width_fwhm, spec_ind=spec_ind, FRBno=frbno
409+
)
410+
375411
def ingest_legacy_injections(self, legacy_path: str) -> int:
376412
"""
377413
Read the old injection_list.txt and assign a unique id for each of them. FRBNo was not unique.
@@ -417,6 +453,60 @@ def ingest_legacy_injections(self, legacy_path: str) -> int:
417453
print(f"[AUDIT][ingest] added {added} new injections from legacy file")
418454
return added
419455

456+
def attach_injection_source(self, path: str):
457+
"""
458+
Register the on-disk file we should watch and refresh from.
459+
Does not force an immediate load; call refresh_from_source() to load.
460+
"""
461+
self._source_file_path = str(path)
462+
try:
463+
self._source_mtime = os.path.getmtime(self._source_file_path)
464+
except Exception:
465+
self._source_mtime = None
466+
467+
def refresh_from_source(self, force: bool = False) -> int:
468+
"""
469+
If the source file changed (mtime bumped) or force=True,
470+
parse it and seed/update injections. Returns number of
471+
new/updated rows applied.
472+
- New rows: seed as new injection (G0=1, others -1).
473+
- Existing rows (same inj_id): update meta fields (SNR, width, spec_ind, FRBno)
474+
without touching gate states.
475+
"""
476+
if not self._source_file_path:
477+
return 0
478+
479+
try:
480+
mtime = os.path.getmtime(self._source_file_path)
481+
except Exception:
482+
return 0
483+
484+
if (not force) and (self._source_mtime is not None) and (mtime <= self._source_mtime):
485+
return 0 # no change
486+
487+
applied = 0
488+
for meta in self._parse_injection_file(self._source_file_path):
489+
inj_id = _stable_inj_id(meta["MJD"], meta["Beam"], meta["DM"])
490+
if inj_id not in self.injections:
491+
# brand-new
492+
self.seed_injection(inj_id, meta)
493+
applied += 1
494+
else:
495+
# update meta (non-destructive to gates/candname/stats)
496+
st = self.injections[inj_id]
497+
inj = st.get("inj", {})
498+
inj["SNR"] = float(meta.get("SNR", inj.get("SNR", ""))) if str(meta.get("SNR", "")) != "" else inj.get("SNR", "")
499+
inj["Width_fwhm"] = float(meta.get("Width_fwhm", inj.get("Width_fwhm", ""))) if str(meta.get("Width_fwhm", "")) != "" else inj.get("Width_fwhm", "")
500+
inj["spec_ind"] = float(meta.get("spec_ind", inj.get("spec_ind", ""))) if str(meta.get("spec_ind", "")) != "" else inj.get("spec_ind", "")
501+
inj["FRBno"] = str(meta.get("FRBno", inj.get("FRBno", "")))
502+
st["inj"] = inj
503+
applied += 1
504+
505+
# bump mtime and rewrite rolling CSV snapshot
506+
self._source_mtime = mtime
507+
self._write_injections_csv()
508+
return applied
509+
420510
def _now_iso(self) -> str:
421511
return datetime.datetime.utcnow().isoformat()
422512

T2/socket.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
#Auditor import
4444
from T2.audit import Auditor
45-
INJECTION_FILE = "/home/ubuntu/data/injections/injection_list.txt"
45+
INJECTION_FILE = "/operations/T2/injection_audit_results/injections_for_audit.txt"
4646

4747
from collections import deque
4848

@@ -155,10 +155,15 @@ def parse_socket(
155155
beam_window=beam_window,
156156
persist_json=bool(audit_dump_json),
157157
)
158-
try:
159-
auditor.ingest_legacy_injections(INJECTION_FILE)
160-
except Exception as e:
161-
logger.warning(f"AUDIT init: legacy ingestion/mirror failed: {e}")
158+
# try:
159+
# auditor.ingest_legacy_injections(INJECTION_FILE)
160+
# except Exception as e:
161+
# logger.warning(f"AUDIT init: legacy ingestion/mirror failed: {e}")
162+
163+
auditor.attach_injection_source(INJECTION_FILE)
164+
added0 = auditor.refresh_from_source(force=True)
165+
if added0:
166+
print(f"[AUDIT] initial load: applied {added0} injections from {INJECTION_FILE}")
162167
# end of audit setup
163168

164169
logger.info(f"Reading from {len(ports)} sockets...")
@@ -299,6 +304,13 @@ def parse_socket(
299304

300305

301306
tab = cluster_heimdall.parse_candsfile(candsfile)
307+
try:
308+
added = auditor.refresh_from_source(force=False)
309+
if added:
310+
print(f"[AUDIT] reload: applied {added} injection rows from {INJECTION_FILE}")
311+
except Exception as e:
312+
logger.warning(f"[AUDIT] refresh_from_source failed: {e}")
313+
302314
#injection auditor changes...
303315
if audit_enabled and auditor is not None:
304316
try:

scripts/dsa_injection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
ind = 0#int(ii%2)#np.where(params[:,-1]==float(frbno))[0]
4848

4949
#mysnr = 0.08#0.2-np.random.uniform()*0.14
50-
#mysnr = 0.125
51-
mysnr=0.2
50+
mysnr = 0.125
51+
#mysnr=0.2
5252

5353
DM, SNR, Width_fwhm, spec_ind = params[ind][0],params[ind][1],params[ind][2],params[ind][3]
5454
print("pushing injection to command to etcd")

scripts/new_injection_script.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#!/usr/bin/env python3
2+
import os, time, random
3+
import numpy as np
4+
from astropy.time import Time
5+
import dsautils.dsa_store as ds
6+
import slack_sdk as slack
7+
8+
# ----------------- config -----------------
9+
SLACK_CHANNEL = "candidates"
10+
SLACK_TOKEN_FILE = f"{os.path.expanduser('~')}/.config/slack_api"
11+
INJ_LIST = "/home/ubuntu/data/injections/injection_list.txt"
12+
AUDIT_INJECTION_FILENAME = "/operations/T2/injection_audit_results/injections_for_audit.txt"
13+
PARAMS_TXT = "/home/ubuntu/simulated_frb_params.txt" # columns: DM SNR width_fwhm spec_ind
14+
TEMPLATES = [
15+
"/home/ubuntu/data/burst_0.inject",
16+
"/home/ubuntu/data/burst_1.inject",
17+
"/home/ubuntu/data/burst_2.inject",
18+
"/home/ubuntu/data/burst_3.inject",
19+
"/home/ubuntu/data/burst_4.inject",
20+
]
21+
22+
SLEEP_SEC = 600 # 10 minutes
23+
24+
NODES = [17, 18, 19, 20]
25+
PAIR = {17: 19, 18: 20, 19: 17, 20: 18}
26+
LOCAL_BEAMS_PER_NODE = 128
27+
28+
# Legacy output formatting (unchanged)
29+
FMT_OUT = "%5.9f %d %0.2f %0.1f %0.3f %0.2f %s\n"
30+
31+
# --- SNR→scale mapping (Recovered_SNR ≈ k * scale) ---
32+
K_DEFAULT = 135.0 # 27/.2 from old script
33+
SCALE_MIN = 0.075
34+
SCALE_MAX = 0.3
35+
niterations = 288
36+
scale = np.random.uniform(SCALE_MIN, SCALE_MAX, niterations)
37+
# ------------------------------------------------------
38+
39+
def ensure_slack():
40+
if not os.path.exists(SLACK_TOKEN_FILE):
41+
raise RuntimeError(f"Could not find file with slack api token at {SLACK_TOKEN_FILE}")
42+
with open(SLACK_TOKEN_FILE, "r") as sf_handler:
43+
slack_token = sf_handler.read()
44+
return slack.WebClient(token=slack_token)
45+
46+
def slack_msg(cli, text):
47+
try:
48+
cli.chat_postMessage(channel=SLACK_CHANNEL, text=text)
49+
except Exception as e:
50+
print(f"[slack] {e}")
51+
52+
def ensure_injection_list(filename):
53+
if not os.path.exists(filename):
54+
with open(filename, "w") as f:
55+
f.write("# MJD Beam DM SNR Width_fwhm spec_ind FRBno\n")
56+
57+
def global_to_node_local(g):
58+
if not (0 <= g <= 511):
59+
raise ValueError("global beam must be in [0, 511]")
60+
group = g // LOCAL_BEAMS_PER_NODE
61+
node = NODES[group]
62+
local = g % LOCAL_BEAMS_PER_NODE
63+
return node, local
64+
65+
def load_params_and_templates():
66+
params = np.genfromtxt(PARAMS_TXT) # (N,4): DM, SNR, width_fwhm, spec_ind
67+
n = min(len(params), len(TEMPLATES))
68+
if n == 0:
69+
raise RuntimeError("No params/templates found")
70+
if n < len(params):
71+
print(f"[warn] Truncating params to {n} to match templates")
72+
return params[:n], TEMPLATES[:n]
73+
74+
def snr_to_scale(target_snr: float) -> float:
75+
sc = float(target_snr) / float(K_DEFAULT)
76+
if sc < SCALE_MIN: sc = SCALE_MIN
77+
if sc > SCALE_MAX: sc = SCALE_MAX
78+
return sc
79+
80+
def scale_to_snr(scale: float) -> float:
81+
return float(scale * K_DEFAULT)
82+
83+
def run():
84+
random.seed()
85+
np.random.seed()
86+
ensure_injection_list(INJ_LIST)
87+
ensure_injection_list(AUDIT_INJECTION_FILENAME)
88+
89+
slack_cli = ensure_slack()
90+
store = ds.DsaStore()
91+
92+
params, templates = load_params_and_templates()
93+
94+
for i in range(niterations):
95+
current_scale = scale[i]
96+
print(f"[info] Starting iteration {i+1}/{niterations}")
97+
for idx, row in enumerate(params):
98+
DM, SNR, width_fwhm, spec_ind = map(float, row) # SNR is fixed from file (e.g., 15.0)
99+
template = templates[idx]
100+
frbno = os.path.splitext(os.path.basename(template))[0].split("_")[-1]
101+
102+
# choose a random global beam, then derive EW/NS pair
103+
gbeam = random.randint(0, 511)
104+
print(f"[info] Injecting FRB {frbno} with DM={DM}, SNR={SNR}, into global beam {gbeam} using template {template}")
105+
106+
if gbeam <= 255:
107+
ew_beam, ns_beam = gbeam, gbeam + 256
108+
else:
109+
ns_beam, ew_beam = gbeam, gbeam - 256
110+
111+
node, local = global_to_node_local(gbeam)
112+
partner = PAIR[node]
113+
114+
115+
effective_snr = scale_to_snr(current_scale)
116+
117+
# event bookkeeping (legacy append)
118+
mjd = Time.now().mjd
119+
with open(INJ_LIST, "a") as f:
120+
# Beam column uses the global beam you selected
121+
f.write(FMT_OUT % (mjd, gbeam, DM, effective_snr, width_fwhm, spec_ind, frbno))
122+
123+
with open(AUDIT_INJECTION_FILENAME, "a") as f:
124+
f.write(FMT_OUT % (mjd, gbeam, DM, effective_snr, width_fwhm, spec_ind, frbno))
125+
126+
127+
injection_cmd = {"cmd": "inject", "val": f"{local}-{template}-{current_scale:.3f}-"}
128+
store.put_dict(f"/cmd/corr/{node}", injection_cmd)
129+
store.put_dict(f"/cmd/corr/{partner}", injection_cmd)
130+
131+
132+
slack_msg(
133+
slack_cli,
134+
(f"Sending injection to beam {ew_beam} (EW) and {ns_beam} (NS) "
135+
f"with DM={DM:.1f} and SNR={effective_snr:.1f}"
136+
)
137+
)
138+
139+
time.sleep(SLEEP_SEC)
140+
141+
if __name__ == "__main__":
142+
run()

tests/data/T2_output.txt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
2-
"910226skrx": {
3-
"mjds": 121362.64345804593,
2+
"910227rmpk": {
3+
"mjds": 121363.20344096332,
44
"snr": 60.7232,
55
"ibox": 3,
66
"dm": 598.681,
@@ -28,8 +28,8 @@
2828
"beams8": 26,
2929
"beams9": 3,
3030
"specnum": 11452,
31-
"ra": 271.89909423911513,
32-
"dec": 54.56475690009641,
31+
"ra": 111.16160202119792,
32+
"dec": 54.91910166617094,
3333
"injected": false
3434
}
3535
}

0 commit comments

Comments
 (0)