Skip to content

Add step parsing for each agent section #1151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 304 additions & 26 deletions report/parse_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
information such as the crash details, crash symptoms,
stack traces, etc. to be rendered in the report."""

import html
import re

from report.common import LogPart
Expand All @@ -35,6 +36,277 @@ class LogsParser:
def __init__(self, logs: list[LogPart]):
self._logs = logs

def _extract_bash_commands(self, content: str) -> list[str]:
"""Extract and parse bash commands from content."""
commands = []
lines = content.split('\n')

for i, line in enumerate(lines):
line = line.strip()
if line == '<bash>':
for j in range(i + 1, len(lines)):
if lines[j].strip() == '</bash>':
bash_content = '\n'.join(lines[i + 1:j]).strip()
if bash_content:
first_line = bash_content.split('\n')[0].strip()
if first_line:
# skip comments and placeholder text
if (first_line.startswith('#') or
first_line.startswith('[The command') or
first_line.startswith('No bash') or
'No bash' in first_line or len(first_line) < 3):
continue

parts = first_line.split()
if parts:
cmd = parts[0]

if cmd == 'grep':
# Extract the search term (usually the first quoted argument)
import re
quoted_match = re.search(r"'([^']+)'", first_line)
if quoted_match:
search_term = quoted_match.group(1)
command_summary = f"grep '{search_term}'"
else:
key_args = []
for part in parts[1:]:
if not part.startswith('-') and len(part) > 1:
if len(part) > 20:
part = part[:17] + '...'
key_args.append(part)
if len(key_args) >= 1: # Limit to 1 arg for grep
break
command_summary = f"{cmd} {' '.join(key_args)}".strip()
else:
key_args = []
for part in parts[1:]:
if not part.startswith('-') and len(part) > 1:
if len(part) > 20:
part = part[:17] + '...'
key_args.append(part)
if len(key_args) >= 2: # Limit to 2 key args
break

command_summary = f"{cmd} {' '.join(key_args)}".strip()

if len(command_summary) > 40:
command_summary = command_summary[:37] + '...'

if command_summary not in commands:
commands.append(command_summary)
break

return commands

def _extract_tool_names(self, content: str) -> list[str]:
"""Extract tool names from content."""
tool_counts = {}
lines = content.split('\n')

for i, line in enumerate(lines):
line = line.strip()
if (line in ['<bash>', '<conclusion>'] and not line.startswith('</')):
tool_name = line[1:-1].title()
tool_counts[tool_name] = tool_counts.get(tool_name, 0) + 1
elif line == '<stderr>':
if i + 1 < len(lines) and lines[i + 1].strip():
tool_counts['Stderr'] = tool_counts.get('Stderr', 0) + 1

tool_names = []
for tool_name, count in tool_counts.items():
tool_names.append(tool_name)

return tool_names

def _parse_steps_from_logs(self, agent_logs: list[LogPart]) -> list[dict]:
"""Parse steps from agent logs, grouping by chat prompt/response pairs."""
step_pattern = re.compile(r"Step #(\d+) - \"(.+?)\":")
simple_step_pattern = re.compile(r"Step #(\d+)")

steps_dict = {}
current_step_number = None
current_step_name = None

for log_part in agent_logs:
content = log_part.content.strip()
if not content:
continue

lines = content.split('\n')

step_header_found = False
for line in lines:
step_match = step_pattern.search(line)
if not step_match:
simple_match = simple_step_pattern.search(line)
if simple_match:
step_match = simple_match

if step_match:
step_header_found = True
current_step_number = step_match.group(1)

if current_step_number not in steps_dict:
steps_dict[current_step_number] = {
'number': current_step_number,
'type': 'Step',
'log_parts': []
}
break

if not step_header_found and current_step_number:
steps_dict[current_step_number]['log_parts'].append(log_part)
elif not step_header_found and not current_step_number and not steps_dict:
steps_dict['0'] = {
'number': None,
'type': 'Content',
'log_parts': [log_part]
}

return self._parse_steps_by_chat_pairs(agent_logs)

def _parse_steps_by_chat_pairs(self, agent_logs: list[LogPart]) -> list[dict]:
steps = []

first_prompt_idx = -1
for i, log_part in enumerate(agent_logs):
if log_part.chat_prompt:
first_prompt_idx = i
break

if first_prompt_idx == -1:
return []

steps.append({
'number': '0 - System Instructions',
'type': 'System Instructions',
'log_parts': [agent_logs[first_prompt_idx]]
})

# Process logs after the system prompt to group into steps.
logs_to_process = agent_logs[first_prompt_idx + 1:]
step_counter = 1
current_step_parts = []

for log_part in logs_to_process:
if "agent-step" in log_part.content or "Trial ID:" in log_part.content:
continue

# A chat_response marks the beginning of a new step.
if log_part.chat_response:
if current_step_parts:
step_data = self._create_step_data(step_counter, current_step_parts)
steps.append(step_data)
step_counter += 1
current_step_parts = [log_part]
else:
current_step_parts.append(log_part)

# Append the last step.
if current_step_parts:
step_data = self._create_step_data(step_counter, current_step_parts)
steps.append(step_data)

return steps

def _syntax_highlight_content(self,
content: str,
default_language: str = "") -> str:
"""Syntax highlights content while preserving visible tags."""

# Escape everything first so raw logs are safe to render in HTML
escaped = html.escape(content)

# Helper to simplify substitutions
def _sub(pattern: str, repl: str, text: str) -> str:
return re.sub(pattern, repl, text, flags=re.DOTALL)

def _normalize_lang(lang: str) -> str:
if not lang:
return 'cpp'
lang = lang.strip().lower()
if lang in ['c++', 'cpp', 'cxx']:
return 'cpp'
if lang in ['c']:
return 'c'
if lang in ['python', 'py']:
return 'python'
if lang in ['java']:
return 'java'
if lang in ['rust', 'rs']:
return 'rust'
if lang in ['go', 'golang']:
return 'go'
return 'cpp'

lang_key = _normalize_lang(default_language)

escaped = _sub(
r'&lt;conclusion&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/conclusion&gt;',
r'<span class="log-tag">&lt;conclusion&gt;</span>'
r'<pre class="whitespace-pre-wrap break-words overflow-x-auto reason-block">\1</pre>'
r'<span class="log-tag">&lt;/conclusion&gt;</span>', escaped)
escaped = _sub(
r'&lt;reason&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/reason&gt;',
r'<span class="log-tag">&lt;reason&gt;</span>'
r'<div class="markdown-block whitespace-pre-wrap break-words overflow-x-auto">\1</div>'
r'<span class="log-tag">&lt;/reason&gt;</span>', escaped)

escaped = _sub(
r'&lt;bash&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/bash&gt;',
r'<span class="log-tag">&lt;bash&gt;</span>'
r'<pre class="whitespace-pre-wrap break-words overflow-x-auto"><code class="language-bash">\1</code></pre>'
r'<span class="log-tag">&lt;/bash&gt;</span>', escaped)
escaped = _sub(
r'&lt;build_script&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/build_script&gt;',
r'<span class="log-tag">&lt;build_script&gt;</span>'
r'<pre class="whitespace-pre-wrap break-words overflow-x-auto"><code class="language-cpp">\1</code></pre>'
r'<span class="log-tag">&lt;/build_script&gt;</span>', escaped)
escaped = _sub(
r'&lt;fuzz target&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/fuzz target&gt;',
rf'<span class="log-tag">&lt;fuzz target&gt;</span>'
rf'<pre class="whitespace-pre-wrap break-words overflow-x-auto"><code class="language-{lang_key}">\1</code></pre>'
rf'<span class="log-tag">&lt;/fuzz target&gt;</span>', escaped)

escaped = _sub(
r'&lt;stdout&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/stdout&gt;',
r'<span class="log-tag">&lt;stdout&gt;</span>'
r'<pre class="whitespace-pre-wrap break-words overflow-x-auto"><code class="language-bash">\1</code></pre>'
r'<span class="log-tag">&lt;/stdout&gt;</span>', escaped)
escaped = _sub(
r'&lt;stderr&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/stderr&gt;',
r'<span class="log-tag">&lt;stderr&gt;</span>'
r'<pre class="whitespace-pre-wrap break-words overflow-x-auto"><code class="language-bash">\1</code></pre>'
r'<span class="log-tag">&lt;/stderr&gt;</span>', escaped)
escaped = _sub(
r'&lt;return_code&gt;(\s*[^\s].*?[^\s]\s*|(?:\s*[^\s].*?)?)&lt;/return_code&gt;',
r'<span class="log-tag">&lt;return_code&gt;</span>'
r'<pre class="whitespace-pre-wrap break-words overflow-x-auto"><code>\1</code></pre>'
r'<span class="log-tag">&lt;/return_code&gt;</span>', escaped)

return escaped

def _create_step_data(self, step_number: int,
log_parts: list[LogPart]) -> dict:
"""Create step data from log parts."""
step_data = {
'number': str(step_number),
'type': 'Step',
'log_parts': log_parts
}

all_content = '\n'.join([part.content for part in log_parts])
tool_names = self._extract_tool_names(all_content)
bash_commands = self._extract_bash_commands(all_content)

if tool_names:
step_data['name'] = f"{', '.join(tool_names)}"
if bash_commands:
step_data['bash_commands'] = bash_commands

return step_data

def get_agent_sections(self) -> dict[str, list[LogPart]]:
"""Get the agent sections from the logs."""

Expand Down Expand Up @@ -91,16 +363,22 @@ def get_agent_cycles(self) -> list[dict]:
cycles_dict = {}

for agent_name, agent_logs in agent_sections.items():
# Parse steps for this agent
steps = self._parse_steps_from_logs(agent_logs)

cycle_match = re.search(r'\(Cycle (\d+)\)', agent_name)
if cycle_match:
cycle_number = int(cycle_match.group(1))
if cycle_number not in cycles_dict:
cycles_dict[cycle_number] = {}
cycles_dict[cycle_number][agent_name] = agent_logs
cycles_dict[cycle_number][agent_name] = {
'logs': agent_logs,
'steps': steps
}
else:
if 0 not in cycles_dict:
cycles_dict[0] = {}
cycles_dict[0][agent_name] = agent_logs
cycles_dict[0][agent_name] = {'logs': agent_logs, 'steps': steps}

return [cycles_dict[cycle] for cycle in sorted(cycles_dict.keys())]

Expand Down Expand Up @@ -178,30 +456,30 @@ def get_formatted_stack_traces(self,

function_name = in_match.group(1)
path = in_match.group(2)
if '/src/' in path and 'llvm-project' not in path and self._benchmark_id and self._sample_id:
path_parts = path.split(':')
file_path = path_parts[0] # Just the file path without line numbers
line_number = path_parts[1] if len(path_parts) > 1 else None

relative_path = file_path.lstrip('/')

# If coverage_report_path is set, it's a local run
# Otherwise it's cloud
if self._coverage_report_path:
url = f'{self._coverage_report_path}{relative_path}.html'
url_with_line_number = f'{url}#L{line_number}' if line_number else url
else:
url = (
f'{base_url}/results/{self._benchmark_id}/code-coverage-reports/'
f'{self._sample_id}.fuzz_target/report/linux/'
f'{relative_path}.html')
url_with_line_number = f'{url}#L{line_number}' if line_number else url
stack_traces[frame_num] = {
"url": url_with_line_number,
"path": path,
"function": function_name,
"memory_address": memory_addr
}
if '/src/' in path and 'llvm-project' not in path:
if self._benchmark_id and self._sample_id:
path_parts = path.split(':')
file_path = path_parts[0]
line_number = path_parts[1] if len(path_parts) > 1 else None

relative_path = file_path.lstrip('/')

# If coverage_report_path is set, it's a local run
# Otherwise it's cloud
if self._coverage_report_path:
url = f'{self._coverage_report_path}{relative_path}.html'
url_line_number = f'{url}#L{line_number}' if line_number else url
else:
url = (f'{base_url}/results/{self._benchmark_id}/'
f'code-coverage-reports/{self._sample_id}.fuzz_target/'
f'report/linux/{relative_path}.html')
url_line_number = f'{url}#L{line_number}' if line_number else url
stack_traces[frame_num] = {
"url": url_line_number,
"path": path,
"function": function_name,
"memory_address": memory_addr
}

return stack_traces

Expand Down
Loading