
Commit f28afb4: Merge branch 'main' into awarno/reasoning-tokens

2 parents bcb8ce0 + e256abd

File tree

4 files changed: +201 -17 lines

packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py

Lines changed: 9 additions & 2 deletions
@@ -243,10 +243,17 @@ def _update_basic_stats(self, resp: AdapterResponse, current_time: float) -> None:
         # Update inference_run_times for current run
         run_id = self._stats["run_id"]
         if run_id not in self._stats["inference_run_times"]:
-            # First request in this run - set first_request_time
+            # First request in this run - estimate when inference actually started using latency
+            estimated_first_request_start = current_time
+            if hasattr(resp, "latency_ms") and resp.latency_ms is not None:
+                # Estimate when this request was sent (current_time - latency)
+                estimated_first_request_start = current_time - (
+                    resp.latency_ms / 1000.0
+                )
+
             self._stats["inference_run_times"][run_id] = {
                 "run_start": self._adapter_start_time,
-                "first_request_time": current_time,
+                "first_request_time": estimated_first_request_start,
                 "last_request_time": current_time,
                 "inference_time": 0.0,
             }
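The hunk above only back-dates first_request_time by the observed latency of the first response. A minimal standalone sketch of that arithmetic, with hypothetical values that are not taken from the diff:

# Minimal sketch of the latency-based back-dating above; numbers are illustrative.
import time

current_time = time.time()  # moment the first response is observed
latency_ms = 250.0          # hypothetical end-to-end latency of that request

# Estimate when the request was actually sent.
estimated_first_request_start = current_time - (latency_ms / 1000.0)
print(f"observed at {current_time:.3f}, estimated send time {estimated_first_request_start:.3f}")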

packages/nemo-evaluator/src/nemo_evaluator/core/evaluate.py

Lines changed: 31 additions & 6 deletions
@@ -30,7 +30,10 @@
     EvaluationTarget,
 )
 from nemo_evaluator.core.input import prepare_output_directory, validate_configuration
-from nemo_evaluator.core.resources import monitor_memory_usage
+from nemo_evaluator.core.resources import (
+    aggregate_runtime_metrics,
+    monitor_memory_usage,
+)
 from nemo_evaluator.core.utils import run_command
 from nemo_evaluator.logging import get_logger

@@ -109,7 +112,10 @@ def run_evaluation_core():
         logger.info("No cache directory configured, token usage will not be collected")
 
     evaluation_result, metrics = monitor_memory_usage(
-        run_evaluation_core, interval_ms=100, cache_dir=cache_dir
+        run_evaluation_core,
+        interval_ms=100,
+        cache_dir=cache_dir,
+        output_dir=evaluation.config.output_dir,
     )
 
     metrics_path = os.path.join(
@@ -125,15 +131,34 @@ def run_evaluation_core():
     except (json.JSONDecodeError, IOError):
         pass  # Start fresh if file is corrupted
 
+    # Aggregate all run data from run_times directory
+    aggregated_metrics = aggregate_runtime_metrics(evaluation.config.output_dir)
+
+    if aggregated_metrics:
+        runtime = aggregated_metrics.get("runtime_seconds", 0)
+        inference_time = aggregated_metrics.get("inference_time_seconds", 0)
+        scoring_time = aggregated_metrics.get("scoring_time_seconds", 0)
+        logger.info(
+            "Aggregated metrics",
+            runtime_seconds=runtime,
+            inference_time_seconds=inference_time,
+            scoring_time_seconds=scoring_time,
+            peak_memory_bytes=aggregated_metrics.get("peak_memory_bytes", 0),
+            total_runs=aggregated_metrics.get("total_runs", 0),
+        )
+
+    # Use aggregated metrics if available, otherwise use current metrics
+    final_metrics = aggregated_metrics if aggregated_metrics else metrics
+
     # Merge with existing metrics, using "evaluation" as the key
     # If evaluation key already exists, merge the metrics instead of overwriting
     if "evaluation" in existing_metrics:
         # Aggregate existing evaluation metrics with new ones
         existing_eval = existing_metrics["evaluation"]
-        if isinstance(existing_eval, dict) and isinstance(metrics, dict):
+        if isinstance(existing_eval, dict) and isinstance(final_metrics, dict):
             # Merge dictionaries with appropriate aggregation strategy
             merged_eval = existing_eval.copy()
-            for key, value in metrics.items():
+            for key, value in final_metrics.items():
                 if (
                     key in merged_eval
                     and isinstance(merged_eval[key], (int, float))
@@ -153,9 +178,9 @@ def run_evaluation_core():
                     merged_eval[key] = value
             merged_metrics = {**existing_metrics, "evaluation": merged_eval}
         else:
-            merged_metrics = {**existing_metrics, "evaluation": metrics}
+            merged_metrics = {**existing_metrics, "evaluation": final_metrics}
     else:
-        merged_metrics = {**existing_metrics, "evaluation": metrics}
+        merged_metrics = {**existing_metrics, "evaluation": final_metrics}
 
     # Write merged metrics to file
     with open(metrics_path, "w") as f:
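For context, a simplified sketch of the merge step that now consumes final_metrics. The inputs are hypothetical, and the numeric-aggregation branch (elided between the two hunks above) is represented here by a plain sum, which is an assumption rather than the commit's actual strategy:

# Hypothetical inputs; summation below is a stand-in for the aggregation
# logic that this diff does not show.
existing_metrics = {"evaluation": {"runtime_seconds": 10.0, "total_runs": 1}}
final_metrics = {"runtime_seconds": 4.0, "total_runs": 1, "peak_memory_bytes": 123}

existing_eval = existing_metrics["evaluation"]
merged_eval = existing_eval.copy()
for key, value in final_metrics.items():
    if (
        key in merged_eval
        and isinstance(merged_eval[key], (int, float))
        and isinstance(value, (int, float))
    ):
        merged_eval[key] = merged_eval[key] + value  # assumed aggregation
    else:
        merged_eval[key] = value

merged_metrics = {**existing_metrics, "evaluation": merged_eval}
print(merged_metrics)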

packages/nemo-evaluator/src/nemo_evaluator/core/resources.py

Lines changed: 149 additions & 5 deletions
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import json
 import os
 import sqlite3
 import threading
@@ -67,7 +68,7 @@ def get_token_usage_from_cache_db(cache_db_path: str | Path) -> dict:
                 "total_cached_requests": row[3],
             }
     except Exception as e:
-        logger.warning(f"Failed to read token usage from cache: {e}")
+        logger.warning("Failed to read token usage from cache", error=str(e))
 
     return {}

@@ -81,8 +82,126 @@ def get_token_usage_from_cache(cache_dir: str) -> dict:
     return get_token_usage_from_cache_db(cache_db_path)
 
 
+def aggregate_runtime_metrics(output_dir: str) -> dict[str, Any]:
+    """Aggregate all run data from run_times directory."""
+    run_times_dir = Path(output_dir) / "run_times"
+    aggregated_metrics = {}
+
+    if not run_times_dir.exists():
+        return aggregated_metrics
+
+    total_runtime = 0
+    earliest_start = None
+    latest_end = None
+    max_peak_memory = 0
+    max_peak_tree_memory = 0
+    run_count = 0
+
+    for run_file in run_times_dir.glob("runtime_*.json"):
+        try:
+            with open(run_file, "r") as f:
+                run_data = json.load(f)
+            total_runtime += run_data.get("runtime_seconds", 0)
+            run_count += 1
+
+            # Track earliest start and latest end
+            run_start = run_data.get("start_time", "")
+            run_end = run_data.get("end_time", "")
+            if earliest_start is None or run_start < earliest_start:
+                earliest_start = run_start
+            if latest_end is None or run_end > latest_end:
+                latest_end = run_end
+
+            # Track peak memory across all runs
+            max_peak_memory = max(
+                max_peak_memory, run_data.get("peak_memory_bytes", 0)
+            )
+            max_peak_tree_memory = max(
+                max_peak_tree_memory, run_data.get("peak_tree_memory_bytes", 0)
+            )
+        except Exception:
+            pass
+
+    if run_count > 0:
+        aggregated_metrics = {
+            "runtime_seconds": total_runtime,
+            "start_time": earliest_start,
+            "end_time": latest_end,
+            "peak_memory_bytes": max_peak_memory,
+            "peak_tree_memory_bytes": max_peak_tree_memory,
+            "total_runs": run_count,
+        }
+
+        # Try to get inference time from response stats and calculate scoring time
+        try:
+            metrics_file = Path(output_dir) / "eval_factory_metrics.json"
+            if metrics_file.exists():
+                with open(metrics_file, "r") as f:
+                    metrics_data = json.load(f)
+                response_stats = metrics_data.get("response_stats", {})
+                inference_time = response_stats.get("inference_time", 0.0)
+
+                # Calculate scoring time as runtime - inference time
+                scoring_time = max(0.0, total_runtime - inference_time)
+                aggregated_metrics["inference_time_seconds"] = inference_time
+                aggregated_metrics["scoring_time_seconds"] = scoring_time
+        except Exception as e:
+            # If we can't read response stats, just continue without scoring time
+            logger.warning(
+                "Could not extract inference time from response stats", error=str(e)
+            )
+
+    return aggregated_metrics
+
+
+def _update_persistent_metrics(
+    output_dir: str,
+    start_time: float,
+    peak_memory: int,
+    peak_tree_memory: int,
+    run_id: str,
+) -> None:
+    """Save individual run data and update peak memory only."""
+    try:
+        # Create run_times directory
+        run_times_dir = Path(output_dir) / "run_times"
+        run_times_dir.mkdir(exist_ok=True)
+
+        # Save individual run runtime
+        current_time = time.time()
+        current_runtime = current_time - start_time
+        run_file = run_times_dir / f"runtime_{run_id}.json"
+
+        with open(run_file, "w") as f:
+            json.dump(
+                {
+                    "run_id": run_id,
+                    "start_time": time.strftime(
+                        "%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime(start_time)
+                    ),
+                    "end_time": time.strftime(
+                        "%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime(current_time)
+                    ),
+                    "runtime_seconds": current_runtime,
+                    "peak_memory_bytes": peak_memory,
+                    "peak_tree_memory_bytes": peak_tree_memory,
+                },
+                f,
+            )
+
+    except Exception as e:
+        logger.warning(
+            "Failed to update persistent metrics", error=str(e), run_id=run_id
+        )
+
+
 def monitor_memory_usage(
-    func, *args, interval_ms, cache_dir: str | None = None, **kwargs
+    func,
+    *args,
+    interval_ms,
+    cache_dir: str | None = None,
+    output_dir: str | None = None,
+    **kwargs,
 ) -> tuple[EvaluationResult, dict[str, Any]]:
     """
     Run func(*args, **kwargs) while polling RSS via psutil.
@@ -91,8 +210,21 @@ def monitor_memory_usage(
     - peak_tree_rss_bytes: peak memory usage of the entire process tree (main + children)
     """
     proc = psutil.Process(os.getpid())
+
+    # Generate meaningful run ID (counter or date)
+    if output_dir:
+        run_times_dir = Path(output_dir) / "run_times"
+        run_times_dir.mkdir(exist_ok=True)
+        # Count existing runs to get next ID
+        existing_runs = list(run_times_dir.glob("runtime_*.json"))
+        run_id = str(len(existing_runs))
+    else:
+        run_id = "0"
+
+    # Initialize values
     peak = 0
     peak_tree = 0
+
     stop = False
     ret = None

@@ -111,6 +243,9 @@ def get_tree_memory(process):
 
     def sampler():
         nonlocal peak, peak_tree
+        last_save_time = 0
+        save_interval = 5.0  # Save every 5 seconds
+
         while not stop:
             # Get memory for current process
             rss = proc.memory_info().rss
@@ -120,6 +255,15 @@ def sampler():
             tree_rss = get_tree_memory(proc)
             peak_tree = max(peak_tree, tree_rss)
 
+            # Update persistent metrics file if output_dir is provided and enough time has passed
+            if output_dir:
+                current_time = time.time()
+                if current_time - last_save_time >= save_interval:
+                    _update_persistent_metrics(
+                        output_dir, start_time, peak, peak_tree, run_id
+                    )
+                    last_save_time = current_time
+
             time.sleep(interval_ms / 1000.0)
 
     th = threading.Thread(target=sampler, daemon=True)
@@ -144,15 +288,15 @@ def sampler():
         try:
             token_usage = get_token_usage_from_cache(cache_dir)
         except Exception as e:
-            logger.warning(f"Failed to get token usage from cache: {e}")
+            logger.warning("Failed to get token usage from cache", error=str(e))
 
     metrics = {
         "runtime_seconds": runtime_seconds,
         "start_time": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime(start_time)),
         "end_time": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime(end_time)),
         "token_usage": token_usage,
-        "peak_memory_bytes": peak,  # Memory of main process
-        "peak_tree_memory_bytes": peak_tree,  # Memory of entire process tree
+        "peak_memory_bytes": peak,
+        "peak_tree_memory_bytes": peak_tree,
     }
 
     return ret, metrics
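Taken together, the new code forms a small persistence loop: monitor_memory_usage periodically writes runtime_<run_id>.json snapshots into <output_dir>/run_times, and aggregate_runtime_metrics later folds those snapshots into one summary. A usage sketch under stated assumptions: the workload function and the /tmp/eval_out path are illustrative only, and snapshots appear only for runs that outlive the roughly 5-second save interval shown in the sampler above:

# Illustrative usage; my_eval and /tmp/eval_out are hypothetical stand-ins.
import os

from nemo_evaluator.core.resources import (
    aggregate_runtime_metrics,
    monitor_memory_usage,
)


def my_eval():
    # stand-in for the real evaluation workload
    return sum(i * i for i in range(10_000_000))


os.makedirs("/tmp/eval_out", exist_ok=True)  # run_times/ is created inside this dir

result, metrics = monitor_memory_usage(
    my_eval,
    interval_ms=100,
    cache_dir=None,              # no request cache, so token_usage stays empty
    output_dir="/tmp/eval_out",  # enables periodic runtime_<run_id>.json snapshots
)

# Fold all snapshots from /tmp/eval_out/run_times into one summary
# (empty dict if the run was too short to produce any snapshot).
summary = aggregate_runtime_metrics("/tmp/eval_out")
print(metrics["peak_memory_bytes"], summary.get("total_runs", 0))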

packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py

Lines changed: 12 additions & 4 deletions
@@ -591,7 +591,9 @@ def test_caching_and_aggregation_with_multiple_runs(self, tmp_path):
         assert interceptor1._stats["avg_latency_ms"] == 150.0  # (100 + 200) / 2
         assert interceptor1._stats["max_latency_ms"] == 200.0
         run1_inference_time = interceptor1._stats["inference_time"]
-        assert 0.05 <= run1_inference_time <= 0.15, (
+        # With latency-based estimation: sleep_time + latency_adjustment
+        # Expected: ~0.1s (sleep) + ~0.1s (first request latency) = ~0.2s
+        assert 0.15 <= run1_inference_time <= 0.25, (
             f"Run 1 inference time {run1_inference_time} not in expected range"
         )

@@ -819,7 +821,9 @@ def test_comprehensive_cache_scenarios(
 
         # Verify Run 1 inference time
         run1_time = interceptor1._stats["inference_time"]
-        assert 0.05 <= run1_time <= 0.15, (
+        # With latency-based estimation: sleep_time + latency_adjustment
+        # Expected: ~0.1s (sleep) + ~0.1s (first request latency) = ~0.2s
+        assert 0.15 <= run1_time <= 0.25, (
             f"Run 1 time {run1_time} not in expected range"
         )
         assert interceptor1._stats["run_id"] == 0
@@ -847,7 +851,9 @@ def test_comprehensive_cache_scenarios(
         # Verify Run 2 inference time
         # All run_ids should be integers after cache loading fix
         run2_time = interceptor2._stats["inference_run_times"][1]["inference_time"]
-        assert 0.05 <= run2_time <= 0.15, (
+        # With latency-based estimation: sleep_time + latency_adjustment
+        # Expected: ~0.08s (sleep) + ~0.3s (first request latency) = ~0.38s
+        assert 0.35 <= run2_time <= 0.45, (
             f"Run 2 time {run2_time} not in expected range"
         )
         assert interceptor2._stats["run_id"] == 1
@@ -874,7 +880,9 @@ def test_comprehensive_cache_scenarios(
         # Verify Run 3 inference time
         # All run_ids should be integers after cache loading fix
         run3_time = interceptor3._stats["inference_run_times"][2]["inference_time"]
-        assert 0.05 <= run3_time <= 0.15, (
+        # With latency-based estimation: sleep_time + latency_adjustment
+        # Expected: ~0.06s (sleep) + ~0.5s (first request latency) = ~0.56s
+        assert 0.50 <= run3_time <= 0.65, (
             f"Run 3 time {run3_time} not in expected range"
         )
         assert interceptor3._stats["run_id"] == 2
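The widened ranges follow from the back-dated first_request_time: each expected inference window is roughly the sleep time plus the first request's latency, exactly as the inline comments state. A small arithmetic sketch of that expectation, using only the values quoted in those comments:

# Rough expectation behind the updated assertion ranges above.
def expected_inference_time(sleep_s: float, first_request_latency_ms: float) -> float:
    # first_request_time is back-dated by the first request's latency,
    # so the measured window grows by roughly that amount
    return sleep_s + first_request_latency_ms / 1000.0


print(expected_inference_time(0.10, 100.0))  # ~0.20 -> asserted 0.15..0.25
print(expected_inference_time(0.08, 300.0))  # ~0.38 -> asserted 0.35..0.45
print(expected_inference_time(0.06, 500.0))  # ~0.56 -> asserted 0.50..0.65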
