Skip to content

ListToEntries processor #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/src/sdp/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ Data modifications

.. autodata:: sdp.processors.LambdaExpression
:annotation:

.. autodata:: sdp.processors.ListToEntries
:annotation:

Data filtering
''''''''''''''
Expand Down
1 change: 1 addition & 0 deletions sdp/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
SubIfASRSubstitution,
SubMakeLowercase,
SubRegex,
ListToEntries,
LambdaExpression,
)
from sdp.processors.modify_manifest.data_to_dropbool import (
Expand Down
132 changes: 132 additions & 0 deletions sdp/processors/modify_manifest/data_to_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,138 @@ def process(self):
logger.debug(f"Failed files: {self.failed_files}")


class ListToEntries(BaseParallelProcessor):
"""
A dataset processor that transforms a single entry containing a list of items into multiple entries,
one for each item in the list.

This is useful when a manifest field (e.g., "segments") contains a list of sub-entries, and you want
to flatten these into individual records for further processing.

Args:
field_with_list (str): The name of the field in the input entry that contains a list.
output_field (str, optional): The name of the output field to assign to items in the list
if they are not dictionaries. Required if the list contains primitive types (e.g., strings).
**kwargs: Additional arguments passed to the BaseParallelProcessor.

Raises:
TypeError: If the specified list field is not of type list.
ValueError: If the list items are not dictionaries and `output_field` is not provided.

Returns:
A manifest where each entry corresponds to one item in the original list from the input entry.
This effectively transforms a single input entry containing a list of items into multiple standalone
entries, each suitable for further dataset processing.

.. admonition:: Example 1 (list of dicts)

.. code-block:: yaml

- _target_: sdp.processors.ListToEntries
input_manifest_file: ${workspace_dir}/input_manifest.json
output_manifest_file: ${workspace_dir}/output_manifest.json
field_with_list: "segments"

Input::

{
"audio_filepath": "sample.wav",
"segments": [
{"start": 0.0, "end": 1.5, "text": "Hello"},
{"start": 1.6, "end": 3.0, "text": "World"}
]
}

Output::

[
{
"audio_filepath": "sample.wav",
"start": 0.0,
"end": 1.5,
"text": "Hello"
},
{
"audio_filepath": "sample.wav",
"start": 1.6,
"end": 3.0,
"text": "World"
}
]

.. admonition:: Example 2 (list of primitives)

.. code-block:: yaml

- _target_: sdp.processors.ListToEntries
input_manifest_file: ${workspace_dir}/input_manifest.json
output_manifest_file: ${workspace_dir}/output_manifest.json
field_with_list: "text_chunks"
output_field: "text"

Input::

{
"audio_filepath": "sample.wav",
"text_chunks": [
"Hello",
"World"
]
}

Output::

[
{
"audio_filepath": "sample.wav",
"text": "Hello"
},
{
"audio_filepath": "sample.wav",
"text": "World"
}
]

"""

def __init__(self,
field_with_list: str,
output_field: str = None,
**kwargs):
super().__init__(**kwargs)
self.field_with_list = field_with_list
self.output_field = output_field

def process_dataset_entry(self, data_entry):
_entries = []

# Check that the target field is actually a list
if not isinstance(data_entry[self.field_with_list], list):
raise TypeError(f'Values of {self.field_with_list} field should be list type only: {data_entry}')

# Remove the list field from the entry and get the list of items
items_list = data_entry.pop(self.field_with_list)

# If items are not dicts, output_field must be specified to store the item
if not isinstance(items_list[0], dict) and not self.output_field:
raise ValueError(f'Type of items in items list `{self.field_with_list}` is not dict ({type(items_list[0])}). In this case `output_field` should be provided.')

# Expand the list into multiple entries
for item in items_list:
_entry = data_entry.copy()

# If item is a dict, merge its keys; otherwise, store it in `output_field`
if isinstance(item, dict):
_entry.update(item)
else:
_entry[self.output_field] = item

_entry = DataEntry(_entry)
_entries.append(_entry)

return _entries


class LambdaExpression(BaseParallelProcessor):
"""
A dataset processor that evaluates a Python expression on each data entry and either stores
Expand Down
19 changes: 19 additions & 0 deletions tests/test_data_to_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
SubIfASRSubstitution,
SubMakeLowercase,
SubRegex,
ListToEntries,
LambdaExpression,
)

Expand Down Expand Up @@ -91,6 +92,24 @@
]
)

test_params_list.extend(
[
# Test: list of dictionaries (e.g., segments)
(
ListToEntries,
{"field_with_list": "segments"},
{"audio_filepath": "a.wav", "segments": [{"start": 0.0, "end": 1.0, "text": "Hello"}, {"start": 1.1, "end": 2.0, "text": "World"}], "duration": 2.5},
[{"audio_filepath": "a.wav", "duration": 2.5, "start": 0.0, "end": 1.0, "text": "Hello"}, {"audio_filepath": "a.wav", "duration": 2.5, "start": 1.1, "end": 2.0, "text": "World"}]
),
# Test: list of primitive values (strings), requires output_field
(
ListToEntries,
{"field_with_list": "text_chunks", "output_field": "text"},
{"audio_filepath": "b.wav", "text_chunks": ["Привет", "Мир"], "lang": "ru"},
[{"audio_filepath": "b.wav", "lang": "ru", "text": "Привет"}, {"audio_filepath": "b.wav", "lang": "ru", "text": "Мир"}]
),
]
)

test_params_list.extend(
[
Expand Down
Loading