Skip to content

Commit 20bd8dc

Browse files
gijskoningnetadvanced
authored andcommitted
Laikad: process executor to fetch orbits (commaai#24843)
* Use ProcessPoolExecutor to fetch orbits * update laika repo * Minor
1 parent 22f469a commit 20bd8dc

File tree

3 files changed

+57
-42
lines changed

3 files changed

+57
-42
lines changed

selfdrive/locationd/laikad.py

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22
import time
3-
from multiprocessing import Process, Queue
3+
from concurrent.futures import Future, ProcessPoolExecutor
44
from typing import List, Optional
55

66
import numpy as np
@@ -10,7 +10,7 @@
1010

1111
from cereal import log, messaging
1212
from laika import AstroDog
13-
from laika.constants import SECS_IN_MIN
13+
from laika.constants import SECS_IN_HR, SECS_IN_MIN
1414
from laika.ephemeris import EphemerisType, convert_ublox_ephem
1515
from laika.gps_time import GPSTime
1616
from laika.helpers import ConstellationId
@@ -29,8 +29,9 @@ class Laikad:
2929
def __init__(self, valid_const=("GPS", "GLONASS"), auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV)):
3030
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
3131
self.gnss_kf = GNSSKalman(GENERATED_DIR)
32-
self.orbit_p: Optional[Process] = None
33-
self.orbit_q = Queue()
32+
self.orbit_fetch_executor = ProcessPoolExecutor()
33+
self.orbit_fetch_future: Optional[Future] = None
34+
self.last_fetch_orbits_t = None
3435

3536
def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False):
3637
if ublox_msg.which == 'measurementReport':
@@ -82,7 +83,7 @@ def process_ublox_msg(self, ublox_msg, ublox_mono_time: int, block=False):
8283
return dat
8384
elif ublox_msg.which == 'ephemeris':
8485
ephem = convert_ublox_ephem(ublox_msg.ephemeris)
85-
self.astro_dog.add_ephems([ephem], self.astro_dog.nav)
86+
self.astro_dog.add_navs([ephem])
8687
# elif ublox_msg.which == 'ionoData':
8788
# todo add this. Needed to better correct messages offline. First fix ublox_msg.cc to sent them.
8889

@@ -100,7 +101,7 @@ def update_localizer(self, pos_fix, t: float, measurements: List[GNSSMeasurement
100101
cloudlog.error("Gnss kalman std too far")
101102

102103
if len(pos_fix) == 0:
103-
cloudlog.error("Position fix not available when resetting kalman filter")
104+
cloudlog.warning("Position fix not available when resetting kalman filter")
104105
return
105106
post_est = pos_fix[0][:3].tolist()
106107
self.init_gnss_localizer(post_est)
@@ -124,36 +125,33 @@ def init_gnss_localizer(self, est_pos):
124125

125126
self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
126127

127-
def get_orbit_data(self, t: GPSTime, queue):
128-
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
129-
start_time = time.monotonic()
130-
try:
131-
self.astro_dog.get_orbit_data(t, only_predictions=True)
132-
except RuntimeError as e:
133-
cloudlog.info(f"No orbit data found. {e}")
134-
return
135-
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.2f}s")
136-
if queue is not None:
137-
queue.put((self.astro_dog.orbits, self.astro_dog.orbit_fetched_times))
138-
139128
def fetch_orbits(self, t: GPSTime, block):
140-
if t not in self.astro_dog.orbit_fetched_times:
141-
if block:
142-
self.get_orbit_data(t, None)
143-
return
144-
if self.orbit_p is None:
145-
self.orbit_p = Process(target=self.get_orbit_data, args=(t, self.orbit_q))
146-
self.orbit_p.start()
147-
if not self.orbit_q.empty():
148-
ret = self.orbit_q.get()
129+
if t not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or t - self.last_fetch_orbits_t > SECS_IN_HR):
130+
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types
131+
if self.orbit_fetch_future is None:
132+
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
133+
if block:
134+
self.orbit_fetch_future.result()
135+
if self.orbit_fetch_future.done():
136+
ret = self.orbit_fetch_future.result()
149137
if ret:
150138
self.astro_dog.orbits, self.astro_dog.orbit_fetched_times = ret
151-
self.orbit_p.join()
152-
self.orbit_p = None
153-
154-
def __del__(self):
155-
if self.orbit_p is not None:
156-
self.orbit_p.kill()
139+
self.orbit_fetch_future = None
140+
self.last_fetch_orbits_t = t
141+
142+
143+
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types):
144+
astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types)
145+
cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}")
146+
start_time = time.monotonic()
147+
data = None
148+
try:
149+
astro_dog.get_orbit_data(t, only_predictions=True)
150+
data = (astro_dog.orbits, astro_dog.orbit_fetched_times)
151+
except RuntimeError as e:
152+
cloudlog.info(f"No orbit data found. {e}")
153+
cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s")
154+
return data
157155

158156

159157
def create_measurement_msg(meas: GNSSMeasurement):

selfdrive/locationd/test/test_laikad.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,29 +69,46 @@ def test_laika_offline(self, downloader_mock):
6969
self.assertEqual(256, len(correct_msgs))
7070
self.assertEqual(256, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
7171

72-
def test_laika_get_orbits(self):
73-
laikad = Laikad(auto_update=False)
74-
first_gps_time = None
72+
def get_first_gps_time(self):
7573
for m in self.logs:
7674
if m.ubloxGnss.which == 'measurementReport':
7775
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
7876
if len(new_meas) != 0:
79-
first_gps_time = new_meas[0].recv_time
80-
break
77+
return new_meas[0].recv_time
78+
79+
def test_laika_get_orbits(self):
80+
laikad = Laikad(auto_update=False)
81+
first_gps_time = self.get_first_gps_time()
8182
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
8283
laikad.fetch_orbits(first_gps_time, block=True)
83-
self.assertEqual(29, len(laikad.astro_dog.orbits.keys()))
84+
self.assertEqual(29, len(laikad.astro_dog.orbits.values()))
85+
self.assertGreater(min([len(v) for v in laikad.astro_dog.orbits.values()]), 0)
8486

8587
@unittest.skip("Use to debug live data")
8688
def test_laika_get_orbits_now(self):
8789
laikad = Laikad(auto_update=False)
8890
laikad.fetch_orbits(GPSTime.from_datetime(datetime.utcnow()), block=True)
8991
prn = "G01"
90-
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
92+
self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0)
9193
prn = "R01"
92-
self.assertLess(0, len(laikad.astro_dog.orbits[prn]))
94+
self.assertGreater(len(laikad.astro_dog.orbits[prn]), 0)
9395
print(min(laikad.astro_dog.orbits[prn], key=lambda e: e.epoch).epoch.as_datetime())
9496

97+
def test_get_orbits_in_process(self):
98+
laikad = Laikad(auto_update=False)
99+
has_orbits = False
100+
for m in self.logs:
101+
laikad.process_ublox_msg(m.ubloxGnss, m.logMonoTime, block=False)
102+
if laikad.orbit_fetch_future is not None:
103+
laikad.orbit_fetch_future.result()
104+
vals = laikad.astro_dog.orbits.values()
105+
has_orbits = len(vals) > 0 and max([len(v) for v in vals]) > 0
106+
if has_orbits:
107+
break
108+
self.assertTrue(has_orbits)
109+
self.assertGreater(len(laikad.astro_dog.orbit_fetched_times._ranges), 0)
110+
self.assertEqual(None, laikad.orbit_fetch_future)
111+
95112

96113
if __name__ == "__main__":
97114
unittest.main()

0 commit comments

Comments
 (0)